From fb5a7307dfe8e86d539482dd807d821bf3c2fbc1 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Tue, 12 Dec 2023 23:34:50 +0100 Subject: [PATCH] fix: ensure worker choice strategy wait for worker nodes readiness MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 8 ++ docs/api.md | 2 +- .../abstract-worker-choice-strategy.ts | 5 ++ .../selection-strategies-types.ts | 6 ++ .../worker-choice-strategy-context.ts | 52 +++++++----- .../worker-choice-strategy-context.test.mjs | 82 ++++++++++++++++++- 6 files changed, 131 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 736b2930..802b352b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Ensure worker choice strategy wait for worker nodes readiness. + +### Changed + +- Remove infinite retries support in worker choice strategy to avoid configuration leading to possible infinite recursion or loop. + ## [3.0.12] - 2023-12-12 ### Changed diff --git a/docs/api.md b/docs/api.md index 4a0a3eb2..28282ac2 100644 --- a/docs/api.md +++ b/docs/api.md @@ -113,7 +113,7 @@ An object with these properties: - `workerChoiceStrategyOptions` (optional) - The worker choice strategy options object to use in this pool. Properties: - - `retries` (optional) - The number of retries to perform if no worker is eligible. `Infinity` means infinite retries. + - `retries` (optional) - The number of retries to perform if no worker is eligible. - `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`. - `runTime` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md#simple-moving-median) runtime instead of the tasks simple moving average runtime in worker choice strategies. - `waitTime` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md#simple-moving-median) wait time instead of the tasks simple moving average wait time in worker choice strategies. diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index c3649333..f1d7dcf4 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -116,6 +116,11 @@ export abstract class AbstractWorkerChoiceStrategy< this.setTaskStatisticsRequirements(this.opts) } + /** @inheritDoc */ + public hasPoolWorkerNodesReady (): boolean { + return this.pool.workerNodes.some(workerNode => workerNode.info.ready) + } + /** * Whether the worker node is ready or not. * diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index 5d490638..6990e65f 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -208,4 +208,10 @@ export interface IWorkerChoiceStrategy { * @param opts - The worker choice strategy options. */ readonly setOptions: (opts: WorkerChoiceStrategyOptions) => void + /** + * Whether the pool has worker nodes ready or not. + * + * @returns Whether the pool has worker nodes ready or not. + */ + readonly hasPoolWorkerNodesReady: () => boolean } diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index 0ca75e51..1772276c 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -34,11 +34,6 @@ export class WorkerChoiceStrategyContext< IWorkerChoiceStrategy > - /** - * The number of times the worker choice strategy in the context has been retried. - */ - private retriesCount = 0 - /** * Worker choice strategy context constructor. * @@ -168,27 +163,42 @@ export class WorkerChoiceStrategyContext< * Executes the worker choice strategy in the context algorithm. * * @returns The key of the worker node. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined . + * @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum consecutive worker choice strategy executions has been reached. */ public execute (): number { - const workerNodeKey = ( - this.workerChoiceStrategies.get( - this.workerChoiceStrategy - ) as IWorkerChoiceStrategy - ).choose() - if ( - workerNodeKey == null && - (this.retriesCount < (this.opts.retries as number) || - this.opts.retries === Infinity) - ) { - this.retriesCount++ - return this.execute() - } else if (workerNodeKey == null) { + const workerChoiceStrategy = this.workerChoiceStrategies.get( + this.workerChoiceStrategy + ) as IWorkerChoiceStrategy + let workerNodeKey: number | undefined + const maxExecutionCount = 10000 + let executionCount = 0 + let chooseCount = 0 + let retriesCount = 0 + do { + if (workerChoiceStrategy.hasPoolWorkerNodesReady()) { + workerNodeKey = workerChoiceStrategy.choose() + if (chooseCount > 0) { + retriesCount++ + } + chooseCount++ + } + executionCount++ + } while ( + executionCount < maxExecutionCount && + (!workerChoiceStrategy.hasPoolWorkerNodesReady() || + (workerNodeKey == null && retriesCount < (this.opts.retries as number))) + ) + if (executionCount >= maxExecutionCount) { + throw new RangeError( + `Worker choice strategy consecutive executions has exceeded the maximum of ${maxExecutionCount}` + ) + } + if (workerNodeKey == null) { throw new Error( - `Worker node key chosen is null or undefined after ${this.retriesCount} retries` + `Worker node key chosen is null or undefined after ${retriesCount} retries` ) } - this.retriesCount = 0 return workerNodeKey } diff --git a/tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs b/tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs index 2a5f914e..a5447b73 100644 --- a/tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs +++ b/tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs @@ -49,13 +49,14 @@ describe('Worker choice strategy context test suite', () => { ) }) - it('Verify that execute() return the worker chosen by the strategy with fixed pool', () => { + it('Verify that execute() return the worker node key chosen by the strategy with fixed pool', () => { const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( fixedPool ) const workerChoiceStrategyStub = createStubInstance( RoundRobinWorkerChoiceStrategy, { + hasPoolWorkerNodesReady: stub().returns(true), choose: stub().returns(0) } ) @@ -82,12 +83,14 @@ describe('Worker choice strategy context test suite', () => { const workerChoiceStrategyUndefinedStub = createStubInstance( RoundRobinWorkerChoiceStrategy, { + hasPoolWorkerNodesReady: stub().returns(true), choose: stub().returns(undefined) } ) const workerChoiceStrategyNullStub = createStubInstance( RoundRobinWorkerChoiceStrategy, { + hasPoolWorkerNodesReady: stub().returns(true), choose: stub().returns(null) } ) @@ -110,13 +113,88 @@ describe('Worker choice strategy context test suite', () => { ) }) - it('Verify that execute() return the worker chosen by the strategy with dynamic pool', () => { + it('Verify that execute() retry until a worker node is ready and chosen', () => { + const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + fixedPool + ) + const workerChoiceStrategyStub = createStubInstance( + RoundRobinWorkerChoiceStrategy, + { + hasPoolWorkerNodesReady: stub() + .onCall(0) + .returns(false) + .onCall(1) + .returns(false) + .onCall(2) + .returns(false) + .onCall(3) + .returns(false) + .onCall(4) + .returns(false) + .onCall(6) + .returns(false) + .onCall(7) + .returns(false) + .onCall(8) + .returns(false) + .returns(true), + choose: stub().returns(1) + } + ) + expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( + WorkerChoiceStrategies.ROUND_ROBIN + ) + workerChoiceStrategyContext.workerChoiceStrategies.set( + workerChoiceStrategyContext.workerChoiceStrategy, + workerChoiceStrategyStub + ) + const chosenWorkerKey = workerChoiceStrategyContext.execute() + expect( + workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategyContext.workerChoiceStrategy + ).hasPoolWorkerNodesReady.callCount + ).toBe(12) + expect( + workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategyContext.workerChoiceStrategy + ).choose.callCount + ).toBe(1) + expect(chosenWorkerKey).toBe(1) + }) + + it('Verify that execute() throws error if worker choice strategy consecutive executions has been reached', () => { + const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + fixedPool + ) + const workerChoiceStrategyStub = createStubInstance( + RoundRobinWorkerChoiceStrategy, + { + hasPoolWorkerNodesReady: stub().returns(false), + choose: stub().returns(0) + } + ) + expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( + WorkerChoiceStrategies.ROUND_ROBIN + ) + workerChoiceStrategyContext.workerChoiceStrategies.set( + workerChoiceStrategyContext.workerChoiceStrategy, + workerChoiceStrategyStub + ) + expect(() => workerChoiceStrategyContext.execute()).toThrow( + new RangeError( + 'Worker choice strategy consecutive executions has exceeded the maximum of 10000' + ) + ) + }) + + it('Verify that execute() return the worker node key chosen by the strategy with dynamic pool', () => { const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( dynamicPool ) const workerChoiceStrategyStub = createStubInstance( RoundRobinWorkerChoiceStrategy, { + hasPoolWorkerNodesReady: stub().returns(true), choose: stub().returns(0) } ) -- 2.34.1