From: Jérôme Benoit Date: Wed, 20 Sep 2023 16:14:36 +0000 (+0200) Subject: fix: unregister worker callbacks after usage X-Git-Tag: v2.7.1~9 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=ae036c3e73796126b7f1138129b6b18ef6bcef8c;p=poolifier.git fix: unregister worker callbacks after usage Signed-off-by: Jérôme Benoit --- diff --git a/.eslintrc.js b/.eslintrc.js index 97f787ae..56484916 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -94,7 +94,6 @@ module.exports = defineConfig({ 'typedoc', 'unlink', 'unref', - 'unregister', 'utf8', 'webcrypto', 'workerpool', diff --git a/CHANGELOG.md b/CHANGELOG.md index 083f8179..7b37a041 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Ensure worker message listener used one time are removed after usage. + ## [2.7.0] - 2023-09-19 ### Fixed diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 52f9c0e3..88e5fe79 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -30,12 +30,12 @@ import { PoolTypes, type TasksQueueOptions } from './pool' -import type { - IWorker, - IWorkerNode, - WorkerInfo, - WorkerType, - WorkerUsage +import { + type IWorker, + type IWorkerNode, + type WorkerInfo, + type WorkerType, + type WorkerUsage } from './worker' import { type MeasurementStatisticsRequirements, @@ -682,28 +682,38 @@ export abstract class AbstractPool< message: MessageValue ): Promise { return await new Promise((resolve, reject) => { - const workerId = this.getWorkerInfo(workerNodeKey).id as number - this.registerWorkerMessageListener(workerNodeKey, message => { + const taskFunctionOperationListener = ( + message: MessageValue + ): void => { + this.checkMessageWorkerId(message) + const workerId = this.getWorkerInfo(workerNodeKey).id as number if ( - message.workerId === workerId && - message.taskFunctionOperationStatus === true - ) { - resolve(true) - } else if ( - message.workerId === workerId && - message.taskFunctionOperationStatus === false + message.taskFunctionOperationStatus != null && + message.workerId === workerId ) { - reject( - new Error( - `Task function operation '${ - message.taskFunctionOperation as string - }' failed on worker ${message.workerId} with error: '${ - message.workerError?.message as string - }'` + if (message.taskFunctionOperationStatus) { + resolve(true) + } else if (!message.taskFunctionOperationStatus) { + reject( + new Error( + `Task function operation '${ + message.taskFunctionOperation as string + }' failed on worker ${message.workerId} with error: '${ + message.workerError?.message as string + }'` + ) ) + } + this.deregisterWorkerMessageListener( + this.getWorkerNodeKeyByWorkerId(message.workerId), + taskFunctionOperationListener ) } - }) + } + this.registerWorkerMessageListener( + workerNodeKey, + taskFunctionOperationListener + ) this.sendToWorker(workerNodeKey, message) }) } @@ -712,20 +722,21 @@ export abstract class AbstractPool< message: MessageValue ): Promise { return await new Promise((resolve, reject) => { - const responsesReceived = new Array>() - for (const [workerNodeKey] of this.workerNodes.entries()) { - this.registerWorkerMessageListener(workerNodeKey, message => { - if (message.taskFunctionOperationStatus != null) { - responsesReceived.push(message) + const responsesReceived = new Array>() + const taskFunctionOperationsListener = ( + message: MessageValue + ): void => { + this.checkMessageWorkerId(message) + if (message.taskFunctionOperationStatus != null) { + responsesReceived.push(message) + if (responsesReceived.length === this.workerNodes.length) { if ( - responsesReceived.length === this.workerNodes.length && responsesReceived.every( message => message.taskFunctionOperationStatus === true ) ) { resolve(true) } else if ( - responsesReceived.length === this.workerNodes.length && responsesReceived.some( message => message.taskFunctionOperationStatus === false ) @@ -745,8 +756,18 @@ export abstract class AbstractPool< ) ) } + this.deregisterWorkerMessageListener( + this.getWorkerNodeKeyByWorkerId(message.workerId), + taskFunctionOperationsListener + ) } - }) + } + } + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.registerWorkerMessageListener( + workerNodeKey, + taskFunctionOperationsListener + ) this.sendToWorker(workerNodeKey, message) } }) @@ -924,19 +945,21 @@ export abstract class AbstractPool< workerNodeKey: number ): Promise { await new Promise((resolve, reject) => { - this.registerWorkerMessageListener(workerNodeKey, message => { + const killMessageListener = (message: MessageValue): void => { + this.checkMessageWorkerId(message) if (message.kill === 'success') { resolve() } else if (message.kill === 'failure') { reject( new Error( - `Worker ${ + `Kill message handling failed on worker ${ message.workerId as number - } kill message handling failed` + }` ) ) } - }) + } + this.registerWorkerMessageListener(workerNodeKey, killMessageListener) this.sendToWorker(workerNodeKey, { kill: true }) }) } @@ -1278,6 +1301,32 @@ export abstract class AbstractPool< listener: (message: MessageValue) => void ): void + /** + * Registers once a listener callback on the worker given its worker node key. + * + * @param workerNodeKey - The worker node key. + * @param listener - The message listener callback. + */ + protected abstract registerOnceWorkerMessageListener< + Message extends Data | Response + >( + workerNodeKey: number, + listener: (message: MessageValue) => void + ): void + + /** + * Deregisters a listener callback on the worker given its worker node key. + * + * @param workerNodeKey - The worker node key. + * @param listener - The message listener callback. + */ + protected abstract deregisterWorkerMessageListener< + Message extends Data | Response + >( + workerNodeKey: number, + listener: (message: MessageValue) => void + ): void + /** * Method hooked up after a worker node has been newly created. * Can be overridden. diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 9470cccd..d457b377 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -66,11 +66,11 @@ export class FixedClusterPool< const workerNode = this.workerNodes[workerNodeKey] const worker = workerNode.worker const waitWorkerExit = new Promise(resolve => { - worker.on('exit', () => { + worker.once('exit', () => { resolve() }) }) - worker.on('disconnect', () => { + worker.once('disconnect', () => { worker.kill() }) await this.sendKillMessageToWorker(workerNodeKey) @@ -104,6 +104,22 @@ export class FixedClusterPool< this.workerNodes[workerNodeKey].worker.on('message', listener) } + /** @inheritDoc */ + protected registerOnceWorkerMessageListener( + workerNodeKey: number, + listener: (message: MessageValue) => void + ): void { + this.workerNodes[workerNodeKey].worker.once('message', listener) + } + + /** @inheritDoc */ + protected deregisterWorkerMessageListener( + workerNodeKey: number, + listener: (message: MessageValue) => void + ): void { + this.workerNodes[workerNodeKey].worker.off('message', listener) + } + /** @inheritDoc */ protected createWorker (): Worker { return cluster.fork(this.opts.env) diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 2ad94a7b..db278485 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -63,7 +63,7 @@ export class FixedThreadPool< const workerNode = this.workerNodes[workerNodeKey] const worker = workerNode.worker const waitWorkerExit = new Promise(resolve => { - worker.on('exit', () => { + worker.once('exit', () => { resolve() }) }) @@ -112,6 +112,26 @@ export class FixedThreadPool< ).port1.on('message', listener) } + /** @inheritDoc */ + protected registerOnceWorkerMessageListener( + workerNodeKey: number, + listener: (message: MessageValue) => void + ): void { + ( + this.workerNodes[workerNodeKey].messageChannel as MessageChannel + ).port1.once('message', listener) + } + + /** @inheritDoc */ + protected deregisterWorkerMessageListener( + workerNodeKey: number, + listener: (message: MessageValue) => void + ): void { + ( + this.workerNodes[workerNodeKey].messageChannel as MessageChannel + ).port1.off('message', listener) + } + /** @inheritDoc */ protected createWorker (): Worker { return new Worker(this.filePath, { diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 311a56ad..b8e4b167 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -100,7 +100,7 @@ export abstract class AbstractWorker< this.checkTaskFunctions(taskFunctions) this.checkWorkerOptions(this.opts) if (!this.isMain) { - this.getMainWorker().on('message', this.handleReadyMessage.bind(this)) + this.getMainWorker().once('message', this.handleReadyMessage.bind(this)) } }