fix: trigger continuous tasks stealing under proper conditions
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 14 Aug 2024 16:20:48 +0000 (18:20 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 14 Aug 2024 16:20:48 +0000 (18:20 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts

index 8f7a6e4765d8145f4eca8d18e996bdaa291ac196..8a41821011841db2d754fee027a80fb5b51e8255 100644 (file)
@@ -311,13 +311,6 @@ export abstract class AbstractPool<
         utilization: round(this.utilization),
       }),
       workerNodes: this.workerNodes.length,
-      idleWorkerNodes: this.workerNodes.reduce(
-        (accumulator, workerNode) =>
-          workerNode.usage.tasks.executing === 0
-            ? accumulator + 1
-            : accumulator,
-        0
-      ),
       ...(this.opts.enableTasksQueue === true && {
         stealingWorkerNodes: this.workerNodes.reduce(
           (accumulator, workerNode) =>
@@ -325,6 +318,11 @@ export abstract class AbstractPool<
           0
         ),
       }),
+      idleWorkerNodes: this.workerNodes.reduce(
+        (accumulator, _, workerNodeKey) =>
+          this.isWorkerNodeIdle(workerNodeKey) ? accumulator + 1 : accumulator,
+        0
+      ),
       busyWorkerNodes: this.workerNodes.reduce(
         (accumulator, _, workerNodeKey) =>
           this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
@@ -865,6 +863,16 @@ export abstract class AbstractPool<
     )
   }
 
+  private isWorkerNodeIdle (workerNodeKey: number): boolean {
+    if (this.opts.enableTasksQueue === true) {
+      return (
+        this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
+        this.tasksQueueSize(workerNodeKey) === 0
+      )
+    }
+    return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
+  }
+
   private isWorkerNodeBusy (workerNodeKey: number): boolean {
     if (this.opts.enableTasksQueue === true) {
       return (
@@ -1657,18 +1665,14 @@ export abstract class AbstractPool<
         message.workerId
       )
       const workerInfo = this.getWorkerInfo(localWorkerNodeKey)
-      const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
       // Kill message received from worker
       if (
         isKillBehavior(KillBehaviors.HARD, message.kill) ||
         (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
-          ((this.opts.enableTasksQueue === false &&
-            workerUsage.tasks.executing === 0) ||
-            (this.opts.enableTasksQueue === true &&
-              workerUsage.tasks.executing === 0 &&
-              this.tasksQueueSize(localWorkerNodeKey) === 0 &&
-              workerInfo != null &&
-              !workerInfo.continuousStealing)))
+          this.isWorkerNodeIdle(localWorkerNodeKey) &&
+          workerInfo != null &&
+          !workerInfo.continuousStealing &&
+          !workerInfo.stealing)
       ) {
         // Flag the worker node as not ready immediately
         this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
@@ -1959,27 +1963,21 @@ export abstract class AbstractPool<
     }
     const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
     if (
-      this.cannotStealTask() ||
-      (this.info.stealingWorkerNodes ?? 0) >
-        Math.round(
-          this.workerNodes.length *
-            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-            this.opts.tasksQueueOptions!.tasksStealingRatio!
-        )
+      !workerNodeInfo.continuousStealing &&
+      (this.cannotStealTask() ||
+        (this.info.stealingWorkerNodes ?? 0) >
+          Math.round(
+            this.workerNodes.length *
+              // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+              this.opts.tasksQueueOptions!.tasksStealingRatio!
+          ))
     ) {
-      workerNodeInfo.continuousStealing = false
-      if (workerNodeTasksUsage.sequentiallyStolen > 0) {
-        this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
-          workerNodeKey,
-          previousStolenTask?.name
-        )
-      }
       return
     }
     if (
-      workerNodeInfo.continuousStealing ||
-      workerNodeTasksUsage.executing > 0 ||
-      this.tasksQueueSize(workerNodeKey) > 0
+      workerNodeInfo.continuousStealing &&
+      (workerNodeTasksUsage.executing > 0 ||
+        this.tasksQueueSize(workerNodeKey) > 0)
     ) {
       workerNodeInfo.continuousStealing = false
       if (workerNodeTasksUsage.sequentiallyStolen > 0) {
@@ -2160,10 +2158,7 @@ export abstract class AbstractPool<
           // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
           this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
         }
-        if (
-          workerNodeTasksUsage.executing === 0 &&
-          this.tasksQueueSize(workerNodeKey) === 0
-        ) {
+        if (this.isWorkerNodeIdle(workerNodeKey)) {
           workerNode.emit('idle', {
             workerNodeKey,
           })