fix: reassign queued tasks on worker error
authorJérôme Benoit <jerome.benoit@sap.com>
Mon, 3 Jul 2023 15:37:07 +0000 (17:37 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Mon, 3 Jul 2023 15:37:07 +0000 (17:37 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
src/pools/abstract-pool.ts

index dac2f30c8affae28850d1e7df40d0d975e32331f..4b767cf7b87266b4daf47c1d76100f1624b78a6e 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Ensure worker queued tasks at error are reassigned to other pool workers.
+
 ### Added
 
 - Add pool `utilization` ratio to pool information.
index e9be347603df21b2f56c025438d81e78a0790d31..69f6a592162ee1323fc5681a3905604bfb7782c6 100644 (file)
@@ -772,6 +772,33 @@ export abstract class AbstractPool<
       if (this.emitter != null) {
         this.emitter.emit(PoolEvents.error, error)
       }
+      if (this.opts.enableTasksQueue === true) {
+        const workerNodeKey = this.getWorkerNodeKey(worker)
+        while (this.tasksQueueSize(workerNodeKey) > 0) {
+          let targetWorkerNodeKey: number = workerNodeKey
+          let minQueuedTasks = Infinity
+          for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+            if (
+              workerNodeId !== workerNodeKey &&
+              workerNode.usage.tasks.queued === 0
+            ) {
+              targetWorkerNodeKey = workerNodeId
+              break
+            }
+            if (
+              workerNodeId !== workerNodeKey &&
+              workerNode.usage.tasks.queued < minQueuedTasks
+            ) {
+              minQueuedTasks = workerNode.usage.tasks.queued
+              targetWorkerNodeKey = workerNodeId
+            }
+          }
+          this.enqueueTask(
+            targetWorkerNodeKey,
+            this.dequeueTask(workerNodeKey) as Task<Data>
+          )
+        }
+      }
       if (this.opts.restartWorkerOnError === true) {
         this.createAndSetupWorker()
       }