!Number.isSafeInteger(tasksQueueOptions.concurrency)
) {
throw new TypeError(
- 'Invalid worker tasks concurrency: must be an integer'
+ 'Invalid worker node tasks concurrency: must be an integer'
)
}
if (
tasksQueueOptions.concurrency <= 0
) {
throw new RangeError(
- `Invalid worker tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
+ `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
+ )
+ }
+ if (
+ tasksQueueOptions?.queueMaxSize != null &&
+ !Number.isSafeInteger(tasksQueueOptions.queueMaxSize)
+ ) {
+ throw new TypeError(
+ 'Invalid worker node tasks queue max size: must be an integer'
+ )
+ }
+ if (
+ tasksQueueOptions?.queueMaxSize != null &&
+ tasksQueueOptions.queueMaxSize <= 0
+ ) {
+ throw new RangeError(
+ `Invalid worker node tasks queue max size: ${tasksQueueOptions.queueMaxSize} is a negative integer or zero`
)
}
}
this.checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
this.buildTasksQueueOptions(tasksQueueOptions)
+ this.setTasksQueueMaxSize(
+ this.opts.tasksQueueOptions.queueMaxSize as number
+ )
} else if (this.opts.tasksQueueOptions != null) {
delete this.opts.tasksQueueOptions
}
}
+ private setTasksQueueMaxSize (queueMaxSize: number): void {
+ for (const workerNode of this.workerNodes) {
+ workerNode.tasksQueueBackPressureSize = queueMaxSize
+ }
+ }
+
private buildTasksQueueOptions (
tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
return {
- concurrency: tasksQueueOptions?.concurrency ?? 1
+ ...{
+ queueMaxSize: Math.pow(this.maxSize, 2),
+ concurrency: 1
+ },
+ ...tasksQueueOptions
}
}
const workerNode = new WorkerNode<Worker, Data>(
worker,
this.worker,
- this.maxSize
+ this.opts.tasksQueueOptions?.queueMaxSize ?? Math.pow(this.maxSize, 2)
)
// Flag the worker node as ready at pool startup.
if (this.starting) {