} from '../utility-types'
import {
DEFAULT_TASK_NAME,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
average,
exponentialDelay,
/** @inheritDoc */
public emitter?: EventEmitterAsyncResource
- /**
- * Dynamic pool maximum size property placeholder.
- */
- protected readonly max?: number
-
/**
* The task execution response promise map:
* - `key`: The message id of each submitted task.
/**
* Constructs a new poolifier pool.
*
- * @param numberOfWorkers - Number of workers that this pool should manage.
+ * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
* @param filePath - Path to the worker file.
* @param opts - Options for the pool.
+ * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
*/
public constructor (
- protected readonly numberOfWorkers: number,
+ protected readonly minimumNumberOfWorkers: number,
protected readonly filePath: string,
- protected readonly opts: PoolOptions<Worker>
+ protected readonly opts: PoolOptions<Worker>,
+ protected readonly maximumNumberOfWorkers?: number
) {
if (!this.isMain()) {
throw new Error(
'Cannot start a pool from a worker with the same type as the pool'
)
}
+ this.checkPoolType()
checkFilePath(this.filePath)
- this.checkNumberOfWorkers(this.numberOfWorkers)
+ this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
this.checkPoolOptions(this.opts)
this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
this.startTimestamp = performance.now()
}
- private checkNumberOfWorkers (numberOfWorkers: number): void {
- if (numberOfWorkers == null) {
+ private checkPoolType (): void {
+ if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) {
+ throw new Error(
+ 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
+ )
+ }
+ }
+
+ private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void {
+ if (minimumNumberOfWorkers == null) {
throw new Error(
'Cannot instantiate a pool without specifying the number of workers'
)
- } else if (!Number.isSafeInteger(numberOfWorkers)) {
+ } else if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
throw new TypeError(
'Cannot instantiate a pool with a non safe integer number of workers'
)
- } else if (numberOfWorkers < 0) {
+ } else if (minimumNumberOfWorkers < 0) {
throw new RangeError(
'Cannot instantiate a pool with a negative number of workers'
)
- } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
+ } else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
throw new RangeError('Cannot instantiate a fixed pool with zero worker')
}
}
this.checkValidWorkerChoiceStrategyOptions(
opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
)
- this.opts.workerChoiceStrategyOptions = {
- ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
- ...opts.workerChoiceStrategyOptions
+ if (opts.workerChoiceStrategyOptions != null) {
+ this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
}
this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
this.opts.enableEvents = opts.enableEvents ?? true
'Invalid worker choice strategy options: must be a plain object'
)
}
- if (
- workerChoiceStrategyOptions?.retries != null &&
- !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
- ) {
- throw new TypeError(
- 'Invalid worker choice strategy options: retries must be an integer'
- )
- }
- if (
- workerChoiceStrategyOptions?.retries != null &&
- workerChoiceStrategyOptions.retries < 0
- ) {
- throw new RangeError(
- `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
- )
- }
if (
workerChoiceStrategyOptions?.weights != null &&
- Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
+ Object.keys(workerChoiceStrategyOptions.weights).length !==
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
) {
throw new Error(
'Invalid worker choice strategy options: must have a weight for each worker node'
started: this.started,
ready: this.ready,
strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
- minSize: this.minSize,
- maxSize: this.maxSize,
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ minSize: this.minimumNumberOfWorkers,
+ maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.runTime.aggregate &&
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.waitTime.aggregate && { utilization: round(this.utilization) }),
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
accumulator + workerNode.usage.tasks.failed,
0
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.runTime.aggregate && {
runTime: {
minimum: round(
)
)
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.runTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.runTime.median && {
median: round(
median(
})
}
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.waitTime.aggregate && {
waitTime: {
minimum: round(
)
)
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.waitTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.waitTime.median && {
median: round(
median(
? accumulator + 1
: accumulator,
0
- ) >= this.minSize
+ ) >= this.minimumNumberOfWorkers
)
}
*/
private get utilization (): number {
const poolTimeCapacity =
- (performance.now() - this.startTimestamp) * this.maxSize
+ (performance.now() - this.startTimestamp) *
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
const totalTasksRunTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
*/
protected abstract get worker (): WorkerType
- /**
- * The pool minimum size.
- */
- protected get minSize (): number {
- return this.numberOfWorkers
- }
-
- /**
- * The pool maximum size.
- */
- protected get maxSize (): number {
- return this.max ?? this.numberOfWorkers
- }
-
/**
* Checks if the worker id sent in the received message from a worker is valid.
*
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
- this.opts.workerChoiceStrategyOptions = {
- ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
- ...workerChoiceStrategyOptions
+ if (workerChoiceStrategyOptions != null) {
+ this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
}
this.workerChoiceStrategyContext.setOptions(
+ this,
this.opts.workerChoiceStrategyOptions
)
}
tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
return {
- ...getDefaultTasksQueueOptions(this.maxSize),
+ ...getDefaultTasksQueueOptions(
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ ),
...tasksQueueOptions
}
}
* The pool filling boolean status.
*/
protected get full (): boolean {
- return this.workerNodes.length >= this.maxSize
+ return (
+ this.workerNodes.length >=
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
+ )
}
/**
(accumulator, workerNode) =>
!workerNode.info.dynamic ? accumulator + 1 : accumulator,
0
- ) < this.numberOfWorkers
+ ) < this.minimumNumberOfWorkers
) {
this.createAndSetupWorkerNode()
}
this.started = false
}
- protected async sendKillMessageToWorker (
- workerNodeKey: number
- ): Promise<void> {
+ private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
await new Promise<void>((resolve, reject) => {
- if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) {
+ if (this.workerNodes?.[workerNodeKey] == null) {
reject(new Error(`Invalid worker node key '${workerNodeKey}'`))
return
}
'taskFinished',
flushedTasks,
this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
- getDefaultTasksQueueOptions(this.maxSize).tasksFinishedTimeout
+ getDefaultTasksQueueOptions(
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ ).tasksFinishedTimeout
)
await this.sendKillMessageToWorker(workerNodeKey)
await workerNode.terminate()
workerOptions: this.opts.workerOptions,
tasksQueueBackPressureSize:
this.opts.tasksQueueOptions?.size ??
- getDefaultTasksQueueOptions(this.maxSize).size
+ getDefaultTasksQueueOptions(
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ ).size
}
)
// Flag the worker node as ready at pool startup.