refactor: cleanup tasks recheduling code
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 23 Aug 2023 12:31:29 +0000 (14:31 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 23 Aug 2023 12:31:29 +0000 (14:31 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/deque.ts
src/pools/abstract-pool.ts

index 49373cef998ec1afc7f27b63a678e1d3914796b4..4a763fc7c4b417ed56d5d5f65f6e96cbfa2503cd 100644 (file)
@@ -154,6 +154,12 @@ export class Deque<T> {
     }
   }
 
+  /**
+   * Returns an backward iterator for the deque.
+   *
+   * @returns An backward iterator for the deque.
+   * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols
+   */
   backward (): Iterable<T> {
     return {
       [Symbol.iterator]: (): Iterator<T> => {
index e61991f222ae98024f47038d8e211b420b8ed507..d1bb5bc4e0fa1c8fc2dc1f8d650f81efe66cfec8 100644 (file)
@@ -1180,27 +1180,25 @@ export abstract class AbstractPool<
   }
 
   private redistributeQueuedTasks (workerNodeKey: number): void {
+    const workerNodes = this.workerNodes.filter(
+      (_, workerNodeId) => workerNodeId !== workerNodeKey
+    )
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       let targetWorkerNodeKey: number = workerNodeKey
       let minQueuedTasks = Infinity
       let executeTask = false
-      for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+      for (const [workerNodeId, workerNode] of workerNodes.entries()) {
         if (
           this.workerNodes[workerNodeId].usage.tasks.executing <
           (this.opts.tasksQueueOptions?.concurrency as number)
         ) {
           executeTask = true
         }
-        if (
-          workerNodeId !== workerNodeKey &&
-          workerNode.info.ready &&
-          workerNode.usage.tasks.queued === 0
-        ) {
+        if (workerNode.info.ready && workerNode.usage.tasks.queued === 0) {
           targetWorkerNodeKey = workerNodeId
           break
         }
         if (
-          workerNodeId !== workerNodeKey &&
           workerNode.info.ready &&
           workerNode.usage.tasks.queued < minQueuedTasks
         ) {
@@ -1235,25 +1233,22 @@ export abstract class AbstractPool<
       if (
         workerNode.info.ready &&
         sourceWorkerNode.usage.tasks.queued > 0 &&
-        !workerNode.hasBackPressure() &&
-        workerNode.usage.tasks.executing <
-          (this.opts.tasksQueueOptions?.concurrency as number)
+        !workerNode.hasBackPressure()
       ) {
-        this.executeTask(
-          workerNodeKey,
-          sourceWorkerNode.popTask() as Task<Data>
-        )
-      } else if (
-        workerNode.info.ready &&
-        sourceWorkerNode.usage.tasks.queued > 0 &&
-        !workerNode.hasBackPressure() &&
-        workerNode.usage.tasks.executing >=
+        if (
+          workerNode.usage.tasks.executing <
           (this.opts.tasksQueueOptions?.concurrency as number)
-      ) {
-        this.enqueueTask(
-          workerNodeKey,
-          sourceWorkerNode.popTask() as Task<Data>
-        )
+        ) {
+          this.executeTask(
+            workerNodeKey,
+            sourceWorkerNode.popTask() as Task<Data>
+          )
+        } else {
+          this.enqueueTask(
+            workerNodeKey,
+            sourceWorkerNode.popTask() as Task<Data>
+          )
+        }
       }
     }
   }