From ff128cc9dc9cee365f55b15721d5b90707577a0a Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 9 Jul 2023 15:37:28 +0200 Subject: [PATCH] feat: add initial infrastructure to track per task statistics MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 17 +++++++++++++++-- src/pools/worker-node.ts | 12 ++++++++++++ src/pools/worker.ts | 4 ++++ src/utility-types.ts | 6 +++++- src/utils.ts | 5 +++++ src/worker/abstract-worker.ts | 20 +++++++++++++------- tests/pools/cluster/fixed.test.js | 2 ++ tests/pools/thread/fixed.test.js | 2 ++ 8 files changed, 58 insertions(+), 10 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index c699b528..bb37dbb7 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2,6 +2,7 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' import type { MessageValue, PromiseResponseWrapper } from '../utility-types' import { + DEFAULT_TASK_NAME, DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, isKillBehavior, @@ -588,7 +589,7 @@ export abstract class AbstractPool< const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() const submittedTask: Task = { - name, + name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), timestamp, @@ -671,6 +672,11 @@ export abstract class AbstractPool< const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing this.updateWaitTimeWorkerUsage(workerUsage, task) + const tasksWorkerUsage = this.workerNodes[ + workerNodeKey + ].getTasksWorkerUsage(task.name as string) as WorkerUsage + ++tasksWorkerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(tasksWorkerUsage, task) } /** @@ -684,10 +690,17 @@ export abstract class AbstractPool< worker: Worker, message: MessageValue ): void { - const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage + const workerNodeKey = this.getWorkerNodeKey(worker) + const workerUsage = this.workerNodes[workerNodeKey].usage this.updateTaskStatisticsWorkerUsage(workerUsage, message) this.updateRunTimeWorkerUsage(workerUsage, message) this.updateEluWorkerUsage(workerUsage, message) + const tasksWorkerUsage = this.workerNodes[ + workerNodeKey + ].getTasksWorkerUsage(message.name as string) as WorkerUsage + this.updateTaskStatisticsWorkerUsage(tasksWorkerUsage, message) + this.updateRunTimeWorkerUsage(tasksWorkerUsage, message) + this.updateEluWorkerUsage(tasksWorkerUsage, message) } private updateTaskStatisticsWorkerUsage ( diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index bbca7913..7a7fc237 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -21,6 +21,7 @@ implements IWorkerNode { public readonly worker: Worker public readonly info: WorkerInfo public usage: WorkerUsage + private readonly tasksUsage: Map private readonly tasksQueue: Queue> /** @@ -33,6 +34,7 @@ implements IWorkerNode { this.worker = worker this.info = this.initWorkerInfo(worker, workerType) this.usage = this.initWorkerUsage() + this.tasksUsage = new Map() this.tasksQueue = new Queue>() } @@ -65,8 +67,18 @@ implements IWorkerNode { this.tasksQueue.clear() } + /** @inheritdoc */ public resetUsage (): void { this.usage = this.initWorkerUsage() + this.tasksUsage.clear() + } + + /** @inheritdoc */ + public getTasksWorkerUsage (name: string): WorkerUsage | undefined { + if (!this.tasksUsage.has(name)) { + this.tasksUsage.set(name, this.initWorkerUsage()) + } + return this.tasksUsage.get(name) } private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo { diff --git a/src/pools/worker.ts b/src/pools/worker.ts index c6303221..c0f84ce8 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -265,4 +265,8 @@ export interface IWorkerNode { * Worker node reset usage statistics . */ readonly resetUsage: () => void + /** + * Worker node get tasks usage statistics. + */ + readonly getTasksWorkerUsage: (name: string) => WorkerUsage | undefined } diff --git a/src/utility-types.ts b/src/utility-types.ts index d7477319..fa1c0ffe 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -8,12 +8,16 @@ import type { IWorker, Task } from './pools/worker' * @typeParam Data - Type of data sent to the worker triggering an error. This can only be structured-cloneable data. */ export interface TaskError { + /** + * Task name triggering the error. + */ + readonly name: string /** * Error message. */ readonly message: string /** - * Data passed to the worker triggering the error. + * Data triggering the error. */ readonly data?: Data } diff --git a/src/utils.ts b/src/utils.ts index 128b65da..0becc5ed 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -5,6 +5,11 @@ import type { } from './pools/selection-strategies/selection-strategies-types' import type { KillBehavior } from './worker/worker-options' +/** + * Default task name. + */ +export const DEFAULT_TASK_NAME = 'default' + /** * An intentional empty function. */ diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index a0154115..1eac7427 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -7,7 +7,12 @@ import type { TaskPerformance, WorkerStatistics } from '../utility-types' -import { EMPTY_FUNCTION, isAsyncFunction, isPlainObject } from '../utils' +import { + DEFAULT_TASK_NAME, + EMPTY_FUNCTION, + isAsyncFunction, + isPlainObject +} from '../utils' import { type KillBehavior, KillBehaviors, @@ -20,7 +25,6 @@ import type { WorkerSyncFunction } from './worker-functions' -const DEFAULT_FUNCTION_NAME = 'default' const DEFAULT_MAX_INACTIVE_TIME = 60000 const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT @@ -114,7 +118,7 @@ export abstract class AbstractWorker< } this.taskFunctions = new Map>() if (typeof taskFunctions === 'function') { - this.taskFunctions.set(DEFAULT_FUNCTION_NAME, taskFunctions.bind(this)) + this.taskFunctions.set(DEFAULT_TASK_NAME, taskFunctions.bind(this)) } else if (isPlainObject(taskFunctions)) { let firstEntry = true for (const [name, fn] of Object.entries(taskFunctions)) { @@ -125,7 +129,7 @@ export abstract class AbstractWorker< } this.taskFunctions.set(name, fn.bind(this)) if (firstEntry) { - this.taskFunctions.set(DEFAULT_FUNCTION_NAME, fn.bind(this)) + this.taskFunctions.set(DEFAULT_TASK_NAME, fn.bind(this)) firstEntry = false } } @@ -264,6 +268,7 @@ export abstract class AbstractWorker< const errorMessage = this.handleError(e as Error | string) this.sendToMainWorker({ taskError: { + name: message.name ?? DEFAULT_TASK_NAME, message: errorMessage, data: message.data }, @@ -303,6 +308,7 @@ export abstract class AbstractWorker< const errorMessage = this.handleError(e as Error | string) this.sendToMainWorker({ taskError: { + name: message.name ?? DEFAULT_TASK_NAME, message: errorMessage, data: message.data }, @@ -321,10 +327,10 @@ export abstract class AbstractWorker< /** * Gets the task function in the given scope. * - * @param name - Name of the function that will be returned. + * @param name - Name of the task function that will be returned. */ private getTaskFunction (name?: string): WorkerFunction { - name = name ?? DEFAULT_FUNCTION_NAME + name = name ?? DEFAULT_TASK_NAME const fn = this.taskFunctions.get(name) if (fn == null) { throw new Error(`Task function '${name}' not found`) @@ -335,7 +341,7 @@ export abstract class AbstractWorker< private beginTaskPerformance (name?: string): TaskPerformance { this.checkStatistics() return { - name: name ?? DEFAULT_FUNCTION_NAME, + name: name ?? DEFAULT_TASK_NAME, timestamp: performance.now(), ...(this.statistics.elu && { elu: performance.eventLoopUtilization() }) } diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index c638663f..1d0f7b8c 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -161,6 +161,7 @@ describe('Fixed cluster pool test suite', () => { expect(typeof inError === 'string').toBe(true) expect(inError).toBe('Error Message from ClusterWorker') expect(taskError).toStrictEqual({ + name: 'default', message: 'Error Message from ClusterWorker', data }) @@ -187,6 +188,7 @@ describe('Fixed cluster pool test suite', () => { expect(typeof inError === 'string').toBe(true) expect(inError).toBe('Error Message from ClusterWorker:async') expect(taskError).toStrictEqual({ + name: 'default', message: 'Error Message from ClusterWorker:async', data }) diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index b2842f6e..de81cb68 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -163,6 +163,7 @@ describe('Fixed thread pool test suite', () => { expect(typeof inError.message === 'string').toBe(true) expect(inError.message).toBe('Error Message from ThreadWorker') expect(taskError).toStrictEqual({ + name: 'default', message: new Error('Error Message from ThreadWorker'), data }) @@ -191,6 +192,7 @@ describe('Fixed thread pool test suite', () => { expect(typeof inError.message === 'string').toBe(true) expect(inError.message).toBe('Error Message from ThreadWorker:async') expect(taskError).toStrictEqual({ + name: 'default', message: new Error('Error Message from ThreadWorker:async'), data }) -- 2.34.1