feat: untangle worker choice strategies tasks distribution and dynamic worker creatio...
[poolifier.git] / src / pools / abstract-pool.ts
index 59466274bd8c5740047b91e0c8b9936e3c4b3c8b..428dd9128ca8e26e20d5e766f6cab57b6c04ca2f 100644 (file)
@@ -709,7 +709,7 @@ export abstract class AbstractPool<
       }
       const timestamp = performance.now()
       const workerNodeKey = this.chooseWorkerNode()
-      const workerInfo = this.getWorkerInfo(workerNodeKey)
+      const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
       if (
         name != null &&
         Array.isArray(workerInfo.taskFunctions) &&
@@ -805,15 +805,24 @@ export abstract class AbstractPool<
     workerNodeKey: number,
     task: Task<Data>
   ): void {
-    const workerUsage = this.workerNodes[workerNodeKey].usage
-    ++workerUsage.tasks.executing
-    this.updateWaitTimeWorkerUsage(workerUsage, task)
-    if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
+    if (this.workerNodes[workerNodeKey]?.usage != null) {
+      const workerUsage = this.workerNodes[workerNodeKey].usage
+      ++workerUsage.tasks.executing
+      this.updateWaitTimeWorkerUsage(workerUsage, task)
+    }
+    if (
+      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
+        task.name as string
+      ) != null
+    ) {
       const taskFunctionWorkerUsage = this.workerNodes[
         workerNodeKey
       ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
-      ++taskFunctionWorkerUsage.tasks.executing
-      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
+      if (taskFunctionWorkerUsage != null) {
+        ++taskFunctionWorkerUsage.tasks.executing
+        this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
+      }
     }
   }
 
@@ -828,11 +837,18 @@ export abstract class AbstractPool<
     workerNodeKey: number,
     message: MessageValue<Response>
   ): void {
-    const workerUsage = this.workerNodes[workerNodeKey].usage
-    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
-    this.updateRunTimeWorkerUsage(workerUsage, message)
-    this.updateEluWorkerUsage(workerUsage, message)
-    if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
+    if (this.workerNodes[workerNodeKey]?.usage != null) {
+      const workerUsage = this.workerNodes[workerNodeKey].usage
+      this.updateTaskStatisticsWorkerUsage(workerUsage, message)
+      this.updateRunTimeWorkerUsage(workerUsage, message)
+      this.updateEluWorkerUsage(workerUsage, message)
+    }
+    if (
+      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
+        message.taskPerformance?.name ?? DEFAULT_TASK_NAME
+      ) != null
+    ) {
       const taskFunctionWorkerUsage = this.workerNodes[
         workerNodeKey
       ].getTaskFunctionWorkerUsage(
@@ -853,6 +869,7 @@ export abstract class AbstractPool<
   private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
     const workerInfo = this.getWorkerInfo(workerNodeKey)
     return (
+      workerInfo != null &&
       Array.isArray(workerInfo.taskFunctions) &&
       workerInfo.taskFunctions.length > 2
     )
@@ -1002,7 +1019,7 @@ export abstract class AbstractPool<
     worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
     worker.on('error', (error) => {
       const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
-      const workerInfo = this.getWorkerInfo(workerNodeKey)
+      const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
       workerInfo.ready = false
       this.workerNodes[workerNodeKey].closeChannel()
       this.emitter?.emit(PoolEvents.error, error)
@@ -1056,7 +1073,7 @@ export abstract class AbstractPool<
         })
       }
     })
-    const workerInfo = this.getWorkerInfo(workerNodeKey)
+    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
     this.sendToWorker(workerNodeKey, {
       checkActive: true,
       workerId: workerInfo.id as number
@@ -1121,7 +1138,7 @@ export abstract class AbstractPool<
         elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
           .elu.aggregate
       },
-      workerId: this.getWorkerInfo(workerNodeKey).id as number
+      workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number
     })
   }
 
@@ -1131,7 +1148,7 @@ export abstract class AbstractPool<
       let minQueuedTasks = Infinity
       let executeTask = false
       for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
-        const workerInfo = this.getWorkerInfo(workerNodeId)
+        const workerInfo = this.getWorkerInfo(workerNodeId) as WorkerInfo
         if (
           workerNodeId !== workerNodeKey &&
           workerInfo.ready &&
@@ -1185,8 +1202,10 @@ export abstract class AbstractPool<
         this.handleTaskExecutionResponse(message)
       } else if (message.taskFunctions != null) {
         // Task functions message received from worker
-        this.getWorkerInfo(
-          this.getWorkerNodeKeyByWorkerId(message.workerId)
+        (
+          this.getWorkerInfo(
+            this.getWorkerNodeKeyByWorkerId(message.workerId)
+          ) as WorkerInfo
         ).taskFunctions = message.taskFunctions
       }
     }
@@ -1198,7 +1217,7 @@ export abstract class AbstractPool<
     }
     const workerInfo = this.getWorkerInfo(
       this.getWorkerNodeKeyByWorkerId(message.workerId)
-    )
+    ) as WorkerInfo
     workerInfo.ready = message.ready as boolean
     workerInfo.taskFunctions = message.taskFunctions
     if (this.emitter != null && this.ready) {
@@ -1260,8 +1279,8 @@ export abstract class AbstractPool<
    * @param workerNodeKey - The worker node key.
    * @returns The worker information.
    */
-  protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
-    return this.workerNodes[workerNodeKey].info
+  protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
+    return this.workerNodes[workerNodeKey]?.info
   }
 
   /**