From 26ce26ca8861318068427cc86697103e7a3ddbf4 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Tue, 19 Dec 2023 22:22:17 +0100 Subject: [PATCH] fix: ensure worker choice is retried at least the pool max size MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- docs/api.md | 3 +- src/pools/abstract-pool.ts | 119 ++++++++---------- src/pools/cluster/dynamic.ts | 9 +- src/pools/cluster/fixed.ts | 5 +- .../abstract-worker-choice-strategy.ts | 19 +-- .../fair-share-worker-choice-strategy.ts | 11 +- ...hted-round-robin-worker-choice-strategy.ts | 13 +- .../least-busy-worker-choice-strategy.ts | 11 +- .../least-elu-worker-choice-strategy.ts | 11 +- .../least-used-worker-choice-strategy.ts | 6 +- .../round-robin-worker-choice-strategy.ts | 6 +- .../selection-strategies-types.ts | 21 +++- ...hted-round-robin-worker-choice-strategy.ts | 11 +- .../worker-choice-strategy-context.ts | 41 +++--- src/pools/thread/dynamic.ts | 9 +- src/pools/thread/fixed.ts | 5 +- src/utils.ts | 16 ++- tests/pools/abstract-pool.test.mjs | 104 +++++---------- .../selection-strategies.test.mjs | 20 +-- .../worker-choice-strategy-context.test.mjs | 8 +- tests/utils.test.mjs | 11 +- 21 files changed, 203 insertions(+), 256 deletions(-) diff --git a/docs/api.md b/docs/api.md index 65d96e57..bd5ace7a 100644 --- a/docs/api.md +++ b/docs/api.md @@ -111,14 +111,13 @@ An object with these properties: - `workerChoiceStrategyOptions` (optional) - The worker choice strategy options object to use in this pool. Properties: - - `retries` (optional) - The number of retries to perform if no worker is eligible. - `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`. - `runTime` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md#simple-moving-median) runtime instead of the tasks simple moving average runtime in worker choice strategies. - `waitTime` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md#simple-moving-median) wait time instead of the tasks simple moving average wait time in worker choice strategies. - `elu` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md#simple-moving-median) ELU instead of the tasks simple moving average ELU in worker choice strategies. - `weights` (optional) - The worker weights to use in weighted round robin worker choice strategies: `{ 0: 200, 1: 300, ..., n: 100 }`. - Default: `{ retries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }` + Default: `{ runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }` - `startWorkers` (optional) - Start the minimum number of workers at pool initialization. Default: `true` diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 11cc9cc1..ebc7b5d9 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -10,7 +10,6 @@ import type { } from '../utility-types' import { DEFAULT_TASK_NAME, - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, average, exponentialDelay, @@ -81,11 +80,6 @@ export abstract class AbstractPool< /** @inheritDoc */ public emitter?: EventEmitterAsyncResource - /** - * Dynamic pool maximum size property placeholder. - */ - protected readonly max?: number - /** * The task execution response promise map: * - `key`: The message id of each submitted task. @@ -136,22 +130,25 @@ export abstract class AbstractPool< /** * Constructs a new poolifier pool. * - * @param numberOfWorkers - Number of workers that this pool should manage. + * @param minimumNumberOfWorkers - Minimum number of workers that this pool should manage. + * @param maximumNumberOfWorkers - Maximum number of workers that this pool should manage. * @param filePath - Path to the worker file. * @param opts - Options for the pool. */ public constructor ( - protected readonly numberOfWorkers: number, + protected readonly minimumNumberOfWorkers: number, protected readonly filePath: string, - protected readonly opts: PoolOptions + protected readonly opts: PoolOptions, + protected readonly maximumNumberOfWorkers?: number ) { if (!this.isMain()) { throw new Error( 'Cannot start a pool from a worker with the same type as the pool' ) } + this.checkPoolType() checkFilePath(this.filePath) - this.checkNumberOfWorkers(this.numberOfWorkers) + this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers) this.checkPoolOptions(this.opts) this.chooseWorkerNode = this.chooseWorkerNode.bind(this) @@ -186,7 +183,15 @@ export abstract class AbstractPool< this.startTimestamp = performance.now() } - private checkNumberOfWorkers (numberOfWorkers: number): void { + private checkPoolType (): void { + if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) { + throw new Error( + 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization' + ) + } + } + + private checkMinimumNumberOfWorkers (numberOfWorkers: number): void { if (numberOfWorkers == null) { throw new Error( 'Cannot instantiate a pool without specifying the number of workers' @@ -215,9 +220,8 @@ export abstract class AbstractPool< this.checkValidWorkerChoiceStrategyOptions( opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions ) - this.opts.workerChoiceStrategyOptions = { - ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, - ...opts.workerChoiceStrategyOptions + if (opts.workerChoiceStrategyOptions != null) { + this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions } this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true this.opts.enableEvents = opts.enableEvents ?? true @@ -244,25 +248,10 @@ export abstract class AbstractPool< 'Invalid worker choice strategy options: must be a plain object' ) } - if ( - workerChoiceStrategyOptions?.retries != null && - !Number.isSafeInteger(workerChoiceStrategyOptions.retries) - ) { - throw new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - } - if ( - workerChoiceStrategyOptions?.retries != null && - workerChoiceStrategyOptions.retries < 0 - ) { - throw new RangeError( - `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero` - ) - } if ( workerChoiceStrategyOptions?.weights != null && - Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize + Object.keys(workerChoiceStrategyOptions.weights).length !== + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) ) { throw new Error( 'Invalid worker choice strategy options: must have a weight for each worker node' @@ -295,11 +284,11 @@ export abstract class AbstractPool< started: this.started, ready: this.ready, strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy, - minSize: this.minSize, - maxSize: this.maxSize, - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + minSize: this.minimumNumberOfWorkers, + maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers, + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .runTime.aggregate && - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .waitTime.aggregate && { utilization: round(this.utilization) }), workerNodes: this.workerNodes.length, idleWorkerNodes: this.workerNodes.reduce( @@ -353,7 +342,7 @@ export abstract class AbstractPool< accumulator + workerNode.usage.tasks.failed, 0 ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .runTime.aggregate && { runTime: { minimum: round( @@ -370,7 +359,7 @@ export abstract class AbstractPool< ) ) ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .runTime.average && { average: round( average( @@ -382,7 +371,7 @@ export abstract class AbstractPool< ) ) }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .runTime.median && { median: round( median( @@ -396,7 +385,7 @@ export abstract class AbstractPool< }) } }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .waitTime.aggregate && { waitTime: { minimum: round( @@ -413,7 +402,7 @@ export abstract class AbstractPool< ) ) ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .waitTime.average && { average: round( average( @@ -425,7 +414,7 @@ export abstract class AbstractPool< ) ) }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .waitTime.median && { median: round( median( @@ -453,7 +442,7 @@ export abstract class AbstractPool< ? accumulator + 1 : accumulator, 0 - ) >= this.minSize + ) >= this.minimumNumberOfWorkers ) } @@ -464,7 +453,8 @@ export abstract class AbstractPool< */ private get utilization (): number { const poolTimeCapacity = - (performance.now() - this.startTimestamp) * this.maxSize + (performance.now() - this.startTimestamp) * + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) const totalTasksRunTime = this.workerNodes.reduce( (accumulator, workerNode) => accumulator + (workerNode.usage.runTime?.aggregate ?? 0), @@ -490,20 +480,6 @@ export abstract class AbstractPool< */ protected abstract get worker (): WorkerType - /** - * The pool minimum size. - */ - protected get minSize (): number { - return this.numberOfWorkers - } - - /** - * The pool maximum size. - */ - protected get maxSize (): number { - return this.max ?? this.numberOfWorkers - } - /** * Checks if the worker id sent in the received message from a worker is valid. * @@ -556,11 +532,11 @@ export abstract class AbstractPool< workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) - this.opts.workerChoiceStrategyOptions = { - ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, - ...workerChoiceStrategyOptions + if (workerChoiceStrategyOptions != null) { + this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions } this.workerChoiceStrategyContext.setOptions( + this, this.opts.workerChoiceStrategyOptions ) } @@ -607,7 +583,9 @@ export abstract class AbstractPool< tasksQueueOptions: TasksQueueOptions ): TasksQueueOptions { return { - ...getDefaultTasksQueueOptions(this.maxSize), + ...getDefaultTasksQueueOptions( + this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers + ), ...tasksQueueOptions } } @@ -660,7 +638,10 @@ export abstract class AbstractPool< * The pool filling boolean status. */ protected get full (): boolean { - return this.workerNodes.length >= this.maxSize + return ( + this.workerNodes.length >= + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) + ) } /** @@ -968,7 +949,7 @@ export abstract class AbstractPool< (accumulator, workerNode) => !workerNode.info.dynamic ? accumulator + 1 : accumulator, 0 - ) < this.numberOfWorkers + ) < this.minimumNumberOfWorkers ) { this.createAndSetupWorkerNode() } @@ -1001,9 +982,7 @@ export abstract class AbstractPool< this.started = false } - protected async sendKillMessageToWorker ( - workerNodeKey: number - ): Promise { + private async sendKillMessageToWorker (workerNodeKey: number): Promise { await new Promise((resolve, reject) => { if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) { reject(new Error(`Invalid worker node key '${workerNodeKey}'`)) @@ -1043,7 +1022,9 @@ export abstract class AbstractPool< 'taskFinished', flushedTasks, this.opts.tasksQueueOptions?.tasksFinishedTimeout ?? - getDefaultTasksQueueOptions(this.maxSize).tasksFinishedTimeout + getDefaultTasksQueueOptions( + this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers + ).tasksFinishedTimeout ) await this.sendKillMessageToWorker(workerNodeKey) await workerNode.terminate() @@ -1780,7 +1761,9 @@ export abstract class AbstractPool< workerOptions: this.opts.workerOptions, tasksQueueBackPressureSize: this.opts.tasksQueueOptions?.size ?? - getDefaultTasksQueueOptions(this.maxSize).size + getDefaultTasksQueueOptions( + this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers + ).size } ) // Flag the worker node as ready at pool startup. diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index 32aad22a..9f2f0dd9 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -28,12 +28,15 @@ export class DynamicClusterPool< */ public constructor ( min: number, - protected readonly max: number, + max: number, filePath: string, opts: PoolOptions = {} ) { - super(min, filePath, opts) - checkDynamicPoolSize(this.numberOfWorkers, this.max) + super(min, filePath, opts, max) + checkDynamicPoolSize( + this.minimumNumberOfWorkers, + this.maximumNumberOfWorkers as number + ) } /** @inheritDoc */ diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 46cabdcb..d6481be7 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -26,9 +26,10 @@ export class FixedClusterPool< public constructor ( numberOfWorkers: number, filePath: string, - protected readonly opts: PoolOptions = {} + opts: PoolOptions = {}, + maximumNumberOfWorkers?: number ) { - super(numberOfWorkers, filePath, opts) + super(numberOfWorkers, filePath, opts, maximumNumberOfWorkers) } /** @inheritDoc */ diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index f1d7dcf4..34473ab4 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -1,16 +1,16 @@ import { cpus } from 'node:os' import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS, - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + getDefaultInternalWorkerChoiceStrategyOptions } from '../../utils' import type { IPool } from '../pool' import type { IWorker } from '../worker' import type { IWorkerChoiceStrategy, + InternalWorkerChoiceStrategyOptions, MeasurementStatisticsRequirements, StrategyPolicy, - TaskStatisticsRequirements, - WorkerChoiceStrategyOptions + TaskStatisticsRequirements } from './selection-strategies-types' /** @@ -56,14 +56,14 @@ export abstract class AbstractWorkerChoiceStrategy< */ public constructor ( protected readonly pool: IPool, - protected opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + protected opts: InternalWorkerChoiceStrategyOptions ) { - this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts } + this.setOptions(this.opts) this.choose = this.choose.bind(this) } protected setTaskStatisticsRequirements ( - opts: WorkerChoiceStrategyOptions + opts: InternalWorkerChoiceStrategyOptions ): void { this.toggleMedianMeasurementStatisticsRequirements( this.taskStatisticsRequirements.runTime, @@ -111,8 +111,11 @@ export abstract class AbstractWorkerChoiceStrategy< public abstract remove (workerNodeKey: number): boolean /** @inheritDoc */ - public setOptions (opts: WorkerChoiceStrategyOptions): void { - this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts } + public setOptions (opts: InternalWorkerChoiceStrategyOptions): void { + this.opts = { + ...getDefaultInternalWorkerChoiceStrategyOptions(this.pool.info.maxSize), + ...opts + } this.setTaskStatisticsRequirements(this.opts) } diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index f47a1e6e..aff4e73d 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -1,15 +1,12 @@ -import { - DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS, - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS -} from '../../utils' +import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils' import type { IPool } from '../pool' import type { IWorker, StrategyData } from '../worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import { type IWorkerChoiceStrategy, + type InternalWorkerChoiceStrategyOptions, Measurements, - type TaskStatisticsRequirements, - type WorkerChoiceStrategyOptions + type TaskStatisticsRequirements } from './selection-strategies-types' /** @@ -45,7 +42,7 @@ export class FairShareWorkerChoiceStrategy< /** @inheritDoc */ public constructor ( pool: IPool, - opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + opts: InternalWorkerChoiceStrategyOptions ) { super(pool, opts) this.setTaskStatisticsRequirements(this.opts) diff --git a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts index eee39ec8..8ccb5f78 100644 --- a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts @@ -1,14 +1,11 @@ import type { IWorker } from '../worker' import type { IPool } from '../pool' -import { - DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS, - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS -} from '../../utils' +import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, - TaskStatisticsRequirements, - WorkerChoiceStrategyOptions + InternalWorkerChoiceStrategyOptions, + TaskStatisticsRequirements } from './selection-strategies-types' /** @@ -60,7 +57,7 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< /** @inheritDoc */ public constructor ( pool: IPool, - opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + opts: InternalWorkerChoiceStrategyOptions ) { super(pool, opts) this.setTaskStatisticsRequirements(this.opts) @@ -157,7 +154,7 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< } /** @inheritDoc */ - public setOptions (opts: WorkerChoiceStrategyOptions): void { + public setOptions (opts: InternalWorkerChoiceStrategyOptions): void { super.setOptions(opts) this.roundWeights = this.getRoundWeights() } diff --git a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts index 81cadde1..a37f91d4 100644 --- a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts @@ -1,14 +1,11 @@ -import { - DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS, - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS -} from '../../utils' +import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils' import type { IPool } from '../pool' import type { IWorker } from '../worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, - TaskStatisticsRequirements, - WorkerChoiceStrategyOptions + InternalWorkerChoiceStrategyOptions, + TaskStatisticsRequirements } from './selection-strategies-types' /** @@ -43,7 +40,7 @@ export class LeastBusyWorkerChoiceStrategy< /** @inheritDoc */ public constructor ( pool: IPool, - opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + opts: InternalWorkerChoiceStrategyOptions ) { super(pool, opts) this.setTaskStatisticsRequirements(this.opts) diff --git a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts index 2ad4e90d..d81f2064 100644 --- a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts @@ -1,14 +1,11 @@ -import { - DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS, - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS -} from '../../utils' +import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils' import type { IPool } from '../pool' import type { IWorker } from '../worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, - TaskStatisticsRequirements, - WorkerChoiceStrategyOptions + InternalWorkerChoiceStrategyOptions, + TaskStatisticsRequirements } from './selection-strategies-types' /** @@ -39,7 +36,7 @@ export class LeastEluWorkerChoiceStrategy< /** @inheritDoc */ public constructor ( pool: IPool, - opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + opts: InternalWorkerChoiceStrategyOptions ) { super(pool, opts) this.setTaskStatisticsRequirements(this.opts) diff --git a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts index 1bd8d059..6318affe 100644 --- a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts @@ -1,10 +1,9 @@ -import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils' import type { IPool } from '../pool' import type { IWorker } from '../worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, - WorkerChoiceStrategyOptions + InternalWorkerChoiceStrategyOptions } from './selection-strategies-types' /** @@ -24,10 +23,9 @@ export class LeastUsedWorkerChoiceStrategy< /** @inheritDoc */ public constructor ( pool: IPool, - opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + opts: InternalWorkerChoiceStrategyOptions ) { super(pool, opts) - this.setTaskStatisticsRequirements(this.opts) } /** @inheritDoc */ diff --git a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts index 7c49cec7..2d08cff2 100644 --- a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts @@ -1,10 +1,9 @@ -import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils' import type { IPool } from '../pool' import type { IWorker } from '../worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, - WorkerChoiceStrategyOptions + InternalWorkerChoiceStrategyOptions } from './selection-strategies-types' /** @@ -24,10 +23,9 @@ export class RoundRobinWorkerChoiceStrategy< /** @inheritDoc */ public constructor ( pool: IPool, - opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + opts: InternalWorkerChoiceStrategyOptions ) { super(pool, opts) - this.setTaskStatisticsRequirements(this.opts) } /** @inheritDoc */ diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index 6990e65f..59b6a84f 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -67,12 +67,6 @@ export interface MeasurementOptions { * Worker choice strategy options. */ export interface WorkerChoiceStrategyOptions { - /** - * Number of worker choice retries to perform if no worker is eligible. - * - * @defaultValue 6 - */ - readonly retries?: number /** * Measurement to use in worker choice strategy supporting it. */ @@ -104,6 +98,21 @@ export interface WorkerChoiceStrategyOptions { readonly weights?: Record } +/** + * Worker choice strategy internal options. + * + * @internal + */ +export interface InternalWorkerChoiceStrategyOptions + extends WorkerChoiceStrategyOptions { + /** + * Number of worker choice retries to perform if no worker is eligible. + * + * @defaultValue pool maximum size + */ + readonly retries?: number +} + /** * Measurement statistics requirements. * diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index 5148494e..fea72350 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -1,14 +1,11 @@ import type { IWorker } from '../worker' import type { IPool } from '../pool' -import { - DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS, - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS -} from '../../utils' +import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, - TaskStatisticsRequirements, - WorkerChoiceStrategyOptions + InternalWorkerChoiceStrategyOptions, + TaskStatisticsRequirements } from './selection-strategies-types' /** @@ -49,7 +46,7 @@ export class WeightedRoundRobinWorkerChoiceStrategy< /** @inheritDoc */ public constructor ( pool: IPool, - opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + opts: InternalWorkerChoiceStrategyOptions ) { super(pool, opts) this.setTaskStatisticsRequirements(this.opts) diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index 14e734fd..50c3f9c8 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -1,4 +1,4 @@ -import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils' +import { getDefaultInternalWorkerChoiceStrategyOptions } from '../../utils' import type { IPool } from '../pool' import type { IWorker } from '../worker' import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy' @@ -9,10 +9,10 @@ import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy' import type { IWorkerChoiceStrategy, + InternalWorkerChoiceStrategyOptions, StrategyPolicy, TaskStatisticsRequirements, - WorkerChoiceStrategy, - WorkerChoiceStrategyOptions + WorkerChoiceStrategy } from './selection-strategies-types' import { WorkerChoiceStrategies } from './selection-strategies-types' import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy' @@ -44,9 +44,12 @@ export class WorkerChoiceStrategyContext< public constructor ( pool: IPool, private workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN, - private opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + private opts?: InternalWorkerChoiceStrategyOptions ) { - this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts } + this.opts = { + ...getDefaultInternalWorkerChoiceStrategyOptions(pool.info.maxSize), + ...this.opts + } this.execute = this.execute.bind(this) this.workerChoiceStrategies = new Map< WorkerChoiceStrategy, @@ -56,35 +59,35 @@ export class WorkerChoiceStrategyContext< WorkerChoiceStrategies.ROUND_ROBIN, new (RoundRobinWorkerChoiceStrategy.bind(this))( pool, - opts + this.opts ) ], [ WorkerChoiceStrategies.LEAST_USED, new (LeastUsedWorkerChoiceStrategy.bind(this))( pool, - opts + this.opts ) ], [ WorkerChoiceStrategies.LEAST_BUSY, new (LeastBusyWorkerChoiceStrategy.bind(this))( pool, - opts + this.opts ) ], [ WorkerChoiceStrategies.LEAST_ELU, new (LeastEluWorkerChoiceStrategy.bind(this))( pool, - opts + this.opts ) ], [ WorkerChoiceStrategies.FAIR_SHARE, new (FairShareWorkerChoiceStrategy.bind(this))( pool, - opts + this.opts ) ], [ @@ -93,7 +96,7 @@ export class WorkerChoiceStrategyContext< Worker, Data, Response - >(pool, opts) + >(pool, this.opts) ], [ WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN, @@ -101,7 +104,7 @@ export class WorkerChoiceStrategyContext< Worker, Data, Response - >(pool, opts) + >(pool, this.opts) ] ]) } @@ -194,7 +197,7 @@ export class WorkerChoiceStrategyContext< chooseCount++ } while ( workerNodeKey == null && - retriesCount < (this.opts.retries as number) + retriesCount < (this.opts?.retries as number) ) if (workerNodeKey == null) { throw new Error( @@ -223,10 +226,16 @@ export class WorkerChoiceStrategyContext< * * @param opts - The worker choice strategy options. */ - public setOptions (opts: WorkerChoiceStrategyOptions): void { - this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts } + public setOptions ( + pool: IPool, + opts?: InternalWorkerChoiceStrategyOptions + ): void { + this.opts = { + ...getDefaultInternalWorkerChoiceStrategyOptions(pool.info.maxSize), + ...opts + } for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) { - workerChoiceStrategy.setOptions(opts) + workerChoiceStrategy.setOptions(this.opts) } } } diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index 6def273e..b0a22346 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -28,12 +28,15 @@ export class DynamicThreadPool< */ public constructor ( min: number, - protected readonly max: number, + max: number, filePath: string, opts: PoolOptions = {} ) { - super(min, filePath, opts) - checkDynamicPoolSize(this.numberOfWorkers, this.max) + super(min, filePath, opts, max) + checkDynamicPoolSize( + this.minimumNumberOfWorkers, + this.maximumNumberOfWorkers as number + ) } /** @inheritDoc */ diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 19783027..d2f4df8e 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -32,9 +32,10 @@ export class FixedThreadPool< public constructor ( numberOfThreads: number, filePath: string, - protected readonly opts: PoolOptions = {} + opts: PoolOptions = {}, + maximumNumberOfThreads?: number ) { - super(numberOfThreads, filePath, opts) + super(numberOfThreads, filePath, opts, maximumNumberOfThreads) } /** @inheritDoc */ diff --git a/src/utils.ts b/src/utils.ts index 5a685439..25233a02 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -3,8 +3,8 @@ import { getRandomValues } from 'node:crypto' import { Worker as ClusterWorker } from 'node:cluster' import { Worker as ThreadWorker } from 'node:worker_threads' import type { - MeasurementStatisticsRequirements, - WorkerChoiceStrategyOptions + InternalWorkerChoiceStrategyOptions, + MeasurementStatisticsRequirements } from './pools/selection-strategies/selection-strategies-types' import type { KillBehavior } from './worker/worker-options' import { type IWorker, type WorkerType, WorkerTypes } from './pools/worker' @@ -23,14 +23,20 @@ export const EMPTY_FUNCTION: () => void = Object.freeze(() => { /** * Default worker choice strategy options. + * + * @param poolMaxSize - The pool maximum size. + * @returns The default worker choice strategy options. */ -export const DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS: WorkerChoiceStrategyOptions = - { - retries: 6, +export const getDefaultInternalWorkerChoiceStrategyOptions = ( + poolMaxSize: number +): InternalWorkerChoiceStrategyOptions => { + return { + retries: poolMaxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } } +} /** * Default measurement statistics requirements. diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 0f61dbd8..7a9c18e4 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -124,6 +124,22 @@ describe('Abstract pool test suite', () => { ) }) + it('Verify that pool arguments number and pool type are checked', () => { + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs', + undefined, + numberOfWorkers * 2 + ) + ).toThrow( + new Error( + 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization' + ) + ) + }) + it('Verify that dynamic pool sizing is checked', () => { expect( () => @@ -210,16 +226,10 @@ describe('Abstract pool test suite', () => { enableEvents: true, restartWorkerOnError: true, enableTasksQueue: false, - workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN, - workerChoiceStrategyOptions: { - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - } + workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -227,7 +237,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -269,10 +279,7 @@ describe('Abstract pool test suite', () => { }, workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, workerChoiceStrategyOptions: { - retries: 6, runTime: { median: true }, - waitTime: { median: false }, - elu: { median: false }, weights: { 0: 300, 1: 200 } }, onlineHandler: testHandler, @@ -281,7 +288,7 @@ describe('Abstract pool test suite', () => { exitHandler: testHandler }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: true }, waitTime: { median: false }, elu: { median: false }, @@ -290,7 +297,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: true }, waitTime: { median: false }, elu: { median: false }, @@ -311,38 +318,6 @@ describe('Abstract pool test suite', () => { } ) ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'")) - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.mjs', - { - workerChoiceStrategyOptions: { - retries: 'invalidChoiceRetries' - } - } - ) - ).toThrow( - new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - ) - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.mjs', - { - workerChoiceStrategyOptions: { - retries: -1 - } - } - ) - ).toThrow( - new RangeError( - "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" - ) - ) expect( () => new FixedThreadPool( @@ -478,14 +453,9 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) - expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) + expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined() expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -493,7 +463,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -523,13 +493,11 @@ describe('Abstract pool test suite', () => { elu: { median: true } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, runTime: { median: true }, - waitTime: { median: false }, elu: { median: true } }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: true }, waitTime: { median: false }, elu: { median: true } @@ -537,7 +505,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: true }, waitTime: { median: false }, elu: { median: true } @@ -567,13 +535,11 @@ describe('Abstract pool test suite', () => { elu: { median: false } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, runTime: { median: false }, - waitTime: { median: false }, elu: { median: false } }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -581,7 +547,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -613,20 +579,6 @@ describe('Abstract pool test suite', () => { 'Invalid worker choice strategy options: must be a plain object' ) ) - expect(() => - pool.setWorkerChoiceStrategyOptions({ - retries: 'invalidChoiceRetries' - }) - ).toThrow( - new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - ) - expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow( - new RangeError( - "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" - ) - ) expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow( new Error( 'Invalid worker choice strategy options: must have a weight for each worker node' diff --git a/tests/pools/selection-strategies/selection-strategies.test.mjs b/tests/pools/selection-strategies/selection-strategies.test.mjs index ff1f499f..200e83c7 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.mjs +++ b/tests/pools/selection-strategies/selection-strategies.test.mjs @@ -66,14 +66,9 @@ describe('Selection strategies test suite', () => { expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( workerChoiceStrategy ) - expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) + expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined() expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -86,19 +81,14 @@ describe('Selection strategies test suite', () => { max, './tests/worker-files/cluster/testWorker.js' ) - pool.setWorkerChoiceStrategy(workerChoiceStrategy, { retries: 3 }) + pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy) expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( workerChoiceStrategy ) - expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 3, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) + expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined() expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 3, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } diff --git a/tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs b/tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs index 96281a95..f076f4dd 100644 --- a/tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs +++ b/tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs @@ -95,7 +95,9 @@ describe('Worker choice strategy context test suite', () => { workerChoiceStrategyUndefinedStub ) expect(() => workerChoiceStrategyContext.execute()).toThrow( - new Error('Worker node key chosen is null or undefined after 6 retries') + new Error( + `Worker node key chosen is null or undefined after ${fixedPool.info.maxSize} retries` + ) ) const workerChoiceStrategyNullStub = createStubInstance( RoundRobinWorkerChoiceStrategy, @@ -109,7 +111,9 @@ describe('Worker choice strategy context test suite', () => { workerChoiceStrategyNullStub ) expect(() => workerChoiceStrategyContext.execute()).toThrow( - new Error('Worker node key chosen is null or undefined after 6 retries') + new Error( + `Worker node key chosen is null or undefined after ${fixedPool.info.maxSize} retries` + ) ) }) diff --git a/tests/utils.test.mjs b/tests/utils.test.mjs index 26056bd2..4b4e1e13 100644 --- a/tests/utils.test.mjs +++ b/tests/utils.test.mjs @@ -6,11 +6,11 @@ import { expect } from 'expect' import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS, DEFAULT_TASK_NAME, - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, availableParallelism, average, exponentialDelay, + getDefaultInternalWorkerChoiceStrategyOptions, getWorkerId, getWorkerType, isAsyncFunction, @@ -35,9 +35,12 @@ describe('Utils test suite', () => { expect(EMPTY_FUNCTION).toStrictEqual(expect.any(Function)) }) - it('Verify DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS values', () => { - expect(DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS).toStrictEqual({ - retries: 6, + it('Verify getDefaultInternalWorkerChoiceStrategyOptions() values', () => { + const poolMaxSize = 10 + expect( + getDefaultInternalWorkerChoiceStrategyOptions(poolMaxSize) + ).toStrictEqual({ + retries: poolMaxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } -- 2.34.1