feat: add worker choice strategies retry mechanism
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 18 Aug 2023 14:26:04 +0000 (16:26 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 18 Aug 2023 14:26:04 +0000 (16:26 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
15 files changed:
CHANGELOG.md
docs/api.md
src/pools/abstract-pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/least-busy-worker-choice-strategy.ts
src/pools/selection-strategies/least-elu-worker-choice-strategy.ts
src/pools/selection-strategies/least-used-worker-choice-strategy.ts
src/pools/selection-strategies/round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/worker-choice-strategy-context.ts
src/utils.ts
tests/pools/abstract/abstract-pool.test.js

index 29ab1b08e536e58d3a920a06db38d0f845ba4eab..05f5c7a944c8db3cb0023b8da573f4b8556071a0 100644 (file)
@@ -12,6 +12,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Fix race condition between ready and task functions worker message handling at startup.
 - Fix duplicate task usage statistics computation per task function.
 
+### Added
+
+- Add back pressure detection on the worker node queue. Event `backPressure` is emitted when the worker node queue is full (size > poolMaxSize^2).
+- Use back pressure detection in worker choice strategies.
+- Add worker choice strategies retries mechanism if no worker is eligible.
+
 ## [2.6.28] - 2023-08-16
 
 ### Fixed
index 3a62d7635c754ddabb9e9d0766f897ec7c4f78bc..7b3766e72cc0978901ac6df7ddfd70bcc9d826ef 100644 (file)
@@ -74,13 +74,14 @@ An object with these properties:
 - `workerChoiceStrategyOptions` (optional) - The worker choice strategy options object to use in this pool.  
   Properties:
 
+  - `choiceRetries` (optional) - The number of retries to perform if no worker is eligible.
   - `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`.
   - `runTime` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) runtime instead of the tasks average runtime in worker choice strategies.
   - `waitTime` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) wait time instead of the tasks average wait time in worker choice strategies.
   - `elu` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) ELU instead of the tasks average ELU in worker choice strategies.
   - `weights` (optional) - The worker weights to use in weighted round robin worker choice strategies: `{ 0: 200, 1: 300, ..., n: 100 }`.
 
-  Default: `{ runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
+  Default: `{ choiceRetries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
 
 - `restartWorkerOnError` (optional) - Restart worker on uncaught error in this pool.  
   Default: `true`
index 768114e4cafcce5249f04e448240f0e9eb7bc5c6..64fda2545627eec3fbcd3a7bdc3b7edd4ecb7ed2 100644 (file)
@@ -204,9 +204,10 @@ export abstract class AbstractPool<
       this.opts.workerChoiceStrategy =
         opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
       this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
-      this.opts.workerChoiceStrategyOptions =
-        opts.workerChoiceStrategyOptions ??
-        DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+      this.opts.workerChoiceStrategyOptions = {
+        ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
+        ...opts.workerChoiceStrategyOptions
+      }
       this.checkValidWorkerChoiceStrategyOptions(
         this.opts.workerChoiceStrategyOptions
       )
@@ -244,6 +245,22 @@ export abstract class AbstractPool<
         'Invalid worker choice strategy options: must be a plain object'
       )
     }
+    if (
+      workerChoiceStrategyOptions.choiceRetries != null &&
+      !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
+    ) {
+      throw new TypeError(
+        'Invalid worker choice strategy options: choice retries must be an integer'
+      )
+    }
+    if (
+      workerChoiceStrategyOptions.choiceRetries != null &&
+      workerChoiceStrategyOptions.choiceRetries <= 0
+    ) {
+      throw new RangeError(
+        `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
+      )
+    }
     if (
       workerChoiceStrategyOptions.weights != null &&
       Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
@@ -566,7 +583,10 @@ export abstract class AbstractPool<
     workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
   ): void {
     this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
-    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+    this.opts.workerChoiceStrategyOptions = {
+      ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
+      ...workerChoiceStrategyOptions
+    }
     this.workerChoiceStrategyContext.setOptions(
       this.opts.workerChoiceStrategyOptions
     )
index d4fcd8d0a95bae029d23ef386fd7cbd2d17a7dde..e6269422abb7b2e89d5021fc82ade5a443268b5d 100644 (file)
@@ -52,6 +52,7 @@ export abstract class AbstractWorkerChoiceStrategy<
     protected readonly pool: IPool<Worker, Data, Response>,
     protected opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
   ) {
+    this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
     this.choose = this.choose.bind(this)
   }
 
@@ -100,7 +101,7 @@ export abstract class AbstractWorkerChoiceStrategy<
 
   /** @inheritDoc */
   public setOptions (opts: WorkerChoiceStrategyOptions): void {
-    this.opts = opts ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+    this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
     this.setTaskStatisticsRequirements(this.opts)
   }
 
@@ -110,7 +111,7 @@ export abstract class AbstractWorkerChoiceStrategy<
    * @param workerNodeKey - The worker node key.
    * @returns Whether the worker node is ready or not.
    */
-  protected isWorkerNodeReady (workerNodeKey: number): boolean {
+  private isWorkerNodeReady (workerNodeKey: number): boolean {
     return this.pool.workerNodes[workerNodeKey].info.ready
   }
 
@@ -120,10 +121,26 @@ export abstract class AbstractWorkerChoiceStrategy<
    * @param workerNodeKey - The worker node key.
    * @returns `true` if the worker node has back pressure, `false` otherwise.
    */
-  protected hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
+  private hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
     return this.pool.hasWorkerNodeBackPressure(workerNodeKey)
   }
 
+  /**
+   * Whether the worker node is eligible or not.
+   * A worker node is eligible if it is ready and does not have back pressure.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @returns `true` if the worker node is eligible, `false` otherwise.
+   * @see {@link isWorkerNodeReady}
+   * @see {@link hasWorkerNodeBackPressure}
+   */
+  protected isWorkerNodeEligible (workerNodeKey: number): boolean {
+    return (
+      this.isWorkerNodeReady(workerNodeKey) &&
+      !this.hasWorkerNodeBackPressure(workerNodeKey)
+    )
+  }
+
   /**
    * Gets the worker task runtime.
    * If the task statistics require the average runtime, the average runtime is returned.
index bf30a4764d4ffdd5287fcbbad588baddca42b193..36d1cd4c407c89de4231911bd22cb57b23350781 100644 (file)
@@ -88,7 +88,7 @@ export class FairShareWorkerChoiceStrategy<
       const workerVirtualTaskEndTimestamp =
         this.workersVirtualTaskEndTimestamp[workerNodeKey]
       if (
-        this.isWorkerNodeReady(workerNodeKey) &&
+        this.isWorkerNodeEligible(workerNodeKey) &&
         workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp
       ) {
         minWorkerVirtualTaskEndTimestamp = workerVirtualTaskEndTimestamp
index 26f9d909eed84f448e0244042d23dc289a26b96a..c45d6e0bf606f07a0ce792a5f68c2fbe017e2208 100644 (file)
@@ -81,7 +81,7 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
         const workerWeight =
           this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight
         if (
-          this.isWorkerNodeReady(workerNodeKey) &&
+          this.isWorkerNodeEligible(workerNodeKey) &&
           workerWeight >= this.roundWeights[roundIndex]
         ) {
           roundId = roundIndex
index 9a1550e06f60dfecc4339f56ac2c01ad3a7344cf..b3e9e6baf9b977500c0c7e846283fefdba5357df 100644 (file)
@@ -75,11 +75,11 @@ export class LeastBusyWorkerChoiceStrategy<
       const workerTime =
         (workerNode.usage.runTime?.aggregate ?? 0) +
         (workerNode.usage.waitTime?.aggregate ?? 0)
-      if (this.isWorkerNodeReady(workerNodeKey) && workerTime === 0) {
+      if (this.isWorkerNodeEligible(workerNodeKey) && workerTime === 0) {
         this.nextWorkerNodeKey = workerNodeKey
         break
       } else if (
-        this.isWorkerNodeReady(workerNodeKey) &&
+        this.isWorkerNodeEligible(workerNodeKey) &&
         workerTime < minTime
       ) {
         minTime = workerTime
index 7c837ceff3d7b7ca45a5bd9da29feaa70784ef0c..03e6ca0448e108b919cdd4912a74a61edc719995 100644 (file)
@@ -70,11 +70,11 @@ export class LeastEluWorkerChoiceStrategy<
     for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
       const workerUsage = workerNode.usage
       const workerElu = workerUsage.elu?.active?.aggregate ?? 0
-      if (this.isWorkerNodeReady(workerNodeKey) && workerElu === 0) {
+      if (this.isWorkerNodeEligible(workerNodeKey) && workerElu === 0) {
         this.nextWorkerNodeKey = workerNodeKey
         break
       } else if (
-        this.isWorkerNodeReady(workerNodeKey) &&
+        this.isWorkerNodeEligible(workerNodeKey) &&
         workerElu < minWorkerElu
       ) {
         minWorkerElu = workerElu
index e8a7218e160879bcaa97399d78a3b39ff7857bc1..e72efda2eb7db69411cc324e2e003c64fa08f529 100644 (file)
@@ -58,11 +58,11 @@ export class LeastUsedWorkerChoiceStrategy<
         workerTaskStatistics.executed +
         workerTaskStatistics.executing +
         workerTaskStatistics.queued
-      if (this.isWorkerNodeReady(workerNodeKey) && workerTasks === 0) {
+      if (this.isWorkerNodeEligible(workerNodeKey) && workerTasks === 0) {
         this.nextWorkerNodeKey = workerNodeKey
         break
       } else if (
-        this.isWorkerNodeReady(workerNodeKey) &&
+        this.isWorkerNodeEligible(workerNodeKey) &&
         workerTasks < minNumberOfTasks
       ) {
         minNumberOfTasks = workerTasks
index b023234a202cacc50b8f2505859e4e444bcf8f0f..55fa8ad73d4c7b7713a490cf07265e3634803779 100644 (file)
@@ -52,7 +52,7 @@ export class RoundRobinWorkerChoiceStrategy<
     const chosenWorkerNodeKey = this.nextWorkerNodeKey
     do {
       this.roundRobinNextWorkerNodeKey()
-    } while (!this.isWorkerNodeReady(this.nextWorkerNodeKey))
+    } while (!this.isWorkerNodeEligible(this.nextWorkerNodeKey))
     return chosenWorkerNodeKey
   }
 
index 563b04b00086a7a31a68f7cb25ca4bde6b513c92..58f36328f98e78dacc1f87fb42914e008f5b7f67 100644 (file)
@@ -70,7 +70,13 @@ export interface MeasurementOptions {
  */
 export interface WorkerChoiceStrategyOptions {
   /**
-   * Measurement to use for worker choice strategy.
+   * Number of worker choice retries to perform if no worker is eligible.
+   *
+   * @defaultValue 6
+   */
+  readonly choiceRetries?: number
+  /**
+   * Measurement to use in worker choice strategy supporting it.
    */
   readonly measurement?: Measurement
   /**
index fecba414ae1f8b18c29f566192ba577eb60b2288..b5c566d51b2e8e0d41261c11acc9956674be9b5b 100644 (file)
@@ -79,7 +79,7 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
     const chosenWorkerNodeKey = this.nextWorkerNodeKey
     do {
       this.weightedRoundRobinNextWorkerNodeKey()
-    } while (!this.isWorkerNodeReady(this.nextWorkerNodeKey))
+    } while (!this.isWorkerNodeEligible(this.nextWorkerNodeKey))
     return chosenWorkerNodeKey
   }
 
index 122774dd0e48c157d5d7c7ce5bdc4ea590281087..8c5a9587debbd4b66fba9a7630838fcc44cabc3c 100644 (file)
@@ -34,6 +34,11 @@ export class WorkerChoiceStrategyContext<
   IWorkerChoiceStrategy
   >
 
+  /**
+   * The number of times the worker choice strategy in the context has been retried.
+   */
+  private choiceRetriesCount = 0
+
   /**
    * Worker choice strategy context constructor.
    *
@@ -44,8 +49,9 @@ export class WorkerChoiceStrategyContext<
   public constructor (
     pool: IPool<Worker, Data, Response>,
     private workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN,
-    opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+    private opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
   ) {
+    this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
     this.execute = this.execute.bind(this)
     this.workerChoiceStrategies = new Map<
     WorkerChoiceStrategy,
@@ -119,7 +125,7 @@ export class WorkerChoiceStrategyContext<
   }
 
   /**
-   * Gets the worker choice strategy task statistics requirements in the context.
+   * Gets the worker choice strategy in the context task statistics requirements.
    *
    * @returns The task statistics requirements.
    */
@@ -146,7 +152,7 @@ export class WorkerChoiceStrategyContext<
   }
 
   /**
-   * Updates the worker node key in the worker choice strategy internals in the context.
+   * Updates the worker node key in the worker choice strategy in the context internals.
    *
    * @returns `true` if the update is successful, `false` otherwise.
    */
@@ -159,7 +165,7 @@ export class WorkerChoiceStrategyContext<
   }
 
   /**
-   * Executes the worker choice strategy algorithm in the context.
+   * Executes the worker choice strategy in the context algorithm.
    *
    * @returns The key of the worker node.
    * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker node key is null or undefined.
@@ -170,7 +176,13 @@ export class WorkerChoiceStrategyContext<
         this.workerChoiceStrategy
       ) as IWorkerChoiceStrategy
     ).choose()
-    if (workerNodeKey == null) {
+    if (
+      workerNodeKey == null &&
+      this.choiceRetriesCount < (this.opts.choiceRetries as number)
+    ) {
+      this.choiceRetriesCount++
+      return this.execute()
+    } else if (workerNodeKey == null) {
       throw new TypeError('Worker node key chosen is null or undefined')
     }
     return workerNodeKey
@@ -196,6 +208,7 @@ export class WorkerChoiceStrategyContext<
    * @param opts - The worker choice strategy options.
    */
   public setOptions (opts: WorkerChoiceStrategyOptions): void {
+    this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
     for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
       workerChoiceStrategy.setOptions(opts)
     }
index 45e4fd2393d0533a5256754eec9075666a7f7ea1..c92edc91de4f68100baefe6c5fd14ded17492363 100644 (file)
@@ -23,6 +23,7 @@ export const EMPTY_FUNCTION: () => void = Object.freeze(() => {
  */
 export const DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS: WorkerChoiceStrategyOptions =
   {
+    choiceRetries: 6,
     runTime: { median: false },
     waitTime: { median: false },
     elu: { median: false }
@@ -57,6 +58,22 @@ export const availableParallelism = (): number => {
   return availableParallelism
 }
 
+/**
+ * Computes the retry delay in milliseconds using an exponential back off algorithm.
+ *
+ * @param retryNumber - The number of retries that have already been attempted
+ * @param maxDelayRatio - The maximum ratio of the delay that can be randomized
+ * @returns Delay in milliseconds
+ */
+export const exponentialDelay = (
+  retryNumber = 0,
+  maxDelayRatio = 0.2
+): number => {
+  const delay = Math.pow(2, retryNumber) * 100
+  const randomSum = delay * maxDelayRatio * Math.random() // 0-(maxDelayRatio*100)% of the delay
+  return delay + randomSum
+}
+
 /**
  * Computes the median of the given data set.
  *
index 5c367aa2c0ebf52be17b51dc8f961d70cfabc564..a395782c32a3e184ee784c78a5577127fd4adeb5 100644 (file)
@@ -167,6 +167,13 @@ describe('Abstract pool test suite', () => {
       WorkerChoiceStrategies.ROUND_ROBIN
     )
     expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+      choiceRetries: 6,
+      runTime: { median: false },
+      waitTime: { median: false },
+      elu: { median: false }
+    })
+    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
+      choiceRetries: 6,
       runTime: { median: false },
       waitTime: { median: false },
       elu: { median: false }
@@ -205,7 +212,17 @@ describe('Abstract pool test suite', () => {
       WorkerChoiceStrategies.LEAST_USED
     )
     expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+      choiceRetries: 6,
       runTime: { median: true },
+      waitTime: { median: false },
+      elu: { median: false },
+      weights: { 0: 300, 1: 200 }
+    })
+    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
+      choiceRetries: 6,
+      runTime: { median: true },
+      waitTime: { median: false },
+      elu: { median: false },
       weights: { 0: 300, 1: 200 }
     })
     expect(pool.opts.messageHandler).toStrictEqual(testHandler)
@@ -226,18 +243,6 @@ describe('Abstract pool test suite', () => {
           }
         )
     ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
-    expect(
-      () =>
-        new FixedThreadPool(
-          numberOfWorkers,
-          './tests/worker-files/thread/testWorker.js',
-          {
-            workerChoiceStrategyOptions: 'invalidOptions'
-          }
-        )
-    ).toThrowError(
-      'Invalid worker choice strategy options: must be a plain object'
-    )
     expect(
       () =>
         new FixedThreadPool(
@@ -304,6 +309,13 @@ describe('Abstract pool test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
     )
     expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+      choiceRetries: 6,
+      runTime: { median: false },
+      waitTime: { median: false },
+      elu: { median: false }
+    })
+    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
+      choiceRetries: 6,
       runTime: { median: false },
       waitTime: { median: false },
       elu: { median: false }
@@ -311,6 +323,7 @@ describe('Abstract pool test suite', () => {
     for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
+        choiceRetries: 6,
         runTime: { median: false },
         waitTime: { median: false },
         elu: { median: false }
@@ -340,13 +353,23 @@ describe('Abstract pool test suite', () => {
       elu: { median: true }
     })
     expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+      choiceRetries: 6,
+      runTime: { median: true },
+      waitTime: { median: false },
+      elu: { median: true }
+    })
+    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
+      choiceRetries: 6,
       runTime: { median: true },
+      waitTime: { median: false },
       elu: { median: true }
     })
     for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
+        choiceRetries: 6,
         runTime: { median: true },
+        waitTime: { median: false },
         elu: { median: true }
       })
     }
@@ -374,13 +397,23 @@ describe('Abstract pool test suite', () => {
       elu: { median: false }
     })
     expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+      choiceRetries: 6,
       runTime: { median: false },
+      waitTime: { median: false },
+      elu: { median: false }
+    })
+    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
+      choiceRetries: 6,
+      runTime: { median: false },
+      waitTime: { median: false },
       elu: { median: false }
     })
     for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
+        choiceRetries: 6,
         runTime: { median: false },
+        waitTime: { median: false },
         elu: { median: false }
       })
     }