一个任务队列的module
发布于 10 年前 作者 XadillaX 6805 次浏览 最后一次编辑是 8 年前

事情很曲折,我某天在萌否收音机里面听到了一首歌,很好听,叫 hypnotized,于是红心了。

过了几天我再去听——发现这首歌变了。

最后经过多方面求证,我大概得出结果就是应该有人传错了歌,然后后来有人重新传了一遍,导致我听的不是原来那首歌了。那我那天听的那首歌到底叫什么名字呢?

然后大致看了一下,虽然歌被重新传了,但是这里显示的这首歌的时间没变!还是11分钟,目测是数据库没更新。

于是我就想了个笨办法,去爬收音机里面所有 tag 为 東方project 的专辑,然后跑到专辑页看歌曲的长度。

问题来了,如果我直接爬,然后爬完 callback 之后又直接爬,没有任何间隔,就相当于我在 DDOS 它的站子。或者即使没那么严重——反正最后到一定程度并发太大我就访问不了了。

于是我就想到了做一个任务队列的 module。该 module 的作用就是把一堆任务扔到队列中,完成一个才开始下一个。

然后如果同时执行一个也太慢,module 还允许你开多几个子队列同时执行。

模块的 repo 在 GitHub 上面。名字叫 Scarlet Task 的原因一是我本身就喜欢二小姐,二是为了纪念这次事件我是为了找有关二小姐的歌。

要安装也很简单:

$ npm install scarlet-task

然后 repo 的 README.md 里面有使用方法的——大致就是实例化一个对象,然后定义好某个任务的任务标识(可以是字符串,可以是 json 对象,可以是任何类型的数据),然后再定义好处理这个任务的函数,将这个数据推倒队列中即可。然后在处理函数中任务处理完的时候执行以下任务完成的函数即可。

12 回复

async 不是有类似功能吗?

楼主,我看了代码,如果任务队列里没任务了,流程是不是就结束了?async的queue好像是轮询,队列空了也不会停,不过它是并发的太耗资源,我的建议是在push的时候,判断一下流程有没有结束,如果结束就重新启动流程,不用手动执行taskQueue.taskDone(taskObject, true);

新加任务的时候有一个判断的——如果队列停了,那么新加任务的时候会自动开始;如果队列没停,那么只是把任务新加进去。

所以没任务是会结束流程的,但是一旦有新任务进来会重新开始。

async 整个库比我的大多了,而且基本上的项目都会在用。我这个只是一个小小的 module,所谓重复造轮子。

而且有时候并不需要 async 里面的 queue 那么多样化的定制,只需要一个简简单单的任务队列就够了。所以有时候重复造轮子也是有其意义在的。

你这不如用pythone ruby, 用node纯粹是浪费javascript这门语言。

先找出所有专辑的数量, 然后设定一个请求数量比如10个, 同时异步请求10个, 10个全部完成之后,启动下面的10个。

@XadillaX 了解,其实你可以跟async一样用一个队列,一个计数器,限制执行的任务个数,毕竟任务都是异步执行的,不必要多个队列,然后每个队列每次只执行一个任务

你这是同步编程的思维,你无法估计收集所有专辑要多少时间,还不是要一个线程收集链接,一个线程发请求,node用流程控制完全可以让这个在一个进程里同时处理

@dlutwuwei 这你就错了. 在编程领域,每一样东西都是可以统计的。

你的流程控制就是阻塞的线程, 每个时间段都只有1条线在运作, 完全是在浪费node多后台并行的功能。 每向操作系统提交一个业务,回调函数都可以很好的检测到整个流水线的结果。

如果有100条业务同时交给操作系统, node只需要统计和认证每个业务完成的回调, 即可完美的组织整个过程。 1个node线程就足以做这件事了,那还需要什么链接、请求?

@tulayang 我认同你的观点,node的特点在于io异步回调,一个node进程完全可以处理n多个回调,io一直都是并发的,回调不是并发,而node进程任务队列的大小有限制,需要控制住回调进入任务队列的数量。lz的队列确实是一个只进行一次io,但是完全可以进行io并发,async的queue就是这样设计的。

@tulayang 你要知道,如果爬虫真这样无限制地不阻塞并发,就相当于去DDOS对方的服务器了。我一开始就是这么做的,爬到地址就立刻马上处理——导致后来服务器返回各种错误。

而且我这里也是可以开多个子队列的,比如你开了1000,其实就相当于可以1000个一起发动了,而我对于子队列,理论上是没做数量限制的。

@tulayang 而且queue也并不是说所有子queue结束了才开始下一轮,而是一个子queue完成了这个子queue就马上开始下一个任务。

回到顶部