Egg 应用的终止过程

前言
背景
我当前在做的项目是用 Egg.js 开发的,其中涉及到循环消费队列消息的逻辑,但是有一个担心,就是在 worker
进程终止的时候消息处理到一半,既完成了业务逻辑,但又没有 ACK,导致消息相应超时,后面还会再消费到,可能会导致各种各样无法预料的问题。
环境
- Node.js:12.x;
- Egg.js:2.26.x;
目标
- 确保在 Egg 的
worker
进程退出之前,都停止循环消费新的通道消息; - 在 Egg 的
worker
进程退出,确保当前已经消费的通道消息全部执行完毕并 ACK。如果不行,可以延长退出的时间,尽量保证当前的消息都处理完成;
Egg 应用的退出过程
要在 Egg 程序停止之前做一些定制化逻辑首先需要了解 Egg 程序停止的步骤和原理。
通过 package.json 定义的 scripts 命令可以看出,在开发环境下和生产环境下,有两种不同的引导方式,一种是 egg-bin
,一种是 egg-scripts
。我们就随便选一种研究应该就可以。以 egg-scripts
为例。
// lib/cmd/stop.js@egg-scripts
// 在当前操作系统通过执行 ps -eo "pid,args" 并过滤,找到相关的 Egg master 进程
let processList = yield this.helper.findNodeProcess(item => {
const cmd = item.cmd;
return argv.title ?
cmd.includes('start-cluster') && cmd.includes(util.format(osRelated.titleTemplate, argv.title)) : cmd.includes('start-cluster');
});
let pids = processList.map(x => x.pid);
if (pids.length) {
this.logger.info('got master pid %j', pids);
this.helper.kill(pids);
// wait for 5s to confirm whether any worker process did not kill by master
yield sleep(argv.timeout || '5s');
} else {
this.logger.warn('can\'t detect any running egg process');
}
// lib/helper.js@egg-scripts
exports.kill = function(pids, signal) {
pids.forEach(pid => {
try {
process.kill(pid, signal); // 向目标 master 进程发送 SIGTERM 信号量
} catch (err) { /* istanbul ignore next */
if (err.code !== 'ESRCH') {
throw err;
}
}
});
};
首先我们也在 Shell 中执行一下 ps -eo "pid,args"
。
968 node /data/projects/iot-back-end/node_modules/egg-scripts/lib/start-cluster {"title":"egg-server-iot-eshop-backend","baseDir":"/data/projects/iot-back-end","framework":"/data/projects/iot-back-end/node_modules/egg"} --title=egg-server-iot-eshop-backend
981 /data/projects/iot-back-end/node_modules/node/bin/node /data/projects/iot-back-end/node_modules/egg-cluster/lib/agent_worker.js {"framework":"/data/projects/iot-back-end/node_modules/egg","baseDir":"/data/projects/iot-back-end","workers":2,"plugins":null,"https":false,"title":"egg-server-iot-eshop-backend","clusterPort":40605}
1003 /data/projects/iot-back-end/node_modules/node/bin/node /data/projects/iot-back-end/node_modules/egg-cluster/lib/app_worker.js {"framework":"/data/projects/iot-back-end/node_modules/egg","baseDir":"/data/projects/iot-back-end","workers":2,"plugins":null,"https":false,"title":"egg-server-iot-eshop-backend","clusterPort":40605}
1004 /data/projects/iot-back-end/node_modules/node/bin/node /data/projects/iot-back-end/node_modules/egg-cluster/lib/app_worker.js {"framework":"/data/projects/iot-back-end/node_modules/egg","baseDir":"/data/projects/iot-back-end","workers":2,"plugins":null,"https":false,"title":"egg-server-iot-eshop-backend","clusterPort":40605}
和 Egg 相关的进程有以下四个,根据字面意思分别是一个 master,一个 agent,两个 worker 进程。再看代码里的筛选条件,只向包含有 start-cluster
字符串的进程发送信号,也就是只向 master 进程发送。另外可以看出,master 进程的入口文件是 egg-scripts/lib/start-cluster
。
代码很短,结合进程信息可以看出,是执行了 egg 框架本身的 startCluster
方法。
// lib/start-cluster.js@egg-scripts
#!/usr/bin/env node
'use strict';
const options = JSON.parse(process.argv[2]);
require(options.framework).startCluster(options);
那让我们把视线转到 Egg 框架上,发现其实是调用了 egg-cluster
的 startCluster
方法。
// index.js@egg
/**
* Start egg application with cluster mode
* @since 1.0.0
*/
exports.startCluster = require('egg-cluster').startCluster;
我们在 egg-cluster
的代码中发现,startCluster
方法只是将一个叫做 Master
的类进行了实例化,而在 Master
的构造函数中,终于找到了我们想要的。然后顺着一直找,发现 master
进程再收到终止信号以后,也向 worker
进程发送了终止信号。
// lib/master.js@egg-cluster
// https://nodejs.org/api/process.html#process_signal_events
// https://en.wikipedia.org/wiki/Unix_signal
// kill(2) Ctrl-C
process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
// kill(3) Ctrl-\
process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
// kill(15) default
process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));
// lib/utils/terminate.js@egg-cluster
// kill process, if SIGTERM not work, try SIGKILL
function* killProcess(subProcess, timeout) {
subProcess.kill('SIGTERM');
yield Promise.race([
awaitEvent(subProcess, 'exit'),
sleep(timeout),
]);
if (subProcess.killed) return;
// SIGKILL: http://man7.org/linux/man-pages/man7/signal.7.html
// worker: https://github.com/nodejs/node/blob/master/lib/internal/cluster/worker.js#L22
// subProcess.kill is wrapped to subProcess.destroy, it will wait to disconnected.
(subProcess.process || subProcess).kill('SIGKILL');
}
那么 worker
进程又是响应的呢?一开始我并没有找到 process.on
相关的代码,后来发现了以下这段代码。再次翻阅了 graceful-process
的代码以后了解到 beforeExit
是一个钩子函数,在进程终止前会运行,这里运行的是 egg 框架的 Application
实例的 close
方法。
// lib/app_worker.js@egg-cluster
const gracefulExit = require('graceful-process');
...
const Application = require(options.framework).Application;
debug('new Application with options %j', options);
const app = new Application(options);
...
gracefulExit({
logger: consoleLogger,
label: 'app_worker',
beforeExit: () => app.close(),
});
再次回到 Egg 框架后发现,close
方法是继承于 egg-core
模块的。再先上追溯,最后找到了这里,发现在结束之前,会倒序执行一个集合保存的所有函数。通过官方文档不难猜出,这些函数应该就是是众多扩展、插件以及应用本身的 beforeClose
函数。
// lib/lifecycle.js@egg-core
async close() {
// close in reverse order: first created, last closed
const closeFns = Array.from(this[CLOSE_SET]);
for (const fn of closeFns.reverse()) {
await utils.callFn(fn);
this[CLOSE_SET].delete(fn);
}
// Be called after other close callbacks
this.app.emit('close');
this.removeAllListeners();
this.app.removeAllListeners();
this[IS_CLOSED] = true;
}
解决方案
原理了解到这里以后,我们就可以实现目标一了。
延长 worker 进程的退出时间
我们在应用的 app.js
文件中,定义 app
的一个属性叫做 consumeTask
,初始值为 true,然后在 beforeClose
中修改成 false,并休眠 5 秒钟,让已经消费到的任务都有充足的时间被处理完。
但是在修改完成以后,使用 npm run stop
命令后发现,因为超过了默认的超时等待时间 5 秒,没有退出的 worker 进程被强制 kill 掉了,即发送了 SIGKILL
信号。因此我们还需要修改退出的超时等待时间。
修改退出超时等待时间
修改比较简单,根据 egg-scripts
的文档,直接在 package.json 文件的 stop
命令中加入一个命令行参数就可以 --timeout=10s
。