chore: v2.6.35
[poolifier.git] / src / pools / abstract-pool.ts
index a11a574147cf074dc938d1e0d47b3797e6e15ad2..6081902720554c2bcdb809d37f04d5f93ed89da7 100644 (file)
@@ -405,6 +405,13 @@ export abstract class AbstractPool<
       ...(this.opts.enableTasksQueue === true && {
         backPressure: this.hasBackPressure()
       }),
+      ...(this.opts.enableTasksQueue === true && {
+        stolenTasks: this.workerNodes.reduce(
+          (accumulator, workerNode) =>
+            accumulator + workerNode.usage.tasks.stolen,
+          0
+        )
+      }),
       failedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
           accumulator + workerNode.usage.tasks.failed,
@@ -427,21 +434,26 @@ export abstract class AbstractPool<
               )
             )
           ),
-          average: round(
-            average(
-              this.workerNodes.reduce<number[]>(
-                (accumulator, workerNode) =>
-                  accumulator.concat(workerNode.usage.runTime.history),
-                []
+          ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+            .runTime.average && {
+            average: round(
+              average(
+                this.workerNodes.reduce<number[]>(
+                  (accumulator, workerNode) =>
+                    accumulator.concat(workerNode.usage.runTime.history),
+                  []
+                )
               )
             )
-          ),
+          }),
           ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
             .runTime.median && {
             median: round(
               median(
-                this.workerNodes.map(
-                  (workerNode) => workerNode.usage.runTime?.median ?? 0
+                this.workerNodes.reduce<number[]>(
+                  (accumulator, workerNode) =>
+                    accumulator.concat(workerNode.usage.runTime.history),
+                  []
                 )
               )
             )
@@ -465,21 +477,26 @@ export abstract class AbstractPool<
               )
             )
           ),
-          average: round(
-            average(
-              this.workerNodes.reduce<number[]>(
-                (accumulator, workerNode) =>
-                  accumulator.concat(workerNode.usage.waitTime.history),
-                []
+          ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+            .waitTime.average && {
+            average: round(
+              average(
+                this.workerNodes.reduce<number[]>(
+                  (accumulator, workerNode) =>
+                    accumulator.concat(workerNode.usage.waitTime.history),
+                  []
+                )
               )
             )
-          ),
+          }),
           ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
             .waitTime.median && {
             median: round(
               median(
-                this.workerNodes.map(
-                  (workerNode) => workerNode.usage.waitTime?.median ?? 0
+                this.workerNodes.reduce<number[]>(
+                  (accumulator, workerNode) =>
+                    accumulator.concat(workerNode.usage.waitTime.history),
+                  []
                 )
               )
             )
@@ -775,6 +792,7 @@ export abstract class AbstractPool<
       if (
         this.opts.enableTasksQueue === false ||
         (this.opts.enableTasksQueue === true &&
+          this.tasksQueueSize(workerNodeKey) === 0 &&
           this.workerNodes[workerNodeKey].usage.tasks.executing <
             (this.opts.tasksQueueOptions?.concurrency as number))
       ) {
@@ -826,7 +844,7 @@ export abstract class AbstractPool<
    * @virtual
    */
   protected setupHook (): void {
-    // Intentionally empty
+    /** Intentionally empty */
   }
 
   /**
@@ -1189,15 +1207,8 @@ export abstract class AbstractPool<
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       let destinationWorkerNodeKey!: number
       let minQueuedTasks = Infinity
-      let executeTask = false
       for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
         if (workerNode.info.ready && workerNodeId !== workerNodeKey) {
-          if (
-            workerNode.usage.tasks.executing <
-            (this.opts.tasksQueueOptions?.concurrency as number)
-          ) {
-            executeTask = true
-          }
           if (workerNode.usage.tasks.queued === 0) {
             destinationWorkerNodeKey = workerNodeId
             break
@@ -1209,12 +1220,16 @@ export abstract class AbstractPool<
         }
       }
       if (destinationWorkerNodeKey != null) {
+        const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
         const task = {
           ...(this.dequeueTask(workerNodeKey) as Task<Data>),
-          workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo)
-            .id as number
+          workerId: destinationWorkerNode.info.id as number
         }
-        if (executeTask) {
+        if (
+          this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
+          destinationWorkerNode.usage.tasks.executing <
+            (this.opts.tasksQueueOptions?.concurrency as number)
+        ) {
           this.executeTask(destinationWorkerNodeKey, task)
         } else {
           this.enqueueTask(destinationWorkerNodeKey, task)
@@ -1246,13 +1261,29 @@ export abstract class AbstractPool<
           workerId: destinationWorkerNode.info.id as number
         }
         if (
+          this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
           destinationWorkerNode.usage.tasks.executing <
-          (this.opts.tasksQueueOptions?.concurrency as number)
+            (this.opts.tasksQueueOptions?.concurrency as number)
         ) {
           this.executeTask(destinationWorkerNodeKey, task)
         } else {
           this.enqueueTask(destinationWorkerNodeKey, task)
         }
+        if (destinationWorkerNode?.usage != null) {
+          ++destinationWorkerNode.usage.tasks.stolen
+        }
+        if (
+          this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey) &&
+          destinationWorkerNode.getTaskFunctionWorkerUsage(
+            task.name as string
+          ) != null
+        ) {
+          const taskFunctionWorkerUsage =
+            destinationWorkerNode.getTaskFunctionWorkerUsage(
+              task.name as string
+            ) as WorkerUsage
+          ++taskFunctionWorkerUsage.tasks.stolen
+        }
         break
       }
     }
@@ -1280,13 +1311,26 @@ export abstract class AbstractPool<
           workerId: workerNode.info.id as number
         }
         if (
+          this.tasksQueueSize(workerNodeKey) === 0 &&
           workerNode.usage.tasks.executing <
-          (this.opts.tasksQueueOptions?.concurrency as number)
+            (this.opts.tasksQueueOptions?.concurrency as number)
         ) {
           this.executeTask(workerNodeKey, task)
         } else {
           this.enqueueTask(workerNodeKey, task)
         }
+        if (workerNode?.usage != null) {
+          ++workerNode.usage.tasks.stolen
+        }
+        if (
+          this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+          workerNode.getTaskFunctionWorkerUsage(task.name as string) != null
+        ) {
+          const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+            task.name as string
+          ) as WorkerUsage
+          ++taskFunctionWorkerUsage.tasks.stolen
+        }
       }
     }
   }