精华 nodejs-v0.10.0 初探
v0.10.0介绍 node-v0.10.0版本来了,对于性能官方肯定是说性能大大提高了 http://blog.nodejs.org/2013/03/11/node-v0-10-0-stable/ 大致翻译如下: 很高兴一个新的稳定版本的node发布了。 这个分支带来了很多显著的改进,主要是api的调整,让我们更加容易使用并且向后兼容。 在之前的帖子中,我们介绍了stream2的api调整了,如果你还没有阅读他,请尽快阅读他。(剩下的就是说之前stream的api不是很完善,现在他们下决心要让这个api更好)

更重要的是stream作为node的核心,改变之后接口变得更加易用了。所以强烈建议使用stream2的api,对于node 0.8,你可以安装 readable-stream 包来支持。

domain模块从实验级别提高到了不稳定级别(尼玛还是坑爹),使用了domain模块,我们不再依赖于 process.on(‘uncaughtException’) 这样的错误控制了,如果你还没有使用domain来做错误处理,那你就要仔细检查哪些中间件和异步回调了(感觉有点威胁啊~)。

在0.8以及之前, process.nextTick() 会在当前事件循环结束时调用,这样通常是会在I/O开始前被调用的。所以很多项目都会使用process.nextTick()让它晚点做,而在I/O之前,看上去这样是正确的。事实上在大负载的I/O情况下,nextTick可能工作不正常,出现线程竞争情况。所以在v0.10.0版本 process.nextTick() 会在js代码执行完成后调用,而不是写入事件循环,可以说变同步了。应该尽量避免使用 process.nextTick() 来做递归,如果非要这么做请使用 setImmediate 来代替。




API改变: https://github.com/joyent/node/wiki/Api-changes-between-v0.8-and-v0.10

大致意思是stream api的改变比较大, 1、增加了Readable, Writable, Duplex, and Transform的基类,我们可以直接从这些基类继承了 2、Readable streams 使用一个read方法,代替触发“data”的情况 3、增加一个"data"的监听器,或者调用“pause()” 和 “resume()” 会切换到旧stream模式 4、“data”的事件监听器永远不会错过第一个文件块,无论他们是否马上建立,pause 不再是咨询,而可以保证暂停 5、如果你不消耗数据,stream流永远是处于pause等待状态,而永远不会触发end事件 6、process.nextTick 会在当前事件结束调用,会在当前堆释放时执行,如果你打算递归的使用,请用setImmediate 代替 7、url.parse 将返回更多的信息如下:

// v0.8
> url.parse('http://foo')
{ protocol: 'http:',
  slashes: true,
  host: 'foo',
  hostname: 'foo',
  href: 'http://foo/',
  pathname: '/',
  path: '/' }

// 0.10
> url.parse('http://foo')
{ protocol: 'http:',
  slashes: true,
  auth: null,
  host: 'foo',
  port: null,
  hostname: 'foo',
  hash: null,
  search: null,
  query: null,
  pathname: '/',
  path: '/',
  href: 'http://foo/' }

8、domain模块对错误对象增加一个属性camelCase 代替snake_case 9、path.resolve 和 path.join 将会抛出异常,当传递的参数为空字符串时 10、dgram.Socket #bind() 会是一个异步方法,请在第二个参数增加回调函数 11、EventEmitter 基类的继承请使用新方法,以下方法将不会被支持:

function Child() {}
Child.prototype = new Parent(); // <-- NEVER EVER DO THIS!!


// Correct-Style Inheritance
function Child() {}
Child.prototype = Object.create(Parent.prototype, {
  constructor: {
    value: Child,
    enumerable: false,
    writable: true,
    configurable: true
// "Gee that's a lot of lines! I wish there was a helper method!"
// There is.  Do this:
util.inherits(Child, Parent);

12、增加的一些api 12.1、stream增加 Readable, Writable, Duplex, and Transform 基类 12.2、crypto 的api 有stream接口的支持 12.3、process增加getgroups(), setgroups(), initgroups() 12.4、crypto增加getHashes() getCiphers() 12.5、http模块增加 response.headersSent 属性 12.6、增加‘removeListener’这个事件的触发,可以监听这个事件 12.6、增加setImmediate() 和 clearImmediate() 函数 12.7、字符串解码器增加 decoder.end() 函数

stream2: 最后我们来看一下stream2的一些api和新用法,首先我们要升级node到v0.10.0

# node -v

stream是一个抽象类,它类继承自EventEmitter,例如http服务器就是一个stream,它是可读可写的,在早期的node版本中,stream类的接口是简单的,但是不够强大,可用性也不好。 1、不是当你调用read()函数,数据data将会马上接受,如果你想要做一些I/O来决定如何处理这些数据,你不得不建立一个buffer来存储那些数据。 2、pause()方法只是咨询,不能保证,可能当你调用pause()方法后,你还在接受data。

很多项目都无须调用data事件来监听,也不调用pause()和resume()方法。 我们看如下代码:

net.createServer(function(socket) {

  // we add an 'end' method, but never consume the data
  socket.on('end', function() {
    // It will never get here.
    socket.end('I got your message (but didnt read it)\n');




// Workaround
net.createServer(function(socket) {

  socket.on('end', function() {
    socket.end('I got your message (but didnt read it)\n');

  // start the flow of data, discarding it.


创建一个只读stream类实例: new stream.Readable([options]) options可配置 1、highWaterMark {Number} ,这个数字表示read stream 缓冲区,默认16kb 2、encoding {String} 表示buffer的类型,默认为null,可以传入字符串的格式,比如utf-8 3、objectMode {Boolean} 表示是返回一个buffer对象还是值返回这个buffer对象的size n

readable._read(size) 这个方法不应该被直接调用,应该被内部的readable class 调用。所有readable的stream都必须提供一个_read的方法从数据源来获取数据。 size参数是咨询的,可能不是很正确,当使用tcp或者tls时会忽略这个参数。所以没有必要等设定的size的数据都到了,才去调用stream.push(chunk)方法

readable.push(chunk) chunk {Buffer | null | String} 将块放入队列 return {Boolean} 如果为false则表示没有数据进行push 这个方法不是给readable消费者调用,而是给数据发送者调用,将数据push进队列里,push后,_read()方法就会从队列里读取数据了。 push方法明确的对readable队列内插入数据,当插入null时,则会发出数据发送完毕的信号 看一下简单的代码:

// source is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.

var stream = new Readable(); //实例化

source.ondata = function(chunk) {
  // if push() returns false, then we need to stop reading from source
  if (!stream.push(chunk))

source.onend = function() {

// _read will be called when the stream wants to pull more data in
// the advisory size argument is ignored in this case.
stream._read = function(n) {

readable.unshift(chunk) chunk {Buffer | null | String} 将块从队列头部push return {Boolean} 如果为false则表示没有数据进行unshift 看实例代码:

// A parser for a simple data protocol.
// The "header" is a JSON object, followed by 2 \n characters, and
// then a message body.
// Note: This can be done more simply as a Transform stream.  See below.

function SimpleProtocol(source, options) { //定义一个SimpleProtocol 类
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(options);

  Readable.call(this, options);//执行readable构造函数
  this._inBody = false; //_inbody表示是否开始接受body
  this._sawFirstCr = false; //表示是否看见第一个换行

  // source is a readable stream, such as a socket or file
  this._source = source; //将数据源赋值到this._source,source是一个readable stream

  var self = this;
  source.on('end', function() { //当数据源 end 事件触发,则 SimpleProtocol 实例push一个null,表示没有数据push到队列中

  // give it a kick whenever the source is readable
  // read(0) will not consume any bytes
  source.on('readable', function() { //read(0) 表示不消费任何字节

  this._rawHeader = []; //存放待格式化的头部的数组
  this.header = null; 

SimpleProtocol.prototype = Object.create( //SimpleProtocol继承readable类
  Readable.prototype, { constructor: { value: SimpleProtocol }});

SimpleProtocol.prototype._read = function(n) {
  if (!this._inBody) { //如果还没接受body,表示接受head
    var chunk = this._source.read(); //从source中读取chunck

    // if the source doesn't have data, we don't have data yet.
    if (chunk === null)
      return this.push('');

    // check if the chunk has a \n\n 
    var split = -1;
    for (var i = 0; i < chunk.length; i++) { //开始逐个解析chunk中的字符串,查看是否包含2个连续的/n
      if (chunk[i] === 10) { // '\n'  //如果发现是/n
        if (this._sawFirstCr) { //如果上一个也是 /n 则这边为true
          split = i; //找到了连续了/n 表示头部结束了,跳出循环,记录位置
        } else { //如果上一个字符不是 /n ,则把 _sawFirstCr 设为true
          this._sawFirstCr = true;
      } else { //如果本字符不是 /n 则 _sawFirstCr 设置为false
        this._sawFirstCr = false;

    if (split === -1) { //如果没有找到2个/n则继续等待,push空字符串,将chunk暂时保存
      // still waiting for the \n\n
      // stash the chunk, and try again.
    } else {  //如果找到2个/n了,则表示以后就开始接受body
      this._inBody = true; //将inbody设置为true,表示今后开始接受body
      var h = chunk.slice(0, split); //将此次的chunk切分,头部放入待格式化数组
      var header = Buffer.concat(this._rawHeader).toString(); //然后将待格式化数组中的内容组合
      try {
        this.header = JSON.parse(header); //转化js对象
      } catch (er) {
        this.emit('error', new Error('invalid simple protocol data'));
      // now, because we got some extra data, unshift the rest
      // back into the read queue so that our consumer will see it.
      var b = chunk.slice(split); 

      // and let them know that we are done parsing the header.
      this.emit('header', this.header);  //触发header事件,并且把this.header作为参数传递过去
  } else { //表示开始接受body
    // from there on, just provide the data to our consumer.
    // careful not to push(null), since that would indicate EOF.
    var chunk = this._source.read();
    if (chunk) this.push(chunk);

// Usage:
var parser = new SimpleProtocol(source);
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.
//这样parser就可以解析source的数据了,当header解析完毕,就会触发 header 事件

readable.wrap(stream) 主要是用来向前兼容的,如果你使用了旧的node库,还使用data事件触发和pause(),则可以使用wrap来创建一个readable stream,这个stream是用旧的的数据源,代码如下:

var OldReader = require('./old-api-module.js').OldReader;
var oreader = new OldReader;
var Readable = require('stream').Readable;
var myReader = new Readable().wrap(oreader);

myReader.on('readable', function() {
  myReader.read(); // etc.

Event: 'readable’ 当数据准备好被消费时,这个事件就会被触发,当这个事件触发,调用read()方法去消费数据

Event: 'end’ 当stream接受到eof关闭,则会触发此事件,指示没有数据会再发送,如果这个stream是可写的,则可能它还在被写入

Event: 'error’ 触发error事件

Event: 'close’ 当数据源关闭触发,并不是所有的数据源都会触发

readable.setEncoding(encoding) 设置data事件触发的参数是什么格式的,比如buffer或者utf-8

readable.read([size]) size {Number | null} 定义读取多少数据 Return: {Buffer | String | null} 返回buffer或者string 注意:这个方法是被stream消费者调用的 size的单位是bytes,如果不设置,则会返回内部buffer的整个内容。如果没有数据或者小于size的值,则null会返回,将来当返回更多了时候,会触发readable事件来让你消费数据 调用read(0)总是会返回null,并且会触发一次内部buffer的刷新,但是除此之外没有其他操作

readable.pipe(destination, [options]) destination {Writable Stream} 目标,一个可写入的stream options {Object} Optional 设定参数 end {Boolean} Default=true 或者是一个布尔值表示是否关闭可写的stream,默认true的话会触发end事件,关闭stream 注意 process.stderr 和 process.stdout 除非进程退出,否则不会被关闭

readable.unpipe([destination]) destination {Writable Stream} 可选参数 撤销一个先前建立的pipe,如果参数destination 没有提供,则先前所有建立的pipe都被移除

readable.pause() 切换readable stream为旧模式,当data事件触发,则使用一个data事件监听,而不是通过read()方法消费buffer缓存的数据。 停止数据流,当stream处于paused状态时,没有data事件会被触发

readable.resume() 在执行pause()方法之后,回复接受发送来的数据流

Class: stream.Writable 一个可写的流,拥有如下的方法,成员和事件 注意:可写流是一个抽象类, _write(chunk, encoding, cb) 设计是用来扩展底层实现的

new stream.Writable([options]) options {Object} highWaterMark {Number} 缓冲区,当写入开始后返回false decodeStrings {Boolean} 是否将string改写为buffer写入,默认是 如果要扩展writable 类,确认需要正确调用它的构造函数

writable._write(chunk, encoding, callback) chunk {Buffer | String} 写入的chunk一直是buffer,除非在实例化时显示声明 decodeStrings 为 false encoding {String} 如果chunk是字符串,则设置他的编码,如果chunk是buffer则会忽略此属性,注意chunk一直是buffer,除非显示的将decodeStrings 设置为false callback {Function} 此函数拥有一个可选的error参数,当处理完成提供的chunk会调用。


writable.write(chunk, [encoding], [callback]) chunk {Buffer | String} 写入的数据 encoding {String} Optional. 如果写入的chunk是string,则encode默认为utf-8 callback {Function} Optional. 当chunk成功写入后调用 Returns {Boolean} 返回布尔值

将chunk写入stream,返回true表示数据已经写入底层,放回false表明buffer已经满了,数据将会在将来发送过去,drain事件会在buffer空了时候触发。什么时候write返回false,这取决于 highWaterMark 的设置

writable.end([chunk], [encoding], [callback]) chunk {Buffer | String} Optional final data to be written encoding {String} Optional. If chunk is a string, then encoding defaults to 'utf8’ callback {Function} Optional. Called when the final chunk is successfully written. 调用这个方法表示最后的data写入了stream

Event: 'drain’ 当stream的可写缓冲区为空时触发,当stream.write()返回false时监听它

下面的比较简单不翻译了 Event: 'close’ Emitted when the underlying resource (for example, the backing file descriptor) has been closed. Not all streams will emit this.

Event: 'finish’ When end() is called and there are no more chunks to write, this event is emitted.

Event: 'pipe’ source {Readable Stream} Emitted when the stream is passed to a readable stream’s pipe method.

Event 'unpipe’ source {Readable Stream} Emitted when a previously established pipe() is removed using the source Readable stream’s unpipe() method.

Class: stream.Duplex duplex stream是一个可读可写流,类似TCP socket链接


new stream.Duplex(options) options {Object} 同上 allowHalfOpen {Boolean} Default=true. 如果设置为false,则stream会自动将readable stream关闭,当可写流关闭,反之亦然

Class: stream.Transform transform流是一个duplex流,它对于input和output存在因果关系,比如zlibstream或者一个crypto。

对于传入的数据和传出的数据时不同大小的,不同的数据块大小和不同的到达时间。例如:一个hash 流仅会output一个chunk当input输入完毕。一个zlib流可能会将输入流处理为更大或更小

不用提供 _read() 和 _write() ,Transform 类必须提供 _transform() 方法,并且可能有一个可设定的提供_flush() 方法

new strea.Transform([options]) options {Object} 同上

transform._transform(chunk, encoding, callback) chunk {Buffer | String} 写入的chunk一直是buffer,除非在实例化时显示声明 decodeStrings 为 false encoding {String}如果chunk是字符串,则设置他的编码,如果chunk是buffer则会忽略此属性,注意chunk一直是buffer,除非显示的将decodeStrings 设置为false 此函数拥有一个可选的error参数,当处理完成提供的chunk会调用。

所有的Transform流实现必须提供一个_transform 方法来接受输入和输出

_transform 必须做任何此类特殊Transform 类的事情,处理字节的写入,然后将他们发送到可读部分的接口。做一些异步I/O,处理事情等等。 调用 transform.push(outputChunk) 0次或多次,用来生成从输入到输出chunk块,次数是根据你想要对这个chunk输出多少块。 仅会在当前chunk已经完全被消费时才会调用callback函数。注意,可能有些特殊的输入input块没有output结果。

transform._flush(callback) callback {Function} 这个回调函数会有一个可选的error参数,当你执行完flushing任何其余的数据

在某些情况下,你的 transform 操作可能需要在流结束时,触发一个大一点的数据。比如zlib的压缩流会保存一些内部状态,所以它能达到最佳压缩状态,尽管到了最后,它还是需要一些数据,这样数据就会完整了。 在这情况,你需要执行 _flush 方法,它会被调用在非常后面,在所有数据都被消费掉,但是还没有出发end的信号之前。回调会在flush操作完成之后执行。

Example: SimpleProtocol parser 上面那个 simple protocol parser 可以被高级的 Transform stream class 更简单的实现出来。 在这个示例中,不用提供input作为参数,他将会通过pipe管道来解析,这将会更加通用的node 流方法。(代码不贴了,和上面一样)

Class: stream.PassThrough 这是一个对transform的简单实现,他就是简单的将input转为output输出,它的目的是用来做示例和测试的,但是也有偶尔的情况它可以派上用场。

实际使用 stream2类介绍完毕了,我们看一下新的stream2类如何在实际使用,http发送请求,获取yahoo网站

var http = require("http")
var options = {
  hostname: 'www.yahoo.com',
  port: 80,
  path: '/',
  method: 'GET'
var req = http.request(options, function(res) {
  //console.log('STATUS: ' + res.statusCode);
  //console.log('HEADERS: ' + JSON.stringify(res.headers));
  res.on('readable', function () {
req.on('error', function(e) {
  console.log('problem with request: ' + e.message);
// write data to request body


13 回复

buffer 要大改,现在几乎不敢使用,性能太差了

令人兴奋的还有 “After 0.12, the next major stable release will be 1.0. At that point, very little will change in terms of the day-to-day operation of the project, but it will mark a significant milestone in terms of our stability and willingness to add new features…”

0.12 。。。。 亮了







stream接口改动后,writable stream不再抛出error事件,on(‘error’,function(err){…})被废了,domain再坑爹也不得不用

@snoopy 只用封装了BUFFER的API,反正buffer类型不能出现在代码里即可…

缓存中的数据如果遇到‘/’会自动的结尾-.- 会提示Unexpected end of input

这尼玛 更新速度也太快了。。顶不住了 回去写python了 ;(

