From f1f77f45c0682a4e1a068c292199006bc4f0c657 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 25 Nov 2023 13:35:50 +0100 Subject: [PATCH] fix: account stolen tasks on a per task function basis properly MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 63 ++++++++++++++++++++++++++++++-------- 1 file changed, 51 insertions(+), 12 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index ab319c62..1759d03e 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -35,6 +35,7 @@ import { import type { IWorker, IWorkerNode, + TaskStatistics, WorkerInfo, WorkerNodeEventDetail, WorkerType, @@ -1481,13 +1482,19 @@ export abstract class AbstractPool< } private updateTaskSequentiallyStolenStatisticsWorkerUsage ( - workerNodeKey: number, - taskName: string + workerNodeKey: number ): void { const workerNode = this.workerNodes[workerNodeKey] if (workerNode?.usage != null) { ++workerNode.usage.tasks.sequentiallyStolen } + } + + private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage ( + workerNodeKey: number, + taskName: string + ): void { + const workerNode = this.workerNodes[workerNodeKey] if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null @@ -1500,13 +1507,19 @@ export abstract class AbstractPool< } private resetTaskSequentiallyStolenStatisticsWorkerUsage ( - workerNodeKey: number, - taskName: string + workerNodeKey: number ): void { const workerNode = this.workerNodes[workerNodeKey] if (workerNode?.usage != null) { workerNode.usage.tasks.sequentiallyStolen = 0 } + } + + private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage ( + workerNodeKey: number, + taskName: string + ): void { + const workerNode = this.workerNodes[workerNodeKey] if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null @@ -1535,13 +1548,42 @@ export abstract class AbstractPool< (workerNodeTasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0) ) { - this.resetTaskSequentiallyStolenStatisticsWorkerUsage( - workerNodeKey, - previousStolenTask.name as string - ) + for (const taskName of this.workerNodes[workerNodeKey].info + .taskFunctionNames as string[]) { + this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( + workerNodeKey, + taskName + ) + } + this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) return } const stolenTask = this.workerNodeStealTask(workerNodeKey) + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + stolenTask != null + ) { + const taskFunctionTasksWorkerUsage = this.workerNodes[ + workerNodeKey + ].getTaskFunctionWorkerUsage(stolenTask.name as string) + ?.tasks as TaskStatistics + if ( + taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 || + (previousStolenTask != null && + previousStolenTask.name === stolenTask.name && + taskFunctionTasksWorkerUsage.sequentiallyStolen > 0) + ) { + this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( + workerNodeKey, + stolenTask.name as string + ) + } else { + this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( + workerNodeKey, + stolenTask.name as string + ) + } + } sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) .then(() => { this.handleIdleWorkerNodeEvent(event, stolenTask) @@ -1572,10 +1614,7 @@ export abstract class AbstractPool< } else { this.enqueueTask(workerNodeKey, task) } - this.updateTaskSequentiallyStolenStatisticsWorkerUsage( - workerNodeKey, - task.name as string - ) + this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) this.updateTaskStolenStatisticsWorkerUsage( workerNodeKey, task.name as string -- 2.34.1