refactor: factor out code to redistribute queued tasks
[poolifier.git] / src / pools / abstract-pool.ts
index 88f2c9e9f9ff9318f0777fb0c4e065482b59fc4b..730c0dd226c087cb46d6b2f68eb6288ec6885c8f 100644 (file)
@@ -886,31 +886,7 @@ export abstract class AbstractPool<
         this.emitter.emit(PoolEvents.error, error)
       }
       if (this.opts.enableTasksQueue === true) {
-        const workerNodeKey = this.getWorkerNodeKey(worker)
-        while (this.tasksQueueSize(workerNodeKey) > 0) {
-          let targetWorkerNodeKey: number = workerNodeKey
-          let minQueuedTasks = Infinity
-          for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
-            if (
-              workerNodeId !== workerNodeKey &&
-              workerNode.usage.tasks.queued === 0
-            ) {
-              targetWorkerNodeKey = workerNodeId
-              break
-            }
-            if (
-              workerNodeId !== workerNodeKey &&
-              workerNode.usage.tasks.queued < minQueuedTasks
-            ) {
-              minQueuedTasks = workerNode.usage.tasks.queued
-              targetWorkerNodeKey = workerNodeId
-            }
-          }
-          this.enqueueTask(
-            targetWorkerNodeKey,
-            this.dequeueTask(workerNodeKey) as Task<Data>
-          )
-        }
+        this.redistributeQueuedTasks(worker)
       }
       if (this.opts.restartWorkerOnError === true) {
         if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
@@ -935,6 +911,34 @@ export abstract class AbstractPool<
     return worker
   }
 
+  private redistributeQueuedTasks (worker: Worker): void {
+    const workerNodeKey = this.getWorkerNodeKey(worker)
+    while (this.tasksQueueSize(workerNodeKey) > 0) {
+      let targetWorkerNodeKey: number = workerNodeKey
+      let minQueuedTasks = Infinity
+      for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+        if (
+          workerNodeId !== workerNodeKey &&
+          workerNode.usage.tasks.queued === 0
+        ) {
+          targetWorkerNodeKey = workerNodeId
+          break
+        }
+        if (
+          workerNodeId !== workerNodeKey &&
+          workerNode.usage.tasks.queued < minQueuedTasks
+        ) {
+          minQueuedTasks = workerNode.usage.tasks.queued
+          targetWorkerNodeKey = workerNodeId
+        }
+      }
+      this.enqueueTask(
+        targetWorkerNodeKey,
+        this.dequeueTask(workerNodeKey) as Task<Data>
+      )
+    }
+  }
+
   /**
    * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
    *