0
),
}),
+ ...(this.opts.enableTasksQueue === true && {
+ stolenWorkerNodes: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.info.stolen ? accumulator + 1 : accumulator,
+ 0
+ ),
+ }),
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, _, workerNodeKey) =>
this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
((this.opts.enableTasksQueue === false &&
workerUsage.tasks.executing === 0) ||
(this.opts.enableTasksQueue === true &&
- workerInfo != null &&
- !workerInfo.stealing &&
workerUsage.tasks.executing === 0 &&
- this.tasksQueueSize(localWorkerNodeKey) === 0)))
+ this.tasksQueueSize(localWorkerNodeKey) === 0 &&
+ workerInfo != null &&
+ !workerInfo.stealing)))
) {
// Flag the worker node as not ready immediately
this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
)
) {
if (previousStolenTask != null) {
- workerInfo.stealing = false
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
workerNodeKey,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
- workerInfo.stealing = false
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
workerNodeKey,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
)
return
}
- workerInfo.stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (stolenTask != null) {
this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
})
}
+ private readonly stealTask = (
+ sourceWorkerNode: IWorkerNode<Worker, Data>,
+ destinationWorkerNodeKey: number
+ ): Task<Data> | undefined => {
+ const destinationWorkerInfo = this.getWorkerInfo(destinationWorkerNodeKey)
+ if (destinationWorkerInfo == null) {
+ throw new Error(
+ `Worker node with key '${destinationWorkerNodeKey.toString()}' not found in pool`
+ )
+ }
+ destinationWorkerInfo.stealing = true
+ sourceWorkerNode.info.stolen = true
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
+ sourceWorkerNode.info.stolen = false
+ destinationWorkerInfo.stealing = false
+ this.handleTask(destinationWorkerNodeKey, task)
+ this.updateTaskStolenStatisticsWorkerUsage(
+ destinationWorkerNodeKey,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ task.name!
+ )
+ return task
+ }
+
private readonly workerNodeStealTask = (
workerNodeKey: number
): Task<Data> | undefined => {
const sourceWorkerNode = workerNodes.find(
(sourceWorkerNode, sourceWorkerNodeKey) =>
sourceWorkerNode.info.ready &&
+ !sourceWorkerNode.info.stolen &&
!sourceWorkerNode.info.stealing &&
sourceWorkerNodeKey !== workerNodeKey &&
sourceWorkerNode.usage.tasks.queued > 0
)
if (sourceWorkerNode != null) {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
- this.handleTask(workerNodeKey, task)
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
- return task
+ return this.stealTask(sourceWorkerNode, workerNodeKey)
}
}
if (
sourceWorkerNode.usage.tasks.queued > 0 &&
workerNode.info.ready &&
+ !workerNode.info.stolen &&
!workerNode.info.stealing &&
workerNode.info.id !== workerId &&
workerNode.usage.tasks.queued <
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.opts.tasksQueueOptions!.size! - sizeOffset
) {
- const workerInfo = this.getWorkerInfo(workerNodeKey)
- if (workerInfo == null) {
- throw new Error(
- `Worker node with key '${workerNodeKey.toString()}' not found in pool`
- )
- }
- workerInfo.stealing = true
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
- this.handleTask(workerNodeKey, task)
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
- workerInfo.stealing = false
+ this.stealTask(sourceWorkerNode, workerNodeKey)
}
}
}