From db0e38eed9f8945c38ff579f5b33cb4bc44f62a1 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 18 Aug 2023 18:41:25 +0200 Subject: [PATCH] fix: ensure task function worker usage is updated if there's at least 2 task functions MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 1 + src/pools/abstract-pool.ts | 32 +++++++++++++++++++------------- src/pools/worker-node.ts | 26 +++++++++++++++----------- src/pools/worker.ts | 2 +- 4 files changed, 36 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 05f5c7a9..3fb91510 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fix race condition between ready and task functions worker message handling at startup. - Fix duplicate task usage statistics computation per task function. +- Update task function worker usage statistics if and only if there's at least two different task functions. ### Added diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 64fda254..06d09e6b 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -799,12 +799,12 @@ export abstract class AbstractPool< const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing this.updateWaitTimeWorkerUsage(workerUsage, task) - if (this.canUpdateTaskWorkerUsage(workerNodeKey)) { - const taskWorkerUsage = this.workerNodes[ + if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) { + const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskWorkerUsage(task.name as string) as WorkerUsage - ++taskWorkerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(taskWorkerUsage, task) + ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task) } } @@ -823,23 +823,29 @@ export abstract class AbstractPool< this.updateTaskStatisticsWorkerUsage(workerUsage, message) this.updateRunTimeWorkerUsage(workerUsage, message) this.updateEluWorkerUsage(workerUsage, message) - if (this.canUpdateTaskWorkerUsage(workerNodeKey)) { - const taskWorkerUsage = this.workerNodes[ + if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) { + const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskWorkerUsage( + ].getTaskFunctionWorkerUsage( message.taskPerformance?.name ?? DEFAULT_TASK_NAME ) as WorkerUsage - this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message) - this.updateRunTimeWorkerUsage(taskWorkerUsage, message) - this.updateEluWorkerUsage(taskWorkerUsage, message) + this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) + this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message) + this.updateEluWorkerUsage(taskFunctionWorkerUsage, message) } } - private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean { + /** + * Whether the worker node shall update its task function worker usage or not. + * + * @param workerNodeKey - The worker node key. + * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise. + */ + private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean { const workerInfo = this.getWorkerInfo(workerNodeKey) return ( Array.isArray(workerInfo.taskFunctions) && - workerInfo.taskFunctions.length > 1 + workerInfo.taskFunctions.length > 2 ) } diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 812d1444..7667dd16 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -28,7 +28,7 @@ implements IWorkerNode { public messageChannel?: MessageChannel /** @inheritdoc */ public usage: WorkerUsage - private readonly tasksUsage: Map + private readonly taskFunctionsUsage: Map private readonly tasksQueue: Queue> private readonly tasksQueueBackPressureSize: number @@ -46,7 +46,7 @@ implements IWorkerNode { this.messageChannel = new MessageChannel() } this.usage = this.initWorkerUsage() - this.tasksUsage = new Map() + this.taskFunctionsUsage = new Map() this.tasksQueue = new Queue>() this.tasksQueueBackPressureSize = Math.pow(poolMaxSize, 2) } @@ -88,7 +88,7 @@ implements IWorkerNode { /** @inheritdoc */ public resetUsage (): void { this.usage = this.initWorkerUsage() - this.tasksUsage.clear() + this.taskFunctionsUsage.clear() } /** @inheritdoc */ @@ -103,23 +103,27 @@ implements IWorkerNode { } /** @inheritdoc */ - public getTaskWorkerUsage (name: string): WorkerUsage | undefined { + public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined { if (!Array.isArray(this.info.taskFunctions)) { throw new Error( - `Cannot get task worker usage for task function name '${name}' when task function names list is not yet defined` + `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined` ) } if ( - name === DEFAULT_TASK_NAME && Array.isArray(this.info.taskFunctions) && - this.info.taskFunctions.length > 1 + this.info.taskFunctions.length < 3 ) { + throw new Error( + `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements` + ) + } + if (name === DEFAULT_TASK_NAME) { name = this.info.taskFunctions[1] } - if (!this.tasksUsage.has(name)) { - this.tasksUsage.set(name, this.initTaskWorkerUsage(name)) + if (!this.taskFunctionsUsage.has(name)) { + this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name)) } - return this.tasksUsage.get(name) + return this.taskFunctionsUsage.get(name) } private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo { @@ -167,7 +171,7 @@ implements IWorkerNode { } } - private initTaskWorkerUsage (name: string): WorkerUsage { + private initTaskFunctionWorkerUsage (name: string): WorkerUsage { const getTaskQueueSize = (): number => { let taskQueueSize = 0 for (const task of this.tasksQueue) { diff --git a/src/pools/worker.ts b/src/pools/worker.ts index d6c23659..0bdda982 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -262,5 +262,5 @@ export interface IWorkerNode { * @param name - The task function name. * @returns The task function worker usage statistics if the task function worker usage statistics are initialized, `undefined` otherwise. */ - readonly getTaskWorkerUsage: (name: string) => WorkerUsage | undefined + readonly getTaskFunctionWorkerUsage: (name: string) => WorkerUsage | undefined } -- 2.34.1