feat: add support for tasks ELU in fair share strategy
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 9 Jun 2023 18:53:16 +0000 (20:53 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 9 Jun 2023 18:53:16 +0000 (20:53 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
README.md
src/index.ts
src/pools/abstract-pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/selection-strategies-types.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/selection-strategies/selection-strategies.test.js

index e2bb9a87e0a25518c7917e19a07094ac8bfe9a17..56bb1c833c8ee2fd0b34e8061242ce50797faa0a 100644 (file)
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added
 
 - Add `LEAST_ELU` worker choice strategy (experimental).
+- Add tasks ELU instead of runtime support to `FAIR_SHARE` worker choice strategy.
 
 ### Changed
 
index e0f665fe72edb8d8207ac727c593504a4a7eca81..d3a4534f3ee33d13f957aee65ef6a6cefa446ae0 100644 (file)
--- a/README.md
+++ b/README.md
@@ -166,7 +166,7 @@ An object with these properties:
   - `WorkerChoiceStrategies.LEAST_ELU`: Submit tasks to the worker with the minimum event loop utilization (ELU) (experimental)
   - `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN`: Submit tasks to worker by using a weighted round robin scheduling algorithm based on tasks execution time
   - `WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN`: Submit tasks to worker by using an interleaved weighted round robin scheduling algorithm based on tasks execution time (experimental)
-  - `WorkerChoiceStrategies.FAIR_SHARE`: Submit tasks to worker by using a fair share tasks scheduling algorithm based on tasks execution time
+  - `WorkerChoiceStrategies.FAIR_SHARE`: Submit tasks to worker by using a fair share tasks scheduling algorithm based on tasks execution time or ELU
 
   `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN`, `WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN` and `WorkerChoiceStrategies.FAIR_SHARE` strategies are targeted to heavy and long tasks.  
   Default: `WorkerChoiceStrategies.ROUND_ROBIN`
@@ -174,11 +174,13 @@ An object with these properties:
 - `workerChoiceStrategyOptions` (optional) - The worker choice strategy options object to use in this pool.  
   Properties:
 
+  - `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`.
   - `runTime` (optional) - Use the tasks median runtime instead of the tasks average runtime in worker choice strategies.
   - `waitTime` (optional) - Use the tasks median wait time instead of the tasks average wait time in worker choice strategies.
-  - `weights` (optional) - The worker weights to use in the weighted round robin worker choice strategy: `{ 0: 200, 1: 300, ..., n: 100 }`
+  - `elu` (optional) - Use the tasks median ELU instead of the tasks average ELU in worker choice strategies.
+  - `weights` (optional) - The worker weights to use in the weighted round robin worker choice strategy: `{ 0: 200, 1: 300, ..., n: 100 }`.
 
-  Default: `{ runTime: { median: false }, waitTime: { median: false } }`
+  Default: `{ runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
 
 - `restartWorkerOnError` (optional) - Restart worker on uncaught error in this pool.  
   Default: `true`
index 55355d2e33bc6dbf01e31eca234cbcb56c0a78ad..8b827876fbb2e1f4d864be2b378c6723b6bd883c 100644 (file)
@@ -17,6 +17,7 @@ export type {
 } from './pools/pool'
 export type {
   ErrorHandler,
+  EventLoopUtilizationMeasurementStatistics,
   ExitHandler,
   IWorker,
   MeasurementStatistics,
@@ -27,9 +28,15 @@ export type {
   WorkerNode,
   WorkerUsage
 } from './pools/worker'
-export { WorkerChoiceStrategies } from './pools/selection-strategies/selection-strategies-types'
+export {
+  Measurements,
+  WorkerChoiceStrategies
+} from './pools/selection-strategies/selection-strategies-types'
 export type {
   IWorkerChoiceStrategy,
+  Measurement,
+  MeasurementOptions,
+  MeasurementStatisticsRequirements,
   TaskStatisticsRequirements,
   WorkerChoiceStrategy,
   WorkerChoiceStrategyOptions
index f082b8d144993b28621cc58e2941551c8da4a264..427678c4042fd41426afca99889d808ade97b861 100644 (file)
@@ -563,10 +563,8 @@ export abstract class AbstractPool<
         .aggregate
     ) {
       if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
-        workerUsage.elu.idle.aggregate =
-          workerUsage.elu.idle.aggregate + message.taskPerformance.elu.idle
-        workerUsage.elu.active.aggregate =
-          workerUsage.elu.active.aggregate + message.taskPerformance.elu.active
+        workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
+        workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
         workerUsage.elu.utilization =
           (workerUsage.elu.utilization +
             message.taskPerformance.elu.utilization) /
index 8f71b502fd9ee32aa46b4cb9190909ecabc2ff58..d9a573417ec2f96c0cbf68bc88a6931aeb68de29 100644 (file)
@@ -170,8 +170,8 @@ export abstract class AbstractWorkerChoiceStrategy<
 
   /**
    * Gets the worker task ELU.
-   * If the task statistics require the ELU, the average ELU is returned.
-   * If the task statistics require the ELU, the median ELU is returned.
+   * If the task statistics require the average ELU, the average ELU is returned.
+   * If the task statistics require the median ELU, the median ELU is returned.
    *
    * @param workerNodeKey - The worker node key.
    * @returns The worker task ELU.
index 52b25fb947cd38c08c024ab5c816b90f5adeae0a..ff9eaf36a93791b2d4e0002c2decd22402e92607 100644 (file)
@@ -2,10 +2,11 @@ import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
 import type { IPool } from '../pool'
 import type { IWorker } from '../worker'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
-import type {
-  IWorkerChoiceStrategy,
-  TaskStatisticsRequirements,
-  WorkerChoiceStrategyOptions
+import {
+  type IWorkerChoiceStrategy,
+  Measurements,
+  type TaskStatisticsRequirements,
+  type WorkerChoiceStrategyOptions
 } from './selection-strategies-types'
 
 /**
@@ -36,8 +37,8 @@ export class FairShareWorkerChoiceStrategy<
       median: false
     },
     elu: {
-      aggregate: false,
-      average: false,
+      aggregate: true,
+      average: true,
       median: false
     }
   }
@@ -109,9 +110,11 @@ export class FairShareWorkerChoiceStrategy<
     workerNodeKey: number,
     workerVirtualTaskStartTimestamp: number
   ): number {
-    return (
-      workerVirtualTaskStartTimestamp + this.getWorkerTaskRunTime(workerNodeKey)
-    )
+    const workerTaskRunTime =
+      this.opts.measurement === Measurements.elu
+        ? this.getWorkerTaskElu(workerNodeKey)
+        : this.getWorkerTaskRunTime(workerNodeKey)
+    return workerVirtualTaskStartTimestamp + workerTaskRunTime
   }
 
   private getWorkerVirtualTaskStartTimestamp (workerNodeKey: number): number {
index c578ddc3f9fb24e289f5caf8fa542ec402d63834..b0b036ee9069893098507e91a6913e1948a2c82a 100644 (file)
@@ -41,10 +41,24 @@ export const WorkerChoiceStrategies = Object.freeze({
  */
 export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
 
+/**
+ * Enumeration of measurements.
+ */
+export const Measurements = Object.freeze({
+  runTime: 'runTime',
+  waitTime: 'waitTime',
+  elu: 'elu'
+} as const)
+
+/**
+ * Measurement.
+ */
+export type Measurement = keyof typeof Measurements
+
 /**
  * Measurement options.
  */
-interface MeasurementOptions {
+export interface MeasurementOptions {
   /**
    * Set measurement median.
    */
@@ -55,6 +69,10 @@ interface MeasurementOptions {
  * Worker choice strategy options.
  */
 export interface WorkerChoiceStrategyOptions {
+  /**
+   * Measurement to use for worker choice strategy.
+   */
+  measurement?: Measurement
   /**
    * Runtime options.
    *
@@ -87,7 +105,7 @@ export interface WorkerChoiceStrategyOptions {
  *
  * @internal
  */
-interface MeasurementStatisticsRequirements {
+export interface MeasurementStatisticsRequirements {
   /**
    * Require measurement aggregate.
    */
index 25566a6a2ac115e581f435eb290861031a570530..4b6f46a607b02b3f501d97b87acb588ffb0bf4c0 100644 (file)
@@ -210,19 +210,24 @@ describe('Abstract pool test suite', () => {
         median: false
       },
       elu: {
-        aggregate: false,
-        average: false,
+        aggregate: true,
+        average: true,
         median: false
       }
     })
-    pool.setWorkerChoiceStrategyOptions({ runTime: { median: true } })
+    pool.setWorkerChoiceStrategyOptions({
+      runTime: { median: true },
+      elu: { median: true }
+    })
     expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
-      runTime: { median: true }
+      runTime: { median: true },
+      elu: { median: true }
     })
     for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
-        runTime: { median: true }
+        runTime: { median: true },
+        elu: { median: true }
       })
     }
     expect(
@@ -239,19 +244,24 @@ describe('Abstract pool test suite', () => {
         median: false
       },
       elu: {
-        aggregate: false,
+        aggregate: true,
         average: false,
-        median: false
+        median: true
       }
     })
-    pool.setWorkerChoiceStrategyOptions({ runTime: { median: false } })
+    pool.setWorkerChoiceStrategyOptions({
+      runTime: { median: false },
+      elu: { median: false }
+    })
     expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
-      runTime: { median: false }
+      runTime: { median: false },
+      elu: { median: false }
     })
     for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({
-        runTime: { median: false }
+        runTime: { median: false },
+        elu: { median: false }
       })
     }
     expect(
@@ -268,8 +278,8 @@ describe('Abstract pool test suite', () => {
         median: false
       },
       elu: {
-        aggregate: false,
-        average: false,
+        aggregate: true,
+        average: true,
         median: false
       }
     })
index 9b581ff57a9f052d93bca4edfe67e46c266ce8b6..e9376281efbe66dec0a5f193b03a9fc02bf039fd 100644 (file)
@@ -898,8 +898,8 @@ describe('Selection strategies test suite', () => {
         median: false
       },
       elu: {
-        aggregate: false,
-        average: false,
+        aggregate: true,
+        average: true,
         median: false
       }
     })
@@ -924,8 +924,8 @@ describe('Selection strategies test suite', () => {
         median: false
       },
       elu: {
-        aggregate: false,
-        average: false,
+        aggregate: true,
+        average: true,
         median: false
       }
     })
@@ -974,16 +974,18 @@ describe('Selection strategies test suite', () => {
             history: expect.any(CircularArray)
           },
           active: {
-            aggregate: 0,
-            average: 0,
+            aggregate: expect.any(Number),
+            average: expect.any(Number),
             median: 0,
             history: expect.any(CircularArray)
           },
-          utilization: 0
+          utilization: expect.any(Number)
         }
       })
       expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
       expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0)
+      expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
+      expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
@@ -1036,16 +1038,18 @@ describe('Selection strategies test suite', () => {
             history: expect.any(CircularArray)
           },
           active: {
-            aggregate: 0,
-            average: 0,
+            aggregate: expect.any(Number),
+            average: expect.any(Number),
             median: 0,
             history: expect.any(CircularArray)
           },
-          utilization: 0
+          utilization: expect.any(Number)
         }
       })
       expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
       expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0)
+      expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
+      expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
@@ -1103,16 +1107,18 @@ describe('Selection strategies test suite', () => {
             history: expect.any(CircularArray)
           },
           active: {
-            aggregate: 0,
-            average: 0,
+            aggregate: expect.any(Number),
+            average: expect.any(Number),
             median: 0,
             history: expect.any(CircularArray)
           },
-          utilization: 0
+          utilization: expect.any(Number)
         }
       })
       expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
       expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0)
+      expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
+      expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(