fix: ensure worker choice strategy waits for worker nodes readiness
author Jérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 12 Dec 2023 22:34:50 +0000 (23:34 +0100)
committer Jérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 12 Dec 2023 22:34:50 +0000 (23:34 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
docs/api.md
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/worker-choice-strategy-context.ts
tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs

index 736b2930715b601244847ea01260dcbc34f6e818..802b352b6943ae3524f9d5afbc0ad84f1f95fc9d 100644 (file)
@@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Ensure worker choice strategy waits for worker nodes readiness.
+
+### Changed
+
+- Remove infinite retries support in worker choice strategy to avoid configuration leading to possible infinite recursion or loop.
+
 ## [3.0.12] - 2023-12-12
 
 ### Changed
index 4a0a3eb253b1bc5159b6fffb1ccdfee381c7d629..28282ac2314bc05322098d09ad5ad0d08a163595 100644 (file)
@@ -113,7 +113,7 @@ An object with these properties:
 - `workerChoiceStrategyOptions` (optional) - The worker choice strategy options object to use in this pool.  
   Properties:
 
-  - `retries` (optional) - The number of retries to perform if no worker is eligible. `Infinity` means infinite retries.
+  - `retries` (optional) - The number of retries to perform if no worker is eligible.
   - `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`.
   - `runTime` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md#simple-moving-median) runtime instead of the tasks simple moving average runtime in worker choice strategies.
   - `waitTime` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md#simple-moving-median) wait time instead of the tasks simple moving average wait time in worker choice strategies.
index c364933358528ef3ade54a3cc3dd474cf2c2d9d3..f1d7dcf405b2e565f150137aa87e4cab1a79bd6d 100644 (file)
@@ -116,6 +116,11 @@ export abstract class AbstractWorkerChoiceStrategy<
     this.setTaskStatisticsRequirements(this.opts)
   }
 
+  /** @inheritDoc */
+  public hasPoolWorkerNodesReady (): boolean {
+    return this.pool.workerNodes.some(workerNode => workerNode.info.ready)
+  }
+
   /**
    * Whether the worker node is ready or not.
    *
index 5d490638ad2dbb9348cad7318b2258b83cbdad95..6990e65fcf365e340b118f2b6fcc0a00c2214a57 100644 (file)
@@ -208,4 +208,10 @@ export interface IWorkerChoiceStrategy {
    * @param opts - The worker choice strategy options.
    */
   readonly setOptions: (opts: WorkerChoiceStrategyOptions) => void
+  /**
+   * Whether the pool has worker nodes ready or not.
+   *
+   * @returns Whether the pool has worker nodes ready or not.
+   */
+  readonly hasPoolWorkerNodesReady: () => boolean
 }
index 0ca75e51a46a4b24b5244980afa1fc39f87acfb6..1772276c709a8cfba220df4aeafda075cdf92109 100644 (file)
@@ -34,11 +34,6 @@ export class WorkerChoiceStrategyContext<
   IWorkerChoiceStrategy
   >
 
-  /**
-   * The number of times the worker choice strategy in the context has been retried.
-   */
-  private retriesCount = 0
-
   /**
    * Worker choice strategy context constructor.
    *
@@ -168,27 +163,42 @@ export class WorkerChoiceStrategyContext<
    * Executes the worker choice strategy in the context algorithm.
    *
    * @returns The key of the worker node.
-   * @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined .
+   * @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined.
+   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum number of consecutive worker choice strategy executions has been reached.
    */
   public execute (): number {
-    const workerNodeKey = (
-      this.workerChoiceStrategies.get(
-        this.workerChoiceStrategy
-      ) as IWorkerChoiceStrategy
-    ).choose()
-    if (
-      workerNodeKey == null &&
-      (this.retriesCount < (this.opts.retries as number) ||
-        this.opts.retries === Infinity)
-    ) {
-      this.retriesCount++
-      return this.execute()
-    } else if (workerNodeKey == null) {
+    const workerChoiceStrategy = this.workerChoiceStrategies.get(
+      this.workerChoiceStrategy
+    ) as IWorkerChoiceStrategy
+    let workerNodeKey: number | undefined
+    const maxExecutionCount = 10000
+    let executionCount = 0
+    let chooseCount = 0
+    let retriesCount = 0
+    do {
+      if (workerChoiceStrategy.hasPoolWorkerNodesReady()) {
+        workerNodeKey = workerChoiceStrategy.choose()
+        if (chooseCount > 0) {
+          retriesCount++
+        }
+        chooseCount++
+      }
+      executionCount++
+    } while (
+      executionCount < maxExecutionCount &&
+      (!workerChoiceStrategy.hasPoolWorkerNodesReady() ||
+        (workerNodeKey == null && retriesCount < (this.opts.retries as number)))
+    )
+    if (executionCount >= maxExecutionCount) {
+      throw new RangeError(
+        `Worker choice strategy consecutive executions has exceeded the maximum of ${maxExecutionCount}`
+      )
+    }
+    if (workerNodeKey == null) {
       throw new Error(
-        `Worker node key chosen is null or undefined after ${this.retriesCount} retries`
+        `Worker node key chosen is null or undefined after ${retriesCount} retries`
       )
     }
-    this.retriesCount = 0
     return workerNodeKey
   }
 
index 2a5f914e9d89a3bf40828f3ef198482612422a0c..a5447b73f7fdaa7b3f5fa6a6905484b1d34f2b2b 100644 (file)
@@ -49,13 +49,14 @@ describe('Worker choice strategy context test suite', () => {
     )
   })
 
-  it('Verify that execute() return the worker chosen by the strategy with fixed pool', () => {
+  it('Verify that execute() return the worker node key chosen by the strategy with fixed pool', () => {
     const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
       fixedPool
     )
     const workerChoiceStrategyStub = createStubInstance(
       RoundRobinWorkerChoiceStrategy,
       {
+        hasPoolWorkerNodesReady: stub().returns(true),
         choose: stub().returns(0)
       }
     )
@@ -82,12 +83,14 @@ describe('Worker choice strategy context test suite', () => {
     const workerChoiceStrategyUndefinedStub = createStubInstance(
       RoundRobinWorkerChoiceStrategy,
       {
+        hasPoolWorkerNodesReady: stub().returns(true),
         choose: stub().returns(undefined)
       }
     )
     const workerChoiceStrategyNullStub = createStubInstance(
       RoundRobinWorkerChoiceStrategy,
       {
+        hasPoolWorkerNodesReady: stub().returns(true),
         choose: stub().returns(null)
       }
     )
@@ -110,13 +113,88 @@ describe('Worker choice strategy context test suite', () => {
     )
   })
 
-  it('Verify that execute() return the worker chosen by the strategy with dynamic pool', () => {
+  it('Verify that execute() retry until a worker node is ready and chosen', () => {
+    const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+      fixedPool
+    )
+    const workerChoiceStrategyStub = createStubInstance(
+      RoundRobinWorkerChoiceStrategy,
+      {
+        hasPoolWorkerNodesReady: stub()
+          .onCall(0)
+          .returns(false)
+          .onCall(1)
+          .returns(false)
+          .onCall(2)
+          .returns(false)
+          .onCall(3)
+          .returns(false)
+          .onCall(4)
+          .returns(false)
+          .onCall(6)
+          .returns(false)
+          .onCall(7)
+          .returns(false)
+          .onCall(8)
+          .returns(false)
+          .returns(true),
+        choose: stub().returns(1)
+      }
+    )
+    expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
+      WorkerChoiceStrategies.ROUND_ROBIN
+    )
+    workerChoiceStrategyContext.workerChoiceStrategies.set(
+      workerChoiceStrategyContext.workerChoiceStrategy,
+      workerChoiceStrategyStub
+    )
+    const chosenWorkerKey = workerChoiceStrategyContext.execute()
+    expect(
+      workerChoiceStrategyContext.workerChoiceStrategies.get(
+        workerChoiceStrategyContext.workerChoiceStrategy
+      ).hasPoolWorkerNodesReady.callCount
+    ).toBe(12)
+    expect(
+      workerChoiceStrategyContext.workerChoiceStrategies.get(
+        workerChoiceStrategyContext.workerChoiceStrategy
+      ).choose.callCount
+    ).toBe(1)
+    expect(chosenWorkerKey).toBe(1)
+  })
+
+  it('Verify that execute() throws error if worker choice strategy consecutive executions has been reached', () => {
+    const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+      fixedPool
+    )
+    const workerChoiceStrategyStub = createStubInstance(
+      RoundRobinWorkerChoiceStrategy,
+      {
+        hasPoolWorkerNodesReady: stub().returns(false),
+        choose: stub().returns(0)
+      }
+    )
+    expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
+      WorkerChoiceStrategies.ROUND_ROBIN
+    )
+    workerChoiceStrategyContext.workerChoiceStrategies.set(
+      workerChoiceStrategyContext.workerChoiceStrategy,
+      workerChoiceStrategyStub
+    )
+    expect(() => workerChoiceStrategyContext.execute()).toThrow(
+      new RangeError(
+        'Worker choice strategy consecutive executions has exceeded the maximum of 10000'
+      )
+    )
+  })
+
+  it('Verify that execute() return the worker node key chosen by the strategy with dynamic pool', () => {
     const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
       dynamicPool
     )
     const workerChoiceStrategyStub = createStubInstance(
       RoundRobinWorkerChoiceStrategy,
       {
+        hasPoolWorkerNodesReady: stub().returns(true),
         choose: stub().returns(0)
       }
     )