fix: ensure worker node destroy semantic is always the same
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 21 Jul 2023 18:29:08 +0000 (20:29 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 21 Jul 2023 18:29:08 +0000 (20:29 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/pools/thread/fixed.ts

index 56dd7fa9296265078f863d342338da0457309919..7c71c8557f0edcdc6a31ea12fd67649b2a8d0521 100644 (file)
@@ -10,7 +10,6 @@ import {
   DEFAULT_TASK_NAME,
   DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
   EMPTY_FUNCTION,
-  isAsyncFunction,
   isKillBehavior,
   isPlainObject,
   median,
@@ -659,16 +658,8 @@ export abstract class AbstractPool<
   /** @inheritDoc */
   public async destroy (): Promise<void> {
     await Promise.all(
-      this.workerNodes.map(async (workerNode, workerNodeKey) => {
-        this.flushTasksQueue(workerNodeKey)
-        // FIXME: wait for tasks to be finished
-        const workerExitPromise = new Promise<void>(resolve => {
-          workerNode.worker.on('exit', () => {
-            resolve()
-          })
-        })
+      this.workerNodes.map(async (_, workerNodeKey) => {
         await this.destroyWorkerNode(workerNodeKey)
-        await workerExitPromise
       })
     )
   }
@@ -678,9 +669,7 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    */
-  protected abstract destroyWorkerNode (
-    workerNodeKey: number
-  ): void | Promise<void>
+  protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
 
   /**
    * Setup hook to execute code before worker nodes are created in the abstract constructor.
@@ -910,6 +899,7 @@ export abstract class AbstractPool<
         message.workerId
       )
       const workerUsage = this.workerNodes[localWorkerNodeKey].usage
+      // Kill message received from worker
       if (
         isKillBehavior(KillBehaviors.HARD, message.kill) ||
         (message.kill != null &&
@@ -919,17 +909,7 @@ export abstract class AbstractPool<
               workerUsage.tasks.executing === 0 &&
               this.tasksQueueSize(localWorkerNodeKey) === 0)))
       ) {
-        // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-        const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this)
-        if (isAsyncFunction(destroyWorkerNodeBounded)) {
-          (
-            destroyWorkerNodeBounded as (workerNodeKey: number) => Promise<void>
-          )(localWorkerNodeKey).catch(EMPTY_FUNCTION)
-        } else {
-          (destroyWorkerNodeBounded as (workerNodeKey: number) => void)(
-            localWorkerNodeKey
-          )
-        }
+        this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION)
       }
     })
     const workerInfo = this.getWorkerInfo(workerNodeKey)
@@ -1050,10 +1030,10 @@ export abstract class AbstractPool<
     return message => {
       this.checkMessageWorkerId(message)
       if (message.ready != null) {
-        // Worker ready response received
+        // Worker ready response received from worker
         this.handleWorkerReadyResponse(message)
       } else if (message.id != null) {
-        // Task execution response received
+        // Task execution response received from worker
         this.handleTaskExecutionResponse(message)
       }
     }
@@ -1173,7 +1153,7 @@ export abstract class AbstractPool<
     return this.workerNodes[workerNodeKey].tasksQueueSize()
   }
 
-  private flushTasksQueue (workerNodeKey: number): void {
+  protected flushTasksQueue (workerNodeKey: number): void {
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       this.executeTask(
         workerNodeKey,
index eadbfcfe4fa96af126629ab3a851d47c2b38e5d5..8e264359b2ceed7a5efeab91d1488112b4f1dcb7 100644 (file)
@@ -60,13 +60,21 @@ export class FixedClusterPool<
   }
 
   /** @inheritDoc */
-  protected destroyWorkerNode (workerNodeKey: number): void {
+  protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
+    this.flushTasksQueue(workerNodeKey)
+    // FIXME: wait for tasks to be finished
     const worker = this.workerNodes[workerNodeKey].worker
-    this.sendToWorker(workerNodeKey, { kill: true, workerId: worker.id })
+    const workerExitPromise = new Promise<void>(resolve => {
+      worker.on('exit', () => {
+        resolve()
+      })
+    })
     worker.on('disconnect', () => {
       worker.kill()
     })
+    this.sendToWorker(workerNodeKey, { kill: true, workerId: worker.id })
     worker.disconnect()
+    await workerExitPromise
   }
 
   /** @inheritDoc */
index f4f712dfd6cc6c720c5872291670c59cc4e3aef3..a575d0972ff6611fadcbb662e195f31bf118a6d7 100644 (file)
@@ -57,11 +57,19 @@ export class FixedThreadPool<
 
   /** @inheritDoc */
   protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
+    this.flushTasksQueue(workerNodeKey)
+    // FIXME: wait for tasks to be finished
     const workerNode = this.workerNodes[workerNodeKey]
     const worker = workerNode.worker
+    const workerExitPromise = new Promise<void>(resolve => {
+      worker.on('exit', () => {
+        resolve()
+      })
+    })
     this.sendToWorker(workerNodeKey, { kill: true, workerId: worker.threadId })
     workerNode.closeChannel()
     await worker.terminate()
+    await workerExitPromise
   }
 
   /** @inheritDoc */