From d91689fda0fa7a85014ac25276cf2cf0a9d81ce2 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 25 Oct 2023 17:20:39 +0200 Subject: [PATCH] fix: fixes to pool initialization MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 5 +++++ src/pools/abstract-pool.ts | 14 +++++++++++++- src/utils.ts | 24 ++++++++++++++++++++++++ tests/utils.test.mjs | 16 ++++++++++++++++ 4 files changed, 58 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25e94333..eb0fa9cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Ensure worker ready response can be received only once. +- Ensure pool ready event can be emitted only once. + ## [3.0.4] - 2023-10-20 ### Changed diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 00cf5617..1140ca1b 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -17,6 +17,7 @@ import { max, median, min, + once, round } from '../utils' import { KillBehaviors } from '../worker/worker-options' @@ -1539,10 +1540,21 @@ export abstract class AbstractPool< const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) ) + if (!this.started && workerInfo.ready) { + throw new Error( + `Ready response already received by worker ${ + message.workerId as number + }` + ) + } workerInfo.ready = message.ready as boolean workerInfo.taskFunctionNames = message.taskFunctionNames if (this.ready) { - this.emitter?.emit(PoolEvents.ready, this.info) + const emitPoolReadyEventOnce = once( + () => this.emitter?.emit(PoolEvents.ready, this.info), + this + ) + emitPoolReadyEventOnce() } } diff --git a/src/utils.ts b/src/utils.ts index d7ce5a21..a0a9e530 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -250,3 +250,27 @@ export const min = (...args: number[]): number => */ export const max = (...args: number[]): number => args.reduce((maximum, num) => (maximum > num ? maximum : num), -Infinity) + +/** + * Wraps a function so that it can only be called once. + * + * @param fn - The function to wrap. + * @param context - The context to bind the function to. + * @returns The wrapped function. + * @internal + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export const once = ( + fn: (...args: A) => R, + context: T +): ((...args: A) => R) => { + let result: R + return (...args: A) => { + if (fn != null) { + result = fn.apply(context, args) + ;(fn as unknown as undefined) = (context as unknown as undefined) = + undefined + } + return result + } +} diff --git a/tests/utils.test.mjs b/tests/utils.test.mjs index c048141b..ac544f89 100644 --- a/tests/utils.test.mjs +++ b/tests/utils.test.mjs @@ -19,6 +19,7 @@ import { max, median, min, + once, round, secureRandom, sleep @@ -237,4 +238,19 @@ describe('Utils test suite', () => { expect(max(2, 1)).toBe(2) expect(max(1, 1)).toBe(1) }) + + it('Verify once()', () => { + let called = 0 + const fn = () => ++called + const onceFn = once(fn, this) + const result1 = onceFn() + expect(called).toBe(1) + expect(result1).toBe(1) + const result2 = onceFn() + expect(called).toBe(1) + expect(result2).toBe(1) + const result3 = onceFn() + expect(called).toBe(1) + expect(result3).toBe(1) + }) }) -- 2.34.1