feat: use SMA and SMM for worker tasks usage
[poolifier.git] / src / pools / abstract-pool.ts
index 507fcc10e91e809b13948b85325e413c1ccac621..a11a574147cf074dc938d1e0d47b3797e6e15ad2 100644 (file)
@@ -12,6 +12,7 @@ import {
   DEFAULT_TASK_NAME,
   DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
   EMPTY_FUNCTION,
+  average,
   isKillBehavior,
   isPlainObject,
   median,
@@ -427,16 +428,13 @@ export abstract class AbstractPool<
             )
           ),
           average: round(
-            this.workerNodes.reduce(
-              (accumulator, workerNode) =>
-                accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
-              0
-            ) /
-              this.workerNodes.reduce(
+            average(
+              this.workerNodes.reduce<number[]>(
                 (accumulator, workerNode) =>
-                  accumulator + (workerNode.usage.tasks?.executed ?? 0),
-                0
+                  accumulator.concat(workerNode.usage.runTime.history),
+                []
               )
+            )
           ),
           ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
             .runTime.median && {
@@ -468,16 +466,13 @@ export abstract class AbstractPool<
             )
           ),
           average: round(
-            this.workerNodes.reduce(
-              (accumulator, workerNode) =>
-                accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
-              0
-            ) /
-              this.workerNodes.reduce(
+            average(
+              this.workerNodes.reduce<number[]>(
                 (accumulator, workerNode) =>
-                  accumulator + (workerNode.usage.tasks?.executed ?? 0),
-                0
+                  accumulator.concat(workerNode.usage.waitTime.history),
+                []
               )
+            )
           ),
           ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
             .waitTime.median && {
@@ -940,11 +935,13 @@ export abstract class AbstractPool<
     workerUsage: WorkerUsage,
     message: MessageValue<Response>
   ): void {
+    if (message.taskError != null) {
+      return
+    }
     updateMeasurementStatistics(
       workerUsage.runTime,
       this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
-      message.taskPerformance?.runTime ?? 0,
-      workerUsage.tasks.executed
+      message.taskPerformance?.runTime ?? 0
     )
   }
 
@@ -957,8 +954,7 @@ export abstract class AbstractPool<
     updateMeasurementStatistics(
       workerUsage.waitTime,
       this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
-      taskWaitTime,
-      workerUsage.tasks.executed
+      taskWaitTime
     )
   }
 
@@ -966,19 +962,20 @@ export abstract class AbstractPool<
     workerUsage: WorkerUsage,
     message: MessageValue<Response>
   ): void {
+    if (message.taskError != null) {
+      return
+    }
     const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
       this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
     updateMeasurementStatistics(
       workerUsage.elu.active,
       eluTaskStatisticsRequirements,
-      message.taskPerformance?.elu?.active ?? 0,
-      workerUsage.tasks.executed
+      message.taskPerformance?.elu?.active ?? 0
     )
     updateMeasurementStatistics(
       workerUsage.elu.idle,
       eluTaskStatisticsRequirements,
-      message.taskPerformance?.elu?.idle ?? 0,
-      workerUsage.tasks.executed
+      message.taskPerformance?.elu?.idle ?? 0
     )
     if (eluTaskStatisticsRequirements.aggregate) {
       if (message.taskPerformance?.elu != null) {