From 10ecf8fd5f751ffb5477284ae1b6935f3c81ec2d Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 21 Jul 2023 15:22:18 +0200 Subject: [PATCH] fix: ensure tasks redistribution avoid task execution starvation MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 ++++ src/pools/abstract-pool.ts | 20 ++++++++++++++++---- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4bdbedb7..d424256f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fix queued tasks redistribution on error task execution starvation. + ### Changed - Drastically reduce lookups by worker in the worker nodes. diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 09871491..0b359d93 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -115,6 +115,7 @@ export abstract class AbstractPool< this.chooseWorkerNode = this.chooseWorkerNode.bind(this) this.executeTask = this.executeTask.bind(this) this.enqueueTask = this.enqueueTask.bind(this) + this.dequeueTask = this.dequeueTask.bind(this) this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this) if (this.opts.enableEvents === true) { @@ -995,6 +996,7 @@ export abstract class AbstractPool< while (this.tasksQueueSize(workerNodeKey) > 0) { let targetWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity + let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { const workerInfo = this.getWorkerInfo(workerNodeId) if ( @@ -1002,6 +1004,9 @@ export abstract class AbstractPool< workerInfo.ready && workerNode.usage.tasks.queued === 0 ) { + if (workerNode.usage.tasks.executing === 0) { + executeTask = true + } targetWorkerNodeKey = workerNodeId break } @@ -1014,10 +1019,17 @@ export abstract class AbstractPool< targetWorkerNodeKey = workerNodeId } } - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + if (executeTask) { + this.executeTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } else { + this.enqueueTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } } } -- 2.34.1