简单梳理Node.js创建子进程的方法(下)—— cluster
发布于 4 年前 作者 miser 3210 次浏览 来自 分享

前文简单梳理了Node.js使用child_process模块创建子进程的4种方法,execexecFileforkspawn。接下来我们看看cluster模块如何创建子进程,后续更多内容会介绍cluster.fork启动Net Server时候为何不会因为共同监听同一个端口而不报错。

cluster

  • fork: 衍生出一个新的工作进程,这只能通过主进程调用。

翻翻源码看看他们怎么实现的

// libuv
#define UV_VERSION_MAJOR 1
#define UV_VERSION_MINOR 33
#define UV_VERSION_PATCH 1

// V8
#define V8_MAJOR_VERSION 7
#define V8_MINOR_VERSION 8
#define V8_BUILD_NUMBER 279
#define V8_PATCH_LEVEL 17

// Node.js
#define NODE_MAJOR_VERSION 14
#define NODE_MINOR_VERSION 0
#define NODE_PATCH_VERSION 0

cluster源码位置

lib
  - internal
    - cluster
    - child.js
      - master.js
      - round_robin_handle.js
      - shared_handle.js
      - utils.js
      - worker.js
  - cluster.js

<br/>

官方示例

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`主进程 ${process.pid} 正在运行`);

  // 衍生工作进程。
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`工作进程 ${worker.process.pid} 已退出`);
  });
} else {
  // 工作进程可以共享任何 TCP 连接。
  // 在本例子中,共享的是 HTTP 服务器。
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('你好世界\n');
  }).listen(8000);

  console.log(`工作进程 ${process.pid} 已启动`);
}

<br/>

Q:cluster.isMaster模块如何区分是主进程还是子进程的?

// lib/cluster.js
const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
module.exports = require(`internal/cluster/${childOrMaster}`);

lib/cluster.js是整个cluster的入口,根据环境变量中是否有NODE_UNIQUE_ID来区分主或子进程。

主进程通过cluster.fork创建子进程的时候,会将NODE_UNIQUE_ID传入子进程的环境变量中,最后通过child_process.fork去创建新的子进程。

// lib/internal/cluster/master.js
// ...
cluster.isMaster = true;
// ...
// fork的代码下文还会提到
const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
return fork(cluster.settings.exec, cluster.settings.args, {
  // ...
  env: workerEnv,
});

// lib/internal/cluster/child.js
// ...
cluster.isMaster = false;

Q:cluster.fork创建子进程过程中做了些什么?

cluster.setupMaster是对整个环境参数的配置;

通过createWorkerProcess里的child_process.fork去创建子进程;

之后将其和一个Worker对象做关联,worker、其子进程和当前cluster(master)都会收到几乎相同的messageexitdisconnect事件,Worker这边不多扩展,可以查阅lib/internal/cluster/worker.js

子进程会监听internalMessage事件,什么是internalMessage事件呢?看看下面官方的介绍。

当发送 {cmd: ‘NODE_foo’} 消息时有一种特殊情况。 cmd 属性中包含 NODE_ 前缀的消息是预留给 Node.js 内核内部使用的,将不会触发子进程的 ‘message’事件。 相反,这种消息可使用 ‘internalMessage’ 事件触发,且会被 Node.js 内部消费。 应用程序应避免使用此类消息或监听’internalMessage’ 事件,因为它可能会被更改且不会通知。

此处internalMessage事件的回调方法是internal(worker, onmessage)internallib/internal/cluster/master.js里的方法,主要作用是判断监听的消息里面是否存在需要执行的回调,如果没有就会执行入参回调,这里指的是onmessage

onmessage里面有很多if-else语句,主要是根据cluster.child传送进来的消息类型(act)做出不同的处理,这里列出了一个queryServer(因为后面会介绍Net Server里多个子进程如何监听同一个端口的)。

// lib/internal/cluster/master.js
cluster.fork = function (env) {
  cluster.setupMaster();
  const id = ++ids;
  // 创建子进程
  const workerProcess = createWorkerProcess(id, env);
  //
  const worker = new Worker({
    id: id,
    process: workerProcess, // 新建的子进程
  });
  
  worker.on("message", function (message, handle) {
    cluster.emit("message", this, message, handle);
  });
  worker.process.once("exit", (exitCode, signalCode) => {
    // ...
    worker.state = "dead";
    worker.emit("exit", exitCode, signalCode);
    cluster.emit("exit", worker, exitCode, signalCode);
  });
  worker.process.once("disconnect", () => {
    // ...
    worker.state = "disconnected";
    worker.emit("disconnect");
    cluster.emit("disconnect", worker);
  });

  worker.process.on("internalMessage", internal(worker, onmessage));
  // 触发 fork 事件
  process.nextTick(emitForkNT, worker);
  cluster.workers[worker.id] = worker; 
  return worker;
};

function createWorkerProcess(id, env) {
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  // ... 对一些参数的组合和设定
  // 调用child_process的fork方法创建子进程
  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid,
  });
}
function onmessage(message, handle) {
  const worker = this;
  // ... if message.act 类型很多 这里主要讲 queryServer
  if (message.act === "queryServer") queryServer(worker, message);
}
// lib/internal/cluster/utils.js
// 在cluster的chilid和master里的send都会调用sendHelper
let seq = 0;
function sendHelper(proc, message, handle, cb) {
  if (!proc.connected) return false;
  // NODE_* 开头的命令触发 internalMessage 
  message = { cmd: "NODE_CLUSTER", ...message, seq };

  if (typeof cb === "function") callbacks.set(seq, cb); // 缓存回调

  seq += 1;
  // cluster/child.js handle => null
  // cluster/master.js handle => null
  return proc.send(message, handle);
}

以上就是cluster.fork的大致过程,引入一个Worker和internalMessage概念,之后会用得到。cluster的child和master之间传输信息,都是通过sendHelper方法。

Q:cluster.fork创建的子进程如何共同监听TCP端口?

解答这个问题,主要是看net模块如何创建Server的,还有就是cluster中childmaster如何通信的。

官方示例虽然用的是http创建的服务,但它底层是继承的net模块,为了方便梳理,我们从net.createServer入手一步步查看源码,主要的逻辑从listen开始。

Child部分:

cluster.child里创建一个TCP服务,参数port是8000,host没有传参;

调用listenInCluster方法,一看这名字就知道和cluster有关;

listen是在子进程里触发的,它会通过cluster._getServer拼出一个act为serverQuery的消息发送给cluster.master;

Master部分:

前文提到,master.onmessage方法会根据消息的act不同而做出不同的处理,此处正是serverQuery

进入queryServer方法,默认使用RoundRobinHandle循环分配任务;

RoundRobinHandle构造函数中,会调用net.createServer创建一个Server,由于是在cluster.master里调用的,所以会在listenInCluster里调用server._listen2,会new 出 TCP(src/tcp_wrap.cc)作为句柄,并将其赋给server._handle,至此cluster.master已经拥有了处理TCP请求的能力,不过master有该能力是不行的,还需要让child拥有才行;

RoundRobinHandle中一旦server触发了listening事件后,它会接管server._handle,用distribute重置其onconnection方法;

distribute的作用就是转发新的请求TCP给cluster.child,从free列队中取出一个之前add进来的worker(这个worker和cluster.child有关联关系),发送一个actnewconn的消息让其处理这个TCP;

回到Child部分:

cluster.child的onconnection收到actnewconn的请求后,会找到之前的创建的server,然后调用其onconnection(child里的该方法没有被重置)方法,然后封装出一个Socket对象,触发onnection事件;

到此,后面的就是普通业务逻辑代码了。

下面贴上了部分相关代码,还是比较多的,细节部分我也加上了注释。

// lib/net.js
function createServer(options, connectionListener) {
  return new Server(options, connectionListener);
}
function Server(options, connectionListener) {
  // ... 大量内置属性和参数的初始化
}
Server.prototype.listen = function (...args) {
  // ...
  // 传了port参数(8000),没有host
  var backlog;
  if (typeof options.port === "number" || typeof options.port === "string") {
    backlog = options.backlog || backlogFromArgs;
    if (options.host) {
      // ...
    } else {
      // 
      listenInCluster(
        this,
        null,
        options.port | 0,
        4,
        backlog,
        undefined,
        options.exclusive
      );
    }
    return this;
  }
	// ...
};


function listenInCluster(
  server,
  address,
  port,
  addressType,
  backlog,
  fd,
  exclusive,
  flags
) {
  exclusive = !!exclusive;

  if (cluster === undefined) cluster = require("cluster");

  if (cluster.isMaster || exclusive) {
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  const serverQuery = {
    address: address,
    port: port, 
    addressType: addressType,
    fd: fd,
    flags,
  };
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  function listenOnMasterHandle(err, handle) {
    // ...
    server._handle = handle;
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}
// lib/internal/cluster/child.js
cluster._getServer = function (obj, options, cb) {
  let address = options.address;
	// ...
  // 当前创建的server信息是否之前已经在cluster.child里查询过
  // 有的话就累加计数index值
  const indexesKey = [
    address, 
    options.port,
    options.addressType,
    options.fd,
  ].join(":"); 

  let index = indexes.get(indexesKey);

  if (index === undefined) index = 0;
  else index++;

  indexes.set(indexesKey, index);

  const message = {
    act: "queryServer",
    index,
    data: null,
    ...options,
  };
  
  // ...

  send(message, (reply, handle) => {
    // ...
    if (handle) shared(reply, handle, indexesKey, cb);
    // Shared listen socket.
    else rr(reply, indexesKey, cb); // Round-robin 返回的 handle 是null
  });
  // ...
};
function rr(message, indexesKey, cb) {
  if (message.errno) return cb(message.errno, null);

  var key = message.key;

  function listen(backlog) {
    return 0;
  }

  function close() {
    if (key === undefined) return;
    send({ act: "close", key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }

  function getsockname(out) {
    if (key) Object.assign(out, message.sockname);
    return 0;
  }

  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname; // TCP handles only.
  }

  assert(handles.has(key) === false);
  handles.set(key, handle);
  cb(0, handle); // 将封装好的handle,作为listenInCluster的回调handle,赋给server._handle
}
// Round-robin connection.
function onconnection(message, handle) {
  const key = message.key; // maseter里的key
  const server = handles.get(key); // 是client创建的server?
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted }); // 答复 master

  // 虽然cluster.child rr里没有为server绑定,onconnection
  // 但是在cb回到net.js里,后面的逻辑绑定了onconnection方法
  if (accepted) server.onconnection(0, handle);
}
// lib/internal/cluster/master.js
function queryServer(worker, message) {
  // worker是cluster.child worker
  const key =
    `${message.address}:${message.port}:${message.addressType}:` +
    `${message.fd}:${message.index}`;
  var handle = handles.get(key);

  if (handle === undefined) {
  
    // 默认是RoundRobin,Shared模式暂不讨论有兴趣可以看源码
    var constructor = RoundRobinHandle;
    // ...
    handle = new constructor(
      key,
      address,
      message.port,
      message.addressType,
      message.fd,
      message.flags
    );
    handles.set(key, handle);
  }
  // ...
  // 将cluster.child的worker添加到handle中
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);
    send(
      worker,
      {
        errno,
        key,
        ack: message.seq,
        data,
        ...reply,
      },
      handle // round_robin_handle 里返回的是一个null
    );
  });
}
// lib/internal/cluster/round_robin_handle.js
function RoundRobinHandle(key, address, port, addressType, fd, flags) {
  this.key = key;
  this.all = new Map();
  this.free = [];
  this.handles = [];
  this.handle = null;
  this.server = net.createServer(assert.fail);

  if (fd >= 0) this.server.listen({ fd });
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  } else this.server.listen(address); // UNIX socket path.

  this.server.once("listening", () => {
    this.handle = this.server._handle;
    // 重置onconnection方法
    // distribute 做任务派发
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}

RoundRobinHandle.prototype.add = function (worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  const done = () => {
    if (this.handle.getsockname) {
      // tcp\udp 会有getsockname
      const out = {};
      this.handle.getsockname(out);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname: out }, null);
    } else {
      send(null, null, null); // UNIX socket.
    }

    this.handoff(worker); // In case there are connections pending.
  };

  if (this.server === null) return done();

  // Still busy binding.
  this.server.once("listening", done);
  this.server.once("error", (err) => {
    send(err.errno, null);
  });
};

RoundRobinHandle.prototype.distribute = function (err, handle) {
  this.handles.push(handle);
  const worker = this.free.shift();

  if (worker) this.handoff(worker);
};

RoundRobinHandle.prototype.handoff = function (worker) {
  // worker如果不存在那就跳出
  if (this.all.has(worker.id) === false) {
    return; // Worker is closing (or has closed) the server.
  }

  const handle = this.handles.shift(); // 取出第一个待处理任务

  if (handle === undefined) {
    this.free.push(worker);  // 没有的话就会将worker归还到free里
    return;
  }

  const message = { act: "newconn", key: this.key };

  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted) handle.close();
    else this.distribute(0, handle); // Worker is shutting down. Send to another.

    this.handoff(worker);
  });
};

总结:

cluster利用child_process的fork方法创建子进程,并传入新的环境变量NODE_UNIQUE_ID用于区分主子进程从而在require(‘cluster’)时候可以加载到对应的master.jschild.js文件。另外在默认的RoundRobinHandle模式下,cluster子进程之所以可以共同监听同个TCP端口,是在net模块里面做了master和child区分,child并没有真正的监听端口,而是child会去master查询该Server是否已经存在,如果没有会在RoundRobinHandle中创建中创建server,一旦有新的TCP连接进入,会转发给free里的worker(cluster.child)处理。

回到顶部