private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
this.opts.startWorkers = opts.startWorkers ?? true
+ this.checkValidWorkerChoiceStrategy(
+ opts.workerChoiceStrategy as WorkerChoiceStrategy
+ )
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
- this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
+ this.checkValidWorkerChoiceStrategyOptions(
+ opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
+ )
this.opts.workerChoiceStrategyOptions = {
...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
...opts.workerChoiceStrategyOptions
}
- this.checkValidWorkerChoiceStrategyOptions(
- this.opts.workerChoiceStrategyOptions
- )
this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
private checkValidWorkerChoiceStrategy (
workerChoiceStrategy: WorkerChoiceStrategy
): void {
- if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
+ if (
+ workerChoiceStrategy != null &&
+ !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
+ ) {
throw new Error(
`Invalid worker choice strategy '${workerChoiceStrategy}'`
)
private checkValidWorkerChoiceStrategyOptions (
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
- if (!isPlainObject(workerChoiceStrategyOptions)) {
+ if (
+ workerChoiceStrategyOptions != null &&
+ !isPlainObject(workerChoiceStrategyOptions)
+ ) {
throw new TypeError(
'Invalid worker choice strategy options: must be a plain object'
)
}
if (
- workerChoiceStrategyOptions.retries != null &&
+ workerChoiceStrategyOptions?.retries != null &&
!Number.isSafeInteger(workerChoiceStrategyOptions.retries)
) {
throw new TypeError(
)
}
if (
- workerChoiceStrategyOptions.retries != null &&
+ workerChoiceStrategyOptions?.retries != null &&
workerChoiceStrategyOptions.retries < 0
) {
throw new RangeError(
)
}
if (
- workerChoiceStrategyOptions.weights != null &&
+ workerChoiceStrategyOptions?.weights != null &&
Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
) {
throw new Error(
)
}
if (
- workerChoiceStrategyOptions.measurement != null &&
+ workerChoiceStrategyOptions?.measurement != null &&
!Object.values(Measurements).includes(
workerChoiceStrategyOptions.measurement
)
}
if (
tasksQueueOptions?.concurrency != null &&
- !Number.isSafeInteger(tasksQueueOptions?.concurrency)
+ !Number.isSafeInteger(tasksQueueOptions.concurrency)
) {
throw new TypeError(
'Invalid worker node tasks concurrency: must be an integer'
}
if (
tasksQueueOptions?.concurrency != null &&
- tasksQueueOptions?.concurrency <= 0
+ tasksQueueOptions.concurrency <= 0
) {
throw new RangeError(
- `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
+ `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
)
}
if (
tasksQueueOptions?.size != null &&
- !Number.isSafeInteger(tasksQueueOptions?.size)
+ !Number.isSafeInteger(tasksQueueOptions.size)
) {
throw new TypeError(
'Invalid worker node tasks queue size: must be an integer'
)
}
- if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
+ if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
throw new RangeError(
- `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
+ `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
)
}
}
enableTasksQueue: true,
tasksQueueOptions: {
concurrency: 2,
- size: 4,
+ size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true
},
expect(pool.opts.enableTasksQueue).toBe(true)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
- size: 4,
+ size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true
})
expect(pool.opts.enableTasksQueue).toBe(true)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
- size: 4,
+ size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true
})
)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
- size: 4,
+ size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true
})
for (const workerNode of pool.workerNodes) {
+ expect(workerNode.tasksQueueBackPressureSize).toBe(
+ pool.opts.tasksQueueOptions.size
+ )
expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
expect(workerNode.onBackPressure).toBeInstanceOf(Function)
}
pool.setTasksQueueOptions({
concurrency: 2,
+ size: 2,
taskStealing: false,
tasksStealingOnBackPressure: false
})
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
- size: 4,
+ size: 2,
taskStealing: false,
tasksStealingOnBackPressure: false
})
for (const workerNode of pool.workerNodes) {
+ expect(workerNode.tasksQueueBackPressureSize).toBe(
+ pool.opts.tasksQueueOptions.size
+ )
expect(workerNode.onEmptyQueue).toBeUndefined()
expect(workerNode.onBackPressure).toBeUndefined()
}
})
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
- size: 4,
+ size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true
})
for (const workerNode of pool.workerNodes) {
+ expect(workerNode.tasksQueueBackPressureSize).toBe(
+ pool.opts.tasksQueueOptions.size
+ )
expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
expect(workerNode.onBackPressure).toBeInstanceOf(Function)
}