docs: add changelog entry
[poolifier.git] / src / pools / abstract-pool.ts
index 8a264c1944f713be2fef1782c2e47d201711b397..ebdb0dd8924c0e18f71a456c620b1d08a6ad2e6b 100644 (file)
@@ -376,14 +376,16 @@ export abstract class AbstractPool<
           minimum: round(
             min(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.runTime.minimum ?? Infinity
+                workerNode =>
+                  workerNode.usage.runTime.minimum ?? Number.POSITIVE_INFINITY
               )
             )
           ),
           maximum: round(
             max(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.runTime.maximum ?? -Infinity
+                workerNode =>
+                  workerNode.usage.runTime.maximum ?? Number.NEGATIVE_INFINITY
               )
             )
           ),
@@ -393,7 +395,9 @@ export abstract class AbstractPool<
               average(
                 this.workerNodes.reduce<number[]>(
                   (accumulator, workerNode) =>
-                    accumulator.concat(workerNode.usage.runTime.history),
+                    accumulator.concat(
+                      workerNode.usage.runTime.history.toArray()
+                    ),
                   []
                 )
               )
@@ -405,7 +409,9 @@ export abstract class AbstractPool<
               median(
                 this.workerNodes.reduce<number[]>(
                   (accumulator, workerNode) =>
-                    accumulator.concat(workerNode.usage.runTime.history),
+                    accumulator.concat(
+                      workerNode.usage.runTime.history.toArray()
+                    ),
                   []
                 )
               )
@@ -419,14 +425,16 @@ export abstract class AbstractPool<
           minimum: round(
             min(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.waitTime.minimum ?? Infinity
+                workerNode =>
+                  workerNode.usage.waitTime.minimum ?? Number.POSITIVE_INFINITY
               )
             )
           ),
           maximum: round(
             max(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
+                workerNode =>
+                  workerNode.usage.waitTime.maximum ?? Number.NEGATIVE_INFINITY
               )
             )
           ),
@@ -436,7 +444,9 @@ export abstract class AbstractPool<
               average(
                 this.workerNodes.reduce<number[]>(
                   (accumulator, workerNode) =>
-                    accumulator.concat(workerNode.usage.waitTime.history),
+                    accumulator.concat(
+                      workerNode.usage.waitTime.history.toArray()
+                    ),
                   []
                 )
               )
@@ -448,7 +458,9 @@ export abstract class AbstractPool<
               median(
                 this.workerNodes.reduce<number[]>(
                   (accumulator, workerNode) =>
-                    accumulator.concat(workerNode.usage.waitTime.history),
+                    accumulator.concat(
+                      workerNode.usage.waitTime.history.toArray()
+                    ),
                   []
                 )
               )
@@ -463,14 +475,18 @@ export abstract class AbstractPool<
             minimum: round(
               min(
                 ...this.workerNodes.map(
-                  workerNode => workerNode.usage.elu.idle.minimum ?? Infinity
+                  workerNode =>
+                    workerNode.usage.elu.idle.minimum ??
+                    Number.POSITIVE_INFINITY
                 )
               )
             ),
             maximum: round(
               max(
                 ...this.workerNodes.map(
-                  workerNode => workerNode.usage.elu.idle.maximum ?? -Infinity
+                  workerNode =>
+                    workerNode.usage.elu.idle.maximum ??
+                    Number.NEGATIVE_INFINITY
                 )
               )
             ),
@@ -480,7 +496,9 @@ export abstract class AbstractPool<
                 average(
                   this.workerNodes.reduce<number[]>(
                     (accumulator, workerNode) =>
-                      accumulator.concat(workerNode.usage.elu.idle.history),
+                      accumulator.concat(
+                        workerNode.usage.elu.idle.history.toArray()
+                      ),
                     []
                   )
                 )
@@ -492,7 +510,9 @@ export abstract class AbstractPool<
                 median(
                   this.workerNodes.reduce<number[]>(
                     (accumulator, workerNode) =>
-                      accumulator.concat(workerNode.usage.elu.idle.history),
+                      accumulator.concat(
+                        workerNode.usage.elu.idle.history.toArray()
+                      ),
                     []
                   )
                 )
@@ -503,14 +523,18 @@ export abstract class AbstractPool<
             minimum: round(
               min(
                 ...this.workerNodes.map(
-                  workerNode => workerNode.usage.elu.active.minimum ?? Infinity
+                  workerNode =>
+                    workerNode.usage.elu.active.minimum ??
+                    Number.POSITIVE_INFINITY
                 )
               )
             ),
             maximum: round(
               max(
                 ...this.workerNodes.map(
-                  workerNode => workerNode.usage.elu.active.maximum ?? -Infinity
+                  workerNode =>
+                    workerNode.usage.elu.active.maximum ??
+                    Number.NEGATIVE_INFINITY
                 )
               )
             ),
@@ -520,7 +544,9 @@ export abstract class AbstractPool<
                 average(
                   this.workerNodes.reduce<number[]>(
                     (accumulator, workerNode) =>
-                      accumulator.concat(workerNode.usage.elu.active.history),
+                      accumulator.concat(
+                        workerNode.usage.elu.active.history.toArray()
+                      ),
                     []
                   )
                 )
@@ -532,12 +558,30 @@ export abstract class AbstractPool<
                 median(
                   this.workerNodes.reduce<number[]>(
                     (accumulator, workerNode) =>
-                      accumulator.concat(workerNode.usage.elu.active.history),
+                      accumulator.concat(
+                        workerNode.usage.elu.active.history.toArray()
+                      ),
                     []
                   )
                 )
               )
             })
+          },
+          utilization: {
+            average: round(
+              average(
+                this.workerNodes.map(
+                  workerNode => workerNode.usage.elu.utilization ?? 0
+                )
+              )
+            ),
+            median: round(
+              median(
+                this.workerNodes.map(
+                  workerNode => workerNode.usage.elu.utilization ?? 0
+                )
+              )
+            )
           }
         }
       })
@@ -656,7 +700,7 @@ export abstract class AbstractPool<
     }
     if (requireSync) {
       this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
-        this.getWorkerWorkerChoiceStrategies(),
+        this.getWorkerChoiceStrategies(),
         this.opts.workerChoiceStrategyOptions
       )
       for (const workerNodeKey of this.workerNodes.keys()) {
@@ -676,7 +720,7 @@ export abstract class AbstractPool<
         this.opts.workerChoiceStrategyOptions
       )
       this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
-        this.getWorkerWorkerChoiceStrategies(),
+        this.getWorkerChoiceStrategies(),
         this.opts.workerChoiceStrategyOptions
       )
       for (const workerNodeKey of this.workerNodes.keys()) {
@@ -956,7 +1000,7 @@ export abstract class AbstractPool<
     })
     this.taskFunctions.set(name, fn)
     this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
-      this.getWorkerWorkerChoiceStrategies()
+      this.getWorkerChoiceStrategies()
     )
     for (const workerNodeKey of this.workerNodes.keys()) {
       this.sendStatisticsMessageToWorker(workerNodeKey)
@@ -983,7 +1027,7 @@ export abstract class AbstractPool<
     }
     this.taskFunctions.delete(name)
     this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
-      this.getWorkerWorkerChoiceStrategies()
+      this.getWorkerChoiceStrategies()
     )
     for (const workerNodeKey of this.workerNodes.keys()) {
       this.sendStatisticsMessageToWorker(workerNodeKey)
@@ -1005,20 +1049,48 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Gets task function strategy, if any.
+   * Gets task function worker choice strategy, if any.
    *
    * @param name - The task function name.
    * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
    */
-  private readonly getTaskFunctionWorkerWorkerChoiceStrategy = (
+  private readonly getTaskFunctionWorkerChoiceStrategy = (
+    name?: string
+  ): WorkerChoiceStrategy | undefined => {
+    name = name ?? DEFAULT_TASK_NAME
+    const taskFunctionsProperties = this.listTaskFunctionsProperties()
+    if (name === DEFAULT_TASK_NAME) {
+      name = taskFunctionsProperties[1]?.name
+    }
+    return taskFunctionsProperties.find(
+      (taskFunctionProperties: TaskFunctionProperties) =>
+        taskFunctionProperties.name === name
+    )?.strategy
+  }
+
+  /**
+   * Gets worker node task function worker choice strategy, if any.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @param name - The task function name.
+   * @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise.
+   */
+  private readonly getWorkerNodeTaskFunctionWorkerChoiceStrategy = (
+    workerNodeKey: number,
     name?: string
   ): WorkerChoiceStrategy | undefined => {
-    if (name != null) {
-      return this.listTaskFunctionsProperties().find(
-        (taskFunctionProperties: TaskFunctionProperties) =>
-          taskFunctionProperties.name === name
-      )?.strategy
+    const workerInfo = this.getWorkerInfo(workerNodeKey)
+    if (workerInfo == null) {
+      return
     }
+    name = name ?? DEFAULT_TASK_NAME
+    if (name === DEFAULT_TASK_NAME) {
+      name = workerInfo.taskFunctionsProperties?.[1]?.name
+    }
+    return workerInfo.taskFunctionsProperties?.find(
+      (taskFunctionProperties: TaskFunctionProperties) =>
+        taskFunctionProperties.name === name
+    )?.strategy
   }
 
   /**
@@ -1026,18 +1098,24 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    * @param name - The task function name.
-   * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise.
+   * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise.
    */
   private readonly getWorkerNodeTaskFunctionPriority = (
     workerNodeKey: number,
     name?: string
   ): number | undefined => {
-    if (name != null) {
-      return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find(
-        (taskFunctionProperties: TaskFunctionProperties) =>
-          taskFunctionProperties.name === name
-      )?.priority
+    const workerInfo = this.getWorkerInfo(workerNodeKey)
+    if (workerInfo == null) {
+      return
+    }
+    name = name ?? DEFAULT_TASK_NAME
+    if (name === DEFAULT_TASK_NAME) {
+      name = workerInfo.taskFunctionsProperties?.[1]?.name
     }
+    return workerInfo.taskFunctionsProperties?.find(
+      (taskFunctionProperties: TaskFunctionProperties) =>
+        taskFunctionProperties.name === name
+    )?.priority
   }
 
   /**
@@ -1045,7 +1123,7 @@ export abstract class AbstractPool<
    *
    * @returns The worker choice strategies.
    */
-  private readonly getWorkerWorkerChoiceStrategies =
+  private readonly getWorkerChoiceStrategies =
     (): Set<WorkerChoiceStrategy> => {
       return new Set([
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
@@ -1113,15 +1191,16 @@ export abstract class AbstractPool<
         return
       }
       const timestamp = performance.now()
-      const taskFunctionStrategy =
-        this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
-      const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy)
+      const workerNodeKey = this.chooseWorkerNode(name)
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
         data: data ?? ({} as Data),
         priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
-        strategy: taskFunctionStrategy,
+        strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy(
+          workerNodeKey,
+          name
+        ),
         transferList,
         timestamp,
         taskId: randomUUID()
@@ -1391,14 +1470,12 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Chooses a worker node for the next task given the worker choice strategy.
+   * Chooses a worker node for the next task.
    *
-   * @param workerChoiceStrategy - The worker choice strategy.
-   * @returns The chosen worker node key
+   * @param name - The task function name.
+   * @returns The chosen worker node key.
    */
-  private chooseWorkerNode (
-    workerChoiceStrategy?: WorkerChoiceStrategy
-  ): number {
+  private chooseWorkerNode (name?: string): number {
     if (this.shallCreateDynamicWorker()) {
       const workerNodeKey = this.createAndSetupDynamicWorkerNode()
       if (
@@ -1409,7 +1486,9 @@ export abstract class AbstractPool<
       }
     }
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-    return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy)
+    return this.workerChoiceStrategiesContext!.execute(
+      this.getTaskFunctionWorkerChoiceStrategy(name)
+    )
   }
 
   /**
@@ -1444,7 +1523,8 @@ export abstract class AbstractPool<
     ) {
       workerNode.usage.runTime.aggregate = min(
         ...this.workerNodes.map(
-          workerNode => workerNode.usage.runTime.aggregate ?? Infinity
+          workerNode =>
+            workerNode.usage.runTime.aggregate ?? Number.POSITIVE_INFINITY
         )
       )
     }
@@ -1454,7 +1534,8 @@ export abstract class AbstractPool<
     ) {
       workerNode.usage.waitTime.aggregate = min(
         ...this.workerNodes.map(
-          workerNode => workerNode.usage.waitTime.aggregate ?? Infinity
+          workerNode =>
+            workerNode.usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY
         )
       )
     }
@@ -1464,7 +1545,8 @@ export abstract class AbstractPool<
     ) {
       workerNode.usage.elu.active.aggregate = min(
         ...this.workerNodes.map(
-          workerNode => workerNode.usage.elu.active.aggregate ?? Infinity
+          workerNode =>
+            workerNode.usage.elu.active.aggregate ?? Number.POSITIVE_INFINITY
         )
       )
     }
@@ -1546,6 +1628,7 @@ export abstract class AbstractPool<
       const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
         message.workerId
       )
+      const workerInfo = this.getWorkerInfo(localWorkerNodeKey)
       const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
       // Kill message received from worker
       if (
@@ -1554,6 +1637,8 @@ export abstract class AbstractPool<
           ((this.opts.enableTasksQueue === false &&
             workerUsage.tasks.executing === 0) ||
             (this.opts.enableTasksQueue === true &&
+              workerInfo != null &&
+              !workerInfo.stealing &&
               workerUsage.tasks.executing === 0 &&
               this.tasksQueueSize(localWorkerNodeKey) === 0)))
       ) {