perf: stop continuous task stealing at task executing
[poolifier.git] / src / pools / abstract-pool.ts
index a6ffcbbb3959caa7ac2480656a25798fa5223856..6081902720554c2bcdb809d37f04d5f93ed89da7 100644 (file)
@@ -405,6 +405,13 @@ export abstract class AbstractPool<
       ...(this.opts.enableTasksQueue === true && {
         backPressure: this.hasBackPressure()
       }),
+      ...(this.opts.enableTasksQueue === true && {
+        stolenTasks: this.workerNodes.reduce(
+          (accumulator, workerNode) =>
+            accumulator + workerNode.usage.tasks.stolen,
+          0
+        )
+      }),
       failedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
           accumulator + workerNode.usage.tasks.failed,
@@ -1253,18 +1260,29 @@ export abstract class AbstractPool<
           ...(sourceWorkerNode.popTask() as Task<Data>),
           workerId: destinationWorkerNode.info.id as number
         }
-        // Enqueue task for continuous task stealing
-        this.enqueueTask(destinationWorkerNodeKey, task)
-        // Avoid starvation
         if (
-          this.tasksQueueSize(destinationWorkerNodeKey) > 0 &&
+          this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
           destinationWorkerNode.usage.tasks.executing <
             (this.opts.tasksQueueOptions?.concurrency as number)
         ) {
-          this.executeTask(
-            destinationWorkerNodeKey,
-            this.dequeueTask(destinationWorkerNodeKey) as Task<Data>
-          )
+          this.executeTask(destinationWorkerNodeKey, task)
+        } else {
+          this.enqueueTask(destinationWorkerNodeKey, task)
+        }
+        if (destinationWorkerNode?.usage != null) {
+          ++destinationWorkerNode.usage.tasks.stolen
+        }
+        if (
+          this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey) &&
+          destinationWorkerNode.getTaskFunctionWorkerUsage(
+            task.name as string
+          ) != null
+        ) {
+          const taskFunctionWorkerUsage =
+            destinationWorkerNode.getTaskFunctionWorkerUsage(
+              task.name as string
+            ) as WorkerUsage
+          ++taskFunctionWorkerUsage.tasks.stolen
         }
         break
       }
@@ -1301,6 +1319,18 @@ export abstract class AbstractPool<
         } else {
           this.enqueueTask(workerNodeKey, task)
         }
+        if (workerNode?.usage != null) {
+          ++workerNode.usage.tasks.stolen
+        }
+        if (
+          this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+          workerNode.getTaskFunctionWorkerUsage(task.name as string) != null
+        ) {
+          const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+            task.name as string
+          ) as WorkerUsage
+          ++taskFunctionWorkerUsage.tasks.stolen
+        }
       }
     }
   }