fix: avoid tasks redistribution on the errored worker node
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Thu, 9 May 2024 13:00:49 +0000 (15:00 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Thu, 9 May 2024 13:00:49 +0000 (15:00 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts

index 0de96a411d0273d63604dc41dfc7ad3fc92cfed8..73f4e31afbc79200b85e40b1f0ddc671aac75327 100644 (file)
@@ -1619,14 +1619,15 @@ export abstract class AbstractPool<
     }
   }
 
-  private redistributeQueuedTasks (workerNodeKey: number): void {
-    if (workerNodeKey === -1 || this.cannotStealTask()) {
+  private redistributeQueuedTasks (sourceWorkerNodeKey: number): void {
+    if (sourceWorkerNodeKey === -1 || this.cannotStealTask()) {
       return
     }
-    while (this.tasksQueueSize(workerNodeKey) > 0) {
+    while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) {
       const destinationWorkerNodeKey = this.workerNodes.reduce(
         (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
-          return workerNode.info.ready &&
+          return sourceWorkerNodeKey !== workerNodeKey &&
+            workerNode.info.ready &&
             workerNode.usage.tasks.queued <
               workerNodes[minWorkerNodeKey].usage.tasks.queued
             ? workerNodeKey
@@ -1637,7 +1638,7 @@ export abstract class AbstractPool<
       this.handleTask(
         destinationWorkerNodeKey,
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        this.dequeueTask(workerNodeKey)!
+        this.dequeueTask(sourceWorkerNodeKey)!
       )
     }
   }
@@ -1663,46 +1664,42 @@ export abstract class AbstractPool<
   }
 
   private updateTaskSequentiallyStolenStatisticsWorkerUsage (
-    workerNodeKey: number
+    workerNodeKey: number,
+    taskName: string,
+    previousStolenTaskName: string
   ): void {
     const workerNode = this.workerNodes[workerNodeKey]
     // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
-    if (workerNode?.usage != null) {
+    if (workerNode.usage != null) {
       ++workerNode.usage.tasks.sequentiallyStolen
     }
-  }
-
-  private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
-    workerNodeKey: number,
-    taskName: string
-  ): void {
-    const workerNode = this.workerNodes[workerNodeKey]
+    const taskFunctionWorkerUsage =
+      workerNode.getTaskFunctionWorkerUsage(taskName)
     if (
       this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
-      workerNode.getTaskFunctionWorkerUsage(taskName) != null
+      taskFunctionWorkerUsage != null &&
+      (taskFunctionWorkerUsage.tasks.sequentiallyStolen === 0 ||
+        (previousStolenTaskName === taskName &&
+          taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0))
     ) {
-      const taskFunctionWorkerUsage =
-        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        workerNode.getTaskFunctionWorkerUsage(taskName)!
       ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+    } else if (
+      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+      taskFunctionWorkerUsage != null
+    ) {
+      taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
     }
   }
 
   private resetTaskSequentiallyStolenStatisticsWorkerUsage (
-    workerNodeKey: number
+    workerNodeKey: number,
+    taskName: string
   ): void {
     const workerNode = this.workerNodes[workerNodeKey]
     // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
     if (workerNode?.usage != null) {
       workerNode.usage.tasks.sequentiallyStolen = 0
     }
-  }
-
-  private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
-    workerNodeKey: number,
-    taskName: string
-  ): void {
-    const workerNode = this.workerNodes[workerNodeKey]
     if (
       this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
       workerNode.getTaskFunctionWorkerUsage(taskName) != null
@@ -1732,6 +1729,11 @@ export abstract class AbstractPool<
     ) {
       if (workerInfo != null && previousStolenTask != null) {
         workerInfo.stealing = false
+        this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+          workerNodeKey,
+          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+          previousStolenTask.name!
+        )
       }
       return
     }
@@ -1739,19 +1741,15 @@ export abstract class AbstractPool<
     if (
       workerInfo != null &&
       previousStolenTask != null &&
-      workerNodeTasksUsage.sequentiallyStolen > 0 &&
       (workerNodeTasksUsage.executing > 0 ||
         this.tasksQueueSize(workerNodeKey) > 0)
     ) {
       workerInfo.stealing = false
-      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      for (const taskFunctionProperties of workerInfo.taskFunctionsProperties!) {
-        this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
-          workerNodeKey,
-          taskFunctionProperties.name
-        )
-      }
-      this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
+      this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+        workerNodeKey,
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        previousStolenTask.name!
+      )
       return
     }
     if (workerInfo == null) {
@@ -1761,33 +1759,14 @@ export abstract class AbstractPool<
     }
     workerInfo.stealing = true
     const stolenTask = this.workerNodeStealTask(workerNodeKey)
-    if (
-      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
-      stolenTask != null
-    ) {
-      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      const taskFunctionTasksWorkerUsage = this.workerNodes[
-        workerNodeKey
+    if (stolenTask != null && previousStolenTask != null) {
+      this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
+        workerNodeKey,
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
-      if (
-        taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
-        (previousStolenTask != null &&
-          previousStolenTask.name === stolenTask.name &&
-          taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
-      ) {
-        this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
-          workerNodeKey,
-          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-          stolenTask.name!
-        )
-      } else {
-        this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
-          workerNodeKey,
-          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-          stolenTask.name!
-        )
-      }
+        stolenTask.name!,
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        previousStolenTask.name!
+      )
     }
     sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
       .then(() => {
@@ -1819,7 +1798,6 @@ export abstract class AbstractPool<
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
       const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
       this.handleTask(workerNodeKey, task)
-      this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
       this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
       return task