From 07e0c9e591f9fa6715ba94e52c647b7ee3d2b9c7 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 16 Dec 2023 10:39:35 +0100 Subject: [PATCH] refactor: factor out worker node termination code MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 20 ++++++++++++++------ src/pools/cluster/fixed.ts | 20 -------------------- src/pools/thread/fixed.ts | 18 ------------------ src/pools/worker-node.ts | 33 ++++++++++++++++++++++++++------- src/pools/worker.ts | 24 ++++++++++++++++++++---- 5 files changed, 60 insertions(+), 55 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 56be9a1a..28297985 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1043,7 +1043,14 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. */ - protected abstract destroyWorkerNode (workerNodeKey: number): Promise + protected async destroyWorkerNode (workerNodeKey: number): Promise { + this.flagWorkerNodeAsNotReady(workerNodeKey) + this.flushTasksQueue(workerNodeKey) + // FIXME: wait for tasks to be finished + const workerNode = this.workerNodes[workerNodeKey] + await this.sendKillMessageToWorker(workerNodeKey) + await workerNode.terminate() + } /** * Setup hook to execute code before worker nodes are created in the abstract constructor. @@ -1279,17 +1286,15 @@ export abstract class AbstractPool< ) workerNode.registerWorkerEventHandler('error', (error: Error) => { const workerNodeKey = this.getWorkerNodeKeyByWorker(workerNode.worker) - this.flagWorkerNodeAsNotReady(workerNodeKey) - const workerInfo = this.getWorkerInfo(workerNodeKey) + workerNode.info.ready = false this.emitter?.emit(PoolEvents.error, error) - this.workerNodes[workerNodeKey].closeChannel() if ( this.started && !this.starting && !this.destroying && this.opts.restartWorkerOnError === true ) { - if (workerInfo.dynamic) { + if (workerNode.info.dynamic) { this.createAndSetupDynamicWorkerNode() } else { this.createAndSetupWorkerNode() @@ -1298,6 +1303,9 @@ export abstract class AbstractPool< if (this.started && this.opts.enableTasksQueue === true) { this.redistributeQueuedTasks(workerNodeKey) } + workerNode.terminate().catch(error => { + this.emitter?.emit(PoolEvents.error, error) + }) }) workerNode.registerWorkerEventHandler( 'exit', @@ -1846,7 +1854,7 @@ export abstract class AbstractPool< } /** - * Removes the worker node associated to the give given worker from the pool worker nodes. + * Removes the worker node associated to the given worker from the pool worker nodes. * * @param worker - The worker. */ diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index f3fb54bb..46cabdcb 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -41,26 +41,6 @@ export class FixedClusterPool< return cluster.isPrimary } - /** @inheritDoc */ - protected async destroyWorkerNode (workerNodeKey: number): Promise { - this.flagWorkerNodeAsNotReady(workerNodeKey) - this.flushTasksQueue(workerNodeKey) - // FIXME: wait for tasks to be finished - const workerNode = this.workerNodes[workerNodeKey] - const waitWorkerExit = new Promise(resolve => { - workerNode.registerOnceWorkerEventHandler('exit', () => { - resolve() - }) - }) - workerNode.registerOnceWorkerEventHandler('disconnect', () => { - workerNode.worker.kill() - }) - await this.sendKillMessageToWorker(workerNodeKey) - workerNode.removeAllListeners() - workerNode.worker.disconnect() - await waitWorkerExit - } - /** @inheritDoc */ protected sendToWorker ( workerNodeKey: number, diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index e5074e39..19783027 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -42,24 +42,6 @@ export class FixedThreadPool< return isMainThread } - /** @inheritDoc */ - protected async destroyWorkerNode (workerNodeKey: number): Promise { - this.flagWorkerNodeAsNotReady(workerNodeKey) - this.flushTasksQueue(workerNodeKey) - // FIXME: wait for tasks to be finished - const workerNode = this.workerNodes[workerNodeKey] - const waitWorkerExit = new Promise(resolve => { - workerNode.registerOnceWorkerEventHandler('exit', () => { - resolve() - }) - }) - await this.sendKillMessageToWorker(workerNodeKey) - workerNode.closeChannel() - workerNode.removeAllListeners() - await workerNode.worker.terminate() - await waitWorkerExit - } - /** @inheritDoc */ protected sendToWorker ( workerNodeKey: number, diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 9a98458e..7a6a1500 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -124,14 +124,23 @@ export class WorkerNode } /** @inheritdoc */ - public closeChannel (): void { - if (this.messageChannel != null) { - this.messageChannel.port1.unref() - this.messageChannel.port2.unref() - this.messageChannel.port1.close() - this.messageChannel.port2.close() - delete this.messageChannel + public async terminate (): Promise { + const waitWorkerExit = new Promise(resolve => { + this.registerOnceWorkerEventHandler('exit', () => { + resolve() + }) + }) + this.closeMessageChannel() + this.removeAllListeners() + if (this.info.type === WorkerTypes.thread) { + await this.worker.terminate?.() + } else if (this.info.type === WorkerTypes.cluster) { + this.registerOnceWorkerEventHandler('disconnect', () => { + this.worker.kill?.() + }) + this.worker.disconnect?.() } + await waitWorkerExit } /** @inheritdoc */ @@ -187,6 +196,16 @@ export class WorkerNode return this.taskFunctionsUsage.delete(name) } + private closeMessageChannel (): void { + if (this.messageChannel != null) { + this.messageChannel.port1.unref() + this.messageChannel.port2.unref() + this.messageChannel.port1.close() + this.messageChannel.port2.close() + delete this.messageChannel + } + } + private initWorkerInfo (worker: Worker): WorkerInfo { return { id: getWorkerId(worker), diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 1a94ef60..da4cf172 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -198,9 +198,12 @@ export interface StrategyData { */ export interface IWorker { /** - * Worker id. + * Cluster worker id. */ readonly id?: number + /** + * Worker thread worker id. + */ readonly threadId?: number /** * Registers an event listener. @@ -230,6 +233,19 @@ export interface IWorker { | ErrorHandler | ExitHandler ) => void + /** + * Stop all JavaScript execution in the worker thread as soon as possible. + * Returns a Promise for the exit code that is fulfilled when the `'exit' event` is emitted. + */ + readonly terminate?: () => Promise + /** + * Cluster worker disconnect. + */ + readonly disconnect?: () => void + /** + * Cluster worker kill. + */ + readonly kill?: (signal?: string) => void } /** @@ -270,7 +286,7 @@ export interface IWorkerNode */ strategyData?: StrategyData /** - * Message channel (worker_threads only). + * Message channel (worker thread only). */ readonly messageChannel?: MessageChannel /** @@ -325,9 +341,9 @@ export interface IWorkerNode */ readonly resetUsage: () => void /** - * Closes communication channel. + * Terminates the worker node. */ - readonly closeChannel: () => void + readonly terminate: () => Promise /** * Registers a worker event handler. * -- 2.34.1