fix: ensure worker node destroy semantic is always the same
[poolifier.git] / src / pools / abstract-pool.ts
index 56dd7fa9296265078f863d342338da0457309919..7c71c8557f0edcdc6a31ea12fd67649b2a8d0521 100644 (file)
@@ -10,7 +10,6 @@ import {
   DEFAULT_TASK_NAME,
   DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
   EMPTY_FUNCTION,
-  isAsyncFunction,
   isKillBehavior,
   isPlainObject,
   median,
@@ -659,16 +658,8 @@ export abstract class AbstractPool<
   /** @inheritDoc */
   public async destroy (): Promise<void> {
     await Promise.all(
-      this.workerNodes.map(async (workerNode, workerNodeKey) => {
-        this.flushTasksQueue(workerNodeKey)
-        // FIXME: wait for tasks to be finished
-        const workerExitPromise = new Promise<void>(resolve => {
-          workerNode.worker.on('exit', () => {
-            resolve()
-          })
-        })
+      this.workerNodes.map(async (_, workerNodeKey) => {
         await this.destroyWorkerNode(workerNodeKey)
-        await workerExitPromise
       })
     )
   }
@@ -678,9 +669,7 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    */
-  protected abstract destroyWorkerNode (
-    workerNodeKey: number
-  ): void | Promise<void>
+  protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
 
   /**
    * Setup hook to execute code before worker nodes are created in the abstract constructor.
@@ -910,6 +899,7 @@ export abstract class AbstractPool<
         message.workerId
       )
       const workerUsage = this.workerNodes[localWorkerNodeKey].usage
+      // Kill message received from worker
       if (
         isKillBehavior(KillBehaviors.HARD, message.kill) ||
         (message.kill != null &&
@@ -919,17 +909,7 @@ export abstract class AbstractPool<
               workerUsage.tasks.executing === 0 &&
               this.tasksQueueSize(localWorkerNodeKey) === 0)))
       ) {
-        // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-        const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this)
-        if (isAsyncFunction(destroyWorkerNodeBounded)) {
-          (
-            destroyWorkerNodeBounded as (workerNodeKey: number) => Promise<void>
-          )(localWorkerNodeKey).catch(EMPTY_FUNCTION)
-        } else {
-          (destroyWorkerNodeBounded as (workerNodeKey: number) => void)(
-            localWorkerNodeKey
-          )
-        }
+        this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION)
       }
     })
     const workerInfo = this.getWorkerInfo(workerNodeKey)
@@ -1050,10 +1030,10 @@ export abstract class AbstractPool<
     return message => {
       this.checkMessageWorkerId(message)
       if (message.ready != null) {
-        // Worker ready response received
+        // Worker ready response received from worker
         this.handleWorkerReadyResponse(message)
       } else if (message.id != null) {
-        // Task execution response received
+        // Task execution response received from worker
         this.handleTaskExecutionResponse(message)
       }
     }
@@ -1173,7 +1153,7 @@ export abstract class AbstractPool<
     return this.workerNodes[workerNodeKey].tasksQueueSize()
   }
 
-  private flushTasksQueue (workerNodeKey: number): void {
+  protected flushTasksQueue (workerNodeKey: number): void {
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       this.executeTask(
         workerNodeKey,