X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fworker%2Fabstract-worker.ts;h=f40acf3786540eb350576612c068c4e851968449;hb=b6b3245344bd453ea91fa3d74acd5145f70d84fd;hp=a484dce12927c1cd29721e9529aa08a822c37b43;hpb=8d20e449d72975f6add9177d1097d5a204d14f71;p=poolifier.git diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index a484dce1..f40acf37 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -2,29 +2,32 @@ import { AsyncResource } from 'node:async_hooks' import type { Worker } from 'node:cluster' import type { MessagePort } from 'node:worker_threads' import { type EventLoopUtilization, performance } from 'node:perf_hooks' -import type { - MessageValue, - TaskFunctions, - WorkerAsyncFunction, - WorkerFunction, - WorkerSyncFunction -} from '../utility-types' +import type { MessageValue, WorkerStatistics } from '../utility-types' import { EMPTY_FUNCTION, isPlainObject } from '../utils' import { type KillBehavior, KillBehaviors, type WorkerOptions } from './worker-options' +import type { + TaskFunctions, + WorkerAsyncFunction, + WorkerFunction, + WorkerSyncFunction +} from './worker-functions' const DEFAULT_FUNCTION_NAME = 'default' const DEFAULT_MAX_INACTIVE_TIME = 60000 const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT +/** + * Task performance. + */ interface TaskPerformance { timestamp: number - waitTime: number + waitTime?: number runTime?: number - elu: EventLoopUtilization + elu?: EventLoopUtilization } /** @@ -47,6 +50,10 @@ export abstract class AbstractWorker< * Timestamp of the last task processed by this worker. */ protected lastTaskTimestamp!: number + /** + * Performance statistics computation. + */ + protected statistics!: WorkerStatistics /** * Handler id of the `aliveInterval` worker alive check. */ @@ -90,7 +97,6 @@ export abstract class AbstractWorker< ) this.checkAlive.bind(this)() } - this.mainWorker?.on('message', this.messageListener.bind(this)) } @@ -162,6 +168,9 @@ export abstract class AbstractWorker< // Kill message received this.aliveInterval != null && clearInterval(this.aliveInterval) this.emitDestroy() + } else if (message.statistics != null) { + // Statistics message received + this.statistics = message.statistics } } @@ -292,22 +301,25 @@ export abstract class AbstractWorker< } private beforeTaskRunHook (message: MessageValue): TaskPerformance { - // TODO: conditional accounting const timestamp = performance.now() return { timestamp, - waitTime: timestamp - (message.submissionTimestamp ?? 0), - elu: performance.eventLoopUtilization() + ...(this.statistics.waitTime && { + waitTime: timestamp - (message.timestamp ?? timestamp) + }), + ...(this.statistics.elu && { elu: performance.eventLoopUtilization() }) } } private afterTaskRunHook (taskPerformance: TaskPerformance): TaskPerformance { return { ...taskPerformance, - ...{ - runTime: performance.now() - taskPerformance.timestamp, + ...(this.statistics.runTime && { + runTime: performance.now() - taskPerformance.timestamp + }), + ...(this.statistics.elu && { elu: performance.eventLoopUtilization(taskPerformance.elu) - } + }) } } }