node源码粗读(11):透过console.log来看stream.write
发布于 6 年前 作者 xtx1130 3565 次浏览 来自 分享

这篇文章从console.log入手,对流的写入进行层层剖析。

console.log浅析

console.log的实现浅析很容易,直接转到console.js中:

// ./lib/console.js

// ...
// Formats `args` with this[kFormatForStdout] and passes the result to write()
// together with this._stdout, the stdout error handler, the ignore-errors
// flag and the current group indent.
Console.prototype.log = function log(...args) {
  write(this._ignoreErrors,
        this._stdout,
        this[kFormatForStdout](args),
        this._stdoutErrorHandler,
        this[kGroupIndent]);
};

其中write函数定义如下:

// Writes `string` to `stream`, passing `errorhandler` as the write callback.
// Exceptions thrown synchronously by stream.write() are swallowed, except for
// stack overflow errors, which are rethrown.
// NOTE(review): the elided part ("// ...") is not shown in this excerpt;
// presumably it is where ignoreErrors and groupIndent are used.
function write(ignoreErrors, stream, string, errorhandler, groupIndent) {
 // ...
  try {
    // Add and later remove a noop error handler to catch synchronous errors.
    stream.once('error', noop);

    stream.write(string, errorhandler);
  } catch (e) {
    // console is a debugging utility, so it swallowing errors is not desirable
    // even in edge cases such as low stack space.
    if (isStackOverflowError(e))
      throw e;
    // Sorry, there's no proper way to pass along the error here.
  } finally {
    // Always detach the temporary noop listener, whether or not write threw.
    stream.removeListener('error', noop);
  }
}

stream则是在创建实例的时候传递进去的:

module.exports = new Console({
  stdout: process.stdout,
  stderr: process.stderr
});

由此可见,console.log实质是process.stdout.write

process.stdout的实现

接下来,我们直接溯源到process.stdout,相关源码在stdio.js中:

// ./lib/internal/process/stdio.js
// ...
// Getter behind process.stdout: creates the stream for fd 1 on first access
// and returns the cached instance on every later call.
function getStdout() {
    // Already created: reuse the cached stream.
    if (stdout) return stdout;
    stdout = createWritableStdioStream(1);
    // destroySoon is made an alias of destroy.
    stdout.destroySoon = stdout.destroy;
    // _destroy always reports an error to its callback: the one passed in,
    // or a new ERR_STDOUT_CLOSE when none was given.
    stdout._destroy = function(er, cb) {
      // Avoid errors if we already emitted
      er = er || new ERR_STDOUT_CLOSE();
      cb(er);
    };
    if (stdout.isTTY) {
      // On SIGWINCH, ask the TTY stream to refresh its size.
      process.on('SIGWINCH', () => stdout._refreshSize());
    }
    return stdout;
  }
// ...
Object.defineProperty(process, 'stdout', {
    configurable: true,
    enumerable: true,
    get: getStdout
  });
// ...
function createWritableStdioStream(fd) {
  var stream;
  const tty_wrap = process.binding('tty_wrap');

  // Note stream._type is used for test-module-load-list.js

  switch (tty_wrap.guessHandleType(fd)) {
    case 'TTY':
      var tty = require('tty');
      stream = new tty.WriteStream(fd);
      stream._type = 'tty';
      break;

    case 'FILE':
      var fs = require('internal/fs');
      stream = new fs.SyncWriteStream(fd, { autoClose: false });
      stream._type = 'fs';
      break;

    case 'PIPE':
    case 'TCP':
      var net = require('net');
      stream = new net.Socket({
        fd: fd,
        readable: false,
        writable: true
      });
      stream._type = 'pipe';
      break;

    default:
      // Probably an error on in uv_guess_handle()
      throw new ERR_UNKNOWN_STREAM_TYPE();
  }

由此可见,process.stdout本质上是一个getter,对应getStdout函数;getStdout调用createWritableStdioStream方法来创建stdout。当标准输出是终端(TTY)时,最终由tty.WriteStream创建一个stream并返回给getStdout;如果输出被重定向到文件或管道/TCP,则分别由fs.SyncWriteStream或net.Socket创建。

net.socket介绍

调用栈就不再一一溯源了,在这里简单介绍一下。
tty中,会调用:

// ./lib/tty.js
  net.Socket.call(this, {
    handle: tty,
    readable: false,
    writable: true
  });
inherits(WriteStream, net.Socket);

进而在net.js中通过:

stream.Duplex.call(this, options);

最终生成了一个Duplex流(可读写流,网上资料很多不做过多介绍)。而Duplex流的write操作则来自_stream_writable.js:

// ./lib/_stream_duplex.js
// Duplex constructor (excerpt): runs the Writable constructor on `this`
// with the same options, then applies the readable/writable opt-outs.
function Duplex(options) {
  // ...
  Writable.call(this, options);
  //...
  // Only an explicit `false` turns a side off; a missing option leaves the
  // flag untouched.
  if (options && options.readable === false)
    this.readable = false;

  if (options && options.writable === false)
    this.writable = false;

}

至此,可以列出基本的观光路线了:

console.js -> stdio.js ->tty.js -> net.js -> _stream_duplex.js -> _stream_writable.js

接下来才真正开始进行源码的解读。

process.stdout.write在js层对数据的写入

我们重点放到_stream_writable.js中,在创建Writable类的时候会进行如下操作:

// ./lib/_stream_writable.js
// Writable constructor (excerpt): when `options` supplies write, writev,
// destroy or final as functions, they are installed on the instance as
// _write, _writev, _destroy and _final respectively.
function Writable(options) {
// ...
if (options) {
    // Non-function values are ignored, so the instance keeps whatever
    // implementation it already resolves to.
    if (typeof options.write === 'function')
      this._write = options.write;

    if (typeof options.writev === 'function')
      this._writev = options.writev;

    if (typeof options.destroy === 'function')
      this._destroy = options.destroy;

    if (typeof options.final === 'function')
      this._final = options.final;
  }
// ...
}

重点注意一下this._write = options.write;这类赋值:Writable在写入时最终调用的是实例上的_write方法。在这里我就不详细溯源了,对于net.Socket来说,_write并不是通过options.write传入的,而是直接定义在net.js的Socket.prototype上:

// ./lib/net.js
// Socket write implementation (excerpt): creates a write request bound to
// this._handle and dispatches it through writevGeneric or writeGeneric.
// NOTE(review): `writev` is not defined in this excerpt; presumably it is set
// in the elided part — confirm against net.js.
Socket.prototype._write = function(data, encoding, cb) {
 // ...
  var req = createWriteWrap(this._handle, afterWrite);
  if (writev)
    writevGeneric(this, req, data, cb);
  else
    writeGeneric(this, req, data, encoding, cb);
  // When the request is flagged async, record its byte count on the socket.
  if (req.async)
    this[kLastWriteQueueSize] = req.bytes;
};

writev是处理多数据流块用的,在这里我们只关注writeGeneric(this, req, data, encoding, cb);

createWriteWrap以及writeGeneric来自模块stream_base_commons.js

// ./lib/internal/stream_base_commons.js
const { WriteWrap } = process.binding('stream_wrap');
// Creates a WriteWrap request object for `handle`.
// `oncomplete` is stored on the request as its completion callback;
// `async` starts out as false.
function createWriteWrap(handle, oncomplete) {
  var req = new WriteWrap();

  req.handle = handle;
  req.oncomplete = oncomplete;
  req.async = false;

  return req;
}

// Single-chunk write: handleWriteReq() performs the write for `req` and
// returns an error value, which is forwarded with `cb` to
// afterWriteDispatched().
function writeGeneric(self, req, data, encoding, cb) {
  var err = handleWriteReq(req, data, encoding);

  afterWriteDispatched(self, req, err, cb);
}

代码只有简单的两行,却是console.log最终能打印到控制台的关键代码。

stream.write的底层实现

我们先详细解读一下WriteWrap,WriteWrap来自内部stream_wrap,视线直接转移到stream_wrap.cc中,其中有对WriteWrap的定义:

// Module initializer (excerpt): builds the function template for the
// "WriteWrap" class, adds the AsyncWrap methods to it, exposes its
// constructor on `target`, and stores its instance template on the
// environment.
void LibuvStreamWrap::Initialize(Local<Object> target,
                                 Local<Value> unused,
                                 Local<Context> context) {
  // ...
  Local<FunctionTemplate> ww =
      FunctionTemplate::New(env->isolate(), is_construct_call_callback);
  // Reserve internal fields up to and including kStreamReqField.
  ww->InstanceTemplate()->SetInternalFieldCount(StreamReq::kStreamReqField + 1);
  Local<String> writeWrapString =
      FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap");
  ww->SetClassName(writeWrapString);
  AsyncWrap::AddWrapMethods(env, ww);
  target->Set(writeWrapString, ww->GetFunction());
  env->set_write_wrap_template(ww->InstanceTemplate());
  // ...
}

在这里稍微注意下AsyncWrap::AddWrapMethods,这是AsyncWrap专门针对异步操作增加getAsyncId方法用的API。我们转到stream_wrap的头文件stream_wrap.h中看看LibuvStreamWrap是如何定义的:

// ./src/stream_wrap.h
// Class declaration (excerpt): LibuvStreamWrap derives from both HandleWrap
// and StreamBase. Every member shown here is declared `override`.
class LibuvStreamWrap : public HandleWrap, public StreamBase {
 public:
  // ...
  int GetFD() override;
  bool IsAlive() override;
  bool IsClosing() override;
  bool IsIPCPipe() override;

  // JavaScript functions
  int ReadStart() override;
  int ReadStop() override;

  // Resource implementation
  int DoShutdown(ShutdownWrap* req_wrap) override;
  int DoTryWrite(uv_buf_t** bufs, size_t* count) override;
  int DoWrite(WriteWrap* w,
              uv_buf_t* bufs,
              size_t count,
              uv_stream_t* send_handle) override;
 // ...
  ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object) override;
  WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object) override;
}

LibuvStreamWrap继承于HandleWrap和StreamBase,这样就很好理解了。HandleWrap包裹的是libuv handle,而StreamBase则完全是关于stream的操作。StreamBase提供基本的读写能力的同时,开放出来一些方法让子类去覆写,实现定制化的需求,例如DoTryWrite是StreamBase中在写入流的时候调用的方法,而这个方法是专门提供出来让子类去覆写的。LibuvStreamWrap在构建的时候,通过覆写一些方法,实现对流定制化的操作。

stream.write的写入过程

接下来我们看一下流的写入过程,js方面调用的API就不过多介绍了,通过js可以定位到stream_base.cc中的函数:

// String write entry point (excerpt). On the try_write path the string is
// encoded into stack_storage and handed to DoTryWrite() as a single buffer.
// NOTE(review): the brace structure of this excerpt is unbalanced because
// parts of the function are elided.
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
  //..
  if (try_write) {
    data_size = StringBytes::Write(env->isolate(),
                                   stack_storage,
                                   storage_size,
                                   string,
                                   enc);
    buf = uv_buf_init(stack_storage, data_size);

    uv_buf_t* bufs = &buf;
    size_t count = 1;
    err = DoTryWrite(&bufs, &count);

    // count == 0: everything was written. Otherwise buf.len is compared
    // against data_size to get the number of bytes written so far.
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
    bytes_written_ += synchronously_written;
    // ...
    }
  }
  //..
}

很容易注意到,真正的写入过程是刚才所提到的DoTryWrite。我们跟进一下DoTryWrite,由于这个函数被覆写了,所以我们直接去stream_wrap.cc中查看:

// Attempts a write of the buffers in *bufs via uv_try_write().
// Returns a negative error code on failure, otherwise 0.
// On success, *bufs and *count are advanced to describe only the data that
// was NOT written; *count == 0 means everything was written.
int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
  int err;
  size_t written;
  uv_buf_t* vbufs = *bufs;
  size_t vcount = *count;

  err = uv_try_write(stream(), vbufs, vcount);
  // ENOSYS and EAGAIN are not reported as errors: return 0 and leave
  // *bufs and *count untouched.
  if (err == UV_ENOSYS || err == UV_EAGAIN)
    return 0;
  if (err < 0)
    return err;

  // A non-negative result is the number of bytes written.
  written = err;
  for (; vcount > 0; vbufs++, vcount--) {
    // Slice
    // Partially written buffer: move its start past the written bytes and
    // stop.
    if (vbufs[0].len > written) {
      vbufs[0].base += written;
      vbufs[0].len -= written;
      written = 0;
      break;
    // Discard
    // Fully written buffer: consume its length and move to the next one.
    } else {
      written -= vbufs[0].len;
    }
  }
  *bufs = vbufs;
  *count = vcount;
  return 0;
}

可以看到,其实最终写入调用的函数是libuv的uv_try_write。uv_try_write本质和uv_write是一样的,只不过uv_try_write会预先判断这个流能不能立即完成写入,如果不能立即完成,则不会将写入请求排入queue中。可以简单看一下uv_try_write代码,在stream.c中:

/* Tries to write `bufs` to `stream` (excerpt).
 * Returns UV_EAGAIN without writing when a connect request is pending or
 * the write queue is not empty. A non-zero result from uv_write() is
 * returned as is. */
int uv_try_write(uv_stream_t* stream,
                 const uv_buf_t bufs[],
                 unsigned int nbufs) {
  int r;
  int has_pollout;
  size_t written;
  size_t req_size;
  uv_write_t req;

  /* Connecting or already writing some data */
  if (stream->connect_req != NULL || stream->write_queue_size != 0)
    return UV_EAGAIN;

  /* Record whether POLLOUT was active on the io watcher before the write. */
  has_pollout = uv__io_active(&stream->io_watcher, POLLOUT);

  /* The write itself goes through uv_write() with a local request. */
  r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
  if (r != 0)
    return r;
  // ...
}

可以看到,最终还是通过r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);这一行,由uv_write实现流的写入操作。

console.log写入之后的回调

流的写入流程已经分析完了,接下来看一下写入之后做了什么。视线回到_stream_writable.js中:

// Write-completion handler (excerpt): reads the sync flag and the pending
// write callback from the stream's writable state, then runs afterWrite().
// NOTE(review): the brace structure of this excerpt is unbalanced because
// part of the function is elided.
function onwrite(stream, er) {
  var state = stream._writableState;
  var sync = state.sync;
  var cb = state.writecb;
  // ...
    // When state.sync is set, afterWrite is deferred via process.nextTick;
    // otherwise it is called directly.
    if (sync) {
      process.nextTick(afterWrite, stream, state, finished, cb);
    } else {
      afterWrite(stream, state, finished, cb);
    }
  }
}

js的代码我就不一一溯源了,onwrite函数可以从Writable.prototype.write中追溯过来,由于state.sync为true,所以会走入到process.nextTick(afterWrite, stream, state, finished, cb);这块的逻辑。而nextTick在创建TickObject时,会有如下逻辑:

// ./lib/internal/process/next_tick.js
  // Record for one scheduled tick: stores the callback and its arguments,
  // takes a fresh async id, and emits an async-hooks init event of type
  // 'TickObject' when init hooks exist.
  class TickObject {
    constructor(callback, args, triggerAsyncId) {
      // this must be set to null first to avoid function tracking
      // on the hidden class, revisit in V8 versions after 6.2
      this.callback = null;
      this.callback = callback;
      this.args = args;

      const asyncId = newAsyncId();
      this[async_id_symbol] = asyncId;
      this[trigger_async_id_symbol] = triggerAsyncId;

      // The TickObject itself is passed to emitInit as the last argument.
      if (initHooksExist()) {
        emitInit(asyncId,
                 'TickObject',
                 triggerAsyncId,
                 this);
      }
    }
  }

nextTick在创建之初,便被AsyncWrap包裹,并且作为异步事件的'TickObject'资源类型。所以在任何涉及到AsyncHooks的回调中调用console.log,都会直接导致AsyncHooks无限递归调用。
nextTick说完了,那么大家可能还会好奇,console.log通过nextTick执行了什么回调。简单地从上面代码来看是afterWrite,afterWrite代码如下:

// ./lib/_stream_writable.js
// Runs after a chunk has been written: calls onwriteDrain() unless the
// stream is finished, decrements the pending-callback count, invokes the
// write callback, then calls finishMaybe().
function afterWrite(stream, state, finished, cb) {
  if (!finished)
    onwriteDrain(stream, state);
  state.pendingcb--;
  // cb is invoked with no arguments here.
  cb();
  finishMaybe(stream, state);
}

可以看到最终调用的回调函数是cb,这个cb在哪里进行定义的呢?经过溯源,这个cb的定义出现在console.js中:

var prop = {
    writable: true,
    enumerable: false,
    configurable: true
  };
prop.value = createWriteErrorHandler(stdout);
Object.defineProperty(this, '_stdoutErrorHandler', prop);
// Builds the write callback used by Console for `stream`.
// The returned function acts only when `err` is not null and the stream has
// not yet emitted its error; if the stream then has no 'error' listener, a
// noop listener is attached.
function createWriteErrorHandler(stream) {
  return (err) => {
    // Strict `!== null` check: an undefined err also enters this branch.
    if (err !== null && !stream._writableState.errorEmitted) {
      if (stream.listenerCount('error') === 0) {
        stream.on('error', noop);
      }
    }
  };
}

createWriteErrorHandler()是这个cb的真面目。代码意图也很明显,意即有错误的时候才会触发其中的逻辑,并且如果在错误没有被监听的情况下,增加错误监听函数noop(空函数)。

结语

至此,整个console.log的过程分析完毕,由于其中涉及到的部分很多,和整体逻辑关系不大的部分,例如tty_wrap.cc等没有逐步分析,而且js的调用栈由于篇幅关系,也没有一一分析。本篇文章可以帮助读者把console.log、stream以及AsyncHook串联起来,其中省略一些溯源步骤还望读者见谅。

by 小菜

原文地址:https://github.com/xtx1130/blog/issues/24,欢迎watch和star,如果文中有讲解错误的地方,还请大神斧正。

1 回复

可以回复我一下么

回到顶部