From e44639e9af74427b71f1556ff7ec9f7606373e0d Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 31 Dec 2023 21:19:24 +0100 Subject: [PATCH] fix: properly handle dynamic pool with zero minimum size MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .c8rc.json | 4 +- CHANGELOG.md | 4 ++ src/pools/abstract-pool.ts | 44 +++++++++---------- src/pools/cluster/dynamic.ts | 5 ++- .../fair-share-worker-choice-strategy.ts | 3 ++ .../least-busy-worker-choice-strategy.ts | 3 ++ .../least-elu-worker-choice-strategy.ts | 3 ++ .../least-used-worker-choice-strategy.ts | 3 ++ .../worker-choice-strategy-context.ts | 14 +++--- src/pools/thread/dynamic.ts | 5 ++- tests/pools/abstract-pool.test.mjs | 1 + tests/pools/cluster/dynamic.test.mjs | 19 ++++++++ .../selection-strategies.test.mjs | 19 -------- .../worker-choice-strategy-context.test.mjs | 28 ------------ tests/pools/thread/dynamic.test.mjs | 19 ++++++++ 15 files changed, 90 insertions(+), 84 deletions(-) diff --git a/.c8rc.json b/.c8rc.json index 4ef7222d..b09dc7b8 100644 --- a/.c8rc.json +++ b/.c8rc.json @@ -2,6 +2,6 @@ "check-coverage": true, "lines": 90, "statements": 90, - "functions": 92, - "branches": 92 + "functions": 90, + "branches": 90 } diff --git a/CHANGELOG.md b/CHANGELOG.md index ac5116c4..ca04a2e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Properly handle dynamic pool with zero minimum size. + ## [3.1.13] - 2023-12-30 ### Changed diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index b6f3a5d0..536f4a03 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -607,18 +607,15 @@ export abstract class AbstractPool< private setTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].on( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent - ) + this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent) } } private unsetTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].off( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent + 'idle', + this.handleWorkerNodeIdleEvent ) } } @@ -627,7 +624,7 @@ export abstract class AbstractPool< for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -636,7 +633,7 @@ export abstract class AbstractPool< for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].off( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -1298,7 +1295,6 @@ export abstract class AbstractPool< }) } }) - const workerInfo = this.getWorkerInfo(workerNodeKey) this.sendToWorker(workerNodeKey, { checkActive: true }) @@ -1313,12 +1309,13 @@ export abstract class AbstractPool< }) } } - workerInfo.dynamic = true + const workerNode = this.workerNodes[workerNodeKey] + workerNode.info.dynamic = true if ( this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage ) { - workerInfo.ready = true + workerNode.info.ready = true } this.checkAndEmitDynamicWorkerCreationEvents() return workerNodeKey @@ -1382,14 +1379,14 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { if (this.opts.tasksQueueOptions?.taskStealing === true) { this.workerNodes[workerNodeKey].on( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent + 'idle', + this.handleWorkerNodeIdleEvent ) } if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -1523,7 +1520,7 @@ export abstract class AbstractPool< } } - private readonly handleIdleWorkerNodeEvent = ( + private readonly handleWorkerNodeIdleEvent = ( eventDetail: WorkerNodeEventDetail, previousStolenTask?: Task ): void => { @@ -1594,7 +1591,7 @@ export abstract class AbstractPool< } sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) .then(() => { - this.handleIdleWorkerNodeEvent(eventDetail, stolenTask) + this.handleWorkerNodeIdleEvent(eventDetail, stolenTask) return undefined }) .catch(EMPTY_FUNCTION) @@ -1627,7 +1624,7 @@ export abstract class AbstractPool< } } - private readonly handleBackPressureEvent = ( + private readonly handleWorkerNodeBackPressureEvent = ( eventDetail: WorkerNodeEventDetail ): void => { if ( @@ -1696,15 +1693,14 @@ export abstract class AbstractPool< private handleWorkerReadyResponse (message: MessageValue): void { const { workerId, ready, taskFunctionNames } = message - if (ready === false) { + if (ready == null || !ready) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion throw new Error(`Worker ${workerId!} failed to initialize`) } - const workerInfo = this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(workerId) - ) - workerInfo.ready = ready as boolean - workerInfo.taskFunctionNames = taskFunctionNames + const workerNode = + this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + workerNode.info.ready = ready + workerNode.info.taskFunctionNames = taskFunctionNames if (!this.readyEventEmitted && this.ready) { this.readyEventEmitted = true this.emitter?.emit(PoolEvents.ready, this.info) @@ -1753,7 +1749,7 @@ export abstract class AbstractPool< this.tasksQueueSize(workerNodeKey) === 0 && workerNodeTasksUsage.sequentiallyStolen === 0 ) { - workerNode.emit('idleWorkerNode', { + workerNode.emit('idle', { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion workerId: workerId!, workerNodeKey diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index 8ef11c00..564dbf54 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -41,7 +41,10 @@ export class DynamicClusterPool< /** @inheritDoc */ protected shallCreateDynamicWorker (): boolean { - return !this.full && this.internalBusy() + return ( + (!this.full && this.internalBusy()) || + (this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0) + ) } /** @inheritDoc */ diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index 58982f19..f8746558 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -78,6 +78,9 @@ export class FairShareWorkerChoiceStrategy< } private fairShareNextWorkerNodeKey (): number | undefined { + if (this.pool.workerNodes.length === 0) { + return undefined + } return this.pool.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { if (workerNode.strategyData?.virtualTaskEndTimestamp == null) { diff --git a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts index 0f2e1239..5ef438f6 100644 --- a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts @@ -69,6 +69,9 @@ export class LeastBusyWorkerChoiceStrategy< } private leastBusyNextWorkerNodeKey (): number | undefined { + if (this.pool.workerNodes.length === 0) { + return undefined + } return this.pool.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { return this.isWorkerNodeReady(workerNodeKey) && diff --git a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts index 588c75d9..c3fe32d1 100644 --- a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts @@ -65,6 +65,9 @@ export class LeastEluWorkerChoiceStrategy< } private leastEluNextWorkerNodeKey (): number | undefined { + if (this.pool.workerNodes.length === 0) { + return undefined + } return this.pool.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { return this.isWorkerNodeReady(workerNodeKey) && diff --git a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts index 5e8a09b4..e45d5754 100644 --- a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts @@ -51,6 +51,9 @@ export class LeastUsedWorkerChoiceStrategy< } private leastUsedNextWorkerNodeKey (): number | undefined { + if (this.pool.workerNodes.length === 0) { + return undefined + } return this.pool.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { return this.isWorkerNodeReady(workerNodeKey) && diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index 9e18fdc4..46e7ac64 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -169,14 +169,10 @@ export class WorkerChoiceStrategyContext< * @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined. */ public execute (): number { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const workerChoiceStrategy = this.workerChoiceStrategies.get( - this.workerChoiceStrategy - )! - if (!workerChoiceStrategy.hasPoolWorkerNodesReady()) { - return this.execute() - } - return this.executeStrategy(workerChoiceStrategy) + return this.executeStrategy( + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.workerChoiceStrategies.get(this.workerChoiceStrategy)! + ) } /** @@ -184,7 +180,7 @@ export class WorkerChoiceStrategyContext< * * @param workerChoiceStrategy - The worker choice strategy. * @returns The key of the worker node. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined. */ private executeStrategy (workerChoiceStrategy: IWorkerChoiceStrategy): number { let workerNodeKey: number | undefined diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index 2a7d0149..836cae0b 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -41,7 +41,10 @@ export class DynamicThreadPool< /** @inheritDoc */ protected shallCreateDynamicWorker (): boolean { - return !this.full && this.internalBusy() + return ( + (!this.full && this.internalBusy()) || + (this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0) + ) } /** @inheritDoc */ diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index b4b9d706..3fa8e2d6 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -221,6 +221,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs' ) expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource) + expect(pool.emitter.eventNames()).toStrictEqual([]) expect(pool.opts).toStrictEqual({ startWorkers: true, enableEvents: true, diff --git a/tests/pools/cluster/dynamic.test.mjs b/tests/pools/cluster/dynamic.test.mjs index 8c9196d1..870a5f4d 100644 --- a/tests/pools/cluster/dynamic.test.mjs +++ b/tests/pools/cluster/dynamic.test.mjs @@ -152,4 +152,23 @@ describe('Dynamic cluster pool test suite', () => { // We need to clean up the resources after our test await pool.destroy() }) + + it.skip('Verify that a pool with zero worker works', async () => { + const pool = new DynamicClusterPool( + 0, + max, + './tests/worker-files/thread/testWorker.mjs' + ) + expect(pool.starting).toBe(false) + expect(pool.workerNodes.length).toBe(pool.info.minSize) + const maxMultiplier = 10000 + const promises = new Set() + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + expect(pool.workerNodes.length).toBe(max) + // We need to clean up the resources after our test + await pool.destroy() + }) }) diff --git a/tests/pools/selection-strategies/selection-strategies.test.mjs b/tests/pools/selection-strategies/selection-strategies.test.mjs index 8dc06262..bca3c82b 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.mjs +++ b/tests/pools/selection-strategies/selection-strategies.test.mjs @@ -143,25 +143,6 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify strategies wait for worker node readiness in dynamic pool', async () => { - const pool = new DynamicThreadPool( - min, - max, - './tests/worker-files/thread/testWorker.mjs' - ) - expect(pool.starting).toBe(false) - expect(pool.workerNodes.length).toBe(min) - const maxMultiplier = 10000 - const promises = new Set() - for (let i = 0; i < max * maxMultiplier; i++) { - promises.add(pool.execute()) - } - await Promise.all(promises) - expect(pool.workerNodes.length).toBe(max) - // We need to clean up the resources after our test - await pool.destroy() - }) - it('Verify ROUND_ROBIN strategy default policy', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN let pool = new FixedThreadPool( diff --git a/tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs b/tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs index 85ec4263..77f1d4a1 100644 --- a/tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs +++ b/tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs @@ -158,11 +158,6 @@ describe('Worker choice strategy context test suite', () => { workerChoiceStrategyStub ) const chosenWorkerKey = workerChoiceStrategyContext.execute() - expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategyContext.workerChoiceStrategy - ).hasPoolWorkerNodesReady.callCount - ).toBe(6) expect( workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategyContext.workerChoiceStrategy @@ -171,29 +166,6 @@ describe('Worker choice strategy context test suite', () => { expect(chosenWorkerKey).toBe(1) }) - it('Verify that execute() throws error if worker choice strategy recursion reach the maximum depth', () => { - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( - fixedPool - ) - const workerChoiceStrategyStub = createStubInstance( - RoundRobinWorkerChoiceStrategy, - { - hasPoolWorkerNodesReady: stub().returns(false), - choose: stub().returns(0) - } - ) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.ROUND_ROBIN - ) - workerChoiceStrategyContext.workerChoiceStrategies.set( - workerChoiceStrategyContext.workerChoiceStrategy, - workerChoiceStrategyStub - ) - expect(() => workerChoiceStrategyContext.execute()).toThrow( - new RangeError('Maximum call stack size exceeded') - ) - }) - it('Verify that execute() return the worker node key chosen by the strategy with dynamic pool', () => { const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( dynamicPool diff --git a/tests/pools/thread/dynamic.test.mjs b/tests/pools/thread/dynamic.test.mjs index 06cf64c6..9d35ce8a 100644 --- a/tests/pools/thread/dynamic.test.mjs +++ b/tests/pools/thread/dynamic.test.mjs @@ -152,4 +152,23 @@ describe('Dynamic thread pool test suite', () => { // We need to clean up the resources after our test await pool.destroy() }) + + it('Verify that a pool with zero worker works', async () => { + const pool = new DynamicThreadPool( + 0, + max, + './tests/worker-files/thread/testWorker.mjs' + ) + expect(pool.starting).toBe(false) + expect(pool.workerNodes.length).toBe(pool.info.minSize) + const maxMultiplier = 10000 + const promises = new Set() + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + expect(pool.workerNodes.length).toBe(max) + // We need to clean up the resources after our test + await pool.destroy() + }) }) -- 2.34.1