From 62c15a687b812a5404f64867adbd14320444af99 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 5 Jun 2023 15:41:17 +0200 Subject: [PATCH] feat: add ELU tasks accounting MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Reference: #768 Signed-off-by: Jérôme Benoit --- .eslintrc.js | 2 + rollup.config.mjs | 1 + src/pools/abstract-pool.ts | 27 +++++++++++- .../abstract-worker-choice-strategy.ts | 3 +- .../fair-share-worker-choice-strategy.ts | 3 +- .../least-busy-worker-choice-strategy.ts | 3 +- .../selection-strategies-types.ts | 4 ++ ...hted-round-robin-worker-choice-strategy.ts | 3 +- src/pools/worker.ts | 5 +++ src/utility-types.ts | 5 +++ src/worker/abstract-worker.ts | 41 ++++++++++++++++--- 11 files changed, 85 insertions(+), 12 deletions(-) diff --git a/.eslintrc.js b/.eslintrc.js index d7004cc5..4d8c7ffd 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -51,6 +51,7 @@ module.exports = defineConfig({ 'dequeue', 'dequeued', 'ecma', + 'elu', 'enqueue', 'enum', 'errored', @@ -63,6 +64,7 @@ module.exports = defineConfig({ 'mjs', 'num', 'os', + 'perf', 'piscina', 'pnpm', 'poolifier', diff --git a/rollup.config.mjs b/rollup.config.mjs index 7454ef72..b5c0b285 100644 --- a/rollup.config.mjs +++ b/rollup.config.mjs @@ -49,6 +49,7 @@ export default { 'node:crypto', 'node:events', 'node:os', + 'node:perf_hooks', 'node:worker_threads' ], plugins: [ diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 5088f3d9..065b228a 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,4 +1,5 @@ import crypto from 'node:crypto' +import { performance } from 'node:perf_hooks' import type { MessageValue, PromiseResponseWrapper } from '../utility-types' import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, @@ -299,7 +300,8 @@ export abstract class AbstractPool< waitTimeHistory: new CircularArray(), avgWaitTime: 0, medWaitTime: 0, - error: 0 + error: 0, + elu: undefined }) } this.workerChoiceStrategyContext.setWorkerChoiceStrategy( @@ -474,6 +476,7 @@ export abstract class AbstractPool< } this.updateRunTimeTasksUsage(workerTasksUsage, message) this.updateWaitTimeTasksUsage(workerTasksUsage, message) + this.updateEluTasksUsage(workerTasksUsage, message) } private updateRunTimeTasksUsage ( @@ -522,6 +525,25 @@ export abstract class AbstractPool< } } + private updateEluTasksUsage ( + workerTasksUsage: TasksUsage, + message: MessageValue + ): void { + if (this.workerChoiceStrategyContext.getRequiredStatistics().elu) { + if (workerTasksUsage.elu != null && message.elu != null) { + // TODO: cumulative or delta? + workerTasksUsage.elu = { + idle: workerTasksUsage.elu.idle + message.elu.idle, + active: workerTasksUsage.elu.active + message.elu.active, + utilization: + workerTasksUsage.elu.utilization + message.elu.utilization + } + } else if (message.elu != null) { + workerTasksUsage.elu = message.elu + } + } + } + /** * Chooses a worker node for the next task. * @@ -704,7 +726,8 @@ export abstract class AbstractPool< waitTimeHistory: new CircularArray(), avgWaitTime: 0, medWaitTime: 0, - error: 0 + error: 0, + elu: undefined }, tasksQueue: new Queue>() }) diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index 6e798fbc..9b14f3c5 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -31,7 +31,8 @@ export abstract class AbstractWorkerChoiceStrategy< medRunTime: false, waitTime: false, avgWaitTime: false, - medWaitTime: false + medWaitTime: false, + elu: false } /** diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index 56d9dc36..a0e7ee1a 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -30,7 +30,8 @@ export class FairShareWorkerChoiceStrategy< medRunTime: false, waitTime: false, avgWaitTime: false, - medWaitTime: false + medWaitTime: false, + elu: false } /** diff --git a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts index 6e413825..2dfb6083 100644 --- a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts @@ -29,7 +29,8 @@ export class LeastBusyWorkerChoiceStrategy< medRunTime: false, waitTime: false, avgWaitTime: false, - medWaitTime: false + medWaitTime: false, + elu: false } /** @inheritDoc */ diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index 18ba40b4..c8497c27 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -90,6 +90,10 @@ export interface RequiredStatistics { * Require tasks median wait time. */ medWaitTime: boolean + /** + * Event loop utilization. + */ + elu: boolean } /** diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index 17ae6647..1ea091c6 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -30,7 +30,8 @@ export class WeightedRoundRobinWorkerChoiceStrategy< medRunTime: false, waitTime: false, avgWaitTime: false, - medWaitTime: false + medWaitTime: false, + elu: false } /** diff --git a/src/pools/worker.ts b/src/pools/worker.ts index ec75c819..d10a27c9 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -1,3 +1,4 @@ +import type { EventLoopUtilization } from 'node:perf_hooks' import type { CircularArray } from '../circular-array' import type { Queue } from '../queue' @@ -105,6 +106,10 @@ export interface TasksUsage { * Number of tasks errored. */ error: number + /** + * Event loop utilization. + */ + elu: EventLoopUtilization | undefined } /** diff --git a/src/utility-types.ts b/src/utility-types.ts index 21a44253..cc29801f 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -1,5 +1,6 @@ import type { Worker as ClusterWorker } from 'node:cluster' import type { MessagePort } from 'node:worker_threads' +import type { EventLoopUtilization } from 'node:perf_hooks' import type { KillBehavior } from './worker/worker-options' import type { IWorker, Task } from './pools/worker' @@ -41,6 +42,10 @@ export interface MessageValue< * Wait time. */ readonly waitTime?: number + /** + * Event loop utilization. + */ + readonly elu?: EventLoopUtilization /** * Reference to main worker. */ diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 91a45e94..a484dce1 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -1,6 +1,7 @@ import { AsyncResource } from 'node:async_hooks' import type { Worker } from 'node:cluster' import type { MessagePort } from 'node:worker_threads' +import { type EventLoopUtilization, performance } from 'node:perf_hooks' import type { MessageValue, TaskFunctions, @@ -19,6 +20,13 @@ const DEFAULT_FUNCTION_NAME = 'default' const DEFAULT_MAX_INACTIVE_TIME = 60000 const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT +interface TaskPerformance { + timestamp: number + waitTime: number + runTime?: number + elu: EventLoopUtilization +} + /** * Base class that implements some shared logic for all poolifier workers. * @@ -209,14 +217,14 @@ export abstract class AbstractWorker< message: MessageValue ): void { try { - const startTimestamp = performance.now() - const waitTime = startTimestamp - (message.submissionTimestamp ?? 0) + const taskPerformance = this.beforeTaskRunHook(message) const res = fn(message.data) - const runTime = performance.now() - startTimestamp + const { runTime, waitTime, elu } = this.afterTaskRunHook(taskPerformance) this.sendToMainWorker({ data: res, runTime, waitTime, + elu, id: message.id }) } catch (e) { @@ -241,15 +249,16 @@ export abstract class AbstractWorker< fn: WorkerAsyncFunction, message: MessageValue ): void { - const startTimestamp = performance.now() - const waitTime = startTimestamp - (message.submissionTimestamp ?? 0) + const taskPerformance = this.beforeTaskRunHook(message) fn(message.data) .then(res => { - const runTime = performance.now() - startTimestamp + const { runTime, waitTime, elu } = + this.afterTaskRunHook(taskPerformance) this.sendToMainWorker({ data: res, runTime, waitTime, + elu, id: message.id }) return null @@ -281,4 +290,24 @@ export abstract class AbstractWorker< } return fn } + + private beforeTaskRunHook (message: MessageValue): TaskPerformance { + // TODO: conditional accounting + const timestamp = performance.now() + return { + timestamp, + waitTime: timestamp - (message.submissionTimestamp ?? 0), + elu: performance.eventLoopUtilization() + } + } + + private afterTaskRunHook (taskPerformance: TaskPerformance): TaskPerformance { + return { + ...taskPerformance, + ...{ + runTime: performance.now() - taskPerformance.timestamp, + elu: performance.eventLoopUtilization(taskPerformance.elu) + } + } + } } -- 2.34.1