fix: fix possible null exception at handling task execution response
[poolifier.git] / src / pools / abstract-pool.ts
index 282979852bf564de38942b3d8c86f1dddf0c1831..635b8af459363dc022671f089492f155b3470695 100644 (file)
@@ -56,7 +56,8 @@ import {
   checkFilePath,
   checkValidTasksQueueOptions,
   checkValidWorkerChoiceStrategy,
-  updateMeasurementStatistics
+  updateMeasurementStatistics,
+  waitWorkerNodeEvents
 } from './utils'
 
 /**
@@ -1045,9 +1046,9 @@ export abstract class AbstractPool<
    */
   protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
     this.flagWorkerNodeAsNotReady(workerNodeKey)
-    this.flushTasksQueue(workerNodeKey)
-    // FIXME: wait for tasks to be finished
+    const flushedTasks = this.flushTasksQueue(workerNodeKey)
     const workerNode = this.workerNodes[workerNodeKey]
+    await waitWorkerNodeEvents(workerNode, 'taskFinished', flushedTasks)
     await this.sendKillMessageToWorker(workerNodeKey)
     await workerNode.terminate()
   }
@@ -1285,7 +1286,6 @@ export abstract class AbstractPool<
       this.opts.errorHandler ?? EMPTY_FUNCTION
     )
     workerNode.registerWorkerEventHandler('error', (error: Error) => {
-      const workerNodeKey = this.getWorkerNodeKeyByWorker(workerNode.worker)
       workerNode.info.ready = false
       this.emitter?.emit(PoolEvents.error, error)
       if (
@@ -1301,7 +1301,7 @@ export abstract class AbstractPool<
         }
       }
       if (this.started && this.opts.enableTasksQueue === true) {
-        this.redistributeQueuedTasks(workerNodeKey)
+        this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
       }
       workerNode.terminate().catch(error => {
         this.emitter?.emit(PoolEvents.error, error)
@@ -1312,7 +1312,7 @@ export abstract class AbstractPool<
       this.opts.exitHandler ?? EMPTY_FUNCTION
     )
     workerNode.registerOnceWorkerEventHandler('exit', () => {
-      this.removeWorkerNode(workerNode.worker)
+      this.removeWorkerNode(workerNode)
     })
     const workerNodeKey = this.addWorkerNode(workerNode)
     this.afterWorkerNodeSetup(workerNodeKey)
@@ -1740,6 +1740,7 @@ export abstract class AbstractPool<
     const promiseResponse = this.promiseResponseMap.get(taskId as string)
     if (promiseResponse != null) {
       const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
+      const workerNode = this.workerNodes[workerNodeKey]
       if (workerError != null) {
         this.emitter?.emit(PoolEvents.taskError, workerError)
         asyncResource != null
@@ -1758,8 +1759,9 @@ export abstract class AbstractPool<
       this.afterTaskExecutionHook(workerNodeKey, message)
       this.workerChoiceStrategyContext.update(workerNodeKey)
       this.promiseResponseMap.delete(taskId as string)
+      workerNode.emit('taskFinished', taskId)
       if (this.opts.enableTasksQueue === true) {
-        const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
+        const workerNodeTasksUsage = workerNode.usage.tasks
         if (
           this.tasksQueueSize(workerNodeKey) > 0 &&
           workerNodeTasksUsage.executing <
@@ -1775,7 +1777,7 @@ export abstract class AbstractPool<
           this.tasksQueueSize(workerNodeKey) === 0 &&
           workerNodeTasksUsage.sequentiallyStolen === 0
         ) {
-          this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
+          workerNode.emit('idleWorkerNode', {
             workerId: workerId as number,
             workerNodeKey
           })
@@ -1854,12 +1856,12 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Removes the worker node associated to the given worker from the pool worker nodes.
+   * Removes the worker node from the pool worker nodes.
    *
-   * @param worker - The worker.
+   * @param workerNode - The worker node.
    */
-  private removeWorkerNode (worker: Worker): void {
-    const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+  private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
+    const workerNodeKey = this.workerNodes.indexOf(workerNode)
     if (workerNodeKey !== -1) {
       this.workerNodes.splice(workerNodeKey, 1)
       this.workerChoiceStrategyContext.remove(workerNodeKey)
@@ -1913,14 +1915,17 @@ export abstract class AbstractPool<
     return this.workerNodes[workerNodeKey].tasksQueueSize()
   }
 
-  protected flushTasksQueue (workerNodeKey: number): void {
+  protected flushTasksQueue (workerNodeKey: number): number {
+    let flushedTasks = 0
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       this.executeTask(
         workerNodeKey,
         this.dequeueTask(workerNodeKey) as Task<Data>
       )
+      ++flushedTasks
     }
     this.workerNodes[workerNodeKey].clearTasksQueue()
+    return flushedTasks
   }
 
   private flushTasksQueues (): void {