refactor: abstract out measurement statistics
[poolifier.git] / src / pools / abstract-pool.ts
index 85a47e38f7c248f7a92ff9e53166e2121048015c..51f77901383181f04725ca1bb84ec1c60f241855 100644 (file)
@@ -21,7 +21,7 @@ import {
   type TasksQueueOptions,
   type WorkerType
 } from './pool'
-import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
+import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker'
 import {
   WorkerChoiceStrategies,
   type WorkerChoiceStrategy,
@@ -222,17 +222,26 @@ export abstract class AbstractPool<
       workerNodes: this.workerNodes.length,
       idleWorkerNodes: this.workerNodes.reduce(
         (accumulator, workerNode) =>
-          workerNode.tasksUsage.running === 0 ? accumulator + 1 : accumulator,
+          workerNode.workerUsage.tasks.executing === 0
+            ? accumulator + 1
+            : accumulator,
         0
       ),
       busyWorkerNodes: this.workerNodes.reduce(
         (accumulator, workerNode) =>
-          workerNode.tasksUsage.running > 0 ? accumulator + 1 : accumulator,
+          workerNode.workerUsage.tasks.executing > 0
+            ? accumulator + 1
+            : accumulator,
         0
       ),
-      runningTasks: this.workerNodes.reduce(
+      executedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
-          accumulator + workerNode.tasksUsage.running,
+          accumulator + workerNode.workerUsage.tasks.executed,
+        0
+      ),
+      executingTasks: this.workerNodes.reduce(
+        (accumulator, workerNode) =>
+          accumulator + workerNode.workerUsage.tasks.executing,
         0
       ),
       queuedTasks: this.workerNodes.reduce(
@@ -243,6 +252,11 @@ export abstract class AbstractPool<
         (accumulator, workerNode) =>
           accumulator + workerNode.tasksQueue.maxSize,
         0
+      ),
+      failedTasks: this.workerNodes.reduce(
+        (accumulator, workerNode) =>
+          accumulator + workerNode.workerUsage.tasks.failed,
+        0
       )
     }
   }
@@ -296,17 +310,27 @@ export abstract class AbstractPool<
     }
     for (const workerNode of this.workerNodes) {
       this.setWorkerNodeTasksUsage(workerNode, {
-        ran: 0,
-        running: 0,
-        runTime: 0,
-        runTimeHistory: new CircularArray(),
-        avgRunTime: 0,
-        medRunTime: 0,
-        waitTime: 0,
-        waitTimeHistory: new CircularArray(),
-        avgWaitTime: 0,
-        medWaitTime: 0,
-        error: 0,
+        tasks: {
+          executing: 0,
+          executed: 0,
+          queued:
+            this.opts.enableTasksQueue === true
+              ? workerNode.tasksQueue.size
+              : 0,
+          failed: 0
+        },
+        runTime: {
+          aggregation: 0,
+          average: 0,
+          median: 0,
+          history: new CircularArray()
+        },
+        waitTime: {
+          aggregation: 0,
+          average: 0,
+          median: 0,
+          history: new CircularArray()
+        },
         elu: undefined
       })
       this.setWorkerStatistics(workerNode.worker)
@@ -374,7 +398,7 @@ export abstract class AbstractPool<
   protected internalBusy (): boolean {
     return (
       this.workerNodes.findIndex(workerNode => {
-        return workerNode.tasksUsage.running === 0
+        return workerNode.workerUsage.tasks.executing === 0
       }) === -1
     )
   }
@@ -400,7 +424,7 @@ export abstract class AbstractPool<
     if (
       this.opts.enableTasksQueue === true &&
       (this.busy ||
-        this.workerNodes[workerNodeKey].tasksUsage.running >=
+        this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
           ((this.opts.tasksQueueOptions as TasksQueueOptions)
             .concurrency as number))
     ) {
@@ -454,7 +478,11 @@ export abstract class AbstractPool<
    * @param workerNodeKey - The worker node key.
    */
   protected beforeTaskExecutionHook (workerNodeKey: number): void {
-    ++this.workerNodes[workerNodeKey].tasksUsage.running
+    ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing
+    if (this.opts.enableTasksQueue === true) {
+      this.workerNodes[workerNodeKey].workerUsage.tasks.queued =
+        this.tasksQueueSize(workerNodeKey)
+    }
   }
 
   /**
@@ -468,66 +496,68 @@ export abstract class AbstractPool<
     worker: Worker,
     message: MessageValue<Response>
   ): void {
-    const workerTasksUsage =
-      this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
-    --workerTasksUsage.running
-    ++workerTasksUsage.ran
-    if (message.error != null) {
-      ++workerTasksUsage.error
+    const workerUsage =
+      this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage
+    const workerTaskStatistics = workerUsage.tasks
+    --workerTaskStatistics.executing
+    ++workerTaskStatistics.executed
+    if (message.taskError != null) {
+      ++workerTaskStatistics.failed
     }
-    this.updateRunTimeTasksUsage(workerTasksUsage, message)
-    this.updateWaitTimeTasksUsage(workerTasksUsage, message)
-    this.updateEluTasksUsage(workerTasksUsage, message)
+
+    this.updateRunTimeWorkerUsage(workerUsage, message)
+    this.updateWaitTimeWorkerUsage(workerUsage, message)
+    this.updateEluWorkerUsage(workerUsage, message)
   }
 
-  private updateRunTimeTasksUsage (
-    workerTasksUsage: TasksUsage,
+  private updateRunTimeWorkerUsage (
+    workerUsage: WorkerUsage,
     message: MessageValue<Response>
   ): void {
     if (this.workerChoiceStrategyContext.getTaskStatistics().runTime) {
-      workerTasksUsage.runTime += message.taskPerformance?.runTime ?? 0
+      workerUsage.runTime.aggregation += message.taskPerformance?.runTime ?? 0
       if (
         this.workerChoiceStrategyContext.getTaskStatistics().avgRunTime &&
-        workerTasksUsage.ran !== 0
+        workerUsage.tasks.executed !== 0
       ) {
-        workerTasksUsage.avgRunTime =
-          workerTasksUsage.runTime / workerTasksUsage.ran
+        workerUsage.runTime.average =
+          workerUsage.runTime.aggregation / workerUsage.tasks.executed
       }
       if (
         this.workerChoiceStrategyContext.getTaskStatistics().medRunTime &&
         message.taskPerformance?.runTime != null
       ) {
-        workerTasksUsage.runTimeHistory.push(message.taskPerformance.runTime)
-        workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
+        workerUsage.runTime.history.push(message.taskPerformance.runTime)
+        workerUsage.runTime.median = median(workerUsage.runTime.history)
       }
     }
   }
 
-  private updateWaitTimeTasksUsage (
-    workerTasksUsage: TasksUsage,
+  private updateWaitTimeWorkerUsage (
+    workerUsage: WorkerUsage,
     message: MessageValue<Response>
   ): void {
     if (this.workerChoiceStrategyContext.getTaskStatistics().waitTime) {
-      workerTasksUsage.waitTime += message.taskPerformance?.waitTime ?? 0
+      workerUsage.waitTime.aggregation += message.taskPerformance?.waitTime ?? 0
       if (
         this.workerChoiceStrategyContext.getTaskStatistics().avgWaitTime &&
-        workerTasksUsage.ran !== 0
+        workerUsage.tasks.executed !== 0
       ) {
-        workerTasksUsage.avgWaitTime =
-          workerTasksUsage.waitTime / workerTasksUsage.ran
+        workerUsage.waitTime.average =
+          workerUsage.waitTime.aggregation / workerUsage.tasks.executed
       }
       if (
         this.workerChoiceStrategyContext.getTaskStatistics().medWaitTime &&
         message.taskPerformance?.waitTime != null
       ) {
-        workerTasksUsage.waitTimeHistory.push(message.taskPerformance.waitTime)
-        workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
+        workerUsage.waitTime.history.push(message.taskPerformance.waitTime)
+        workerUsage.waitTime.median = median(workerUsage.waitTime.history)
       }
     }
   }
 
-  private updateEluTasksUsage (
-    workerTasksUsage: TasksUsage,
+  private updateEluWorkerUsage (
+    workerTasksUsage: WorkerUsage,
     message: MessageValue<Response>
   ): void {
     if (this.workerChoiceStrategyContext.getTaskStatistics().elu) {
@@ -566,7 +596,8 @@ export abstract class AbstractPool<
         if (
           isKillBehavior(KillBehaviors.HARD, message.kill) ||
           (message.kill != null &&
-            this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
+            this.workerNodes[currentWorkerNodeKey].workerUsage.tasks
+              .executing === 0)
         ) {
           // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
           this.flushTasksQueue(currentWorkerNodeKey)
@@ -662,13 +693,10 @@ export abstract class AbstractPool<
         // Task execution response received
         const promiseResponse = this.promiseResponseMap.get(message.id)
         if (promiseResponse != null) {
-          if (message.error != null) {
-            promiseResponse.reject(message.error)
+          if (message.taskError != null) {
+            promiseResponse.reject(message.taskError.message)
             if (this.emitter != null) {
-              this.emitter.emit(PoolEvents.taskError, {
-                error: message.error,
-                errorData: message.errorData
-              })
+              this.emitter.emit(PoolEvents.taskError, message.taskError)
             }
           } else {
             promiseResponse.resolve(message.data as Response)
@@ -705,13 +733,13 @@ export abstract class AbstractPool<
    * Sets the given worker node its tasks usage in the pool.
    *
    * @param workerNode - The worker node.
-   * @param tasksUsage - The worker node tasks usage.
+   * @param workerUsage - The worker usage.
    */
   private setWorkerNodeTasksUsage (
     workerNode: WorkerNode<Worker, Data>,
-    tasksUsage: TasksUsage
+    workerUsage: WorkerUsage
   ): void {
-    workerNode.tasksUsage = tasksUsage
+    workerNode.workerUsage = workerUsage
   }
 
   /**
@@ -723,18 +751,26 @@ export abstract class AbstractPool<
   private pushWorkerNode (worker: Worker): number {
     return this.workerNodes.push({
       worker,
-      tasksUsage: {
-        ran: 0,
-        running: 0,
-        runTime: 0,
-        runTimeHistory: new CircularArray(),
-        avgRunTime: 0,
-        medRunTime: 0,
-        waitTime: 0,
-        waitTimeHistory: new CircularArray(),
-        avgWaitTime: 0,
-        medWaitTime: 0,
-        error: 0,
+      workerUsage: {
+        tasks: {
+          executed: 0,
+          executing: 0,
+          queued: 0,
+          failed: 0
+        },
+        runTime: {
+          aggregation: 0,
+          average: 0,
+          median: 0,
+          history: new CircularArray()
+        },
+
+        waitTime: {
+          aggregation: 0,
+          average: 0,
+          median: 0,
+          history: new CircularArray()
+        },
         elu: undefined
       },
       tasksQueue: new Queue<Task<Data>>()
@@ -746,18 +782,18 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    * @param worker - The worker.
-   * @param tasksUsage - The worker tasks usage.
+   * @param workerUsage - The worker usage.
    * @param tasksQueue - The worker task queue.
    */
   private setWorkerNode (
     workerNodeKey: number,
     worker: Worker,
-    tasksUsage: TasksUsage,
+    workerUsage: WorkerUsage,
     tasksQueue: Queue<Task<Data>>
   ): void {
     this.workerNodes[workerNodeKey] = {
       worker,
-      tasksUsage,
+      workerUsage,
       tasksQueue
     }
   }