从异步队列到中间件实现 —— (1)异步队列 asynclist
发布于 13 年前 作者 willwen 7794 次浏览 最后一次编辑是 8 年前

之前给webjs的Router做了一个还算给力的东西:asynclist

基于小田的eventproxy,是用于并行处理一串的任务,比如说中间件。

因为是基于eventproxy,小问也把asynclist的使用方法做得和eventproxy相似,以便大家使用。

我们来看一个例子

//同步串行队列
var fs = require('fs');
function readBigFile () {
    for (var i = 0;i < 100;i++) fs.readFileSync('test.zip'); //3.2M
}
for (var i = 0;i < 10;i++) readBigFile();
console.log('Finished.');


//异步并行队列
var asynclist = require('asynclist');
var fs = require('fs');
function readBigFile () {
    for (var i = 0;i < 100;i++) fs.readFileSync('test.zip'); //3.2M
    tasks.trigger(true);
}

var tasksList = [];
for (var i = 0;i < 10;i++) tasksList.push(readBigFile);

var tasks = new asynclist(tasksList);
tasks.assign(function () {
    console.log('Finished.');
});
console.log('Start');
tasks.run();

其中test.zip的大小为3.2M

相信以上两端代码的差别的效果大家不得而知。

如果大家还不知道asynclist究竟有什么实用性,那么小问再说一个应用场景:

我们要设计一个RSS Reader,于是我们就要把一系列的RSS Url存储起来,然后定时更新。

于是当我们需要对已记录的RSS Url进行更新的时候,我们要怎么做呢?

假设以下为需要请求的Urls

var rssUrls = [
  'http://www.a.com/rss',
  'http://www.b.com/feed',
  'http://www.c.com/rss',
  'http://www.d.com/feed',
  'http://www.e.com/rss',
  'http://www.f.com/feed'
]

Q:Node.js的http.get不就是异步的么? A:是的,我们需要它。

Q:eventproxy不就是已经可以在所有任务完成以后相应了么? A:是的,但是eventproxy是需要以下做法

var eventproxy = require('eventproxy').EventProxy;
var BufferHelper = require('bufferhelper');
var http = require('http');
var url = require('url');

var tasks = new eventproxy();
//生成任务队列
var taskList = rssUrl.map(function (url) {
    return function () {
	//通过BufferHelper对数据进行拼接
	var bufferHelper = new BufferHelper();
	http.get(url.parse(url), function (res) {
	    res.on('data', function (chunk) {
	        bufferHelper.concat(chunk);
	    });
	    res.on('end', function () {
	        var data = bufferHelper.toBuffer().toString();
	        //完成获取,向eventproxy汇报数据
	        tasks.trigger('success', data);
	    });
	});
    }
});
tasks.after('success', rssUrls.length, function (rows) {
    //当全部完成以后输出数据
    console.log(rows);
});
taskList.forEach(function (task) {
    task();
});

Q:哇。。小问写的好详细啊! A:啊哈哈哈,那是当然。。这就是asynclist的实现方式嘛,亲。。==

好吧,上面是一个小DEMO,但是大家要知道,不是所有的的东西都会异步执行回调函数,很有可能会直接执行,这样是没有办法达到并行处理的。

那小问是如何做到并行的呢?

其实嘛,有细心阅读我们社区文章和官方API文档的童鞋,相比已经知道了。对的,就是Node.js中对epoll的封装——Ticks机制。

在上面的代码中,你之需要改动一行就可以了。

......
taskList.forEach(function (task) {
    //task();
    process.nextTick(task);
});
......

当然,上面这些是否觉得不雅观?来看看asynclist吧。

asynclist默认需要传入一个包含Function的Array

var asynclist = require('asynclist');

var tasksList = rssUrl.map(function (url) {
    return function () {
	var bufferHelper = new BufferHelper();
	http.get(url.parse(url), function (res) {
	    res.on('data', function (chunk) {
	        bufferHelper.concat(chunk);
	    });
	    res.on('end', function () {
	        var data = bufferHelper.toBuffer().toString();
	        tasks.trigger('success', data);
	    });
	});
    }
});
var tasks = new asynclist(list);
tasks.assign(function (rows) {
    console.log(rows);
});
tasks.run();

还觉得上面的那个map很不好看?asynclist也封装好了。

var asynclist = require('asynclist');

var getter = asynclist.compile(function (url) {
    return function () {
	var bufferHelper = new BufferHelper();
	http.get(url.parse(url), function (res) {
	    res.on('data', function (chunk) {
	        bufferHelper.concat(chunk);
	    });
	    res.on('end', function () {
	        var data = bufferHelper.toBuffer().toString();
	        tasks.trigger('success', data);
	    });
	});
    }
});

var tasks = new getter(rssUrls);
tasks.assign(function (rows) {
    console.log(rows);
});
tasks.run();

以上代码中getter是新生成的一个lister,你之需要把这个getter到处”扔“就可以了。

如果还不满足,想要给tasks传入参数?可以。。 只要把你需要传入的参数传入run()方法就可以了,不过asynclist最多支持传入三个参数。 PS:当然这个只是半行代码的事情。。

OK,今天先到这里,明天继续第二章,webjs的中间件实现。(想必已经有人猜到了吧。。)

4 回复

不错,小问很勤力啊!

第一段个例子有很大问题,你用readFileSync同步方法,10个并行读取没有效果的,等于10个串行读取。读一个文件要1秒的话,10个文件总归还是10秒。

oh yeah,老赵也来了

感觉可以改成10个下载文件. 10个同时下载和10个依次下载的区别. 这个由于网速的问题,差别应该就比较大了.

回到顶部