From: Jérôme Benoit Date: Mon, 10 Jul 2023 23:08:36 +0000 (+0200) Subject: fix: refine pool statuses semantic X-Git-Tag: v2.6.15~11 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=d2c73f82d13f6a30d46e829709ad21373271727a;p=poolifier.git fix: refine pool statuses semantic Signed-off-by: Jérôme Benoit --- diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index f1392edd..10a5ce42 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -411,17 +411,16 @@ export abstract class AbstractPool< } private get starting (): boolean { - return ( - this.workerNodes.length < this.minSize || - (this.workerNodes.length >= this.minSize && - this.workerNodes.some(workerNode => !workerNode.info.ready)) - ) + return this.workerNodes.length < this.minSize } private get ready (): boolean { return ( this.workerNodes.length >= this.minSize && - this.workerNodes.every(workerNode => workerNode.info.ready) + this.workerNodes.every( + (workerNode, workerNodeKey) => + workerNodeKey < this.minSize && workerNode.info.ready + ) ) } @@ -980,6 +979,7 @@ export abstract class AbstractPool< } }) const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker)) + workerInfo.ready = true workerInfo.dynamic = true this.sendToWorker(worker, { checkAlive: true, @@ -1011,12 +1011,16 @@ export abstract class AbstractPool< // Listen to worker messages. this.registerWorkerMessageListener(worker, this.workerListener()) // Send startup message to worker. + this.sendWorkerStartupMessage(worker) + // Setup worker task statistics computation. + this.setWorkerStatistics(worker) + } + + private sendWorkerStartupMessage (worker: Worker): void { this.sendToWorker(worker, { ready: false, workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number }) - // Setup worker task statistics computation. - this.setWorkerStatistics(worker) } private redistributeQueuedTasks (workerNodeKey: number): void { @@ -1057,7 +1061,7 @@ export abstract class AbstractPool< protected workerListener (): (message: MessageValue) => void { return message => { this.checkMessageWorkerId(message) - if (message.ready != null && message.workerId != null) { + if (message.ready != null) { // Worker ready message received this.handleWorkerReadyMessage(message) } else if (message.id != null) { @@ -1130,6 +1134,10 @@ export abstract class AbstractPool< * @returns The worker nodes length. */ private pushWorkerNode (worker: Worker): number { + const workerNode = new WorkerNode(worker, this.worker) + if (this.starting) { + workerNode.info.ready = true + } return this.workerNodes.push(new WorkerNode(worker, this.worker)) } diff --git a/src/utility-types.ts b/src/utility-types.ts index 28bf19e2..135dcefb 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -134,11 +134,11 @@ export interface PromiseResponseWrapper< /** * Resolve callback to fulfill the promise. */ - readonly resolve: (value: Response) => void + readonly resolve: (value: Response | PromiseLike) => void /** * Reject callback to reject the promise. */ - readonly reject: (reason?: string) => void + readonly reject: (reason?: unknown) => void /** * The worker handling the execution. */ diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index c93155b0..1d0f7b8c 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -87,9 +87,7 @@ describe('Fixed cluster pool test suite', () => { ) let poolReady = 0 pool1.emitter.on(PoolEvents.ready, () => ++poolReady) - if (!pool1.info.ready) { - await waitPoolEvents(pool1, PoolEvents.ready, 1) - } + await waitPoolEvents(pool1, PoolEvents.ready, 1) expect(poolReady).toBe(1) }) diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index 535f5f97..8894ba31 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -1717,9 +1717,8 @@ describe('Selection strategies test suite', () => { WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN } ) - if (!pool.info.ready) { - await waitPoolEvents(pool, PoolEvents.ready, 1) - } + // FIXME: shall not be needed + await waitPoolEvents(pool, PoolEvents.ready, 1) // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose` const promises = new Set() const maxMultiplier = 2 @@ -1790,9 +1789,6 @@ describe('Selection strategies test suite', () => { WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN } ) - if (!pool.info.ready) { - await waitPoolEvents(pool, PoolEvents.ready, 1) - } // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose` const promises = new Set() const maxMultiplier = 2 @@ -1803,7 +1799,7 @@ describe('Selection strategies test suite', () => { for (const workerNode of pool.workerNodes) { expect(workerNode.usage).toStrictEqual({ tasks: { - executed: expect.any(Number), + executed: maxMultiplier, executing: 0, queued: 0, maxQueued: 0, @@ -1839,7 +1835,7 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy ).nextWorkerNodeKey - ).toBe(1) + ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index 636121fd..de81cb68 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -87,9 +87,7 @@ describe('Fixed thread pool test suite', () => { ) let poolReady = 0 pool1.emitter.on(PoolEvents.ready, () => ++poolReady) - if (!pool1.info.ready) { - await waitPoolEvents(pool1, PoolEvents.ready, 1) - } + await waitPoolEvents(pool1, PoolEvents.ready, 1) expect(poolReady).toBe(1) })