refactor: cleanup worker node state conditions check
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 25 Aug 2024 23:35:02 +0000 (01:35 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 25 Aug 2024 23:35:02 +0000 (01:35 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts

index 43880fb8da888e4f20ed8b85db0f534ded5c1866..6bee0367b3756b36a8fd0ea450ee60310576b27e 100644 (file)
@@ -302,8 +302,7 @@ export abstract class AbstractPool<
     const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
     if (
       workerNodeInfo.continuousStealing &&
-      (workerNodeTasksUsage.executing > 0 ||
-        this.tasksQueueSize(workerNodeKey) > 0)
+      !this.isWorkerNodeIdle(workerNodeKey)
     ) {
       workerNodeInfo.continuousStealing = false
       if (workerNodeTasksUsage.sequentiallyStolen > 0) {
@@ -802,22 +801,12 @@ export abstract class AbstractPool<
    * @returns Worker nodes busyness boolean status.
    */
   protected internalBusy (): boolean {
-    if (this.opts.enableTasksQueue === true) {
-      return (
-        this.workerNodes.findIndex(
-          workerNode =>
-            workerNode.info.ready &&
-            workerNode.usage.tasks.executing <
-              // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-              this.opts.tasksQueueOptions!.concurrency!
-        ) === -1
-      )
-    }
     return (
-      this.workerNodes.findIndex(
-        workerNode =>
-          workerNode.info.ready && workerNode.usage.tasks.executing === 0
-      ) === -1
+      this.workerNodes.reduce(
+        (accumulator, _, workerNodeKey) =>
+          this.isWorkerNodeIdle(workerNodeKey) ? accumulator + 1 : accumulator,
+        0
+      ) === 0
     )
   }
 
@@ -1166,12 +1155,9 @@ export abstract class AbstractPool<
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
       this.promiseResponseMap.delete(taskId!)
       if (this.opts.enableTasksQueue === true && !this.destroying) {
-        const workerNodeTasksUsage = workerNode.usage.tasks
         if (
-          this.tasksQueueSize(workerNodeKey) > 0 &&
-          workerNodeTasksUsage.executing <
-            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-            this.opts.tasksQueueOptions!.concurrency!
+          !this.isWorkerNodeBusy(workerNodeKey) &&
+          this.tasksQueueSize(workerNodeKey) > 0
         ) {
           // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
           this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
@@ -1305,22 +1291,30 @@ export abstract class AbstractPool<
   private isWorkerNodeBusy (workerNodeKey: number): boolean {
     if (this.opts.enableTasksQueue === true) {
       return (
+        this.workerNodes[workerNodeKey].info.ready &&
         this.workerNodes[workerNodeKey].usage.tasks.executing >=
-        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        this.opts.tasksQueueOptions!.concurrency!
+          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+          this.opts.tasksQueueOptions!.concurrency!
       )
     }
-    return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
+    return (
+      this.workerNodes[workerNodeKey].info.ready &&
+      this.workerNodes[workerNodeKey].usage.tasks.executing > 0
+    )
   }
 
   private isWorkerNodeIdle (workerNodeKey: number): boolean {
     if (this.opts.enableTasksQueue === true) {
       return (
+        this.workerNodes[workerNodeKey].info.ready &&
         this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
         this.tasksQueueSize(workerNodeKey) === 0
       )
     }
-    return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
+    return (
+      this.workerNodes[workerNodeKey].info.ready &&
+      this.workerNodes[workerNodeKey].usage.tasks.executing === 0
+    )
   }
 
   private redistributeQueuedTasks (sourceWorkerNodeKey: number): void {
@@ -2295,9 +2289,6 @@ export abstract class AbstractPool<
     if (!this.started) {
       return false
     }
-    if (this.empty) {
-      return true
-    }
     return (
       this.workerNodes.reduce(
         (accumulator, workerNode) =>