[救助] stream.readable源码中的一个疑惑
发布于 6 年前 作者 Shasharoman 2521 次浏览 来自 问答

学习node.js stream工作原理过程中,在stream.readable中看到如下一段代码:

// If the user pushes more data while we're writing to dest then we'll end up
// in ondata again. However, we only want to increase awaitDrain once because
// dest will only emit one 'drain' event for the multiple writes.
// => Introduce a guard on increasing awaitDrain.
var increasedAwaitDrain = false;
src.on('data', ondata);

function ondata(chunk) {
    debug('ondata');
    increasedAwaitDrain = false;
    var ret = dest.write(chunk);
    if (false === ret && !increasedAwaitDrain) {
        // If the user unpiped during `dest.write()`, it is possible
        // to get stuck in a permanently paused state if that write
        // also returned false.
        // => Check whether `dest` is still a piping destination.
        if (((state.pipesCount === 1 && state.pipes === dest) ||
                (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) &&
            !cleanedUp) {
            debug('false write response, pause', src._readableState.awaitDrain);
            src._readableState.awaitDrain++;
            increasedAwaitDrain = true;
        }
        src.pause();
    }
}

虽然看了increasedAwaitDrain变量上的注释,但还是不能理解这个变量的作用,Node.js是单线程的,那么对于下面的这三行同步代码:

    increasedAwaitDrain = false;
    var ret = dest.write(chunk); // dest.write中有异步逻辑,但是同步返回
    if (false === ret && !increasedAwaitDrain) 
  • 第一行increasedAwaitDrain赋值成false
  • 第二行执行同步代码var ret = dest.write(chunk);
  • 第三行在if中用到了increasedAwaitDrain这个变量

我理解的是这个变量一定是false,而按照increasedAwaitDrain定义上方的注释,第三行代码中increasedAwaitDrain可能为true?这有点违背我的知识和经验…什么情况下if中increasedAwaitDrain的值为true???

百思不得解,望高手帮忙解答,感激不尽!!!

4 回复

awaitDrain 是控制 flow 的,不是 0 就是 1, 在触发 drain 的时候会自减一次,

function pipeOnDrain(src) {
  return function() {
    var state = src._readableState;
    debug('pipeOnDrain', state.awaitDrain);
    if (state.awaitDrain)
      state.awaitDrain--;
    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
      state.flowing = true;
      flow(src);
    }
  };
}

正常是流程是 emit(‘data’) 触发一次 ondata, 如果 ret = dest.write(chunk) 返回 false (也就是 state.length < state.highWaterMark) awaitDrain + 1. 如果这时候你再次 push 一次,也返回 false 了, awaitDrain 2, 那么对不起 readable 就断流了。所以要加一个 increasedAwaitDrain ,


  var ret = state.length < state.highWaterMark;
  // we must ensure that previous needDrain will not be reset to false.
  if (!ret)
    state.needDrain = true;

应该是这样。。另外个人觉得看 stream 很不值, 看完两天就忘掉

@yviscool 你讲的内容我理解,我不是对pipe这个完整流程有疑惑,而是我在问题中提到的那三行代码:

increasedAwaitDrain = false;
var ret = dest.write(chunk); // dest.write中有异步逻辑,但是同步返回
if (false === ret && !increasedAwaitDrain) 
  • 第一行increasedAwaitDrain赋值成false
  • 第二行执行同步代码var ret = dest.write(chunk);
  • 第三行在if中用到了increasedAwaitDrain这个变量

我理解的是这个变量一定是false,而按照increasedAwaitDrain定义上方的注释,第三行代码中increasedAwaitDrain可能为true?这有点违背我的知识和经验…

// 多谢答主的友情tips,其实我不是为了学习Node.js而去看这段代码,是因为在实践遇到pipe相关问题才去翻代码看的 ^ _ ^

@Shasharoman false === false && !false …

已经确认increasedAwaitDrain在整段代码中就是一个无用存在,因为我看的是v8.11.1版本的代码,而最新版本的实现中已经没有这段代码

看来多翻翻Node.js源代码,保不定就可以给官方提PR了,哈哈

回到顶部