From 55082af96253bead6fb8d4d648c454ba71a38fb6 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 22 Nov 2023 19:47:27 +0100 Subject: [PATCH] fix: fix pool ready event emission MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 5 +++-- src/pools/abstract-pool.ts | 16 ++++++++------- tests/pools/abstract-pool.test.mjs | 3 +++ tests/pools/cluster/dynamic.test.mjs | 1 + tests/pools/cluster/fixed.test.mjs | 1 + tests/pools/thread/dynamic.test.mjs | 1 + tests/pools/thread/fixed.test.mjs | 1 + tests/utils.test.mjs | 30 ++++++++++++++-------------- 8 files changed, 34 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60a03ae2..13defe7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,12 +10,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Ensure pool statuses are checked at initialization, `start()` or `destroy()`. +- Ensure pool `ready` event can be emitted after several `start()/destroy()` cycles. ## [3.0.5] - 2023-10-27 ### Fixed -- Ensure pool ready event can be emitted only once. +- Ensure pool `ready` event can be emitted only once. ## [3.0.4] - 2023-10-20 @@ -272,7 +273,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- Fix race condition between ready and task functions worker message handling at startup. +- Fix race condition between readiness and task functions worker message handling at startup. - Fix duplicate task function worker usage statistics computation per task function. - Update task function worker usage statistics if and only if there's at least two different task functions. - Fix race condition at task function worker usage executing task computation leading to negative value. diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index de37c0f9..b14801b6 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -17,7 +17,6 @@ import { max, median, min, - once, round } from '../utils' import { KillBehaviors } from '../worker/worker-options' @@ -117,6 +116,10 @@ export abstract class AbstractPool< * Whether the pool is destroying or not. */ private destroying: boolean + /** + * Whether the pool ready event has been emitted or not. + */ + private readyEventEmitted: boolean /** * The start timestamp of the pool. */ @@ -167,6 +170,7 @@ export abstract class AbstractPool< this.started = false this.starting = false this.destroying = false + this.readyEventEmitted = false if (this.opts.startWorkers === true) { this.start() } @@ -982,6 +986,7 @@ export abstract class AbstractPool< ) this.emitter?.emit(PoolEvents.destroy, this.info) this.emitter?.emitDestroy() + this.readyEventEmitted = false this.destroying = false this.started = false } @@ -1572,12 +1577,9 @@ export abstract class AbstractPool< ) workerInfo.ready = message.ready as boolean workerInfo.taskFunctionNames = message.taskFunctionNames - if (this.ready) { - const emitPoolReadyEventOnce = once( - () => this.emitter?.emit(PoolEvents.ready, this.info), - this - ) - emitPoolReadyEventOnce() + if (!this.readyEventEmitted && this.ready) { + this.readyEventEmitted = true + this.emitter?.emit(PoolEvents.ready, this.info) } } diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 535eb7e9..1246a7d0 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -915,6 +915,7 @@ describe('Abstract pool test suite', () => { ) expect(pool.info.started).toBe(false) expect(pool.info.ready).toBe(false) + expect(pool.readyEventEmitted).toBe(false) expect(pool.workerNodes).toStrictEqual([]) await expect(pool.execute()).rejects.toThrow( new Error('Cannot execute a task on not started pool') @@ -922,6 +923,8 @@ describe('Abstract pool test suite', () => { pool.start() expect(pool.info.started).toBe(true) expect(pool.info.ready).toBe(true) + await waitPoolEvents(pool, PoolEvents.ready, 1) + expect(pool.readyEventEmitted).toBe(true) expect(pool.workerNodes.length).toBe(numberOfWorkers) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) diff --git a/tests/pools/cluster/dynamic.test.mjs b/tests/pools/cluster/dynamic.test.mjs index afc013f1..2c542375 100644 --- a/tests/pools/cluster/dynamic.test.mjs +++ b/tests/pools/cluster/dynamic.test.mjs @@ -67,6 +67,7 @@ describe('Dynamic cluster pool test suite', () => { await pool.destroy() const numberOfExitEvents = await exitPromise expect(pool.started).toBe(false) + expect(pool.readyEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(0) expect(numberOfExitEvents).toBe(min) expect(poolDestroy).toBe(1) diff --git a/tests/pools/cluster/fixed.test.mjs b/tests/pools/cluster/fixed.test.mjs index 426a64f0..c9b528b9 100644 --- a/tests/pools/cluster/fixed.test.mjs +++ b/tests/pools/cluster/fixed.test.mjs @@ -262,6 +262,7 @@ describe('Fixed cluster pool test suite', () => { await pool.destroy() const numberOfExitEvents = await exitPromise expect(pool.started).toBe(false) + expect(pool.readyEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(0) expect(numberOfExitEvents).toBe(numberOfWorkers) expect(poolDestroy).toBe(1) diff --git a/tests/pools/thread/dynamic.test.mjs b/tests/pools/thread/dynamic.test.mjs index 4fcd2d1b..ec9d51de 100644 --- a/tests/pools/thread/dynamic.test.mjs +++ b/tests/pools/thread/dynamic.test.mjs @@ -67,6 +67,7 @@ describe('Dynamic thread pool test suite', () => { await pool.destroy() const numberOfExitEvents = await exitPromise expect(pool.started).toBe(false) + expect(pool.readyEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(0) expect(numberOfExitEvents).toBe(min) expect(poolDestroy).toBe(1) diff --git a/tests/pools/thread/fixed.test.mjs b/tests/pools/thread/fixed.test.mjs index fc68fdb2..6b46512c 100644 --- a/tests/pools/thread/fixed.test.mjs +++ b/tests/pools/thread/fixed.test.mjs @@ -293,6 +293,7 @@ describe('Fixed thread pool test suite', () => { await pool.destroy() const numberOfExitEvents = await exitPromise expect(pool.started).toBe(false) + expect(pool.readyEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(0) expect(numberOfExitEvents).toBe(numberOfThreads) expect(poolDestroy).toBe(1) diff --git a/tests/utils.test.mjs b/tests/utils.test.mjs index ac544f89..be764cd2 100644 --- a/tests/utils.test.mjs +++ b/tests/utils.test.mjs @@ -19,7 +19,7 @@ import { max, median, min, - once, + // once, round, secureRandom, sleep @@ -239,18 +239,18 @@ describe('Utils test suite', () => { expect(max(1, 1)).toBe(1) }) - it('Verify once()', () => { - let called = 0 - const fn = () => ++called - const onceFn = once(fn, this) - const result1 = onceFn() - expect(called).toBe(1) - expect(result1).toBe(1) - const result2 = onceFn() - expect(called).toBe(1) - expect(result2).toBe(1) - const result3 = onceFn() - expect(called).toBe(1) - expect(result3).toBe(1) - }) + // it('Verify once()', () => { + // let called = 0 + // const fn = () => ++called + // const onceFn = once(fn, this) + // const result1 = onceFn() + // expect(called).toBe(1) + // expect(result1).toBe(1) + // const result2 = onceFn() + // expect(called).toBe(1) + // expect(result2).toBe(1) + // const result3 = onceFn() + // expect(called).toBe(1) + // expect(result3).toBe(1) + // }) }) -- 2.34.1