fix: account for all stealing worker nodes
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 19 Aug 2024 12:26:44 +0000 (14:26 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 19 Aug 2024 12:26:44 +0000 (14:26 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/worker-node.ts
src/pools/worker.ts
tests/pools/abstract-pool.test.mjs
tests/pools/worker-node.test.mjs

index 7aae2c89c7cdaa67e7b162159901fa0cccf3a99a..a65cbbbcb091524da151d9ed2148b02640de09d0 100644 (file)
@@ -314,7 +314,10 @@ export abstract class AbstractPool<
       ...(this.opts.enableTasksQueue === true && {
         stealingWorkerNodes: this.workerNodes.reduce(
           (accumulator, workerNode) =>
-            workerNode.info.continuousStealing ? accumulator + 1 : accumulator,
+            workerNode.info.continuousStealing ||
+            workerNode.info.backPressureStealing
+              ? accumulator + 1
+              : accumulator,
           0
         ),
       }),
@@ -1672,7 +1675,7 @@ export abstract class AbstractPool<
           this.isWorkerNodeIdle(localWorkerNodeKey) &&
           workerInfo != null &&
           !workerInfo.continuousStealing &&
-          !workerInfo.stealing)
+          !workerInfo.backPressureStealing)
       ) {
         // Flag the worker node as not ready immediately
         this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
@@ -2061,7 +2064,12 @@ export abstract class AbstractPool<
           // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
           this.opts.tasksQueueOptions!.size! - sizeOffset
       ) {
+        if (workerNode.info.backPressureStealing) {
+          continue
+        }
+        workerNode.info.backPressureStealing = true
         this.stealTask(sourceWorkerNode, workerNodeKey)
+        workerNode.info.backPressureStealing = false
       }
     }
   }
index 02a0c795f03d15bd03a2aa4b59f52ab45fca0b64..27d897b2807f15f9a9e21bc9064e11e2a0803112 100644 (file)
@@ -88,7 +88,7 @@ export interface PoolInfo {
   readonly utilization?: number
   /** Pool total worker nodes. */
   readonly workerNodes: number
-  /** Pool continuous stealing worker nodes. */
+  /** Pool tasks stealing worker nodes. */
   readonly stealingWorkerNodes?: number
   /** Pool idle worker nodes. */
   readonly idleWorkerNodes: number
index dbc27fa5fad1e6258b38d72b67eaf7878567d4bd..8748b414cd7fe0e70c1303c90ee970a114d0a1e0 100644 (file)
@@ -222,6 +222,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
       stealing: false,
       stolen: false,
       continuousStealing: false,
+      backPressureStealing: false,
       backPressure: false,
     }
   }
index a0742c2852ca77687e8e4b44fa14994a3126c38a..3c4fe610e101cf684d71cff095a7a28fb477d15f 100644 (file)
@@ -175,9 +175,14 @@ export interface WorkerInfo {
   stolen: boolean
   /**
    * Continuous stealing flag.
-   * This flag is set to `true` when worker node continuously steal tasks from other worker nodes.
+   * This flag is set to `true` when worker node is continuously stealing tasks from other worker nodes.
    */
   continuousStealing: boolean
+  /**
+   * Back pressure stealing flag.
+   * This flag is set to `true` when worker node is stealing one task from another back pressured worker node.
+   */
+  backPressureStealing: boolean
   /**
    * Back pressure flag.
    * This flag is set to `true` when worker node tasks queue has back pressure.
index 2f5e643731da6088d2058d5d5a77dace18770358..5b4f0246e9d234f59b44156317987fdd4bd010fc 100644 (file)
@@ -874,6 +874,7 @@ describe('Abstract pool test suite', () => {
         stealing: false,
         stolen: false,
         continuousStealing: false,
+        backPressureStealing: false,
         backPressure: false,
       })
     }
@@ -893,6 +894,7 @@ describe('Abstract pool test suite', () => {
         stealing: false,
         stolen: false,
         continuousStealing: false,
+        backPressureStealing: false,
         backPressure: false,
       })
     }
index c27339ac4a862f5d5d8eca41cd83730cc69b979f..69f1e15afc67c2760f71e24516202bbbb814195e 100644 (file)
@@ -243,6 +243,7 @@ describe('Worker node test suite', () => {
       stealing: false,
       stolen: false,
       continuousStealing: false,
+      backPressureStealing: false,
       backPressure: false,
     })
     expect(threadWorkerNode.usage).toStrictEqual({
@@ -304,6 +305,7 @@ describe('Worker node test suite', () => {
       stealing: false,
       stolen: false,
       continuousStealing: false,
+      backPressureStealing: false,
       backPressure: false,
     })
     expect(clusterWorkerNode.usage).toStrictEqual({