fix: fix task wait time computation
author Jérôme Benoit <jerome.benoit@sap.com>
Thu, 8 Jun 2023 21:56:04 +0000 (23:56 +0200)
committer Jérôme Benoit <jerome.benoit@sap.com>
Thu, 8 Jun 2023 21:56:04 +0000 (23:56 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
12 files changed:
CHANGELOG.md
README.md
src/pools/abstract-pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/least-busy-worker-choice-strategy.ts
src/pools/selection-strategies/least-used-worker-choice-strategy.ts
src/utility-types.ts
src/worker/abstract-worker.ts
src/worker/worker-options.ts
tests/pools/cluster/dynamic.test.js
tests/pools/selection-strategies/selection-strategies.test.js
tests/pools/thread/dynamic.test.js

index fd67064c91d056e6ca45a8b11a892875266636a9..01c1dab45f43214835436d40ea9eb887e000686e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Changed
+
+- Refactor pool worker node usage internals.
+
+### Fixed
+
+- Fix wait time accounting.
+- Ensure worker choice strategy `LEAST_BUSY` also accounts for tasks wait time.
+- Ensure worker choice strategy `LEAST_USED` also accounts for queued tasks.
+
 ## [2.5.4] - 2023-06-07
 
 ### Added
@@ -142,7 +152,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Fixed
 
 - Ensure one task at a time is executed per worker with tasks queueing enabled.
-- Properly count worker running tasks with tasks queueing enabled.
+- Properly count worker executing tasks with tasks queueing enabled.
 
 ## [2.4.5] - 2023-04-09
 
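For context, a minimal sketch of the measurement this commit moves from the worker to the pool side: the task keeps the timestamp recorded at submission, and the pool derives the wait time right before dispatching the task to a worker (`TimestampedTask` and `computeTaskWaitTime` are illustrative names, not library API):

```ts
import { performance } from 'node:perf_hooks'

interface TimestampedTask {
  // Set when the task is submitted/queued (mirrors Task.timestamp in the diff below).
  timestamp?: number
}

// Wait time is measured on the pool side, right before the task is sent to a worker,
// instead of being reported back by the worker in TaskPerformance.
function computeTaskWaitTime (task: TimestampedTask): number {
  const now = performance.now()
  // A missing submission timestamp degrades gracefully to a wait time of 0.
  return now - (task.timestamp ?? now)
}
```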
index 4c7b33a9368048b9f6cc9853affaa687e8e808c0..5383863d23741c7d74c4eb585c05a75556be2db6 100644
--- a/README.md
+++ b/README.md
@@ -161,8 +161,8 @@ An object with these properties:
 - `workerChoiceStrategy` (optional) - The worker choice strategy to use in this pool:
 
   - `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion
-  - `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the minimum number of running and ran tasks
-  - `WorkerChoiceStrategies.LEAST_BUSY`: Submit tasks to the worker with the minimum tasks total execution time
+  - `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the minimum number of executed, executing and queued tasks
+  - `WorkerChoiceStrategies.LEAST_BUSY`: Submit tasks to the worker with the minimum tasks total execution and wait time
   - `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN`: Submit tasks to worker by using a weighted round robin scheduling algorithm based on tasks execution time
   - `WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN`: Submit tasks to worker by using an interleaved weighted round robin scheduling algorithm based on tasks execution time (experimental)
   - `WorkerChoiceStrategies.FAIR_SHARE`: Submit tasks to worker by using a fair share tasks scheduling algorithm based on tasks execution time
@@ -238,8 +238,8 @@ This method will call the terminate method on each worker.
   Default: `60000`
 
 - `killBehavior` (optional) - Dictates if your async unit (worker/process) will be deleted in case that a task is active on it.  
-  **KillBehaviors.SOFT**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker **won't** be deleted.  
-  **KillBehaviors.HARD**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker will be deleted.  
+  **KillBehaviors.SOFT**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker **won't** be deleted.  
+  **KillBehaviors.HARD**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker will be deleted.  
   This option only applies to the newly created workers.  
   Default: `KillBehaviors.SOFT`
 
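As a usage illustration of the updated strategy semantics, a hedged configuration sketch; it assumes the pool classes and options documented in this README, with `./worker.js` standing in for a real worker file:

```ts
import { DynamicThreadPool, WorkerChoiceStrategies } from 'poolifier'

// Route tasks to the worker with the lowest combined execution and wait time.
const pool = new DynamicThreadPool(2, 8, './worker.js', {
  workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY,
  errorHandler: (e: Error) => console.error(e)
})

// Each execute() call timestamps the task; its wait time starts counting here.
await pool.execute({ n: 1 })
```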
index db0d5455d7987002ae5cd8fd5fe8a4ea231942bb..ca660951932fcf24219b98a5ba996f524085e0c5 100644
--- a/src/pools/abstract-pool.ts
+++ b/src/pools/abstract-pool.ts
@@ -462,9 +462,15 @@ export abstract class AbstractPool<
    * Can be overridden.
    *
    * @param workerNodeKey - The worker node key.
+   * @param task - The task to execute.
    */
-  protected beforeTaskExecutionHook (workerNodeKey: number): void {
-    ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing
+  protected beforeTaskExecutionHook (
+    workerNodeKey: number,
+    task: Task<Data>
+  ): void {
+    const workerUsage = this.workerNodes[workerNodeKey].workerUsage
+    ++workerUsage.tasks.executing
+    this.updateWaitTimeWorkerUsage(workerUsage, task)
   }
 
   /**
@@ -486,9 +492,7 @@ export abstract class AbstractPool<
     if (message.taskError != null) {
       ++workerTaskStatistics.failed
     }
-
     this.updateRunTimeWorkerUsage(workerUsage, message)
-    this.updateWaitTimeWorkerUsage(workerUsage, message)
     this.updateEluWorkerUsage(workerUsage, message)
   }
 
@@ -521,12 +525,14 @@ export abstract class AbstractPool<
 
   private updateWaitTimeWorkerUsage (
     workerUsage: WorkerUsage,
-    message: MessageValue<Response>
+    task: Task<Data>
   ): void {
+    const timestamp = performance.now()
+    const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
     if (
       this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
     ) {
-      workerUsage.waitTime.aggregation += message.taskPerformance?.waitTime ?? 0
+      workerUsage.waitTime.aggregation += taskWaitTime ?? 0
       if (
         this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
           .avgWaitTime &&
@@ -538,9 +544,9 @@ export abstract class AbstractPool<
       if (
         this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
           .medWaitTime &&
-        message.taskPerformance?.waitTime != null
+        taskWaitTime != null
       ) {
-        workerUsage.waitTime.history.push(message.taskPerformance.waitTime)
+        workerUsage.waitTime.history.push(taskWaitTime)
         workerUsage.waitTime.median = median(workerUsage.waitTime.history)
       }
     }
@@ -781,7 +787,7 @@ export abstract class AbstractPool<
   }
 
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
-    this.beforeTaskExecutionHook(workerNodeKey)
+    this.beforeTaskExecutionHook(workerNodeKey, task)
     this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
   }
 
@@ -820,9 +826,6 @@ export abstract class AbstractPool<
         runTime:
           this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
             .runTime,
-        waitTime:
-          this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
-            .waitTime,
         elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
           .elu
       }
@@ -831,7 +834,7 @@ export abstract class AbstractPool<
 
   private getWorkerUsage (worker: Worker): WorkerUsage {
     return {
-      tasks: this.getTaskStatistics(this, worker),
+      tasks: this.getTaskStatistics(worker),
       runTime: {
         aggregation: 0,
         average: 0,
@@ -848,15 +851,14 @@ export abstract class AbstractPool<
     }
   }
 
-  private getTaskStatistics (
-    self: AbstractPool<Worker, Data, Response>,
-    worker: Worker
-  ): TaskStatistics {
+  private getTaskStatistics (worker: Worker): TaskStatistics {
+    const getTasksQueueSize = (): number =>
+      this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size ?? 0
     return {
       executed: 0,
       executing: 0,
       get queued (): number {
-        return self.tasksQueueSize(self.getWorkerNodeKey(worker))
+        return getTasksQueueSize()
       },
       failed: 0
     }
index b5b77ff6e261519d2bbef3154ca701e059d77a16..41573ab43dcafe60560b25924fbf1adb94901970 100644
--- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts
+++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts
@@ -154,7 +154,7 @@ export abstract class AbstractWorkerChoiceStrategy<
   /**
    * Finds the first free worker node key based on the number of tasks the worker has applied.
    *
-   * If a worker is found with `0` running tasks, it is detected as free and its worker node key is returned.
+   * If a worker is found with `0` executing tasks, it is detected as free and its worker node key is returned.
    *
    * If no free worker is found, `-1` is returned.
    *
@@ -169,7 +169,7 @@ export abstract class AbstractWorkerChoiceStrategy<
   /**
    * Finds the last free worker node key based on the number of tasks the worker has applied.
    *
-   * If a worker is found with `0` running tasks, it is detected as free and its worker node key is returned.
+   * If a worker is found with `0` executing tasks, it is detected as free and its worker node key is returned.
    *
    * If no free worker is found, `-1` is returned.
    *
index f171ab5f50b1a90d7ed48550f96dbb4d42b5e015..3c0856856189ab7d7af4c4f2a625ae9058ac9029 100644
--- a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts
+++ b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts
@@ -27,7 +27,7 @@ export class LeastBusyWorkerChoiceStrategy<
     runTime: true,
     avgRunTime: false,
     medRunTime: false,
-    waitTime: false,
+    waitTime: true,
     avgWaitTime: false,
     medWaitTime: false,
     elu: false
@@ -54,14 +54,16 @@ export class LeastBusyWorkerChoiceStrategy<
 
   /** @inheritDoc */
   public choose (): number {
-    let minRunTime = Infinity
+    let minTime = Infinity
     let leastBusyWorkerNodeKey!: number
     for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
-      const workerRunTime = workerNode.workerUsage.runTime.aggregation
-      if (workerRunTime === 0) {
+      const workerTime =
+        workerNode.workerUsage.runTime.aggregation +
+        workerNode.workerUsage.waitTime.aggregation
+      if (workerTime === 0) {
         return workerNodeKey
-      } else if (workerRunTime < minRunTime) {
-        minRunTime = workerRunTime
+      } else if (workerTime < minTime) {
+        minTime = workerTime
         leastBusyWorkerNodeKey = workerNodeKey
       }
     }
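A small self-contained illustration (numbers made up) of why folding wait time into the criterion can change which worker `LEAST_BUSY` picks:

```ts
// Aggregated times per worker node, in milliseconds (illustrative values only).
const workers = [
  { key: 0, runTime: 120, waitTime: 40 }, // combined: 160
  { key: 1, runTime: 100, waitTime: 80 }  // combined: 180
]
// Run time alone would pick worker 1 (100 < 120); with wait time included,
// worker 0 is the least busy (160 < 180), matching the updated choose() logic.
const leastBusy = workers.reduce((a, b) =>
  a.runTime + a.waitTime <= b.runTime + b.waitTime ? a : b
)
console.log(leastBusy.key) // 0
```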
index ee7834a6dc1bb12aeff39f25abf1f2b17f8dca49..528bca1dc683adf0ac25cdd8f53ba4a2737b2294 100644
--- a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts
+++ b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts
@@ -51,7 +51,9 @@ export class LeastUsedWorkerChoiceStrategy<
     for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
       const workerTaskStatistics = workerNode.workerUsage.tasks
       const workerTasks =
-        workerTaskStatistics.executed + workerTaskStatistics.executing
+        workerTaskStatistics.executed +
+        workerTaskStatistics.executing +
+        workerTaskStatistics.queued
       if (workerTasks === 0) {
         return workerNodeKey
       } else if (workerTasks < minNumberOfTasks) {
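Similarly, a minimal example (made-up counts) showing how counting queued tasks changes the `LEAST_USED` pick:

```ts
interface TaskCounts { executed: number, executing: number, queued: number }

const total = (t: TaskCounts): number => t.executed + t.executing + t.queued

const a: TaskCounts = { executed: 10, executing: 1, queued: 0 } // total: 11
const b: TaskCounts = { executed: 8, executing: 1, queued: 4 }  // total: 13
// Without queued tasks, b looks less used (9 < 11); counting its queue,
// a is now correctly selected (11 < 13).
console.log(total(a) < total(b) ? 'a' : 'b') // 'a'
```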
index 57ff000330e29cc7dffb3541a65438c3edf65b80..bfbc2a84e376ac1db341614f6427943d5405285f 100644
--- a/src/utility-types.ts
+++ b/src/utility-types.ts
@@ -39,10 +39,6 @@ export interface TaskPerformance {
    * Task runtime.
    */
   runTime?: number
-  /**
-   * Task wait time.
-   */
-  waitTime?: number
   /**
    * Task event loop utilization.
    */
@@ -54,7 +50,6 @@ export interface TaskPerformance {
  */
 export interface WorkerStatistics {
   runTime: boolean
-  waitTime: boolean
   elu: boolean
 }
 
index 76a7ee70ba96a93e6a92934a6b9cc54d81dbe821..a84bb0691fc588d3a8e51b0fe1696d7c85bb7dc9 100644
--- a/src/worker/abstract-worker.ts
+++ b/src/worker/abstract-worker.ts
@@ -224,7 +224,7 @@ export abstract class AbstractWorker<
     message: MessageValue<Data>
   ): void {
     try {
-      let taskPerformance = this.beginTaskPerformance(message)
+      let taskPerformance = this.beginTaskPerformance()
       const res = fn(message.data)
       taskPerformance = this.endTaskPerformance(taskPerformance)
       this.sendToMainWorker({
@@ -256,7 +256,7 @@ export abstract class AbstractWorker<
     fn: WorkerAsyncFunction<Data, Response>,
     message: MessageValue<Data>
   ): void {
-    let taskPerformance = this.beginTaskPerformance(message)
+    let taskPerformance = this.beginTaskPerformance()
     fn(message.data)
       .then(res => {
         taskPerformance = this.endTaskPerformance(taskPerformance)
@@ -297,13 +297,9 @@ export abstract class AbstractWorker<
     return fn
   }
 
-  private beginTaskPerformance (message: MessageValue<Data>): TaskPerformance {
-    const timestamp = performance.now()
+  private beginTaskPerformance (): TaskPerformance {
     return {
-      timestamp,
-      ...(this.statistics.waitTime && {
-        waitTime: timestamp - (message.timestamp ?? timestamp)
-      }),
+      timestamp: performance.now(),
       ...(this.statistics.elu && { elu: performance.eventLoopUtilization() })
     }
   }
index e0cd392f0966769f197859b67bd53e4c1b8304f1..b71a90e5817b6304c2b19a745127c47bbaff7ac2 100644
--- a/src/worker/worker-options.ts
+++ b/src/worker/worker-options.ts
@@ -3,11 +3,11 @@
  */
 export const KillBehaviors = Object.freeze({
   /**
-   * If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker **wont** be deleted.
+   * If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker **won't** be deleted.
    */
   SOFT: 'SOFT',
   /**
-   * If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker will be deleted.
+   * If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker will be deleted.
    */
   HARD: 'HARD'
 } as const)
@@ -59,8 +59,8 @@ export interface WorkerOptions {
   /**
    * `killBehavior` dictates if your async unit (worker/process) will be deleted in case that a task is active on it.
    *
-   * - SOFT: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker **won't** be deleted.
-   * - HARD: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker will be deleted.
+   * - SOFT: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker **won't** be deleted.
+   * - HARD: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing, then the worker will be deleted.
    *
   * This option only applies to the newly created workers.
    *
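For reference, a hedged worker-side sketch of these options; it assumes poolifier's `ThreadWorker` wrapper and the option names documented above:

```ts
import { ThreadWorker, KillBehaviors } from 'poolifier'

function taskFunction (data: { n: number }): { result: number } {
  return { result: data.n * 2 }
}

// With HARD, the worker is removed once maxInactiveTime elapses even if a task
// is still executing; with SOFT (the default), it would be kept alive.
export default new ThreadWorker(taskFunction, {
  maxInactiveTime: 60000,
  killBehavior: KillBehaviors.HARD
})
```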
index 291629f59b93f195eaf2d9b4a57f5e2649786af0..a5250e538a2eedf562d43981998adf4207092e72 100644
--- a/tests/pools/cluster/dynamic.test.js
+++ b/tests/pools/cluster/dynamic.test.js
@@ -82,15 +82,15 @@ describe('Dynamic cluster pool test suite', () => {
     await pool1.destroy()
   })
 
-  it('Verify scale processes up and down is working when long running task is used:hard', async () => {
+  it('Verify scale processes up and down is working when long executing task is used:hard', async () => {
     const longRunningPool = new DynamicClusterPool(
       min,
       max,
       './tests/worker-files/cluster/longRunningWorkerHardBehavior.js',
       {
         errorHandler: e => console.error(e),
-        onlineHandler: () => console.log('long running worker is online'),
-        exitHandler: () => console.log('long running worker exited')
+        onlineHandler: () => console.log('long executing worker is online'),
+        exitHandler: () => console.log('long executing worker exited')
       }
     )
     expect(longRunningPool.workerNodes.length).toBe(min)
@@ -109,15 +109,15 @@ describe('Dynamic cluster pool test suite', () => {
     await longRunningPool.destroy()
   })
 
-  it('Verify scale processes up and down is working when long running task is used:soft', async () => {
+  it('Verify scale processes up and down is working when long executing task is used:soft', async () => {
     const longRunningPool = new DynamicClusterPool(
       min,
       max,
       './tests/worker-files/cluster/longRunningWorkerSoftBehavior.js',
       {
         errorHandler: e => console.error(e),
-        onlineHandler: () => console.log('long running worker is online'),
-        exitHandler: () => console.log('long running worker exited')
+        onlineHandler: () => console.log('long executing worker is online'),
+        exitHandler: () => console.log('long executing worker exited')
       }
     )
     expect(longRunningPool.workerNodes.length).toBe(min)
@@ -126,7 +126,7 @@ describe('Dynamic cluster pool test suite', () => {
     }
     expect(longRunningPool.workerNodes.length).toBe(max)
     await TestUtils.sleep(1500)
-    // Here we expect the workerNodes to be at the max size since the task is still running
+    // Here we expect the workerNodes to be at the max size since the task is still executing
     expect(longRunningPool.workerNodes.length).toBe(max)
     // We need to clean up the resources after our test
     await longRunningPool.destroy()
index 7a67f6c4b628922db22a760de4d3f5bafc117fea..809d5350fc28441f6ac05ccab93a052340698fe0 100644
--- a/tests/pools/selection-strategies/selection-strategies.test.js
+++ b/tests/pools/selection-strategies/selection-strategies.test.js
@@ -446,7 +446,7 @@ describe('Selection strategies test suite', () => {
       runTime: true,
       avgRunTime: false,
       medRunTime: false,
-      waitTime: false,
+      waitTime: true,
       avgWaitTime: false,
       medWaitTime: false,
       elu: false
@@ -464,7 +464,7 @@ describe('Selection strategies test suite', () => {
       runTime: true,
       avgRunTime: false,
       medRunTime: false,
-      waitTime: false,
+      waitTime: true,
       avgWaitTime: false,
       medWaitTime: false,
       elu: false
@@ -501,7 +501,7 @@ describe('Selection strategies test suite', () => {
           history: expect.any(CircularArray)
         },
         waitTime: {
-          aggregation: 0,
+          aggregation: expect.any(Number),
           average: 0,
           median: 0,
           history: expect.any(CircularArray)
@@ -515,6 +515,9 @@ describe('Selection strategies test suite', () => {
       expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThanOrEqual(
         0
       )
+      expect(
+        workerNode.workerUsage.waitTime.aggregation
+      ).toBeGreaterThanOrEqual(0)
     }
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -549,7 +552,7 @@ describe('Selection strategies test suite', () => {
           history: expect.any(CircularArray)
         },
         waitTime: {
-          aggregation: 0,
+          aggregation: expect.any(Number),
           average: 0,
           median: 0,
           history: expect.any(CircularArray)
@@ -561,6 +564,7 @@ describe('Selection strategies test suite', () => {
         max * maxMultiplier
       )
       expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0)
+      expect(workerNode.workerUsage.waitTime.aggregation).toBeGreaterThan(0)
     }
     // We need to clean up the resources after our test
     await pool.destroy()
index 404f2113d95c3cbefc27ddd9efa8da08b1eeb1cd..1f2a66f2d74a0390aa48d64265040931bb234fd1 100644
--- a/tests/pools/thread/dynamic.test.js
+++ b/tests/pools/thread/dynamic.test.js
@@ -82,15 +82,15 @@ describe('Dynamic thread pool test suite', () => {
     await pool1.destroy()
   })
 
-  it('Verify scale thread up and down is working when long running task is used:hard', async () => {
+  it('Verify scale thread up and down is working when long executing task is used:hard', async () => {
     const longRunningPool = new DynamicThreadPool(
       min,
       max,
       './tests/worker-files/thread/longRunningWorkerHardBehavior.js',
       {
         errorHandler: e => console.error(e),
-        onlineHandler: () => console.log('long running worker is online'),
-        exitHandler: () => console.log('long running worker exited')
+        onlineHandler: () => console.log('long executing worker is online'),
+        exitHandler: () => console.log('long executing worker exited')
       }
     )
     expect(longRunningPool.workerNodes.length).toBe(min)
@@ -109,15 +109,15 @@ describe('Dynamic thread pool test suite', () => {
     await longRunningPool.destroy()
   })
 
-  it('Verify scale thread up and down is working when long running task is used:soft', async () => {
+  it('Verify scale thread up and down is working when long executing task is used:soft', async () => {
     const longRunningPool = new DynamicThreadPool(
       min,
       max,
       './tests/worker-files/thread/longRunningWorkerSoftBehavior.js',
       {
         errorHandler: e => console.error(e),
-        onlineHandler: () => console.log('long running worker is online'),
-        exitHandler: () => console.log('long running worker exited')
+        onlineHandler: () => console.log('long executing worker is online'),
+        exitHandler: () => console.log('long executing worker exited')
       }
     )
     expect(longRunningPool.workerNodes.length).toBe(min)
@@ -126,7 +126,7 @@ describe('Dynamic thread pool test suite', () => {
     }
     expect(longRunningPool.workerNodes.length).toBe(max)
     await TestUtils.sleep(1500)
-    // Here we expect the workerNodes to be at the max size since the task is still running
+    // Here we expect the workerNodes to be at the max size since the task is still executing
     expect(longRunningPool.workerNodes.length).toBe(max)
     // We need to clean up the resources after our test
     await longRunningPool.destroy()