From: Jérôme Benoit Date: Mon, 3 Jul 2023 15:37:07 +0000 (+0200) Subject: fix: reassign queued tasks on worker error X-Git-Tag: v2.6.7~6 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=5bc91f3ed2c64188ebea92776fe0250a716b7c5f;p=poolifier.git fix: reassign queued tasks on worker error Signed-off-by: Jérôme Benoit --- diff --git a/CHANGELOG.md b/CHANGELOG.md index dac2f30c..4b767cf7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Ensure worker queued tasks at error are reassigned to other pool workers. + ### Added - Add pool `utilization` ratio to pool information. diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index e9be3476..69f6a592 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -772,6 +772,33 @@ export abstract class AbstractPool< if (this.emitter != null) { this.emitter.emit(PoolEvents.error, error) } + if (this.opts.enableTasksQueue === true) { + const workerNodeKey = this.getWorkerNodeKey(worker) + while (this.tasksQueueSize(workerNodeKey) > 0) { + let targetWorkerNodeKey: number = workerNodeKey + let minQueuedTasks = Infinity + for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { + if ( + workerNodeId !== workerNodeKey && + workerNode.usage.tasks.queued === 0 + ) { + targetWorkerNodeKey = workerNodeId + break + } + if ( + workerNodeId !== workerNodeKey && + workerNode.usage.tasks.queued < minQueuedTasks + ) { + minQueuedTasks = workerNode.usage.tasks.queued + targetWorkerNodeKey = workerNodeId + } + } + this.enqueueTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } + } if (this.opts.restartWorkerOnError === true) { this.createAndSetupWorker() }