From 2663563da5f52fd57be41127e5c789c86e969ae7 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 14 Aug 2024 18:20:48 +0200 Subject: [PATCH] fix: trigger continuous tasks stealing under proper conditions MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 67 ++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 36 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 8f7a6e47..8a418210 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -311,13 +311,6 @@ export abstract class AbstractPool< utilization: round(this.utilization), }), workerNodes: this.workerNodes.length, - idleWorkerNodes: this.workerNodes.reduce( - (accumulator, workerNode) => - workerNode.usage.tasks.executing === 0 - ? accumulator + 1 - : accumulator, - 0 - ), ...(this.opts.enableTasksQueue === true && { stealingWorkerNodes: this.workerNodes.reduce( (accumulator, workerNode) => @@ -325,6 +318,11 @@ export abstract class AbstractPool< 0 ), }), + idleWorkerNodes: this.workerNodes.reduce( + (accumulator, _, workerNodeKey) => + this.isWorkerNodeIdle(workerNodeKey) ? accumulator + 1 : accumulator, + 0 + ), busyWorkerNodes: this.workerNodes.reduce( (accumulator, _, workerNodeKey) => this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator, @@ -865,6 +863,16 @@ export abstract class AbstractPool< ) } + private isWorkerNodeIdle (workerNodeKey: number): boolean { + if (this.opts.enableTasksQueue === true) { + return ( + this.workerNodes[workerNodeKey].usage.tasks.executing === 0 && + this.tasksQueueSize(workerNodeKey) === 0 + ) + } + return this.workerNodes[workerNodeKey].usage.tasks.executing === 0 + } + private isWorkerNodeBusy (workerNodeKey: number): boolean { if (this.opts.enableTasksQueue === true) { return ( @@ -1657,18 +1665,14 @@ export abstract class AbstractPool< message.workerId ) const workerInfo = this.getWorkerInfo(localWorkerNodeKey) - const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || (isKillBehavior(KillBehaviors.SOFT, message.kill) && - ((this.opts.enableTasksQueue === false && - workerUsage.tasks.executing === 0) || - (this.opts.enableTasksQueue === true && - workerUsage.tasks.executing === 0 && - this.tasksQueueSize(localWorkerNodeKey) === 0 && - workerInfo != null && - !workerInfo.continuousStealing))) + this.isWorkerNodeIdle(localWorkerNodeKey) && + workerInfo != null && + !workerInfo.continuousStealing && + !workerInfo.stealing) ) { // Flag the worker node as not ready immediately this.flagWorkerNodeAsNotReady(localWorkerNodeKey) @@ -1959,27 +1963,21 @@ export abstract class AbstractPool< } const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( - this.cannotStealTask() || - (this.info.stealingWorkerNodes ?? 0) > - Math.round( - this.workerNodes.length * - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.opts.tasksQueueOptions!.tasksStealingRatio! - ) + !workerNodeInfo.continuousStealing && + (this.cannotStealTask() || + (this.info.stealingWorkerNodes ?? 0) > + Math.round( + this.workerNodes.length * + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.tasksStealingRatio! + )) ) { - workerNodeInfo.continuousStealing = false - if (workerNodeTasksUsage.sequentiallyStolen > 0) { - this.resetTaskSequentiallyStolenStatisticsWorkerUsage( - workerNodeKey, - previousStolenTask?.name - ) - } return } if ( - workerNodeInfo.continuousStealing || - workerNodeTasksUsage.executing > 0 || - this.tasksQueueSize(workerNodeKey) > 0 + workerNodeInfo.continuousStealing && + (workerNodeTasksUsage.executing > 0 || + this.tasksQueueSize(workerNodeKey) > 0) ) { workerNodeInfo.continuousStealing = false if (workerNodeTasksUsage.sequentiallyStolen > 0) { @@ -2160,10 +2158,7 @@ export abstract class AbstractPool< // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!) } - if ( - workerNodeTasksUsage.executing === 0 && - this.tasksQueueSize(workerNodeKey) === 0 - ) { + if (this.isWorkerNodeIdle(workerNodeKey)) { workerNode.emit('idle', { workerNodeKey, }) -- 2.34.1