this.checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
this.buildTasksQueueOptions(tasksQueueOptions)
- this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number)
+ this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
} else if (this.opts.tasksQueueOptions != null) {
delete this.opts.tasksQueueOptions
}
}
- private setTasksQueueMaxSize (size: number): void {
+ private setTasksQueueSize (size: number): void {
for (const workerNode of this.workerNodes) {
workerNode.tasksQueueBackPressureSize = size
}
* @virtual
*/
protected setupHook (): void {
- /** Intentionally empty */
+ /* Intentionally empty */
}
/**
private updateTaskStolenStatisticsWorkerUsage (
workerNodeKey: number,
- workerNode: IWorkerNode<Worker, Data>,
taskName: string
): void {
+ const workerNode = this.workerNodes[workerNodeKey]
if (workerNode?.usage != null) {
++workerNode.usage.tasks.stolen
}
}
this.updateTaskStolenStatisticsWorkerUsage(
destinationWorkerNodeKey,
- destinationWorkerNode,
task.name as string
)
break
}
private tasksStealingOnBackPressure (workerId: number): void {
- if ((this.opts.tasksQueueOptions?.size as number) <= 1) {
+ const sizeOffset = 1
+ if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
return
}
const sourceWorkerNode =
workerNode.info.ready &&
workerNode.info.id !== workerId &&
workerNode.usage.tasks.queued <
- (this.opts.tasksQueueOptions?.size as number) - 1
+ (this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
const task = {
...(sourceWorkerNode.popTask() as Task<Data>),
}
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
- workerNode,
task.name as string
)
}
this.workerNodes.push(workerNode)
const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
if (workerNodeKey === -1) {
- throw new Error('Worker node added not found')
+ throw new Error('Worker added not found in worker nodes')
}
return workerNodeKey
}