fix: protect worker node tasks queue from concurrent tasks stealing
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 13 Aug 2024 15:43:09 +0000 (17:43 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 13 Aug 2024 15:43:09 +0000 (17:43 +0200)
The locking primitive is a mutex implemented with cheap boolean checks.
In a monothreaded JS runtime, it should be enough.

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/worker-node.ts
src/pools/worker.ts
tests/pools/abstract-pool.test.mjs
tests/pools/worker-node.test.mjs

index 095c08a67f886e0ed3745b19bc8ae4ce33714acd..4c387477fac2d4ac5a39f027c3d2c069b2829095 100644 (file)
@@ -325,6 +325,13 @@ export abstract class AbstractPool<
           0
         ),
       }),
+      ...(this.opts.enableTasksQueue === true && {
+        stolenWorkerNodes: this.workerNodes.reduce(
+          (accumulator, workerNode) =>
+            workerNode.info.stolen ? accumulator + 1 : accumulator,
+          0
+        ),
+      }),
       busyWorkerNodes: this.workerNodes.reduce(
         (accumulator, _, workerNodeKey) =>
           this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
@@ -1649,10 +1656,10 @@ export abstract class AbstractPool<
           ((this.opts.enableTasksQueue === false &&
             workerUsage.tasks.executing === 0) ||
             (this.opts.enableTasksQueue === true &&
-              workerInfo != null &&
-              !workerInfo.stealing &&
               workerUsage.tasks.executing === 0 &&
-              this.tasksQueueSize(localWorkerNodeKey) === 0)))
+              this.tasksQueueSize(localWorkerNodeKey) === 0 &&
+              workerInfo != null &&
+              !workerInfo.stealing)))
       ) {
         // Flag the worker node as not ready immediately
         this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
@@ -1913,7 +1920,6 @@ export abstract class AbstractPool<
         )
     ) {
       if (previousStolenTask != null) {
-        workerInfo.stealing = false
         this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
           workerNodeKey,
           // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
@@ -1928,7 +1934,6 @@ export abstract class AbstractPool<
       (workerNodeTasksUsage.executing > 0 ||
         this.tasksQueueSize(workerNodeKey) > 0)
     ) {
-      workerInfo.stealing = false
       this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
         workerNodeKey,
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
@@ -1936,7 +1941,6 @@ export abstract class AbstractPool<
       )
       return
     }
-    workerInfo.stealing = true
     const stolenTask = this.workerNodeStealTask(workerNodeKey)
     if (stolenTask != null) {
       this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
@@ -1956,6 +1960,31 @@ export abstract class AbstractPool<
       })
   }
 
+  private readonly stealTask = (
+    sourceWorkerNode: IWorkerNode<Worker, Data>,
+    destinationWorkerNodeKey: number
+  ): Task<Data> | undefined => {
+    const destinationWorkerInfo = this.getWorkerInfo(destinationWorkerNodeKey)
+    if (destinationWorkerInfo == null) {
+      throw new Error(
+        `Worker node with key '${destinationWorkerNodeKey.toString()}' not found in pool`
+      )
+    }
+    destinationWorkerInfo.stealing = true
+    sourceWorkerNode.info.stolen = true
+    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+    const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
+    sourceWorkerNode.info.stolen = false
+    destinationWorkerInfo.stealing = false
+    this.handleTask(destinationWorkerNodeKey, task)
+    this.updateTaskStolenStatisticsWorkerUsage(
+      destinationWorkerNodeKey,
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      task.name!
+    )
+    return task
+  }
+
   private readonly workerNodeStealTask = (
     workerNodeKey: number
   ): Task<Data> | undefined => {
@@ -1968,17 +1997,13 @@ export abstract class AbstractPool<
     const sourceWorkerNode = workerNodes.find(
       (sourceWorkerNode, sourceWorkerNodeKey) =>
         sourceWorkerNode.info.ready &&
+        !sourceWorkerNode.info.stolen &&
         !sourceWorkerNode.info.stealing &&
         sourceWorkerNodeKey !== workerNodeKey &&
         sourceWorkerNode.usage.tasks.queued > 0
     )
     if (sourceWorkerNode != null) {
-      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
-      this.handleTask(workerNodeKey, task)
-      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
-      return task
+      return this.stealTask(sourceWorkerNode, workerNodeKey)
     }
   }
 
@@ -2015,25 +2040,14 @@ export abstract class AbstractPool<
       if (
         sourceWorkerNode.usage.tasks.queued > 0 &&
         workerNode.info.ready &&
+        !workerNode.info.stolen &&
         !workerNode.info.stealing &&
         workerNode.info.id !== workerId &&
         workerNode.usage.tasks.queued <
           // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
           this.opts.tasksQueueOptions!.size! - sizeOffset
       ) {
-        const workerInfo = this.getWorkerInfo(workerNodeKey)
-        if (workerInfo == null) {
-          throw new Error(
-            `Worker node with key '${workerNodeKey.toString()}' not found in pool`
-          )
-        }
-        workerInfo.stealing = true
-        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
-        this.handleTask(workerNodeKey, task)
-        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
-        workerInfo.stealing = false
+        this.stealTask(sourceWorkerNode, workerNodeKey)
       }
     }
   }
index 226c2fb4bbe33cdaa1b6ba195181ceb90db92779..42478ef31aaf953dd70dfbdd41d22d845ac20917 100644 (file)
@@ -90,6 +90,8 @@ export interface PoolInfo {
   readonly workerNodes: number
   /** Pool stealing worker nodes. */
   readonly stealingWorkerNodes?: number
+  /** Pool stolen worker nodes. */
+  readonly stolenWorkerNodes?: number
   /** Pool idle worker nodes. */
   readonly idleWorkerNodes: number
   /** Pool busy worker nodes. */
index b42d8493407696dfeb6ad81ba00ae4313995bfdb..c79fa3899ab8c9f10b23187066693cebf3689d5c 100644 (file)
@@ -220,6 +220,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
       dynamic: false,
       ready: false,
       stealing: false,
+      stolen: false,
       backPressure: false,
     }
   }
index 9f5fc0c18dd1d2f214dce86d63d65e88faa11642..956e1f0bd41b9de1b1d80dbb8209ad1eb0efa80f 100644 (file)
@@ -168,6 +168,11 @@ export interface WorkerInfo {
    * This flag is set to `true` when worker node is stealing tasks from another worker node.
    */
   stealing: boolean
+  /**
+   * Stolen flag.
+   * This flag is set to `true` when worker node has one task stolen from another worker node.
+   */
+  stolen: boolean
   /**
    * Back pressure flag.
    * This flag is set to `true` when worker node tasks queue has back pressure.
index ed49bbcab5bf329dcf5ead3ba17eb08319ecebd8..f8b983d41bda566cc60aed37a19acbb7f462561a 100644 (file)
@@ -872,6 +872,7 @@ describe('Abstract pool test suite', () => {
         dynamic: false,
         ready: true,
         stealing: false,
+        stolen: false,
         backPressure: false,
       })
     }
@@ -889,6 +890,7 @@ describe('Abstract pool test suite', () => {
         dynamic: false,
         ready: true,
         stealing: false,
+        stolen: false,
         backPressure: false,
       })
     }
@@ -1262,6 +1264,7 @@ describe('Abstract pool test suite', () => {
       workerNodes: expect.any(Number),
       idleWorkerNodes: expect.any(Number),
       stealingWorkerNodes: expect.any(Number),
+      stolenWorkerNodes: expect.any(Number),
       busyWorkerNodes: expect.any(Number),
       executedTasks: expect.any(Number),
       executingTasks: expect.any(Number),
index 4fea0e437a9e12774b408ea8e420b09d37aceef9..1ebe0caf0acd555fcddca31fc038d6bb6f73ae8e 100644 (file)
@@ -241,6 +241,7 @@ describe('Worker node test suite', () => {
       dynamic: false,
       ready: false,
       stealing: false,
+      stolen: false,
       backPressure: false,
     })
     expect(threadWorkerNode.usage).toStrictEqual({
@@ -300,6 +301,7 @@ describe('Worker node test suite', () => {
       dynamic: false,
       ready: false,
       stealing: false,
+      stolen: false,
       backPressure: false,
     })
     expect(clusterWorkerNode.usage).toStrictEqual({