feat: add tasks wait time account per worker
authorJérôme Benoit <jerome.benoit@sap.com>
Mon, 29 May 2023 20:37:59 +0000 (22:37 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Mon, 29 May 2023 20:37:59 +0000 (22:37 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
README.md
benchmarks/versus-external-pools/package.json
package.json
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/less-busy-worker-choice-strategy.ts
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
src/pools/worker.ts

index 18b0d78a10645439c47d226e9a7587dbfc0fdcbc..1b7e641b40bd27313756961437e09f1ab5a9db0c 100644 (file)
--- a/README.md
+++ b/README.md
@@ -145,7 +145,7 @@ Remember that workers can only send and receive serializable data.
 
 ## Node versions
 
-Node versions >= 16.x are supported.
+Node versions >= 16.14.x are supported.
 
 ## [API](https://poolifier.github.io/poolifier/)
 
index 9d1ecc6b998e5446703771f8f2525b1fbd6ced81..b466e3c6fd6b73c30bab04bb7b8bfc65c1e14e9a 100644 (file)
@@ -6,7 +6,7 @@
   "main": "index.js",
   "author": "pioardi",
   "engines": {
-    "node": ">=14.14.0",
+    "node": ">=16.14.0",
     "pnpm": ">=8.6.0"
   },
   "volta": {
index 16edd5ca3e93c7bc4723847bc4fd9450a0fb3f99..6a189663a45948ba4fd423b418de5f68a7e4a8b3 100644 (file)
@@ -43,7 +43,7 @@
     ]
   },
   "engines": {
-    "node": ">=16.0.0",
+    "node": ">=16.14.0",
     "pnpm": ">=8.6.0"
   },
   "volta": {
index 4c82ef08a8cdad70602ebf20a64235b1cb3833f0..e396744603eed1011518cbb3f62b59bb3cedd305 100644 (file)
@@ -263,6 +263,10 @@ export abstract class AbstractPool<
         runTimeHistory: new CircularArray(),
         avgRunTime: 0,
         medRunTime: 0,
+        waitTime: 0,
+        waitTimeHistory: new CircularArray(),
+        avgWaitTime: 0,
+        medWaitTime: 0,
         error: 0
       })
     }
@@ -340,11 +344,13 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public async execute (data?: Data, name?: string): Promise<Response> {
+    const submissionTimestamp = performance.now()
     const workerNodeKey = this.chooseWorkerNode()
     const submittedTask: Task<Data> = {
       name,
       // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
       data: data ?? ({} as Data),
+      submissionTimestamp,
       id: crypto.randomUUID()
     }
     const res = new Promise<Response>((resolve, reject) => {
@@ -409,8 +415,22 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    */
-  protected beforeTaskExecutionHook (workerNodeKey: number): void {
-    ++this.workerNodes[workerNodeKey].tasksUsage.running
+  protected beforeTaskExecutionHook (
+    workerNodeKey: number,
+    task: Task<Data>
+  ): void {
+    const workerTasksUsage = this.workerNodes[workerNodeKey].tasksUsage
+    ++workerTasksUsage.running
+    if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
+      const waitTime = performance.now() - (task.submissionTimestamp ?? 0)
+      workerTasksUsage.waitTime += waitTime
+      if (
+        this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime
+      ) {
+        workerTasksUsage.waitTimeHistory.push(waitTime)
+        workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
+      }
+    }
   }
 
   /**
@@ -448,6 +468,14 @@ export abstract class AbstractPool<
         workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
       }
     }
+    if (
+      this.workerChoiceStrategyContext.getRequiredStatistics().waitTime &&
+      this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
+      workerTasksUsage.run !== 0
+    ) {
+      workerTasksUsage.avgWaitTime =
+        workerTasksUsage.waitTime / workerTasksUsage.run
+    }
   }
 
   /**
@@ -611,6 +639,10 @@ export abstract class AbstractPool<
         runTimeHistory: new CircularArray(),
         avgRunTime: 0,
         medRunTime: 0,
+        waitTime: 0,
+        waitTimeHistory: new CircularArray(),
+        avgWaitTime: 0,
+        medWaitTime: 0,
         error: 0
       },
       tasksQueue: new Queue<Task<Data>>()
@@ -650,7 +682,7 @@ export abstract class AbstractPool<
   }
 
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
-    this.beforeTaskExecutionHook(workerNodeKey)
+    this.beforeTaskExecutionHook(workerNodeKey, task)
     this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
   }
 
index 0a015476f3af30e9750b092aadbd428282784b3f..14b30812d14bd21e55c0af4aee77db5daf8bc7e4 100644 (file)
@@ -1,4 +1,4 @@
-import EventEmitter from 'node:events'
+import EventEmitterAsyncResource from 'node:events'
 import type {
   ErrorHandler,
   ExitHandler,
@@ -32,7 +32,7 @@ export enum PoolType {
 /**
  * Pool events emitter.
  */
-export class PoolEmitter extends EventEmitter {}
+export class PoolEmitter extends EventEmitterAsyncResource {}
 
 /**
  * Enumeration of pool events.
index a4455d28a8e33c5bac1bceaf679b8feab5de533b..ad42f9184c78e5116332a507c003cd4530f8da9e 100644 (file)
@@ -27,7 +27,10 @@ export abstract class AbstractWorkerChoiceStrategy<
   public readonly requiredStatistics: RequiredStatistics = {
     runTime: false,
     avgRunTime: false,
-    medRunTime: false
+    medRunTime: false,
+    waitTime: false,
+    avgWaitTime: false,
+    medWaitTime: false
   }
 
   /**
@@ -52,6 +55,14 @@ export abstract class AbstractWorkerChoiceStrategy<
       this.requiredStatistics.avgRunTime = true
       this.requiredStatistics.medRunTime = opts.medRunTime as boolean
     }
+    if (this.requiredStatistics.avgWaitTime && opts.medWaitTime === true) {
+      this.requiredStatistics.avgWaitTime = false
+      this.requiredStatistics.medWaitTime = opts.medWaitTime as boolean
+    }
+    if (this.requiredStatistics.medWaitTime && opts.medWaitTime === false) {
+      this.requiredStatistics.avgWaitTime = true
+      this.requiredStatistics.medWaitTime = opts.medWaitTime as boolean
+    }
   }
 
   /** @inheritDoc */
index e26f6b8f5953dff0b3d6dc1d59469977ab264494..56d9dc361fe6e70255877ec6c1e6be6f33b70a70 100644 (file)
@@ -27,7 +27,10 @@ export class FairShareWorkerChoiceStrategy<
   public readonly requiredStatistics: RequiredStatistics = {
     runTime: true,
     avgRunTime: true,
-    medRunTime: false
+    medRunTime: false,
+    waitTime: false,
+    avgWaitTime: false,
+    medWaitTime: false
   }
 
   /**
index f518f9b17f311ad3a8ad262e3b7e5f0be15cc61b..02f51c6b0194dec3a0c21b38beadbad0a6b943ef 100644 (file)
@@ -26,7 +26,10 @@ export class LessBusyWorkerChoiceStrategy<
   public readonly requiredStatistics: RequiredStatistics = {
     runTime: true,
     avgRunTime: false,
-    medRunTime: false
+    medRunTime: false,
+    waitTime: false,
+    avgWaitTime: false,
+    medWaitTime: false
   }
 
   /** @inheritDoc */
index b95cf7f3ae22b6338fcf48940415e23337307715..94e2d3e6d3a8df349cf8aea49bb725dbdeaae4d4 100644 (file)
@@ -39,6 +39,12 @@ export interface WorkerChoiceStrategyOptions {
    * @defaultValue false
    */
   medRunTime?: boolean
+  /**
+   * Use tasks median wait time instead of average run time.
+   *
+   * @defaultValue false
+   */
+  medWaitTime?: boolean
   /**
    * Worker weights to use for weighted round robin worker selection strategy.
    * Weight is the tasks maximum average or median runtime in milliseconds.
@@ -66,6 +72,18 @@ export interface RequiredStatistics {
    * Require tasks median run time.
    */
   medRunTime: boolean
+  /**
+   * Require tasks wait time.
+   */
+  waitTime: boolean
+  /**
+   * Require tasks average wait time.
+   */
+  avgWaitTime: boolean
+  /**
+   * Require tasks median wait time.
+   */
+  medWaitTime: boolean
 }
 
 /**
index a0553fcb81ace9c910b1ecd755b9c123b640f362..df45feb3829a4972962d1f51aeeb1e7bcfbc1d7d 100644 (file)
@@ -28,7 +28,10 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
   public readonly requiredStatistics: RequiredStatistics = {
     runTime: true,
     avgRunTime: true,
-    medRunTime: false
+    medRunTime: false,
+    waitTime: false,
+    avgWaitTime: false,
+    medWaitTime: false
   }
 
   /**
index 2db5f630fefa5a1e81161ca75362e7c73fc450a1..35d2c0abaa331ecad6e9a0404deaa90c00ef7d4c 100644 (file)
@@ -45,6 +45,10 @@ export interface Task<Data = unknown> {
    * Task input data that will be passed to the worker.
    */
   readonly data?: Data
+  /**
+   * Submission timestamp.
+   */
+  readonly submissionTimestamp?: number
   /**
    * Message UUID.
    */
@@ -81,6 +85,22 @@ export interface TasksUsage {
    * Median tasks runtime.
    */
   medRunTime: number
+  /**
+   * Tasks wait time.
+   */
+  waitTime: number
+  /**
+   * Tasks wait time history.
+   */
+  waitTimeHistory: CircularArray<number>
+  /**
+   * Average tasks wait time.
+   */
+  avgWaitTime: number
+  /**
+   * Median tasks wait time.
+   */
+  medWaitTime: number
   /**
    * Number of tasks errored.
    */