From: Jérôme Benoit Date: Fri, 5 Jan 2024 22:33:09 +0000 (+0100) Subject: fix: fix dynamic pool with minimum # of workers set to zero X-Git-Tag: v3.1.18~3^2~1 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=8e8d9101768c28914dabc95c14740985e18f1f04;p=poolifier.git fix: fix dynamic pool with minimum # of workers set to zero Signed-off-by: Jérôme Benoit --- diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index f98d5cde..ab114e1a 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -443,6 +443,9 @@ export abstract class AbstractPool< * The pool readiness boolean status. */ private get ready (): boolean { + if (this.empty) { + return false + } return ( this.workerNodes.reduce( (accumulator, workerNode) => @@ -454,6 +457,16 @@ export abstract class AbstractPool< ) } + /** + * The pool emptiness boolean status. + */ + protected get empty (): boolean { + if (this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0) { + return true + } + return false + } + /** * The approximate pool utilization. * @@ -1711,6 +1724,13 @@ export abstract class AbstractPool< } } + private checkAndEmitReadyEvent (): void { + if (!this.readyEventEmitted && this.ready) { + this.emitter?.emit(PoolEvents.ready, this.info) + this.readyEventEmitted = true + } + } + private handleWorkerReadyResponse (message: MessageValue): void { const { workerId, ready, taskFunctionNames } = message if (ready == null || !ready) { @@ -1720,10 +1740,7 @@ export abstract class AbstractPool< this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] workerNode.info.ready = ready workerNode.info.taskFunctionNames = taskFunctionNames - if (!this.readyEventEmitted && this.ready) { - this.emitter?.emit(PoolEvents.ready, this.info) - this.readyEventEmitted = true - } + this.checkAndEmitReadyEvent() } private handleTaskExecutionResponse (message: MessageValue): void { @@ -1847,6 +1864,13 @@ export abstract class AbstractPool< return workerNodeKey } + private checkAndEmitEmptyEvent (): void { + if (this.empty) { + this.emitter?.emit(PoolEvents.empty, this.info) + this.readyEventEmitted = false + } + } + /** * Removes the worker node from the pool worker nodes. * @@ -1858,6 +1882,7 @@ export abstract class AbstractPool< this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategyContext?.remove(workerNodeKey) } + this.checkAndEmitEmptyEvent() } protected flagWorkerNodeAsNotReady (workerNodeKey: number): void { diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index 7b2c3798..e8e51947 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -40,10 +40,7 @@ export class DynamicClusterPool< /** @inheritDoc */ protected shallCreateDynamicWorker (): boolean { - return ( - (!this.full && this.internalBusy()) || - (this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0) - ) + return (!this.full && this.internalBusy()) || this.empty } /** @inheritDoc */ diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 3aa9feb7..cda63d88 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -42,6 +42,7 @@ export const PoolEvents = Object.freeze({ ready: 'ready', busy: 'busy', full: 'full', + empty: 'empty', destroy: 'destroy', error: 'error', taskError: 'taskError', @@ -246,9 +247,10 @@ export interface IPool< * * Events that can currently be listened to: * - * - `'ready'`: Emitted when the number of workers created in the pool has reached the minimum size expected and are ready. + * - `'ready'`: Emitted when the number of workers created in the pool has reached the minimum size expected and are ready. If the pool is dynamic with a minimum number of workers is set to zero, this event is emitted when at least one dynamic worker is ready. * - `'busy'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are executing concurrently their tasks quota. * - `'full'`: Emitted when the pool is dynamic and the number of workers created has reached the maximum size expected. + * - `'empty'`: Emitted when the pool is dynamic with a minimum number of workers set to zero and the number of workers has reached the minimum size expected. * - `'destroy'`: Emitted when the pool is destroyed. * - `'error'`: Emitted when an uncaught error occurs. * - `'taskError'`: Emitted when an error occurs while executing a task. diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index 55f43092..f2443742 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -132,11 +132,14 @@ export abstract class AbstractWorkerChoiceStrategy< } /** - * Check the next worker node readiness. + * Check the next worker node key. */ - protected checkNextWorkerNodeReadiness (): void { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - if (!this.isWorkerNodeReady(this.nextWorkerNodeKey!)) { + protected checkNextWorkerNodeKey (): void { + if ( + this.nextWorkerNodeKey != null && + (this.nextWorkerNodeKey < 0 || + !this.isWorkerNodeReady(this.nextWorkerNodeKey)) + ) { delete this.nextWorkerNodeKey } } @@ -189,6 +192,9 @@ export abstract class AbstractWorkerChoiceStrategy< * @param workerNodeKey - The worker node key. */ protected setPreviousWorkerNodeKey (workerNodeKey: number | undefined): void { - this.previousWorkerNodeKey = workerNodeKey ?? this.previousWorkerNodeKey + this.previousWorkerNodeKey = + workerNodeKey != null && workerNodeKey >= 0 + ? workerNodeKey + : this.previousWorkerNodeKey } } diff --git a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts index 2d604e71..97a3aa04 100644 --- a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts @@ -114,7 +114,9 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< } private interleavedWeightedRoundRobinNextWorkerNodeId (): void { - if ( + if (this.pool.workerNodes.length === 0) { + this.workerNodeId = 0 + } else if ( this.roundId === this.roundWeights.length - 1 && this.workerNodeId === this.pool.workerNodes.length - 1 ) { diff --git a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts index 41326a78..53f3ba4f 100644 --- a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts @@ -44,7 +44,7 @@ export class RoundRobinWorkerChoiceStrategy< const chosenWorkerNodeKey = this.nextWorkerNodeKey this.setPreviousWorkerNodeKey(chosenWorkerNodeKey) this.roundRobinNextWorkerNodeKey() - this.checkNextWorkerNodeReadiness() + this.checkNextWorkerNodeKey() return chosenWorkerNodeKey } diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index 94a832f6..38b6dd75 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -64,7 +64,7 @@ export class WeightedRoundRobinWorkerChoiceStrategy< public choose (): number | undefined { this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey) this.weightedRoundRobinNextWorkerNodeKey() - this.checkNextWorkerNodeReadiness() + this.checkNextWorkerNodeKey() return this.nextWorkerNodeKey } diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index 40e2e18f..b7e0a4a7 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -188,6 +188,9 @@ export class WorkerChoiceStrategyContext< let retriesCount = 0 do { workerNodeKey = workerChoiceStrategy.choose() + if (workerNodeKey != null && workerNodeKey < 0) { + workerNodeKey = undefined + } if (workerNodeKey == null && chooseCount > 0) { retriesCount++ } diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index fde37104..8e795512 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -40,10 +40,7 @@ export class DynamicThreadPool< /** @inheritDoc */ protected shallCreateDynamicWorker (): boolean { - return ( - (!this.full && this.internalBusy()) || - (this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0) - ) + return (!this.full && this.internalBusy()) || this.empty } /** @inheritDoc */ diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 3fa8e2d6..84a0195e 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -862,8 +862,8 @@ describe('Abstract pool test suite', () => { ) expect(pool.info.started).toBe(false) expect(pool.info.ready).toBe(false) - expect(pool.readyEventEmitted).toBe(false) expect(pool.workerNodes).toStrictEqual([]) + expect(pool.readyEventEmitted).toBe(false) await expect(pool.execute()).rejects.toThrow( new Error('Cannot execute a task on not started pool') ) diff --git a/tests/pools/cluster/dynamic.test.mjs b/tests/pools/cluster/dynamic.test.mjs index fa318f4b..d2a250c1 100644 --- a/tests/pools/cluster/dynamic.test.mjs +++ b/tests/pools/cluster/dynamic.test.mjs @@ -1,7 +1,11 @@ import { expect } from 'expect' -import { DynamicClusterPool, PoolEvents } from '../../../lib/index.cjs' +import { + DynamicClusterPool, + PoolEvents, + WorkerChoiceStrategies +} from '../../../lib/index.cjs' import { TaskFunctions } from '../../test-types.cjs' -import { sleep, waitWorkerEvents } from '../../test-utils.cjs' +import { sleep, waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs' describe('Dynamic cluster pool test suite', () => { const min = 1 @@ -160,18 +164,25 @@ describe('Dynamic cluster pool test suite', () => { './tests/worker-files/thread/testWorker.mjs' ) expect(pool.starting).toBe(false) - expect(pool.workerNodes.length).toBe(pool.info.minSize) - for (let run = 0; run < 4; run++) { - // pool.enableTasksQueue(true, { concurrency: 2 }) - const maxMultiplier = 10000 - const promises = new Set() - for (let i = 0; i < max * maxMultiplier; i++) { - promises.add(pool.execute()) + for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { + pool.setWorkerChoiceStrategy(workerChoiceStrategy) + expect(pool.readyEventEmitted).toBe(false) + for (let run = 0; run < 2; run++) { + run % 2 !== 0 && pool.enableTasksQueue(true) + const maxMultiplier = 4 + const promises = new Set() + expect(pool.workerNodes.length).toBe(pool.info.minSize) + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + expect(pool.readyEventEmitted).toBe(true) + expect(pool.workerNodes.length).toBeGreaterThan(pool.info.minSize) + expect(pool.workerNodes.length).toBeLessThanOrEqual(pool.info.maxSize) + await waitPoolEvents(pool, PoolEvents.empty, 1) + expect(pool.readyEventEmitted).toBe(false) + expect(pool.workerNodes.length).toBe(pool.info.minSize) } - await Promise.all(promises) - expect(pool.workerNodes.length).toBe(max) - await waitWorkerEvents(pool, 'exit', max) - expect(pool.workerNodes.length).toBe(pool.info.minSize) } // We need to clean up the resources after our test await pool.destroy() diff --git a/tests/pools/thread/dynamic.test.mjs b/tests/pools/thread/dynamic.test.mjs index e0e05c4e..2b5dd611 100644 --- a/tests/pools/thread/dynamic.test.mjs +++ b/tests/pools/thread/dynamic.test.mjs @@ -1,7 +1,11 @@ import { expect } from 'expect' -import { DynamicThreadPool, PoolEvents } from '../../../lib/index.cjs' +import { + DynamicThreadPool, + PoolEvents, + WorkerChoiceStrategies +} from '../../../lib/index.cjs' import { TaskFunctions } from '../../test-types.cjs' -import { sleep, waitWorkerEvents } from '../../test-utils.cjs' +import { sleep, waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs' describe('Dynamic thread pool test suite', () => { const min = 1 @@ -160,18 +164,25 @@ describe('Dynamic thread pool test suite', () => { './tests/worker-files/thread/testWorker.mjs' ) expect(pool.starting).toBe(false) - expect(pool.workerNodes.length).toBe(pool.info.minSize) - for (let run = 0; run < 4; run++) { - // pool.enableTasksQueue(true, { concurrency: 2 }) - const maxMultiplier = 10000 - const promises = new Set() - for (let i = 0; i < max * maxMultiplier; i++) { - promises.add(pool.execute()) + for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { + pool.setWorkerChoiceStrategy(workerChoiceStrategy) + expect(pool.readyEventEmitted).toBe(false) + for (let run = 0; run < 2; run++) { + run % 2 !== 0 && pool.enableTasksQueue(true) + const maxMultiplier = 4 + const promises = new Set() + expect(pool.workerNodes.length).toBe(pool.info.minSize) + for (let i = 0; i < max * maxMultiplier; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + expect(pool.readyEventEmitted).toBe(true) + expect(pool.workerNodes.length).toBeGreaterThan(pool.info.minSize) + expect(pool.workerNodes.length).toBeLessThanOrEqual(pool.info.maxSize) + await waitPoolEvents(pool, PoolEvents.empty, 1) + expect(pool.readyEventEmitted).toBe(false) + expect(pool.workerNodes.length).toBe(pool.info.minSize) } - await Promise.all(promises) - expect(pool.workerNodes.length).toBe(max) - await waitWorkerEvents(pool, 'exit', max) - expect(pool.workerNodes.length).toBe(pool.info.minSize) } // We need to clean up the resources after our test await pool.destroy() diff --git a/tests/pools/thread/fixed.test.mjs b/tests/pools/thread/fixed.test.mjs index 85415d63..44e7d80b 100644 --- a/tests/pools/thread/fixed.test.mjs +++ b/tests/pools/thread/fixed.test.mjs @@ -300,8 +300,8 @@ describe('Fixed thread pool test suite', () => { await pool.destroy() const numberOfExitEvents = await exitPromise expect(pool.started).toBe(false) - expect(pool.readyEventEmitted).toBe(false) expect(pool.emitter.eventNames()).toStrictEqual([]) + expect(pool.readyEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(0) expect(numberOfExitEvents).toBe(numberOfThreads) expect(poolDestroy).toBe(1)