From: Jérôme Benoit Date: Mon, 29 Apr 2024 09:58:13 +0000 (+0200) Subject: perf: build worker choice strategies policy and task equirements on X-Git-Tag: v4.0.0~1^2~19 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=19b8be8ba6e628292d8c461a7d5702a25e0f76e7;p=poolifier.git perf: build worker choice strategies policy and task equirements on demand Signed-off-by: Jérôme Benoit --- diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index c510b2ad..5e88a7cd 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -547,9 +547,12 @@ export abstract class AbstractPool< workerChoiceStrategy: WorkerChoiceStrategy, workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions ): void { + let requireSync = false checkValidWorkerChoiceStrategy(workerChoiceStrategy) if (workerChoiceStrategyOptions != null) { - this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) + requireSync = this.setWorkerChoiceStrategyOptions( + workerChoiceStrategyOptions + ) } if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) { this.opts.workerChoiceStrategy = workerChoiceStrategy @@ -557,11 +560,14 @@ export abstract class AbstractPool< this.opts.workerChoiceStrategy, this.opts.workerChoiceStrategyOptions ) + requireSync = true + } + if (requireSync) { this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( this.getWorkerWorkerChoiceStrategies(), this.opts.workerChoiceStrategyOptions ) - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.sendStatisticsMessageToWorker(workerNodeKey) } } @@ -570,14 +576,23 @@ export abstract class AbstractPool< /** @inheritDoc */ public setWorkerChoiceStrategyOptions ( workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined - ): void { + ): boolean { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) if (workerChoiceStrategyOptions != null) { this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions + this.workerChoiceStrategiesContext?.setOptions( + this.opts.workerChoiceStrategyOptions + ) + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerWorkerChoiceStrategies(), + this.opts.workerChoiceStrategyOptions + ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } + return true } - this.workerChoiceStrategiesContext?.setOptions( - this.opts.workerChoiceStrategyOptions - ) + return false } /** @inheritDoc */ @@ -639,13 +654,13 @@ export abstract class AbstractPool< } private setTaskStealing (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent) } } private unsetTaskStealing (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].off( 'idle', this.handleWorkerNodeIdleEvent @@ -654,7 +669,7 @@ export abstract class AbstractPool< } private setTasksStealingOnBackPressure (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].on( 'backPressure', this.handleWorkerNodeBackPressureEvent @@ -663,7 +678,7 @@ export abstract class AbstractPool< } private unsetTasksStealingOnBackPressure (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.workerNodes[workerNodeKey].off( 'backPressure', this.handleWorkerNodeBackPressureEvent @@ -806,7 +821,7 @@ export abstract class AbstractPool< } } } - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.registerWorkerMessageListener( workerNodeKey, taskFunctionOperationsListener @@ -2032,7 +2047,7 @@ export abstract class AbstractPool< } private flushTasksQueues (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.flushTasksQueue(workerNodeKey) } } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 82f3d8d9..ccb4b5b9 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -348,10 +348,11 @@ export interface IPool< * Sets the worker choice strategy options in this pool. * * @param workerChoiceStrategyOptions - The worker choice strategy options. + * @returns `true` if the worker choice strategy options were set, `false` otherwise. */ readonly setWorkerChoiceStrategyOptions: ( workerChoiceStrategyOptions: WorkerChoiceStrategyOptions - ) => void + ) => boolean /** * Enables/disables the worker node tasks queue in this pool. * diff --git a/src/pools/selection-strategies/selection-strategies-utils.ts b/src/pools/selection-strategies/selection-strategies-utils.ts index d1e60e03..777be6e4 100644 --- a/src/pools/selection-strategies/selection-strategies-utils.ts +++ b/src/pools/selection-strategies/selection-strategies-utils.ts @@ -10,6 +10,8 @@ import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strate import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy.js' import { type IWorkerChoiceStrategy, + type StrategyPolicy, + type TaskStatisticsRequirements, WorkerChoiceStrategies, type WorkerChoiceStrategy, type WorkerChoiceStrategyOptions @@ -98,6 +100,47 @@ export const buildWorkerChoiceStrategyOptions = < } } +export const buildWorkerChoiceStrategiesPolicy = ( + workerChoiceStrategies: Map +): StrategyPolicy => { + const policies: StrategyPolicy[] = [] + for (const workerChoiceStrategy of workerChoiceStrategies.values()) { + policies.push(workerChoiceStrategy.strategyPolicy) + } + return { + dynamicWorkerUsage: policies.some(p => p.dynamicWorkerUsage), + dynamicWorkerReady: policies.some(p => p.dynamicWorkerReady) + } +} + +export const buildWorkerChoiceStrategiesTaskStatisticsRequirements = ( + workerChoiceStrategies: Map +): TaskStatisticsRequirements => { + const taskStatisticsRequirements: TaskStatisticsRequirements[] = [] + for (const workerChoiceStrategy of workerChoiceStrategies.values()) { + taskStatisticsRequirements.push( + workerChoiceStrategy.taskStatisticsRequirements + ) + } + return { + runTime: { + aggregate: taskStatisticsRequirements.some(r => r.runTime.aggregate), + average: taskStatisticsRequirements.some(r => r.runTime.average), + median: taskStatisticsRequirements.some(r => r.runTime.median) + }, + waitTime: { + aggregate: taskStatisticsRequirements.some(r => r.waitTime.aggregate), + average: taskStatisticsRequirements.some(r => r.waitTime.average), + median: taskStatisticsRequirements.some(r => r.waitTime.median) + }, + elu: { + aggregate: taskStatisticsRequirements.some(r => r.elu.aggregate), + average: taskStatisticsRequirements.some(r => r.elu.average), + median: taskStatisticsRequirements.some(r => r.elu.median) + } + } +} + export const getWorkerChoiceStrategy = ( workerChoiceStrategy: WorkerChoiceStrategy, pool: IPool, diff --git a/src/pools/selection-strategies/worker-choice-strategies-context.ts b/src/pools/selection-strategies/worker-choice-strategies-context.ts index 7e633e16..d088015a 100644 --- a/src/pools/selection-strategies/worker-choice-strategies-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategies-context.ts @@ -9,6 +9,8 @@ import type { } from './selection-strategies-types.js' import { WorkerChoiceStrategies } from './selection-strategies-types.js' import { + buildWorkerChoiceStrategiesPolicy, + buildWorkerChoiceStrategiesTaskStatisticsRequirements, getWorkerChoiceStrategiesRetries, getWorkerChoiceStrategy } from './selection-strategies-utils.js' @@ -43,6 +45,16 @@ export class WorkerChoiceStrategiesContext< IWorkerChoiceStrategy > + /** + * The active worker choice strategies in the context policy. + */ + private workerChoiceStrategiesPolicy: StrategyPolicy + + /** + * The active worker choice strategies in the context task statistics requirements. + */ + private workerChoiceStrategiesTaskStatisticsRequirements: TaskStatisticsRequirements + /** * The maximum number of worker choice strategies execution retries. */ @@ -71,6 +83,13 @@ export class WorkerChoiceStrategiesContext< for (const workerChoiceStrategy of workerChoiceStrategies) { this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts) } + this.workerChoiceStrategiesPolicy = buildWorkerChoiceStrategiesPolicy( + this.workerChoiceStrategies + ) + this.workerChoiceStrategiesTaskStatisticsRequirements = + buildWorkerChoiceStrategiesTaskStatisticsRequirements( + this.workerChoiceStrategies + ) this.retriesCount = 0 this.retries = getWorkerChoiceStrategiesRetries( this.pool, @@ -84,45 +103,16 @@ export class WorkerChoiceStrategiesContext< * @returns The strategies policy. */ public getPolicy (): StrategyPolicy { - const policies: StrategyPolicy[] = [] - for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) { - policies.push(workerChoiceStrategy.strategyPolicy) - } - return { - dynamicWorkerUsage: policies.some(p => p.dynamicWorkerUsage), - dynamicWorkerReady: policies.some(p => p.dynamicWorkerReady) - } + return this.workerChoiceStrategiesPolicy } /** * Gets the active worker choice strategies in the context task statistics requirements. * - * @returns The task statistics requirements. + * @returns The strategies task statistics requirements. */ public getTaskStatisticsRequirements (): TaskStatisticsRequirements { - const taskStatisticsRequirements: TaskStatisticsRequirements[] = [] - for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) { - taskStatisticsRequirements.push( - workerChoiceStrategy.taskStatisticsRequirements - ) - } - return { - runTime: { - aggregate: taskStatisticsRequirements.some(r => r.runTime.aggregate), - average: taskStatisticsRequirements.some(r => r.runTime.average), - median: taskStatisticsRequirements.some(r => r.runTime.median) - }, - waitTime: { - aggregate: taskStatisticsRequirements.some(r => r.waitTime.aggregate), - average: taskStatisticsRequirements.some(r => r.waitTime.average), - median: taskStatisticsRequirements.some(r => r.waitTime.median) - }, - elu: { - aggregate: taskStatisticsRequirements.some(r => r.elu.aggregate), - average: taskStatisticsRequirements.some(r => r.elu.average), - median: taskStatisticsRequirements.some(r => r.elu.median) - } - } + return this.workerChoiceStrategiesTaskStatisticsRequirements } /** @@ -155,7 +145,7 @@ export class WorkerChoiceStrategiesContext< } /** - * Executes the worker choice strategy in the context algorithm. + * Executes the given worker choice strategy in the context algorithm. * * @param workerChoiceStrategy - The worker choice strategy algorithm to execute. @defaultValue this.defaultWorkerChoiceStrategy * @returns The key of the worker node. @@ -243,6 +233,13 @@ export class WorkerChoiceStrategiesContext< this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts) } } + this.workerChoiceStrategiesPolicy = buildWorkerChoiceStrategiesPolicy( + this.workerChoiceStrategies + ) + this.workerChoiceStrategiesTaskStatisticsRequirements = + buildWorkerChoiceStrategiesTaskStatisticsRequirements( + this.workerChoiceStrategies + ) } /**