From: Jérôme Benoit Date: Wed, 7 Jun 2023 19:53:49 +0000 (+0200) Subject: refactor: use task performance data structure in messages X-Git-Tag: v2.6.0~22 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=d715b7bc2973eed82edebb1f7d233d451ad3c97b;p=poolifier.git refactor: use task performance data structure in messages Signed-off-by: Jérôme Benoit --- diff --git a/src/index.ts b/src/index.ts index 0d949157..1895d348 100644 --- a/src/index.ts +++ b/src/index.ts @@ -39,7 +39,7 @@ export { type ThreadPoolOptions, type ThreadWorkerWithMessageChannel } from './pools/thread/fixed' -export type { AbstractWorker, TaskPerformance } from './worker/abstract-worker' +export type { AbstractWorker } from './worker/abstract-worker' export { ClusterWorker } from './worker/cluster-worker' export { ThreadWorker } from './worker/thread-worker' export { KillBehaviors } from './worker/worker-options' @@ -54,6 +54,7 @@ export type { Draft, MessageValue, PromiseResponseWrapper, + TaskPerformance, WorkerStatistics } from './utility-types' export type { CircularArray } from './circular-array' diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index b93915c4..85a47e38 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -485,7 +485,7 @@ export abstract class AbstractPool< message: MessageValue ): void { if (this.workerChoiceStrategyContext.getTaskStatistics().runTime) { - workerTasksUsage.runTime += message.runTime ?? 0 + workerTasksUsage.runTime += message.taskPerformance?.runTime ?? 0 if ( this.workerChoiceStrategyContext.getTaskStatistics().avgRunTime && workerTasksUsage.ran !== 0 @@ -495,9 +495,9 @@ export abstract class AbstractPool< } if ( this.workerChoiceStrategyContext.getTaskStatistics().medRunTime && - message.runTime != null + message.taskPerformance?.runTime != null ) { - workerTasksUsage.runTimeHistory.push(message.runTime) + workerTasksUsage.runTimeHistory.push(message.taskPerformance.runTime) workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory) } } @@ -508,7 +508,7 @@ export abstract class AbstractPool< message: MessageValue ): void { if (this.workerChoiceStrategyContext.getTaskStatistics().waitTime) { - workerTasksUsage.waitTime += message.waitTime ?? 0 + workerTasksUsage.waitTime += message.taskPerformance?.waitTime ?? 0 if ( this.workerChoiceStrategyContext.getTaskStatistics().avgWaitTime && workerTasksUsage.ran !== 0 @@ -518,9 +518,9 @@ export abstract class AbstractPool< } if ( this.workerChoiceStrategyContext.getTaskStatistics().medWaitTime && - message.waitTime != null + message.taskPerformance?.waitTime != null ) { - workerTasksUsage.waitTimeHistory.push(message.waitTime) + workerTasksUsage.waitTimeHistory.push(message.taskPerformance.waitTime) workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory) } } @@ -531,15 +531,21 @@ export abstract class AbstractPool< message: MessageValue ): void { if (this.workerChoiceStrategyContext.getTaskStatistics().elu) { - if (workerTasksUsage.elu != null && message.elu != null) { + if ( + workerTasksUsage.elu != null && + message.taskPerformance?.elu != null + ) { workerTasksUsage.elu = { - idle: workerTasksUsage.elu.idle + message.elu.idle, - active: workerTasksUsage.elu.active + message.elu.active, + idle: workerTasksUsage.elu.idle + message.taskPerformance.elu.idle, + active: + workerTasksUsage.elu.active + message.taskPerformance.elu.active, utilization: - (workerTasksUsage.elu.utilization + message.elu.utilization) / 2 + (workerTasksUsage.elu.utilization + + message.taskPerformance.elu.utilization) / + 2 } - } else if (message.elu != null) { - workerTasksUsage.elu = message.elu + } else if (message.taskPerformance?.elu != null) { + workerTasksUsage.elu = message.taskPerformance.elu } } } diff --git a/src/utility-types.ts b/src/utility-types.ts index 9cdf6b0a..faeda0a5 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -11,6 +11,28 @@ import type { IWorker, Task } from './pools/worker' */ export type Draft = { -readonly [P in keyof T]?: T[P] } +/** + * Task performance. + */ +export interface TaskPerformance { + /** + * Task performance timestamp. + */ + timestamp: number + /** + * Task runtime. + */ + runTime?: number + /** + * Task wait time. + */ + waitTime?: number + /** + * Task event loop utilization. + */ + elu?: EventLoopUtilization +} + /** * Performance statistics computation. */ @@ -44,17 +66,9 @@ export interface MessageValue< */ readonly errorData?: unknown /** - * Runtime. - */ - readonly runTime?: number - /** - * Wait time. - */ - readonly waitTime?: number - /** - * Event loop utilization. + * Task performance. */ - readonly elu?: EventLoopUtilization + readonly taskPerformance?: TaskPerformance /** * Reference to main worker. */ diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index b7c542aa..7805514a 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -1,8 +1,12 @@ import { AsyncResource } from 'node:async_hooks' import type { Worker } from 'node:cluster' import type { MessagePort } from 'node:worker_threads' -import { type EventLoopUtilization, performance } from 'node:perf_hooks' -import type { MessageValue, WorkerStatistics } from '../utility-types' +import { performance } from 'node:perf_hooks' +import type { + MessageValue, + TaskPerformance, + WorkerStatistics +} from '../utility-types' import { EMPTY_FUNCTION, isPlainObject } from '../utils' import { type KillBehavior, @@ -20,16 +24,6 @@ const DEFAULT_FUNCTION_NAME = 'default' const DEFAULT_MAX_INACTIVE_TIME = 60000 const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT -/** - * Task performance. - */ -export interface TaskPerformance { - timestamp: number - waitTime?: number - runTime?: number - elu?: EventLoopUtilization -} - /** * Base class that implements some shared logic for all poolifier workers. * @@ -226,15 +220,12 @@ export abstract class AbstractWorker< message: MessageValue ): void { try { - const taskPerformance = this.beginTaskPerformance(message) + let taskPerformance = this.beginTaskPerformance(message) const res = fn(message.data) - const { runTime, waitTime, elu } = - this.endTaskPerformance(taskPerformance) + taskPerformance = this.endTaskPerformance(taskPerformance) this.sendToMainWorker({ data: res, - runTime, - waitTime, - elu, + taskPerformance, id: message.id }) } catch (e) { @@ -259,16 +250,13 @@ export abstract class AbstractWorker< fn: WorkerAsyncFunction, message: MessageValue ): void { - const taskPerformance = this.beginTaskPerformance(message) + let taskPerformance = this.beginTaskPerformance(message) fn(message.data) .then(res => { - const { runTime, waitTime, elu } = - this.endTaskPerformance(taskPerformance) + taskPerformance = this.endTaskPerformance(taskPerformance) this.sendToMainWorker({ data: res, - runTime, - waitTime, - elu, + taskPerformance, id: message.id }) return null