fix: account stolen tasks on a per task function basis properly
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 25 Nov 2023 12:35:50 +0000 (13:35 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 25 Nov 2023 12:35:50 +0000 (13:35 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts

index ab319c627884c25c4851dc34ecb2626ebc528fc3..1759d03e6648736a393ec22a3293b0bde4225e48 100644 (file)
@@ -35,6 +35,7 @@ import {
 import type {
   IWorker,
   IWorkerNode,
+  TaskStatistics,
   WorkerInfo,
   WorkerNodeEventDetail,
   WorkerType,
@@ -1481,13 +1482,19 @@ export abstract class AbstractPool<
   }
 
   private updateTaskSequentiallyStolenStatisticsWorkerUsage (
-    workerNodeKey: number,
-    taskName: string
+    workerNodeKey: number
   ): void {
     const workerNode = this.workerNodes[workerNodeKey]
     if (workerNode?.usage != null) {
       ++workerNode.usage.tasks.sequentiallyStolen
     }
+  }
+
+  private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
+    workerNodeKey: number,
+    taskName: string
+  ): void {
+    const workerNode = this.workerNodes[workerNodeKey]
     if (
       this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
       workerNode.getTaskFunctionWorkerUsage(taskName) != null
@@ -1500,13 +1507,19 @@ export abstract class AbstractPool<
   }
 
   private resetTaskSequentiallyStolenStatisticsWorkerUsage (
-    workerNodeKey: number,
-    taskName: string
+    workerNodeKey: number
   ): void {
     const workerNode = this.workerNodes[workerNodeKey]
     if (workerNode?.usage != null) {
       workerNode.usage.tasks.sequentiallyStolen = 0
     }
+  }
+
+  private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
+    workerNodeKey: number,
+    taskName: string
+  ): void {
+    const workerNode = this.workerNodes[workerNodeKey]
     if (
       this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
       workerNode.getTaskFunctionWorkerUsage(taskName) != null
@@ -1535,13 +1548,42 @@ export abstract class AbstractPool<
       (workerNodeTasksUsage.executing > 0 ||
         this.tasksQueueSize(workerNodeKey) > 0)
     ) {
-      this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
-        workerNodeKey,
-        previousStolenTask.name as string
-      )
+      for (const taskName of this.workerNodes[workerNodeKey].info
+        .taskFunctionNames as string[]) {
+        this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+          workerNodeKey,
+          taskName
+        )
+      }
+      this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
       return
     }
     const stolenTask = this.workerNodeStealTask(workerNodeKey)
+    if (
+      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+      stolenTask != null
+    ) {
+      const taskFunctionTasksWorkerUsage = this.workerNodes[
+        workerNodeKey
+      ].getTaskFunctionWorkerUsage(stolenTask.name as string)
+        ?.tasks as TaskStatistics
+      if (
+        taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
+        (previousStolenTask != null &&
+          previousStolenTask.name === stolenTask.name &&
+          taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
+      ) {
+        this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+          workerNodeKey,
+          stolenTask.name as string
+        )
+      } else {
+        this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+          workerNodeKey,
+          stolenTask.name as string
+        )
+      }
+    }
     sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
       .then(() => {
         this.handleIdleWorkerNodeEvent(event, stolenTask)
@@ -1572,10 +1614,7 @@ export abstract class AbstractPool<
       } else {
         this.enqueueTask(workerNodeKey, task)
       }
-      this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
-        workerNodeKey,
-        task.name as string
-      )
+      this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
       this.updateTaskStolenStatisticsWorkerUsage(
         workerNodeKey,
         task.name as string