精华 node源码详解(六) —— 从server.listen 到事件循环
发布于 9 年前 作者 bigtree9307 9530668 次浏览 最后一次编辑是 8 年前 来自 分享

本作品采用知识共享署名 4.0 国际许可协议进行许可。转载联系作者并保留声明头部与原文链接https://luzeshu.com/blog/nodesource6
本博客同步在https://cnodejs.org/topic/5716137fe84805cd5410ea21
本博客同步在http://www.cnblogs.com/papertree/p/5398008.html


我们在第3-5篇博客讲了js代码如何调用到C++接口的机制,其中暗含的require、process.binding这些过程。

这篇博客以server.listen(80)为例,讲这两点:

  1. js代码深入、作用到libuv事件循环的过程【1.1节的问题2】

  2. libuv事件循环本身的过程【1.1节的问题3】

6.1 js到事件循环 —— 数据结构

6.1.1事件循环的核心数据结构 —— struct uv_loop_s default_loop_struct;

还记得2.2节的流程图吗,js代码里面执行网络io操作,最终保存一个io观察者到default_loop_struct,在node进入事件循环的时候,再获取io观察者进行监听。

来看看struct uv_loop_s 的结构体定义:

6-1-1(3).png <center>图6-1-1</center>

在这篇博客里主要关系的是watcher_queue、watchers、nwatchers、nfds这四个成员。

watcher_queue:io观察者链表,链表原理看6.4节。

watchers:是一个uv__io_t 类型的二级指针。这里维护的是一个io观察者映射表【实际是以fd为下标索引的数组】。

nwatchers:watchers数组的size,因为是堆分配的动态数组,所以需要维护数组的长度。

nfds:监听了多少个fd,不同于nwatchers,因为watchers里面很多元素是空的。

【注:c语言里面经常会有 “typedef struct uv_loop_s uv_loop_t”、“typedef struct uv__io_s uv__io_t”这种写法去给结构体类型起别名,这样的好处是用uv_loop_s去定义一个变量需要加上struct,而通过typedef的别名不用,比如:

struct uv_loop_s default_loop_struct; uv_loop_t default_loop_struct; 这两种写法是一样的。】

6.1.2 io观察者结构体 —— struct uv__io_s

6.1.1中看到,我们的网络io操作最终会封装成一个io观察者,保存到default_loop_struct的io观察者映射表——watchers 里面。

来看一下封装的io观察者的定义:

6-1-2.png <center>图6-1-2</center>

可以看到一个io观察者封装了:

fd:文件描述符,操作系统对进程监听的网络端口、或者打开文件的一个标记

cb:回调函数,当相应的io观察者监听的事件被激活之后,被libuv事件循环调用的回调函数

events:交给libuv的事件循环(epoll_wait)进行监听的事件

6.1.3 持有io观察者的结构体 —— 比如struct uv_tcp_s

io观察者结构体(uv__io_s) 是我们调用server.listen()之后,与libuv事件循环的交互数据。

事件循环数据结构default_loop_struct 维护uv__io_s的映射表 —— watchers成员。

而用户的每一个io操作流程,最终也通过某个结构体来持有这个io观察者。比如当进行tcp的 io操作时,其对应的io观察者,由uv_tcp_s 结构体的 io_watcher成员持有:

6-1-3.png <center>图6-1-3</center>

6.2 js到事件循环 —— 流程

6.1节讲了几个结构体和数据类型。这一节以这几行示例代码,介绍从js代码的io操作到保存io观察者的流程:

var http = require('http');

function requestListener(req, res) {
    res.end('hello world');
}

var server = http.createServer(requestListener);
server.listen(80);

<center>代码6-2-1</center>

其实这里http模块里面做的事情很简单,6-2-1示例代码等效于:

const Server = require('_http_server').Server;

function requestListener(req, res) {
    res.end('hello world');
}

var server = new Server(requestListener);
server.listen(80);

<center>代码6-2-2</center>

面向用户的接口仅仅是一个requestListener回调函数、监听端口,那么调用server.listen(80)之后,经过多少个环节才形成一个io观察者?io观察者的回调函数被调用之后,又经过多少个环节才回调到用户的requestListener?

来看下有多少层:

6.2.1 http层Server类 —— lib/_http_server.js

上述示例代码直接交互的是http Server类,看代码:

6-2-1 (2).png <center>图6-2-1</center>

A. 设置环节 —— requestListener

当用户new Server产生一个server对象时,server添加’request’事件监听器。

B. 回调环节 —— connectionListener

可以看到http层的Server类继承了socket层(net.js)的Server类。并添加’connection’事件监听器,当有连接到来时,由socket层的Server类发射’connection’事件,http层connectionListener被调用,拿到来自socket层的一个socket对象,进行跟http协议相关的处理,把http请求相关的数据封装成req、res两个对象,emit 'request’事件,把req、res传给用户的requestListener回调函数。

6.2.2 socket层Server类 —— lib/net.js

net.Server是负责socket层的Server类,也是http.Server的基类:

6-2-2.png <center>图6-2-2</center>

A. listen环节 —— 'connection’事件

在执行listen操作时,socket层Server类给self._handle.onconnection赋上回调函数。self._handle是更下层的TCP类对象。

6-2-3.png <center>图6-2-3</center>

B. 回调环节 —— onconnection函数

当有连接到来时,底层回调了TCP类的onconnection函数(self._handle.onconnection),并传过来一个clientHandle,onconnection把clientHandle封装成socket对象,并发射’connection’事件,把socket传给上层的connectionListener监听器。

6.2.3 node C++层TCP类 —— src/tcp_wrap.cc

上面说到socket层的Server类与下层的交互是通过this._handle —— TCP类对象。【注意了TCP不是C++本身的类,而是C++用来表示js类的 FunctionTemplate】

6-2-4.png <center>图6-2-4</center>

A. listen环节 —— TCPWrap::OnConnection

看到TCP这一层,执行listen时传给下层的回调函数是TCPWrap::OnConnection,而且可以看到与这一层交互的下一层就是libuv的接口了 —— uv_listen。

B. 回调环节 —— onconnection

上面讲到socket层Server类通过self._handle.onconnection = onconnection去设置回调函数。

这一层可以看到onconnection函数在TCPWrap::OnConnection里面通过tcp_wrap->MakeCallback去回调。

关于MakeCallback的实现在AsyncWrap类 —— TCPWrap的基类:

6-2-5.png <center>图6-2-5</center>

这里有一行重要的代码 env() -> tick_callback_function() -> Call()。里面确保了当每次从C++陷入js领域、执行完js代码之后,会执行到诸如process.nextTick()设置的回调函数。

通过2.2节我们可以知道,执行js代码只有两个时机:

  1. 刚启动的时候执行app.js文件

  2. 异步回调函数被触发(注意回调函数有可能是被同步回调的)

那么这里的AsyncWrap::MakeCallback()就是每次执行js异步回调函数时,从C++域陷入js域的位置。

6.2.4 libuv层 uv_tcp_t结构体 —— deps/uv/src/unix/tcp.c

在app.js里面的server.listen(80),通过http.Server -> net.Server -> TCPWrap,终于到达了libuv层。这一层,我们看到6.1节的数据结构的使用细节。关于io观察者如何被保存、如何被事件循环取出使用的细节,我们看6.3节。

6-2-6.png <center>图6-2-6</center>

看到uv_tcp_listen操作,通过调用uv__io_start 把自身的io_watcher(定义在6.1.2节)注册进tcp->loop(理解成6.1.1节里面的default_loop_struct —— 事件循环的数据结构)。

这里注意到,从上层传过来的cb(TCPWrap::OnConnection)保存在了tcp->connection_cb,而tcp->io_watcher.cb 保存的是 uv__server_io。

当有连接到来时,事件循环直接调用的cb是io_watcher里面的uv__server_io,里面先执行uv__accept等操作,再回调到stream->connection_cb。【注意到右边文件的stream->connection_cb实际上就是左边文件的tcp->connection_cb,uv_stream_t可以理解成uv_tcp_t的一个基类】

6.3 事件循环与io观察者

6.3.1 io观察者的保存

6.2.4节讲到libuv层封装了io观察者,通过uv__io_start,把io观察者保存到指定的事件循环数据结构 —— loop。来看看uv__io_start的细节:

6-3-1.png <center>图6-3-1</center>

这里的loop就是6.1.1节中的事件循环数据结构体,w就是6.1.2节中的io观察者结构体。

可以看到,添加一个io观察者需要两步操作:

  1. 使用QUEUE_INSERT_TAIL 往loop->watcher_queue 添加io观察者,链表原理看6.4节。

  2. 把io观察者保存在loop->watchers中 —— 以fd为索引的数组。loop->watchers实际上类似于映射表的功能,而不是观察者队列。

6.3.2 事件循环的核心 —— io观察者的取出与回调

在2.2节的运行流程中知道事件循环最终调用了uv_run()进入了epoll_wait()等待,而uv_run的这个事件循环是调用了uv__io_poll(),那么来看看这个最终的循环:

6-3-2.png <center>图6-3-2</center>

通过2.2节的运行流程,我们知道在js代码里面添加一个io观察者(比如调用server.listen())是先通过保存io观察者(uv__io_t 结构体)到uv_loop_t结构体的watcher_queue里面,而不是马上注册到epoll_wait()进行监听的。

当js代码执行完毕,进入C++域,再进入到uv__io_poll的时候,就需要这几个步骤:

  1. 遍历 loop->watcher_queue,取出所有io观察者,这里取出的w就是图6-3-1中调用uv__io_start保存的io观察者 —— w。

  2. 取出了w之后,调用epoll_ctl(),把w->fd(io观察者对应的fd)注册给系统的epoll机制,那么epoll_wait()时就监听对应的fd。

  3. 当epoll_wait()返回了,拿出有事件到来的fd,这个时候loop->watchers 映射表就起到作用了,通过fd拿出对应的io观察者 —— w,调用w->cb()。

6.3.3 setTimeout —— epoll_wait的timeout

看到epoll_wait有个timeout参数,这里正是setTimeout的原理。试想一下,epoll_wait所监听的所有io观察者对应的fd都没有事件触发,而setTimeout所设置的timeout到达了,那么epoll_wait()也是需要返回,让setTimeout的回调函数能够得以运行的。

6.4 io观察者链表

注意到4个点:

  1. uv_loop_t 结构体的io观察者链表是void* [2]类型的watcher_queue来维护。

  2. uv__io_t(io观察者) 结构体也拥有一个void* watcher_queue[2]。

  3. 在uv__io_start里面,通过QUEUE_INSERT_TAIL宏,往loop->watcher_queue里面添加w->watcher_queue,而不是w(io观察者本身)。

  4. 在uv__io_poll里面,通过QUEUE_HEAD宏,从loop->watcher_queue里面取出元素 q,这个q事实上只是w->watcher_queue字段,需要通过QUEUE_DATA宏,从q去取出w。

【这跟c语言结构体的内存模型有关,可以通过一个成员的地址减去结构体内成员的偏移量,计算出结构体的在进程空间的内存地址。这也是QUEUE_DATA宏所做的事。】

可以先来看看这几个宏的定义:

6-4-1.png <center>图6-4-1</center>

我们来看看下面这个图,第一个状态是uv_loop_t和两个uv__io_t里的watcher_queue成员执行了QUEUE_ININ之后的状态。

第二、三个状态是依次通过QUEUE_INSERT_TAIL宏往uv_loop_t的watcher_queue里面添加uv__io_t的watcher_queue之后的状态。

io观察者链表 (2).png <center>图6-4-2</center>

16 回复

非常棒的文章,思路很清晰,受教了。

####6.3.3

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  int timeout;
  int r;
  int ran_pending;

  r = uv__loop_alive(loop);
  if (!r)
    uv__update_time(loop);

  while (r != 0 && loop->stop_flag == 0) {
    uv__update_time(loop);
    uv__run_timers(loop);
    ran_pending = uv__run_pending(loop);
    uv__run_idle(loop);
    uv__run_prepare(loop);

    timeout = 0;
    if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
      timeout = uv_backend_timeout(loop);

    /* 按照我的理解,这个timeout主要是保证setTimeout能够及时执行,timer本身并不是epoll驱动的 */
    uv__io_poll(loop, timeout);
    uv__run_check(loop);
    uv__run_closing_handles(loop);

    if (mode == UV_RUN_ONCE) {
      /* UV_RUN_ONCE implies forward progress: at least one callback must have
       * been invoked when it returns. uv__io_poll() can return without doing
       * I/O (meaning: no callbacks) when its timeout expires - which means we
       * have pending timers that satisfy the forward progress constraint.
       *
       * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
       * the check.
       */
      uv__update_time(loop);
      uv__run_timers(loop);
    }

    r = uv__loop_alive(loop);
    if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
      break;
  }

  /* The if statement lets gcc compile it to a conditional store. Avoids
   * dirtying a cache line.
   */
  if (loop->stop_flag != 0)
    loop->stop_flag = 0;

  return r;
}

uv__io_poll调用我做了注释,有兴趣的话讨论下。

####6.4 libuv的双向链表真是丑陋,不明白他们为什么不用linux的list,linux的更清晰,容易理解。

@coordcn timeout确实是为了setTimeout的定时能够被及时执行,但是“epoll驱动”并不是很明白,因为epoll只是对“很多fd”进行了监听,而那些异步io的异步回调实际上已经被层层封装了~

@bigtree9307

我的意思是epoll的timeout和setTimeout并无直接的关系,即便将uv__io_poll(loop, timeout)注释掉(可以理解为没有注册IO事件,uv__io_poll会立即返回,不会执行epoll相关调用),setTimeout也会执行,epoll的timeout只是保证setTimeout得到及时的执行,不会因为epoll_wait调用阻塞时间过长而过分损失计时精度。

你参考下下uv_backend_timeout(loop)的代码

int uv__next_timeout(const uv_loop_t* loop) {
  const struct heap_node* heap_node;
  const uv_timer_t* handle;
  uint64_t diff;

  heap_node = heap_min((const struct heap*) &loop->timer_heap);
  if (heap_node == NULL)
    /* epoll无事件则一直阻塞直到事件到来  */
    return -1; /* block indefinitely */

  handle = container_of(heap_node, const uv_timer_t, heap_node);
  if (handle->timeout <= loop->time)
    /* epoll有无事件都立即返回 */
    return 0;

  diff = handle->timeout - loop->time;
  if (diff > INT_MAX)
    diff = INT_MAX;

  /* epoll无事件则一直阻塞到过了diff时间后返回 */
  return diff;
}

@coordcn 我明白你的意思了,setTimeout的回调函数被调用确实不在uv__io_poll里面,而是uv_run里面的uv__run_timers,因为我觉得setTimeout的timeout是在epoll_wait()里面做超时判定的,所以也算有点关系,当然跟你说的“只是保证setTimeout得到及时的执行”一个意思~

@bigtree9307 对,我就是这个意思,timer即便没有epoll也照样可以运行的。

@coordcn 代码的 LICENCE 问题。

@yjhjstz 这个还真是个问题。

如果我自己的代码中也用了linux的代码,MIT licence会不会有冲突?

@coordcn 应该还是有问题。libuv 我提 PR 的时候用到了 list.h, 说不行。

@yjhjstz 的确冲突的,用list.h会传染GPL。这个问题还蛮严重的。

一路看到这边,想问下: uv__run_pending()这个函数,处理的是哪一个等待队列的里面的事件呢

@coordcn 将uv__io_poll(loop, timeout)注释掉 其实就是类似uv_run(env->event_loop(), UV_RUN_NOWAIT)的调用方式,不会影响到每一次loop中对uv_run_timers的执行 还有最近看libuv这一块,想请教你一个问题

/* internal.h文件里 */
/* loop */
void uv__run_idle(uv_loop_t* loop);
void uv__run_check(uv_loop_t* loop);
void uv__run_prepare(uv_loop_t* loop);

这三个函数是在loop-watcher.c中类似工厂模式的生成的,我现在有一个疑问,分别是哪些事件会注册到

&loop->idle_handles
&loop->check_handles
&loop->prepare_handles

的queue中呢?

@hyj1991

http://nikhilm.github.io/uvbook/index.html

这个文档里又idle的用法,没有check和prepare的。libuv的test目录里有这两者用法,你自己研究下吧,我也不知道是具体用在哪里的。

感觉上可能和timer是相关的东西。

@coordcn 帮了我大忙哇,thx

如果是在进入第一次Loop之前的JS解析部分,肯定是process.NextTick先执行:

Module.runMain = function () {
	// Load the main module--the command line argument.
	Module._load(process.argv[1], null, true);
	// Handle any nextTicks added in the first tick of the program
	//此时setImmediately刚刚调用uv_check_start注册到loop,而event loop第一次尚未进入
	process._tickCallback();
};

如果是在I/O或者Timer的回调中同时注册的process.nextTick和setImmediately事件,也是process.nextTick先执行。因为两者的回调最终都会调用async-wrap.cc中的MakeCallback方法:

Local<Value> AsyncWrap::MakeCallback(){
	//...
	//回调函数执行
	Local<Value> ret = cb->Call(context, argc, argv);
	//...
	//process._tickCallback函数执行
	env()->tick_callback_function()->Call(process, 0, nullptr);
}

而_tickCallback函数会讲nextTickQueue中的回调函数一次都执行掉,所以依旧是先process…nextTick函数先执行

大神,该如何看懂这个机制啊,c不懂啊

回到顶部