From: Jérôme Benoit Date: Fri, 18 Aug 2023 16:59:13 +0000 (+0200) Subject: fix: fix back pressure event emission semantic X-Git-Tag: v2.6.29~10 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=9e8442454c11d9fba371dee3deadc9c26a49a335;p=poolifier.git fix: fix back pressure event emission semantic Signed-off-by: Jérôme Benoit --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fb91510..74aac882 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Add back pressure detection on the worker node queue. Event `backPressure` is emitted when the worker node queue is full (size > poolMaxSize^2). +- Add back pressure detection on the worker node queue. Event `backPressure` is emitted all worker node queues are full (size > poolMaxSize^2). - Use back pressure detection in worker choice strategies. - Add worker choice strategies retries mechanism if no worker is eligible. diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 06d09e6b..53aad1fd 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1270,13 +1270,19 @@ export abstract class AbstractPool< /** @inheritDoc */ public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { - if ( + return ( this.opts.enableTasksQueue === true && this.workerNodes[workerNodeKey].hasBackPressure() - ) { - return true - } - return false + ) + } + + private hasBackPressure (): boolean { + return ( + this.opts.enableTasksQueue === true && + this.workerNodes.findIndex( + (workerNode) => !workerNode.hasBackPressure() + ) !== -1 + ) } /** @@ -1292,11 +1298,8 @@ export abstract class AbstractPool< private enqueueTask (workerNodeKey: number, task: Task): number { const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task) - if (this.hasWorkerNodeBackPressure(workerNodeKey)) { - this.emitter?.emit(PoolEvents.backPressure, { - workerId: this.getWorkerInfo(workerNodeKey).id, - ...this.info - }) + if (this.hasBackPressure()) { + this.emitter?.emit(PoolEvents.backPressure, this.info) } return tasksQueueSize } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index c7b878ae..427dfd1c 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -201,7 +201,7 @@ export interface IPool< * - '`destroy`': Emitted when the pool is destroyed. * - `'error'`: Emitted when an uncaught error occurs. * - `'taskError'`: Emitted when an error occurs while executing a task. - * - `'backPressure'`: Emitted when a worker node has back pressure (i.e. its tasks queue is full). + * - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full). */ readonly emitter?: PoolEmitter /**