feat: account tasks wait time more accurately
[poolifier.git] / src / pools / abstract-pool.ts
index 4c82ef08a8cdad70602ebf20a64235b1cb3833f0..81012fa0b4f736559fca73eb395bf7c622bce48a 100644 (file)
@@ -263,6 +263,10 @@ export abstract class AbstractPool<
         runTimeHistory: new CircularArray(),
         avgRunTime: 0,
         medRunTime: 0,
+        waitTime: 0,
+        waitTimeHistory: new CircularArray(),
+        avgWaitTime: 0,
+        medWaitTime: 0,
         error: 0
       })
     }
@@ -340,11 +344,13 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public async execute (data?: Data, name?: string): Promise<Response> {
+    const submissionTimestamp = performance.now()
     const workerNodeKey = this.chooseWorkerNode()
     const submittedTask: Task<Data> = {
       name,
       // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
       data: data ?? ({} as Data),
+      submissionTimestamp,
       id: crypto.randomUUID()
     }
     const res = new Promise<Response>((resolve, reject) => {
@@ -409,7 +415,10 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    */
-  protected beforeTaskExecutionHook (workerNodeKey: number): void {
+  protected beforeTaskExecutionHook (
+    workerNodeKey: number,
+    task: Task<Data>
+  ): void {
     ++this.workerNodes[workerNodeKey].tasksUsage.running
   }
 
@@ -448,6 +457,23 @@ export abstract class AbstractPool<
         workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
       }
     }
+    if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
+      workerTasksUsage.waitTime += message.waitTime ?? 0
+      if (
+        this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
+        workerTasksUsage.run !== 0
+      ) {
+        workerTasksUsage.avgWaitTime =
+          workerTasksUsage.waitTime / workerTasksUsage.run
+      }
+      if (
+        this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
+        message.waitTime != null
+      ) {
+        workerTasksUsage.waitTimeHistory.push(message.waitTime)
+        workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
+      }
+    }
   }
 
   /**
@@ -611,6 +637,10 @@ export abstract class AbstractPool<
         runTimeHistory: new CircularArray(),
         avgRunTime: 0,
         medRunTime: 0,
+        waitTime: 0,
+        waitTimeHistory: new CircularArray(),
+        avgWaitTime: 0,
+        medWaitTime: 0,
         error: 0
       },
       tasksQueue: new Queue<Task<Data>>()
@@ -650,7 +680,7 @@ export abstract class AbstractPool<
   }
 
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
-    this.beforeTaskExecutionHook(workerNodeKey)
+    this.beforeTaskExecutionHook(workerNodeKey, task)
     this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
   }