精华 node源码详解(七) —— 文件异步io、线程池【互斥锁、条件变量、管道、事件对象】
发布于 9 年前 作者 bigtree9307 9529665 次浏览 最后一次编辑是 8 年前 来自 分享

本作品采用知识共享署名 4.0 国际许可协议进行许可。转载联系作者并保留声明头部与原文链接https://luzeshu.com/blog/nodesource7
本博客同步在https://cnodejs.org/topic/571618c7e84805cd5410ea26
本博客同步在http://www.cnblogs.com/papertree/p/5405202.html


在上篇博客讲到,网络io通过封装io观察者(uv__io_t),添加到loop->watcher_queue队列。在2.2节中讲到文件异步io不同于网络io,文件异步io把请求操作交给线程池处理,所有线程池的异步io操作统一由一个io观察者来管理,等线程池处理完毕再通过该io观察者告知事件循环(epoll_wait)有异步io操作完成,需要在事件循环的线程执行回调函数。

这篇博客分以下几个部分讲解其中的细节:

1 从文件异步io操作到封装请求交给线程池的过程

2 线程池的原理、相关的系统支持【互斥锁、条件变量】

3 线程池完成io操作后,告知主线程/事件循环的方式 —— 线程池统一的io观察者,及相关的系统支持【管道、事件对象】

4 主线程epoll_wait收到线程池的通知后,回调到文件异步io操作的callback的过程

7.1 文件异步io到线程池

上一篇博客以server.listen(80)为例来讲解网络io,这一篇以fs.writeFile(‘xxx’, function (err, data) {});为例来讲解文件异步io。

js代码到libuv的函数,经历了几个层次(6.2.1节-6.2.4节,“原生js lib模块 -> node C++模块 -> libuv模块”),这几个层次文件io和网络io是类似的,就忽略了。

这里只针对libuv的文件异步io如何封装成请求对象交给线程池。

7.1.1 libuv的文件io请求对象 —— uv_fs_t

看一下libuv的异步读文件代码,deps/uv/src/unix/fs.c:

7-1-1.png <center>图7-1-1</center>

可以看到一次异步文件读操作在libuv层被封装到一个uv_fs_t的结构体,req->cb是来自上层的回调函数(node C++层:src/node_file.cc 的After函数)。

异步io请求最后调用uv__work_submit,把异步io请求提交给线程池。这里有两个函数:

uv__fs_work:这个是文件io的处理函数,可以看到当cb为NULL的时候,即非异步模式,uv__fs_work在当前线程(事件循环所在线程)直接被调用。如果cb != NULL,即文件io为异步模式,此时把uv__fs_work和uv__fs_done提交给线程池。

uv__fs_done:这个是异步文件io结束后的回调函数。在uv__fs_done里面会回调上层C++模块的cb函数(即req->cb)。

<font color=“red”>这里需要注意的是,异步模式下,把uv__fs_work、uv__fs_done当成参数调用uv__work_submit向线程池提交异步io请求,此时io操作的主体 —— uv__fs_work函数是在线程池里执行的。但是uv__fs_done必须在事件循环的线程里被回调,因为这个函数最终会回调到用户js代码的回调函数,而js代码里的所有代码必须在同个线程里面。</font>

7.1.2 线程池的请求对象 —— struct uv__work

来看看uv__work_submit做了什么:

图7-1-2.png <center>图7-1-2</center>

uv__work_submit 把传进来的uv__fs_work、uv__fs_done封装到uv__work结构体里面,这个结构体表示一个线程操作的请求。通过post把请求提交给线程池,post的原理7.2节讲。

看到post函数里面的QUEUE_INSERT_TAIL,把该uv__work对象加进wq链表里面。<font color=“red”>wq是一个全局静态变量。也就是说,进程空间里的所有线程共用同一个wq链表</font>。wq队列的使用在最下面的7.4.2节会用到。

至于通过void* [2]类型的成员变量w->wq去维护一个链表的机制,在6.4节里有介绍。

7.2 线程池的原理 —— 条件变量与互斥锁

7.2.1 条件变量与互斥锁基础

1 互斥锁 —— pthread_mutex_t mutex

系统通过pthread_mutex_t结构、及相关的pthread_mutex_lock()、pthread_mutex_unlock()来对共享资源的请求进行加锁、解锁。

2 条件变量 —— pthread_cond_t condition

系统通过pthread_cond_t结构、及相关的pthread_cond_wait()、pthread_cond_signal()函数来实现线程间等待、通知的机制。

【注意:系统提供的条件变量机制必须结合互斥锁使用,也就是pthread_cond_wait(&condition, &mutex)需要传条件变量与一个互斥体结构,而且pthread_cond_wait之前必须获得互斥锁。其中原因简单来说就是条件变量本身也是需要加锁保护的资源。具体解释可以参考:http://stackoverflow.com/questions/6312342/pthread-cond-wait-and-mutex-requirement】

7.2.2 线程池原理

来看看threadpool.c 文件的几个相关函数:

图7-2-1.png <center>图7-2-1</center>

这里有四个环节:

1 创建工作线程:

这里的init_once函数调用uv_thread_create创建了nthreads数量的工作线程,nthread默认为4。worker为工作线程的执行函数。

看到图7-1-2,有一行uv_once(&once, init_once); 【uv_once对应的系统调用是pthread_once】。该行代码保证了init_once 有且仅被执行一次。在第一次调用uv__work_submit()时会执行一次init_once()。

2 工作线程进入等待:

看到worker线程最终会陷入uv_cond_wait【对应的系统调用是pthread_cond_wait】进行等待,且idle_threads自增。

这里的&cond、&mutex分别是一个全局的静态条件变量、互斥体。

3 提交任务到线程池:

看到post函数通过uv_cond_signal【对应的系统调用是pthread_cond_signal】向相应的条件变量——cond发送信号,处在uv_cond_wait挂起等待的工作线程当中的某个被激活。

worker线程往下执行,从wq取出w(保存的过程见7.1节),执行w->work()(对应7.1节中的uv_fs_work)。

4 通知主线程的事件循环:

工作线程完成任务后,调用uv_async_send通知主线程某个统一的io观察者。这里的机制7.3节讲。

7.3 线程池统一的io观察者 —— 管道、事件对象

7.3.1 管道、事件对象

管道、事件对象都是系统提供的机制,都可以用于线程间发送数据,所以这里可以用于线程间的通知。

1 管道

管道的相关系统调用是pipe()、pipe2()。参考 http://man7.org/linux/man-pages/man2/pipe.2.html

管道会创建两个fd,往fd[1]写数据,那么fd[0]就会收到数据。那么只需要把fd[0]添加到epoll_wait()所监听的io观察者队列里面,在工作线程需要通知的时候往fd[1]写数据,即能在主线程的epoll里面监听其他工作线程任务完成的通知。

2 事件对象

事件对象的相关系统调用是eventfd()、eventfd2()。参考 http://man7.org/linux/man-pages/man2/eventfd.2.html

与管道不同的是eventfd()只会创建一个fd,事件对象的读写都通过这个fd。事件对象内部维护一个counter,往fd写一个8字节的整数,会往counter加,而读的时候会返回counter,如果counter为0,那么读操作会阻塞住(fd为阻塞模式)。而这个fd也是可以交由epoll机制进行监听的,那么也可以达到使用管道一样的目的。

3 使用哪个?

这里libuv创建异步io观察者fd时,优先使用eventfd,如果系统不支持事件对象,就使用管道替代。看一下相关实现:

7-3-1.png <center>图7-3-1</center>

可以看到使用uv__eventfd2返回-1(errno = ENOSYS)时,uv__async_start里面使用管道替代了事件对象。而判断系统是否支持eventfd,是通过__NR_eventfd2宏去判断。

<font color=“red”>这里需要注意的是:使用宏进行判断__NR_eventfd是否defined是在编译期,而uv__async_start的执行是在运行期,也就是说,如果你在不支持事件对象的系统编译之后,在支持事件对象的系统上运行,那么uv__eventfd2始终是返回-1的。</font>

7.3.2 异步io观察者

7.3.2.1 数据结构 —— struct uv__async

在6.1.3节讲了持有io观察者的结构体 uv_tcp_s,6.2.4节讲了网络io操作如何封装成uv_tcp_t结构体、并构造对应的io_watcher,6.3.1和6.4节讲了如何把io_watcher加进uv_loop_t default_loop_struct的watcher_queue队列里。

那么类似于网络io操作的io观察者(uv__io_t io_watcher)由uv_tcp_s结构体来持有,这里要讨论的异步io观察者也是由一个数据结构(struct uv__async)持有的io观察者。通过把持有的io观察者(io_watcher)加进loop->watcher_queue队列,来加进到epoll的观察者队列中。

看到6.1.1节中关于struct uv_loop_s default_loop_struct的截图,发现uv_loop_s里面有个成员 struct uv__async async_watcher。这个就是管理统一异步io观察者的数据结构,一个事件循环结构体(uv_loop_t)有且只有一个。类似于uv_tcp_s。

看一些uv__async的定义,也持有一个uv__io_t io_watcher,还有封装了一个cb:

7-3-2.png <center>图7-3-2</center>

7.3.2.2 异步io观察者的保存与回调

我们知道一个uv_tcp_t的io观察者,是在用户调用了网络io之后,才加进到loop->watcher_queue里面的。那么这个异步io观察者是在node启动时,通过一连串调用node::Start() -> uv_default_loop() -> uv_loop_init() -> uv_async_init() -> uv__async_start(),最终调用uv__io_start(),把loop->async_watcher所持有的io_watcher加进loop->watcher_queue的。uv__async_start()也是创建事件对象/管道的地方,在上图的7-3-1可以看到。

来看一下loop->async_watcher和loop->async_watcher.io_watcher封装的回调函数。

7-3-3.png <center>图7-3-3</center>

可以看到loop->async_watcher.io_watcher->cb 是uv__async_io;

loop->async_watcher.cb 是uv__async_event。

7.2.2节讲到worker线程完成w->work()之后,通过uv_async_send通知异步io观察者,uv_async_send的操作就是往事件对象/管道写东西,那么当io观察者收到数据,uv_run()里面的epoll_wait()返回该io_watcher的fd时,uv__async_io会先被回调,在uv__async_io里面会进而调用uv__async_event。看下代码:

7-3-4.png <center>图7-3-4</center>

uv__aysnc_io里面取出的wa就是loop->async_watcher,所以wa->cb就是uv__async_event。

7.4 线程池异步io之后的回调工作

讲到uv__async_event这一步,我们回想一下此时应该执行什么处理:worker线程执行完了w->work()(其中w是提交线程池的请求结构体 uv__work),然后通知事件循环需要在主线程执行w->done(),而通知的这个过程就是通过 uv_async_send()往管道/事件对象写数据,激活epoll_wait(),根据返回的fd,由loop->watchers映射表拿到异步io观察者 —— loop->async_watcher.io_watcher,然后层层回调到uv__async_event,那么这个时候,我们是否要调用线程池完成了w->work()之后剩余的w->done()?

7.4.1 uv__async_event() 到 uv__work_done()

node里面多次使用void*[2]类型来维护一个链表,loop->async_handles也是。可以看到图6-1-1。那么async_handles保存什么链表呢?

看到图7-3-4,uv__async_event()就是从loop->async_handles链表里,取出struct uv_async_t结构类型的元素h,并调用回调函数h->async_cb()。

再看到图7-3-3,uv_async_init()里面,往loop->async_handles里面添加了struct uv_async_t* t。7.3.2.2节讲到的一系列调用流程有:uv_loop_init() -> uv_async_init(),看下uv_loop_init()调用uv_async_init()的代码:

7-4-1.png <center>图7-4-1</center>

可以看到uv_loop_init()传给uv_async_init()的uv_async_t 是loop->wq_async,而async_cb是uv__work_done。

所以最终异步io观察者被激活之后,主线程回调到了uv__work_done()。uv__work_done在线程池模块(deps/uv/src/threadpool.c)里面。

7.4.2 uv__work_done()

看一下uv__work_done()的代码:

7-4-2.png <center>图7-4-2</center>

在7.1.2节就讲了post()提交请求时,往全局队列wq添加一个uv__work数据结构,那么最终uv__work_done()被调用的时候,从该wq取出所有w,执行w->done(),完成最终的回调。这里的w->done()就是7.1节中提到的fs__work_done()。

注意了,这里的uv__work_done()是在主线程执行的,也就是你的js代码由始至终在同一个线程里面执行。

15 回复

你的一系列文章到现在为止是我看到的最清晰,最准确的nodejs源码解析了。

@LanceHBZhang 同学的系列文章也不错,有兴趣的同学可以对照着看,相互交流,互相促进。

希望社区多一点研究低层的人,知其然,知其所以然。

@coordcn 哈哈谢谢~

@coordcn Screenshot from 2016-04-19 21:45:09.png

你好, 看到你对node的底层比较了解, 麻烦请教个困扰我一段时间的问题. 上面图中是 top -Hp [Node PID] 看到的结果. 里面有 4个 v8 Worker 线程和 4个 额外的 node 线程. 1 我应该如果理解这几个线程和最上面那个 Node 主线程的关系? 2 另外, V8 worker 线程和下面的四个 node 线程有什么不同? 3 后面8个线程的累计运行时间远远小于主线程, 但也不是0. 想知道那些代码会在这些线程上执行.

@Chunlin-Li 7.2节里面的线程池讲的就是这一块,下面四个node线程就是那4个工作线程,里面的worker函数就是工作线程的执行代码,几个worker线程和主线程的交互也是7.3节主要讲的~那4个V8线程是v8的线程,具体v8的代码还没深究,不过和四个node线程肯定不同的,那4个node线程是libuv的worker线程池,负责文件异步io等的操作

@bigtree9307 非常感谢! 我猜测 v8 worker 应该是执行 C++ 代码的线程. 比如使用 C++ Addon 写非IO类的代码, 就会是在 v8 worker 上执行. 但只是猜测. 我也不确定.

@Chunlin-Li 不客气哈哈

那mongodb/mysql驱动里面的js是单线程执行还是多线程执行 ?

@yakczh 这个就需要去看mysql/mongodb里面的实现了,因为node也提供了把任务交给线程池执行的渠道

@Chunlin-Li

V8 WorkerThread 是V8的工作线程,具体做什么的要看V8的源代码,我猜测可能用来做一些耗时的,可能阻塞主线程的工作,具体做什么就要召唤高手来回答了。

https://github.com/nodejs/node/blob/master/deps/v8/include/v8-platform.h 内有注释可参考

https://github.com/nodejs/node/blob/master/src/node.cc 参见 4379行 4311行

四个node线程 @bigtree9707 已经回答了,就是libuv的工作线程,主要用来做一些文件读取等工作的。

@coordcn 谢谢解答.

我个人理解 V8 线程即可以同步处理, 也可以异步处理. 默认调用方式占用 js 主线程在 v8 中执行 c/c++ 代码, 如果通过 AsyncWorker 封装后则是异步执行. 我觉得这些 Async 的 c/c++ 代码就是在 那些 V8 Worker 线程上工作的.

这方面的文章比较少, 抽空我测试一下.

@Chunlin-Li

AsyncWorker是在libuv的线程池上运行的,就是你看到的四个node线程。

V8 WorkerThread是V8建立的,跟AsyncWorker是不一样的。

https://github.com/nodejs/node/tree/master/deps/v8/src/libplatform 目录里的文件worker-thread.cc是具体实现,default-platform.cc对worker-thread.cc进行的包装,node.cc第4379行可以看到platform创建和初始化。

https://github.com/nodejs/node/blob/master/deps/v8/src/heap/mark-compact.cc 515行使用了,说明这些线程跟堆清理相关的,V8将GC的部分工作交给了别的线程来完成。

https://github.com/nodejs/node/blob/master/deps/v8/src/optimizing-compile-dispatcher.cc 282行使用了,说明这些线程还被用来做编译深度优化工作。

@coordcn

高人, 受教了! 非常感谢!

有没有其他node Socket 多线程内核的实现 , 原生node ab 1000并发就垮了

回到顶部