Merge dependabot/npm_and_yarn/examples/typescript/http-client-pool/types/node-20...
[poolifier.git] / src / pools / abstract-pool.ts
index 4f60dff0279643a864b8c570eaddf3e3af9e7546..07fec75f140a81ee10f39671274ecc4037a4fee3 100644 (file)
@@ -168,7 +168,7 @@ export abstract class AbstractPool<
     this.enqueueTask = this.enqueueTask.bind(this)
 
     if (this.opts.enableEvents === true) {
-      this.initializeEventEmitter()
+      this.initEventEmitter()
     }
     this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
     Worker,
@@ -281,7 +281,7 @@ export abstract class AbstractPool<
     }
   }
 
-  private initializeEventEmitter (): void {
+  private initEventEmitter (): void {
     this.emitter = new EventEmitterAsyncResource({
       name: `poolifier:${this.type}-${this.worker}-pool`
     })
@@ -868,6 +868,9 @@ export abstract class AbstractPool<
     this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
       this.getWorkerWorkerChoiceStrategies()
     )
+    for (const workerNodeKey of this.workerNodes.keys()) {
+      this.sendStatisticsMessageToWorker(workerNodeKey)
+    }
     return opResult
   }
 
@@ -885,11 +888,16 @@ export abstract class AbstractPool<
         this.taskFunctions.get(name)
       )
     })
-    this.deleteTaskFunctionWorkerUsages(name)
+    for (const workerNode of this.workerNodes) {
+      workerNode.deleteTaskFunctionWorkerUsage(name)
+    }
     this.taskFunctions.delete(name)
     this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
       this.getWorkerWorkerChoiceStrategies()
     )
+    for (const workerNodeKey of this.workerNodes.keys()) {
+      this.sendStatisticsMessageToWorker(workerNodeKey)
+    }
     return opResult
   }
 
@@ -974,12 +982,6 @@ export abstract class AbstractPool<
     })
   }
 
-  private deleteTaskFunctionWorkerUsages (name: string): void {
-    for (const workerNode of this.workerNodes) {
-      workerNode.deleteTaskFunctionWorkerUsage(name)
-    }
-  }
-
   private shallExecuteTask (workerNodeKey: number): boolean {
     return (
       this.tasksQueueSize(workerNodeKey) === 0 &&
@@ -1061,7 +1063,7 @@ export abstract class AbstractPool<
   /**
    * Starts the minimum number of workers.
    */
-  private startMinimumNumberOfWorkers (): void {
+  private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void {
     this.startingMinimumNumberOfWorkers = true
     while (
       this.workerNodes.reduce(
@@ -1070,7 +1072,9 @@ export abstract class AbstractPool<
         0
       ) < this.minimumNumberOfWorkers
     ) {
-      this.createAndSetupWorkerNode()
+      const workerNodeKey = this.createAndSetupWorkerNode()
+      initWorkerNodeUsage &&
+        this.initWorkerNodeUsage(this.workerNodes[workerNodeKey])
     }
     this.startingMinimumNumberOfWorkers = false
   }
@@ -1234,7 +1238,7 @@ export abstract class AbstractPool<
     workerNodeKey: number,
     message: MessageValue<Response>
   ): void {
-    let needWorkerChoiceStrategyUpdate = false
+    let needWorkerChoiceStrategiesUpdate = false
     // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
     if (this.workerNodes[workerNodeKey]?.usage != null) {
       const workerUsage = this.workerNodes[workerNodeKey].usage
@@ -1249,7 +1253,7 @@ export abstract class AbstractPool<
         workerUsage,
         message
       )
-      needWorkerChoiceStrategyUpdate = true
+      needWorkerChoiceStrategiesUpdate = true
     }
     if (
       this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
@@ -1274,9 +1278,9 @@ export abstract class AbstractPool<
         taskFunctionWorkerUsage,
         message
       )
-      needWorkerChoiceStrategyUpdate = true
+      needWorkerChoiceStrategiesUpdate = true
     }
-    if (needWorkerChoiceStrategyUpdate) {
+    if (needWorkerChoiceStrategiesUpdate) {
       this.workerChoiceStrategiesContext?.update(workerNodeKey)
     }
   }
@@ -1338,6 +1342,44 @@ export abstract class AbstractPool<
     transferList?: readonly TransferListItem[]
   ): void
 
+  /**
+   * Initializes the worker node usage with sensible default values gathered during runtime.
+   *
+   * @param workerNode - The worker node.
+   */
+  private initWorkerNodeUsage (workerNode: IWorkerNode<Worker, Data>): void {
+    if (
+      this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+        .runTime.aggregate === true
+    ) {
+      workerNode.usage.runTime.aggregate = min(
+        ...this.workerNodes.map(
+          workerNode => workerNode.usage.runTime.aggregate ?? Infinity
+        )
+      )
+    }
+    if (
+      this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+        .waitTime.aggregate === true
+    ) {
+      workerNode.usage.waitTime.aggregate = min(
+        ...this.workerNodes.map(
+          workerNode => workerNode.usage.waitTime.aggregate ?? Infinity
+        )
+      )
+    }
+    if (
+      this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
+        .aggregate === true
+    ) {
+      workerNode.usage.elu.active.aggregate = min(
+        ...this.workerNodes.map(
+          workerNode => workerNode.usage.elu.active.aggregate ?? Infinity
+        )
+      )
+    }
+  }
+
   /**
    * Creates a new, completely set up worker node.
    *
@@ -1368,7 +1410,7 @@ export abstract class AbstractPool<
         if (workerNode.info.dynamic) {
           this.createAndSetupDynamicWorkerNode()
         } else if (!this.startingMinimumNumberOfWorkers) {
-          this.startMinimumNumberOfWorkers()
+          this.startMinimumNumberOfWorkers(true)
         }
       }
       if (
@@ -1394,7 +1436,7 @@ export abstract class AbstractPool<
         !this.startingMinimumNumberOfWorkers &&
         !this.destroying
       ) {
-        this.startMinimumNumberOfWorkers()
+        this.startMinimumNumberOfWorkers(true)
       }
     })
     const workerNodeKey = this.addWorkerNode(workerNode)
@@ -1459,6 +1501,7 @@ export abstract class AbstractPool<
     ) {
       workerNode.info.ready = true
     }
+    this.initWorkerNodeUsage(workerNode)
     this.checkAndEmitDynamicWorkerCreationEvents()
     return workerNodeKey
   }
@@ -1842,11 +1885,11 @@ export abstract class AbstractPool<
       this.handleWorkerReadyResponse(message)
     } else if (taskFunctionsProperties != null) {
       // Task function properties message received from worker
-      const workerInfo = this.getWorkerInfo(
-        this.getWorkerNodeKeyByWorkerId(workerId)
-      )
+      const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+      const workerInfo = this.getWorkerInfo(workerNodeKey)
       if (workerInfo != null) {
         workerInfo.taskFunctionsProperties = taskFunctionsProperties
+        this.sendStatisticsMessageToWorker(workerNodeKey)
       }
     } else if (taskId != null) {
       // Task execution response received from worker
@@ -1866,10 +1909,11 @@ export abstract class AbstractPool<
     if (ready == null || !ready) {
       throw new Error(`Worker ${workerId} failed to initialize`)
     }
-    const workerNode =
-      this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+    const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+    const workerNode = this.workerNodes[workerNodeKey]
     workerNode.info.ready = ready
     workerNode.info.taskFunctionsProperties = taskFunctionsProperties
+    this.sendStatisticsMessageToWorker(workerNodeKey)
     this.checkAndEmitReadyEvent()
   }