Node.js 是如何异步判断文件是否存在?
发布于 10 天前 作者 lolBig 537 次浏览 来自 分享

通常我们在讨论 Node.js 的时候都会涉及到异步这个特性。实际上 Node.js 在执行异步调用的时候,不同的场景下有着不同的处理方式。本文将通过 libuv 源码来分析 Node.js 是如何通过 libuv 的线程池完成异步调用。本文描述的 Node.js 版本为 v11.15.0,libuv 版本为 1.24.0

以下面的代码为例,它通过调用 fs.access 来异步地判断文件是否存在并在回调中打印日志,在 Node.js 中这是一个典型的异步调用。

const fs = require('fs')
const cb = function (err) {
  console.log(`Is myfile exists: ${!err}`)
}
fs.access('myfile', cb)

在分析上面这段代码的调用过程之前,我们先来了解一些 libuv 概念。

什么类型的请求 libuv 会把它放到线程池去执行

主动通过 libuv 发起的操作被 libuv 称为请求( uv_req_t ),libuv 的线程池作用于以下 4 种枚举的异步请求:

其它的 UV_CONNECTUV_WRITEUDP_SEND 等则并不会通过线程池去执行。

线程池请求分类

这 4 种枚举请求 libuv 内部把它们分为 3 种任务类型( uv__work_kind ):

  • UV__WORK_CPU:CPU 密集型,UV_WORK 类型的请求被定义为这种类型。因此根据这个分类,不推荐在 uv_queue_work 中做 I/O 密集的操作。
  • UV__WORK_FAST_IO:快 IO 型,UV_FS 类型的请求被定义为这种类型。
  • UV__WORK_SLOW_IO:慢 IO 型,UV_GETADDRINFOUV_GETNAMEINFO 类型的请求被定义为这种类型。

UV__WORK_SLOW_IO 执行不同于 UV__WORK_CPUUV__WORK_FAST_IO ,libuv 执行它的时候流程会有些差异,这个后面会提到。

线程池是如何初始化的

libuv 通过init_threads 函数初始化线程池,初始化时会根据一个名为 UV_THREADPOOL_SIZE 的环境变量来初始化内部线程池的大小,线程最大数量为 128 ,默认为 4 。如果以单进程的架构去部署服务,可以根据服务器 CPU 的核心数量及业务情况来设置线程池大小,达到资源利用的最大化。uv loop 线程在创建 worker 线程时,会初始化以下变量:

  • 信号量 sem:在创建线程时与线程进行同步,每个线程创建好后将会通过这个信号量告知 uv loop 线程自己已经初始化完毕,可以开始处理请求了。当所有线程都初始化完成后这个信号量将被销毁,即完成线程池的初始化。
  • 条件变量 cond:线程创建完成后通过这个条件变量进入阻塞状态( uv_cond_wait ),直到其它线程通过 uv_cond_signal 将其唤醒。
  • 互斥量 mutex:对下面 3 个临界资源进行互斥访问。
  • 请求队列 wq:线程池收到 UV__WORK_CPUUV__WORK_FAST_IO 类型的请求后将其插到此队列的尾部,并通过 uv_cond_signal 唤醒 worker 线程去处理,这是线程池请求的主队列。
  • 慢 I/O 队列 slow_io_pending_wq:线程池收到 UV__WORK_SLOW_IO 类型的请求后将其插到此队列的尾部。
  • 慢 I/O 标志位节点 run_slow_work_message:当存在慢 I/O 请求时,用来作为一个标志位放在请求队列 wq 中,表示当前有慢 I/O 请求,worker 线程处理请求时需要关注慢 I/O 队列的请求;当慢 I/O 队列的请求都处理完毕后这个标志位将从请求队列 wq 中移除。

worker 线程的入口函数均为 worker 函数,这个我们后面再说。 init_threads 实现如下:

static void init_threads(void) {
  unsigned int i;
  const char* val;
  uv_sem_t sem;

  // 6-23 行初始化线程池大小
  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE"); // 根据环境变量设置线程池大小
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }
  // 初始化条件变量
  if (uv_cond_init(&cond))
    abort();

  // 初始化互斥量
  if (uv_mutex_init(&mutex))
    abort();

  // 初始化队列和节点
  QUEUE_INIT(&wq); // 工作队列
  QUEUE_INIT(&slow_io_pending_wq); // 慢 I/O 队列
  QUEUE_INIT(&run_slow_work_message); // 如果有慢 I/O 请求,将此节点作为标志位插入到 wq 中

  // 初始化信号量
  if (uv_sem_init(&sem, 0))
    abort(); // 后续线程同步需要依赖这个信号量,因此这个信号量创建失败了则终止进程

  // 创建 worker 线程
  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem)) // 初始化 worker 线程
      abort(); // woker 线程创建错误原因为 EAGAIN、EINVAL、EPERM 其中之一,具体请参考 man3
  
  // 等待 worker 创建完成
  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem); // 等待 worker 线程创建完毕

  // 回收信号量资源
  uv_sem_destroy(&sem);
}

请求是如何放到线程池去执行的

libuv 有两个函数可以创建多线程请求:

uv__work_submit 函数主要做 2 件事:

  1. 调用 init_threads 初始化线程池,因为线程池的创建是惰性的,只有用到的时候才会创建。
  2. 调用内部的 post 函数将请求插入到请求队列中。

实现如下:

void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  // 在收到请求后才开始初始化线程池,但是只会初始化一次
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq, kind);
}

static void init_once(void) {
  // fork 后子进程的 mutex 、condition variables 等 pthread 变量的状态是父进程 fork 时的复制,所以子进程创建时需要重置状态
  // 具体请参考 http://man7.org/linux/man-pages/man2/fork.2.html
  if (pthread_atfork(NULL, NULL, &reset_once))
    abort();
  // 初始化线程池
  init_threads();
}

static void reset_once(void) {
  // 重置 once 变量
  uv_once_t child_once = UV_ONCE_INIT;
  memcpy(&once, &child_once, sizeof(child_once));
}

post 函数主要做 2 件事:

  1. 判断请求的请求类型是否是 UV__WORK_SLOW_IO
    • 如果是,将这个请求插到慢 I/O 请求队列 slow_io_pending_wq 的尾部,同时在请求队列 wq 的尾部插入一个 run_slow_work_message 节点作为标志位,告知请求队列 wq 当前存在慢 I/O 请求。
    • 如果不是,将请求插到请求队列 wq 尾部。
  2. 如果有空闲的线程,唤醒某一个去执行请求。

并发的慢 I/O 的请求数量不会超过线程池大小的一半,这样做的好处是避免多个慢 I/O 的请求在某段时间内把所有线程都占满,导致其它能够快速执行的请求需要排队。

post 函数实现如下:

static void post(QUEUE* q, enum uv__work_kind kind) {
  // 加锁
  uv_mutex_lock(&mutex);
  if (kind == UV__WORK_SLOW_IO) {
    /* 插入到慢 I/O 队列中 */
    QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
    /* 如果 run_slow_work_message 节点不为空代表其已在 wq 队列中,无需再次插入 */
    if (!QUEUE_EMPTY(&run_slow_work_message)) {
      uv_mutex_unlock(&mutex);
      return;
    }
    // 不在 wq 队列中则将 run_slow_work_message 作为标志位插到 wq 尾部
    q = &run_slow_work_message;
  }
  // 将请求插到请求队列尾部
  QUEUE_INSERT_TAIL(&wq, q);
  // 如果有空闲的线程,唤醒某一个去执行请求
  if (idle_threads > 0)
    uv_cond_signal(&cond); // 唤醒一个 worker 线程
  uv_mutex_unlock(&mutex);
}

worker 线程的入口函数 worker 在线程创建好并初始化完成后将按照下面的步骤不断的循环:

  1. 等待唤醒。
  2. 取出请求队列 wq 或者慢 I/O 请求队列的头部请求去执行。
  3. 通知 uv loop 线程完成了一个请求的处理。
  4. 回到 1 。
static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;
  int is_slow_work;

  // 通知 uv loop 线程此 worker 线程已创建完毕
  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;

  uv_mutex_lock(&mutex);
  // 通过这个死循环来不断的执行请求
  for (;;) {
    /*
    	这个 while 有2个判断
    	1. 在多核处理器下,pthread_cond_signal 可能会激活多于一个线程,通过一个 while 来避免这种情况导致的问题,具体请参考 https://linux.die.net/man/3/pthread_cond_signal
    	2. 限制慢 I/O 请求的数量小于线程数量的一半
    */
    while (QUEUE_EMPTY(&wq) ||
           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
            QUEUE_NEXT(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      // worker 线程初始化完成或没有请求执行时进入阻塞状态,直到被新的请求唤醒
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }
    // 唤醒并且达到执行请求的条件后取出队列头部的请求
    q = QUEUE_HEAD(&wq);
    // 如果头部请求是退出,则跳出循环,结束 worker 线程
    if (q == &exit_message) {
      // 继续唤醒其它 worker 去结束线程
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }

    // 将这个请求节点从请求队列 wq 中移除
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);

    is_slow_work = 0;
    // 如果这个请求是慢 I/O 的标志位
    if (q == &run_slow_work_message) {
      /* 控制慢 I/O 请求数量,超过则插到队列尾部,等待前面的请求执行完 */
      if (slow_io_work_running >= slow_work_thread_threshold()) {
        QUEUE_INSERT_TAIL(&wq, q);
        continue;
      }

      /* 判断慢 I/O 请求队列中是否有请求,请求有可能被取消 */
      if (QUEUE_EMPTY(&slow_io_pending_wq))
        continue;

      is_slow_work = 1;
      slow_io_work_running++;

      // 取出慢 I/O 请求队列中头部的请求
      q = QUEUE_HEAD(&slow_io_pending_wq);
      QUEUE_REMOVE(q);
      QUEUE_INIT(q);

      // 如果慢 I/O 请求队列中还有请求,则将 run_slow_work_message 这个标志位重新插到请求队列 wq 的尾部
      if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
        QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
        if (idle_threads > 0)
          uv_cond_signal(&cond); // 唤醒一个线程继续执行
      }
    }

    uv_mutex_unlock(&mutex);

    w = QUEUE_DATA(q, struct uv__work, wq);
    // 上面处理了这多,终于在这里开始执行请求的函数了
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;
    
    // 为保证线程安全,请求执行完后不会立即回调请求,而是将完成的请求插到已完成的请求队列中,在uv loop 线程完成回调
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    // 通过 uv_async_send 同步 uv loop 线程:线程池完成了一个请求
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    uv_mutex_lock(&mutex);
    if (is_slow_work) {
      slow_io_work_running--;
    }
  }
}

请求在 worker 执行完后是如何同步 uv loop 所在的线程

uv_loop_init 时,线程池的 wq_async(uv_async_t) 句柄通过 uv_async_init 初始化并插入到 uv loop 的 async_handles 队列中,然后在 uv loop 线程中遍历 async_handles 队列并完成回调。

worker 线程 和 uv loop 线程通过 uv_async_send 进行同步,而uv_async_send 只做了一件事:向 async_wfd 句柄写了一个长度为 1 个字节的字符串(只有 \0 这个字符)。

uv_async_send 实现如下:

int uv_async_send(uv_async_t* handle) {
  if (ACCESS_ONCE(int, handle->pending) != 0)
    return 0;
  // cmpxchgi 函数设置标志位,如果已经设置过则不会重复调用 uv__async_send
  if (cmpxchgi(&handle->pending, 0, 1) == 0)
    uv__async_send(handle->loop);

  return 0;
}

static void uv__async_send(uv_loop_t* loop) {
  const void* buf;
  ssize_t len;
  int fd;
  int r;

  buf = "";
  len = 1;
  fd = loop->async_wfd;

#if defined(__linux__)
  if (fd == -1) {
    static const uint64_t val = 1;
    buf = &val;
    len = sizeof(val);
    fd = loop->async_io_watcher.fd;  /* eventfd */
  }
#endif

  do
    r = write(fd, buf, len); // 向 fd 写入内容
  while (r == -1 && errno == EINTR);

  if (r == len)
    return;

  if (r == -1)
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return;

  abort();
}

async_wfd 写内容为什么能做到同步呢?实际上在 worker 线程对 async_wfd 写入时,uv loop 线程同时也在不断的循环去接收处理各种各样的事件或请求,其中就包括对 async_wfd 可读事件的监听。

uv loop 是在 uv_run 函数中执行的,它在 Node.js 启动时 被调用, uv_run 实现如下:

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  int timeout;
  int r;
  int ran_pending;

  r = uv__loop_alive(loop);
  if (!r)
    uv__update_time(loop);

  while (r != 0 && loop->stop_flag == 0) {
    // 更新计时器时间
    uv__update_time(loop);
    // 回调超时的计时器,setTimeout、setInterval 都是由这个函数回调
    uv__run_timers(loop);
    // 处理某些没有在 uv__io_poll 完成的回调
    ran_pending = uv__run_pending(loop);
    // 官方解释:Idle handle is needed only to stop the event loop from blocking in poll.
    // 实际上 napi 中某些函数比如 napi_call_threadsafe_function 会往 idle 队列中插入回调,然后在这个阶段执行
    uv__run_idle(loop);
    // process._startProfilerIdleNotifier 的回调
    uv__run_prepare(loop);

    timeout = 0;
    if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
      timeout = uv_backend_timeout(loop); // 计算 uv__io_poll 超时时间,算法请参考 https://github.com/libuv/libuv/blob/v1.24.0/src/unix/core.c#L318

    // 对 async_wfd 可读的监听在 uv__io_poll 这个函数中
    // 第二个参数 timeout 为上面计算出来,用来设置 epoll_wait 等函数等待 I/O 事件的时间
    uv__io_poll(loop, timeout);
    // setImmediate 的回调
    // ps: 个人觉得从实现上讲 setImmediate 和 nextTick 应该互换名字 :-)
    uv__run_check(loop);
    // 关闭句柄是个异步操作
    // 一般结束 uv loop 时会先调用 uv_walk 遍历所有句柄并关闭它们,然后再执行一次 uv loop 通过这个函数来完成关闭,最后再调用 uv_loop_close,否则的话会出现内存泄露
    uv__run_closing_handles(loop);

    if (mode == UV_RUN_ONCE) {
      /* UV_RUN_ONCE implies forward progress: at least one callback must have
       * been invoked when it returns. uv__io_poll() can return without doing
       * I/O (meaning: no callbacks) when its timeout expires - which means we
       * have pending timers that satisfy the forward progress constraint.
       *
       * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
       * the check.
       */
      uv__update_time(loop);
      uv__run_timers(loop);
    }

    r = uv__loop_alive(loop);
    if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
      break;
  }

  /* The if statement lets gcc compile it to a conditional store. Avoids
   * dirtying a cache line.
   */
  if (loop->stop_flag != 0)
    loop->stop_flag = 0;

  return r;
}

可以看到 uv loop 里面其实就是在不断的循环去更新计时器、处理各种类型的回调、轮询 I/O 事件,Node.js 的异步便是通过 uv loop 完成的。

libuv 的异步采用的是 Reactor 模型进行多路复用,在 uv__io_poll 中去处理 I/O 相关的事件, uv__io_poll 在不同的平台下通过 epollkqueue 等不同的方式实现。所以当往 async_wfd 写入内容时,在 uv__io_poll 中将会轮询到 async_wfd 可读的事件,这个事件仅仅是用来通知 uv loop 线程: 非 uv loop 线程有回调需要在 uv loop 线程执行。

当轮询到 async_wfd 可读后,uv__io_poll 会回调对应的函数 uv__async_io,它主要做了下面 2 件事:

  1. 读取数据,确认是否有 uv_async_send 调用,数据内容并不关心。
  2. 遍历 async_handles 句柄队列 ,判断是否有事件,如果有的话执行它的回调。

实现如下:

static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  char buf[1024];
  ssize_t r;
  QUEUE queue;
  QUEUE* q;
  uv_async_t* h;

  assert(w == &loop->async_io_watcher);

  // 这个 for 循环用来确认是否有 uv_async_send 调用
  for (;;) {
    r = read(w->fd, buf, sizeof(buf));

    if (r == sizeof(buf))
      continue;

    if (r != -1)
      break;

    if (errno == EAGAIN || errno == EWOULDBLOCK)
      break;

    if (errno == EINTR)
      continue;

    abort();
  }
 
  // 交换 loop->async_handle 和 queue内容,避免在遍历 loop->async_handles 时有新的 async_handle 插入到队列
  // loop->async_handles 队列中除了线程池的句柄还有其它的
  QUEUE_MOVE(&loop->async_handles, &queue);
  while (!QUEUE_EMPTY(&queue)) {
    q = QUEUE_HEAD(&queue);
    h = QUEUE_DATA(q, uv_async_t, queue);

    QUEUE_REMOVE(q);
    // 将 uv_async_t 重新插入到 loop->async_handles 中,uv_async_t 需要手动调用 uv__async_stop 才会从队列中移除
    QUEUE_INSERT_TAIL(&loop->async_handles, q);

    // 确认这个 async_handle 是否需要回调
    if (cmpxchgi(&h->pending, 1, 0) == 0)
      continue;

    if (h->async_cb == NULL)
      continue;

    // 调用通过 uv_async_init 初始化 uv_async_t 时绑定的回调函数
    // 线程池的 uv_async_t 是在 uv_loop_init 时初始化的,它绑定的回调是 uv__work_done
    // 因此如果 h == loop->wq_async,这里 h->async_cb 实际是调用了 uv__work_done(h);
    // 详情请参考 https://github.com/libuv/libuv/blob/v1.24.0/src/unix/loop.c#L88
    h->async_cb(h);
  }
}

调用线程池的 h->async_cb 后会回到线程池的 uv__work_done 函数:

void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  QUEUE* q;
  QUEUE wq;
  int err;

  loop = container_of(handle, uv_loop_t, wq_async);
  uv_mutex_lock(&loop->wq_mutex);
  // 清空已完成的 loop->wq 队列
  QUEUE_MOVE(&loop->wq, &wq);
  uv_mutex_unlock(&loop->wq_mutex);

  while (!QUEUE_EMPTY(&wq)) {
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);

    w = container_of(q, struct uv__work, wq);
    // 如果在回调前调用了 uv_cancel 取消请求,则即使请求已经执行完,依旧算出错
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    w->done(w, err);
  }
}

最后通过 w->done(w, err) 回调 uv__fs_done,并由 uv__fs_done 回调 JS 函数:

static void uv__fs_done(struct uv__work* w, int status) {
  uv_fs_t* req;

  req = container_of(w, uv_fs_t, work_req);
  uv__req_unregister(req->loop, req);

  // 如果取消了则抛出异常
  if (status == UV_ECANCELED) {
    assert(req->result == 0);
    req->result = UV_ECANCELED;
  }

  // 回调 JS
  req->cb(req);
}

以上就是 libuv 是线程池从创建到执行多线程请求的过程。

fs.access 调用过程分析

再回到文章开头提到的代码,我们来分析它的调用过程。

const fs = require('fs')
const cb = function (err) {
  console.log(`Is myfile exists: ${!err}`)
}
fs.access('myfile', cb)

假设线程池大小为 2 ,下面描述了执行 fs.access 时 3 个线程的状态(略过了 Node.js 启动和 JavaScript 和 Native 函数调用过程),时间轴从上到下:

空白代表处于阻塞状态,-代表线程尚未启动

uv loop thread worker thread 1 worker thread 2
fs.access(‘myfile’, cb) - -
JavaScript 通过 v8 调用 Native 函数 - -
uv_fs_access - -
uv__work_submit - -
init_threads worker worker
uv_sem_wait uv_sem_post uv_sem_post
uv_cond_wait uv_cond_wait
uv_cond_signal
uv__io_poll access
uv__io_poll
uv__io_poll uv_async_send
uv__io_poll uv_cond_wait
uv__io_poll
uv__async_io
uv__work_done
uv__fs_done
Native 通过 v8 回调 JavaScript 函数
cb
console.log(`Is myfile exists: ${exists}`)

可以看到调用过程如下:

  1. 通过 Node.js 启动时对 JavaScript 函数与 Native 函数的绑定,fs.access 最终会进入到 Native 函数中,而 Native 函数会调用 libuv 的 uv_fs_access 函数来判断文件是否可以访问。(这里略过 JavaScript 如何通过 v8 调用 Native 函数)
  2. uv_fs_access 在 uv loop 线程向线程池提交了一个多线程请求。
  3. 由于线程池是惰性的,在执行请求前,先进行了初始化线程池的操作。
  4. 线程池初始化完成后唤醒了 worker thread 1 去执行请求,同时 uv loop 线程不断的轮询是否完成了请求。
  5. worker thread 1 同步的调用 access 函数判断目标文件是否可读。
  6. access 函数完成后, worker thread 1 通过 uv_async_send 同步 uv loop 线程请求已完成,同时自身进入阻塞状态,等待新的请求将其唤醒。
  7. uv loop 线程发现请求执行完成后通过一系列回调回到 uv__fs_done
  8. uv__fs_done 回调 JavaScript 函数打印日志。(这里略过 uv__fs_done 是如何通过 v8 回调到 JavaScript)

整个过程由于没有新的请求进来, worker thread 2 始终处于阻塞状态。

结束语

通过对 fs.access 的调用过程分析,我们了解了 libuv 是如何通过线程池进行异步调用的。另外也可以看到针对不同的平台,libuv 对 uv__io_poll 的实现是不同的,后面我们将介绍 uv__io_poll 实现异步 I/O 的方式。

1 回复

先码后看, 看起来很吊

回到顶部