feat: continuous task stealing
[poolifier.git] / src / pools / abstract-pool.ts
index 062df07791af8666fd94282eac229aef466c7a1b..4abbd615efe13869feaff7b9e1b19ee584b17990 100644 (file)
@@ -405,6 +405,13 @@ export abstract class AbstractPool<
       ...(this.opts.enableTasksQueue === true && {
         backPressure: this.hasBackPressure()
       }),
+      ...(this.opts.enableTasksQueue === true && {
+        stolenTasks: this.workerNodes.reduce(
+          (accumulator, workerNode) =>
+            accumulator + workerNode.usage.tasks.stolen,
+          0
+        )
+      }),
       failedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
           accumulator + workerNode.usage.tasks.failed,
@@ -1262,6 +1269,14 @@ export abstract class AbstractPool<
         } else {
           this.enqueueTask(destinationWorkerNodeKey, task)
         }
+        ++destinationWorkerNode.usage.tasks.stolen
+        if (this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey)) {
+          const taskFunctionWorkerUsage =
+            destinationWorkerNode.getTaskFunctionWorkerUsage(
+              task.name as string
+            ) as WorkerUsage
+          ++taskFunctionWorkerUsage.tasks.stolen
+        }
         break
       }
     }
@@ -1297,6 +1312,13 @@ export abstract class AbstractPool<
         } else {
           this.enqueueTask(workerNodeKey, task)
         }
+        ++workerNode.usage.tasks.stolen
+        if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
+          const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+            task.name as string
+          ) as WorkerUsage
+          ++taskFunctionWorkerUsage.tasks.stolen
+        }
       }
     }
   }