seq-queue:在node.js中如何保持请求处理顺序的一些想法
发布于 12 年前 作者 changchang 14324 次浏览 最后一次编辑是 8 年前

前段时间项目需要,写了这个小模块https://github.com/changchang/seq-queue,现在拿出来分享一下~

seq-queue主要是满足一些对请求处理顺序有要求的场景。一般在http处理中,对用户连续的请求可以进行并行处理,无须关心请求的处理顺序。但在另外一些场景下则不然。比如:一个游戏服务器处理,对于玩家的一系列操作,如:combo连招,则希望有严格的执行顺序。又如,数据库服务器处理客户端的prepare,execute,close等这些请求,也需要保证请求的执行顺序。seq-queue则是结合了node.js异步回调的环境设计的一个保持请求顺序化处理的模块。

###简单的例子 seq-queue的结构非常简单,可以看作是一个FIFO队列,里面的任务只有在它之前的所有任务都被执行完毕后才会被执行。使用也很简单,例子如下:

var SeqQueue = require('seq-queue');

var queue = SeqQueue.createQueue(1000);

queue.push(function(task) {
	setTimeout(function() {
		console.log('hello');
		task.done();
	}, 500);
});

queue.push(function(task) {
	console.log('world');
	task.done();
});

在服务端,只要为每个client session创建一个seq-queue实例,将需要顺序执行的请求放入其中即可。

###seq-queue的状态机
seq-queue实例的状态机可以简单用下面的图来描述:
seq-queue fsm

queue实例创建后默认是idle状态。

通过push方法往queue中添加任务后,变为busy状态。当前任务执行完后会调next方法执行下一个任务,如果无任务可执行则回到idle状态。

可以通过close(false)方法优雅关闭,进入closed状态。closed状态下不会接收新的任务,但会持续执行完队列中剩余的任务。所有剩余任务完成后,进入drained状态,queue实例生命周期结束。

也可以通过close(true)方法强制关闭,直接进入drained状态。

###纠结点
seq-queue的设计中,主要有下面两个地方比较纠结的。

第一点是,queue中的任务被封装成function的形式,带一个参数task。当任务结束的时候,需要显式地调一下task.done()来通知queue当前任务执行完毕。这个主要是由node.js异步回调的风格所决定的。在node.js中,一个任务的function返回了,并不代表一个任务处理的所有流程已结束,后面可能还有一大堆的回调在等着执行。seq-queue中则主要是借鉴了node-unit里的风格,传递一个task参数,提醒使用者记得执行完毕后调用一下task.done()来结束处理。

第二点就是超时的机制。因为seq-queue是顺序处理的,如果用户忘了调task.done()或是在某个callback中抛了个uncaught exception无疾而终了,queue则会因此而被堵死。为了避免这种情况,queue中的每个任务都设置了一个超时时间,如果超时了会忽略掉当前任务而执行下一个任务。这是异步环境下,异常状态处理的一个权宜之计吧。queue默认的全局超时时间是3s,可以通过createQueue方法指定当前实例的全局超时时间,也可以在push方法中为每个任务设置任务相关的超时时间。超时的任务中再调task.done()不会影响queue的调度。

###小结
seq-queue写的比较仓促,也没来得及做充分的调查看是否已经有提供类似功能的模块,功能方面也主要是往项目的需求上面靠。大家之前有什么类似的工具,或是有什么好想法,再或者是觉得有什么地方不爽的,都来吐槽一下吧~ :)

之前拜读了@Python发烧友 在NodeParty深圳站上推荐的mocha + should来进行BDD,也借此机会实践了一把,觉得还是挺不错的~

9 回复

太久没上了,吐槽一下,md的语法跟github上的不大一样,参考了一下http://cnodejs.org/topic/4efc372a6d88ae8a6f000019 才把代码样式弄好@@

不错啊,就是如果大量的请求过来,是不是队列会成为瓶颈呢?

有没考虑多个worker进程的问题呐?

一般会为每一个客户端的连接创建一个队列,如果一个客户端瞬间发送大量的串行处理请求的话,确实有可能会导致这个客户端得不到及时的响应,尤其是当中有比较耗时的请求,但不会影响其他客户端的处理。一般这种有串行化需求的请求应当尽可能的简单,快速返回,否则后面的请求可能会受到影响。

我使用的场景是客户端与服务器端之间是一个固定的连接(socket或websocket),也就是说在一个会话期间,客户端是与某个worker绑定在一起的,多个worker没有影响。如果是http环境的话,可以考虑采用session sticky,把请求定向到同一个worker上。不过在http中可能串行化处理的需求可能不多:)

顺序执行确实是一个很常见的需求,和@leizongmin 有同样的疑问,多进程的时候如何保证?

最开始是有考虑采用async的series或queue的,但都不大符合需求。async.series的串行操作是一开始就确定了的,封装到数组中做为参数进行调度,我需要的是能随着请求的到达,动态的把任务添加到队列中。async.queue concurrency=1时比较符合初始的需求,但后来要添加超时的机制,发现还是得在外面包装一层,最后就干脆自己实现一个queue了@@

如果请求是随机分配给worker进程的话确实有问题,主要的思路就是把同一个会话的请求都定向到同一个worker上面去,像session sticky之类的@@

回到顶部