stream.Writable()探究
发布于 6 年前 作者 youth7 4213 次浏览 来自 分享

stream.Writable是node中非常重要的类,其中的write()更是重中之重,理解这个方法的工作流程使得我们碰到node中各种Writable对象的时候心里更有底

分析之前先上write()的流程图,这个图只保留了主干部分而省略了一些细节,我们从中要留意几个重要步骤的时机和条件,它们都用黄色部分标出

  • 真正执行写操作
  • 写请求(即用户调用write()或者end()发起一个写操作的请求)的缓存与执行
  • 发射drain事件
  • 运行用户的callback

stream_write.jpg

writable()

writable()定义在文件lib/_stream_writable.js中,对部分源码做出注释

Writable.prototype.write = function(chunk, encoding, cb) {
  var state = this._writableState;//this._writableState是一个记录流内部状态的重要对象,我们稍后再分析它
  var ret = false;
  var isBuf = !state.objectMode && Stream._isUint8Array(chunk);//判断用户提供的数据类型是否为buffer
 
  if (isBuf && Object.getPrototypeOf(chunk) !== Buffer.prototype) {//转为node原生实现的buffer
    chunk = Stream._uint8ArrayToBuffer(chunk);
  }

  if (typeof encoding === 'function') {//判断是否指定了编码类型
    cb = encoding;
    encoding = null;
  }

  if (isBuf)
    encoding = 'buffer';//如果用户写入的数据类型为buffer,则编码类型必须为"buffer"
  else if (!encoding)
    encoding = state.defaultEncoding;//如果没有编码类型,则设置为默认编码

  if (typeof cb !== 'function')//如果没有提供回调函数则使用默认的回调函数
    cb = nop;

  if (state.ending)
    writeAfterEnd(this, cb);//writeAfterEnd的逻辑非常简单,先发射错误事件然后异步执行回调
  else if (isBuf || validChunk(this, state, chunk, cb)) {
    state.pendingcb++;//等待执行的callback数量加1
    ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb);//调用writeOrBuffer,由它决定是写入底层资源还是内部缓冲区
  }

  return ret;
};

write()首先对编码和用户传递过来的数据进行一些基本处理,然后观察自身状态。如果state.ending为真则抛出错误,否则进行下一步处理。从后面关于writeState的讨论我们可以知道state.ending为真的话意味着end()被调用但是尚未返回。正如文档上所说:

在调用了 stream.end() 方法之后,再调用 stream.write() 方法将会导致错误。

需要注意的是,写操作贯穿一系列的函数而不仅仅是当前函数,而其它函数也是由可能抛出错误的。因此文档上说明:

writable.write()方法向流中写入数据,并在数据处理完成后调用 callback 。如果有错误发生, callback 不一定以这个错误作为第一个参数并被调用。要确保可靠地检测到写入错误,应该监听 'error' 事件。

writeOrBuffer()

接下来看writeOrBuffer(),它的源码如下:

function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
  if (!isBuf) {//如果不是buffer类型,就根据编码解码
    var newChunk = decodeChunk(state, chunk, encoding);
    if (chunk !== newChunk) {
      isBuf = true;
      encoding = 'buffer';
      chunk = newChunk;
    }
  }
  var len = state.objectMode ? 1 : chunk.length;//处于对象模式的话则需要写入的数据长度为1,否则为chunk的长度

  state.length += len;//这个其实就是内部缓冲区的长度
  /**
  如果内部缓冲区的长度已经超过highWaterMark,则该次写操作会返回false,
  此时外部程序应该停止调用write()直到drain事件发生为止
  **/
  var ret = state.length < state.highWaterMark;
  if (!ret)
    state.needDrain = true;//说明缓冲区已经满,需要排空它

  if (state.writing || state.corked) {//如果处于写状态中或者调用了cork(),则把当前的写入请求先缓存起来,等到适当的时候再继续调用
    var last = state.lastBufferedRequest;
    state.lastBufferedRequest = {
      chunk,
      encoding,
      isBuf,
      callback: cb,
      next: null
    };
    if (last) {//如果链表不为空
      last.next = state.lastBufferedRequest;
    } else {//如果链表为空,则将这个写请求设为链表头部
      state.bufferedRequest = state.lastBufferedRequest;
    }
    state.bufferedRequestCount += 1;//更新链表的长度
  } else {//否则开始写入
    doWrite(stream, state, false, len, chunk, encoding, cb);
  }
  return ret;
}

writeOrBuffer()如其所名,根据当前的状态缓存写操作的请求或者直接进行写操作,如果有写操作正在执行或者调用了cork(),则把当前写请求缓存起来以供后用,否则调用doWrite()进行真正的写入。缓存起来的写操作在两种情况下可以得到执行:

  • 调用uncork()的时候
  • 调用onWrite()的时候onWrite()作为回调函数传递给了需要用户实现的_write()_writev(),因此当实现Writable流重写这两个方法的时候,当操作完成之后,无论成功失败,务必记得调用onWrite()。我看了一下fs.write(),确实在写入完成之后有调用它们。

至于uncork()onWrite()的调用时机我们下面再分析。

doWrite()

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  state.writelen = len;//需要写入的数据长度
  state.writecb = cb;//用户提供的回调
  state.writing = true;//标记正在进行写入
  state.sync = true;//是否同步调用
  if (writev)//_writev是批量写入数据,如果有就优先调用之
    stream._writev(chunk, state.onwrite);
  else//否则调用默认的版本
    stream._write(chunk, encoding, state.onwrite);
  state.sync = false;
}

doWrite()调用stream._writevstream._write真正执行写操作,这两个函数都是实现者负责提供的。doWrite()中设置了state.writing = true,这意味着后续的请求都会缓存起来。同时还设置了state.sync=true,意味着同步调用,这个设置项很有意思,在源码中的其它地方我们看到这样的注释:

defer the callback if we are being called synchronously to avoid piling up things on the stack

说明如果在同步模式下,用户的回调是需要异步执行,这是为了防止在栈上保存过多的信息。而异步情况下则没有这个限制,关于这样缘由不是十分清楚。

关于_writableState

_writableState是一个记录流内部状态的对象,在各个地方都会用到,在研究onWrite()等函数前需要先理解这个对象各个属性代表的意思

function WritableState(options, stream) {
  options = options || {};
  /*
  Duplex streams可以进行读和写,但是它内部共享一个option。
  在一些情况下要求读和写都需要设置option.XXXX属性的值。此时可以使用
  option.readableXXX 和 option.writableXXX来实现
  */
  var isDuplex = stream instanceof Stream.Duplex;

  this.objectMode = !!options.objectMode;

  if (isDuplex)
    this.objectMode = this.objectMode || !!options.writableObjectMode;

  // the point at which write() starts returning false
  // Note: 0 is a valid value, means that we always return false if
  // the entire buffer is not flushed immediately on write()
  this.highWaterMark = getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex);  
  this.finalCalled = false;//_final()是否已经被调用  
  this.needDrain = false;// 是否需要发射drain事件 
  this.ending = false; // end()正在被调用 
  this.ended = false; // end()调用完毕返回 
  this.finished = false; //是否已经发射finish事件  
  this.destroyed = false;// 流是否已经被销毁
  /*
  字符串在传递给_write()时候是否先解码为buffer,一些node内部的流能通过这个属性在程序的底层进行优化
  */
  var noDecode = options.decodeStrings === false;
  this.decodeStrings = !noDecode;
  this.defaultEncoding = options.defaultEncoding || 'utf8';//默认编码
  /**
  缓冲区的长度,
  流的内部其实并没有维护一个真正的缓冲区,只是用了一个变量记录等待写入底层文件或者socket的数据的数量
  **/
  this.length = 0; 
  this.writing = false;//是否正在进行写操作
  this.corked = 0;//如果这个值为true,则所有的写操作请求都会被缓存直至调用了uncork()
  /*是否异步调用用户提供的callback,这个值为true的时候意味着需要异步调用用户的callback
  */
  this.sync = true;  
  this.bufferProcessing = false;//是否正在处理之前被缓冲下来的写请求  
  this.onwrite = onwrite.bind(undefined, stream);// 内部的回调,传递给 _write(chunk,cb)的回调函数
  
  this.writecb = null;// 用户的回调,传递给write(chunk,encoding,cb)  
  this.writelen = 0;// 调用_write()时候需要写入的数据的长度
  this.bufferedRequest = null;//被缓存的写请求的链表的表头
  this.lastBufferedRequest = null;//上一个被缓存的写请求
  this.pendingcb = 0;//等待被执行的用户回调函数数量,在发射finish事件的时候必须为0
  this.prefinished = false;//与同步的Transform streams相关,如果等待被执行的函数只剩下传递给_write的回调,则发射之  
  this.errorEmitted = false;//如果error事件已经发射,不应该再抛出错误  
  this.bufferedRequestCount = 0;// 链表长度
  //初始化第一个CorkedRequest,总是有一个CorkedRequest可以使用,并且内部最多维护两个CorkedRequest
  var corkReq = { next: null, entry: null, finish: undefined };
  corkReq.finish = onCorkedFinish.bind(undefined, corkReq, this);
  this.corkedRequestsFree = corkReq;
}

从上面可以看到,文档中所谓的writable流内部有一个缓冲区其实是并不存在的,流内部只是维护了一个变量,用来记录即将写入的数据的数量,如果这个数量小于writableHighWaterMark,则缓存这个写请求。因此文档中说了即使超出缓冲区大小也依然可以写是可以理解的。了解了这些状态的意义之后我们可以来看传递给_write()state.onwrite是如何利用_writableState的状态进行操作

onwrite()

onwrite()的主要功能是真实的写操作完成之后用来收尾,包括执行缓冲区中的写请求和更新流内部的状态。例如在fs.write()中可以看到onwrite()就是作为回调在执行的

WriteStream.prototype._write = function(data, encoding, cb) {
  //省略...
  fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
    if (er) {
      if (this.autoClose) {
        this.destroy();
      }
      return cb(er);
    }
    this.bytesWritten += bytes;
    cb();
  });

  if (this.pos !== undefined)
    this.pos += data.length;
};
function onwrite(stream, er) {
  var state = stream._writableState;
  var sync = state.sync;
  var cb = state.writecb;//用户的callback,见上面_writeState的解释

  onwriteStateUpdate(state);//更新流内部的状态,更新之后意味着当前的写操作已经完成

  if (er)//如果_write操作发生任何错误则抛出异常,注意这个异常可能不是最原始的异常,见上面所描述
    onwriteError(stream, state, sync, er, cb);
  else {
    // 检查是否可以结束这个写操作了
    var finished = needFinish(state);

    if (!finished &&//不能处于finished状态
        !state.corked &&//不能处于corked状态
        !state.bufferProcessing &&//没有缓存的写操作正在被执行
        state.bufferedRequest) {//缓冲区不能为空
      clearBuffer(stream, state);
    }

    if (sync) {//如果是同步调用,则需要异步调用用户的callback,以防止往栈上堆积东西
      process.nextTick(afterWrite, stream, state, finished, cb);
    } else {
      afterWrite(stream, state, finished, cb);
    }
  }
}

其中onwriteStateUpdate()对流的各种状态进行了更新,包括以下几项:

function onwriteStateUpdate(state) {
  state.writing = false;//意味当前的写操作已经结束
  state.writecb = null;//清空用户提供的callback
  state.length -= state.writelen;//修改缓冲区长度
  state.writelen = 0;
}

可以看出onwriteStateUpdate()后意味着当前这个写操作已经完成。而needFinish(state)的判断标准如下:

function needFinish(state) {
  return (state.ending &&//调用了end
          state.length === 0 &&//缓冲区为空
          state.bufferedRequest === null &&//没有缓存的写操作
          !state.finished &&//finished的状态还不是true
          !state.writing);//不是正处于写操作中
}

它意味着当前写操作已经完成,并且缓冲区中的写操作也已经全部完成

总结起来就是onwrite()执行了真正的写入操作后,再去执行缓冲区中积压着的其它写请求。综上所述可以推论

  • 如果一次写操作没有完成,则剩下的写操作都会被缓存起来
  • 一旦有一项写操作完成,则会取出缓冲区中剩余的写操作并执行它们。

afterWrite()

afterWrite()是主干流程上最后一个重要的步骤了,它负责发射drain事件,通知调用者可以继续写入并且调用用户提供的callback

function afterWrite(stream, state, finished, cb) {
  if (!finished)
    onwriteDrain(stream, state);//如果不是处于finished状态,就发射drain事件
  state.pendingcb--;
  cb();//执行用户的callback
  finishMaybe(stream, state);//其它处理细节
}

function onwriteDrain(stream, state) {
  if (state.length === 0 && state.needDrain) {
    //如果缓冲区曾经超过警戒线,但是现在已经为空,就可以发射drain事件
    state.needDrain = false;
    stream.emit('drain');
  }
}
6 回复

学习了,感谢!

我是前年看stream的源代码的,当初看得不是太明白,觉得这部分代码太乱,就没深入研究,今天看你的梳理就好多了。我当时干脆就按照自己的理解实现了stream,write没有做缓存,直接抛给libuv了。由于我要实现完全的回调异步转协程同步,缓存比较难实现,不能够依靠write完成后的回调来触发下一次write。

这就带来个问题,单个协程内,socket不能同时读写,一个socket要等读完后然后写,写完后才能读,原本双工的socket就被搞成单工的了(当然这种理解也未必正确,系统调用write可能也不是真正的执行了一次socket传送,也许拷贝到缓冲区后就触发了回调,希望有大神可以讲解下)。我也参考的go的思路,go协程要实现socket双工就要读写各开一个协程,lua实现双工的思路也是类似的。

这个世界就是这样,没有哪种解决方案是完美的,总是需要各种妥协。

@coordcn 请问下大牛你平时用什么技术/做哪块业务,感觉水平远高于我,你做的那块我连个基本概念都没有。特别是协程这东西,我用过的语言都没这个元素,之前学习go的时候稍微碰了下但是印象不深刻。

@youth7 我是从nodejs转openresty的,nodejs还是回调异步阶段我就转了,那个阶段promise才刚刚起步,当初很多人信誓旦旦的说promise是回调异步的终极解决方案,后来generator/yield阶段又有很多人说终极解决方案,现在async/await来了终极论又出来了。这不是后面的终极打前面终极的脸么?我从回调异步到promise过渡阶段就对这种模式有怀疑了,刚好那段时间fibjs过来宣传,我就浏览下fibjs的代码,fibjs的整体思路我是认可的,但我不认可在v8基础上hack协程,在这个过程中我接触了协程,当时思路就被打开了,接触了很多协程相关的语言,最后由于自身能力有限,选择了相对简单的lua,我自己也基于libuv实现了一个玩具luaio,实现过程中,参考了fibjs,nodejs,luvit,openresty。原本雄心勃勃的想按照自己的思路再造一个lua版的ndoejs(跟luvit有本质的区别,luvit是nodejs的lua复刻版,luaio是想实现nodejs的openresty参考版),最终还是半途而废了,实现了process,socket,fs等。

我个人觉得现有的语言解决异步转同步问题都存在瑕疵,没有完美的技术路线,选择只是个人喜好而已,协程不是万能,async/await也不是万能,在编码难度和性能之间找一个平衡点就行了。如果当初nodejs没有排斥协程,没有排斥伪同步(现在实质上已经拥抱了),我也就没必要跟着折腾了,当然折腾的过程也学到了很多东西,读了很多源代码,但实质上个人收入反而没有坚持一种技术路线的人多,也没有选择热门语言的人多。

楼主这样的我是特别喜欢的,建议你能在node上坚持下去,不要犯我一样的错误,少走弯路,你对node研究水平已经超过这个论坛的99%的人。当然其他语言也要学习,有对比对技术的理解会更加深刻。

@coordcn 非常感谢!正好想趁机研究一下协程,这方面是空白。

回到顶部