Merge branch 'master' into interleaved-weighted-round-robin-worker-choice-strategy
authorJérôme Benoit <jerome.benoit@sap.com>
Sat, 27 May 2023 19:48:49 +0000 (21:48 +0200)
committerGitHub <noreply@github.com>
Sat, 27 May 2023 19:48:49 +0000 (21:48 +0200)
src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts [new file with mode: 0644]
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/worker-choice-strategy-context.ts
tests/pools/selection-strategies/selection-strategies.test.js

diff --git a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts
new file mode 100644 (file)
index 0000000..12a8055
--- /dev/null
@@ -0,0 +1,149 @@
+import { cpus } from 'node:os'
+import type { IWorker } from '../worker'
+import type { IPool } from '../pool'
+import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
+import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
+import type {
+  IWorkerChoiceStrategy,
+  WorkerChoiceStrategyOptions
+} from './selection-strategies-types'
+
+/**
+ * Selects the next worker with an interleaved weighted round robin scheduling algorithm.
+ *
+ * @typeParam Worker - Type of worker which manages the strategy.
+ * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
+ * @typeParam Response - Type of execution response. This can only be serializable data.
+ */
+export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
+    Worker extends IWorker,
+    Data = unknown,
+    Response = unknown
+  >
+  extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
+  implements IWorkerChoiceStrategy {
+  /**
+   * Worker node id where the current task will be submitted.
+   */
+  private currentWorkerNodeId: number = 0
+  /**
+   * Current round id.
+   * This is used to determine the current round weight.
+   */
+  private currentRoundId: number = 0
+  /**
+   * Round weights.
+   */
+  private roundWeights: number[]
+  /**
+   * Default worker weight.
+   */
+  private readonly defaultWorkerWeight: number
+
+  /** @inheritDoc */
+  public constructor (
+    pool: IPool<Worker, Data, Response>,
+    opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+  ) {
+    super(pool, opts)
+    this.setRequiredStatistics(this.opts)
+    this.defaultWorkerWeight = this.computeDefaultWorkerWeight()
+    this.roundWeights = this.getRoundWeights()
+  }
+
+  /** @inheritDoc */
+  public reset (): boolean {
+    this.currentWorkerNodeId = 0
+    this.currentRoundId = 0
+    return true
+  }
+
+  /** @inheritDoc */
+  public update (): boolean {
+    return true
+  }
+
+  /** @inheritDoc */
+  public choose (): number {
+    let roundId: number | undefined
+    let workerNodeId: number | undefined
+    for (
+      let roundIndex = this.currentRoundId;
+      roundIndex < this.roundWeights.length;
+      roundIndex++
+    ) {
+      for (
+        let workerNodeKey = this.currentWorkerNodeId;
+        workerNodeKey < this.pool.workerNodes.length;
+        workerNodeKey++
+      ) {
+        const workerWeight =
+          this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight
+        if (workerWeight >= this.roundWeights[roundIndex]) {
+          roundId = roundIndex
+          workerNodeId = workerNodeKey
+          break
+        }
+      }
+    }
+    this.currentRoundId = roundId ?? 0
+    this.currentWorkerNodeId = workerNodeId ?? 0
+    const chosenWorkerNodeKey = this.currentWorkerNodeId
+    if (this.currentWorkerNodeId === this.pool.workerNodes.length - 1) {
+      this.currentWorkerNodeId = 0
+      this.currentRoundId =
+        this.currentRoundId === this.roundWeights.length - 1
+          ? 0
+          : this.currentRoundId + 1
+    } else {
+      this.currentWorkerNodeId = this.currentWorkerNodeId + 1
+    }
+    return chosenWorkerNodeKey
+  }
+
+  /** @inheritDoc */
+  public remove (workerNodeKey: number): boolean {
+    if (this.currentWorkerNodeId === workerNodeKey) {
+      if (this.pool.workerNodes.length === 0) {
+        this.currentWorkerNodeId = 0
+      } else if (this.currentWorkerNodeId > this.pool.workerNodes.length - 1) {
+        this.currentWorkerNodeId = this.pool.workerNodes.length - 1
+        this.currentRoundId =
+          this.currentRoundId === this.roundWeights.length - 1
+            ? 0
+            : this.currentRoundId + 1
+      }
+    }
+    return true
+  }
+
+  /** @inheritDoc */
+  public setOptions (opts: WorkerChoiceStrategyOptions): void {
+    super.setOptions(opts)
+    this.roundWeights = this.getRoundWeights()
+  }
+
+  private computeDefaultWorkerWeight (): number {
+    let cpusCycleTimeWeight = 0
+    for (const cpu of cpus()) {
+      // CPU estimated cycle time
+      const numberOfDigits = cpu.speed.toString().length - 1
+      const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
+      cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
+    }
+    return Math.round(cpusCycleTimeWeight / cpus().length)
+  }
+
+  private getRoundWeights (): number[] {
+    if (this.opts.weights == null) {
+      return [this.defaultWorkerWeight]
+    }
+    return [
+      ...new Set(
+        Object.values(this.opts.weights)
+          .slice()
+          .sort((a, b) => a - b)
+      )
+    ]
+  }
+}
index b95cf7f3ae22b6338fcf48940415e23337307715..6ae695ca6dca6141d48cef31dd46c6be5cb08bbb 100644 (file)
@@ -21,7 +21,13 @@ export const WorkerChoiceStrategies = Object.freeze({
   /**
    * Weighted round robin worker selection strategy.
    */
-  WEIGHTED_ROUND_ROBIN: 'WEIGHTED_ROUND_ROBIN'
+  WEIGHTED_ROUND_ROBIN: 'WEIGHTED_ROUND_ROBIN',
+  /**
+   * Interleaved weighted round robin worker selection strategy.
+   *
+   * @experimental
+   */
+  INTERLEAVED_WEIGHTED_ROUND_ROBIN: 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
 } as const)
 
 /**
index e07808bcefe837ef5289be98db46ac3664607219..0d0d269c076537bd2018cc4cc9450ea43584a6b8 100644 (file)
@@ -2,6 +2,7 @@ import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
 import type { IPool } from '../pool'
 import type { IWorker } from '../worker'
 import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy'
+import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy'
 import { LessBusyWorkerChoiceStrategy } from './less-busy-worker-choice-strategy'
 import { LessUsedWorkerChoiceStrategy } from './less-used-worker-choice-strategy'
 import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy'
@@ -83,6 +84,14 @@ export class WorkerChoiceStrategyContext<
         Data,
         Response
         >(pool, opts)
+      ],
+      [
+        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN,
+        new (InterleavedWeightedRoundRobinWorkerChoiceStrategy.bind(this))<
+        Worker,
+        Data,
+        Response
+        >(pool, opts)
       ]
     ])
   }
index 11a9aab3caa2001b799d5a8e48526481cef7da19..9a85c3ff9de048b9f401a7ca2530b710528219e0 100644 (file)
@@ -18,6 +18,9 @@ describe('Selection strategies test suite', () => {
     expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
       'WEIGHTED_ROUND_ROBIN'
     )
+    expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
+      'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
+    )
   })
 
   it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {