build(deps-dev): bump @types/node
[poolifier.git] / src / pools / abstract-pool.ts
index a0fc76c371584d3e69855fe732d720db482cc0bf..00cf56171f181184ff93fe9f24b4a107af188fee 100644 (file)
@@ -495,10 +495,7 @@ export abstract class AbstractPool<
   private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
     if (message.workerId == null) {
       throw new Error('Worker message received without worker id')
-    } else if (
-      message.workerId != null &&
-      this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
-    ) {
+    } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
       throw new Error(
         `Worker message received from unknown worker '${message.workerId}'`
       )
@@ -622,7 +619,7 @@ export abstract class AbstractPool<
     for (const [workerNodeKey] of this.workerNodes.entries()) {
       this.workerNodes[workerNodeKey].addEventListener(
         'emptyqueue',
-        this.handleEmptyQueueEvent
+        this.handleEmptyQueueEvent as EventListener
       )
     }
   }
@@ -631,7 +628,7 @@ export abstract class AbstractPool<
     for (const [workerNodeKey] of this.workerNodes.entries()) {
       this.workerNodes[workerNodeKey].removeEventListener(
         'emptyqueue',
-        this.handleEmptyQueueEvent
+        this.handleEmptyQueueEvent as EventListener
       )
     }
   }
@@ -640,7 +637,7 @@ export abstract class AbstractPool<
     for (const [workerNodeKey] of this.workerNodes.entries()) {
       this.workerNodes[workerNodeKey].addEventListener(
         'backpressure',
-        this.handleBackPressureEvent
+        this.handleBackPressureEvent as EventListener
       )
     }
   }
@@ -649,7 +646,7 @@ export abstract class AbstractPool<
     for (const [workerNodeKey] of this.workerNodes.entries()) {
       this.workerNodes[workerNodeKey].removeEventListener(
         'backpressure',
-        this.handleBackPressureEvent
+        this.handleBackPressureEvent as EventListener
       )
     }
   }
@@ -977,6 +974,7 @@ export abstract class AbstractPool<
           )
         }
       }
+      // FIXME: should be registered only once
       this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
       this.sendToWorker(workerNodeKey, { kill: true })
     })
@@ -1222,8 +1220,8 @@ export abstract class AbstractPool<
     worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
     worker.on('error', error => {
       const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+      this.flagWorkerNodeAsNotReady(workerNodeKey)
       const workerInfo = this.getWorkerInfo(workerNodeKey)
-      workerInfo.ready = false
       this.emitter?.emit(PoolEvents.error, error)
       this.workerNodes[workerNodeKey].closeChannel()
       if (
@@ -1276,6 +1274,8 @@ export abstract class AbstractPool<
               workerUsage.tasks.executing === 0 &&
               this.tasksQueueSize(localWorkerNodeKey) === 0)))
       ) {
+        // Flag the worker node as not ready immediately
+        this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
         this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
           this.emitter?.emit(PoolEvents.error, error)
         })
@@ -1366,13 +1366,13 @@ export abstract class AbstractPool<
       if (this.opts.tasksQueueOptions?.taskStealing === true) {
         this.workerNodes[workerNodeKey].addEventListener(
           'emptyqueue',
-          this.handleEmptyQueueEvent
+          this.handleEmptyQueueEvent as EventListener
         )
       }
       if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
         this.workerNodes[workerNodeKey].addEventListener(
           'backpressure',
-          this.handleBackPressureEvent
+          this.handleBackPressureEvent as EventListener
         )
       }
     }
@@ -1601,7 +1601,7 @@ export abstract class AbstractPool<
    * @returns The worker information.
    */
   protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
-    return this.workerNodes[workerNodeKey].info
+    return this.workerNodes[workerNodeKey]?.info
   }
 
   /**
@@ -1641,6 +1641,10 @@ export abstract class AbstractPool<
     }
   }
 
+  protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
+    this.getWorkerInfo(workerNodeKey).ready = false
+  }
+
   /** @inheritDoc */
   public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
     return (