feat: add pool and worker readyness tracking infrastructure
[poolifier.git] / src / pools / abstract-pool.ts
index 53bce7d2b7523f6a819cc5fd2dba3cbf5c84cc6a..6c9a0fd185f7c560f4f7fa5ac597ab784f6d6dbd 100644 (file)
@@ -153,7 +153,19 @@ export abstract class AbstractPool<
         'Cannot instantiate a pool with a negative number of workers'
       )
     } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
-      throw new Error('Cannot instantiate a fixed pool with no worker')
+      throw new RangeError('Cannot instantiate a fixed pool with zero worker')
+    }
+  }
+
+  protected checkDynamicPoolSize (min: number, max: number): void {
+    if (this.type === PoolTypes.dynamic && min > max) {
+      throw new RangeError(
+        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
+      )
+    } else if (this.type === PoolTypes.dynamic && min === max) {
+      throw new RangeError(
+        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
+      )
     }
   }
 
@@ -252,6 +264,8 @@ export abstract class AbstractPool<
       version,
       type: this.type,
       worker: this.worker,
+      ready: this.ready,
+      strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
       minSize: this.minSize,
       maxSize: this.maxSize,
       ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
@@ -381,6 +395,19 @@ export abstract class AbstractPool<
     }
   }
 
+  private get starting (): boolean {
+    return (
+      !this.full ||
+      (this.full && this.workerNodes.some(workerNode => !workerNode.info.ready))
+    )
+  }
+
+  private get ready (): boolean {
+    return (
+      this.full && this.workerNodes.every(workerNode => workerNode.info.ready)
+    )
+  }
+
   /**
    * Gets the approximate pool utilization.
    *
@@ -864,6 +891,13 @@ export abstract class AbstractPool<
   protected afterWorkerSetup (worker: Worker): void {
     // Listen to worker messages.
     this.registerWorkerMessageListener(worker, this.workerListener())
+    // Send startup message to worker.
+    this.sendToWorker(worker, {
+      ready: false,
+      workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id
+    })
+    // Setup worker task statistics computation.
+    this.setWorkerStatistics(worker)
   }
 
   /**
@@ -883,7 +917,7 @@ export abstract class AbstractPool<
       if (this.opts.enableTasksQueue === true) {
         this.redistributeQueuedTasks(worker)
       }
-      if (this.opts.restartWorkerOnError === true) {
+      if (this.opts.restartWorkerOnError === true && !this.starting) {
         if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
           this.createAndSetupDynamicWorker()
         } else {
@@ -899,8 +933,6 @@ export abstract class AbstractPool<
 
     this.pushWorkerNode(worker)
 
-    this.setWorkerStatistics(worker)
-
     this.afterWorkerSetup(worker)
 
     return worker
@@ -941,7 +973,6 @@ export abstract class AbstractPool<
    */
   protected createAndSetupDynamicWorker (): Worker {
     const worker = this.createAndSetupWorker()
-    this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
     this.registerWorkerMessageListener(worker, message => {
       const workerNodeKey = this.getWorkerNodeKey(worker)
       if (
@@ -957,6 +988,7 @@ export abstract class AbstractPool<
         void (this.destroyWorker(worker) as Promise<void>)
       }
     })
+    this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
     this.sendToWorker(worker, { checkAlive: true })
     return worker
   }
@@ -968,7 +1000,7 @@ export abstract class AbstractPool<
    */
   protected workerListener (): (message: MessageValue<Response>) => void {
     return message => {
-      if (message.workerId != null && message.ready != null) {
+      if (message.ready != null && message.workerId != null) {
         // Worker ready message received
         this.handleWorkerReadyMessage(message)
       } else if (message.id != null) {
@@ -990,6 +1022,9 @@ export abstract class AbstractPool<
         }'`
       )
     }
+    if (this.emitter != null && this.ready) {
+      this.emitter.emit(PoolEvents.ready, this.info)
+    }
   }
 
   private handleTaskExecutionResponse (message: MessageValue<Response>): void {