Merge pull request #773 from poolifier/elu-strategy
[poolifier.git] / src / pools / abstract-pool.ts
index e941fabb772690f0797c204226430f113ea97d43..3c690fb819e9c718163e86badcde28786a5dd849 100644 (file)
@@ -21,7 +21,13 @@ import {
   type TasksQueueOptions,
   type WorkerType
 } from './pool'
-import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker'
+import type {
+  IWorker,
+  Task,
+  TaskStatistics,
+  WorkerNode,
+  WorkerUsage
+} from './worker'
 import {
   WorkerChoiceStrategies,
   type WorkerChoiceStrategy,
@@ -309,30 +315,10 @@ export abstract class AbstractPool<
       this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
     }
     for (const workerNode of this.workerNodes) {
-      this.setWorkerNodeTasksUsage(workerNode, {
-        tasks: {
-          executed: 0,
-          executing: 0,
-          queued:
-            this.opts.enableTasksQueue === true
-              ? workerNode.tasksQueue.size
-              : 0,
-          failed: 0
-        },
-        runTime: {
-          aggregation: 0,
-          average: 0,
-          median: 0,
-          history: new CircularArray()
-        },
-        waitTime: {
-          aggregation: 0,
-          average: 0,
-          median: 0,
-          history: new CircularArray()
-        },
-        elu: undefined
-      })
+      this.setWorkerNodeTasksUsage(
+        workerNode,
+        this.getWorkerUsage(workerNode.worker)
+      )
       this.setWorkerStatistics(workerNode.worker)
     }
   }
@@ -476,13 +462,15 @@ export abstract class AbstractPool<
    * Can be overridden.
    *
    * @param workerNodeKey - The worker node key.
+   * @param task - The task to execute.
    */
-  protected beforeTaskExecutionHook (workerNodeKey: number): void {
-    ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing
-    if (this.opts.enableTasksQueue === true) {
-      this.workerNodes[workerNodeKey].workerUsage.tasks.queued =
-        this.tasksQueueSize(workerNodeKey)
-    }
+  protected beforeTaskExecutionHook (
+    workerNodeKey: number,
+    task: Task<Data>
+  ): void {
+    const workerUsage = this.workerNodes[workerNodeKey].workerUsage
+    ++workerUsage.tasks.executing
+    this.updateWaitTimeWorkerUsage(workerUsage, task)
   }
 
   /**
@@ -504,9 +492,7 @@ export abstract class AbstractPool<
     if (message.taskError != null) {
       ++workerTaskStatistics.failed
     }
-
     this.updateRunTimeWorkerUsage(workerUsage, message)
-    this.updateWaitTimeWorkerUsage(workerUsage, message)
     this.updateEluWorkerUsage(workerUsage, message)
   }
 
@@ -516,19 +502,20 @@ export abstract class AbstractPool<
   ): void {
     if (
       this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+        .aggregate
     ) {
-      workerUsage.runTime.aggregation += message.taskPerformance?.runTime ?? 0
+      workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
       if (
-        this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
-          .avgRunTime &&
+        this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+          .average &&
         workerUsage.tasks.executed !== 0
       ) {
         workerUsage.runTime.average =
-          workerUsage.runTime.aggregation / workerUsage.tasks.executed
+          workerUsage.runTime.aggregate / workerUsage.tasks.executed
       }
       if (
-        this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
-          .medRunTime &&
+        this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+          .median &&
         message.taskPerformance?.runTime != null
       ) {
         workerUsage.runTime.history.push(message.taskPerformance.runTime)
@@ -539,26 +526,29 @@ export abstract class AbstractPool<
 
   private updateWaitTimeWorkerUsage (
     workerUsage: WorkerUsage,
-    message: MessageValue<Response>
+    task: Task<Data>
   ): void {
+    const timestamp = performance.now()
+    const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
     if (
       this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
+        .aggregate
     ) {
-      workerUsage.waitTime.aggregation += message.taskPerformance?.waitTime ?? 0
+      workerUsage.waitTime.aggregate += taskWaitTime ?? 0
       if (
         this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
-          .avgWaitTime &&
+          .waitTime.average &&
         workerUsage.tasks.executed !== 0
       ) {
         workerUsage.waitTime.average =
-          workerUsage.waitTime.aggregation / workerUsage.tasks.executed
+          workerUsage.waitTime.aggregate / workerUsage.tasks.executed
       }
       if (
         this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
-          .medWaitTime &&
-        message.taskPerformance?.waitTime != null
+          .waitTime.median &&
+        taskWaitTime != null
       ) {
-        workerUsage.waitTime.history.push(message.taskPerformance.waitTime)
+        workerUsage.waitTime.history.push(taskWaitTime)
         workerUsage.waitTime.median = median(workerUsage.waitTime.history)
       }
     }
@@ -759,52 +749,31 @@ export abstract class AbstractPool<
   private pushWorkerNode (worker: Worker): number {
     return this.workerNodes.push({
       worker,
-      workerUsage: {
-        tasks: {
-          executed: 0,
-          executing: 0,
-          queued: 0,
-          failed: 0
-        },
-        runTime: {
-          aggregation: 0,
-          average: 0,
-          median: 0,
-          history: new CircularArray()
-        },
-
-        waitTime: {
-          aggregation: 0,
-          average: 0,
-          median: 0,
-          history: new CircularArray()
-        },
-        elu: undefined
-      },
+      workerUsage: this.getWorkerUsage(worker),
       tasksQueue: new Queue<Task<Data>>()
     })
   }
 
-  /**
-   * Sets the given worker in the pool worker nodes.
-   *
-   * @param workerNodeKey - The worker node key.
-   * @param worker - The worker.
-   * @param workerUsage - The worker usage.
-   * @param tasksQueue - The worker task queue.
-   */
-  private setWorkerNode (
-    workerNodeKey: number,
-    worker: Worker,
-    workerUsage: WorkerUsage,
-    tasksQueue: Queue<Task<Data>>
-  ): void {
-    this.workerNodes[workerNodeKey] = {
-      worker,
-      workerUsage,
-      tasksQueue
-    }
-  }
+  // /**
+  //  * Sets the given worker in the pool worker nodes.
+  //  *
+  //  * @param workerNodeKey - The worker node key.
+  //  * @param worker - The worker.
+  //  * @param workerUsage - The worker usage.
+  //  * @param tasksQueue - The worker task queue.
+  //  */
+  // private setWorkerNode (
+  //   workerNodeKey: number,
+  //   worker: Worker,
+  //   workerUsage: WorkerUsage,
+  //   tasksQueue: Queue<Task<Data>>
+  // ): void {
+  //   this.workerNodes[workerNodeKey] = {
+  //     worker,
+  //     workerUsage,
+  //     tasksQueue
+  //   }
+  // }
 
   /**
    * Removes the given worker from the pool worker nodes.
@@ -820,7 +789,7 @@ export abstract class AbstractPool<
   }
 
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
-    this.beforeTaskExecutionHook(workerNodeKey)
+    this.beforeTaskExecutionHook(workerNodeKey, task)
     this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
   }
 
@@ -858,13 +827,42 @@ export abstract class AbstractPool<
       statistics: {
         runTime:
           this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
-            .runTime,
-        waitTime:
-          this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
-            .waitTime,
+            .runTime.aggregate,
         elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
           .elu
       }
     })
   }
+
+  private getWorkerUsage (worker: Worker): WorkerUsage {
+    return {
+      tasks: this.getTaskStatistics(worker),
+      runTime: {
+        aggregate: 0,
+        average: 0,
+        median: 0,
+        history: new CircularArray()
+      },
+      waitTime: {
+        aggregate: 0,
+        average: 0,
+        median: 0,
+        history: new CircularArray()
+      },
+      elu: undefined
+    }
+  }
+
+  private getTaskStatistics (worker: Worker): TaskStatistics {
+    const queueSize =
+      this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size
+    return {
+      executed: 0,
+      executing: 0,
+      get queued (): number {
+        return queueSize ?? 0
+      },
+      failed: 0
+    }
+  }
 }