Add WRR worker choice strategy
authorJérôme Benoit <jerome.benoit@sap.com>
Sun, 9 Oct 2022 20:06:28 +0000 (22:06 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sun, 9 Oct 2022 20:06:28 +0000 (22:06 +0200)
Close #363

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/selection-strategies-utils.ts
src/pools/selection-strategies/weighted-round-robin-choice-strategy.ts [new file with mode: 0644]
tests/pools/selection-strategies/selection-strategies.test.js

index edefcfaac429d8c0d24fa897300650d141ab7f3c..359cfb948c6334dc507a3d0894149790aef584f9 100644 (file)
@@ -11,7 +11,11 @@ export const WorkerChoiceStrategies = Object.freeze({
   /**
    * Less recently used worker selection strategy.
    */
-  LESS_RECENTLY_USED: 'LESS_RECENTLY_USED'
+  LESS_RECENTLY_USED: 'LESS_RECENTLY_USED',
+  /**
+   * Weighted round robin worker selection strategy.
+   */
+  WEIGHTED_ROUND_ROBIN: 'WEIGHTED_ROUND_ROBIN'
 } as const)
 
 /**
index e76a699878289f85a8b2324bbeca729a3fcc0c0d..34c425b3541e806b770e1693e0aee6da7217e4f8 100644 (file)
@@ -7,6 +7,7 @@ import type {
   WorkerChoiceStrategy
 } from './selection-strategies-types'
 import { WorkerChoiceStrategies } from './selection-strategies-types'
+import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-choice-strategy'
 
 /**
  * Worker selection strategies helpers class.
@@ -32,6 +33,8 @@ export class SelectionStrategiesUtils {
         return new RoundRobinWorkerChoiceStrategy(pool)
       case WorkerChoiceStrategies.LESS_RECENTLY_USED:
         return new LessRecentlyUsedWorkerChoiceStrategy(pool)
+      case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
+        return new WeightedRoundRobinWorkerChoiceStrategy(pool)
       default:
         throw new Error(
           // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
diff --git a/src/pools/selection-strategies/weighted-round-robin-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-choice-strategy.ts
new file mode 100644 (file)
index 0000000..cdb8801
--- /dev/null
@@ -0,0 +1,125 @@
+import { cpus } from 'os'
+import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolInternal } from '../pool-internal'
+import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
+
+/**
+ * Task run time.
+ */
+type TaskRunTime = {
+  weight: number
+  runTime: number
+}
+
+/**
+ * Selects the next worker with a weighted round robin scheduling algorithm.
+ * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+export class WeightedRoundRobinWorkerChoiceStrategy<
+  Worker extends AbstractPoolWorker,
+  Data,
+  Response
+> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
+  /**
+   * Worker index where the previous task was submitted.
+   */
+  private previousWorkerIndex: number = 0
+  /**
+   * Worker index where the current task will be submitted.
+   */
+  private currentWorkerIndex: number = 0
+  /**
+   * Default worker weight.
+   */
+  private defaultWorkerWeight: number
+  /**
+   * Per worker task runtime map.
+   */
+  private workerTaskRunTime: Map<Worker, TaskRunTime> = new Map<
+    Worker,
+    TaskRunTime
+  >()
+
+  /**
+   * Constructs a worker choice strategy that selects based a weighted round robin scheduling algorithm.
+   *
+   * @param pool The pool instance.
+   */
+  public constructor (pool: IPoolInternal<Worker, Data, Response>) {
+    super(pool)
+    this.defaultWorkerWeight = this.computeWorkerWeight()
+    this.initWorkerTaskRunTime()
+  }
+
+  /** @inheritDoc */
+  public choose (): Worker {
+    const currentWorker = this.pool.workers[this.currentWorkerIndex]
+    if (this.isDynamicPool === true) {
+      this.workerTaskRunTime.has(currentWorker) === false &&
+        this.workerTaskRunTime.set(currentWorker, {
+          weight: this.defaultWorkerWeight,
+          runTime: 0
+        })
+    }
+    const workerVirtualTaskRunTime =
+      this.getWorkerVirtualTaskRunTime(currentWorker) ?? 0
+    const workerTaskWeight =
+      this.workerTaskRunTime.get(currentWorker)?.weight ??
+      this.defaultWorkerWeight
+    if (this.currentWorkerIndex === this.previousWorkerIndex) {
+      const workerTaskRunTime =
+        (this.workerTaskRunTime.get(currentWorker)?.runTime ?? 0) +
+        workerVirtualTaskRunTime
+      this.workerTaskRunTime.set(currentWorker, {
+        weight: workerTaskWeight,
+        runTime: workerTaskRunTime
+      })
+    } else {
+      this.workerTaskRunTime.set(currentWorker, {
+        weight: workerTaskWeight,
+        runTime: 0
+      })
+    }
+    if (
+      workerVirtualTaskRunTime <
+      (this.workerTaskRunTime.get(currentWorker) ?? this.defaultWorkerWeight)
+    ) {
+      this.previousWorkerIndex = this.currentWorkerIndex
+    } else {
+      this.previousWorkerIndex = this.currentWorkerIndex
+      this.currentWorkerIndex =
+        this.pool.workers.length - 1 === this.currentWorkerIndex
+          ? 0
+          : this.currentWorkerIndex + 1
+    }
+    return this.pool.workers[this.currentWorkerIndex]
+  }
+
+  private computeWorkerWeight () {
+    let cpusCycleTimeWeight = 0
+    for (let cpu = 0; cpu < cpus().length; cpu++) {
+      // CPU estimated cycle time
+      const numberOfDigit = cpus()[cpu].speed.toString().length - 1
+      const cpuCycleTime = 1 / (cpus()[cpu].speed / Math.pow(10, numberOfDigit))
+      cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigit)
+    }
+    return cpusCycleTimeWeight / cpus().length
+  }
+
+  private initWorkerTaskRunTime () {
+    for (const worker of this.pool.workers) {
+      this.workerTaskRunTime.set(worker, {
+        weight: this.defaultWorkerWeight,
+        runTime: 0
+      })
+    }
+  }
+
+  private getWorkerVirtualTaskRunTime (worker: Worker): number | undefined {
+    return this.pool.getWorkerAverageTasksRunTime(worker)
+  }
+}
index 6448c667dee24cb3420c6744303a4db1eb5c65d1..93649c39c2c667c8b5b518e9e1067abc8ad8b2cd 100644 (file)
@@ -9,6 +9,9 @@ describe('Selection strategies test suite', () => {
   it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
     expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
     expect(WorkerChoiceStrategies.LESS_RECENTLY_USED).toBe('LESS_RECENTLY_USED')
+    expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
+      'WEIGHTED_ROUND_ROBIN'
+    )
   })
 
   it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
@@ -148,6 +151,70 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
+  it('Verify WEIGHTED_ROUND_ROBIN strategy is taken at pool creation', async () => {
+    const max = 3
+    const pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
+    )
+    expect(pool.opts.workerChoiceStrategy).toBe(
+      WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+    )
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
+  it('Verify WEIGHTED_ROUND_ROBIN strategy can be set after pool creation', async () => {
+    const max = 3
+    const pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js'
+    )
+    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
+    expect(pool.opts.workerChoiceStrategy).toBe(
+      WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+    )
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
+  it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
+    const max = 3
+    const pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
+    )
+    // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
+    const promises = []
+    for (let i = 0; i < max * 2; i++) {
+      promises.push(pool.execute({ test: 'test' }))
+    }
+    await Promise.all(promises)
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
+  it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
+    const min = 0
+    const max = 3
+    const pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
+    )
+    // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
+    const promises = []
+    for (let i = 0; i < max * 2; i++) {
+      promises.push(pool.execute({ test: 'test' }))
+    }
+    await Promise.all(promises)
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
   it('Verify unknown strategies throw error', () => {
     const min = 1
     const max = 3