Node.js代码阅读笔记之libeio
发布于 12 年前 作者 coding 10760 次浏览 最后一次编辑是 8 年前

<p>近日读node.js代码,随手记下了一些,不对的地方还请大家指正~</p>

<p>在cnodejs里发帖,有些字符有特殊含义,比如下划线“_”会标识斜体。。。(恕我愚钝) 看不清楚可以看这里: <a href=“http://www.codingguy.net/?p=195”>http://www.codingguy.net/?p=195</a></p>

<hr/>

<p>这个库全称貌似为Enhanced IO,用多线程实现了异步IO操作,为什么不用libev?因为libev用epoll(linux平台),不支持regular file。没错,libeio就是给node.js的fs模块用的。</p>

<p><strong>从demo.c看如何使用</strong></p>

<p>翻开代码,有个demo.c,操作很多,常用文件io都包括了。精简一下代码,得到一个minidemo.c:</p>

<pre><code>int respipe [2];

void want_poll (void) { char dummy; printf (“want_poll ()\n”); write (respipe [1], &dummy, 1); }

void done_poll (void) { char dummy; printf (“done_poll ()\n”); read (respipe [0], &dummy, 1); }

int res_cb (eio_req *req) { printf (“res_cb(%d|%s) = %d\n”, req->type, req->data ? req->data : “?”, EIO_RESULT (req)); if (req->result < 0) abort (); return 0; }

void event_loop (void) { // an event loop. yeah. struct pollfd pfd; pfd.fd = respipe [0]; pfd.events = POLLIN; printf ("\nentering event loop\n"); while (eio_nreqs ()) { poll (&pfd, 1, -1); printf (“eio_poll () = %d\n”, eio_poll ()); } printf (“leaving event loop\n”); }

int main (void) { printf (“pipe ()\n”); if (pipe (respipe)) abort (); printf (“eio_init ()\n”); if (eio_init (want_poll, done_poll)) abort (); eio_mkdir (“eio-test-dir”, 0777, 0, res_cb, “mkdir”); event_loop (); } </code></pre>

<p>这个demo很简单,大体流程如下: <img src=“http://www.codingguy.net/wp-content/uploads/2012/02/demo.png” alt=“enter image description here” title="" /></p>

<ul> <li>创建pipe</li> <li>初始化eio,注册want<em>poll与done</em>poll</li> <li>发出异步操作:mkdir</li> <li>启动event loop</li> </ul>

<p>第一次看到这段代码,往往被pipe搞迷糊,为什么创建pipe呢?先不管为什么,我们先看是怎么用的。</p>

<p>创建匿名管道后,fd放在respipe数组中,respipe[ 0 ]用于读pipe,respipe[ 1 ]用于写pipe。在want<em>poll和done</em>poll函数中,分别对pipe进行写和读。而want<em>poll与done</em>poll则通过eio_init注册,即,赋值到全局变量中。</p>

<p>接着发出异步操作,创建一个目录eio<em>mkdir(……),这里的res</em>cb是操作完成之后的回调函数。</p>

<p>最后启动event loop。在这个函数中,有个大循环,eio<em>nreqs()表明尚未完成的请求数量,如果有未完成的请求,则poll respipe[0],即等待可读,若可读则调用eio</em>poll取回结果。eio<em>poll的执行也会回调res</em>cb函数。</p>

<p>关于前面提到的pipe的作用,这与want<em>poll与done</em>poll机制有关。want<em>poll在响应队列第一次装入请求包的时候被调用;done</em>poll在响应队列为空的时候被调用。want<em>poll向pipe写入数据、done</em>poll从pipe读出数据,读之前通过poll来等待数据可读,若可读,就说明有want<em>poll调用,有异步请求被处理。接下来就可以调用eio</em>poll去接收结果并做回调了。这样,起到了类似锁的作用,同步了生产者消费者类型的资源访问,避免了无意义的循环空转,节约处理器cycle。</p>

<p><strong>小窥源代码</strong></p>

<p>看过demo,我们了解了hello world般的流程。但为了清楚eio接口函数的细节,以及异步io的实现方式,我们要继续读eio.[ch]的代码。话说eio的代码文件并不多,简单介绍一下名字:</p>

<ul> <li>ecb.h</li> </ul>

<p>这个头文件叫libecb(只有一个ecb.h的lib),全称为The e compiler builtins header/library。它把gcc相关的许多功能封装起来,并兼容不同的gcc版本。比如<em>_ attribute _</em>(attrlist)这些编译器相关的属性,可以用ecb_attribute(attrlist)这个宏来写,且不用考虑编译器版本(3.1以上才支持这个玩意)。</p>

<ul> <li>eio.[ch]</li> </ul>

<p>就这么一对儿c文件和h文件,先不解释了</p>

<p>下面翻开源代码,从demo中出现的几个接口函数开始,分析一下eio的工作流程。</p>

<p><strong>一切的开始:eio_init</strong></p>

<p>使用eio,第一步就是调用eio<em>init做一堆初始化。初始化mutex、队列、各种计数器,还有注册want</em>poll与done_poll回调函数,这两个函数是由用户提供的。</p>

<pre><code>X_MUTEX_CREATE (wrklock); X_MUTEX_CREATE (reslock); X_MUTEX_CREATE (reqlock); X_COND_CREATE (reqwait); reqq_init (&req_queue); reqq_init (&res_queue); wrk_first.next = wrk_first.prev = &wrk_first; started = 0; idle = 0; nreqs = 0; nready = 0; npending = 0; want_poll_cb = want_poll; done_poll_cb = done_poll; </code></pre>

<p><strong>业务逻辑的入口:eio_xxx</strong></p>

<p>初始化之后,即可调用eio的异步io函数了。我们以前面demo中创建目录为例,调用eio_mkdir(…)函数。如其他异步io函数,函数体非常简洁,但是逻辑并不简洁:</p>

<pre><code>eio_req *eio_mkdir (const char *path, eio_mode_t mode, int pri, eio_cb cb, void *data) { REQ (EIO_MKDIR); PATH; req->int2 = (long)mode; SEND; } </code></pre>

<p>真是那各种宏啊,淡定一下,找找定义:</p>

<pre><code>#define REQ(rtype)
eio_req *req;
req = (eio_req *)calloc (1, sizeof *req);
if (!req)
return 0;
req->type = rtype;
req->pri = pri;
req->finish = cb;
req->data = data;
req->destroy = eio_api_destroy; </code></pre>

<p>这个宏创建了一个eio_req变量,注意是堆空间哦。然后根据rtype,也就是io操作的类型,对其赋值。那些cb等参数都不是宏里面的,从使用场景可以看出是eio _mkdir的参数。接下来是PATH,同样找到定义如下:</p>

<pre><code>#define PATH
req->flags |= EIO_FLAG_PTR1_FREE;
req->ptr1 = strdup (path);
if (!req->ptr1)
{
eio_api_destroy (req);
return 0;
} </code></pre>

<p>文件io操作嘛,自然要有path。接下来又填写了mode。以上这些操作都是为了构建一个eio<em>req变量。这个eio</em>req的结构定义如下:</p>

<pre><code>struct eio_req { eio_req volatile next; / private ETP / eio_ssize_t result; / result of syscall, e.g. result = read (… / off_t offs; / read, write, truncate, readahead, sync_file_range, fallocate: file offset, mknod: dev_t / size_t size; / read, write, readahead, sendfile, msync, mlock, sync_file_range, fallocate: length */ void ptr1; / all applicable requests: pathname, old name; readdir: optional eio_dirents */ void ptr2; / all applicable requests: new name or memory buffer; readdir: name strings / eio_tstamp nv1; / utime, futime: atime; busy: sleep time / eio_tstamp nv2; / utime, futime: mtime / int type; / EIO_xxx constant ETP / int int1; / all applicable requests: file descriptor; sendfile: output fd; open, msync, mlockall, readdir: flags / long int2; / chown, fchown: uid; sendfile: input fd; open, chmod, mkdir, mknod: file mode, sync_file_range, fallocate: flags / long int3; / chown, fchown: gid / int errorno; / errno value on syscall return / #if __i386 || __amd64 unsigned char cancelled; #else sig_atomic_t cancelled; #endif unsigned char flags; / private / signed char pri; / the priority */ void *data; eio_cb finish; void (*destroy)(eio_req req); / called when request no longer needed */ void (*feed)(eio_req req); / only used for group requests */ EIO_REQ_MEMBERS eio_req *grp, *grp_prev, *grp_next, grp_first; / private */ }; </code></pre>

<p>初始化了一个新的eio<em>req,接下只用了一个SEND完成其余工作,SEND定义很简单,只是调用了eio</em>submit函数:</p>

<pre><code>#define SEND eio_submit (req); return req </code></pre>

<p><strong>异步IO的抽象:eio_submit</strong></p>

<p>eio<em>submit只是etp</em>submit的一个封装:</p>

<pre><code>void eio_submit (eio_req *req) { etp_submit (req); } </code></pre>

<p>这个etp_submit几乎是所有io操作的入口,其中的关键代码如下:</p>

<pre><code>X_LOCK (reqlock); ++nreqs; ++nready; reqq_push (&req_queue, req); X_COND_SIGNAL (reqwait); X_UNLOCK (reqlock); etp_maybe_start_thread (); </code></pre>

<p>先做计数器累加,然后把刚刚初始化的eio<em>req放在req</em>queue,这是request队列,然后调动etp<em>maybe</em>start<em>thread(),在这个函数中会判断是否调用etp</em>start_thread()启动工作线程。判断逻辑如下:</p>

<pre><code>if (ecb_expect_true (etp_nthreads () >= wanted)) return; if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ())) return; </code></pre>

<p>1. 先判断工作线程数量,如果已到达上限(wanted,默认是4),则直接返回,不做任何操作。 2. 如果当前的工作线程数量与已处理请求数量的和小于总的请求数量(包括完成与未完成的),则直接返回,不做任何操作 条件都满足了,接下来会调用etp<em>start</em>thread()函数,通过thread_create创建线程。</p>

<p><strong>线程?!没错,就是线程:eio_proc</strong></p>

<p>在etp<em>start</em>thread()中创建线程,线程函数etp_proc的代码大致如下:</p>

<pre><code>for (;;) { for (;;) { self->req = req = reqq_shift (&req_queue); if (req) break; } ETP_EXECUTE (self, req); if (!reqq_push (&res_queue, req) && want_poll_cb) want_poll_cb (); } </code></pre>

<p>流程概括如下:</p>

<ol> <li>从req_queue队列(即,待处理队列)中取出一个eio请求</li> <li>调用ETP_EXECUTE完成io的实质操作,这会是一个阻塞过程。</li> <li>将eio请求插入res_queue队列(即,已处理队列)</li> <li>调用want<em>poll</em>cb回调函数</li> </ol>

<p>这里要注意want<em>poll</em>cb的回调条件,当且仅当入队前res_queue为空,才会调用。</p>

<p><strong>执行同步IO:eio_execute</strong></p>

<p>线程函数中有ETP<em>EXECUTE出现,这个宏指代了eio</em>execute函数。在这个函数里面,根据eio<em>req的type域进行switch case,调用相应的io函数完成实质操作,并把结果写到eio</em>req中。示例代码如下:</p>

<pre><code>/* … / switch (req->type) { // case EIO_MKDIR: req->result = mkdir (req->ptr1, (eio_mode_t)req->int2); break; // } / … */ </code></pre>

<p>单纯从异步IO操作的执行看,前面介绍的流程已经能完成了,但是我们不仅要执行IO,还要获取执行的结果,并触发想要的回调。这是通过eio_poll来实现的:</p>

<p><strong>接收报告:eio_poll</strong></p>

<p>如同很多函数一样,eio<em>poll只是套了个壳儿,进而调用etp</em>poll。删减了一些代码,挑出重要逻辑如下:</p>

<pre><code>for (;;) { ETP_REQ *req; etp_maybe_start_thread (); req = reqq_shift (&res_queue); if (req) { if (!res_queue.size && done_poll_cb) done_poll_cb (); } –nreqs; } </code></pre>

<p>流程要点如下:</p>

<p>调用etp<em>maybe</em>start<em>thread(),上面已经讲过,在满足条件的情况下会创建工作线程。 从res</em>queue中取出一个eio<em>req 如果res</em>queue为空,则触发done<em>poll</em>cb回调</p>

<p><strong>总结</strong></p>

<p>就这样吧,再给出一张图来整体贯穿一下: <img src=“http://www.codingguy.net/wp-content/uploads/2012/02/all.png” alt=“enter image description here” title="" /></p>

4 回复

哎哟,不错哦~

多多交流

很不错的文章,对着代码看,很快就理解libeio的工作机制了~~

学习了,谢谢分享!

回到顶部