前言

背景

我当前在做的项目是用 Egg.js 开发的,其中涉及到循环消费队列消息的逻辑,但是有一个担心,就是在 worker 进程终止的时候消息处理到一半,既完成了业务逻辑,但又没有 ACK,导致消息相应超时,后面还会再消费到,可能会导致各种各样无法预料的问题。

环境

  • Node.js:12.x;
  • Egg.js:2.26.x;

目标

  1. 确保在 Egg 的 worker 进程退出之前,都停止循环消费新的通道消息;
  2. 在 Egg 的 worker 进程退出,确保当前已经消费的通道消息全部执行完毕并 ACK。如果不行,可以延长退出的时间,尽量保证当前的消息都处理完成;

Egg 应用的退出过程

要在 Egg 程序停止之前做一些定制化逻辑首先需要了解 Egg 程序停止的步骤和原理。

通过 package.json 定义的 scripts 命令可以看出,在开发环境下和生产环境下,有两种不同的引导方式,一种是 egg-bin,一种是 egg-scripts。我们就随便选一种研究应该就可以。以 egg-scripts 为例。

// lib/cmd/[email protected]
// 在当前操作系统通过执行 ps -eo "pid,args" 并过滤,找到相关的 Egg master 进程
let processList = yield this.helper.findNodeProcess(item => {
const cmd = item.cmd;
return argv.title ?
cmd.includes('start-cluster') && cmd.includes(util.format(osRelated.titleTemplate, argv.title)) : cmd.includes('start-cluster');
});
let pids = processList.map(x => x.pid);

if (pids.length) {
  this.logger.info('got master pid %j', pids);
  this.helper.kill(pids);
  // wait for 5s to confirm whether any worker process did not kill by master
  yield sleep(argv.timeout || '5s');
} else {
  this.logger.warn('can\'t detect any running egg process');
}
// lib/[email protected]
exports.kill = function(pids, signal) {
  pids.forEach(pid => {
    try {
      process.kill(pid, signal); // 向目标 master 进程发送 SIGTERM 信号量
    } catch (err) { /* istanbul ignore next */
      if (err.code !== 'ESRCH') {
        throw err;
      }
    }
  });
};

首先我们也在 Shell 中执行一下 ps -eo "pid,args"

968 node /data/projects/iot-back-end/node_modules/egg-scripts/lib/start-cluster {"title":"egg-server-iot-eshop-backend","baseDir":"/data/projects/iot-back-end","framework":"/data/projects/iot-back-end/node_modules/egg"} --title=egg-server-iot-eshop-backend
981 /data/projects/iot-back-end/node_modules/node/bin/node /data/projects/iot-back-end/node_modules/egg-cluster/lib/agent_worker.js {"framework":"/data/projects/iot-back-end/node_modules/egg","baseDir":"/data/projects/iot-back-end","workers":2,"plugins":null,"https":false,"title":"egg-server-iot-eshop-backend","clusterPort":40605}
1003 /data/projects/iot-back-end/node_modules/node/bin/node /data/projects/iot-back-end/node_modules/egg-cluster/lib/app_worker.js {"framework":"/data/projects/iot-back-end/node_modules/egg","baseDir":"/data/projects/iot-back-end","workers":2,"plugins":null,"https":false,"title":"egg-server-iot-eshop-backend","clusterPort":40605}
1004 /data/projects/iot-back-end/node_modules/node/bin/node /data/projects/iot-back-end/node_modules/egg-cluster/lib/app_worker.js {"framework":"/data/projects/iot-back-end/node_modules/egg","baseDir":"/data/projects/iot-back-end","workers":2,"plugins":null,"https":false,"title":"egg-server-iot-eshop-backend","clusterPort":40605}

和 Egg 相关的进程有以下四个,根据字面意思分别是一个 master,一个 agent,两个 worker 进程。再看代码里的筛选条件,只向包含有 start-cluster 字符串的进程发送信号,也就是只向 master 进程发送。另外可以看出,master 进程的入口文件是 egg-scripts/lib/start-cluster

代码很短,结合进程信息可以看出,是执行了 egg 框架本身的 startCluster 方法。

// lib/[email protected]
#!/usr/bin/env node

'use strict';

const options = JSON.parse(process.argv[2]);
require(options.framework).startCluster(options);

那让我们把视线转到 Egg 框架上,发现其实是调用了 egg-clusterstartCluster 方法。

// [email protected]
/**
 * Start egg application with cluster mode
 * @since 1.0.0
 */
exports.startCluster = require('egg-cluster').startCluster;

我们在 egg-cluster 的代码中发现,startCluster 方法只是将一个叫做 Master 的类进行了实例化,而在 Master 的构造函数中,终于找到了我们想要的。然后顺着一直找,发现 master 进程再收到终止信号以后,也向 worker 进程发送了终止信号。

// lib/[email protected]
// https://nodejs.org/api/process.html#process_signal_events
// https://en.wikipedia.org/wiki/Unix_signal
// kill(2) Ctrl-C
process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
// kill(3) Ctrl-\
process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
// kill(15) default
process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));

// lib/utils/[email protected]
// kill process, if SIGTERM not work, try SIGKILL
function* killProcess(subProcess, timeout) {
  subProcess.kill('SIGTERM');
  yield Promise.race([
    awaitEvent(subProcess, 'exit'),
    sleep(timeout),
  ]);
  if (subProcess.killed) return;
  // SIGKILL: http://man7.org/linux/man-pages/man7/signal.7.html
  // worker: https://github.com/nodejs/node/blob/master/lib/internal/cluster/worker.js#L22
  // subProcess.kill is wrapped to subProcess.destroy, it will wait to disconnected.
  (subProcess.process || subProcess).kill('SIGKILL');
}

那么 worker 进程又是响应的呢?一开始我并没有找到 process.on 相关的代码,后来发现了以下这段代码。再次翻阅了 graceful-process 的代码以后了解到 beforeExit 是一个钩子函数,在进程终止前会运行,这里运行的是 egg 框架的 Application 实例的 close 方法。

// lib/[email protected]

const gracefulExit = require('graceful-process');
...
const Application = require(options.framework).Application;
debug('new Application with options %j', options);
const app = new Application(options);

...

gracefulExit({
  logger: consoleLogger,
  label: 'app_worker',
  beforeExit: () => app.close(),
});

再次回到 Egg 框架后发现,close 方法是继承于 egg-core 模块的。再先上追溯,最后找到了这里,发现在结束之前,会倒序执行一个集合保存的所有函数。通过官方文档不难猜出,这些函数应该就是是众多扩展、插件以及应用本身的 beforeClose 函数。

// lib/[email protected]

async close() {
    // close in reverse order: first created, last closed
    const closeFns = Array.from(this[CLOSE_SET]);
    for (const fn of closeFns.reverse()) {
      await utils.callFn(fn);
      this[CLOSE_SET].delete(fn);
    }
    // Be called after other close callbacks
    this.app.emit('close');
    this.removeAllListeners();
    this.app.removeAllListeners();
    this[IS_CLOSED] = true;
}

解决方案

原理了解到这里以后,我们就可以实现目标一了。

延长 worker 进程的退出时间

我们在应用的 app.js 文件中,定义 app 的一个属性叫做 consumeTask,初始值为 true,然后在 beforeClose 中修改成 false,并休眠 5 秒钟,让已经消费到的任务都有充足的时间被处理完。

但是在修改完成以后,使用 npm run stop 命令后发现,因为超过了默认的超时等待时间 5 秒,没有退出的 worker 进程被强制 kill 掉了,即发送了 SIGKILL 信号。因此我们还需要修改退出的超时等待时间。

修改退出超时等待时间

修改比较简单,根据 egg-scripts 的文档,直接在 package.json 文件的 stop 命令中加入一个命令行参数就可以 --timeout=10s

参考资料