fix: fix possible null exception at handling task execution response
[poolifier.git] / src / pools / abstract-pool.ts
index 6b95be9482664deaa1131eced2ccbe3ccb48ca8a..635b8af459363dc022671f089492f155b3470695 100644 (file)
@@ -56,7 +56,8 @@ import {
   checkFilePath,
   checkValidTasksQueueOptions,
   checkValidWorkerChoiceStrategy,
-  updateMeasurementStatistics
+  updateMeasurementStatistics,
+  waitWorkerNodeEvents
 } from './utils'
 
 /**
@@ -1045,9 +1046,9 @@ export abstract class AbstractPool<
    */
   protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
     this.flagWorkerNodeAsNotReady(workerNodeKey)
-    this.flushTasksQueue(workerNodeKey)
-    // FIXME: wait for tasks to be finished
+    const flushedTasks = this.flushTasksQueue(workerNodeKey)
     const workerNode = this.workerNodes[workerNodeKey]
+    await waitWorkerNodeEvents(workerNode, 'taskFinished', flushedTasks)
     await this.sendKillMessageToWorker(workerNodeKey)
     await workerNode.terminate()
   }
@@ -1739,6 +1740,7 @@ export abstract class AbstractPool<
     const promiseResponse = this.promiseResponseMap.get(taskId as string)
     if (promiseResponse != null) {
       const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
+      const workerNode = this.workerNodes[workerNodeKey]
       if (workerError != null) {
         this.emitter?.emit(PoolEvents.taskError, workerError)
         asyncResource != null
@@ -1757,8 +1759,9 @@ export abstract class AbstractPool<
       this.afterTaskExecutionHook(workerNodeKey, message)
       this.workerChoiceStrategyContext.update(workerNodeKey)
       this.promiseResponseMap.delete(taskId as string)
+      workerNode.emit('taskFinished', taskId)
       if (this.opts.enableTasksQueue === true) {
-        const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
+        const workerNodeTasksUsage = workerNode.usage.tasks
         if (
           this.tasksQueueSize(workerNodeKey) > 0 &&
           workerNodeTasksUsage.executing <
@@ -1774,7 +1777,7 @@ export abstract class AbstractPool<
           this.tasksQueueSize(workerNodeKey) === 0 &&
           workerNodeTasksUsage.sequentiallyStolen === 0
         ) {
-          this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
+          workerNode.emit('idleWorkerNode', {
             workerId: workerId as number,
             workerNodeKey
           })
@@ -1912,14 +1915,17 @@ export abstract class AbstractPool<
     return this.workerNodes[workerNodeKey].tasksQueueSize()
   }
 
-  protected flushTasksQueue (workerNodeKey: number): void {
+  protected flushTasksQueue (workerNodeKey: number): number {
+    let flushedTasks = 0
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       this.executeTask(
         workerNodeKey,
         this.dequeueTask(workerNodeKey) as Task<Data>
       )
+      ++flushedTasks
     }
     this.workerNodes[workerNodeKey].clearTasksQueue()
+    return flushedTasks
   }
 
   private flushTasksQueues (): void {