From b558f6b5a5625753de41024325e40e1cbd03eda1 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 17 Aug 2023 00:50:42 +0200 Subject: [PATCH] fix: avoid duplicate per task function name usage statistics MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 66 ++++++++++++++++++++++------------- src/pools/worker-node.ts | 9 +++++ src/pools/worker.ts | 4 +++ src/worker/abstract-worker.ts | 19 +++++++++- 4 files changed, 72 insertions(+), 26 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 6d71e87e..25c0eba8 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -92,10 +92,6 @@ export abstract class AbstractPool< * The start timestamp of the pool. */ private readonly startTimestamp - /** - * The task function names. - */ - private taskFunctions!: string[] /** * Constructs a new poolifier pool. @@ -650,8 +646,11 @@ export abstract class AbstractPool< /** @inheritDoc */ public listTaskFunctions (): string[] { - if (this.taskFunctions != null) { - return this.taskFunctions + if ( + Array.isArray(this.getWorkerInfo(0).taskFunctions) && + (this.getWorkerInfo(0).taskFunctions as string[]).length > 0 + ) { + return this.getWorkerInfo(0).taskFunctions as string[] } else { return [] } @@ -674,20 +673,22 @@ export abstract class AbstractPool< ) { reject(new TypeError('name argument must not be an empty string')) } + if (transferList != null && !Array.isArray(transferList)) { + reject(new TypeError('transferList argument must be an array')) + } + const timestamp = performance.now() + const workerNodeKey = this.chooseWorkerNode() if ( name != null && - this.taskFunctions != null && - !this.taskFunctions.includes(name) + Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) && + !(this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).includes( + name + ) ) { reject( new Error(`Task function '${name}' is not registered in the pool`) ) } - if (transferList != null && !Array.isArray(transferList)) { - reject(new TypeError('transferList argument must be an array')) - } - const timestamp = performance.now() - const workerNodeKey = this.chooseWorkerNode() const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions @@ -778,11 +779,13 @@ export abstract class AbstractPool< const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing this.updateWaitTimeWorkerUsage(workerUsage, task) - const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( - task.name as string - ) as WorkerUsage - ++taskWorkerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(taskWorkerUsage, task) + if (this.canUpdateTaskWorkerUsage(workerNodeKey)) { + const taskWorkerUsage = this.workerNodes[ + workerNodeKey + ].getTaskWorkerUsage(task.name as string) as WorkerUsage + ++taskWorkerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(taskWorkerUsage, task) + } } /** @@ -800,12 +803,23 @@ export abstract class AbstractPool< this.updateTaskStatisticsWorkerUsage(workerUsage, message) this.updateRunTimeWorkerUsage(workerUsage, message) this.updateEluWorkerUsage(workerUsage, message) - const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( - message.taskPerformance?.name ?? DEFAULT_TASK_NAME - ) as WorkerUsage - this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message) - this.updateRunTimeWorkerUsage(taskWorkerUsage, message) - this.updateEluWorkerUsage(taskWorkerUsage, message) + if (this.canUpdateTaskWorkerUsage(workerNodeKey)) { + const taskWorkerUsage = this.workerNodes[ + workerNodeKey + ].getTaskWorkerUsage( + message.taskPerformance?.name ?? DEFAULT_TASK_NAME + ) as WorkerUsage + this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message) + this.updateRunTimeWorkerUsage(taskWorkerUsage, message) + this.updateEluWorkerUsage(taskWorkerUsage, message) + } + } + + private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean { + return ( + Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) && + (this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).length > 1 + ) } private updateTaskStatisticsWorkerUsage ( @@ -1119,7 +1133,9 @@ export abstract class AbstractPool< this.handleTaskExecutionResponse(message) } else if (message.taskFunctions != null) { // Task functions message received from worker - this.taskFunctions = message.taskFunctions + this.getWorkerInfo( + this.getWorkerNodeKeyByWorkerId(message.workerId) + ).taskFunctions = message.taskFunctions } } } diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 3b69d385..923fe9b2 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -2,6 +2,7 @@ import { MessageChannel } from 'node:worker_threads' import { CircularArray } from '../circular-array' import { Queue } from '../queue' import type { Task } from '../utility-types' +import { DEFAULT_TASK_NAME } from '../utils' import { type IWorker, type IWorkerNode, @@ -22,6 +23,7 @@ implements IWorkerNode { public readonly worker: Worker public readonly info: WorkerInfo public usage: WorkerUsage + public taskFunctions!: string[] private readonly tasksUsage: Map private readonly tasksQueue: Queue> @@ -87,6 +89,13 @@ implements IWorkerNode { /** @inheritdoc */ public getTaskWorkerUsage (name: string): WorkerUsage | undefined { + if ( + name === DEFAULT_TASK_NAME && + Array.isArray(this.taskFunctions) && + this.taskFunctions.length > 1 + ) { + name = this.taskFunctions[1] + } if (!this.tasksUsage.has(name)) { this.tasksUsage.set(name, this.initTaskWorkerUsage(name)) } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 5dbb6ac5..5ac90fbf 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -137,6 +137,10 @@ export interface WorkerInfo { * Ready flag. */ ready: boolean + /** + * Task function names. + */ + taskFunctions?: string[] /** * Message channel. */ diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 98b32fe4..44c392bc 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -265,7 +265,24 @@ export abstract class AbstractWorker< * @returns The names of the worker's task functions. */ public listTaskFunctions (): string[] { - return [...this.taskFunctions.keys()] + const names: string[] = [...this.taskFunctions.keys()] + let defaultTaskFunctionName: string = DEFAULT_TASK_NAME + for (const [name, fn] of this.taskFunctions) { + if ( + name !== DEFAULT_TASK_NAME && + fn === this.taskFunctions.get(DEFAULT_TASK_NAME) + ) { + defaultTaskFunctionName = name + break + } + } + return [ + names[names.indexOf(DEFAULT_TASK_NAME)], + defaultTaskFunctionName, + ...names.filter( + (name) => name !== DEFAULT_TASK_NAME && name !== defaultTaskFunctionName + ) + ] } /** -- 2.34.1