refactor: code cleanups
[poolifier.git] / src / pools / abstract-pool.ts
index d65ac2ffe7a92775e428e1a28ba8867a681ec136..a23c7983bfc652fbfcb8ece4a75853dbed270ac5 100644 (file)
@@ -327,7 +327,7 @@ export abstract class AbstractPool<
         )
       }),
       busyWorkerNodes: this.workerNodes.reduce(
-        (accumulator, _workerNode, workerNodeKey) =>
+        (accumulator, _, workerNodeKey) =>
           this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
         0
       ),
@@ -376,14 +376,16 @@ export abstract class AbstractPool<
           minimum: round(
             min(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.runTime.minimum ?? Infinity
+                workerNode =>
+                  workerNode.usage.runTime.minimum ?? Number.POSITIVE_INFINITY
               )
             )
           ),
           maximum: round(
             max(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.runTime.maximum ?? -Infinity
+                workerNode =>
+                  workerNode.usage.runTime.maximum ?? Number.NEGATIVE_INFINITY
               )
             )
           ),
@@ -419,14 +421,16 @@ export abstract class AbstractPool<
           minimum: round(
             min(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.waitTime.minimum ?? Infinity
+                workerNode =>
+                  workerNode.usage.waitTime.minimum ?? Number.POSITIVE_INFINITY
               )
             )
           ),
           maximum: round(
             max(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
+                workerNode =>
+                  workerNode.usage.waitTime.maximum ?? Number.NEGATIVE_INFINITY
               )
             )
           ),
@@ -455,6 +459,115 @@ export abstract class AbstractPool<
             )
           })
         }
+      }),
+      ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+        .elu.aggregate === true && {
+        elu: {
+          idle: {
+            minimum: round(
+              min(
+                ...this.workerNodes.map(
+                  workerNode =>
+                    workerNode.usage.elu.idle.minimum ??
+                    Number.POSITIVE_INFINITY
+                )
+              )
+            ),
+            maximum: round(
+              max(
+                ...this.workerNodes.map(
+                  workerNode =>
+                    workerNode.usage.elu.idle.maximum ??
+                    Number.NEGATIVE_INFINITY
+                )
+              )
+            ),
+            ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+              .elu.average && {
+              average: round(
+                average(
+                  this.workerNodes.reduce<number[]>(
+                    (accumulator, workerNode) =>
+                      accumulator.concat(workerNode.usage.elu.idle.history),
+                    []
+                  )
+                )
+              )
+            }),
+            ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+              .elu.median && {
+              median: round(
+                median(
+                  this.workerNodes.reduce<number[]>(
+                    (accumulator, workerNode) =>
+                      accumulator.concat(workerNode.usage.elu.idle.history),
+                    []
+                  )
+                )
+              )
+            })
+          },
+          active: {
+            minimum: round(
+              min(
+                ...this.workerNodes.map(
+                  workerNode =>
+                    workerNode.usage.elu.active.minimum ??
+                    Number.POSITIVE_INFINITY
+                )
+              )
+            ),
+            maximum: round(
+              max(
+                ...this.workerNodes.map(
+                  workerNode =>
+                    workerNode.usage.elu.active.maximum ??
+                    Number.NEGATIVE_INFINITY
+                )
+              )
+            ),
+            ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+              .elu.average && {
+              average: round(
+                average(
+                  this.workerNodes.reduce<number[]>(
+                    (accumulator, workerNode) =>
+                      accumulator.concat(workerNode.usage.elu.active.history),
+                    []
+                  )
+                )
+              )
+            }),
+            ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+              .elu.median && {
+              median: round(
+                median(
+                  this.workerNodes.reduce<number[]>(
+                    (accumulator, workerNode) =>
+                      accumulator.concat(workerNode.usage.elu.active.history),
+                    []
+                  )
+                )
+              )
+            })
+          },
+          utilization: {
+            average: round(
+              average(
+                this.workerNodes.map(
+                  workerNode => workerNode.usage.elu.utilization ?? 0
+                )
+              )
+            ),
+            median: round(
+              median(
+                this.workerNodes.map(
+                  workerNode => workerNode.usage.elu.utilization ?? 0
+                )
+              )
+            )
+          }
+        }
       })
     }
   }
@@ -571,7 +684,7 @@ export abstract class AbstractPool<
     }
     if (requireSync) {
       this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
-        this.getWorkerWorkerChoiceStrategies(),
+        this.getWorkerChoiceStrategies(),
         this.opts.workerChoiceStrategyOptions
       )
       for (const workerNodeKey of this.workerNodes.keys()) {
@@ -591,7 +704,7 @@ export abstract class AbstractPool<
         this.opts.workerChoiceStrategyOptions
       )
       this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
-        this.getWorkerWorkerChoiceStrategies(),
+        this.getWorkerChoiceStrategies(),
         this.opts.workerChoiceStrategyOptions
       )
       for (const workerNodeKey of this.workerNodes.keys()) {
@@ -871,7 +984,7 @@ export abstract class AbstractPool<
     })
     this.taskFunctions.set(name, fn)
     this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
-      this.getWorkerWorkerChoiceStrategies()
+      this.getWorkerChoiceStrategies()
     )
     for (const workerNodeKey of this.workerNodes.keys()) {
       this.sendStatisticsMessageToWorker(workerNodeKey)
@@ -898,7 +1011,7 @@ export abstract class AbstractPool<
     }
     this.taskFunctions.delete(name)
     this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
-      this.getWorkerWorkerChoiceStrategies()
+      this.getWorkerChoiceStrategies()
     )
     for (const workerNodeKey of this.workerNodes.keys()) {
       this.sendStatisticsMessageToWorker(workerNodeKey)
@@ -920,20 +1033,48 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Gets task function strategy, if any.
+   * Gets task function worker choice strategy, if any.
    *
    * @param name - The task function name.
    * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
    */
-  private readonly getTaskFunctionWorkerWorkerChoiceStrategy = (
+  private readonly getTaskFunctionWorkerChoiceStrategy = (
     name?: string
   ): WorkerChoiceStrategy | undefined => {
-    if (name != null) {
-      return this.listTaskFunctionsProperties().find(
-        (taskFunctionProperties: TaskFunctionProperties) =>
-          taskFunctionProperties.name === name
-      )?.strategy
+    name = name ?? DEFAULT_TASK_NAME
+    const taskFunctionsProperties = this.listTaskFunctionsProperties()
+    if (name === DEFAULT_TASK_NAME) {
+      name = taskFunctionsProperties[1]?.name
     }
+    return taskFunctionsProperties.find(
+      (taskFunctionProperties: TaskFunctionProperties) =>
+        taskFunctionProperties.name === name
+    )?.strategy
+  }
+
+  /**
+   * Gets worker node task function worker choice strategy, if any.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @param name - The task function name.
+   * @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise.
+   */
+  private readonly getWorkerNodeTaskFunctionWorkerChoiceStrategy = (
+    workerNodeKey: number,
+    name?: string
+  ): WorkerChoiceStrategy | undefined => {
+    const workerInfo = this.getWorkerInfo(workerNodeKey)
+    if (workerInfo == null) {
+      return
+    }
+    name = name ?? DEFAULT_TASK_NAME
+    if (name === DEFAULT_TASK_NAME) {
+      name = workerInfo.taskFunctionsProperties?.[1]?.name
+    }
+    return workerInfo.taskFunctionsProperties?.find(
+      (taskFunctionProperties: TaskFunctionProperties) =>
+        taskFunctionProperties.name === name
+    )?.strategy
   }
 
   /**
@@ -941,18 +1082,24 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    * @param name - The task function name.
-   * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise.
+   * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise.
    */
   private readonly getWorkerNodeTaskFunctionPriority = (
     workerNodeKey: number,
     name?: string
   ): number | undefined => {
-    if (name != null) {
-      return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find(
-        (taskFunctionProperties: TaskFunctionProperties) =>
-          taskFunctionProperties.name === name
-      )?.priority
+    const workerInfo = this.getWorkerInfo(workerNodeKey)
+    if (workerInfo == null) {
+      return
     }
+    name = name ?? DEFAULT_TASK_NAME
+    if (name === DEFAULT_TASK_NAME) {
+      name = workerInfo.taskFunctionsProperties?.[1]?.name
+    }
+    return workerInfo.taskFunctionsProperties?.find(
+      (taskFunctionProperties: TaskFunctionProperties) =>
+        taskFunctionProperties.name === name
+    )?.priority
   }
 
   /**
@@ -960,7 +1107,7 @@ export abstract class AbstractPool<
    *
    * @returns The worker choice strategies.
    */
-  private readonly getWorkerWorkerChoiceStrategies =
+  private readonly getWorkerChoiceStrategies =
     (): Set<WorkerChoiceStrategy> => {
       return new Set([
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
@@ -1028,15 +1175,16 @@ export abstract class AbstractPool<
         return
       }
       const timestamp = performance.now()
-      const taskFunctionStrategy =
-        this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
-      const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy)
+      const workerNodeKey = this.chooseWorkerNode(name)
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
         data: data ?? ({} as Data),
         priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
-        strategy: taskFunctionStrategy,
+        strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy(
+          workerNodeKey,
+          name
+        ),
         transferList,
         timestamp,
         taskId: randomUUID()
@@ -1115,7 +1263,7 @@ export abstract class AbstractPool<
     }
     this.destroying = true
     await Promise.all(
-      this.workerNodes.map(async (_workerNode, workerNodeKey) => {
+      this.workerNodes.map(async (_, workerNodeKey) => {
         await this.destroyWorkerNode(workerNodeKey)
       })
     )
@@ -1306,14 +1454,12 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Chooses a worker node for the next task given the worker choice strategy.
+   * Chooses a worker node for the next task.
    *
-   * @param workerChoiceStrategy - The worker choice strategy.
+   * @param name - The task function name.
    * @returns The chosen worker node key
    */
-  private chooseWorkerNode (
-    workerChoiceStrategy?: WorkerChoiceStrategy
-  ): number {
+  private chooseWorkerNode (name?: string): number {
     if (this.shallCreateDynamicWorker()) {
       const workerNodeKey = this.createAndSetupDynamicWorkerNode()
       if (
@@ -1324,7 +1470,9 @@ export abstract class AbstractPool<
       }
     }
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-    return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy)
+    return this.workerChoiceStrategiesContext!.execute(
+      this.getTaskFunctionWorkerChoiceStrategy(name)
+    )
   }
 
   /**
@@ -1359,7 +1507,8 @@ export abstract class AbstractPool<
     ) {
       workerNode.usage.runTime.aggregate = min(
         ...this.workerNodes.map(
-          workerNode => workerNode.usage.runTime.aggregate ?? Infinity
+          workerNode =>
+            workerNode.usage.runTime.aggregate ?? Number.POSITIVE_INFINITY
         )
       )
     }
@@ -1369,7 +1518,8 @@ export abstract class AbstractPool<
     ) {
       workerNode.usage.waitTime.aggregate = min(
         ...this.workerNodes.map(
-          workerNode => workerNode.usage.waitTime.aggregate ?? Infinity
+          workerNode =>
+            workerNode.usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY
         )
       )
     }
@@ -1379,7 +1529,8 @@ export abstract class AbstractPool<
     ) {
       workerNode.usage.elu.active.aggregate = min(
         ...this.workerNodes.map(
-          workerNode => workerNode.usage.elu.active.aggregate ?? Infinity
+          workerNode =>
+            workerNode.usage.elu.active.aggregate ?? Number.POSITIVE_INFINITY
         )
       )
     }
@@ -1461,6 +1612,7 @@ export abstract class AbstractPool<
       const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
         message.workerId
       )
+      const workerInfo = this.getWorkerInfo(localWorkerNodeKey)
       const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
       // Kill message received from worker
       if (
@@ -1469,6 +1621,8 @@ export abstract class AbstractPool<
           ((this.opts.enableTasksQueue === false &&
             workerUsage.tasks.executing === 0) ||
             (this.opts.enableTasksQueue === true &&
+              workerInfo != null &&
+              !workerInfo.stealing &&
               workerUsage.tasks.executing === 0 &&
               this.tasksQueueSize(localWorkerNodeKey) === 0)))
       ) {
@@ -1722,12 +1876,17 @@ export abstract class AbstractPool<
       )
     }
     const workerInfo = this.getWorkerInfo(workerNodeKey)
+    if (workerInfo == null) {
+      throw new Error(
+        `Worker node with key '${workerNodeKey}' not found in pool`
+      )
+    }
     if (
       this.cannotStealTask() ||
       (this.info.stealingWorkerNodes ?? 0) >
         Math.floor(this.workerNodes.length / 2)
     ) {
-      if (workerInfo != null && previousStolenTask != null) {
+      if (previousStolenTask != null) {
         workerInfo.stealing = false
         this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
           workerNodeKey,
@@ -1739,7 +1898,6 @@ export abstract class AbstractPool<
     }
     const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
     if (
-      workerInfo != null &&
       previousStolenTask != null &&
       (workerNodeTasksUsage.executing > 0 ||
         this.tasksQueueSize(workerNodeKey) > 0)
@@ -1752,11 +1910,6 @@ export abstract class AbstractPool<
       )
       return
     }
-    if (workerInfo == null) {
-      throw new Error(
-        `Worker node with key '${workerNodeKey}' not found in pool`
-      )
-    }
     workerInfo.stealing = true
     const stolenTask = this.workerNodeStealTask(workerNodeKey)
     if (stolenTask != null) {
@@ -1814,12 +1967,12 @@ export abstract class AbstractPool<
     ) {
       return
     }
-    const { workerId } = eventDetail
     const sizeOffset = 1
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
     if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
       return
     }
+    const { workerId } = eventDetail
     const sourceWorkerNode =
       this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
     const workerNodes = this.workerNodes