fix: ensure tasks redistribution avoid task execution starvation
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 21 Jul 2023 13:22:18 +0000 (15:22 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 21 Jul 2023 13:22:18 +0000 (15:22 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
src/pools/abstract-pool.ts

index 4bdbedb70db4829af4d162123bb405f02d29c9ab..d424256f85a22a34505951cba68c33efb3c1de37 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Fix queued tasks redistribution on error task execution starvation.
+
 ### Changed
 
 - Drastically reduce lookups by worker in the worker nodes.
index 09871491cb7a1ee5d6e7b4ded48696accb2e26e3..0b359d93b6006c124c692fda70f9fb21926691f4 100644 (file)
@@ -115,6 +115,7 @@ export abstract class AbstractPool<
     this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
     this.executeTask = this.executeTask.bind(this)
     this.enqueueTask = this.enqueueTask.bind(this)
+    this.dequeueTask = this.dequeueTask.bind(this)
     this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
 
     if (this.opts.enableEvents === true) {
@@ -995,6 +996,7 @@ export abstract class AbstractPool<
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       let targetWorkerNodeKey: number = workerNodeKey
       let minQueuedTasks = Infinity
+      let executeTask = false
       for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
         const workerInfo = this.getWorkerInfo(workerNodeId)
         if (
@@ -1002,6 +1004,9 @@ export abstract class AbstractPool<
           workerInfo.ready &&
           workerNode.usage.tasks.queued === 0
         ) {
+          if (workerNode.usage.tasks.executing === 0) {
+            executeTask = true
+          }
           targetWorkerNodeKey = workerNodeId
           break
         }
@@ -1014,10 +1019,17 @@ export abstract class AbstractPool<
           targetWorkerNodeKey = workerNodeId
         }
       }
-      this.enqueueTask(
-        targetWorkerNodeKey,
-        this.dequeueTask(workerNodeKey) as Task<Data>
-      )
+      if (executeTask) {
+        this.executeTask(
+          targetWorkerNodeKey,
+          this.dequeueTask(workerNodeKey) as Task<Data>
+        )
+      } else {
+        this.enqueueTask(
+          targetWorkerNodeKey,
+          this.dequeueTask(workerNodeKey) as Task<Data>
+        )
+      }
     }
   }