fix: fix worker node cross tasks stealing
author Jérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 24 Dec 2023 11:08:02 +0000 (12:08 +0100)
committer Jérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 24 Dec 2023 11:08:02 +0000 (12:08 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/worker-node.ts
src/pools/worker.ts
tests/pools/abstract-pool.test.mjs
tests/pools/worker-node.test.mjs
tests/worker/cluster-worker.test.mjs
tests/worker/thread-worker.test.mjs

index 188ce7a3ba265e6f70fba47df59e5a33f04809f5..0511b41987444533fc61ab93fcfdc0f8be7afa17 100644 (file)
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Avoid worker node cross tasks stealing.
+- Ensure only half the pool worker nodes can steal tasks.
+
 ## [3.1.10] - 2023-12-23
 
 ### Changed
index cfef89e32b568da13e28a5f4fdb77230dccaefb6..c8d4551167d28d0bcf64cf3e7afb082dff53841d 100644 (file)
@@ -298,6 +298,13 @@ export abstract class AbstractPool<
             : accumulator,
         0
       ),
+      ...(this.opts.enableTasksQueue === true && {
+        stealingWorkerNodes: this.workerNodes.reduce(
+          (accumulator, workerNode) =>
+            workerNode.info.stealing ? accumulator + 1 : accumulator,
+          0
+        )
+      }),
       busyWorkerNodes: this.workerNodes.reduce(
         (accumulator, _workerNode, workerNodeKey) =>
           this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
@@ -1397,6 +1404,10 @@ export abstract class AbstractPool<
     })
   }
 
+  private cannotStealTask (): boolean { // true when task stealing is pointless for this pool
+    return this.workerNodes.length <= 1 || this.info.queuedTasks === 0 // at most one worker node, or the pool info reports zero queued tasks
+  }
+
   private handleTask (workerNodeKey: number, task: Task<Data>): void {
     if (this.shallExecuteTask(workerNodeKey)) {
       this.executeTask(workerNodeKey, task)
@@ -1409,7 +1420,7 @@ export abstract class AbstractPool<
     if (workerNodeKey === -1) {
       return
     }
-    if (this.workerNodes.length <= 1) {
+    if (this.cannotStealTask()) {
       return
     }
     while (this.tasksQueueSize(workerNodeKey) > 0) {
@@ -1503,15 +1514,22 @@ export abstract class AbstractPool<
     eventDetail: WorkerNodeEventDetail,
     previousStolenTask?: Task<Data>
   ): void => {
-    if (this.workerNodes.length <= 1) {
-      return
-    }
     const { workerNodeKey } = eventDetail
     if (workerNodeKey == null) {
       throw new Error(
-        'WorkerNode event detail workerNodeKey attribute must be defined'
+        'WorkerNode event detail workerNodeKey property must be defined'
       )
     }
+    if (
+      this.cannotStealTask() ||
+      (this.info.stealingWorkerNodes as number) >
+        Math.floor(this.workerNodes.length / 2)
+    ) {
+      if (previousStolenTask != null) {
+        this.getWorkerInfo(workerNodeKey).stealing = false
+      }
+      return
+    }
     const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
     if (
       previousStolenTask != null &&
@@ -1519,6 +1537,7 @@ export abstract class AbstractPool<
       (workerNodeTasksUsage.executing > 0 ||
         this.tasksQueueSize(workerNodeKey) > 0)
     ) {
+      this.getWorkerInfo(workerNodeKey).stealing = false
       for (const taskName of this.workerNodes[workerNodeKey].info
         .taskFunctionNames as string[]) {
         this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
@@ -1529,6 +1548,7 @@ export abstract class AbstractPool<
       this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
       return
     }
+    this.getWorkerInfo(workerNodeKey).stealing = true
     const stolenTask = this.workerNodeStealTask(workerNodeKey)
     if (
       this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
@@ -1575,6 +1595,7 @@ export abstract class AbstractPool<
     const sourceWorkerNode = workerNodes.find(
       (sourceWorkerNode, sourceWorkerNodeKey) =>
         sourceWorkerNode.info.ready &&
+        !sourceWorkerNode.info.stealing &&
         sourceWorkerNodeKey !== workerNodeKey &&
         sourceWorkerNode.usage.tasks.queued > 0
     )
@@ -1593,7 +1614,11 @@ export abstract class AbstractPool<
   private readonly handleBackPressureEvent = (
     eventDetail: WorkerNodeEventDetail
   ): void => {
-    if (this.workerNodes.length <= 1) {
+    if (
+      this.cannotStealTask() ||
+      (this.info.stealingWorkerNodes as number) >
+        Math.floor(this.workerNodes.length / 2)
+    ) {
       return
     }
     const { workerId } = eventDetail
@@ -1613,16 +1638,19 @@ export abstract class AbstractPool<
       if (
         sourceWorkerNode.usage.tasks.queued > 0 &&
         workerNode.info.ready &&
+        !workerNode.info.stealing &&
         workerNode.info.id !== workerId &&
         workerNode.usage.tasks.queued <
           (this.opts.tasksQueueOptions?.size as number) - sizeOffset
       ) {
+        this.getWorkerInfo(workerNodeKey).stealing = true
         const task = sourceWorkerNode.popTask() as Task<Data>
         this.handleTask(workerNodeKey, task)
         this.updateTaskStolenStatisticsWorkerUsage(
           workerNodeKey,
           task.name as string
         )
+        this.getWorkerInfo(workerNodeKey).stealing = false
       }
     }
   }
index 6058a3dea2aa9c0d55ce5f65bba695406a8875c9..08aad620f5cde2ef28892e43e47f99ba545e6707 100644 (file)
@@ -69,6 +69,8 @@ export interface PoolInfo {
   readonly utilization?: number
   /** Pool total worker nodes. */
   readonly workerNodes: number
+  /** Pool stealing worker nodes. */
+  readonly stealingWorkerNodes?: number
   /** Pool idle worker nodes. */
   readonly idleWorkerNodes: number
   /** Pool busy worker nodes. */
index c9ff0e7099a8477f3de03708459e4d485cc1f8f2..d2397676bb77235279b72748e7e449c97ed0ca13 100644 (file)
@@ -214,7 +214,8 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
       id: getWorkerId(worker),
       type: getWorkerType(worker) as WorkerType,
       dynamic: false,
-      ready: false
+      ready: false,
+      stealing: false
     }
   }
 
index 9a27d8a5056bb7b4b222f2d210803699ab95ad0d..9c8b77fc623b68ed2a25aed883531dfa5e25cf70 100644 (file)
@@ -154,6 +154,11 @@ export interface WorkerInfo {
    * Ready flag.
    */
   ready: boolean
+  /**
+   * Stealing flag.
+   * This flag is set to `true` when worker node is stealing tasks from another worker node.
+   */
+  stealing: boolean
   /**
    * Task function names.
    */
index 1e6bf3adb012de185bfb7d69287126f534b9c953..cf4b55afc26a6a1278f886a0d4c2762b1c230f07 100644 (file)
@@ -882,7 +882,8 @@ describe('Abstract pool test suite', () => {
         id: expect.any(Number),
         type: WorkerTypes.cluster,
         dynamic: false,
-        ready: true
+        ready: true,
+        stealing: false
       })
     }
     await pool.destroy()
@@ -897,7 +898,8 @@ describe('Abstract pool test suite', () => {
         id: expect.any(Number),
         type: WorkerTypes.thread,
         dynamic: false,
-        ready: true
+        ready: true,
+        stealing: false
       })
     }
     await pool.destroy()
@@ -1269,6 +1271,7 @@ describe('Abstract pool test suite', () => {
       maxSize: expect.any(Number),
       workerNodes: expect.any(Number),
       idleWorkerNodes: expect.any(Number),
+      stealingWorkerNodes: expect.any(Number),
       busyWorkerNodes: expect.any(Number),
       executedTasks: expect.any(Number),
       executingTasks: expect.any(Number),
@@ -1278,7 +1281,7 @@ describe('Abstract pool test suite', () => {
       stolenTasks: expect.any(Number),
       failedTasks: expect.any(Number)
     })
-    expect(pool.hasBackPressure.callCount).toBe(5)
+    expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
     await pool.destroy()
   })
 
index 3b87a8485d39e0547707197c8667238ec35a0265..f22ea153f22f00e9af20576431efcae57499eb07 100644 (file)
@@ -124,7 +124,8 @@ describe('Worker node test suite', () => {
       id: threadWorkerNode.worker.threadId,
       type: WorkerTypes.thread,
       dynamic: false,
-      ready: false
+      ready: false,
+      stealing: false
     })
     expect(threadWorkerNode.usage).toStrictEqual({
       tasks: {
@@ -167,7 +168,8 @@ describe('Worker node test suite', () => {
       id: clusterWorkerNode.worker.id,
       type: WorkerTypes.cluster,
       dynamic: false,
-      ready: false
+      ready: false,
+      stealing: false
     })
     expect(clusterWorkerNode.usage).toStrictEqual({
       tasks: {
index 8ceb403ddffbf13b2720be8c215fe73bb4c94903..b785010ec5cf7b61999852c53658091abba0e5f5 100644 (file)
@@ -4,13 +4,6 @@ import { ClusterWorker } from '../../lib/index.js'
 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
 
 describe('Cluster worker test suite', () => {
-  const sendStub = stub().returns()
-  class SpyWorker extends ClusterWorker {
-    getMainWorker () {
-      return { send: sendStub }
-    }
-  }
-
   afterEach(() => {
     restore()
   })
@@ -25,6 +18,7 @@ describe('Cluster worker test suite', () => {
       send: stub().returns()
     })
     worker.handleKillMessage()
+    expect(worker.getMainWorker.calledTwice).toBe(true)
     expect(worker.getMainWorker().send.calledOnce).toBe(true)
     expect(worker.opts.killHandler.calledOnce).toBe(true)
   })
@@ -37,6 +31,10 @@ describe('Cluster worker test suite', () => {
       return 2
     }
     const worker = new ClusterWorker({ fn1, fn2 })
+    worker.getMainWorker = stub().returns({
+      id: 1,
+      send: stub().returns()
+    })
     expect(worker.removeTaskFunction(0, fn1)).toStrictEqual({
       status: false,
       error: new TypeError('name parameter is not a string')
@@ -45,10 +43,6 @@ describe('Cluster worker test suite', () => {
       status: false,
       error: new TypeError('name parameter is an empty string')
     })
-    worker.getMainWorker = stub().returns({
-      id: 1,
-      send: stub().returns()
-    })
     expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
     expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
     expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
@@ -73,6 +67,7 @@ describe('Cluster worker test suite', () => {
     expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
     expect(worker.taskFunctions.get('fn2')).toBeUndefined()
     expect(worker.taskFunctions.size).toBe(2)
+    expect(worker.getMainWorker.calledTwice).toBe(true)
     expect(worker.getMainWorker().send.calledOnce).toBe(true)
   })
 
@@ -85,9 +80,13 @@ describe('Cluster worker test suite', () => {
     expect(worker.handleError(errorMessage)).toStrictEqual(errorMessage)
   })
 
-  it('Verify worker invokes the getMainWorker() and send() methods', () => {
-    const worker = new SpyWorker(() => {})
+  it('Verify that sendToMainWorker() method invokes the getMainWorker() and send() methods', () => {
+    const worker = new ClusterWorker(() => {})
+    worker.getMainWorker = stub().returns({
+      send: stub().returns()
+    })
     worker.sendToMainWorker({ ok: 1 })
+    expect(worker.getMainWorker.calledTwice).toBe(true)
     expect(worker.getMainWorker().send.calledOnce).toBe(true)
   })
 })
index 26da45e12bbe1ac63e3bfa169564faf7d59abcff..81b096f2b8b308812089cdebfb234e15954199ba 100644 (file)
@@ -4,13 +4,6 @@ import { ThreadWorker } from '../../lib/index.js'
 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
 
 describe('Thread worker test suite', () => {
-  class SpyWorker extends ThreadWorker {
-    constructor (fn) {
-      super(fn)
-      this.port = { postMessage: stub().returns() }
-    }
-  }
-
   afterEach(() => {
     restore()
   })
@@ -40,6 +33,9 @@ describe('Thread worker test suite', () => {
       return 2
     }
     const worker = new ThreadWorker({ fn1, fn2 })
+    worker.port = {
+      postMessage: stub().returns()
+    }
     expect(worker.removeTaskFunction(0, fn1)).toStrictEqual({
       status: false,
       error: new TypeError('name parameter is not a string')
@@ -48,9 +44,6 @@ describe('Thread worker test suite', () => {
       status: false,
       error: new TypeError('name parameter is an empty string')
     })
-    worker.port = {
-      postMessage: stub().returns()
-    }
     expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
     expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
     expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
@@ -87,8 +80,9 @@ describe('Thread worker test suite', () => {
     expect(worker.handleError(errorMessage)).toStrictEqual(errorMessage)
   })
 
-  it('Verify worker invokes the postMessage() method on port property', () => {
-    const worker = new SpyWorker(() => {})
+  it('Verify that sendToMainWorker() method invokes the port property postMessage() method', () => {
+    const worker = new ThreadWorker(() => {})
+    worker.port = { postMessage: stub().returns() }
     worker.sendToMainWorker({ ok: 1 })
     expect(worker.port.postMessage.calledOnce).toBe(true)
   })