import type {
IWorker,
IWorkerNode,
+ TaskStatistics,
WorkerInfo,
WorkerNodeEventDetail,
WorkerType,
}
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
- workerNodeKey: number,
- taskName: string
+ workerNodeKey: number
): void {
const workerNode = this.workerNodes[workerNodeKey]
if (workerNode?.usage != null) {
++workerNode.usage.tasks.sequentiallyStolen
}
+ }
+
+ private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
}
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
- workerNodeKey: number,
- taskName: string
+ workerNodeKey: number
): void {
const workerNode = this.workerNodes[workerNodeKey]
if (workerNode?.usage != null) {
workerNode.usage.tasks.sequentiallyStolen = 0
}
+ }
+
+ private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
- this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
- workerNodeKey,
- previousStolenTask.name as string
- )
+ for (const taskName of this.workerNodes[workerNodeKey].info
+ .taskFunctionNames as string[]) {
+ this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ workerNodeKey,
+ taskName
+ )
+ }
+ this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
const stolenTask = this.workerNodeStealTask(workerNodeKey)
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ stolenTask != null
+ ) {
+ const taskFunctionTasksWorkerUsage = this.workerNodes[
+ workerNodeKey
+ ].getTaskFunctionWorkerUsage(stolenTask.name as string)
+ ?.tasks as TaskStatistics
+ if (
+ taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
+ (previousStolenTask != null &&
+ previousStolenTask.name === stolenTask.name &&
+ taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
+ ) {
+ this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ workerNodeKey,
+ stolenTask.name as string
+ )
+ } else {
+ this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ workerNodeKey,
+ stolenTask.name as string
+ )
+ }
+ }
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
this.handleIdleWorkerNodeEvent(event, stolenTask)
} else {
this.enqueueTask(workerNodeKey, task)
}
- this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
- workerNodeKey,
- task.name as string
- )
+ this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
task.name as string