fix: fix task stealing
author Jérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 23 Aug 2023 22:03:23 +0000 (00:03 +0200)
committer Jérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 23 Aug 2023 22:03:23 +0000 (00:03 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
README.md
src/pools/abstract-pool.ts
tests/pools/cluster/fixed.test.js
tests/pools/thread/fixed.test.js

index 18942cb66f0d5be88d079e96229062319fd1688a..18d9e42e6cc651e5ff863ceaafd94e8d47c4517c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,9 +15,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - Rename tasks queue options `queueMaxSize` to `size`.
 
-<!-- ### Added
+### Added
 
-- Task stealing scheduling algorithm if tasks queueing is enabled. -->
+- Task stealing scheduling algorithm if tasks queueing is enabled.
 
 ## [2.6.32] - 2023-08-23
 
index 08de7a855ed1a7ccf607c52792d0cd09baa96552..47a680d039609442639275395e62b5b699a540cb 100644
--- a/README.md
+++ b/README.md
@@ -47,7 +47,7 @@ Please consult our [general guidelines](#general-guidelines).
 - Tasks distribution strategies :white_check_mark:
 - Lockless tasks queueing :white_check_mark:
 - Queued tasks rescheduling:
-  <!-- - Task stealing :white_check_mark: -->
+  - Task stealing :white_check_mark:
   - Tasks stealing under back pressure :white_check_mark:
   - Tasks redistribution on worker error :white_check_mark:
 - General guidelines on pool choice :white_check_mark:
index ee291ff9aede6ac3c89feef5451271c8dbea58ab..c6712d198bc28a9d77c08ae817e5c96c3b9227c6 100644
--- a/src/pools/abstract-pool.ts
+++ b/src/pools/abstract-pool.ts
@@ -1156,8 +1156,8 @@ export abstract class AbstractPool<
     // Send the statistics message to worker.
     this.sendStatisticsMessageToWorker(workerNodeKey)
     if (this.opts.enableTasksQueue === true) {
-      // this.workerNodes[workerNodeKey].onEmptyQueue =
-      //   this.taskStealingOnEmptyQueue.bind(this)
+      this.workerNodes[workerNodeKey].onEmptyQueue =
+        this.taskStealingOnEmptyQueue.bind(this)
       this.workerNodes[workerNodeKey].onBackPressure =
         this.tasksStealingOnBackPressure.bind(this)
     }
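
This hunk re-enables the onEmptyQueue wiring that had been commented out, so task stealing actually triggers when a worker node's tasks queue drains. An illustrative sketch of the hook contract the pool binds to (a simplification for exposition, not poolifier's actual worker node implementation):

    // Illustrative only: how a worker node surfaces the queue hooks the pool binds.
    class WorkerNodeSketch<Data> {
      public onEmptyQueue?: (workerId: number) => void
      public onBackPressure?: (workerId: number) => void
      private readonly tasksQueue: Data[] = []

      constructor (private readonly workerId: number) {}

      public dequeueTask (): Data | undefined {
        const task = this.tasksQueue.shift()
        // Once the queue drains, notify the pool so it can steal work
        // from busier nodes on this node's behalf.
        if (this.tasksQueue.length === 0) {
          this.onEmptyQueue?.(this.workerId)
        }
        return task
      }
    }
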
@@ -1189,26 +1189,32 @@ export abstract class AbstractPool<
   }
 
   private redistributeQueuedTasks (workerNodeKey: number): void {
-    const workerNodes = this.workerNodes.filter(
-      (workerNode, workerNodeId) =>
-        workerNode.info.ready && workerNodeId !== workerNodeKey
-    )
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       let destinationWorkerNodeKey: number = workerNodeKey
       let minQueuedTasks = Infinity
       let executeTask = false
-      for (const [workerNodeId, workerNode] of workerNodes.entries()) {
+      for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
         if (
+          workerNode.info.ready &&
+          workerNodeId !== workerNodeKey &&
           workerNode.usage.tasks.executing <
-          (this.opts.tasksQueueOptions?.concurrency as number)
+            (this.opts.tasksQueueOptions?.concurrency as number)
         ) {
           executeTask = true
         }
-        if (workerNode.usage.tasks.queued === 0) {
+        if (
+          workerNode.info.ready &&
+          workerNodeId !== workerNodeKey &&
+          workerNode.usage.tasks.queued === 0
+        ) {
           destinationWorkerNodeKey = workerNodeId
           break
         }
-        if (workerNode.usage.tasks.queued < minQueuedTasks) {
+        if (
+          workerNode.info.ready &&
+          workerNodeId !== workerNodeKey &&
+          workerNode.usage.tasks.queued < minQueuedTasks
+        ) {
           minQueuedTasks = workerNode.usage.tasks.queued
           destinationWorkerNodeKey = workerNodeId
         }
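
The core fix in redistributeQueuedTasks: the old code filtered this.workerNodes into a new array and then used that array's entry indices as destinationWorkerNodeKey, but indices into a filtered array do not line up with keys into this.workerNodes, so a failed worker's queued tasks could be redistributed to the wrong node. Iterating the full array with the eligibility checks (ready, not the failed node) inlined keeps workerNodeId a valid pool-wide key. A minimal reproduction of the pitfall on hypothetical data:

    // Hypothetical pool of three nodes; node 0 is the failed one being drained.
    const nodes = [
      { ready: true, queued: 5 }, // pool key 0 (the failed node, to be excluded)
      { ready: true, queued: 0 }, // pool key 1
      { ready: true, queued: 2 } // pool key 2
    ]
    const failedKey = 0

    // Buggy: filter() re-indexes, so the found index is relative to the copy.
    const filtered = nodes.filter((node, key) => node.ready && key !== failedKey)
    const buggyKey = filtered.findIndex((node) => node.queued === 0) // 0, wrong

    // Fixed: iterate the full array and check eligibility inline.
    let fixedKey = -1
    for (const [key, node] of nodes.entries()) {
      if (node.ready && key !== failedKey && node.queued === 0) {
        fixedKey = key // 1, a valid key into nodes
        break
      }
    }
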
@@ -1227,32 +1233,30 @@ export abstract class AbstractPool<
   }
 
   private taskStealingOnEmptyQueue (workerId: number): void {
+    const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
     const workerNodes = this.workerNodes
-      .filter(
-        (workerNode) => workerNode.info.ready && workerNode.info.id !== workerId
-      )
+      .slice()
       .sort(
         (workerNodeA, workerNodeB) =>
           workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
       )
-    const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
-    const destinationWorkerNode = workerNodes[destinationWorkerNodeKey]
     for (const sourceWorkerNode of workerNodes) {
-      if (sourceWorkerNode.usage.tasks.queued > 0) {
+      if (
+        sourceWorkerNode.info.ready &&
+        sourceWorkerNode.info.id !== workerId &&
+        sourceWorkerNode.usage.tasks.queued > 0
+      ) {
+        const task = {
+          ...(sourceWorkerNode.popTask() as Task<Data>),
+          workerId: destinationWorkerNode.info.id as number
+        }
         if (
-          destinationWorkerNode?.usage?.tasks?.executing <
+          destinationWorkerNode.usage.tasks.executing <
           (this.opts.tasksQueueOptions?.concurrency as number)
         ) {
-          const task = {
-            ...(sourceWorkerNode.popTask() as Task<Data>),
-            workerId: destinationWorkerNode.info.id as number
-          }
           this.executeTask(destinationWorkerNodeKey, task)
         } else {
-          const task = {
-            ...(sourceWorkerNode.popTask() as Task<Data>),
-            workerId: destinationWorkerNode.info.id as number
-          }
           this.enqueueTask(destinationWorkerNodeKey, task)
         }
         break
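
Two things change in taskStealingOnEmptyQueue. First, the destination node is now resolved from this.workerNodes before any reordering: the old code indexed the filtered-and-sorted copy with a pool-wide key, which could select the wrong node (hence the defensive optional chaining that is removed). Second, .slice() copies the array before sorting, since Array.prototype.sort() sorts in place and would otherwise reorder the pool's own workerNodes; the stolen task is also built once, ahead of the concurrency branch, instead of being duplicated in both arms. A one-liner showing the slice-before-sort distinction:

    // sort() mutates its receiver; slice().sort() leaves the source untouched.
    const pool = [3, 1, 2]
    const byDesc = pool.slice().sort((a, b) => b - a) // [3, 2, 1]
    console.log(pool) // still [3, 1, 2]
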
@@ -1264,15 +1268,15 @@ export abstract class AbstractPool<
     const sourceWorkerNode =
       this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
     const workerNodes = this.workerNodes
-      .filter(
-        (workerNode) => workerNode.info.ready && workerNode.info.id !== workerId
-      )
+      .slice()
       .sort(
         (workerNodeA, workerNodeB) =>
           workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
       )
     for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
       if (
+        workerNode.info.ready &&
+        workerNode.info.id !== workerId &&
         sourceWorkerNode.usage.tasks.queued > 0 &&
         !workerNode.hasBackPressure()
       ) {
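
tasksStealingOnBackPressure gets the same treatment: the ready/identity filter moves into the loop condition and .slice() protects this.workerNodes from the in-place ascending sort, so the least-queued eligible nodes are tried first. The conditions repeated across all three methods could be captured by a small predicate, for example (hypothetical helper, not part of the commit):

    // Hypothetical helper naming the eligibility checks the commit inlines.
    const isStealingCandidate = (
      workerNode: { info: { ready: boolean, id?: number } },
      excludedWorkerId: number
    ): boolean =>
      workerNode.info.ready && workerNode.info.id !== excludedWorkerId
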
index a5790db075f400b3ff4ff93dd6c0a01a3beb62fd..900f0edc9d139a5289e4b54509f80629e60802dc 100644
--- a/tests/pools/cluster/fixed.test.js
+++ b/tests/pools/cluster/fixed.test.js
@@ -139,7 +139,10 @@ describe('Fixed cluster pool test suite', () => {
     expect(queuePool.info.backPressure).toBe(false)
     await Promise.all(promises)
     for (const workerNode of queuePool.workerNodes) {
-      expect(workerNode.usage.tasks.executing).toBe(0)
+      expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
+        numberOfWorkers * maxMultiplier
+      )
       expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
       expect(workerNode.usage.tasks.queued).toBe(0)
       expect(workerNode.usage.tasks.maxQueued).toBe(
index 4c2c6952944902647177af13756cec63e6f04b0f..8cd337ee6fa1ceb7109e42a7e55255f76690392c 100644
--- a/tests/pools/thread/fixed.test.js
+++ b/tests/pools/thread/fixed.test.js
@@ -139,7 +139,10 @@ describe('Fixed thread pool test suite', () => {
     expect(queuePool.info.backPressure).toBe(false)
     await Promise.all(promises)
     for (const workerNode of queuePool.workerNodes) {
-      expect(workerNode.usage.tasks.executing).toBe(0)
+      expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
+        numberOfThreads * maxMultiplier
+      )
       expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
       expect(workerNode.usage.tasks.queued).toBe(0)
       expect(workerNode.usage.tasks.maxQueued).toBe(
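
Both fixed-pool test suites relax the per-node executing-tasks assertion in step: with task stealing enabled, a node may still be executing tasks stolen from a sibling at the moment the awaited promises settle, so the exact count is no longer deterministic and only its bounds are asserted. A hypothetical helper for the recurring bounded check (illustrative, not in the commit):

    // Hypothetical jest-style helper for the recurring bounded assertion.
    const expectWithinBounds = (value: number, max: number): void => {
      expect(value).toBeGreaterThanOrEqual(0)
      expect(value).toBeLessThanOrEqual(max)
    }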