fix: ensure worker node is defined before emitting
[poolifier.git] / src / pools / abstract-pool.ts
index 4170b8b61cd6f6e1f0de861ca6c94ee6278cb58f..65ace6d907a20ca5a88d820d638c1aaa1a6f8bf1 100644 (file)
@@ -703,7 +703,7 @@ export abstract class AbstractPool<
         message: MessageValue<Response>
       ): void => {
         this.checkMessageWorkerId(message)
-        const workerId = this.getWorkerInfo(workerNodeKey).id
+        const workerId = this.getWorkerInfo(workerNodeKey)?.id
         if (
           message.taskFunctionOperationStatus != null &&
           message.workerId === workerId
@@ -1061,7 +1061,7 @@ export abstract class AbstractPool<
     task: Task<Data>
   ): void {
     // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
-    if (this.workerNodes[workerNodeKey].usage != null) {
+    if (this.workerNodes[workerNodeKey]?.usage != null) {
       const workerUsage = this.workerNodes[workerNodeKey].usage
       ++workerUsage.tasks.executing
       updateWaitTimeWorkerUsage(
@@ -1103,7 +1103,7 @@ export abstract class AbstractPool<
   ): void {
     let needWorkerChoiceStrategyUpdate = false
     // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
-    if (this.workerNodes[workerNodeKey].usage != null) {
+    if (this.workerNodes[workerNodeKey]?.usage != null) {
       const workerUsage = this.workerNodes[workerNodeKey].usage
       updateTaskStatisticsWorkerUsage(workerUsage, message)
       updateRunTimeWorkerUsage(
@@ -1157,6 +1157,7 @@ export abstract class AbstractPool<
   private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
     const workerInfo = this.getWorkerInfo(workerNodeKey)
     return (
+      workerInfo != null &&
       Array.isArray(workerInfo.taskFunctionNames) &&
       workerInfo.taskFunctionNames.length > 2
     )
@@ -1180,7 +1181,7 @@ export abstract class AbstractPool<
       }
     }
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-    return this.workerChoiceStrategyContext!.execute()!
+    return this.workerChoiceStrategyContext!.execute()
   }
 
   /**
@@ -1454,7 +1455,7 @@ export abstract class AbstractPool<
   ): void {
     const workerNode = this.workerNodes[workerNodeKey]
     // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
-    if (workerNode.usage != null) {
+    if (workerNode?.usage != null) {
       ++workerNode.usage.tasks.stolen
     }
     if (
@@ -1473,7 +1474,7 @@ export abstract class AbstractPool<
   ): void {
     const workerNode = this.workerNodes[workerNodeKey]
     // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
-    if (workerNode.usage != null) {
+    if (workerNode?.usage != null) {
       ++workerNode.usage.tasks.sequentiallyStolen
     }
   }
@@ -1499,7 +1500,7 @@ export abstract class AbstractPool<
   ): void {
     const workerNode = this.workerNodes[workerNodeKey]
     // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
-    if (workerNode.usage != null) {
+    if (workerNode?.usage != null) {
       workerNode.usage.tasks.sequentiallyStolen = 0
     }
   }
@@ -1536,13 +1537,14 @@ export abstract class AbstractPool<
       (this.info.stealingWorkerNodes ?? 0) >
         Math.floor(this.workerNodes.length / 2)
     ) {
-      if (previousStolenTask != null) {
+      if (workerInfo != null && previousStolenTask != null) {
         workerInfo.stealing = false
       }
       return
     }
     const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
     if (
+      workerInfo != null &&
       previousStolenTask != null &&
       workerNodeTasksUsage.sequentiallyStolen > 0 &&
       (workerNodeTasksUsage.executing > 0 ||
@@ -1560,6 +1562,11 @@ export abstract class AbstractPool<
       this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
       return
     }
+    if (workerInfo == null) {
+      throw new Error(
+        `Worker node with key '${workerNodeKey}' not found in pool`
+      )
+    }
     workerInfo.stealing = true
     const stolenTask = this.workerNodeStealTask(workerNodeKey)
     if (
@@ -1660,6 +1667,11 @@ export abstract class AbstractPool<
           this.opts.tasksQueueOptions!.size! - sizeOffset
       ) {
         const workerInfo = this.getWorkerInfo(workerNodeKey)
+        if (workerInfo == null) {
+          throw new Error(
+            `Worker node with key '${workerNodeKey}' not found in pool`
+          )
+        }
         workerInfo.stealing = true
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
         const task = sourceWorkerNode.popTask()!
@@ -1687,9 +1699,12 @@ export abstract class AbstractPool<
       this.handleTaskExecutionResponse(message)
     } else if (taskFunctionNames != null) {
       // Task function names message received from worker
-      this.getWorkerInfo(
+      const workerInfo = this.getWorkerInfo(
         this.getWorkerNodeKeyByWorkerId(workerId)
-      ).taskFunctionNames = taskFunctionNames
+      )
+      if (workerInfo != null) {
+        workerInfo.taskFunctionNames = taskFunctionNames
+      }
     }
   }
 
@@ -1733,7 +1748,8 @@ export abstract class AbstractPool<
       this.afterTaskExecutionHook(workerNodeKey, message)
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
       this.promiseResponseMap.delete(taskId!)
-      workerNode.emit('taskFinished', taskId)
+      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+      workerNode?.emit('taskFinished', taskId)
       if (this.opts.enableTasksQueue === true && !this.destroying) {
         const workerNodeTasksUsage = workerNode.usage.tasks
         if (
@@ -1783,13 +1799,8 @@ export abstract class AbstractPool<
    * @param workerNodeKey - The worker node key.
    * @returns The worker information.
    */
-  protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
-    const workerInfo = this.workerNodes[workerNodeKey]?.info
-    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
-    if (workerInfo == null) {
-      throw new Error(`Worker node with key '${workerNodeKey}' not found`)
-    }
-    return workerInfo
+  protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
+    return this.workerNodes[workerNodeKey]?.info
   }
 
   /**
@@ -1848,7 +1859,10 @@ export abstract class AbstractPool<
   }
 
   protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
-    this.getWorkerInfo(workerNodeKey).ready = false
+    const workerInfo = this.getWorkerInfo(workerNodeKey)
+    if (workerInfo != null) {
+      workerInfo.ready = false
+    }
   }
 
   private hasBackPressure (): boolean {