fix: fix pool starting detection
[poolifier.git] / src / pools / abstract-pool.ts
index f1392edd5124e25421531fec9958a2fea610bf59..50ce2026d4f2e17f182f773227c2bc51f22de26f 100644 (file)
@@ -84,6 +84,10 @@ export abstract class AbstractPool<
   Response
   >
 
+  /**
+   * Whether the pool is starting.
+   */
+  private readonly starting: boolean
   /**
    * The start timestamp of the pool.
    */
@@ -128,9 +132,17 @@ export abstract class AbstractPool<
 
     this.setupHook()
 
-    while (this.workerNodes.length < this.numberOfWorkers) {
+    this.starting = true
+    while (
+      this.workerNodes.reduce(
+        (accumulator, workerNode) =>
+          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+        0
+      ) < this.numberOfWorkers
+    ) {
       this.createAndSetupWorker()
     }
+    this.starting = false
 
     this.startTimestamp = performance.now()
   }
@@ -172,9 +184,9 @@ export abstract class AbstractPool<
         throw new RangeError(
           'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
         )
-      } else if (min === 0 && max === 0) {
+      } else if (max === 0) {
         throw new RangeError(
-          'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
+          'Cannot instantiate a dynamic pool with a pool size equal to zero'
         )
       } else if (min === max) {
         throw new RangeError(
@@ -410,18 +422,15 @@ export abstract class AbstractPool<
     }
   }
 
-  private get starting (): boolean {
-    return (
-      this.workerNodes.length < this.minSize ||
-      (this.workerNodes.length >= this.minSize &&
-        this.workerNodes.some(workerNode => !workerNode.info.ready))
-    )
-  }
-
   private get ready (): boolean {
     return (
-      this.workerNodes.length >= this.minSize &&
-      this.workerNodes.every(workerNode => workerNode.info.ready)
+      this.workerNodes.reduce(
+        (accumulator, workerNode) =>
+          !workerNode.info.dynamic && workerNode.info.ready
+            ? accumulator + 1
+            : accumulator,
+        0
+      ) >= this.minSize
     )
   }
 
@@ -950,7 +959,7 @@ export abstract class AbstractPool<
       this.removeWorkerNode(worker)
     })
 
-    this.pushWorkerNode(worker)
+    this.addWorkerNode(worker)
 
     this.afterWorkerSetup(worker)
 
@@ -981,8 +990,11 @@ export abstract class AbstractPool<
     })
     const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
     workerInfo.dynamic = true
+    if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
+      workerInfo.ready = true
+    }
     this.sendToWorker(worker, {
-      checkAlive: true,
+      checkActive: true,
       workerId: workerInfo.id as number
     })
     return worker
@@ -1011,12 +1023,16 @@ export abstract class AbstractPool<
     // Listen to worker messages.
     this.registerWorkerMessageListener(worker, this.workerListener())
     // Send startup message to worker.
+    this.sendWorkerStartupMessage(worker)
+    // Setup worker task statistics computation.
+    this.setWorkerStatistics(worker)
+  }
+
+  private sendWorkerStartupMessage (worker: Worker): void {
     this.sendToWorker(worker, {
       ready: false,
       workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
     })
-    // Setup worker task statistics computation.
-    this.setWorkerStatistics(worker)
   }
 
   private redistributeQueuedTasks (workerNodeKey: number): void {
@@ -1057,9 +1073,9 @@ export abstract class AbstractPool<
   protected workerListener (): (message: MessageValue<Response>) => void {
     return message => {
       this.checkMessageWorkerId(message)
-      if (message.ready != null && message.workerId != null) {
-        // Worker ready message received
-        this.handleWorkerReadyMessage(message)
+      if (message.ready != null) {
+        // Worker ready response received
+        this.handleWorkerReadyResponse(message)
       } else if (message.id != null) {
         // Task execution response received
         this.handleTaskExecutionResponse(message)
@@ -1067,7 +1083,7 @@ export abstract class AbstractPool<
     }
   }
 
-  private handleWorkerReadyMessage (message: MessageValue<Response>): void {
+  private handleWorkerReadyResponse (message: MessageValue<Response>): void {
     const worker = this.getWorkerById(message.workerId)
     this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
       message.ready as boolean
@@ -1124,13 +1140,18 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Pushes the given worker in the pool worker nodes.
+   * Adds the given worker in the pool worker nodes.
    *
    * @param worker - The worker.
    * @returns The worker nodes length.
    */
-  private pushWorkerNode (worker: Worker): number {
-    return this.workerNodes.push(new WorkerNode(worker, this.worker))
+  private addWorkerNode (worker: Worker): number {
+    const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
+    // Flag the worker node as ready at pool startup.
+    if (this.starting) {
+      workerNode.info.ready = true
+    }
+    return this.workerNodes.push(workerNode)
   }
 
   /**
@@ -1146,6 +1167,12 @@ export abstract class AbstractPool<
     }
   }
 
+  /**
+   * Executes the given task on the given worker.
+   *
+   * @param worker - The worker.
+   * @param task - The task to execute.
+   */
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
     this.beforeTaskExecutionHook(workerNodeKey, task)
     this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)