readable事件、data事件和背压(back pressure)
发布于 7 年前 作者 youth7 4846 次浏览 来自 分享

学习Readable流的时候一直对readabledata的关系不是很清楚,经过搜索和阅读源码后算是理清一点点,现在总结如下。

streamify-your-node-program :这是目前为止我看到的最为全面细致的关于流的教程,强烈建议先阅读。)

预备知识

read()Readable流的基石,无论流处于什么模式,只要是涉及读取数据最终都会转到read()上面来,它的主要功能是:

  • 读取缓冲区数据并返回给消费者,并按需发射各种事件
  • 按需调用_read()_read()会从底层汲取数据,并填充缓冲区

它的流程大致如下:

stream_read.jpg

务必记住read()是同步的,因此它并不是直接从底层数据那里读取数据,而是从缓冲区读取数据,而缓冲区的数据则是由_read()负责补充_read()可以是同步或者异步。 nodejs内部的实现经常会调用read(0),因为参数是0所以不会破坏缓冲区中的数据和状态,但可以触发_read()来从底层汲取数据并填充缓冲区。
_read()是流实现者需要重写的函数,它从底层汲取数据并填充缓冲区(flowing模式不会填充而是直接发送给消费者),它的大致流程如下:

stream__read.jpg

注意在addChunk()后会根据情况发射readable或者data事件,然后调用read()_read(0)➞…➞addChunk()从而形成一个循环,因此一旦调用了_read()流就会默默在底层读取数据,直到数据都消耗完为止

readable事件

文档上关于readable事件的描述如下

事实上, ‘readable’ 事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。 对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回 null。

由此我们可以知道readable事件意味着:

  • 流有了新的数据(注意,这里只说明有了新数据,至于新数据如何读取是调用者自己的事情
  • 流到达了尾部

你可以将下面的代码保存为test.js,修改size的值并运行,观察结果有何异同

"use strict";
const size = 1; //将size设为1或undefined
const rs = require("fs").createReadStream("./test.js");
rs.on("readable", () => {
	console.log(rs.read(size));
});

总之,readable只是负责通知用户流有了新的动态,事件发生的时候是否读取数据,如何读取数据则是调用者的事情(如果一直不读取事件,则数据会存在于缓冲区中) 。 例如可以给readable注册一个回调函数,该回调函数调用无参的read(),它会读取并清空缓冲区的全部数据,这样就使得每次readable发生的时候都可以读取到最新的数据。

readable的触发时机

readable在以下几种情况会被触发:

  • onEofChunk中,且_read()从底层汲取的数据为空。这个场景意味着流中的数据已经全部消耗完
  • addChunk()中,且_read()从底层汲取的数据不为空且处于pause模式,这个场景意味着流中有新数据
  • read(n)中,且n为0是的某些情况下(在测试过程中我一直无法触发这分支,不知道是对应哪种情况)。
  • 通过on()readable添加监听器,如果此时缓冲区有数据则会触发,这个场景意味着流中已经有数据可供read()直接调用。

data事件

data事件的意义则明确很多,文档上关于readable事件的描述如下(为了更加精确这里我们引用原文)

The ‘data’ event is emitted whenever the stream is relinquishing ownership of a chunk of data to a consumer.

readable不同的是,data事件代表的意义清晰单一:流将数据交付给消费者时触发。并且会将对应的数据通过回调传递给用户。

data的触发时机

从源码来看,有两个地方会触发data事件

  • read()中,如果缓冲区此时有数据可以返回给调用者,这种情况只会在调用pipe()时候发生,如果readable()被暂停过并重新启动,此时缓冲区内残留的数据会通过read()读出然后借助data事件传递出去。

  • addChunk(),此时_read()从底层汲取的数据不为空,且满足以下条件

    • 处于flowing模式
    • 缓冲区为空
    • 处于异步调用模式

    在这种情况下,数据直接就交付给消费者了,并没有在缓冲区缓存

而文档中的说法是:

当流转换到 flowing 模式时会触发该事件。调用readable.pipe(), readable.resume() 方法,或为 ‘data’ 事件添加回调可以将流转换到 flowing 模式。 ‘data’ 事件也会在调用 readable.read() 方法并有数据返回时触发。

似乎文档跟源码不太一致?其实调用readable.pipe()readable.resume()或为 data 事件添加回调,最终都会依次调用read()_read()addChunk()然后最终发射data事件

结合_read()的流程图,可以知道通过on()readabledata事件添加监听器后,程序就开始循环汲取底层数据直至消耗完为止

同时监听readabledata会怎样?

"use strict";
const rs = require("fs").createReadStream("./test.js");
rs.on("readable",()=>console.log("readable触发"));
rs.on("data",console.log);

程序输出如下:

node test.js
data触发 <Buffer 22 75 73 65 20 73 74 72 69 63 74 22 3b 0a 63 6f 6e 73 74 20 72 73 20 3d 20 72 65 71 75 69 72 65 28 22 66 73 22 29 2e 63 72 65 61 74 65 52 65 61 64 53 ... >
readable触发 null

似乎readable永远得不到数据?从上面的流程图我们知道,在addChunk()中当有新数据到来的时候,redabledata都有可能触发,那究竟触发哪个?看看addChunk()的源码

function addChunk(stream, state, chunk, addToFront) {
  //处于flowing模式,且缓冲区为空,且为异步调用时候,触发data事件	
  if (state.flowing && state.length === 0 && !state.sync) {
    state.awaitDrain = 0;
    stream.emit('data', chunk);
  } else {
    state.length += state.objectMode ? 1 : chunk.length;//更新缓冲区已有数据数量
    if (addToFront)
      state.buffer.unshift(chunk);//插入缓冲区头部
    else
      state.buffer.push(chunk);//插入缓冲区尾部

    if (state.needReadable)
      emitReadable(stream);//触发readable事件
  }
  maybeReadMore(stream, state);
}

由于为data事件添加回调会使得流进入flowing模式,因此我们的例子中,有新数据时只会发射data事件,而readable事件则流结束的时候发射一次

pipe的背压(back pressure)平衡机制

假设现在有一对ReadableWritable,要求编程实现从Readable里面读取数据然后写到Writable中。那么你面临的问题很有可能就是如果两者对数据的产生/消费速度不一致,那么需要手动协调两者速度使得任务可以完成。思路可能这样:

  • 0、Readable进入flowing模式,然后进入步骤2
  • 1,监听data事件,一旦有数据到达则进入步骤2,如果捕捉到end事件就结束任务
  • 2,将数据写入到Writable,如果返回true进入步骤1,否则进入步骤3
  • 3,Readable进入pause模式,并等待Writable发射drain事件
  • 4,如果Writable发射了drain事件,则返回步骤1

而事实上pipe()的过程和上述很相似,它的源码如下:

Readable.prototype.pipe = function(dest, pipeOpts) {
 //省略...
 var ondrain = pipeOnDrain(src);
  dest.on('drain', ondrain);//当写操作返回false的时候,正常情况下必然会在稍后触发一个drain事件
  src.on('data', ondata);
  function ondata(chunk) {
    var ret = dest.write(chunk);
    if (ret === false) {//如果写操作的返回值为false,则暂停readable流
      if (((state.pipesCount === 1 && state.pipes === dest) ||
           (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) &&
          !cleanedUp) {
        state.awaitDrain++;
      }
      src.pause();
    }
  }
  //省略...
  return dest;
};


function pipeOnDrain(src) {
  return function() {
    var state = src._readableState;
    if (state.awaitDrain)
      state.awaitDrain--;
    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
      state.flowing = true;//将流重新设为flowing模式
      flow(src);//将缓冲区中残留的数据读取并重新触发data事件
    }
  };
}

可以看到:

  • 当向dest写入数据返回false时,马上调用src.pause()暂停流。src.pause()将暂停事件流,但不会暂停数据生成,也就是说src此时依然汲取底层数据填充缓冲区,只是暂停发射data事件,等到缓冲区的数据量超过警戒线才会停止汲取
  • 因为写入数据返回false,因此在稍后的某个时候dest必然会发射drain事件。
  • drain事件发生后,src再次进入flowing模式自动产生数据,同时将缓冲区中的残留数据写入dest

关于pipe()还可以参考这篇文章 通过源码解析 Node.js 中导流(pipe)的实现

2 回复

这个研究的深

@frank320 其实就看了下源码,很多前人已经说过,拾人牙慧而已

回到顶部