Merge pull request #2343 from poolifier/combined-prs-branch
[poolifier.git] / src / pools / abstract-pool.ts
index 649d5f0d0d5e268b26f3285f3dd27aaae52a1c2d..8f57a9067e0967bf87dd303efabfe694b00bdbc6 100644 (file)
@@ -4,6 +4,7 @@ import { EventEmitterAsyncResource } from 'node:events'
 import { performance } from 'node:perf_hooks'
 import type { TransferListItem } from 'node:worker_threads'
 
+import { defaultBucketSize } from '../priority-queue.js'
 import type {
   MessageValue,
   PromiseResponseWrapper,
@@ -376,14 +377,16 @@ export abstract class AbstractPool<
           minimum: round(
             min(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.runTime.minimum ?? Infinity
+                workerNode =>
+                  workerNode.usage.runTime.minimum ?? Number.POSITIVE_INFINITY
               )
             )
           ),
           maximum: round(
             max(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.runTime.maximum ?? -Infinity
+                workerNode =>
+                  workerNode.usage.runTime.maximum ?? Number.NEGATIVE_INFINITY
               )
             )
           ),
@@ -393,7 +396,9 @@ export abstract class AbstractPool<
               average(
                 this.workerNodes.reduce<number[]>(
                   (accumulator, workerNode) =>
-                    accumulator.concat(workerNode.usage.runTime.history),
+                    accumulator.concat(
+                      workerNode.usage.runTime.history.toArray()
+                    ),
                   []
                 )
               )
@@ -405,7 +410,9 @@ export abstract class AbstractPool<
               median(
                 this.workerNodes.reduce<number[]>(
                   (accumulator, workerNode) =>
-                    accumulator.concat(workerNode.usage.runTime.history),
+                    accumulator.concat(
+                      workerNode.usage.runTime.history.toArray()
+                    ),
                   []
                 )
               )
@@ -419,14 +426,16 @@ export abstract class AbstractPool<
           minimum: round(
             min(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.waitTime.minimum ?? Infinity
+                workerNode =>
+                  workerNode.usage.waitTime.minimum ?? Number.POSITIVE_INFINITY
               )
             )
           ),
           maximum: round(
             max(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
+                workerNode =>
+                  workerNode.usage.waitTime.maximum ?? Number.NEGATIVE_INFINITY
               )
             )
           ),
@@ -436,7 +445,9 @@ export abstract class AbstractPool<
               average(
                 this.workerNodes.reduce<number[]>(
                   (accumulator, workerNode) =>
-                    accumulator.concat(workerNode.usage.waitTime.history),
+                    accumulator.concat(
+                      workerNode.usage.waitTime.history.toArray()
+                    ),
                   []
                 )
               )
@@ -448,7 +459,9 @@ export abstract class AbstractPool<
               median(
                 this.workerNodes.reduce<number[]>(
                   (accumulator, workerNode) =>
-                    accumulator.concat(workerNode.usage.waitTime.history),
+                    accumulator.concat(
+                      workerNode.usage.waitTime.history.toArray()
+                    ),
                   []
                 )
               )
@@ -463,14 +476,18 @@ export abstract class AbstractPool<
             minimum: round(
               min(
                 ...this.workerNodes.map(
-                  workerNode => workerNode.usage.elu.idle.minimum ?? Infinity
+                  workerNode =>
+                    workerNode.usage.elu.idle.minimum ??
+                    Number.POSITIVE_INFINITY
                 )
               )
             ),
             maximum: round(
               max(
                 ...this.workerNodes.map(
-                  workerNode => workerNode.usage.elu.idle.maximum ?? -Infinity
+                  workerNode =>
+                    workerNode.usage.elu.idle.maximum ??
+                    Number.NEGATIVE_INFINITY
                 )
               )
             ),
@@ -480,7 +497,9 @@ export abstract class AbstractPool<
                 average(
                   this.workerNodes.reduce<number[]>(
                     (accumulator, workerNode) =>
-                      accumulator.concat(workerNode.usage.elu.idle.history),
+                      accumulator.concat(
+                        workerNode.usage.elu.idle.history.toArray()
+                      ),
                     []
                   )
                 )
@@ -492,7 +511,9 @@ export abstract class AbstractPool<
                 median(
                   this.workerNodes.reduce<number[]>(
                     (accumulator, workerNode) =>
-                      accumulator.concat(workerNode.usage.elu.idle.history),
+                      accumulator.concat(
+                        workerNode.usage.elu.idle.history.toArray()
+                      ),
                     []
                   )
                 )
@@ -503,14 +524,18 @@ export abstract class AbstractPool<
             minimum: round(
               min(
                 ...this.workerNodes.map(
-                  workerNode => workerNode.usage.elu.active.minimum ?? Infinity
+                  workerNode =>
+                    workerNode.usage.elu.active.minimum ??
+                    Number.POSITIVE_INFINITY
                 )
               )
             ),
             maximum: round(
               max(
                 ...this.workerNodes.map(
-                  workerNode => workerNode.usage.elu.active.maximum ?? -Infinity
+                  workerNode =>
+                    workerNode.usage.elu.active.maximum ??
+                    Number.NEGATIVE_INFINITY
                 )
               )
             ),
@@ -520,7 +545,9 @@ export abstract class AbstractPool<
                 average(
                   this.workerNodes.reduce<number[]>(
                     (accumulator, workerNode) =>
-                      accumulator.concat(workerNode.usage.elu.active.history),
+                      accumulator.concat(
+                        workerNode.usage.elu.active.history.toArray()
+                      ),
                     []
                   )
                 )
@@ -532,7 +559,9 @@ export abstract class AbstractPool<
                 median(
                   this.workerNodes.reduce<number[]>(
                     (accumulator, workerNode) =>
-                      accumulator.concat(workerNode.usage.elu.active.history),
+                      accumulator.concat(
+                        workerNode.usage.elu.active.history.toArray()
+                      ),
                     []
                   )
                 )
@@ -1445,7 +1474,7 @@ export abstract class AbstractPool<
    * Chooses a worker node for the next task.
    *
    * @param name - The task function name.
-   * @returns The chosen worker node key
+   * @returns The chosen worker node key.
    */
   private chooseWorkerNode (name?: string): number {
     if (this.shallCreateDynamicWorker()) {
@@ -1495,7 +1524,8 @@ export abstract class AbstractPool<
     ) {
       workerNode.usage.runTime.aggregate = min(
         ...this.workerNodes.map(
-          workerNode => workerNode.usage.runTime.aggregate ?? Infinity
+          workerNode =>
+            workerNode.usage.runTime.aggregate ?? Number.POSITIVE_INFINITY
         )
       )
     }
@@ -1505,7 +1535,8 @@ export abstract class AbstractPool<
     ) {
       workerNode.usage.waitTime.aggregate = min(
         ...this.workerNodes.map(
-          workerNode => workerNode.usage.waitTime.aggregate ?? Infinity
+          workerNode =>
+            workerNode.usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY
         )
       )
     }
@@ -1515,7 +1546,8 @@ export abstract class AbstractPool<
     ) {
       workerNode.usage.elu.active.aggregate = min(
         ...this.workerNodes.map(
-          workerNode => workerNode.usage.elu.active.aggregate ?? Infinity
+          workerNode =>
+            workerNode.usage.elu.active.aggregate ?? Number.POSITIVE_INFINITY
         )
       )
     }
@@ -1597,6 +1629,7 @@ export abstract class AbstractPool<
       const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
         message.workerId
       )
+      const workerInfo = this.getWorkerInfo(localWorkerNodeKey)
       const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
       // Kill message received from worker
       if (
@@ -1605,6 +1638,8 @@ export abstract class AbstractPool<
           ((this.opts.enableTasksQueue === false &&
             workerUsage.tasks.executing === 0) ||
             (this.opts.enableTasksQueue === true &&
+              workerInfo != null &&
+              !workerInfo.stealing &&
               workerUsage.tasks.executing === 0 &&
               this.tasksQueueSize(localWorkerNodeKey) === 0)))
       ) {
@@ -1990,6 +2025,12 @@ export abstract class AbstractPool<
     }
   }
 
+  private setTasksQueuePriority (workerNodeKey: number): void {
+    this.workerNodes[workerNodeKey].setTasksQueuePriority(
+      this.getTasksQueuePriority()
+    )
+  }
+
   /**
    * This method is the message listener registered on each worker.
    */
@@ -2008,6 +2049,7 @@ export abstract class AbstractPool<
       if (workerInfo != null) {
         workerInfo.taskFunctionsProperties = taskFunctionsProperties
         this.sendStatisticsMessageToWorker(workerNodeKey)
+        this.setTasksQueuePriority(workerNodeKey)
       }
     } else if (taskId != null) {
       // Task execution response received from worker
@@ -2032,6 +2074,7 @@ export abstract class AbstractPool<
     workerNode.info.ready = ready
     workerNode.info.taskFunctionsProperties = taskFunctionsProperties
     this.sendStatisticsMessageToWorker(workerNodeKey)
+    this.setTasksQueuePriority(workerNodeKey)
     this.checkAndEmitReadyEvent()
   }
 
@@ -2119,6 +2162,12 @@ export abstract class AbstractPool<
     return this.workerNodes[workerNodeKey]?.info
   }
 
+  private getTasksQueuePriority (): boolean {
+    return this.listTaskFunctionsProperties().some(
+      taskFunctionProperties => taskFunctionProperties.priority != null
+    )
+  }
+
   /**
    * Creates a worker node.
    *
@@ -2136,8 +2185,8 @@ export abstract class AbstractPool<
           getDefaultTasksQueueOptions(
             this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
           ).size,
-        tasksQueueBucketSize:
-          (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2
+        tasksQueueBucketSize: defaultBucketSize,
+        tasksQueuePriority: this.getTasksQueuePriority()
       }
     )
     // Flag the worker node as ready at pool startup.