chore: v4.0.13
[poolifier.git] / src / pools / abstract-pool.ts
index a23c7983bfc652fbfcb8ece4a75853dbed270ac5..8f57a9067e0967bf87dd303efabfe694b00bdbc6 100644 (file)
@@ -4,6 +4,7 @@ import { EventEmitterAsyncResource } from 'node:events'
 import { performance } from 'node:perf_hooks'
 import type { TransferListItem } from 'node:worker_threads'
 
+import { defaultBucketSize } from '../priority-queue.js'
 import type {
   MessageValue,
   PromiseResponseWrapper,
@@ -395,7 +396,9 @@ export abstract class AbstractPool<
               average(
                 this.workerNodes.reduce<number[]>(
                   (accumulator, workerNode) =>
-                    accumulator.concat(workerNode.usage.runTime.history),
+                    accumulator.concat(
+                      workerNode.usage.runTime.history.toArray()
+                    ),
                   []
                 )
               )
@@ -407,7 +410,9 @@ export abstract class AbstractPool<
               median(
                 this.workerNodes.reduce<number[]>(
                   (accumulator, workerNode) =>
-                    accumulator.concat(workerNode.usage.runTime.history),
+                    accumulator.concat(
+                      workerNode.usage.runTime.history.toArray()
+                    ),
                   []
                 )
               )
@@ -440,7 +445,9 @@ export abstract class AbstractPool<
               average(
                 this.workerNodes.reduce<number[]>(
                   (accumulator, workerNode) =>
-                    accumulator.concat(workerNode.usage.waitTime.history),
+                    accumulator.concat(
+                      workerNode.usage.waitTime.history.toArray()
+                    ),
                   []
                 )
               )
@@ -452,7 +459,9 @@ export abstract class AbstractPool<
               median(
                 this.workerNodes.reduce<number[]>(
                   (accumulator, workerNode) =>
-                    accumulator.concat(workerNode.usage.waitTime.history),
+                    accumulator.concat(
+                      workerNode.usage.waitTime.history.toArray()
+                    ),
                   []
                 )
               )
@@ -488,7 +497,9 @@ export abstract class AbstractPool<
                 average(
                   this.workerNodes.reduce<number[]>(
                     (accumulator, workerNode) =>
-                      accumulator.concat(workerNode.usage.elu.idle.history),
+                      accumulator.concat(
+                        workerNode.usage.elu.idle.history.toArray()
+                      ),
                     []
                   )
                 )
@@ -500,7 +511,9 @@ export abstract class AbstractPool<
                 median(
                   this.workerNodes.reduce<number[]>(
                     (accumulator, workerNode) =>
-                      accumulator.concat(workerNode.usage.elu.idle.history),
+                      accumulator.concat(
+                        workerNode.usage.elu.idle.history.toArray()
+                      ),
                     []
                   )
                 )
@@ -532,7 +545,9 @@ export abstract class AbstractPool<
                 average(
                   this.workerNodes.reduce<number[]>(
                     (accumulator, workerNode) =>
-                      accumulator.concat(workerNode.usage.elu.active.history),
+                      accumulator.concat(
+                        workerNode.usage.elu.active.history.toArray()
+                      ),
                     []
                   )
                 )
@@ -544,7 +559,9 @@ export abstract class AbstractPool<
                 median(
                   this.workerNodes.reduce<number[]>(
                     (accumulator, workerNode) =>
-                      accumulator.concat(workerNode.usage.elu.active.history),
+                      accumulator.concat(
+                        workerNode.usage.elu.active.history.toArray()
+                      ),
                     []
                   )
                 )
@@ -1457,7 +1474,7 @@ export abstract class AbstractPool<
    * Chooses a worker node for the next task.
    *
    * @param name - The task function name.
-   * @returns The chosen worker node key
+   * @returns The chosen worker node key.
    */
   private chooseWorkerNode (name?: string): number {
     if (this.shallCreateDynamicWorker()) {
@@ -2008,6 +2025,12 @@ export abstract class AbstractPool<
     }
   }
 
+  private setTasksQueuePriority (workerNodeKey: number): void {
+    this.workerNodes[workerNodeKey].setTasksQueuePriority(
+      this.getTasksQueuePriority()
+    )
+  }
+
   /**
    * This method is the message listener registered on each worker.
    */
@@ -2026,6 +2049,7 @@ export abstract class AbstractPool<
       if (workerInfo != null) {
         workerInfo.taskFunctionsProperties = taskFunctionsProperties
         this.sendStatisticsMessageToWorker(workerNodeKey)
+        this.setTasksQueuePriority(workerNodeKey)
       }
     } else if (taskId != null) {
       // Task execution response received from worker
@@ -2050,6 +2074,7 @@ export abstract class AbstractPool<
     workerNode.info.ready = ready
     workerNode.info.taskFunctionsProperties = taskFunctionsProperties
     this.sendStatisticsMessageToWorker(workerNodeKey)
+    this.setTasksQueuePriority(workerNodeKey)
     this.checkAndEmitReadyEvent()
   }
 
@@ -2137,6 +2162,12 @@ export abstract class AbstractPool<
     return this.workerNodes[workerNodeKey]?.info
   }
 
+  private getTasksQueuePriority (): boolean {
+    return this.listTaskFunctionsProperties().some(
+      taskFunctionProperties => taskFunctionProperties.priority != null
+    )
+  }
+
   /**
    * Creates a worker node.
    *
@@ -2154,8 +2185,8 @@ export abstract class AbstractPool<
           getDefaultTasksQueueOptions(
             this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
           ).size,
-        tasksQueueBucketSize:
-          (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2
+        tasksQueueBucketSize: defaultBucketSize,
+        tasksQueuePriority: this.getTasksQueuePriority()
       }
     )
     // Flag the worker node as ready at pool startup.