From 227e9e9b75b8e004dd7651af5961490e58ffdda9 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Tue, 27 Aug 2024 15:47:38 +0200 Subject: [PATCH] fix: fix pool back pressure semantic on dynamic pool MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 47 +++++++++++++++++++----------- src/pools/cluster/dynamic.ts | 5 ++++ src/pools/cluster/fixed.ts | 5 ++++ src/pools/pool.ts | 2 +- src/pools/thread/dynamic.ts | 5 ++++ src/pools/thread/fixed.ts | 5 ++++ src/pools/worker-node.ts | 4 +-- src/pools/worker.ts | 2 +- tests/pools/abstract-pool.test.mjs | 5 ++-- 9 files changed, 57 insertions(+), 23 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 15d7076f..339e22cb 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -229,7 +229,7 @@ export abstract class AbstractPool< ): void => { if ( this.cannotStealTask() || - this.hasBackPressure() || + this.backPressure || (this.info.stealingWorkerNodes ?? 0) > Math.round( this.workerNodes.length * @@ -794,6 +794,21 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey]?.info } + /** + * Whether the worker nodes are back pressured or not. + * @returns Worker nodes back pressure boolean status. + */ + protected internalBackPressure (): boolean { + return ( + this.opts.enableTasksQueue === true && + this.workerNodes.reduce( + (accumulator, workerNode) => + workerNode.info.backPressure ? accumulator + 1 : accumulator, + 0 + ) === this.workerNodes.length + ) + } + /** * Whether worker nodes are executing concurrently their tasks quota or not. * @returns Worker nodes busyness boolean status. @@ -802,9 +817,9 @@ export abstract class AbstractPool< return ( this.workerNodes.reduce( (accumulator, _, workerNodeKey) => - this.isWorkerNodeIdle(workerNodeKey) ? accumulator + 1 : accumulator, + this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator, 0 - ) === 0 + ) === this.workerNodes.length ) } @@ -921,7 +936,7 @@ export abstract class AbstractPool< } private checkAndEmitTaskQueuingEvents (): void { - if (this.hasBackPressure()) { + if (this.backPressure) { this.emitter?.emit(PoolEvents.backPressure, this.info) } } @@ -1187,17 +1202,6 @@ export abstract class AbstractPool< this.checkAndEmitReadyEvent() } - private hasBackPressure (): boolean { - return ( - this.opts.enableTasksQueue === true && - this.workerNodes.reduce( - (accumulator, workerNode) => - workerNode.info.backPressure ? accumulator + 1 : accumulator, - 0 - ) === this.workerNodes.length - ) - } - private initEventEmitter (): void { this.emitter = new EventEmitterAsyncResource({ name: `poolifier:${this.type}-${this.worker}-pool`, @@ -1948,6 +1952,12 @@ export abstract class AbstractPool< this.started = true } + /** + * Whether the pool is back pressured or not. + * @returns The pool back pressure boolean status. + */ + protected abstract get backPressure (): boolean + /** * Whether the pool is busy or not. * @returns The pool busyness boolean status. @@ -1959,7 +1969,10 @@ export abstract class AbstractPool< * @returns The pool emptiness boolean status. */ protected get empty (): boolean { - return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0 + return ( + this.minimumNumberOfWorkers === 0 && + this.workerNodes.length === this.minimumNumberOfWorkers + ) } /** @@ -2045,7 +2058,7 @@ export abstract class AbstractPool< ), }), ...(this.opts.enableTasksQueue === true && { - backPressure: this.hasBackPressure(), + backPressure: this.backPressure, }), ...(this.opts.enableTasksQueue === true && { stolenTasks: this.workerNodes.reduce( diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index f13ebf64..bb9cf8d9 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -48,6 +48,11 @@ export class DynamicClusterPool< return (!this.full && this.internalBusy()) || this.empty } + /** @inheritDoc */ + protected get backPressure (): boolean { + return this.full && this.internalBackPressure() + } + /** @inheritDoc */ protected get busy (): boolean { return this.full && this.internalBusy() diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 2a12e080..0de2ae63 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -100,6 +100,11 @@ export class FixedClusterPool< return false } + /** @inheritDoc */ + protected get backPressure (): boolean { + return this.internalBackPressure() + } + /** @inheritDoc */ protected get busy (): boolean { return this.internalBusy() diff --git a/src/pools/pool.ts b/src/pools/pool.ts index a9493f9a..e4cec742 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -286,7 +286,7 @@ export interface IPool< * - `'destroy'`: Emitted when the pool is destroyed. * - `'error'`: Emitted when an uncaught error occurs. * - `'taskError'`: Emitted when an error occurs while executing a task. - * - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full: queue size \>= maximum queue size). + * - `'backPressure'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are back pressured (i.e. their tasks queue is full: queue size \>= maximum queue size). */ readonly emitter?: EventEmitterAsyncResource /** diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index fc4115be..cbf06130 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -48,6 +48,11 @@ export class DynamicThreadPool< return (!this.full && this.internalBusy()) || this.empty } + /** @inheritDoc */ + protected get backPressure (): boolean { + return this.full && this.internalBackPressure() + } + /** @inheritDoc */ protected get busy (): boolean { return this.full && this.internalBusy() diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index f65a0cd5..9b89b3fe 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -120,6 +120,11 @@ export class FixedThreadPool< return false } + /** @inheritDoc */ + protected get backPressure (): boolean { + return this.internalBackPressure() + } + /** @inheritDoc */ protected get busy (): boolean { return this.internalBusy() diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 867640f4..1a1f207b 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -88,8 +88,8 @@ export class WorkerNode } /** - * Whether the worker node has back pressure (i.e. its tasks queue is full). - * @returns `true` if the worker node has back pressure, `false` otherwise. + * Whether the worker node is back pressured or not. + * @returns `true` if the worker node is back pressured, `false` otherwise. */ private hasBackPressure (): boolean { return this.tasksQueue.size >= this.tasksQueueBackPressureSize diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 3c9c6580..ff8aa620 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -149,7 +149,7 @@ export type WorkerType = keyof typeof WorkerTypes export interface WorkerInfo { /** * Back pressure flag. - * This flag is set to `true` when worker node tasks queue has back pressure. + * This flag is set to `true` when worker node tasks queue is back pressured. */ backPressure: boolean /** diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index f3e544ff..f236a804 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -1239,7 +1239,8 @@ describe('Abstract pool test suite', () => { enableTasksQueue: true, } ) - stub(pool, 'hasBackPressure').returns(true) + const backPressureGetterStub = stub().returns(true) + stub(pool, 'backPressure').get(backPressureGetterStub) expect(pool.emitter.eventNames()).toStrictEqual([]) const promises = new Set() let poolBackPressure = 0 @@ -1277,7 +1278,7 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, workerNodes: expect.any(Number), }) - expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7) + expect(backPressureGetterStub.callCount).toBeGreaterThanOrEqual(7) await pool.destroy() }) -- 2.34.1