Egg源码解析之egg-cluster
发布于 2 年前 作者 OnedayLiu 5073 次浏览 来自 分享

egg-cluster是什么

为了将多核CPU的性能发挥到极致,最大程度地榨干服务器资源,egg采用多进程模型,解决了一个Node.js进程只能运行在一个CPU上的问题,egg-cluster是用于egg多进程管理的基础模块,负责底层的IPC通道的建立以及处理各进程的通信

egg多进程模型

168aa630b7012b187c5f7ddd6872e2e130532548.png

  • master 主进程
  • worker master的子进程,一般是根据服务器有多少个CPU启动多少个这样的worker进程,主要用于对外服务,处理各种业务层面的事情
  • agent master的子进程,主要处理公共资源的访问,如文件监听,或者帮worker处理一些公共事务,如一些事情是不需要每个worker都做一次的,agent帮忙做完之后通知它们执行之后的操作

master类似于一个守护进程的存在:

  • 负责agent的启动、退出、重启
  • 负责各个worker进程的启动、退出、以及refork,在开发模式下负责重启
  • 负责agent和各个worker之间的通信
  • 负责各个worker之间的通信

各进程的启动顺序:

  • master启动后先启动agent进程
  • agent初始化成功后,通过IPC通道通知master
  • master根据CPU的个数启动相同数目的worker进程
  • worker进程初始化成功后,通过IPC通道通知master
  • 所有的进程初始化成功后,master通知agent和各个worker进程应用启动成功

启动方式差异:

从上图可以看出,master启动agentworker的方式明显不一样,启动agent使用的是child_process的fork模式,启动各个worker使用的是cluster的fork模式,为什么不能都使用同一种方式来启动?因为它们所负责处理的事情性质是不一样的,agent是类似于作为各个worker秘书的存在,只负责帮它们处理轻量级的服务,是不直接对外提供http访问的,所以mastercluster.fork把各个worker启动起来,并提供对外http访问,这些workercluster的预处理下能够对同一端口进行监听而不会产生端口冲突,同时使用round-robin策略进行负载均衡把收到的http请求合理地分配给各个worker进行处理

进程间通信:

masteragent/worker是real communication,agentworker之间以及各个worker之间是virtual communication

  • master继承了events模块,拥有events监听、发送消息的能力,master进程自身是通过订阅者模式来进行事务处理的,所以在master的源码里面并没有看到过多的callback hell
  • masteragent的父进程,相互可以通过IPC通道进行通信
  • masterworker的父进程,相互可以通过IPC通道进行通信
  • agent和各个worker之间毕竟是不同进程,是无法直接进行通信的,所以需要借助master的力量进行转发,egg-cluster封装了一个messenger的工具类,对各个进程间消息转发进行了封装
  • 各个worker之间由于是不同进程,也是无法直接进行通信的,需要借助master的力量进行转发,原理同上

各进程的状态通知

  • worker启动成功后master会对其状态进行监听,对于退出或者失联的worker master是清楚的,在这情况下master会对这些worker之前所绑定的事件进行销毁防止内存泄露,并且通知agent,最后refork出同等数量的worker保证业务的顺利进行,对worker的fork和refork操作都是通过工具类cfork进行的
  • agent启动成功后master会对其状态进行监听,对于退出或者失联的agent master是清楚的,在这情况下master会对这些agent之前所绑定的事件进行销毁防止内存泄露,并且通知各个worker,最后重启agent进程保证业务的顺利进行
  • master退出了或者失联了,worker怎么办?不用担心,cluster已经做好了这样的处理,当父进程退出后子进程自动退出
  • master退出了或者失联了,agent也像worker一样退出吗?然而并不是!这是child_process.forkcluster.fork的不同之处,master退出了或者失联了,agent进程还继续运行,但是它的父进程已经不在了,它将会被init进程收养,从而成为孤儿进程,当这样的孤儿进程越来越多的时候服务器就会越来越卡。所以master退出后需要指定agent也一起退出!

开发模式

开发模式下agent会监听相关文件的改动,然后通知masterworker进行重启操作

开发模式下开启egg-development插件,对相关文件进行监听,监听到有文件改动的话向master发送reload-worker事件

Talk is cheap. Show me the code

准备工作

写这篇文章的时候egg社区版最新版是 1.6.0 ,下面的内容以该版本为准

读源码前需要理解两个模块的作用:

  • messenger,负责masteragentworkerIPC通信的消息转发
  • cfork,负责worker的启动,状态监听以及refork操作

egg是通过index.js作为入口文件进行启动的,输入以下代码然后就可以成功启动了

const egg = require('egg');
egg.startCluster(options, () => {
  console.log('started');
});

入口文件代码如此简单,那egg底层做了些什么?比如egg.startCluster这个方法里面做了些什么?

exports.startCluster = require('egg-cluster').startCluster;

原来egg.startClusteregg-cluster模块暴露的一个API

// egg-cluster/index.js
const Master = require('./lib/master');
exports.startCluster = function(options, callback) {
  new Master(options).ready(callback);
};

startCluster主要做了这些事情

  • 启动master进程
  • egg启动成功后执行callback方法,比如希望在egg启动成功后执行一些业务上的初始化操作

Master(egg-cluster/lib/master.js)

// Master继承了events模块,拥有events监听、发送消息的能力
class Master extends EventEmitter {} 

Master#constructor

constructor里面大致可以分为5个部分:

constructor(options) {
  super();
  this.options = parseOptions(options);
  // new一个Messenger实例
  this.messenger = new Messenger(this);
  // 借用ready模块的方法
  ready.mixin(this);
  this.isProduction = isProduction();
  this.isDebug = isDebug();
  ...
  ...
  // 根据不同运行环境(local、test、prod)设置日志输出级别
  this.logger = new ConsoleLogger({ level: process.env.EGG_MASTER_LOGGER_LEVEL || 'INFO' });
  ...
}
// master启动成功后通知parent、app worker、agent
this.ready(() => {
  this.isStarted = true;
  const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
  this.logger.info('[master] %s started on %s://127.0.0.1:%s (%sms)%s',
  frameworkPkg.name, this.options.https ? 'https' : 'http',
  this.options.port, Date.now() - startTime, stickyMsg);

  const action = 'egg-ready';
  this.messenger.send({ action, to: 'parent' });
  this.messenger.send({ action, to: 'app', data: this.options });
  this.messenger.send({ action, to: 'agent', data: this.options });
});
// 监听agent退出
this.on('agent-exit', this.onAgentExit.bind(this));
// 监听agent启动
this.on('agent-start', this.onAgentStart.bind(this));
// 监听app worker退出
this.on('app-exit', this.onAppExit.bind(this));
// 监听app worker启动
this.on('app-start', this.onAppStart.bind(this));
// 开发环境下监听app worker重启
this.on('reload-worker', this.onReload.bind(this));

// 监听agent启动,注意这里只执行一次
this.once('agent-start', this.forkAppWorkers.bind(this));
// master监听自身的退出及退出后的处理

// kill(2) Ctrl-C     监听SIGINT信号
process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
// kill(3) Ctrl-\     监听SIGQUIT信号
process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
// kill(15) default   监听SIGTERM信号
process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));

// 监听exit事件
process.once('exit', this.onExit.bind(this));
// 监听端口冲突
detectPort((err, port) => {
  /* istanbul ignore if */
  if (err) {
    err.name = 'ClusterPortConflictError';
    err.message = '[master] try get free port error, ' + err.message;
    this.logger.error(err);
    process.exit(1);
    return;
  }
  this.options.clusterPort = port;
  this.forkAgentWorker(); // 如果端口没有冲突则执行该方法
});

Master#forkAgentWorker

master进程以child_process模式启动agent进程

forkAgentWorker() {
  this.agentStartTime = Date.now();
  const args = [ JSON.stringify(this.options) ];
  const opt = { execArgv: process.execArgv.concat([ '--debug-port=5856' ]) };
  
  // 以child_process.fork模式启动agent worker,此时agent成为master的子进程
  const agentWorker = this.agentWorker = childprocess.fork(agentWorkerFile, args, opt);
  
  // 记录agent的id
  agentWorker.id = ++this.agentWorkerIndex;
  
  this.log('[master] agent_worker#%s:%s start with clusterPort:%s',
  agentWorker.id, agentWorker.pid, this.options.clusterPort);

  // master监听从agent发送给master的消息, 并打上消息来源(msg.from = 'agent')
  // 将消息通过messenger发送出去
  agentWorker.on('message', msg => {
	if (typeof msg === 'string') msg = { action: msg, data: msg };
    msg.from = 'agent';
    this.messenger.send(msg);
  });
  
  // master监听agent的异常,并打上对应的log信息方便问题排查
  agentWorker.on('error', err => {
    err.name = 'AgentWorkerError';
    err.id = agentWorker.id;
    err.pid = agentWorker.pid;
    this.logger.error(err);
  });
  
  // master监听agent的退出
  // 并通过messenger发送agent的'agent-exit'事件给master
  // 告诉master说agent退出了
  agentWorker.once('exit', (code, signal) => {
    this.messenger.send({
      action: 'agent-exit',
      data: { code, signal },
      to: 'master',
      from: 'agent',
    });
  });
}

到这里,agent worker已完成启动,并且master对其进行监听,这里有个疑问

agent启动成功后是如何通知master进行下一步操作的?

const agentWorker = this.agentWorker = childprocess.fork(agentWorkerFile, args, opt);

child_process.fork模式启动agent worker,读取的是agent_worker.js,截取里面的一段代码

// egg-cluster/lib/agent_worker.js

agent.ready(() => {
  agent.removeListener('error', startErrorHandler);
  process.send({ action: 'agent-start', to: 'master' });
});

agent启动成功后调用process.send()通知mastermaster监听到该消息通过messenger转发出去

// Master#forkAgentWorker
agentWorker.on('message', msg => {
  if (typeof msg === 'string') msg = { action: msg, data: msg };
  msg.from = 'agent';
  this.messenger.send(msg);
});

最终由master进行agent-start事件的响应

// Master#constructor
...
...
this.on('agent-start', this.onAgentStart.bind(this));
...
this.once('agent-start', this.forkAppWorkers.bind(this));
...
...

Master#onAgentStart

agent启动后的操作

onAgentStart() {
  // agent启动成功后向app worker发送'egg-pids'事件并带上agent pid
  this.messenger.send({ action: 'egg-pids', to: 'app', data: [ this.agentWorker.pid ] });
  // 向app worker发送'agent-start'事件
  this.messenger.send({ action: 'agent-start', to: 'app' });
  this.logger.info('[master] agent_worker#%s:%s started (%sms)',
  this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime);
}

值得注意的是此时app worker还没启动,所以该消息会被丢弃,后续如果发生agent重启的情况会被app worker监听到

Master#forkAppWorkers

master进程以cluster模式启动app worker进程

forkAppWorkers() {
  this.appStartTime = Date.now();
  this.isAllAppWorkerStarted = false;
  this.startSuccessCount = 0;

  this.workers = new Map();

  const args = [ JSON.stringify(this.options) ];
  this.log('[master] start appWorker with args %j', args);
  
  // 以cluster模式启动app worker进程
  cfork({
    exec: appWorkerFile,
    args,
    silent: false,
    count: this.options.workers,
    // 在开发环境下不会进行refork,方便排查问题
    refork: this.isProduction,
  });

  // master监听各个app worker进程的消息
  cluster.on('fork', worker => {
    this.workers.set(worker.process.pid, worker);
    worker.on('message', msg => {
      if (typeof msg === 'string') msg = { action: msg, data: msg };
      msg.from = 'app';
      this.messenger.send(msg);
    });
    this.log('[master] app_worker#%s:%s start, state: %s, current workers: %j',
  worker.id, worker.process.pid, worker.state, Object.keys(cluster.workers));
  });
  
  // master监听各个app worker进程的disconnect事件并记录到log
  cluster.on('disconnect', worker => {
    this.logger.info('[master] app_worker#%s:%s disconnect, suicide: %s, state: %s, current workers: %j',
    worker.id, worker.process.pid, worker.exitedAfterDisconnect, worker.state, Object.keys(cluster.workers));
  });
  
  // master监听各个app worker进程的exit事件,并向master发送'app-exit'事件,将app worker退出后的事情交给master处理
  cluster.on('exit', (worker, code, signal) => {
    this.messenger.send({
      action: 'app-exit',
      data: { workerPid: worker.process.pid, code, signal },
      to: 'master',
      from: 'app',
    });
  });
  
  // master监听各个app worker进程的listening事件,表示各个app worker已经可以开始工作了
  cluster.on('listening', (worker, address) => {
    this.messenger.send({
      action: 'app-start',
      data: { workerPid: worker.process.pid, address },
      to: 'master',
      from: 'app',
    });
  });
}

app worker启动后,跟agent一样,通过messengerapp-start事件发送给master,由master继续处理

// Master#constructor

...
...
this.on('app-start', this.onAppStart.bind(this));
...
...

Master#onAppStart

app worker启动后的操作

onAppStart(data) {
  const worker = this.workers.get(data.workerPid);
  
  ...
  
  // app worker启动成功后通知agent
  this.messenger.send({
   action: 'egg-pids',
   to: 'agent',
   data: getListeningWorker(this.workers),
  });
  
  ...
  
  // app worker准备好了
  if (this.options.sticky) {
   this.startMasterSocketServer(err => {
     if (err) return this.ready(err);
     this.ready(true);
   });
  } else {
   this.ready(true);
  }
}

这时agent和各个app worker已经ready了,master也可以做好准备了,执行ready后的操作,把egg-ready事件发送给parentappagent,告诉它们已经ready了,可以开始干活

this.ready(() => {
  ...
  const action = 'egg-ready';
  this.messenger.send({ action, to: 'parent' });
  this.messenger.send({ action, to: 'app', data: this.options });
  this.messenger.send({ action, to: 'agent', data: this.options });
});

Master#onAgentExit

agent退出后的处理

onAgentExit(data) {
  ...
  // 告诉各个app worker,agent退出了
  this.messenger.send({ action: 'egg-pids', to: 'app', data: [] });
  
  ...
  // 记录异常信息,方便问题排查
  const err = new Error(util.format('[master] agent_worker#%s:%s died (code: %s, signal: %s)',
      agentWorker.id, agentWorker.pid, data.code, data.signal));
    err.name = 'AgentWorkerDiedError';
  this.logger.error(err);
  
  // 移除事件监听,防止内存泄露
  agentWorker.removeAllListeners();
  
  ...
  // 把'agent-worker-died'通知parent进程后重启agent进程
  this.log('[master] try to start a new agent_worker after 1s ...');
  setTimeout(() => {
    this.logger.info('[master] new agent_worker starting...');
    this.forkAgentWorker();
  }, 1000);
  this.messenger.send({
    action: 'agent-worker-died',
    to: 'parent',
  });
}

Master#onAppExit

app worker退出后的处理

onAppExit(data) {
  ...
  // 记录异常信息,方便问题排查
  if (!worker.isDevReload) {
    const signal = data.code;
    const message = util.format(
   '[master] app_worker#%s:%s died (code: %s, signal: %s, suicide: %s, state: %s), current workers: %j',
   worker.id, worker.process.pid, worker.process.exitCode, signal,
   worker.exitedAfterDisconnect, worker.state,
    Object.keys(cluster.workers)
    );
    const err = new Error(message);
    err.name = 'AppWorkerDiedError';
    this.logger.error(err);
  }
  
  // 移除事件监听,防止内存泄露
  worker.removeAllListeners();
  this.workers.delete(data.workerPid);
  
  // 发送'egg-pids'事件给agent,告诉它目前处于alive状态的app worker pid
  this.messenger.send({ action: 'egg-pids', to: 'agent', data: getListeningWorker(this.workers) });
  
  // 发送'app-worker-died'的消息给parent进程
  this.messenger.send({
    action: 'app-worker-died',
    to: 'parent',
  });
}

Master#onReload

开发模式下监听文件的改动,对app worker进行重启操作

  • 开发模式下开启egg-development插件,对相关文件进行监听,监听到有文件改动的话向master发送reload-worker事件
process.send({
  to: 'master',
  action: 'reload-worker',
});
  • master通过监听reload-worker事件后执行onReload方法
this.on('reload-worker', this.onReload.bind(this));
  • onReload通过cluster-reload模块进行重启操作
onReload() {
  this.log('[master] reload workers...');
  for (const id in cluster.workers) {
    const worker = cluster.workers[id];
    worker.isDevReload = true;
  }
  require('cluster-reload')(this.options.workers);
}

Master#onExit

master退出后的处理,该方法主要是打相关的log

Master#onSignal和Master#close

测试的时候,master对收到的各个系统signal进行响应

// kill(2) Ctrl-C
process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
// kill(3) Ctrl-\
process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
// kill(15) default
process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));
  • 杀死各个app worker进程
  • 杀死agent进程
  • 退出master进程
close() {
  this.closed = true;
  this.killAppWorkers();
  this.killAgentWorker();
  this.log('[master] close done, exiting with code:0');
  process.exit(0);
}
12 回复

牛逼👍🏻

@atian25 给Egg推广做点贡献~

牛逼死了,急需这样的文章

666,点赞

关于优雅退出那里,可以看下 https://zhuanlan.zhihu.com/p/25457918

image.png

@atian25 解释的很详细,学习了

哇塞,写得好棒啊,厉害了

@OnedayLiu 发一份到知乎你的个人文章,然后投稿给我们的知乎专栏?

回到顶部