import type {
MessageValue,
PromiseResponseWrapper,
- Task
+ Task,
+ Writable
} from '../utility-types'
import {
DEFAULT_TASK_NAME,
}
private checkValidTasksQueueOptions (
- tasksQueueOptions: TasksQueueOptions
+ tasksQueueOptions: Writable<TasksQueueOptions>
): void {
if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
throw new TypeError('Invalid tasks queue options: must be a plain object')
}
if (
tasksQueueOptions?.queueMaxSize != null &&
- !Number.isSafeInteger(tasksQueueOptions.queueMaxSize)
+ tasksQueueOptions?.size != null
) {
- throw new TypeError(
- 'Invalid worker node tasks queue max size: must be an integer'
+ throw new Error(
+ 'Invalid tasks queue options: cannot specify both queueMaxSize and size'
)
}
+ if (tasksQueueOptions?.queueMaxSize != null) {
+ tasksQueueOptions.size = tasksQueueOptions.queueMaxSize
+ }
if (
- tasksQueueOptions?.queueMaxSize != null &&
- tasksQueueOptions.queueMaxSize <= 0
+ tasksQueueOptions?.size != null &&
+ !Number.isSafeInteger(tasksQueueOptions.size)
) {
+ throw new TypeError(
+ 'Invalid worker node tasks queue max size: must be an integer'
+ )
+ }
+ if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
throw new RangeError(
- `Invalid worker node tasks queue max size: ${tasksQueueOptions.queueMaxSize} is a negative integer or zero`
+ `Invalid worker node tasks queue max size: ${tasksQueueOptions.size} is a negative integer or zero`
)
}
}
this.checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
this.buildTasksQueueOptions(tasksQueueOptions)
- this.setTasksQueueMaxSize(
- this.opts.tasksQueueOptions.queueMaxSize as number
- )
+ this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number)
} else if (this.opts.tasksQueueOptions != null) {
delete this.opts.tasksQueueOptions
}
}
- private setTasksQueueMaxSize (queueMaxSize: number): void {
+ private setTasksQueueMaxSize (size: number): void {
for (const workerNode of this.workerNodes) {
- workerNode.tasksQueueBackPressureSize = queueMaxSize
+ workerNode.tasksQueueBackPressureSize = size
}
}
): TasksQueueOptions {
return {
...{
- queueMaxSize: Math.pow(this.maxSize, 2),
+ size: Math.pow(this.maxSize, 2),
concurrency: 1
},
...tasksQueueOptions
const workerNode = new WorkerNode<Worker, Data>(
worker,
this.worker,
- this.opts.tasksQueueOptions?.queueMaxSize ?? Math.pow(this.maxSize, 2)
+ this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
)
// Flag the worker node as ready at pool startup.
if (this.starting) {
expect(pool.opts.enableTasksQueue).toBe(true)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
- queueMaxSize: 4
+ size: 4
})
expect(pool.opts.workerChoiceStrategy).toBe(
WorkerChoiceStrategies.LEAST_USED
expect(pool.opts.enableTasksQueue).toBe(true)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
- queueMaxSize: 4
+ size: 4
})
pool.enableTasksQueue(true, { concurrency: 2 })
expect(pool.opts.enableTasksQueue).toBe(true)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
- queueMaxSize: 4
+ size: 4
})
pool.enableTasksQueue(false)
expect(pool.opts.enableTasksQueue).toBe(false)
)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
- queueMaxSize: 4
+ size: 4
})
pool.setTasksQueueOptions({ concurrency: 2 })
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
- queueMaxSize: 4
+ size: 4
})
expect(() =>
pool.setTasksQueueOptions('invalidTasksQueueOptions')
expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
new TypeError('Invalid worker node tasks concurrency: must be an integer')
)
- expect(() => pool.setTasksQueueOptions({ queueMaxSize: 0 })).toThrowError(
+ expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
new RangeError(
'Invalid worker node tasks queue max size: 0 is a negative integer or zero'
)
)
- expect(() => pool.setTasksQueueOptions({ queueMaxSize: -1 })).toThrowError(
+ expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError(
new RangeError(
'Invalid worker node tasks queue max size: -1 is a negative integer or zero'
)
)
- expect(() => pool.setTasksQueueOptions({ queueMaxSize: 0.2 })).toThrowError(
+ expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError(
new TypeError(
'Invalid worker node tasks queue max size: must be an integer'
)