fix: ensure tasks redistribution avoid task execution starvation
[poolifier.git] / src / pools / abstract-pool.ts
index 09871491cb7a1ee5d6e7b4ded48696accb2e26e3..0b359d93b6006c124c692fda70f9fb21926691f4 100644 (file)
@@ -115,6 +115,7 @@ export abstract class AbstractPool<
     this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
     this.executeTask = this.executeTask.bind(this)
     this.enqueueTask = this.enqueueTask.bind(this)
+    this.dequeueTask = this.dequeueTask.bind(this)
     this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
 
     if (this.opts.enableEvents === true) {
@@ -995,6 +996,7 @@ export abstract class AbstractPool<
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       let targetWorkerNodeKey: number = workerNodeKey
       let minQueuedTasks = Infinity
+      let executeTask = false
       for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
         const workerInfo = this.getWorkerInfo(workerNodeId)
         if (
@@ -1002,6 +1004,9 @@ export abstract class AbstractPool<
           workerInfo.ready &&
           workerNode.usage.tasks.queued === 0
         ) {
+          if (workerNode.usage.tasks.executing === 0) {
+            executeTask = true
+          }
           targetWorkerNodeKey = workerNodeId
           break
         }
@@ -1014,10 +1019,17 @@ export abstract class AbstractPool<
           targetWorkerNodeKey = workerNodeId
         }
       }
-      this.enqueueTask(
-        targetWorkerNodeKey,
-        this.dequeueTask(workerNodeKey) as Task<Data>
-      )
+      if (executeTask) {
+        this.executeTask(
+          targetWorkerNodeKey,
+          this.dequeueTask(workerNodeKey) as Task<Data>
+        )
+      } else {
+        this.enqueueTask(
+          targetWorkerNodeKey,
+          this.dequeueTask(workerNodeKey) as Task<Data>
+        )
+      }
     }
   }