fix: ensure task stealing can't start twice on the same worker node
author Jérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 14 Aug 2024 11:55:01 +0000 (13:55 +0200)
committer Jérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 14 Aug 2024 11:55:01 +0000 (13:55 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/worker-node.ts
src/pools/worker.ts
tests/pools/abstract-pool.test.mjs
tests/pools/worker-node.test.mjs

index a45eef05ed191bda2f66889db0f4ea713b48541f..61f01a6ec684dcd9199b72fc8d900fd473f6ef4f 100644 (file)
@@ -321,14 +321,7 @@ export abstract class AbstractPool<
       ...(this.opts.enableTasksQueue === true && {
         stealingWorkerNodes: this.workerNodes.reduce(
           (accumulator, workerNode) =>
-            workerNode.info.stealing ? accumulator + 1 : accumulator,
-          0
-        ),
-      }),
-      ...(this.opts.enableTasksQueue === true && {
-        stolenWorkerNodes: this.workerNodes.reduce(
-          (accumulator, workerNode) =>
-            workerNode.info.stolen ? accumulator + 1 : accumulator,
+            workerNode.info.continuousStealing ? accumulator + 1 : accumulator,
           0
         ),
       }),
@@ -1675,7 +1668,7 @@ export abstract class AbstractPool<
               workerUsage.tasks.executing === 0 &&
               this.tasksQueueSize(localWorkerNodeKey) === 0 &&
               workerInfo != null &&
-              !workerInfo.stealing)))
+              !workerInfo.continuousStealing)))
       ) {
         // Flag the worker node as not ready immediately
         this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
@@ -1862,15 +1855,16 @@ export abstract class AbstractPool<
 
   private updateTaskSequentiallyStolenStatisticsWorkerUsage (
     workerNodeKey: number,
-    taskName: string,
+    taskName?: string,
     previousTaskName?: string
   ): void {
     const workerNode = this.workerNodes[workerNodeKey]
     // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
-    if (workerNode?.usage != null) {
+    if (workerNode?.usage != null && taskName != null) {
       ++workerNode.usage.tasks.sequentiallyStolen
     }
     if (
+      taskName != null &&
       this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
       workerNode.getTaskFunctionWorkerUsage(taskName) != null
     ) {
@@ -1892,7 +1886,7 @@ export abstract class AbstractPool<
 
   private resetTaskSequentiallyStolenStatisticsWorkerUsage (
     workerNodeKey: number,
-    taskName: string
+    taskName?: string
   ): void {
     const workerNode = this.workerNodes[workerNodeKey]
     // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
@@ -1900,6 +1894,7 @@ export abstract class AbstractPool<
       workerNode.usage.tasks.sequentiallyStolen = 0
     }
     if (
+      taskName != null &&
       this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
       workerNode.getTaskFunctionWorkerUsage(taskName) != null
     ) {
@@ -1934,16 +1929,16 @@ export abstract class AbstractPool<
     destinationWorkerInfo.stealing = true
     sourceWorkerNode.info.stolen = true
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-    const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
+    const stolenTask = sourceWorkerNode.dequeueLastPrioritizedTask()!
     sourceWorkerNode.info.stolen = false
     destinationWorkerInfo.stealing = false
-    this.handleTask(destinationWorkerNodeKey, task)
+    this.handleTask(destinationWorkerNodeKey, stolenTask)
     this.updateTaskStolenStatisticsWorkerUsage(
       destinationWorkerNodeKey,
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      task.name!
+      stolenTask.name!
     )
-    return task
+    return stolenTask
   }
 
   private readonly handleWorkerNodeIdleEvent = (
@@ -1956,6 +1951,13 @@ export abstract class AbstractPool<
         "WorkerNode event detail 'workerNodeKey' property must be defined"
       )
     }
+    const workerNodeInfo = this.getWorkerInfo(workerNodeKey)
+    if (workerNodeInfo == null) {
+      throw new Error(
+        `Worker node with key '${workerNodeKey.toString()}' not found in pool`
+      )
+    }
+    const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
     if (
       this.cannotStealTask() ||
       (this.info.stealingWorkerNodes ?? 0) >
@@ -1965,37 +1967,36 @@ export abstract class AbstractPool<
             this.opts.tasksQueueOptions!.tasksStealingRatio!
         )
     ) {
-      if (previousStolenTask != null) {
+      workerNodeInfo.continuousStealing = false
+      if (workerNodeTasksUsage.sequentiallyStolen > 0) {
         this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
           workerNodeKey,
-          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-          previousStolenTask.name!
+          previousStolenTask?.name
         )
       }
       return
     }
-    const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
     if (
-      previousStolenTask != null &&
-      (workerNodeTasksUsage.executing > 0 ||
-        this.tasksQueueSize(workerNodeKey) > 0)
+      workerNodeInfo.continuousStealing ||
+      workerNodeTasksUsage.executing > 0 ||
+      this.tasksQueueSize(workerNodeKey) > 0
     ) {
-      this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
-        workerNodeKey,
-        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        previousStolenTask.name!
-      )
+      workerNodeInfo.continuousStealing = false
+      if (workerNodeTasksUsage.sequentiallyStolen > 0) {
+        this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+          workerNodeKey,
+          previousStolenTask?.name
+        )
+      }
       return
     }
+    workerNodeInfo.continuousStealing = true
     const stolenTask = this.workerNodeStealTask(workerNodeKey)
-    if (stolenTask != null) {
-      this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
-        workerNodeKey,
-        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        stolenTask.name!,
-        previousStolenTask?.name
-      )
-    }
+    this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
+      workerNodeKey,
+      stolenTask?.name,
+      previousStolenTask?.name
+    )
     sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
       .then(() => {
         this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
@@ -2123,7 +2124,7 @@ export abstract class AbstractPool<
   }
 
   private handleTaskExecutionResponse (message: MessageValue<Response>): void {
-    const { workerId, taskId, workerError, data } = message
+    const { taskId, workerError, data } = message
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
     const promiseResponse = this.promiseResponseMap.get(taskId!)
     if (promiseResponse != null) {
@@ -2147,8 +2148,7 @@ export abstract class AbstractPool<
       this.afterTaskExecutionHook(workerNodeKey, message)
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
       this.promiseResponseMap.delete(taskId!)
-      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
-      workerNode?.emit('taskFinished', taskId)
+      workerNode.emit('taskFinished', taskId)
       if (
         this.opts.enableTasksQueue === true &&
         !this.destroying &&
@@ -2167,11 +2167,9 @@ export abstract class AbstractPool<
         }
         if (
           workerNodeTasksUsage.executing === 0 &&
-          this.tasksQueueSize(workerNodeKey) === 0 &&
-          workerNodeTasksUsage.sequentiallyStolen === 0
+          this.tasksQueueSize(workerNodeKey) === 0
         ) {
           workerNode.emit('idle', {
-            workerId,
             workerNodeKey,
           })
         }
index 416d69616f94f4abd859d27a0efef11818aaaa08..02a0c795f03d15bd03a2aa4b59f52ab45fca0b64 100644 (file)
@@ -88,10 +88,8 @@ export interface PoolInfo {
   readonly utilization?: number
   /** Pool total worker nodes. */
   readonly workerNodes: number
-  /** Pool stealing worker nodes. */
+  /** Pool continuous stealing worker nodes. */
   readonly stealingWorkerNodes?: number
-  /** Pool stolen worker nodes. */
-  readonly stolenWorkerNodes?: number
   /** Pool idle worker nodes. */
   readonly idleWorkerNodes: number
   /** Pool busy worker nodes. */
index c79fa3899ab8c9f10b23187066693cebf3689d5c..dbc27fa5fad1e6258b38d72b67eaf7878567d4bd 100644 (file)
@@ -221,6 +221,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
       ready: false,
       stealing: false,
       stolen: false,
+      continuousStealing: false,
       backPressure: false,
     }
   }
index 956e1f0bd41b9de1b1d80dbb8209ad1eb0efa80f..a0742c2852ca77687e8e4b44fa14994a3126c38a 100644 (file)
@@ -165,7 +165,7 @@ export interface WorkerInfo {
   ready: boolean
   /**
    * Stealing flag.
-   * This flag is set to `true` when worker node is stealing tasks from another worker node.
+   * This flag is set to `true` when worker node is stealing one task from another worker node.
    */
   stealing: boolean
   /**
@@ -173,6 +173,11 @@ export interface WorkerInfo {
    * This flag is set to `true` when worker node has one task stolen from another worker node.
    */
   stolen: boolean
+  /**
+   * Continuous stealing flag.
+   * This flag is set to `true` when worker node continuously steals tasks from other worker nodes.
+   */
+  continuousStealing: boolean
   /**
    * Back pressure flag.
    * This flag is set to `true` when worker node tasks queue has back pressure.
index 04c87246ddd604b0107cb8aae1f1534b14acee0f..2f5e643731da6088d2058d5d5a77dace18770358 100644 (file)
@@ -873,6 +873,7 @@ describe('Abstract pool test suite', () => {
         ready: true,
         stealing: false,
         stolen: false,
+        continuousStealing: false,
         backPressure: false,
       })
     }
@@ -891,6 +892,7 @@ describe('Abstract pool test suite', () => {
         ready: true,
         stealing: false,
         stolen: false,
+        continuousStealing: false,
         backPressure: false,
       })
     }
@@ -1259,9 +1261,8 @@ describe('Abstract pool test suite', () => {
       minSize: expect.any(Number),
       maxSize: expect.any(Number),
       workerNodes: expect.any(Number),
-      idleWorkerNodes: expect.any(Number),
       stealingWorkerNodes: expect.any(Number),
-      stolenWorkerNodes: expect.any(Number),
+      idleWorkerNodes: expect.any(Number),
       busyWorkerNodes: expect.any(Number),
       executedTasks: expect.any(Number),
       executingTasks: expect.any(Number),
index 1ebe0caf0acd555fcddca31fc038d6bb6f73ae8e..c27339ac4a862f5d5d8eca41cd83730cc69b979f 100644 (file)
@@ -242,6 +242,7 @@ describe('Worker node test suite', () => {
       ready: false,
       stealing: false,
       stolen: false,
+      continuousStealing: false,
       backPressure: false,
     })
     expect(threadWorkerNode.usage).toStrictEqual({
@@ -302,6 +303,7 @@ describe('Worker node test suite', () => {
       ready: false,
       stealing: false,
       stolen: false,
+      continuousStealing: false,
       backPressure: false,
     })
     expect(clusterWorkerNode.usage).toStrictEqual({