精华 一个基于Promise的nodejs队列控制类
发布于 10 年前 作者 cnwhy 12797 次浏览 最后一次编辑是 8 年前 来自 分享

场景一:
现在的web应用可能都是一个这样的结构:

http服务(node) > 接口(业务逻辑) > 数据库

很多时候,瓶颈一般出现在业务层,或者数据层。更多的可能是某一个业务的处理,拉下整个系统的性能。
当用户或一些不怀好意的人,故意大量调用这些处理逻辑,好吧,你nodejs是非阻塞的,这一大波处理请求就一窝蜂冲到到业务层,很可能导致整个系统性能下降,或者瘫痪。
如果这个时候,node层就把这些耗资源的请求,排好队,控制好并发,甚至分用户排成队,配置好不同身份的用户并发量,不是很爽。
场景二:
应用中,可能会有一些无需即时处理的同一类业务,处理前都需要收集一次资源,处理业务,处理后再清理一次。高消耗工作主要在收集或是清理上。 如果我们将要处理的业务暂存到队列中,当队列数量到达一个值或是某个时间点时,我们一次性处理完队列中的任务。在消耗上,只做一次“收集”、“清理”的操作。

我的想法是,并不需要把整个业务的后续处理全都放到队列中去,而只是将高消耗的那一部分放入队列,利用Promise的异部处理机制来处理后续的操作。 在编写代码的时候你几乎可以忘记队列的存在,但是他就在那里默默的工作着,代码可读性和灵活性没有丝毫影响。

项目地址:
https://github.com/cnwhy/queue-fun
安装

npm install quque-fun
7 回复

v0.2.1 发布了

  • 这次修复大量bug,应该可以用到正式环境上了 欢迎大家 issues
  • 队列增加批量添加方法
  • 新增浏览器端支持文件(queue-fun.min.js)

还是贴个demo吧

var queuefun = require('queue-fun');
var Queue = queuefun.Queue(); //初始化Promise异步队列类
var q = queuefun.Q;  //配合使用的Promise流程控制类,也可以使用q.js代替

//实列化一个最大并发为1的队列
var queue1 = new Queue(1); 

//定义一个Promise风格的异步方法
function testfun(i){
	var deferred = q.defer();
	setTimeout(function(){
		deferred.resolve(i)
	},300)
	return deferred.promise;
}
var log = function(a){ console.log(a); }
//向队列添加运行单元
queue1.push(testfun,[1]).then(console.log); 
//插入普通方法会按Promises/A+规则反回promise
queue1.push(function(){return 2;}).then(console.log);
//插入优先执行项 (后进先出)
queue1.unshift(testfun,[0]).then(console.log);
//批量插入多个运行项 array
queue1.allArray([3,4],testfun,{'event_succ':log}).then(console.log) 
//批量插入多个运行项 map 
queue1.allMap({'a':5,'b':6,'c':7},testfun,{'event_succ':log}).then(console.log)
//执行队列
queue1.start();

/*
0
1
2
3
4
[ 3, 4 ]
5
6
7
{ a: 5, b: 6, c: 7 }
*/

我之前做过一个类似的q-parallel,用来做爬虫,在一个队列任务里面爬另外一个网站的数据,可以控制同时并发的任务数,我的需求比较简单,后来发现原来tj大神也写过一个类似的库 co-parallel,他用了es6的Generator 大神写得简单,就这么几行

	  var thread = require('co-thread');
	  module.exports = function *parallel(thunks, n){
		  var n = Math.min(n || 5, thunks.length);
		  var ret = [];
		  var index = 0;

		function *next() {
			var i = index++;
			ret[i] = yield thunks[i];
			if (index < thunks.length) yield next;
		}

		yield thread(next, n);

		return ret;
  };

bluebird 中的 Promise.map async.parallelLimit 都是干这个的

@magicdawn 这个队列是持续存在的,可以随时加入队列。为每一个运行项定义后续。 而且,这个队列可以定义更多的功能,比如超时,出错重试等。

@hcnode 我写这个的宗旨在于创建一个持续存在的公用队列,想用的时候随时用,不影响后续操作。不只是控制并发。

就拿爬虫打个比方: 采集整个站点,不可能把URL先猜好,放那里吧,只能是这样。

  1. 先爬一个入口页
  2. 存储并分析URL拿到后续需要爬取的URL(去外网链接,去重等)
  3. 再去分别爬取分析后的URL
  4. 回到第2步。

这样设计刚开始没问题,但只要运行不到10秒,系统就受不了。平均每个页面算10个有效链接,爬3层深10x10x10,可能系统就会同时request上千个页面,不是被禁就是自己假死/溢出,显然不合理。

这里就很有必要引入一个队列:

  1. 定义一个并发为10的队列
  2. 先爬一个入口页
  3. 存储并分析URL拿到后续需要爬取的URL(去外网链接,去重等)
  4. 再分别爬取分析后的URL的request(url)放入队列中。

简化的代码如下

var queuefun = require('queue-fun');
var Queue = queuefun.Queue(); //初始化Promise异步队列类
var q = queuefun.Q;  //配合使用的Promise流程控制类,也可以使用q.js代替
var queue1 = new queue(10);
function getbody(url){
   var deferred = q.defer();
   request(url,function(err,res){
      if(err) return deferred.reject(err)
      deferred.resolve(res.body);
    })
    return deferred.promise;
}
//存储并分析URL拿到后续需要爬取的URL(去外网链接,去重等)
function saveAndGeturls(body){
  //...
  return arr;
}
//要爬取的URL都插入队列中
function toQueue(arr){
  arr.forEach(function(v,i){
    queue1.go(getbody,[v])
    .then(function(body){
      var arr = saveAndGeturls(body)
      toQueue(arr);
    })
  })
}
toQueue(['http://baidu.com']) //从入口页开始

@cnwhy 是的,因为我并没有做队列的维护和管理,因为我的需求比较简单,呵呵,我觉得你这里也可以再优化一下,把执行的任务,将promise封装一下,调用者就只管defer的action:

queue1.go(function(url, defer){
	request(url,function(err,res){
	  if(err) return deferred.reject(err)
	  deferred.resolve(res.body);
	})
},[v])

@hcnode 其实现阶段所有的API 都是可以商量的,毕竟我一个人考虑到的有限,欢迎到issues讨论。
顺便说一句,如果API有变动,我会改变中的的版本号,当然我我会尽量做好兼容

回到顶部