node源码粗读(10):通过fs.write来看异步I/O和回调执行的整体流程
发布于 7 年前 作者 xtx1130 3373 次浏览 来自 分享

这篇文章主要从fs.write入手,简单讲述node中写文件同步与异步的实现以及详细解释异步I/O的回调如何通过AsyncWrap串起来nextTickMicroTasks

昨天有个朋友问我:

读源码什么都不懂,无从入手该怎么办?

我感觉透过现象来看本质是一个很好的入手方向。这篇文章就从我们熟悉的fs.writefs.writeSync入手,透过这些简单的API,来看看node究竟在里面做了什么。

fs.write和fs.writeSync

相信有一些node基础的开发者或者之前读过我的文章的读者都会知道console.log是基于process.stdout.write实现的,意即console.log是异步操作(可能有人会提出疑问:既然是异步,如何保证输出是正确的?请自己移步./lib/console.js查看)。所以如果我们想要调试node源码异步回调的时候,如果使用console.log会造成递归,这种情况下一般都会使用fs.writeSync。在js层面的源码中,fs.writefs.writeSync在调用的时候其实只差了一个参数:

// ./lib/fs.js
fs.write = function(fd, buffer, offset, length, position, callback) {
  function wrapper(err, written) {
    // Retain a reference to buffer so that it can't be GC'ed too soon.
    callback(err, written || 0, buffer);
  }
  // ...
  const req = new FSReqWrap();
  req.oncomplete = wrapper;

  if (isUint8Array(buffer)) {
   // ...
    return binding.writeBuffer(fd, buffer, offset, length, position, req); //注意这里
  }
  // ...
  return binding.writeString(fd, buffer, offset, length, req); //注意这里
};
fs.writeSync = function(fd, buffer, offset, length, position) {
  validateUint32(fd, 'fd');
  const ctx = {};
  let result;
  if (isUint8Array(buffer)) {
    // ...
    result = binding.writeBuffer(fd, buffer, offset, length, position,
                                 undefined, ctx); //注意这里
  } else {
    // ...
    result = binding.writeString(fd, buffer, offset, length,
                                 undefined, ctx); //注意这里
  }
  handleErrorFromBinding(ctx);
  return result;
};

通过对比可以发现,在调用writeBuffer或者writeString的时候,fs.write多了一个req的参数,而fs.writeSync拥有ctx参数。接下来我们去node_file.cc看看这两个到底区别在哪里。在这里我们以writeBuffer为例:

static void WriteString(const FunctionCallbackInfo<Value>& args) {
  // ...
  FSReqBase* req_wrap = GetReqWrap(env, args[4]);
  const bool is_async = req_wrap != nullptr;
  // ...
  if (is_async) {  // write(fd, string, pos, enc, req)
    CHECK_NE(req_wrap, nullptr);
    len = StringBytes::StorageSize(env->isolate(), value, enc);
    FSReqBase::FSReqBuffer& stack_buffer =
        req_wrap->Init("write", len, enc);
    // ...
    int err = uv_fs_write(env->event_loop(), req_wrap->req(),
                          fd, &uvbuf, 1, pos, AfterInteger); //注意这里
    req_wrap->Dispatched();
    if (err < 0) {
     // ...
    } else {
      req_wrap->SetReturnValue(args);
    }
  } else { 
    CHECK_EQ(argc, 6);
    fs_req_wrap req_wrap;
    // ...
    uv_buf_t uvbuf = uv_buf_init(buf, len);
    int bytesWritten = SyncCall(env, args[5], &req_wrap, "write",
                                uv_fs_write, fd, &uvbuf, 1, pos); //注意这里
    args.GetReturnValue().Set(bytesWritten);
  }
}

在这里我们可以很明显的开出来其中用一个if...else把同步和异步的逻辑区分开了。async直接调用uv_fs_write而sync则调用SyncCall()。当时我看到这里的时候还有些许错愕,因为node_file.cc中提供了AsyncCall()方法,单独这个API没有使用,于是我翻到了这个pr,是为了防止内存泄漏,所以才进行的单独的处理,具体信息可以去pr中了解。
对比一下uv_fs_write和下面的SyncCall,可以发现uv_fs_write多出了AfterInteger这个参数,AfterInteger定义如下:

void AfterInteger(uv_fs_t* req) {
  FSReqBase* req_wrap = static_cast<FSReqBase*>(req->data);
  FSReqAfterScope after(req_wrap, req);

  if (after.Proceed())
    req_wrap->Resolve(Integer::New(req_wrap->env()->isolate(), req->result));
}

其中after.Processd()定义如下:

bool FSReqAfterScope::Proceed() {
  if (req_->result < 0) {
    Reject(req_);
    return false;
  }
  return true;
}

根据函数中的if判断以及req_wrap->Resolve很明显可以看出来这是一个回调函数,经过比对可以发现uv_fs_write多出的AfterInteger其实是一个函数。req_wrap会在下面进行介绍,我们现在重点先关注一下uv_fs_write

uv_fs_write

视线转移到libuv中的fs.c文件中,其中定义了uv_fs_write,这里只摘抄重点的部分进行解读:

#define INIT(subtype)                                                         \
  do {                                                                        \
    if (req == NULL)                                                          \
      return UV_EINVAL;                                                       \
    UV_REQ_INIT(req, UV_FS);                                                  \
    req->fs_type = UV_FS_ ## subtype;                                         \
    req->result = 0;                                                          \
    req->ptr = NULL;                                                          \
    req->loop = loop;                                                         \
    req->path = NULL;                                                         \
    req->new_path = NULL;                                                     \
    req->bufs = NULL;                                                         \
    req->cb = cb;                                                             \
  }                                                                           \
  while (0)

#define POST                                                                  \
  do {                                                                        \
    if (cb != NULL) {                                                         \
      uv__req_register(loop, req);                                            \
      uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done);        \
      return 0;                                                               \
    }                                                                         \
    else {                                                                    \
      uv__fs_work(&req->work_req);                                            \
      return req->result;                                                     \
    }                                                                         \
  }                                                                           \
  while (0)
int uv_fs_write(uv_loop_t* loop,
                uv_fs_t* req,
                uv_file file,
                const uv_buf_t bufs[],
                unsigned int nbufs,
                int64_t off,
                uv_fs_cb cb) {
  INIT(WRITE);
  // ...
  POST;
}

根据定义可以发现,上文中的AfterInteger其实是作为cb参数传入到了uv_fs_write中。接下来注意一下POST宏:

 if (cb != NULL) {                                                         \
      uv__req_register(loop, req);                                            \
      uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done);        \
      return 0;                                                               \
    }                                                                         \
    else {                                                                    \
      uv__fs_work(&req->work_req);                                            \
      return req->result;                                                     \
    }    

在这里对cb进行了判断,如果无cb则直接调用uv__fs_work,如果有cb则会把uv__fs_work放到thread_pool中调用以形成异步I/O。libuv通过这种方式,实现了同步和异步。

异步的回调和同步的返回

同步fs.writeSync的返回

首先我们先关注一下同步的fs.writeSync的返回,视线回到node_file.cc中,关于同步的返回在这里:

int bytesWritten = SyncCall(env, args[5], &req_wrap, "write",
                                uv_fs_write, fd, &uvbuf, 1, pos);
    args.GetReturnValue().Set(bytesWritten);

可以很容易的看到,fs.writeSync的返回为写入的字节数。

异步fs.write的回调

在刚才已经介绍过fs.write会把AfterInteger作为cb传入到uv_fs_write中,在事件循环开始之后会在poll阶段执行AfterInteger回调。而AfterIntenger中最终执行的是:

req_wrap->Resolve(Integer::New(req_wrap->env()->isolate(), req->result));

Resolve代码如下:

void FSReqWrap::Resolve(Local<Value> value) {
  Local<Value> argv[2] {
    Null(env()->isolate()),
    value
  };
  MakeCallback(env()->oncomplete_string(), arraysize(argv), argv);
}

可以看到最终执行的是MakeCallback,如果读过我之前文章的读者,很容易联想到之前timers API的MakeCallback。这两个MakeCallback其实还是有区别的,区别就在于–FSReqWrap继承自AsyncWrap,而这个MakeCallback的声明在async_wrap-inl.h:

inline v8::MaybeLocal<v8::Value> AsyncWrap::MakeCallback(
    const v8::Local<v8::String> symbol,
    int argc,
    v8::Local<v8::Value>* argv) {
  v8::Local<v8::Value> cb_v = object()->Get(symbol);
  CHECK(cb_v->IsFunction());
  return MakeCallback(cb_v.As<v8::Function>(), argc, argv);
}

最终调用的return MakeCallback(cb_v.As<v8::Function>(), argc, argv);则在async_wrap.cc中:

MaybeLocal<Value> AsyncWrap::MakeCallback(const Local<Function> cb,
                                          int argc,
                                          Local<Value>* argv) {
  EmitTraceEventBefore();
  async_context context { get_async_id(), get_trigger_async_id() };
  MaybeLocal<Value> ret = InternalMakeCallback(
      env(), object(), cb, argc, argv, context);
  EmitTraceEventAfter();
  return ret;
}

InternalMakeCallback不知道大家还有没有印象,之前的文章中曾经介绍过,在node源码粗读(9):nextTick、timers API、MicroTasks注册到执行全阶段解读event-loop阶段章节,有这样一句话:

也正是InternalMakeCallbackInternalCallbackScope::Close使得libuv和v8紧紧的联系在了一起

没错,AsyncWrap::MakeCallback最终调用的还是node::InternalMakeCallback。殊途同归,最终使得整体形成了一个闭环(ps: InternalCallbackScope::Close会继续调用nextTick以及RunMicrotasks)。

by 小菜

原文地址:https://github.com/xtx1130/blog/issues/23,欢迎star和watch,如果有疑问可以在文章下面或者issue中留言。如果文中描述的有错误的地方,还请大神斧正。

回到顶部