fix: avoid cascading tasks stealing under back pressure
[poolifier.git] / src / pools / abstract-pool.ts
index 7b0d541d8bcafaaaab1150fa01067c66acea30c2..507fcc10e91e809b13948b85325e413c1ccac621 100644 (file)
@@ -1190,7 +1190,7 @@ export abstract class AbstractPool<
 
   private redistributeQueuedTasks (workerNodeKey: number): void {
     while (this.tasksQueueSize(workerNodeKey) > 0) {
-      let destinationWorkerNodeKey: number = workerNodeKey
+      let destinationWorkerNodeKey!: number
       let minQueuedTasks = Infinity
       let executeTask = false
       for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
@@ -1211,15 +1211,17 @@ export abstract class AbstractPool<
           }
         }
       }
-      const task = {
-        ...(this.dequeueTask(workerNodeKey) as Task<Data>),
-        workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo)
-          .id as number
-      }
-      if (executeTask) {
-        this.executeTask(destinationWorkerNodeKey, task)
-      } else {
-        this.enqueueTask(destinationWorkerNodeKey, task)
+      if (destinationWorkerNodeKey != null) {
+        const task = {
+          ...(this.dequeueTask(workerNodeKey) as Task<Data>),
+          workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo)
+            .id as number
+        }
+        if (executeTask) {
+          this.executeTask(destinationWorkerNodeKey, task)
+        } else {
+          this.enqueueTask(destinationWorkerNodeKey, task)
+        }
       }
     }
   }
@@ -1234,6 +1236,9 @@ export abstract class AbstractPool<
           workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
       )
     for (const sourceWorkerNode of workerNodes) {
+      if (sourceWorkerNode.usage.tasks.queued === 0) {
+        break
+      }
       if (
         sourceWorkerNode.info.ready &&
         sourceWorkerNode.info.id !== workerId &&
@@ -1267,10 +1272,11 @@ export abstract class AbstractPool<
       )
     for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
       if (
+        sourceWorkerNode.usage.tasks.queued > 0 &&
         workerNode.info.ready &&
         workerNode.info.id !== workerId &&
-        sourceWorkerNode.usage.tasks.queued > 0 &&
-        !workerNode.hasBackPressure()
+        workerNode.usage.tasks.queued <
+          (this.opts.tasksQueueOptions?.size as number) - 1
       ) {
         const task = {
           ...(sourceWorkerNode.popTask() as Task<Data>),