- `workerChoiceStrategyOptions` (optional) - The worker choice strategy options object to use in this pool.
Properties:
- - `retries` (optional) - The number of retries to perform if no worker is eligible.
- `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`.
- `runTime` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md#simple-moving-median) runtime instead of the tasks simple moving average runtime in worker choice strategies.
- `waitTime` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md#simple-moving-median) wait time instead of the tasks simple moving average wait time in worker choice strategies.
- `elu` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md#simple-moving-median) ELU instead of the tasks simple moving average ELU in worker choice strategies.
- `weights` (optional) - The worker weights to use in weighted round robin worker choice strategies: `{ 0: 200, 1: 300, ..., n: 100 }`.
- Default: `{ retries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
+ Default: `{ runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
- `startWorkers` (optional) - Start the minimum number of workers at pool initialization.
Default: `true`
} from '../utility-types'
import {
DEFAULT_TASK_NAME,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
average,
exponentialDelay,
/** @inheritDoc */
public emitter?: EventEmitterAsyncResource
- /**
- * Dynamic pool maximum size property placeholder.
- */
- protected readonly max?: number
-
/**
* The task execution response promise map:
* - `key`: The message id of each submitted task.
/**
* Constructs a new poolifier pool.
*
- * @param numberOfWorkers - Number of workers that this pool should manage.
+ * @param minimumNumberOfWorkers - Minimum number of workers that this pool should manage.
+ * @param maximumNumberOfWorkers - Maximum number of workers that this pool should manage.
* @param filePath - Path to the worker file.
* @param opts - Options for the pool.
*/
public constructor (
- protected readonly numberOfWorkers: number,
+ protected readonly minimumNumberOfWorkers: number,
protected readonly filePath: string,
- protected readonly opts: PoolOptions<Worker>
+ protected readonly opts: PoolOptions<Worker>,
+ protected readonly maximumNumberOfWorkers?: number
) {
if (!this.isMain()) {
throw new Error(
'Cannot start a pool from a worker with the same type as the pool'
)
}
+ this.checkPoolType()
checkFilePath(this.filePath)
- this.checkNumberOfWorkers(this.numberOfWorkers)
+ this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
this.checkPoolOptions(this.opts)
this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
this.startTimestamp = performance.now()
}
- private checkNumberOfWorkers (numberOfWorkers: number): void {
+ private checkPoolType (): void {
+ if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) {
+ throw new Error(
+ 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
+ )
+ }
+ }
+
+ private checkMinimumNumberOfWorkers (numberOfWorkers: number): void {
if (numberOfWorkers == null) {
throw new Error(
'Cannot instantiate a pool without specifying the number of workers'
this.checkValidWorkerChoiceStrategyOptions(
opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
)
- this.opts.workerChoiceStrategyOptions = {
- ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
- ...opts.workerChoiceStrategyOptions
+ if (opts.workerChoiceStrategyOptions != null) {
+ this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
}
this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
this.opts.enableEvents = opts.enableEvents ?? true
'Invalid worker choice strategy options: must be a plain object'
)
}
- if (
- workerChoiceStrategyOptions?.retries != null &&
- !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
- ) {
- throw new TypeError(
- 'Invalid worker choice strategy options: retries must be an integer'
- )
- }
- if (
- workerChoiceStrategyOptions?.retries != null &&
- workerChoiceStrategyOptions.retries < 0
- ) {
- throw new RangeError(
- `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
- )
- }
if (
workerChoiceStrategyOptions?.weights != null &&
- Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
+ Object.keys(workerChoiceStrategyOptions.weights).length !==
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
) {
throw new Error(
'Invalid worker choice strategy options: must have a weight for each worker node'
started: this.started,
ready: this.ready,
strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
- minSize: this.minSize,
- maxSize: this.maxSize,
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ minSize: this.minimumNumberOfWorkers,
+ maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.runTime.aggregate &&
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.waitTime.aggregate && { utilization: round(this.utilization) }),
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
accumulator + workerNode.usage.tasks.failed,
0
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.runTime.aggregate && {
runTime: {
minimum: round(
)
)
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.runTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.runTime.median && {
median: round(
median(
})
}
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.waitTime.aggregate && {
waitTime: {
minimum: round(
)
)
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.waitTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.waitTime.median && {
median: round(
median(
? accumulator + 1
: accumulator,
0
- ) >= this.minSize
+ ) >= this.minimumNumberOfWorkers
)
}
*/
private get utilization (): number {
const poolTimeCapacity =
- (performance.now() - this.startTimestamp) * this.maxSize
+ (performance.now() - this.startTimestamp) *
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
const totalTasksRunTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
*/
protected abstract get worker (): WorkerType
- /**
- * The pool minimum size.
- */
- protected get minSize (): number {
- return this.numberOfWorkers
- }
-
- /**
- * The pool maximum size.
- */
- protected get maxSize (): number {
- return this.max ?? this.numberOfWorkers
- }
-
/**
* Checks if the worker id sent in the received message from a worker is valid.
*
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
- this.opts.workerChoiceStrategyOptions = {
- ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
- ...workerChoiceStrategyOptions
+ if (workerChoiceStrategyOptions != null) {
+ this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
}
this.workerChoiceStrategyContext.setOptions(
+ this,
this.opts.workerChoiceStrategyOptions
)
}
tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
return {
- ...getDefaultTasksQueueOptions(this.maxSize),
+ ...getDefaultTasksQueueOptions(
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ ),
...tasksQueueOptions
}
}
* The pool filling boolean status.
*/
protected get full (): boolean {
- return this.workerNodes.length >= this.maxSize
+ return (
+ this.workerNodes.length >=
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
+ )
}
/**
(accumulator, workerNode) =>
!workerNode.info.dynamic ? accumulator + 1 : accumulator,
0
- ) < this.numberOfWorkers
+ ) < this.minimumNumberOfWorkers
) {
this.createAndSetupWorkerNode()
}
this.started = false
}
- protected async sendKillMessageToWorker (
- workerNodeKey: number
- ): Promise<void> {
+ private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
await new Promise<void>((resolve, reject) => {
if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) {
reject(new Error(`Invalid worker node key '${workerNodeKey}'`))
'taskFinished',
flushedTasks,
this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
- getDefaultTasksQueueOptions(this.maxSize).tasksFinishedTimeout
+ getDefaultTasksQueueOptions(
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ ).tasksFinishedTimeout
)
await this.sendKillMessageToWorker(workerNodeKey)
await workerNode.terminate()
workerOptions: this.opts.workerOptions,
tasksQueueBackPressureSize:
this.opts.tasksQueueOptions?.size ??
- getDefaultTasksQueueOptions(this.maxSize).size
+ getDefaultTasksQueueOptions(
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ ).size
}
)
// Flag the worker node as ready at pool startup.
*/
public constructor (
min: number,
- protected readonly max: number,
+ max: number,
filePath: string,
opts: PoolOptions<Worker> = {}
) {
- super(min, filePath, opts)
- checkDynamicPoolSize(this.numberOfWorkers, this.max)
+ super(min, filePath, opts, max)
+ checkDynamicPoolSize(
+ this.minimumNumberOfWorkers,
+ this.maximumNumberOfWorkers as number
+ )
}
/** @inheritDoc */
public constructor (
numberOfWorkers: number,
filePath: string,
- protected readonly opts: PoolOptions<Worker> = {}
+ opts: PoolOptions<Worker> = {},
+ maximumNumberOfWorkers?: number
) {
- super(numberOfWorkers, filePath, opts)
+ super(numberOfWorkers, filePath, opts, maximumNumberOfWorkers)
}
/** @inheritDoc */
import { cpus } from 'node:os'
import {
DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ getDefaultInternalWorkerChoiceStrategyOptions
} from '../../utils'
import type { IPool } from '../pool'
import type { IWorker } from '../worker'
import type {
IWorkerChoiceStrategy,
+ InternalWorkerChoiceStrategyOptions,
MeasurementStatisticsRequirements,
StrategyPolicy,
- TaskStatisticsRequirements,
- WorkerChoiceStrategyOptions
+ TaskStatisticsRequirements
} from './selection-strategies-types'
/**
*/
public constructor (
protected readonly pool: IPool<Worker, Data, Response>,
- protected opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ protected opts: InternalWorkerChoiceStrategyOptions
) {
- this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
+ this.setOptions(this.opts)
this.choose = this.choose.bind(this)
}
protected setTaskStatisticsRequirements (
- opts: WorkerChoiceStrategyOptions
+ opts: InternalWorkerChoiceStrategyOptions
): void {
this.toggleMedianMeasurementStatisticsRequirements(
this.taskStatisticsRequirements.runTime,
public abstract remove (workerNodeKey: number): boolean
/** @inheritDoc */
- public setOptions (opts: WorkerChoiceStrategyOptions): void {
- this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
+ public setOptions (opts: InternalWorkerChoiceStrategyOptions): void {
+ this.opts = {
+ ...getDefaultInternalWorkerChoiceStrategyOptions(this.pool.info.maxSize),
+ ...opts
+ }
this.setTaskStatisticsRequirements(this.opts)
}
-import {
- DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
-} from '../../utils'
+import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils'
import type { IPool } from '../pool'
import type { IWorker, StrategyData } from '../worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import {
type IWorkerChoiceStrategy,
+ type InternalWorkerChoiceStrategyOptions,
Measurements,
- type TaskStatisticsRequirements,
- type WorkerChoiceStrategyOptions
+ type TaskStatisticsRequirements
} from './selection-strategies-types'
/**
/** @inheritDoc */
public constructor (
pool: IPool<Worker, Data, Response>,
- opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ opts: InternalWorkerChoiceStrategyOptions
) {
super(pool, opts)
this.setTaskStatisticsRequirements(this.opts)
import type { IWorker } from '../worker'
import type { IPool } from '../pool'
-import {
- DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
-} from '../../utils'
+import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
- TaskStatisticsRequirements,
- WorkerChoiceStrategyOptions
+ InternalWorkerChoiceStrategyOptions,
+ TaskStatisticsRequirements
} from './selection-strategies-types'
/**
/** @inheritDoc */
public constructor (
pool: IPool<Worker, Data, Response>,
- opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ opts: InternalWorkerChoiceStrategyOptions
) {
super(pool, opts)
this.setTaskStatisticsRequirements(this.opts)
}
/** @inheritDoc */
- public setOptions (opts: WorkerChoiceStrategyOptions): void {
+ public setOptions (opts: InternalWorkerChoiceStrategyOptions): void {
super.setOptions(opts)
this.roundWeights = this.getRoundWeights()
}
-import {
- DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
-} from '../../utils'
+import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils'
import type { IPool } from '../pool'
import type { IWorker } from '../worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
- TaskStatisticsRequirements,
- WorkerChoiceStrategyOptions
+ InternalWorkerChoiceStrategyOptions,
+ TaskStatisticsRequirements
} from './selection-strategies-types'
/**
/** @inheritDoc */
public constructor (
pool: IPool<Worker, Data, Response>,
- opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ opts: InternalWorkerChoiceStrategyOptions
) {
super(pool, opts)
this.setTaskStatisticsRequirements(this.opts)
-import {
- DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
-} from '../../utils'
+import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils'
import type { IPool } from '../pool'
import type { IWorker } from '../worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
- TaskStatisticsRequirements,
- WorkerChoiceStrategyOptions
+ InternalWorkerChoiceStrategyOptions,
+ TaskStatisticsRequirements
} from './selection-strategies-types'
/**
/** @inheritDoc */
public constructor (
pool: IPool<Worker, Data, Response>,
- opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ opts: InternalWorkerChoiceStrategyOptions
) {
super(pool, opts)
this.setTaskStatisticsRequirements(this.opts)
-import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
import type { IPool } from '../pool'
import type { IWorker } from '../worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
- WorkerChoiceStrategyOptions
+ InternalWorkerChoiceStrategyOptions
} from './selection-strategies-types'
/**
/** @inheritDoc */
public constructor (
pool: IPool<Worker, Data, Response>,
- opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ opts: InternalWorkerChoiceStrategyOptions
) {
super(pool, opts)
- this.setTaskStatisticsRequirements(this.opts)
}
/** @inheritDoc */
-import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
import type { IPool } from '../pool'
import type { IWorker } from '../worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
- WorkerChoiceStrategyOptions
+ InternalWorkerChoiceStrategyOptions
} from './selection-strategies-types'
/**
/** @inheritDoc */
public constructor (
pool: IPool<Worker, Data, Response>,
- opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ opts: InternalWorkerChoiceStrategyOptions
) {
super(pool, opts)
- this.setTaskStatisticsRequirements(this.opts)
}
/** @inheritDoc */
* Worker choice strategy options.
*/
export interface WorkerChoiceStrategyOptions {
- /**
- * Number of worker choice retries to perform if no worker is eligible.
- *
- * @defaultValue 6
- */
- readonly retries?: number
/**
* Measurement to use in worker choice strategy supporting it.
*/
readonly weights?: Record<number, number>
}
+/**
+ * Worker choice strategy internal options.
+ *
+ * @internal
+ */
+export interface InternalWorkerChoiceStrategyOptions
+ extends WorkerChoiceStrategyOptions {
+ /**
+ * Number of worker choice retries to perform if no worker is eligible.
+ *
+ * @defaultValue pool maximum size
+ */
+ readonly retries?: number
+}
+
/**
* Measurement statistics requirements.
*
import type { IWorker } from '../worker'
import type { IPool } from '../pool'
-import {
- DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
-} from '../../utils'
+import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
- TaskStatisticsRequirements,
- WorkerChoiceStrategyOptions
+ InternalWorkerChoiceStrategyOptions,
+ TaskStatisticsRequirements
} from './selection-strategies-types'
/**
/** @inheritDoc */
public constructor (
pool: IPool<Worker, Data, Response>,
- opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ opts: InternalWorkerChoiceStrategyOptions
) {
super(pool, opts)
this.setTaskStatisticsRequirements(this.opts)
-import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
+import { getDefaultInternalWorkerChoiceStrategyOptions } from '../../utils'
import type { IPool } from '../pool'
import type { IWorker } from '../worker'
import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy'
import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
+ InternalWorkerChoiceStrategyOptions,
StrategyPolicy,
TaskStatisticsRequirements,
- WorkerChoiceStrategy,
- WorkerChoiceStrategyOptions
+ WorkerChoiceStrategy
} from './selection-strategies-types'
import { WorkerChoiceStrategies } from './selection-strategies-types'
import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy'
public constructor (
pool: IPool<Worker, Data, Response>,
private workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN,
- private opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ private opts?: InternalWorkerChoiceStrategyOptions
) {
- this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
+ this.opts = {
+ ...getDefaultInternalWorkerChoiceStrategyOptions(pool.info.maxSize),
+ ...this.opts
+ }
this.execute = this.execute.bind(this)
this.workerChoiceStrategies = new Map<
WorkerChoiceStrategy,
WorkerChoiceStrategies.ROUND_ROBIN,
new (RoundRobinWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
pool,
- opts
+ this.opts
)
],
[
WorkerChoiceStrategies.LEAST_USED,
new (LeastUsedWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
pool,
- opts
+ this.opts
)
],
[
WorkerChoiceStrategies.LEAST_BUSY,
new (LeastBusyWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
pool,
- opts
+ this.opts
)
],
[
WorkerChoiceStrategies.LEAST_ELU,
new (LeastEluWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
pool,
- opts
+ this.opts
)
],
[
WorkerChoiceStrategies.FAIR_SHARE,
new (FairShareWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
pool,
- opts
+ this.opts
)
],
[
Worker,
Data,
Response
- >(pool, opts)
+ >(pool, this.opts)
],
[
WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN,
Worker,
Data,
Response
- >(pool, opts)
+ >(pool, this.opts)
]
])
}
chooseCount++
} while (
workerNodeKey == null &&
- retriesCount < (this.opts.retries as number)
+ retriesCount < (this.opts?.retries as number)
)
if (workerNodeKey == null) {
throw new Error(
*
* @param opts - The worker choice strategy options.
*/
- public setOptions (opts: WorkerChoiceStrategyOptions): void {
- this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
+ public setOptions (
+ pool: IPool<Worker, Data, Response>,
+ opts?: InternalWorkerChoiceStrategyOptions
+ ): void {
+ this.opts = {
+ ...getDefaultInternalWorkerChoiceStrategyOptions(pool.info.maxSize),
+ ...opts
+ }
for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
- workerChoiceStrategy.setOptions(opts)
+ workerChoiceStrategy.setOptions(this.opts)
}
}
}
*/
public constructor (
min: number,
- protected readonly max: number,
+ max: number,
filePath: string,
opts: PoolOptions<Worker> = {}
) {
- super(min, filePath, opts)
- checkDynamicPoolSize(this.numberOfWorkers, this.max)
+ super(min, filePath, opts, max)
+ checkDynamicPoolSize(
+ this.minimumNumberOfWorkers,
+ this.maximumNumberOfWorkers as number
+ )
}
/** @inheritDoc */
public constructor (
numberOfThreads: number,
filePath: string,
- protected readonly opts: PoolOptions<Worker> = {}
+ opts: PoolOptions<Worker> = {},
+ maximumNumberOfThreads?: number
) {
- super(numberOfThreads, filePath, opts)
+ super(numberOfThreads, filePath, opts, maximumNumberOfThreads)
}
/** @inheritDoc */
import { Worker as ClusterWorker } from 'node:cluster'
import { Worker as ThreadWorker } from 'node:worker_threads'
import type {
- MeasurementStatisticsRequirements,
- WorkerChoiceStrategyOptions
+ InternalWorkerChoiceStrategyOptions,
+ MeasurementStatisticsRequirements
} from './pools/selection-strategies/selection-strategies-types'
import type { KillBehavior } from './worker/worker-options'
import { type IWorker, type WorkerType, WorkerTypes } from './pools/worker'
/**
* Default worker choice strategy options.
+ *
+ * @param poolMaxSize - The pool maximum size.
+ * @returns The default worker choice strategy options.
*/
-export const DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS: WorkerChoiceStrategyOptions =
- {
- retries: 6,
+export const getDefaultInternalWorkerChoiceStrategyOptions = (
+ poolMaxSize: number
+): InternalWorkerChoiceStrategyOptions => {
+ return {
+ retries: poolMaxSize,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
}
+}
/**
* Default measurement statistics requirements.
)
})
+ it('Verify that pool arguments number and pool type are checked', () => {
+ expect(
+ () =>
+ new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs',
+ undefined,
+ numberOfWorkers * 2
+ )
+ ).toThrow(
+ new Error(
+ 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
+ )
+ )
+ })
+
it('Verify that dynamic pool sizing is checked', () => {
expect(
() =>
enableEvents: true,
restartWorkerOnError: true,
enableTasksQueue: false,
- workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
- workerChoiceStrategyOptions: {
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- }
+ workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
},
workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
workerChoiceStrategyOptions: {
- retries: 6,
runTime: { median: true },
- waitTime: { median: false },
- elu: { median: false },
weights: { 0: 300, 1: 200 }
},
onlineHandler: testHandler,
exitHandler: testHandler
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: true },
waitTime: { median: false },
elu: { median: false },
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: true },
waitTime: { median: false },
elu: { median: false },
}
)
).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
- expect(
- () =>
- new FixedThreadPool(
- numberOfWorkers,
- './tests/worker-files/thread/testWorker.mjs',
- {
- workerChoiceStrategyOptions: {
- retries: 'invalidChoiceRetries'
- }
- }
- )
- ).toThrow(
- new TypeError(
- 'Invalid worker choice strategy options: retries must be an integer'
- )
- )
- expect(
- () =>
- new FixedThreadPool(
- numberOfWorkers,
- './tests/worker-files/thread/testWorker.mjs',
- {
- workerChoiceStrategyOptions: {
- retries: -1
- }
- }
- )
- ).toThrow(
- new RangeError(
- "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
- )
- )
expect(
() =>
new FixedThreadPool(
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
)
- expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
+ expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
elu: { median: true }
})
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
runTime: { median: true },
- waitTime: { median: false },
elu: { median: true }
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: true },
waitTime: { median: false },
elu: { median: true }
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: true },
waitTime: { median: false },
elu: { median: true }
elu: { median: false }
})
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
runTime: { median: false },
- waitTime: { median: false },
elu: { median: false }
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
'Invalid worker choice strategy options: must be a plain object'
)
)
- expect(() =>
- pool.setWorkerChoiceStrategyOptions({
- retries: 'invalidChoiceRetries'
- })
- ).toThrow(
- new TypeError(
- 'Invalid worker choice strategy options: retries must be an integer'
- )
- )
- expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
- new RangeError(
- "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
- )
- )
expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
new Error(
'Invalid worker choice strategy options: must have a weight for each worker node'
expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
workerChoiceStrategy
)
- expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
+ expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
max,
'./tests/worker-files/cluster/testWorker.js'
)
- pool.setWorkerChoiceStrategy(workerChoiceStrategy, { retries: 3 })
+ pool.setWorkerChoiceStrategy(workerChoiceStrategy)
expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
workerChoiceStrategy
)
- expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 3,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
+ expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 3,
+ retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
workerChoiceStrategyUndefinedStub
)
expect(() => workerChoiceStrategyContext.execute()).toThrow(
- new Error('Worker node key chosen is null or undefined after 6 retries')
+ new Error(
+ `Worker node key chosen is null or undefined after ${fixedPool.info.maxSize} retries`
+ )
)
const workerChoiceStrategyNullStub = createStubInstance(
RoundRobinWorkerChoiceStrategy,
workerChoiceStrategyNullStub
)
expect(() => workerChoiceStrategyContext.execute()).toThrow(
- new Error('Worker node key chosen is null or undefined after 6 retries')
+ new Error(
+ `Worker node key chosen is null or undefined after ${fixedPool.info.maxSize} retries`
+ )
)
})
import {
DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
DEFAULT_TASK_NAME,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
availableParallelism,
average,
exponentialDelay,
+ getDefaultInternalWorkerChoiceStrategyOptions,
getWorkerId,
getWorkerType,
isAsyncFunction,
expect(EMPTY_FUNCTION).toStrictEqual(expect.any(Function))
})
- it('Verify DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS values', () => {
- expect(DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS).toStrictEqual({
- retries: 6,
+ it('Verify getDefaultInternalWorkerChoiceStrategyOptions() values', () => {
+ const poolMaxSize = 10
+ expect(
+ getDefaultInternalWorkerChoiceStrategyOptions(poolMaxSize)
+ ).toStrictEqual({
+ retries: poolMaxSize,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }