From 2a8bfdff6ba9391ec6dd0f733b23d39633583bcb Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 26 Aug 2024 01:35:02 +0200 Subject: [PATCH] refactor: cleanup worker node state conditions check MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 49 ++++++++++++++++---------------------- 1 file changed, 20 insertions(+), 29 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 43880fb8..6bee0367 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -302,8 +302,7 @@ export abstract class AbstractPool< const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( workerNodeInfo.continuousStealing && - (workerNodeTasksUsage.executing > 0 || - this.tasksQueueSize(workerNodeKey) > 0) + !this.isWorkerNodeIdle(workerNodeKey) ) { workerNodeInfo.continuousStealing = false if (workerNodeTasksUsage.sequentiallyStolen > 0) { @@ -802,22 +801,12 @@ export abstract class AbstractPool< * @returns Worker nodes busyness boolean status. */ protected internalBusy (): boolean { - if (this.opts.enableTasksQueue === true) { - return ( - this.workerNodes.findIndex( - workerNode => - workerNode.info.ready && - workerNode.usage.tasks.executing < - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.opts.tasksQueueOptions!.concurrency! - ) === -1 - ) - } return ( - this.workerNodes.findIndex( - workerNode => - workerNode.info.ready && workerNode.usage.tasks.executing === 0 - ) === -1 + this.workerNodes.reduce( + (accumulator, _, workerNodeKey) => + this.isWorkerNodeIdle(workerNodeKey) ? accumulator + 1 : accumulator, + 0 + ) === 0 ) } @@ -1166,12 +1155,9 @@ export abstract class AbstractPool< // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.promiseResponseMap.delete(taskId!) if (this.opts.enableTasksQueue === true && !this.destroying) { - const workerNodeTasksUsage = workerNode.usage.tasks if ( - this.tasksQueueSize(workerNodeKey) > 0 && - workerNodeTasksUsage.executing < - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.opts.tasksQueueOptions!.concurrency! + !this.isWorkerNodeBusy(workerNodeKey) && + this.tasksQueueSize(workerNodeKey) > 0 ) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!) @@ -1305,22 +1291,30 @@ export abstract class AbstractPool< private isWorkerNodeBusy (workerNodeKey: number): boolean { if (this.opts.enableTasksQueue === true) { return ( + this.workerNodes[workerNodeKey].info.ready && this.workerNodes[workerNodeKey].usage.tasks.executing >= - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.opts.tasksQueueOptions!.concurrency! + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! ) } - return this.workerNodes[workerNodeKey].usage.tasks.executing > 0 + return ( + this.workerNodes[workerNodeKey].info.ready && + this.workerNodes[workerNodeKey].usage.tasks.executing > 0 + ) } private isWorkerNodeIdle (workerNodeKey: number): boolean { if (this.opts.enableTasksQueue === true) { return ( + this.workerNodes[workerNodeKey].info.ready && this.workerNodes[workerNodeKey].usage.tasks.executing === 0 && this.tasksQueueSize(workerNodeKey) === 0 ) } - return this.workerNodes[workerNodeKey].usage.tasks.executing === 0 + return ( + this.workerNodes[workerNodeKey].info.ready && + this.workerNodes[workerNodeKey].usage.tasks.executing === 0 + ) } private redistributeQueuedTasks (sourceWorkerNodeKey: number): void { @@ -2295,9 +2289,6 @@ export abstract class AbstractPool< if (!this.started) { return false } - if (this.empty) { - return true - } return ( this.workerNodes.reduce( (accumulator, workerNode) => -- 2.34.1