From: Jérôme Benoit Date: Fri, 18 Aug 2023 14:26:04 +0000 (+0200) Subject: feat: add worker choice strategies retry mechanism X-Git-Tag: v2.6.29~16 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=8990357d855c45cd0063f24092bb58b4163ddb0a;p=poolifier.git feat: add worker choice strategies retry mechanism Signed-off-by: Jérôme Benoit --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 29ab1b08..05f5c7a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fix race condition between ready and task functions worker message handling at startup. - Fix duplicate task usage statistics computation per task function. +### Added + +- Add back pressure detection on the worker node queue. Event `backPressure` is emitted when the worker node queue is full (size > poolMaxSize^2). +- Use back pressure detection in worker choice strategies. +- Add worker choice strategies retries mechanism if no worker is eligible. + ## [2.6.28] - 2023-08-16 ### Fixed diff --git a/docs/api.md b/docs/api.md index 3a62d763..7b3766e7 100644 --- a/docs/api.md +++ b/docs/api.md @@ -74,13 +74,14 @@ An object with these properties: - `workerChoiceStrategyOptions` (optional) - The worker choice strategy options object to use in this pool. Properties: + - `choiceRetries` (optional) - The number of retries to perform if no worker is eligible. - `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`. - `runTime` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) runtime instead of the tasks average runtime in worker choice strategies. - `waitTime` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) wait time instead of the tasks average wait time in worker choice strategies. - `elu` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) ELU instead of the tasks average ELU in worker choice strategies. - `weights` (optional) - The worker weights to use in weighted round robin worker choice strategies: `{ 0: 200, 1: 300, ..., n: 100 }`. - Default: `{ runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }` + Default: `{ choiceRetries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }` - `restartWorkerOnError` (optional) - Restart worker on uncaught error in this pool. Default: `true` diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 768114e4..64fda254 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -204,9 +204,10 @@ export abstract class AbstractPool< this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) - this.opts.workerChoiceStrategyOptions = - opts.workerChoiceStrategyOptions ?? - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + this.opts.workerChoiceStrategyOptions = { + ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, + ...opts.workerChoiceStrategyOptions + } this.checkValidWorkerChoiceStrategyOptions( this.opts.workerChoiceStrategyOptions ) @@ -244,6 +245,22 @@ export abstract class AbstractPool< 'Invalid worker choice strategy options: must be a plain object' ) } + if ( + workerChoiceStrategyOptions.choiceRetries != null && + !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries) + ) { + throw new TypeError( + 'Invalid worker choice strategy options: choice retries must be an integer' + ) + } + if ( + workerChoiceStrategyOptions.choiceRetries != null && + workerChoiceStrategyOptions.choiceRetries <= 0 + ) { + throw new RangeError( + `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero` + ) + } if ( workerChoiceStrategyOptions.weights != null && Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize @@ -566,7 +583,10 @@ export abstract class AbstractPool< workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) - this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions + this.opts.workerChoiceStrategyOptions = { + ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, + ...workerChoiceStrategyOptions + } this.workerChoiceStrategyContext.setOptions( this.opts.workerChoiceStrategyOptions ) diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index d4fcd8d0..e6269422 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -52,6 +52,7 @@ export abstract class AbstractWorkerChoiceStrategy< protected readonly pool: IPool, protected opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS ) { + this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts } this.choose = this.choose.bind(this) } @@ -100,7 +101,7 @@ export abstract class AbstractWorkerChoiceStrategy< /** @inheritDoc */ public setOptions (opts: WorkerChoiceStrategyOptions): void { - this.opts = opts ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts } this.setTaskStatisticsRequirements(this.opts) } @@ -110,7 +111,7 @@ export abstract class AbstractWorkerChoiceStrategy< * @param workerNodeKey - The worker node key. * @returns Whether the worker node is ready or not. */ - protected isWorkerNodeReady (workerNodeKey: number): boolean { + private isWorkerNodeReady (workerNodeKey: number): boolean { return this.pool.workerNodes[workerNodeKey].info.ready } @@ -120,10 +121,26 @@ export abstract class AbstractWorkerChoiceStrategy< * @param workerNodeKey - The worker node key. * @returns `true` if the worker node has back pressure, `false` otherwise. */ - protected hasWorkerNodeBackPressure (workerNodeKey: number): boolean { + private hasWorkerNodeBackPressure (workerNodeKey: number): boolean { return this.pool.hasWorkerNodeBackPressure(workerNodeKey) } + /** + * Whether the worker node is eligible or not. + * A worker node is eligible if it is ready and does not have back pressure. + * + * @param workerNodeKey - The worker node key. + * @returns `true` if the worker node is eligible, `false` otherwise. + * @see {@link isWorkerNodeReady} + * @see {@link hasWorkerNodeBackPressure} + */ + protected isWorkerNodeEligible (workerNodeKey: number): boolean { + return ( + this.isWorkerNodeReady(workerNodeKey) && + !this.hasWorkerNodeBackPressure(workerNodeKey) + ) + } + /** * Gets the worker task runtime. * If the task statistics require the average runtime, the average runtime is returned. diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index bf30a476..36d1cd4c 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -88,7 +88,7 @@ export class FairShareWorkerChoiceStrategy< const workerVirtualTaskEndTimestamp = this.workersVirtualTaskEndTimestamp[workerNodeKey] if ( - this.isWorkerNodeReady(workerNodeKey) && + this.isWorkerNodeEligible(workerNodeKey) && workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp ) { minWorkerVirtualTaskEndTimestamp = workerVirtualTaskEndTimestamp diff --git a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts index 26f9d909..c45d6e0b 100644 --- a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts @@ -81,7 +81,7 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< const workerWeight = this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight if ( - this.isWorkerNodeReady(workerNodeKey) && + this.isWorkerNodeEligible(workerNodeKey) && workerWeight >= this.roundWeights[roundIndex] ) { roundId = roundIndex diff --git a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts index 9a1550e0..b3e9e6ba 100644 --- a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts @@ -75,11 +75,11 @@ export class LeastBusyWorkerChoiceStrategy< const workerTime = (workerNode.usage.runTime?.aggregate ?? 0) + (workerNode.usage.waitTime?.aggregate ?? 0) - if (this.isWorkerNodeReady(workerNodeKey) && workerTime === 0) { + if (this.isWorkerNodeEligible(workerNodeKey) && workerTime === 0) { this.nextWorkerNodeKey = workerNodeKey break } else if ( - this.isWorkerNodeReady(workerNodeKey) && + this.isWorkerNodeEligible(workerNodeKey) && workerTime < minTime ) { minTime = workerTime diff --git a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts index 7c837cef..03e6ca04 100644 --- a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts @@ -70,11 +70,11 @@ export class LeastEluWorkerChoiceStrategy< for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) { const workerUsage = workerNode.usage const workerElu = workerUsage.elu?.active?.aggregate ?? 0 - if (this.isWorkerNodeReady(workerNodeKey) && workerElu === 0) { + if (this.isWorkerNodeEligible(workerNodeKey) && workerElu === 0) { this.nextWorkerNodeKey = workerNodeKey break } else if ( - this.isWorkerNodeReady(workerNodeKey) && + this.isWorkerNodeEligible(workerNodeKey) && workerElu < minWorkerElu ) { minWorkerElu = workerElu diff --git a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts index e8a7218e..e72efda2 100644 --- a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts @@ -58,11 +58,11 @@ export class LeastUsedWorkerChoiceStrategy< workerTaskStatistics.executed + workerTaskStatistics.executing + workerTaskStatistics.queued - if (this.isWorkerNodeReady(workerNodeKey) && workerTasks === 0) { + if (this.isWorkerNodeEligible(workerNodeKey) && workerTasks === 0) { this.nextWorkerNodeKey = workerNodeKey break } else if ( - this.isWorkerNodeReady(workerNodeKey) && + this.isWorkerNodeEligible(workerNodeKey) && workerTasks < minNumberOfTasks ) { minNumberOfTasks = workerTasks diff --git a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts index b023234a..55fa8ad7 100644 --- a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts @@ -52,7 +52,7 @@ export class RoundRobinWorkerChoiceStrategy< const chosenWorkerNodeKey = this.nextWorkerNodeKey do { this.roundRobinNextWorkerNodeKey() - } while (!this.isWorkerNodeReady(this.nextWorkerNodeKey)) + } while (!this.isWorkerNodeEligible(this.nextWorkerNodeKey)) return chosenWorkerNodeKey } diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index 563b04b0..58f36328 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -70,7 +70,13 @@ export interface MeasurementOptions { */ export interface WorkerChoiceStrategyOptions { /** - * Measurement to use for worker choice strategy. + * Number of worker choice retries to perform if no worker is eligible. + * + * @defaultValue 6 + */ + readonly choiceRetries?: number + /** + * Measurement to use in worker choice strategy supporting it. */ readonly measurement?: Measurement /** diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index fecba414..b5c566d5 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -79,7 +79,7 @@ export class WeightedRoundRobinWorkerChoiceStrategy< const chosenWorkerNodeKey = this.nextWorkerNodeKey do { this.weightedRoundRobinNextWorkerNodeKey() - } while (!this.isWorkerNodeReady(this.nextWorkerNodeKey)) + } while (!this.isWorkerNodeEligible(this.nextWorkerNodeKey)) return chosenWorkerNodeKey } diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index 122774dd..8c5a9587 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -34,6 +34,11 @@ export class WorkerChoiceStrategyContext< IWorkerChoiceStrategy > + /** + * The number of times the worker choice strategy in the context has been retried. + */ + private choiceRetriesCount = 0 + /** * Worker choice strategy context constructor. * @@ -44,8 +49,9 @@ export class WorkerChoiceStrategyContext< public constructor ( pool: IPool, private workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN, - opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + private opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS ) { + this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts } this.execute = this.execute.bind(this) this.workerChoiceStrategies = new Map< WorkerChoiceStrategy, @@ -119,7 +125,7 @@ export class WorkerChoiceStrategyContext< } /** - * Gets the worker choice strategy task statistics requirements in the context. + * Gets the worker choice strategy in the context task statistics requirements. * * @returns The task statistics requirements. */ @@ -146,7 +152,7 @@ export class WorkerChoiceStrategyContext< } /** - * Updates the worker node key in the worker choice strategy internals in the context. + * Updates the worker node key in the worker choice strategy in the context internals. * * @returns `true` if the update is successful, `false` otherwise. */ @@ -159,7 +165,7 @@ export class WorkerChoiceStrategyContext< } /** - * Executes the worker choice strategy algorithm in the context. + * Executes the worker choice strategy in the context algorithm. * * @returns The key of the worker node. * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker node key is null or undefined. @@ -170,7 +176,13 @@ export class WorkerChoiceStrategyContext< this.workerChoiceStrategy ) as IWorkerChoiceStrategy ).choose() - if (workerNodeKey == null) { + if ( + workerNodeKey == null && + this.choiceRetriesCount < (this.opts.choiceRetries as number) + ) { + this.choiceRetriesCount++ + return this.execute() + } else if (workerNodeKey == null) { throw new TypeError('Worker node key chosen is null or undefined') } return workerNodeKey @@ -196,6 +208,7 @@ export class WorkerChoiceStrategyContext< * @param opts - The worker choice strategy options. */ public setOptions (opts: WorkerChoiceStrategyOptions): void { + this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts } for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) { workerChoiceStrategy.setOptions(opts) } diff --git a/src/utils.ts b/src/utils.ts index 45e4fd23..c92edc91 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -23,6 +23,7 @@ export const EMPTY_FUNCTION: () => void = Object.freeze(() => { */ export const DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS: WorkerChoiceStrategyOptions = { + choiceRetries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -57,6 +58,22 @@ export const availableParallelism = (): number => { return availableParallelism } +/** + * Computes the retry delay in milliseconds using an exponential back off algorithm. + * + * @param retryNumber - The number of retries that have already been attempted + * @param maxDelayRatio - The maximum ratio of the delay that can be randomized + * @returns Delay in milliseconds + */ +export const exponentialDelay = ( + retryNumber = 0, + maxDelayRatio = 0.2 +): number => { + const delay = Math.pow(2, retryNumber) * 100 + const randomSum = delay * maxDelayRatio * Math.random() // 0-(maxDelayRatio*100)% of the delay + return delay + randomSum +} + /** * Computes the median of the given data set. * diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 5c367aa2..a395782c 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -167,6 +167,13 @@ describe('Abstract pool test suite', () => { WorkerChoiceStrategies.ROUND_ROBIN ) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ + choiceRetries: 6, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ + choiceRetries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -205,7 +212,17 @@ describe('Abstract pool test suite', () => { WorkerChoiceStrategies.LEAST_USED ) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ + choiceRetries: 6, runTime: { median: true }, + waitTime: { median: false }, + elu: { median: false }, + weights: { 0: 300, 1: 200 } + }) + expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ + choiceRetries: 6, + runTime: { median: true }, + waitTime: { median: false }, + elu: { median: false }, weights: { 0: 300, 1: 200 } }) expect(pool.opts.messageHandler).toStrictEqual(testHandler) @@ -226,18 +243,6 @@ describe('Abstract pool test suite', () => { } ) ).toThrowError("Invalid worker choice strategy 'invalidStrategy'") - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.js', - { - workerChoiceStrategyOptions: 'invalidOptions' - } - ) - ).toThrowError( - 'Invalid worker choice strategy options: must be a plain object' - ) expect( () => new FixedThreadPool( @@ -304,6 +309,13 @@ describe('Abstract pool test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ + choiceRetries: 6, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ + choiceRetries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -311,6 +323,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ + choiceRetries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -340,13 +353,23 @@ describe('Abstract pool test suite', () => { elu: { median: true } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ + choiceRetries: 6, + runTime: { median: true }, + waitTime: { median: false }, + elu: { median: true } + }) + expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ + choiceRetries: 6, runTime: { median: true }, + waitTime: { median: false }, elu: { median: true } }) for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ + choiceRetries: 6, runTime: { median: true }, + waitTime: { median: false }, elu: { median: true } }) } @@ -374,13 +397,23 @@ describe('Abstract pool test suite', () => { elu: { median: false } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ + choiceRetries: 6, runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ + choiceRetries: 6, + runTime: { median: false }, + waitTime: { median: false }, elu: { median: false } }) for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ + choiceRetries: 6, runTime: { median: false }, + waitTime: { median: false }, elu: { median: false } }) }