fix: fix possible null exception at handling task execution response
[poolifier.git] / src / pools / abstract-pool.ts
index 56be9a1a9ffe0ca22131b40d9308adab87b9894a..635b8af459363dc022671f089492f155b3470695 100644 (file)
@@ -56,7 +56,8 @@ import {
   checkFilePath,
   checkValidTasksQueueOptions,
   checkValidWorkerChoiceStrategy,
-  updateMeasurementStatistics
+  updateMeasurementStatistics,
+  waitWorkerNodeEvents
 } from './utils'
 
 /**
@@ -1043,7 +1044,14 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    */
-  protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
+  protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
+    this.flagWorkerNodeAsNotReady(workerNodeKey)
+    const flushedTasks = this.flushTasksQueue(workerNodeKey)
+    const workerNode = this.workerNodes[workerNodeKey]
+    await waitWorkerNodeEvents(workerNode, 'taskFinished', flushedTasks)
+    await this.sendKillMessageToWorker(workerNodeKey)
+    await workerNode.terminate()
+  }
 
   /**
    * Setup hook to execute code before worker nodes are created in the abstract constructor.
@@ -1278,33 +1286,33 @@ export abstract class AbstractPool<
       this.opts.errorHandler ?? EMPTY_FUNCTION
     )
     workerNode.registerWorkerEventHandler('error', (error: Error) => {
-      const workerNodeKey = this.getWorkerNodeKeyByWorker(workerNode.worker)
-      this.flagWorkerNodeAsNotReady(workerNodeKey)
-      const workerInfo = this.getWorkerInfo(workerNodeKey)
+      workerNode.info.ready = false
       this.emitter?.emit(PoolEvents.error, error)
-      this.workerNodes[workerNodeKey].closeChannel()
       if (
         this.started &&
         !this.starting &&
         !this.destroying &&
         this.opts.restartWorkerOnError === true
       ) {
-        if (workerInfo.dynamic) {
+        if (workerNode.info.dynamic) {
           this.createAndSetupDynamicWorkerNode()
         } else {
           this.createAndSetupWorkerNode()
         }
       }
       if (this.started && this.opts.enableTasksQueue === true) {
-        this.redistributeQueuedTasks(workerNodeKey)
+        this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
       }
+      workerNode.terminate().catch(error => {
+        this.emitter?.emit(PoolEvents.error, error)
+      })
     })
     workerNode.registerWorkerEventHandler(
       'exit',
       this.opts.exitHandler ?? EMPTY_FUNCTION
     )
     workerNode.registerOnceWorkerEventHandler('exit', () => {
-      this.removeWorkerNode(workerNode.worker)
+      this.removeWorkerNode(workerNode)
     })
     const workerNodeKey = this.addWorkerNode(workerNode)
     this.afterWorkerNodeSetup(workerNodeKey)
@@ -1732,6 +1740,7 @@ export abstract class AbstractPool<
     const promiseResponse = this.promiseResponseMap.get(taskId as string)
     if (promiseResponse != null) {
       const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
+      const workerNode = this.workerNodes[workerNodeKey]
       if (workerError != null) {
         this.emitter?.emit(PoolEvents.taskError, workerError)
         asyncResource != null
@@ -1750,8 +1759,9 @@ export abstract class AbstractPool<
       this.afterTaskExecutionHook(workerNodeKey, message)
       this.workerChoiceStrategyContext.update(workerNodeKey)
       this.promiseResponseMap.delete(taskId as string)
+      workerNode.emit('taskFinished', taskId)
       if (this.opts.enableTasksQueue === true) {
-        const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
+        const workerNodeTasksUsage = workerNode.usage.tasks
         if (
           this.tasksQueueSize(workerNodeKey) > 0 &&
           workerNodeTasksUsage.executing <
@@ -1767,7 +1777,7 @@ export abstract class AbstractPool<
           this.tasksQueueSize(workerNodeKey) === 0 &&
           workerNodeTasksUsage.sequentiallyStolen === 0
         ) {
-          this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
+          workerNode.emit('idleWorkerNode', {
             workerId: workerId as number,
             workerNodeKey
           })
@@ -1846,12 +1856,12 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Removes the worker node associated to the give given worker from the pool worker nodes.
+   * Removes the worker node from the pool worker nodes.
    *
-   * @param worker - The worker.
+   * @param workerNode - The worker node.
    */
-  private removeWorkerNode (worker: Worker): void {
-    const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+  private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
+    const workerNodeKey = this.workerNodes.indexOf(workerNode)
     if (workerNodeKey !== -1) {
       this.workerNodes.splice(workerNodeKey, 1)
       this.workerChoiceStrategyContext.remove(workerNodeKey)
@@ -1905,14 +1915,17 @@ export abstract class AbstractPool<
     return this.workerNodes[workerNodeKey].tasksQueueSize()
   }
 
-  protected flushTasksQueue (workerNodeKey: number): void {
+  protected flushTasksQueue (workerNodeKey: number): number {
+    let flushedTasks = 0
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       this.executeTask(
         workerNodeKey,
         this.dequeueTask(workerNodeKey) as Task<Data>
       )
+      ++flushedTasks
     }
     this.workerNodes[workerNodeKey].clearTasksQueue()
+    return flushedTasks
   }
 
   private flushTasksQueues (): void {