}
}
- private redistributeQueuedTasks (workerNodeKey: number): void {
- if (workerNodeKey === -1 || this.cannotStealTask()) {
+ private redistributeQueuedTasks (sourceWorkerNodeKey: number): void {
+ if (sourceWorkerNodeKey === -1 || this.cannotStealTask()) {
return
}
- while (this.tasksQueueSize(workerNodeKey) > 0) {
+ while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) {
const destinationWorkerNodeKey = this.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
- return workerNode.info.ready &&
+ return sourceWorkerNodeKey !== workerNodeKey &&
+ workerNode.info.ready &&
workerNode.usage.tasks.queued <
workerNodes[minWorkerNodeKey].usage.tasks.queued
? workerNodeKey
this.handleTask(
destinationWorkerNodeKey,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.dequeueTask(workerNodeKey)!
+ this.dequeueTask(sourceWorkerNodeKey)!
)
}
}
}
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
- workerNodeKey: number
+ workerNodeKey: number,
+ taskName: string,
+ previousStolenTaskName: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
- if (workerNode?.usage != null) {
+ if (workerNode.usage != null) {
++workerNode.usage.tasks.sequentiallyStolen
}
- }
-
- private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
- workerNodeKey: number,
- taskName: string
- ): void {
- const workerNode = this.workerNodes[workerNodeKey]
+ const taskFunctionWorkerUsage =
+ workerNode.getTaskFunctionWorkerUsage(taskName)
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
- workerNode.getTaskFunctionWorkerUsage(taskName) != null
+ taskFunctionWorkerUsage != null &&
+ (taskFunctionWorkerUsage.tasks.sequentiallyStolen === 0 ||
+ (previousStolenTaskName === taskName &&
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0))
) {
- const taskFunctionWorkerUsage =
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- workerNode.getTaskFunctionWorkerUsage(taskName)!
++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+ } else if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ taskFunctionWorkerUsage != null
+ ) {
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
}
}
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
- workerNodeKey: number
+ workerNodeKey: number,
+ taskName: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
workerNode.usage.tasks.sequentiallyStolen = 0
}
- }
-
- private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
- workerNodeKey: number,
- taskName: string
- ): void {
- const workerNode = this.workerNodes[workerNodeKey]
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
if (workerInfo != null && previousStolenTask != null) {
workerInfo.stealing = false
+ this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ previousStolenTask.name!
+ )
}
return
}
if (
workerInfo != null &&
previousStolenTask != null &&
- workerNodeTasksUsage.sequentiallyStolen > 0 &&
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
workerInfo.stealing = false
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- for (const taskFunctionProperties of workerInfo.taskFunctionsProperties!) {
- this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
- workerNodeKey,
- taskFunctionProperties.name
- )
- }
- this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
+ this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ previousStolenTask.name!
+ )
return
}
if (workerInfo == null) {
}
workerInfo.stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
- if (
- this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
- stolenTask != null
- ) {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const taskFunctionTasksWorkerUsage = this.workerNodes[
- workerNodeKey
+ if (stolenTask != null && previousStolenTask != null) {
+ this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
- if (
- taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
- (previousStolenTask != null &&
- previousStolenTask.name === stolenTask.name &&
- taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
- ) {
- this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
- workerNodeKey,
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- stolenTask.name!
- )
- } else {
- this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
- workerNodeKey,
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- stolenTask.name!
- )
- }
+ stolenTask.name!,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ previousStolenTask.name!
+ )
}
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
this.handleTask(workerNodeKey, task)
- this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
return task