refactor: factor out dynamic worker creation
authorJérôme Benoit <jerome.benoit@sap.com>
Sun, 11 Jun 2023 19:09:48 +0000 (21:09 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sun, 11 Jun 2023 19:09:48 +0000 (21:09 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts

index db7c83ee2fed1ec558b6ee9eee5b7b8202368085..c3565446f254c0292978f1f3eed1c23c1b1ac267 100644 (file)
@@ -614,28 +614,14 @@ export abstract class AbstractPool<
    * @returns The worker node key
    */
   protected chooseWorkerNode (): number {
-    let workerNodeKey: number
-    if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
-      const workerCreated = this.createAndSetupWorker()
-      this.registerWorkerMessageListener(workerCreated, message => {
-        const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
-        if (
-          isKillBehavior(KillBehaviors.HARD, message.kill) ||
-          (message.kill != null &&
-            this.workerNodes[currentWorkerNodeKey].workerUsage.tasks
-              .executing === 0)
-        ) {
-          // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-          this.flushTasksQueue(currentWorkerNodeKey)
-          // FIXME: wait for tasks to be finished
-          void (this.destroyWorker(workerCreated) as Promise<void>)
-        }
-      })
-      workerNodeKey = this.getWorkerNodeKey(workerCreated)
-    } else {
-      workerNodeKey = this.workerChoiceStrategyContext.execute()
+    if (this.shallCreateDynamicWorker()) {
+      return this.getWorkerNodeKey(this.createAndSetupDynamicWorker())
     }
-    return workerNodeKey
+    return this.workerChoiceStrategyContext.execute()
+  }
+
+  protected shallCreateDynamicWorker (): boolean {
+    return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
   }
 
   /**
@@ -706,6 +692,30 @@ export abstract class AbstractPool<
     return worker
   }
 
+  /**
+   * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
+   *
+   * @returns New, completely set up dynamic worker.
+   */
+  protected createAndSetupDynamicWorker (): Worker {
+    const worker = this.createAndSetupWorker()
+    this.registerWorkerMessageListener(worker, message => {
+      const currentWorkerNodeKey = this.getWorkerNodeKey(worker)
+      if (
+        isKillBehavior(KillBehaviors.HARD, message.kill) ||
+        (message.kill != null &&
+          this.workerNodes[currentWorkerNodeKey].workerUsage.tasks.executing ===
+            0)
+      ) {
+        // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
+        this.flushTasksQueue(currentWorkerNodeKey)
+        // FIXME: wait for tasks to be finished
+        void (this.destroyWorker(worker) as Promise<void>)
+      }
+    })
+    return worker
+  }
+
   /**
    * This function is the listener registered for each worker message.
    *