fix: fix busy worker nodes computation with tasks queing
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 13 Dec 2023 23:51:39 +0000 (00:51 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 13 Dec 2023 23:51:39 +0000 (00:51 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts

index da0eb335b26f41092fb57560d255b5b0b624ad6e..9b0ec92f3df1f9a2d5cdec26608cde6595a9cf96 100644 (file)
@@ -306,8 +306,8 @@ export abstract class AbstractPool<
         0
       ),
       busyWorkerNodes: this.workerNodes.reduce(
-        (accumulator, workerNode) =>
-          workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
+        (accumulator, _workerNode, workerNodeKey) =>
+          this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
         0
       ),
       executedTasks: this.workerNodes.reduce(
@@ -707,6 +707,16 @@ export abstract class AbstractPool<
     )
   }
 
+  private isWorkerNodeBusy (workerNodeKey: number): boolean {
+    if (this.opts.enableTasksQueue === true) {
+      return (
+        this.workerNodes[workerNodeKey].usage.tasks.executing >=
+        (this.opts.tasksQueueOptions?.concurrency as number)
+      )
+    }
+    return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
+  }
+
   private async sendTaskFunctionOperationToWorker (
     workerNodeKey: number,
     message: MessageValue<Data>
@@ -1451,6 +1461,14 @@ export abstract class AbstractPool<
     })
   }
 
+  private handleTask (workerNodeKey: number, task: Task<Data>): void {
+    if (this.shallExecuteTask(workerNodeKey)) {
+      this.executeTask(workerNodeKey, task)
+    } else {
+      this.enqueueTask(workerNodeKey, task)
+    }
+  }
+
   private redistributeQueuedTasks (workerNodeKey: number): void {
     if (this.workerNodes.length <= 1) {
       return
@@ -1466,12 +1484,10 @@ export abstract class AbstractPool<
         },
         0
       )
-      const task = this.dequeueTask(workerNodeKey) as Task<Data>
-      if (this.shallExecuteTask(destinationWorkerNodeKey)) {
-        this.executeTask(destinationWorkerNodeKey, task)
-      } else {
-        this.enqueueTask(destinationWorkerNodeKey, task)
-      }
+      this.handleTask(
+        destinationWorkerNodeKey,
+        this.dequeueTask(workerNodeKey) as Task<Data>
+      )
     }
   }
 
@@ -1625,11 +1641,7 @@ export abstract class AbstractPool<
     )
     if (sourceWorkerNode != null) {
       const task = sourceWorkerNode.popTask() as Task<Data>
-      if (this.shallExecuteTask(workerNodeKey)) {
-        this.executeTask(workerNodeKey, task)
-      } else {
-        this.enqueueTask(workerNodeKey, task)
-      }
+      this.handleTask(workerNodeKey, task)
       this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
       this.updateTaskStolenStatisticsWorkerUsage(
         workerNodeKey,
@@ -1667,11 +1679,7 @@ export abstract class AbstractPool<
           (this.opts.tasksQueueOptions?.size as number) - sizeOffset
       ) {
         const task = sourceWorkerNode.popTask() as Task<Data>
-        if (this.shallExecuteTask(workerNodeKey)) {
-          this.executeTask(workerNodeKey, task)
-        } else {
-          this.enqueueTask(workerNodeKey, task)
-        }
+        this.handleTask(workerNodeKey, task)
         this.updateTaskStolenStatisticsWorkerUsage(
           workerNodeKey,
           task.name as string