From 5bc91f3ed2c64188ebea92776fe0250a716b7c5f Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 3 Jul 2023 17:37:07 +0200 Subject: [PATCH] fix: reassign queued tasks on worker error MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 ++++ src/pools/abstract-pool.ts | 27 +++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index dac2f30c..4b767cf7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Ensure worker queued tasks at error are reassigned to other pool workers. + ### Added - Add pool `utilization` ratio to pool information. diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index e9be3476..69f6a592 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -772,6 +772,33 @@ export abstract class AbstractPool< if (this.emitter != null) { this.emitter.emit(PoolEvents.error, error) } + if (this.opts.enableTasksQueue === true) { + const workerNodeKey = this.getWorkerNodeKey(worker) + while (this.tasksQueueSize(workerNodeKey) > 0) { + let targetWorkerNodeKey: number = workerNodeKey + let minQueuedTasks = Infinity + for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { + if ( + workerNodeId !== workerNodeKey && + workerNode.usage.tasks.queued === 0 + ) { + targetWorkerNodeKey = workerNodeId + break + } + if ( + workerNodeId !== workerNodeKey && + workerNode.usage.tasks.queued < minQueuedTasks + ) { + minQueuedTasks = workerNode.usage.tasks.queued + targetWorkerNodeKey = workerNodeId + } + } + this.enqueueTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } + } if (this.opts.restartWorkerOnError === true) { this.createAndSetupWorker() } -- 2.34.1