Allow worker choice strategy to specify their statistics requirements
authorJérôme Benoit <jerome.benoit@sap.com>
Mon, 10 Oct 2022 08:14:02 +0000 (10:14 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Mon, 10 Oct 2022 08:14:02 +0000 (10:14 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/weighted-round-robin-choice-strategy.ts
src/pools/selection-strategies/worker-choice-strategy-context.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/selection-strategies/selection-strategies.test.js

index 0921f45644446c18512f876a0a13b3379f8026fb..e204e1241811c2c0279520706fdce39ad247e881 100644 (file)
@@ -279,7 +279,7 @@ export abstract class AbstractPool<
   protected removeWorker (worker: Worker): void {
     // Clean worker from data structure
     this.workers.splice(this.getWorkerIndex(worker), 1)
-    this.resetWorkerTasksUsage(worker)
+    this.removeWorkerTasksUsage(worker)
   }
 
   /**
@@ -393,7 +393,7 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Increase the number of tasks that the given worker has applied.
+   * Increases the number of tasks that the given worker has applied.
    *
    * @param worker Worker which running tasks is increased.
    */
@@ -402,7 +402,7 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Decrease the number of tasks that the given worker has applied.
+   * Decreases the number of tasks that the given worker has applied.
    *
    * @param worker Worker which running tasks is decreased.
    */
@@ -411,7 +411,7 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Step the number of tasks that the given worker has applied.
+   * Steps the number of tasks that the given worker has applied.
    *
    * @param worker Worker which running tasks are stepped.
    * @param step Number of running tasks step.
@@ -427,12 +427,12 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Step the number of tasks that the given worker has run.
+   * Steps the number of tasks that the given worker has run.
    *
    * @param worker Worker which has run tasks.
    * @param step Number of run tasks step.
    */
-  private stepWorkerRunTasks (worker: Worker, step: number) {
+  private stepWorkerRunTasks (worker: Worker, step: number): void {
     const tasksUsage = this.workersTasksUsage.get(worker)
     if (tasksUsage !== undefined) {
       tasksUsage.run = tasksUsage.run + step
@@ -443,7 +443,7 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Update tasks run time for the given worker.
+   * Updates tasks run time for the given worker.
    *
    * @param worker Worker which run the task.
    * @param taskRunTime Worker task run time.
@@ -451,23 +451,28 @@ export abstract class AbstractPool<
   private updateWorkerTasksRunTime (
     worker: Worker,
     taskRunTime: number | undefined
-  ) {
-    const tasksUsage = this.workersTasksUsage.get(worker)
-    if (tasksUsage !== undefined && tasksUsage.run !== 0) {
-      tasksUsage.runTime += taskRunTime ?? 0
-      tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run
-      this.workersTasksUsage.set(worker, tasksUsage)
-    } else {
-      throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
+  ): void {
+    if (
+      this.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+        .requiredStatistics.runTime === true
+    ) {
+      const tasksUsage = this.workersTasksUsage.get(worker)
+      if (tasksUsage !== undefined && tasksUsage.run !== 0) {
+        tasksUsage.runTime += taskRunTime ?? 0
+        tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run
+        this.workersTasksUsage.set(worker, tasksUsage)
+      } else {
+        throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
+      }
     }
   }
 
   /**
-   * Reset worker tasks usage statistics.
+   * Removes worker tasks usage statistics.
    *
    * @param worker The worker.
    */
-  private resetWorkerTasksUsage (worker: Worker): void {
+  private removeWorkerTasksUsage (worker: Worker): void {
     this.workersTasksUsage.delete(worker)
   }
 }
index fc9074daa7c119d2feff3e45b44f73b161943480..cd50f700bdb74d1fe75f6f37a00d3867ece5c178 100644 (file)
@@ -1,7 +1,10 @@
 import type { AbstractPoolWorker } from '../abstract-pool-worker'
 import type { IPoolInternal } from '../pool-internal'
 import { PoolType } from '../pool-internal'
-import type { IWorkerChoiceStrategy } from './selection-strategies-types'
+import type {
+  IWorkerChoiceStrategy,
+  RequiredStatistics
+} from './selection-strategies-types'
 
 /**
  * Abstract worker choice strategy class.
@@ -17,6 +20,10 @@ export abstract class AbstractWorkerChoiceStrategy<
 > implements IWorkerChoiceStrategy<Worker> {
   /** @inheritDoc */
   public isDynamicPool: boolean = this.pool.type === PoolType.DYNAMIC
+  /** @inheritDoc */
+  public requiredStatistics: RequiredStatistics = {
+    runTime: false
+  }
 
   /**
    * Constructs a worker choice strategy attached to the pool.
index c2053af44f6c72f57768aee83c6ef5f8a47f8b9b..68b48e32744739c5060458b71d815d1889a3640a 100644 (file)
@@ -39,6 +39,7 @@ export class DynamicPoolWorkerChoiceStrategy<
       this.pool,
       workerChoiceStrategy
     )
+    this.requiredStatistics = this.workerChoiceStrategy.requiredStatistics
   }
 
   /** @inheritDoc */
index f500343f55dfb4a4a8ee88f9ffeccddad3006c75..f8f1821269144ed59f7b33b4cb21ab27e2ea7eeb 100644 (file)
@@ -1,5 +1,6 @@
 import type { AbstractPoolWorker } from '../abstract-pool-worker'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
+import type { RequiredStatistics } from './selection-strategies-types'
 
 /**
  * Worker virtual task timestamp.
@@ -22,6 +23,11 @@ export class FairShareWorkerChoiceStrategy<
   Data,
   Response
 > extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
+  /** @inheritDoc */
+  public requiredStatistics: RequiredStatistics = {
+    runTime: true
+  }
+
   /**
    *  Worker last virtual task execution timestamp.
    */
index dd545d41f590acdeada321782ee23fc51606168f..4d302f3defb1e7d5619d37693daf4104c944f377 100644 (file)
@@ -27,6 +27,13 @@ export const WorkerChoiceStrategies = Object.freeze({
  */
 export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
 
+/**
+ * Tasks usage statistics requirements.
+ */
+export type RequiredStatistics = {
+  runTime: boolean
+}
+
 /**
  * Worker choice strategy interface.
  *
@@ -37,6 +44,10 @@ export interface IWorkerChoiceStrategy<Worker extends AbstractPoolWorker> {
    * Is the pool attached to the strategy dynamic?.
    */
   isDynamicPool: boolean
+  /**
+   * Required tasks usage statistics.
+   */
+  requiredStatistics: RequiredStatistics
   /**
    * Choose a worker in the pool.
    */
index db8fa235a8654f1c94b7e6f4c7e85ffdc82778f9..6e48f82f2bc877bcf396e0631fb02d9f76dab9a0 100644 (file)
@@ -2,6 +2,7 @@ import { cpus } from 'os'
 import type { AbstractPoolWorker } from '../abstract-pool-worker'
 import type { IPoolInternal } from '../pool-internal'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
+import type { RequiredStatistics } from './selection-strategies-types'
 
 /**
  * Task run time.
@@ -24,6 +25,11 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
   Data,
   Response
 > extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
+  /** @inheritDoc */
+  public requiredStatistics: RequiredStatistics = {
+    runTime: true
+  }
+
   /**
    * Worker index where the previous task was submitted.
    */
index b84a8632ffc35a78a71742aa206274997b45eb6d..8290f89ed9a143a8420a45cb1c5965454bebe35e 100644 (file)
@@ -60,6 +60,15 @@ export class WorkerChoiceStrategyContext<
     )
   }
 
+  /**
+   * Get the worker choice strategy used in the context.
+   *
+   * @returns The worker choice strategy.
+   */
+  public getWorkerChoiceStrategy (): IWorkerChoiceStrategy<Worker> {
+    return this.workerChoiceStrategy
+  }
+
   /**
    * Set the worker choice strategy to use in the context.
    *
index 1e337ff9f4a3a3cd18800434d65965679c7d6535..6a4da63d8a479b28410464bf9073118b28444fe4 100644 (file)
@@ -158,7 +158,7 @@ describe('Abstract pool test suite', () => {
     pool.destroy()
   })
 
-  it('Simulate worker not found during updateWorkerTasksRunTime', () => {
+  it('Simulate worker not found during updateWorkerTasksRunTime with strategy not requiring it', () => {
     const pool = new StubPoolWithWorkerTasksUsageMapClear(
       numberOfWorkers,
       './tests/worker-files/cluster/testWorker.js',
@@ -168,6 +168,21 @@ describe('Abstract pool test suite', () => {
     )
     // Simulate worker not found.
     pool.removeAllWorker()
+    expect(() => pool.updateWorkerTasksRunTime()).not.toThrowError()
+    pool.destroy()
+  })
+
+  it('Simulate worker not found during updateWorkerTasksRunTime with strategy requiring it', () => {
+    const pool = new StubPoolWithWorkerTasksUsageMapClear(
+      numberOfWorkers,
+      './tests/worker-files/cluster/testWorker.js',
+      {
+        workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
+        errorHandler: e => console.error(e)
+      }
+    )
+    // Simulate worker not found.
+    pool.removeAllWorker()
     expect(() => pool.updateWorkerTasksRunTime()).toThrowError(
       workerNotFoundInTasksUsageMapError
     )
index b12a8df4b726438ba173f867ea59e19123f00fb1..39895b213ff6baf1e88167827ac8cb0f717002b1 100644 (file)
@@ -46,6 +46,32 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
+  it('Verify ROUND_ROBIN strategy default tasks usage statistics requirements', async () => {
+    const min = 0
+    const max = 3
+    let pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js'
+    )
+    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN)
+    expect(
+      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+        .requiredStatistics.runTime
+    ).toBe(false)
+    pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js'
+    )
+    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN)
+    expect(
+      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+        .requiredStatistics.runTime
+    ).toBe(false)
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
   it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
     const max = 3
     const pool = new FixedThreadPool(
@@ -116,6 +142,32 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
+  it('Verify LESS_RECENTLY_USED strategy default tasks usage statistics requirements', async () => {
+    const min = 0
+    const max = 3
+    let pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js'
+    )
+    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_RECENTLY_USED)
+    expect(
+      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+        .requiredStatistics.runTime
+    ).toBe(false)
+    pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js'
+    )
+    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_RECENTLY_USED)
+    expect(
+      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+        .requiredStatistics.runTime
+    ).toBe(false)
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
   it('Verify LESS_RECENTLY_USED strategy can be run in a fixed pool', async () => {
     const max = 3
     const pool = new FixedThreadPool(
@@ -180,6 +232,32 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
+  it('Verify FAIR_SHARE strategy default tasks usage statistics requirements', async () => {
+    const min = 0
+    const max = 3
+    let pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js'
+    )
+    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
+    expect(
+      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+        .requiredStatistics.runTime
+    ).toBe(true)
+    pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js'
+    )
+    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
+    expect(
+      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+        .requiredStatistics.runTime
+    ).toBe(true)
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
   it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
     const max = 3
     const pool = new FixedThreadPool(
@@ -244,6 +322,32 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
+  it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks usage statistics requirements', async () => {
+    const min = 0
+    const max = 3
+    let pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js'
+    )
+    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
+    expect(
+      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+        .requiredStatistics.runTime
+    ).toBe(true)
+    pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js'
+    )
+    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
+    expect(
+      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+        .requiredStatistics.runTime
+    ).toBe(true)
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
   it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
     const max = 3
     const pool = new FixedThreadPool(