From b6b3245344bd453ea91fa3d74acd5145f70d84fd Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Tue, 6 Jun 2023 19:56:29 +0200 Subject: [PATCH] feat: conditional task performance computation at the worker level MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 8 +++ src/index.ts | 10 ++-- src/pools/abstract-pool.ts | 56 +++++++++++------- .../abstract-worker-choice-strategy.ts | 36 +++++------ .../fair-share-worker-choice-strategy.ts | 6 +- ...hted-round-robin-worker-choice-strategy.ts | 2 +- .../least-busy-worker-choice-strategy.ts | 6 +- .../least-used-worker-choice-strategy.ts | 2 +- .../round-robin-worker-choice-strategy.ts | 2 +- .../selection-strategies-types.ts | 6 +- ...hted-round-robin-worker-choice-strategy.ts | 6 +- .../worker-choice-strategy-context.ts | 10 ++-- src/pools/worker.ts | 4 +- src/utility-types.ts | 59 ++++--------------- src/worker/abstract-worker.ts | 44 +++++++++----- src/worker/cluster-worker.ts | 7 +-- src/worker/thread-worker.ts | 7 +-- src/worker/worker-functions.ts | 45 ++++++++++++++ tests/pools/abstract/abstract-pool.test.js | 12 +--- .../selection-strategies.test.js | 48 ++++----------- .../worker-choice-strategy-context.test.js | 16 ++--- 21 files changed, 201 insertions(+), 191 deletions(-) create mode 100644 src/worker/worker-functions.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index d8dc4c7a..dc94cb94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add Event Loop Utilization (ELU) statistics to worker tasks usage. + +### Changed + +- Compute statistics at the worker level only if needed. + ## [2.5.3] - 2023-06-04 ### Changed diff --git a/src/index.ts b/src/index.ts index 4431b0f9..993cbe93 100644 --- a/src/index.ts +++ b/src/index.ts @@ -26,7 +26,7 @@ export type { export { WorkerChoiceStrategies } from './pools/selection-strategies/selection-strategies-types' export type { IWorkerChoiceStrategy, - RequiredStatistics, + TaskStatistics, WorkerChoiceStrategy, WorkerChoiceStrategyOptions } from './pools/selection-strategies/selection-strategies-types' @@ -40,13 +40,15 @@ export { ThreadWorker } from './worker/thread-worker' export { KillBehaviors } from './worker/worker-options' export type { KillBehavior, WorkerOptions } from './worker/worker-options' export type { - Draft, - MessageValue, - PromiseResponseWrapper, TaskFunctions, WorkerAsyncFunction, WorkerFunction, WorkerSyncFunction +} from './worker/worker-functions' +export type { + Draft, + MessageValue, + PromiseResponseWrapper } from './utility-types' export type { CircularArray } from './circular-array' export type { Queue } from './queue' diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 065b228a..ce645cc8 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -95,12 +95,6 @@ export abstract class AbstractPool< this.enqueueTask = this.enqueueTask.bind(this) this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this) - this.setupHook() - - for (let i = 1; i <= this.numberOfWorkers; i++) { - this.createAndSetupWorker() - } - if (this.opts.enableEvents === true) { this.emitter = new PoolEmitter() } @@ -113,6 +107,12 @@ export abstract class AbstractPool< this.opts.workerChoiceStrategy, this.opts.workerChoiceStrategyOptions ) + + this.setupHook() + + for (let i = 1; i <= this.numberOfWorkers; i++) { + this.createAndSetupWorker() + } } private checkFilePath (filePath: string): void { @@ -288,6 +288,12 @@ export abstract class AbstractPool< ): void { this.checkValidWorkerChoiceStrategy(workerChoiceStrategy) this.opts.workerChoiceStrategy = workerChoiceStrategy + this.workerChoiceStrategyContext.setWorkerChoiceStrategy( + this.opts.workerChoiceStrategy + ) + if (workerChoiceStrategyOptions != null) { + this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) + } for (const workerNode of this.workerNodes) { this.setWorkerNodeTasksUsage(workerNode, { ran: 0, @@ -303,12 +309,7 @@ export abstract class AbstractPool< error: 0, elu: undefined }) - } - this.workerChoiceStrategyContext.setWorkerChoiceStrategy( - this.opts.workerChoiceStrategy - ) - if (workerChoiceStrategyOptions != null) { - this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) + this.setWorkerStatistics(workerNode.worker) } } @@ -380,13 +381,13 @@ export abstract class AbstractPool< /** @inheritDoc */ public async execute (data?: Data, name?: string): Promise { - const submissionTimestamp = performance.now() + const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() const submittedTask: Task = { name, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), - submissionTimestamp, + timestamp, id: crypto.randomUUID() } const res = new Promise((resolve, reject) => { @@ -483,17 +484,17 @@ export abstract class AbstractPool< workerTasksUsage: TasksUsage, message: MessageValue ): void { - if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) { + if (this.workerChoiceStrategyContext.getTaskStatistics().runTime) { workerTasksUsage.runTime += message.runTime ?? 0 if ( - this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime && + this.workerChoiceStrategyContext.getTaskStatistics().avgRunTime && workerTasksUsage.ran !== 0 ) { workerTasksUsage.avgRunTime = workerTasksUsage.runTime / workerTasksUsage.ran } if ( - this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime && + this.workerChoiceStrategyContext.getTaskStatistics().medRunTime && message.runTime != null ) { workerTasksUsage.runTimeHistory.push(message.runTime) @@ -506,17 +507,17 @@ export abstract class AbstractPool< workerTasksUsage: TasksUsage, message: MessageValue ): void { - if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) { + if (this.workerChoiceStrategyContext.getTaskStatistics().waitTime) { workerTasksUsage.waitTime += message.waitTime ?? 0 if ( - this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime && + this.workerChoiceStrategyContext.getTaskStatistics().avgWaitTime && workerTasksUsage.ran !== 0 ) { workerTasksUsage.avgWaitTime = workerTasksUsage.waitTime / workerTasksUsage.ran } if ( - this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime && + this.workerChoiceStrategyContext.getTaskStatistics().medWaitTime && message.waitTime != null ) { workerTasksUsage.waitTimeHistory.push(message.waitTime) @@ -529,9 +530,8 @@ export abstract class AbstractPool< workerTasksUsage: TasksUsage, message: MessageValue ): void { - if (this.workerChoiceStrategyContext.getRequiredStatistics().elu) { + if (this.workerChoiceStrategyContext.getTaskStatistics().elu) { if (workerTasksUsage.elu != null && message.elu != null) { - // TODO: cumulative or delta? workerTasksUsage.elu = { idle: workerTasksUsage.elu.idle + message.elu.idle, active: workerTasksUsage.elu.active + message.elu.active, @@ -638,6 +638,8 @@ export abstract class AbstractPool< this.pushWorkerNode(worker) + this.setWorkerStatistics(worker) + this.afterWorkerSetup(worker) return worker @@ -800,4 +802,14 @@ export abstract class AbstractPool< this.flushTasksQueue(workerNodeKey) } } + + private setWorkerStatistics (worker: Worker): void { + this.sendToWorker(worker, { + statistics: { + runTime: this.workerChoiceStrategyContext.getTaskStatistics().runTime, + waitTime: this.workerChoiceStrategyContext.getTaskStatistics().waitTime, + elu: this.workerChoiceStrategyContext.getTaskStatistics().elu + } + }) + } } diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index 9b14f3c5..285a68c2 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -4,7 +4,7 @@ import type { IPool } from '../pool' import type { IWorker } from '../worker' import type { IWorkerChoiceStrategy, - RequiredStatistics, + TaskStatistics, WorkerChoiceStrategyOptions } from './selection-strategies-types' @@ -25,7 +25,7 @@ export abstract class AbstractWorkerChoiceStrategy< */ private toggleFindLastFreeWorkerNodeKey: boolean = false /** @inheritDoc */ - public readonly requiredStatistics: RequiredStatistics = { + public readonly taskStatistics: TaskStatistics = { runTime: false, avgRunTime: false, medRunTime: false, @@ -48,22 +48,22 @@ export abstract class AbstractWorkerChoiceStrategy< this.choose = this.choose.bind(this) } - protected setRequiredStatistics (opts: WorkerChoiceStrategyOptions): void { - if (this.requiredStatistics.avgRunTime && opts.medRunTime === true) { - this.requiredStatistics.avgRunTime = false - this.requiredStatistics.medRunTime = opts.medRunTime as boolean + protected setTaskStatistics (opts: WorkerChoiceStrategyOptions): void { + if (this.taskStatistics.avgRunTime && opts.medRunTime === true) { + this.taskStatistics.avgRunTime = false + this.taskStatistics.medRunTime = opts.medRunTime as boolean } - if (this.requiredStatistics.medRunTime && opts.medRunTime === false) { - this.requiredStatistics.avgRunTime = true - this.requiredStatistics.medRunTime = opts.medRunTime as boolean + if (this.taskStatistics.medRunTime && opts.medRunTime === false) { + this.taskStatistics.avgRunTime = true + this.taskStatistics.medRunTime = opts.medRunTime as boolean } - if (this.requiredStatistics.avgWaitTime && opts.medWaitTime === true) { - this.requiredStatistics.avgWaitTime = false - this.requiredStatistics.medWaitTime = opts.medWaitTime as boolean + if (this.taskStatistics.avgWaitTime && opts.medWaitTime === true) { + this.taskStatistics.avgWaitTime = false + this.taskStatistics.medWaitTime = opts.medWaitTime as boolean } - if (this.requiredStatistics.medWaitTime && opts.medWaitTime === false) { - this.requiredStatistics.avgWaitTime = true - this.requiredStatistics.medWaitTime = opts.medWaitTime as boolean + if (this.taskStatistics.medWaitTime && opts.medWaitTime === false) { + this.taskStatistics.avgWaitTime = true + this.taskStatistics.medWaitTime = opts.medWaitTime as boolean } } @@ -82,7 +82,7 @@ export abstract class AbstractWorkerChoiceStrategy< /** @inheritDoc */ public setOptions (opts: WorkerChoiceStrategyOptions): void { opts = opts ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS - this.setRequiredStatistics(opts) + this.setTaskStatistics(opts) this.opts = opts } @@ -109,7 +109,7 @@ export abstract class AbstractWorkerChoiceStrategy< * @returns The worker task runtime. */ protected getWorkerTaskRunTime (workerNodeKey: number): number { - return this.requiredStatistics.medRunTime + return this.taskStatistics.medRunTime ? this.pool.workerNodes[workerNodeKey].tasksUsage.medRunTime : this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime } @@ -123,7 +123,7 @@ export abstract class AbstractWorkerChoiceStrategy< * @returns The worker task wait time. */ protected getWorkerWaitTime (workerNodeKey: number): number { - return this.requiredStatistics.medWaitTime + return this.taskStatistics.medWaitTime ? this.pool.workerNodes[workerNodeKey].tasksUsage.medWaitTime : this.pool.workerNodes[workerNodeKey].tasksUsage.avgWaitTime } diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index a0e7ee1a..f6fdc4af 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -4,7 +4,7 @@ import type { IWorker } from '../worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, - RequiredStatistics, + TaskStatistics, WorkerChoiceStrategyOptions } from './selection-strategies-types' @@ -24,7 +24,7 @@ export class FairShareWorkerChoiceStrategy< extends AbstractWorkerChoiceStrategy implements IWorkerChoiceStrategy { /** @inheritDoc */ - public readonly requiredStatistics: RequiredStatistics = { + public readonly taskStatistics: TaskStatistics = { runTime: true, avgRunTime: true, medRunTime: false, @@ -45,7 +45,7 @@ export class FairShareWorkerChoiceStrategy< opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS ) { super(pool, opts) - this.setRequiredStatistics(this.opts) + this.setTaskStatistics(this.opts) } /** @inheritDoc */ diff --git a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts index 7248f03a..d7d78953 100644 --- a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts @@ -45,7 +45,7 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS ) { super(pool, opts) - this.setRequiredStatistics(this.opts) + this.setTaskStatistics(this.opts) this.defaultWorkerWeight = this.computeDefaultWorkerWeight() this.roundWeights = this.getRoundWeights() } diff --git a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts index 2dfb6083..16b2988b 100644 --- a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts @@ -4,7 +4,7 @@ import type { IWorker } from '../worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, - RequiredStatistics, + TaskStatistics, WorkerChoiceStrategyOptions } from './selection-strategies-types' @@ -23,7 +23,7 @@ export class LeastBusyWorkerChoiceStrategy< extends AbstractWorkerChoiceStrategy implements IWorkerChoiceStrategy { /** @inheritDoc */ - public readonly requiredStatistics: RequiredStatistics = { + public readonly taskStatistics: TaskStatistics = { runTime: true, avgRunTime: false, medRunTime: false, @@ -39,7 +39,7 @@ export class LeastBusyWorkerChoiceStrategy< opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS ) { super(pool, opts) - this.setRequiredStatistics(this.opts) + this.setTaskStatistics(this.opts) } /** @inheritDoc */ diff --git a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts index 86a1ef23..d9e3c20e 100644 --- a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts @@ -27,7 +27,7 @@ export class LeastUsedWorkerChoiceStrategy< opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS ) { super(pool, opts) - this.setRequiredStatistics(this.opts) + this.setTaskStatistics(this.opts) } /** @inheritDoc */ diff --git a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts index be4fe332..afdff788 100644 --- a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts @@ -32,7 +32,7 @@ export class RoundRobinWorkerChoiceStrategy< opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS ) { super(pool, opts) - this.setRequiredStatistics(this.opts) + this.setTaskStatistics(this.opts) } /** @inheritDoc */ diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index c8497c27..4742d069 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -65,7 +65,7 @@ export interface WorkerChoiceStrategyOptions { * * @internal */ -export interface RequiredStatistics { +export interface TaskStatistics { /** * Require tasks runtime. */ @@ -101,9 +101,9 @@ export interface RequiredStatistics { */ export interface IWorkerChoiceStrategy { /** - * Required tasks usage statistics. + * Required tasks statistics. */ - readonly requiredStatistics: RequiredStatistics + readonly taskStatistics: TaskStatistics /** * Resets strategy internals. * diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index 1ea091c6..729dbe66 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -4,7 +4,7 @@ import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, - RequiredStatistics, + TaskStatistics, WorkerChoiceStrategyOptions } from './selection-strategies-types' @@ -24,7 +24,7 @@ export class WeightedRoundRobinWorkerChoiceStrategy< extends AbstractWorkerChoiceStrategy implements IWorkerChoiceStrategy { /** @inheritDoc */ - public readonly requiredStatistics: RequiredStatistics = { + public readonly taskStatistics: TaskStatistics = { runTime: true, avgRunTime: true, medRunTime: false, @@ -53,7 +53,7 @@ export class WeightedRoundRobinWorkerChoiceStrategy< opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS ) { super(pool, opts) - this.setRequiredStatistics(this.opts) + this.setTaskStatistics(this.opts) this.defaultWorkerWeight = this.computeDefaultWorkerWeight() } diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index 43bcb3ad..3d32dad6 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -8,7 +8,7 @@ import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strate import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy' import type { IWorkerChoiceStrategy, - RequiredStatistics, + TaskStatistics, WorkerChoiceStrategy, WorkerChoiceStrategyOptions } from './selection-strategies-types' @@ -97,16 +97,16 @@ export class WorkerChoiceStrategyContext< } /** - * Gets the worker choice strategy in the context required statistics. + * Gets the worker choice strategy task statistics in the context. * - * @returns The required statistics. + * @returns The task statistics. */ - public getRequiredStatistics (): RequiredStatistics { + public getTaskStatistics (): TaskStatistics { return ( this.workerChoiceStrategies.get( this.workerChoiceStrategy ) as IWorkerChoiceStrategy - ).requiredStatistics + ).taskStatistics } /** diff --git a/src/pools/worker.ts b/src/pools/worker.ts index d10a27c9..841a2bcb 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -47,9 +47,9 @@ export interface Task { */ readonly data?: Data /** - * Submission timestamp. + * Timestamp. */ - readonly submissionTimestamp?: number + readonly timestamp?: number /** * Message UUID. */ diff --git a/src/utility-types.ts b/src/utility-types.ts index cc29801f..9cdf6b0a 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -11,6 +11,15 @@ import type { IWorker, Task } from './pools/worker' */ export type Draft = { -readonly [P in keyof T]?: T[P] } +/** + * Performance statistics computation. + */ +export interface WorkerStatistics { + runTime: boolean + waitTime: boolean + elu: boolean +} + /** * Message object that is passed between main worker and worker. * @@ -50,54 +59,12 @@ export interface MessageValue< * Reference to main worker. */ readonly parent?: MainWorker + /** + * Whether to compute the given statistics or not. + */ + readonly statistics?: WorkerStatistics } -/** - * Worker synchronous function that can be executed. - * - * @typeParam Data - Type of data sent to the worker. This can only be serializable data. - * @typeParam Response - Type of execution response. This can only be serializable data. - */ -export type WorkerSyncFunction = ( - data?: Data -) => Response - -/** - * Worker asynchronous function that can be executed. - * This function must return a promise. - * - * @typeParam Data - Type of data sent to the worker. This can only be serializable data. - * @typeParam Response - Type of execution response. This can only be serializable data. - */ -export type WorkerAsyncFunction = ( - data?: Data -) => Promise - -/** - * Worker function that can be executed. - * This function can be synchronous or asynchronous. - * - * @typeParam Data - Type of data sent to the worker. This can only be serializable data. - * @typeParam Response - Type of execution response. This can only be serializable data. - */ -export type WorkerFunction = - | WorkerSyncFunction - | WorkerAsyncFunction - -/** - * Worker functions that can be executed. - * This object can contain synchronous or asynchronous functions. - * The key is the name of the function. - * The value is the function itself. - * - * @typeParam Data - Type of data sent to the worker. This can only be serializable data. - * @typeParam Response - Type of execution response. This can only be serializable data. - */ -export type TaskFunctions = Record< -string, -WorkerFunction -> - /** * An object holding the execution response promise resolve/reject callbacks. * diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index a484dce1..f40acf37 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -2,29 +2,32 @@ import { AsyncResource } from 'node:async_hooks' import type { Worker } from 'node:cluster' import type { MessagePort } from 'node:worker_threads' import { type EventLoopUtilization, performance } from 'node:perf_hooks' -import type { - MessageValue, - TaskFunctions, - WorkerAsyncFunction, - WorkerFunction, - WorkerSyncFunction -} from '../utility-types' +import type { MessageValue, WorkerStatistics } from '../utility-types' import { EMPTY_FUNCTION, isPlainObject } from '../utils' import { type KillBehavior, KillBehaviors, type WorkerOptions } from './worker-options' +import type { + TaskFunctions, + WorkerAsyncFunction, + WorkerFunction, + WorkerSyncFunction +} from './worker-functions' const DEFAULT_FUNCTION_NAME = 'default' const DEFAULT_MAX_INACTIVE_TIME = 60000 const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT +/** + * Task performance. + */ interface TaskPerformance { timestamp: number - waitTime: number + waitTime?: number runTime?: number - elu: EventLoopUtilization + elu?: EventLoopUtilization } /** @@ -47,6 +50,10 @@ export abstract class AbstractWorker< * Timestamp of the last task processed by this worker. */ protected lastTaskTimestamp!: number + /** + * Performance statistics computation. + */ + protected statistics!: WorkerStatistics /** * Handler id of the `aliveInterval` worker alive check. */ @@ -90,7 +97,6 @@ export abstract class AbstractWorker< ) this.checkAlive.bind(this)() } - this.mainWorker?.on('message', this.messageListener.bind(this)) } @@ -162,6 +168,9 @@ export abstract class AbstractWorker< // Kill message received this.aliveInterval != null && clearInterval(this.aliveInterval) this.emitDestroy() + } else if (message.statistics != null) { + // Statistics message received + this.statistics = message.statistics } } @@ -292,22 +301,25 @@ export abstract class AbstractWorker< } private beforeTaskRunHook (message: MessageValue): TaskPerformance { - // TODO: conditional accounting const timestamp = performance.now() return { timestamp, - waitTime: timestamp - (message.submissionTimestamp ?? 0), - elu: performance.eventLoopUtilization() + ...(this.statistics.waitTime && { + waitTime: timestamp - (message.timestamp ?? timestamp) + }), + ...(this.statistics.elu && { elu: performance.eventLoopUtilization() }) } } private afterTaskRunHook (taskPerformance: TaskPerformance): TaskPerformance { return { ...taskPerformance, - ...{ - runTime: performance.now() - taskPerformance.timestamp, + ...(this.statistics.runTime && { + runTime: performance.now() - taskPerformance.timestamp + }), + ...(this.statistics.elu && { elu: performance.eventLoopUtilization(taskPerformance.elu) - } + }) } } } diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index 0ff15cce..2a3961b3 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -1,11 +1,8 @@ import cluster, { type Worker } from 'node:cluster' -import type { - MessageValue, - TaskFunctions, - WorkerFunction -} from '../utility-types' +import type { MessageValue } from '../utility-types' import { AbstractWorker } from './abstract-worker' import type { WorkerOptions } from './worker-options' +import type { TaskFunctions, WorkerFunction } from './worker-functions' /** * A cluster worker used by a poolifier `ClusterPool`. diff --git a/src/worker/thread-worker.ts b/src/worker/thread-worker.ts index ded967f1..81d98aa5 100644 --- a/src/worker/thread-worker.ts +++ b/src/worker/thread-worker.ts @@ -1,11 +1,8 @@ import { type MessagePort, isMainThread, parentPort } from 'node:worker_threads' -import type { - MessageValue, - TaskFunctions, - WorkerFunction -} from '../utility-types' +import type { MessageValue } from '../utility-types' import { AbstractWorker } from './abstract-worker' import type { WorkerOptions } from './worker-options' +import type { TaskFunctions, WorkerFunction } from './worker-functions' /** * A thread worker used by a poolifier `ThreadPool`. diff --git a/src/worker/worker-functions.ts b/src/worker/worker-functions.ts new file mode 100644 index 00000000..28b9ed54 --- /dev/null +++ b/src/worker/worker-functions.ts @@ -0,0 +1,45 @@ +/** + * Worker synchronous function that can be executed. + * + * @typeParam Data - Type of data sent to the worker. This can only be serializable data. + * @typeParam Response - Type of execution response. This can only be serializable data. + */ +export type WorkerSyncFunction = ( + data?: Data +) => Response + +/** + * Worker asynchronous function that can be executed. + * This function must return a promise. + * + * @typeParam Data - Type of data sent to the worker. This can only be serializable data. + * @typeParam Response - Type of execution response. This can only be serializable data. + */ +export type WorkerAsyncFunction = ( + data?: Data +) => Promise + +/** + * Worker function that can be executed. + * This function can be synchronous or asynchronous. + * + * @typeParam Data - Type of data sent to the worker. This can only be serializable data. + * @typeParam Response - Type of execution response. This can only be serializable data. + */ +export type WorkerFunction = + | WorkerSyncFunction + | WorkerAsyncFunction + +/** + * Worker functions that can be executed. + * This object can contain synchronous or asynchronous functions. + * The key is the name of the function. + * The value is the function itself. + * + * @typeParam Data - Type of data sent to the worker. This can only be serializable data. + * @typeParam Response - Type of execution response. This can only be serializable data. + */ +export type TaskFunctions = Record< +string, +WorkerFunction +> diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index b88e489b..e85f1127 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -193,9 +193,7 @@ describe('Abstract pool test suite', () => { medWaitTime: false }) } - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() - ).toStrictEqual({ + expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({ runTime: true, avgRunTime: true, medRunTime: false, @@ -212,9 +210,7 @@ describe('Abstract pool test suite', () => { .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: true }) } - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() - ).toStrictEqual({ + expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({ runTime: true, avgRunTime: false, medRunTime: true, @@ -231,9 +227,7 @@ describe('Abstract pool test suite', () => { .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false }) } - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() - ).toStrictEqual({ + expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({ runTime: true, avgRunTime: true, medRunTime: false, diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index e9f5983e..eedee427 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -121,9 +121,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() - ).toStrictEqual({ + expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({ runTime: false, avgRunTime: false, medRunTime: false, @@ -139,9 +137,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() - ).toStrictEqual({ + expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({ runTime: false, avgRunTime: false, medRunTime: false, @@ -304,9 +300,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() - ).toStrictEqual({ + expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({ runTime: false, avgRunTime: false, medRunTime: false, @@ -322,9 +316,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() - ).toStrictEqual({ + expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({ runTime: false, avgRunTime: false, medRunTime: false, @@ -411,9 +403,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() - ).toStrictEqual({ + expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({ runTime: true, avgRunTime: false, medRunTime: false, @@ -429,9 +419,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() - ).toStrictEqual({ + expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({ runTime: true, avgRunTime: false, medRunTime: false, @@ -524,9 +512,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() - ).toStrictEqual({ + expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({ runTime: true, avgRunTime: true, medRunTime: false, @@ -542,9 +528,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() - ).toStrictEqual({ + expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({ runTime: true, avgRunTime: true, medRunTime: false, @@ -765,9 +749,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() - ).toStrictEqual({ + expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({ runTime: true, avgRunTime: true, medRunTime: false, @@ -783,9 +765,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() - ).toStrictEqual({ + expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({ runTime: true, avgRunTime: true, medRunTime: false, @@ -1032,9 +1012,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() - ).toStrictEqual({ + expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({ runTime: false, avgRunTime: false, medRunTime: false, @@ -1050,9 +1028,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.getRequiredStatistics() - ).toStrictEqual({ + expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({ runTime: false, avgRunTime: false, medRunTime: false, diff --git a/tests/pools/selection-strategies/worker-choice-strategy-context.test.js b/tests/pools/selection-strategies/worker-choice-strategy-context.test.js index d40fae4e..5e202039 100644 --- a/tests/pools/selection-strategies/worker-choice-strategy-context.test.js +++ b/tests/pools/selection-strategies/worker-choice-strategy-context.test.js @@ -368,10 +368,10 @@ describe('Worker choice strategy context test suite', () => { medRunTime: true } ) - expect(workerChoiceStrategyContext.getRequiredStatistics().avgRunTime).toBe( + expect(workerChoiceStrategyContext.getTaskStatistics().avgRunTime).toBe( false ) - expect(workerChoiceStrategyContext.getRequiredStatistics().medRunTime).toBe( + expect(workerChoiceStrategyContext.getTaskStatistics().medRunTime).toBe( true ) workerChoiceStrategyContext = new WorkerChoiceStrategyContext( @@ -381,10 +381,10 @@ describe('Worker choice strategy context test suite', () => { medRunTime: true } ) - expect(workerChoiceStrategyContext.getRequiredStatistics().avgRunTime).toBe( + expect(workerChoiceStrategyContext.getTaskStatistics().avgRunTime).toBe( false ) - expect(workerChoiceStrategyContext.getRequiredStatistics().medRunTime).toBe( + expect(workerChoiceStrategyContext.getTaskStatistics().medRunTime).toBe( true ) const fsWorkerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE @@ -395,10 +395,10 @@ describe('Worker choice strategy context test suite', () => { medRunTime: true } ) - expect(workerChoiceStrategyContext.getRequiredStatistics().avgRunTime).toBe( + expect(workerChoiceStrategyContext.getTaskStatistics().avgRunTime).toBe( false ) - expect(workerChoiceStrategyContext.getRequiredStatistics().medRunTime).toBe( + expect(workerChoiceStrategyContext.getTaskStatistics().medRunTime).toBe( true ) workerChoiceStrategyContext = new WorkerChoiceStrategyContext( @@ -408,10 +408,10 @@ describe('Worker choice strategy context test suite', () => { medRunTime: true } ) - expect(workerChoiceStrategyContext.getRequiredStatistics().avgRunTime).toBe( + expect(workerChoiceStrategyContext.getTaskStatistics().avgRunTime).toBe( false ) - expect(workerChoiceStrategyContext.getRequiredStatistics().medRunTime).toBe( + expect(workerChoiceStrategyContext.getTaskStatistics().medRunTime).toBe( true ) }) -- 2.34.1