From 3f09ed9f86ed44b7551d160452b80b978d14e8e3 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 20 Jul 2023 01:48:21 +0200 Subject: [PATCH] refactor: factor out worker communication channel closing MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 11 +++++------ src/pools/thread/fixed.ts | 4 +--- src/pools/worker-node.ts | 9 +++++++++ src/pools/worker.ts | 4 ++++ 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index be1402a7..93023794 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -516,7 +516,7 @@ export abstract class AbstractPool< * @param worker - The worker. * @returns The worker node key if found in the pool worker nodes, `-1` otherwise. */ - private getWorkerNodeKey (worker: Worker): number { + protected getWorkerNodeKey (worker: Worker): number { return this.workerNodes.findIndex( workerNode => workerNode.worker === worker ) @@ -877,11 +877,7 @@ export abstract class AbstractPool< worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) worker.once('exit', () => { - const workerInfo = this.getWorkerInfoByWorker(worker) - if (workerInfo.messageChannel != null) { - workerInfo.messageChannel?.port1.close() - workerInfo.messageChannel?.port1.close() - } + this.workerNodes[this.getWorkerNodeKey(worker)].closeChannel() this.removeWorkerNode(worker) }) @@ -1055,6 +1051,7 @@ export abstract class AbstractPool< * Gets the worker information from the given worker node key. * * @param workerNodeKey - The worker node key. + * @returns The worker information. */ private getWorkerInfo (workerNodeKey: number): WorkerInfo { return this.workerNodes[workerNodeKey].info @@ -1064,6 +1061,8 @@ export abstract class AbstractPool< * Gets the worker information from the given worker. * * @param worker - The worker. + * @returns The worker information. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker is not found. */ protected getWorkerInfoByWorker (worker: Worker): WorkerInfo { const workerNodeKey = this.getWorkerNodeKey(worker) diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index e3a94b32..bef82826 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -58,9 +58,7 @@ export class FixedThreadPool< /** @inheritDoc */ protected async destroyWorker (worker: Worker): Promise { this.sendToWorker(worker, { kill: true, workerId: worker.threadId }) - const workerInfo = this.getWorkerInfoByWorker(worker) - workerInfo.messageChannel?.port1.close() - workerInfo.messageChannel?.port2.close() + this.workerNodes[this.getWorkerNodeKey(worker)].closeChannel() await worker.terminate() } diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 2c39393b..fcc09137 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -74,6 +74,15 @@ implements IWorkerNode { this.tasksUsage.clear() } + /** @inheritdoc */ + public closeChannel (): void { + if (this.info.messageChannel != null) { + this.info.messageChannel?.port1.close() + this.info.messageChannel?.port2.close() + delete this.info.messageChannel + } + } + /** @inheritdoc */ public getTaskWorkerUsage (name: string): WorkerUsage | undefined { if (!this.tasksUsage.has(name)) { diff --git a/src/pools/worker.ts b/src/pools/worker.ts index b7cd7f67..f1c29b36 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -242,6 +242,10 @@ export interface IWorkerNode { * Resets usage statistics . */ readonly resetUsage: () => void + /** + * Close communication channel. + */ + readonly closeChannel: () => void /** * Gets task worker usage statistics. */ -- 2.34.1