From 81c02522f2c5538a8e5ce341678724132d4da8ba Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 21 Jul 2023 20:29:08 +0200 Subject: [PATCH] fix: ensure worker node destroy semantic is always the same MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 34 +++++++--------------------------- src/pools/cluster/fixed.ts | 12 ++++++++++-- src/pools/thread/fixed.ts | 8 ++++++++ 3 files changed, 25 insertions(+), 29 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 56dd7fa9..7c71c855 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -10,7 +10,6 @@ import { DEFAULT_TASK_NAME, DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, - isAsyncFunction, isKillBehavior, isPlainObject, median, @@ -659,16 +658,8 @@ export abstract class AbstractPool< /** @inheritDoc */ public async destroy (): Promise { await Promise.all( - this.workerNodes.map(async (workerNode, workerNodeKey) => { - this.flushTasksQueue(workerNodeKey) - // FIXME: wait for tasks to be finished - const workerExitPromise = new Promise(resolve => { - workerNode.worker.on('exit', () => { - resolve() - }) - }) + this.workerNodes.map(async (_, workerNodeKey) => { await this.destroyWorkerNode(workerNodeKey) - await workerExitPromise }) ) } @@ -678,9 +669,7 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. */ - protected abstract destroyWorkerNode ( - workerNodeKey: number - ): void | Promise + protected abstract destroyWorkerNode (workerNodeKey: number): Promise /** * Setup hook to execute code before worker nodes are created in the abstract constructor. @@ -910,6 +899,7 @@ export abstract class AbstractPool< message.workerId ) const workerUsage = this.workerNodes[localWorkerNodeKey].usage + // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || (message.kill != null && @@ -919,17 +909,7 @@ export abstract class AbstractPool< workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { - // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this) - if (isAsyncFunction(destroyWorkerNodeBounded)) { - ( - destroyWorkerNodeBounded as (workerNodeKey: number) => Promise - )(localWorkerNodeKey).catch(EMPTY_FUNCTION) - } else { - (destroyWorkerNodeBounded as (workerNodeKey: number) => void)( - localWorkerNodeKey - ) - } + this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION) } }) const workerInfo = this.getWorkerInfo(workerNodeKey) @@ -1050,10 +1030,10 @@ export abstract class AbstractPool< return message => { this.checkMessageWorkerId(message) if (message.ready != null) { - // Worker ready response received + // Worker ready response received from worker this.handleWorkerReadyResponse(message) } else if (message.id != null) { - // Task execution response received + // Task execution response received from worker this.handleTaskExecutionResponse(message) } } @@ -1173,7 +1153,7 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].tasksQueueSize() } - private flushTasksQueue (workerNodeKey: number): void { + protected flushTasksQueue (workerNodeKey: number): void { while (this.tasksQueueSize(workerNodeKey) > 0) { this.executeTask( workerNodeKey, diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index eadbfcfe..8e264359 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -60,13 +60,21 @@ export class FixedClusterPool< } /** @inheritDoc */ - protected destroyWorkerNode (workerNodeKey: number): void { + protected async destroyWorkerNode (workerNodeKey: number): Promise { + this.flushTasksQueue(workerNodeKey) + // FIXME: wait for tasks to be finished const worker = this.workerNodes[workerNodeKey].worker - this.sendToWorker(workerNodeKey, { kill: true, workerId: worker.id }) + const workerExitPromise = new Promise(resolve => { + worker.on('exit', () => { + resolve() + }) + }) worker.on('disconnect', () => { worker.kill() }) + this.sendToWorker(workerNodeKey, { kill: true, workerId: worker.id }) worker.disconnect() + await workerExitPromise } /** @inheritDoc */ diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index f4f712df..a575d097 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -57,11 +57,19 @@ export class FixedThreadPool< /** @inheritDoc */ protected async destroyWorkerNode (workerNodeKey: number): Promise { + this.flushTasksQueue(workerNodeKey) + // FIXME: wait for tasks to be finished const workerNode = this.workerNodes[workerNodeKey] const worker = workerNode.worker + const workerExitPromise = new Promise(resolve => { + worker.on('exit', () => { + resolve() + }) + }) this.sendToWorker(workerNodeKey, { kill: true, workerId: worker.threadId }) workerNode.closeChannel() await worker.terminate() + await workerExitPromise } /** @inheritDoc */ -- 2.34.1