From 1e3214b63e262557aadebf3c57e8388de6a4bbe4 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 14 Aug 2023 21:02:13 +0200 Subject: [PATCH] feat: add worker kill handler success or failure reporting MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 19 ++++++++++++++++++- src/pools/cluster/fixed.ts | 2 +- src/pools/thread/fixed.ts | 2 +- src/utility-types.ts | 2 +- src/worker/abstract-worker.ts | 23 +++++++++++++++++++---- tests/worker/abstract-worker.test.js | 5 +++++ 6 files changed, 45 insertions(+), 8 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index eb303232..1d27428e 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -694,6 +694,23 @@ export abstract class AbstractPool< ) } + protected async sendKillMessageToWorker ( + workerNodeKey: number, + workerId: number + ): Promise { + const waitForKillResponse = new Promise((resolve, reject) => { + this.registerWorkerMessageListener(workerNodeKey, (message) => { + if (message.kill === 'success') { + resolve() + } else if (message.kill === 'failure') { + reject(new Error('Worker kill message handling failed')) + } + }) + }) + this.sendToWorker(workerNodeKey, { kill: true, workerId }) + await waitForKillResponse + } + /** * Terminates the worker node given its worker node key. * @@ -934,7 +951,7 @@ export abstract class AbstractPool< // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || - (message.kill != null && + (isKillBehavior(KillBehaviors.SOFT, message.kill) && ((this.opts.enableTasksQueue === false && workerUsage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index b5b32527..9022c702 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -72,7 +72,7 @@ export class FixedClusterPool< worker.on('disconnect', () => { worker.kill() }) - this.sendToWorker(workerNodeKey, { kill: true, workerId: worker.id }) + await this.sendKillMessageToWorker(workerNodeKey, worker.id) worker.disconnect() await waitWorkerExit } diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index a193091f..13e4a11a 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -67,7 +67,7 @@ export class FixedThreadPool< resolve() }) }) - this.sendToWorker(workerNodeKey, { kill: true, workerId: worker.threadId }) + await this.sendKillMessageToWorker(workerNodeKey, worker.threadId) workerNode.closeChannel() await worker.terminate() await waitWorkerExit diff --git a/src/utility-types.ts b/src/utility-types.ts index d6869557..1783b4e6 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -101,7 +101,7 @@ export interface MessageValue /** * Kill code. */ - readonly kill?: KillBehavior | true + readonly kill?: KillBehavior | true | 'success' | 'failure' /** * Task error. */ diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 493ca0b2..56a7696f 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -328,12 +328,27 @@ export abstract class AbstractWorker< this.stopCheckActive() if (isAsyncFunction(this.opts.killHandler)) { (this.opts.killHandler?.() as Promise) - .then(() => this.emitDestroy()) + .then(() => { + this.sendToMainWorker({ kill: 'success', workerId: this.id }) + return null + }) + .catch(() => { + this.sendToMainWorker({ kill: 'failure', workerId: this.id }) + }) + .finally(() => { + this.emitDestroy() + }) .catch(EMPTY_FUNCTION) } else { - // eslint-disable-next-line @typescript-eslint/no-invalid-void-type - this.opts.killHandler?.() as void - this.emitDestroy() + try { + // eslint-disable-next-line @typescript-eslint/no-invalid-void-type + this.opts.killHandler?.() as void + this.sendToMainWorker({ kill: 'success', workerId: this.id }) + } catch (error) { + this.sendToMainWorker({ kill: 'failure', workerId: this.id }) + } finally { + this.emitDestroy() + } } } diff --git a/tests/worker/abstract-worker.test.js b/tests/worker/abstract-worker.test.js index 46370777..9efacab5 100644 --- a/tests/worker/abstract-worker.test.js +++ b/tests/worker/abstract-worker.test.js @@ -132,7 +132,12 @@ describe('Abstract worker test suite', () => { killHandler: sinon.stub().returns() }) worker.isMain = false + worker.getMainWorker = sinon.stub().returns({ + id: 1, + send: sinon.stub().returns() + }) worker.handleKillMessage() + expect(worker.getMainWorker().send.calledOnce).toBe(true) expect(worker.opts.killHandler.calledOnce).toBe(true) }) -- 2.34.1