Merge branch 'master' of github.com:poolifier/poolifier into waittime
authorJérôme Benoit <jerome.benoit@sap.com>
Tue, 30 May 2023 08:52:30 +0000 (10:52 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Tue, 30 May 2023 08:52:30 +0000 (10:52 +0200)
src/pools/abstract-pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/least-busy-worker-choice-strategy.ts
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
src/pools/worker.ts
src/utility-types.ts
src/worker/abstract-worker.ts

index 4c82ef08a8cdad70602ebf20a64235b1cb3833f0..2e769fc9c2660305bbbb84ab370468e7d4326da0 100644 (file)
@@ -263,6 +263,10 @@ export abstract class AbstractPool<
         runTimeHistory: new CircularArray(),
         avgRunTime: 0,
         medRunTime: 0,
+        waitTime: 0,
+        waitTimeHistory: new CircularArray(),
+        avgWaitTime: 0,
+        medWaitTime: 0,
         error: 0
       })
     }
@@ -340,11 +344,13 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public async execute (data?: Data, name?: string): Promise<Response> {
+    const submissionTimestamp = performance.now()
     const workerNodeKey = this.chooseWorkerNode()
     const submittedTask: Task<Data> = {
       name,
       // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
       data: data ?? ({} as Data),
+      submissionTimestamp,
       id: crypto.randomUUID()
     }
     const res = new Promise<Response>((resolve, reject) => {
@@ -448,6 +454,23 @@ export abstract class AbstractPool<
         workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
       }
     }
+    if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
+      workerTasksUsage.waitTime += message.waitTime ?? 0
+      if (
+        this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
+        workerTasksUsage.run !== 0
+      ) {
+        workerTasksUsage.avgWaitTime =
+          workerTasksUsage.waitTime / workerTasksUsage.run
+      }
+      if (
+        this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
+        message.waitTime != null
+      ) {
+        workerTasksUsage.waitTimeHistory.push(message.waitTime)
+        workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
+      }
+    }
   }
 
   /**
@@ -611,6 +634,10 @@ export abstract class AbstractPool<
         runTimeHistory: new CircularArray(),
         avgRunTime: 0,
         medRunTime: 0,
+        waitTime: 0,
+        waitTimeHistory: new CircularArray(),
+        avgWaitTime: 0,
+        medWaitTime: 0,
         error: 0
       },
       tasksQueue: new Queue<Task<Data>>()
index d969f0c0ad5ca224de0d30356d517821b1e76c3c..efa9578d8b2f889630aadec6da595ded93cd087f 100644 (file)
@@ -27,7 +27,10 @@ export abstract class AbstractWorkerChoiceStrategy<
   public readonly requiredStatistics: RequiredStatistics = {
     runTime: false,
     avgRunTime: false,
-    medRunTime: false
+    medRunTime: false,
+    waitTime: false,
+    avgWaitTime: false,
+    medWaitTime: false
   }
 
   /**
@@ -52,6 +55,14 @@ export abstract class AbstractWorkerChoiceStrategy<
       this.requiredStatistics.avgRunTime = true
       this.requiredStatistics.medRunTime = opts.medRunTime as boolean
     }
+    if (this.requiredStatistics.avgWaitTime && opts.medWaitTime === true) {
+      this.requiredStatistics.avgWaitTime = false
+      this.requiredStatistics.medWaitTime = opts.medWaitTime as boolean
+    }
+    if (this.requiredStatistics.medWaitTime && opts.medWaitTime === false) {
+      this.requiredStatistics.avgWaitTime = true
+      this.requiredStatistics.medWaitTime = opts.medWaitTime as boolean
+    }
   }
 
   /** @inheritDoc */
index e26f6b8f5953dff0b3d6dc1d59469977ab264494..56d9dc361fe6e70255877ec6c1e6be6f33b70a70 100644 (file)
@@ -27,7 +27,10 @@ export class FairShareWorkerChoiceStrategy<
   public readonly requiredStatistics: RequiredStatistics = {
     runTime: true,
     avgRunTime: true,
-    medRunTime: false
+    medRunTime: false,
+    waitTime: false,
+    avgWaitTime: false,
+    medWaitTime: false
   }
 
   /**
index 05caa42a0149ebc7dc29ef76edc3a025e8b24c2c..6e413825d666025022acf6018dfe47c77e8345fe 100644 (file)
@@ -26,7 +26,10 @@ export class LeastBusyWorkerChoiceStrategy<
   public readonly requiredStatistics: RequiredStatistics = {
     runTime: true,
     avgRunTime: false,
-    medRunTime: false
+    medRunTime: false,
+    waitTime: false,
+    avgWaitTime: false,
+    medWaitTime: false
   }
 
   /** @inheritDoc */
index 282e306db18a4c55bb1016e078ce8456ff64d843..c90036fe6f269dd7757482064a3fdba0b7a97e97 100644 (file)
@@ -39,6 +39,12 @@ export interface WorkerChoiceStrategyOptions {
    * @defaultValue false
    */
   medRunTime?: boolean
+  /**
+   * Use tasks median wait time instead of average runtime.
+   *
+   * @defaultValue false
+   */
+  medWaitTime?: boolean
   /**
    * Worker weights to use for weighted round robin worker selection strategy.
    * Weight is the tasks maximum average or median runtime in milliseconds.
@@ -66,6 +72,18 @@ export interface RequiredStatistics {
    * Require tasks median runtime.
    */
   medRunTime: boolean
+  /**
+   * Require tasks wait time.
+   */
+  waitTime: boolean
+  /**
+   * Require tasks average wait time.
+   */
+  avgWaitTime: boolean
+  /**
+   * Require tasks median wait time.
+   */
+  medWaitTime: boolean
 }
 
 /**
index a0553fcb81ace9c910b1ecd755b9c123b640f362..df45feb3829a4972962d1f51aeeb1e7bcfbc1d7d 100644 (file)
@@ -28,7 +28,10 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
   public readonly requiredStatistics: RequiredStatistics = {
     runTime: true,
     avgRunTime: true,
-    medRunTime: false
+    medRunTime: false,
+    waitTime: false,
+    avgWaitTime: false,
+    medWaitTime: false
   }
 
   /**
index 2db5f630fefa5a1e81161ca75362e7c73fc450a1..35d2c0abaa331ecad6e9a0404deaa90c00ef7d4c 100644 (file)
@@ -45,6 +45,10 @@ export interface Task<Data = unknown> {
    * Task input data that will be passed to the worker.
    */
   readonly data?: Data
+  /**
+   * Submission timestamp.
+   */
+  readonly submissionTimestamp?: number
   /**
    * Message UUID.
    */
@@ -81,6 +85,22 @@ export interface TasksUsage {
    * Median tasks runtime.
    */
   medRunTime: number
+  /**
+   * Tasks wait time.
+   */
+  waitTime: number
+  /**
+   * Tasks wait time history.
+   */
+  waitTimeHistory: CircularArray<number>
+  /**
+   * Average tasks wait time.
+   */
+  avgWaitTime: number
+  /**
+   * Median tasks wait time.
+   */
+  medWaitTime: number
   /**
    * Number of tasks errored.
    */
index 08861ebd30045ae430de21eae055f9634a007341..cc04f2cb99113b9cb66305c102ae5c8fe88563ad 100644 (file)
@@ -33,6 +33,10 @@ export interface MessageValue<
    * Runtime.
    */
   readonly runTime?: number
+  /**
+   * Wait time.
+   */
+  readonly waitTime?: number
   /**
    * Reference to main worker.
    */
index 5d0c9492204e0361b8007ef4dccfb3c0154bebca..176a5893caeeec7cb9d82095840b97498b3822b0 100644 (file)
@@ -207,12 +207,14 @@ export abstract class AbstractWorker<
   ): void {
     try {
       const startTimestamp = performance.now()
+      const waitTime = startTimestamp - (message.submissionTimestamp ?? 0)
       const res = fn(message.data)
       const runTime = performance.now() - startTimestamp
       this.sendToMainWorker({
         data: res,
         id: message.id,
-        runTime
+        runTime,
+        waitTime
       })
     } catch (e) {
       const err = this.handleError(e as Error)
@@ -233,13 +235,15 @@ export abstract class AbstractWorker<
     message: MessageValue<Data>
   ): void {
     const startTimestamp = performance.now()
+    const waitTime = startTimestamp - (message.submissionTimestamp ?? 0)
     fn(message.data)
       .then(res => {
         const runTime = performance.now() - startTimestamp
         this.sendToMainWorker({
           data: res,
           id: message.id,
-          runTime
+          runTime,
+          waitTime
         })
         return null
       })