)
}
if (
- workerChoiceStrategyOptions.choiceRetries != null &&
- !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
+ workerChoiceStrategyOptions.retries != null &&
+ !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
) {
throw new TypeError(
- 'Invalid worker choice strategy options: choice retries must be an integer'
+ 'Invalid worker choice strategy options: retries must be an integer'
)
}
if (
- workerChoiceStrategyOptions.choiceRetries != null &&
- workerChoiceStrategyOptions.choiceRetries < 0
+ workerChoiceStrategyOptions.retries != null &&
+ workerChoiceStrategyOptions.retries < 0
) {
throw new RangeError(
- `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater or equal than zero`
+ `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
)
}
if (
this.checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
this.buildTasksQueueOptions(tasksQueueOptions)
- this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number)
+ this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
} else if (this.opts.tasksQueueOptions != null) {
delete this.opts.tasksQueueOptions
}
}
- private setTasksQueueMaxSize (size: number): void {
+ private setTasksQueueSize (size: number): void {
for (const workerNode of this.workerNodes) {
workerNode.tasksQueueBackPressureSize = size
}
* @virtual
*/
protected setupHook (): void {
- /** Intentionally empty */
+ /* Intentionally empty */
}
/**
}
}
+ private updateTaskStolenStatisticsWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (workerNode?.usage != null) {
+ ++workerNode.usage.tasks.stolen
+ }
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ workerNode.getTaskFunctionWorkerUsage(taskName) != null
+ ) {
+ const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ ) as WorkerUsage
+ ++taskFunctionWorkerUsage.tasks.stolen
+ }
+ }
+
private taskStealingOnEmptyQueue (workerId: number): void {
const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
} else {
this.enqueueTask(destinationWorkerNodeKey, task)
}
- if (destinationWorkerNode?.usage != null) {
- ++destinationWorkerNode.usage.tasks.stolen
- }
- if (
- this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey) &&
- destinationWorkerNode.getTaskFunctionWorkerUsage(
- task.name as string
- ) != null
- ) {
- const taskFunctionWorkerUsage =
- destinationWorkerNode.getTaskFunctionWorkerUsage(
- task.name as string
- ) as WorkerUsage
- ++taskFunctionWorkerUsage.tasks.stolen
- }
+ this.updateTaskStolenStatisticsWorkerUsage(
+ destinationWorkerNodeKey,
+ task.name as string
+ )
break
}
}
}
private tasksStealingOnBackPressure (workerId: number): void {
- if ((this.opts.tasksQueueOptions?.size as number) <= 1) {
+ const sizeOffset = 1
+ if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
return
}
const sourceWorkerNode =
workerNode.info.ready &&
workerNode.info.id !== workerId &&
workerNode.usage.tasks.queued <
- (this.opts.tasksQueueOptions?.size as number) - 1
+ (this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
const task = {
...(sourceWorkerNode.popTask() as Task<Data>),
} else {
this.enqueueTask(workerNodeKey, task)
}
- if (workerNode?.usage != null) {
- ++workerNode.usage.tasks.stolen
- }
- if (
- this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
- workerNode.getTaskFunctionWorkerUsage(task.name as string) != null
- ) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- task.name as string
- ) as WorkerUsage
- ++taskFunctionWorkerUsage.tasks.stolen
- }
+ this.updateTaskStolenStatisticsWorkerUsage(
+ workerNodeKey,
+ task.name as string
+ )
}
}
}
this.workerNodes.push(workerNode)
const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
if (workerNodeKey === -1) {
- throw new Error('Worker node added not found')
+ throw new Error('Worker added not found in worker nodes')
}
return workerNodeKey
}