From 0567595a23237d7b0e4bc0ec70c8e313eb71bb10 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 29 May 2023 22:37:59 +0200 Subject: [PATCH] feat: add tasks wait time account per worker MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- README.md | 2 +- benchmarks/versus-external-pools/package.json | 2 +- package.json | 2 +- src/pools/abstract-pool.ts | 38 +++++++++++++++++-- src/pools/pool.ts | 4 +- .../abstract-worker-choice-strategy.ts | 13 ++++++- .../fair-share-worker-choice-strategy.ts | 5 ++- .../less-busy-worker-choice-strategy.ts | 5 ++- .../selection-strategies-types.ts | 18 +++++++++ ...hted-round-robin-worker-choice-strategy.ts | 5 ++- src/pools/worker.ts | 20 ++++++++++ 11 files changed, 102 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 18b0d78a..1b7e641b 100644 --- a/README.md +++ b/README.md @@ -145,7 +145,7 @@ Remember that workers can only send and receive serializable data. ## Node versions -Node versions >= 16.x are supported. +Node versions >= 16.14.x are supported. ## [API](https://poolifier.github.io/poolifier/) diff --git a/benchmarks/versus-external-pools/package.json b/benchmarks/versus-external-pools/package.json index 9d1ecc6b..b466e3c6 100644 --- a/benchmarks/versus-external-pools/package.json +++ b/benchmarks/versus-external-pools/package.json @@ -6,7 +6,7 @@ "main": "index.js", "author": "pioardi", "engines": { - "node": ">=14.14.0", + "node": ">=16.14.0", "pnpm": ">=8.6.0" }, "volta": { diff --git a/package.json b/package.json index 16edd5ca..6a189663 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,7 @@ ] }, "engines": { - "node": ">=16.0.0", + "node": ">=16.14.0", "pnpm": ">=8.6.0" }, "volta": { diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 4c82ef08..e3967446 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -263,6 +263,10 @@ export abstract class AbstractPool< runTimeHistory: new CircularArray(), avgRunTime: 0, medRunTime: 0, + waitTime: 0, + waitTimeHistory: new CircularArray(), + avgWaitTime: 0, + medWaitTime: 0, error: 0 }) } @@ -340,11 +344,13 @@ export abstract class AbstractPool< /** @inheritDoc */ public async execute (data?: Data, name?: string): Promise { + const submissionTimestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() const submittedTask: Task = { name, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), + submissionTimestamp, id: crypto.randomUUID() } const res = new Promise((resolve, reject) => { @@ -409,8 +415,22 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. */ - protected beforeTaskExecutionHook (workerNodeKey: number): void { - ++this.workerNodes[workerNodeKey].tasksUsage.running + protected beforeTaskExecutionHook ( + workerNodeKey: number, + task: Task + ): void { + const workerTasksUsage = this.workerNodes[workerNodeKey].tasksUsage + ++workerTasksUsage.running + if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) { + const waitTime = performance.now() - (task.submissionTimestamp ?? 0) + workerTasksUsage.waitTime += waitTime + if ( + this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime + ) { + workerTasksUsage.waitTimeHistory.push(waitTime) + workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory) + } + } } /** @@ -448,6 +468,14 @@ export abstract class AbstractPool< workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory) } } + if ( + this.workerChoiceStrategyContext.getRequiredStatistics().waitTime && + this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime && + workerTasksUsage.run !== 0 + ) { + workerTasksUsage.avgWaitTime = + workerTasksUsage.waitTime / workerTasksUsage.run + } } /** @@ -611,6 +639,10 @@ export abstract class AbstractPool< runTimeHistory: new CircularArray(), avgRunTime: 0, medRunTime: 0, + waitTime: 0, + waitTimeHistory: new CircularArray(), + avgWaitTime: 0, + medWaitTime: 0, error: 0 }, tasksQueue: new Queue>() @@ -650,7 +682,7 @@ export abstract class AbstractPool< } private executeTask (workerNodeKey: number, task: Task): void { - this.beforeTaskExecutionHook(workerNodeKey) + this.beforeTaskExecutionHook(workerNodeKey, task) this.sendToWorker(this.workerNodes[workerNodeKey].worker, task) } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 0a015476..14b30812 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -1,4 +1,4 @@ -import EventEmitter from 'node:events' +import EventEmitterAsyncResource from 'node:events' import type { ErrorHandler, ExitHandler, @@ -32,7 +32,7 @@ export enum PoolType { /** * Pool events emitter. */ -export class PoolEmitter extends EventEmitter {} +export class PoolEmitter extends EventEmitterAsyncResource {} /** * Enumeration of pool events. diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index a4455d28..ad42f918 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -27,7 +27,10 @@ export abstract class AbstractWorkerChoiceStrategy< public readonly requiredStatistics: RequiredStatistics = { runTime: false, avgRunTime: false, - medRunTime: false + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false } /** @@ -52,6 +55,14 @@ export abstract class AbstractWorkerChoiceStrategy< this.requiredStatistics.avgRunTime = true this.requiredStatistics.medRunTime = opts.medRunTime as boolean } + if (this.requiredStatistics.avgWaitTime && opts.medWaitTime === true) { + this.requiredStatistics.avgWaitTime = false + this.requiredStatistics.medWaitTime = opts.medWaitTime as boolean + } + if (this.requiredStatistics.medWaitTime && opts.medWaitTime === false) { + this.requiredStatistics.avgWaitTime = true + this.requiredStatistics.medWaitTime = opts.medWaitTime as boolean + } } /** @inheritDoc */ diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index e26f6b8f..56d9dc36 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -27,7 +27,10 @@ export class FairShareWorkerChoiceStrategy< public readonly requiredStatistics: RequiredStatistics = { runTime: true, avgRunTime: true, - medRunTime: false + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false } /** diff --git a/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts index f518f9b1..02f51c6b 100644 --- a/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts @@ -26,7 +26,10 @@ export class LessBusyWorkerChoiceStrategy< public readonly requiredStatistics: RequiredStatistics = { runTime: true, avgRunTime: false, - medRunTime: false + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false } /** @inheritDoc */ diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index b95cf7f3..94e2d3e6 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -39,6 +39,12 @@ export interface WorkerChoiceStrategyOptions { * @defaultValue false */ medRunTime?: boolean + /** + * Use tasks median wait time instead of average run time. + * + * @defaultValue false + */ + medWaitTime?: boolean /** * Worker weights to use for weighted round robin worker selection strategy. * Weight is the tasks maximum average or median runtime in milliseconds. @@ -66,6 +72,18 @@ export interface RequiredStatistics { * Require tasks median run time. */ medRunTime: boolean + /** + * Require tasks wait time. + */ + waitTime: boolean + /** + * Require tasks average wait time. + */ + avgWaitTime: boolean + /** + * Require tasks median wait time. + */ + medWaitTime: boolean } /** diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index a0553fcb..df45feb3 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -28,7 +28,10 @@ export class WeightedRoundRobinWorkerChoiceStrategy< public readonly requiredStatistics: RequiredStatistics = { runTime: true, avgRunTime: true, - medRunTime: false + medRunTime: false, + waitTime: false, + avgWaitTime: false, + medWaitTime: false } /** diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 2db5f630..35d2c0ab 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -45,6 +45,10 @@ export interface Task { * Task input data that will be passed to the worker. */ readonly data?: Data + /** + * Submission timestamp. + */ + readonly submissionTimestamp?: number /** * Message UUID. */ @@ -81,6 +85,22 @@ export interface TasksUsage { * Median tasks runtime. */ medRunTime: number + /** + * Tasks wait time. + */ + waitTime: number + /** + * Tasks wait time history. + */ + waitTimeHistory: CircularArray + /** + * Average tasks wait time. + */ + avgWaitTime: number + /** + * Median tasks wait time. + */ + medWaitTime: number /** * Number of tasks errored. */ -- 2.34.1