fix: wait for queued tasks to end at worker termination
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 17 Dec 2023 19:32:16 +0000 (20:32 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 17 Dec 2023 19:32:16 +0000 (20:32 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
src/pools/abstract-pool.ts

index 107be86a152c1b24b78f841f8c4bfe5697ed8c80..642e0d528b5d499f2379f9ff2084aa8111b4e47d 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Wait for queued tasks to end at worker termination.
+
 ## [3.1.1] - 2023-12-16
 
 ### Fixed
index 8319a33eca58ec7954f987ef36289580622a8410..beacddf3852a1174d21ec8a69180d1521cebd42b 100644 (file)
@@ -56,8 +56,8 @@ import {
   checkFilePath,
   checkValidTasksQueueOptions,
   checkValidWorkerChoiceStrategy,
-  updateMeasurementStatistics
-  // waitWorkerNodeEvents
+  updateMeasurementStatistics,
+  waitWorkerNodeEvents
 } from './utils'
 
 /**
@@ -1046,14 +1046,9 @@ export abstract class AbstractPool<
    */
   protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
     this.flagWorkerNodeAsNotReady(workerNodeKey)
-    this.flushTasksQueue(workerNodeKey)
+    const flushedTasks = this.flushTasksQueue(workerNodeKey)
     const workerNode = this.workerNodes[workerNodeKey]
-    // FIXME: wait for tasks to be finished
-    // await waitWorkerNodeEvents(
-    //   workerNode,
-    //   'taskFinished',
-    //   workerNode.usage.tasks.executing
-    // )
+    await waitWorkerNodeEvents(workerNode, 'taskFinished', flushedTasks)
     await this.sendKillMessageToWorker(workerNodeKey)
     await workerNode.terminate()
   }
@@ -1919,14 +1914,17 @@ export abstract class AbstractPool<
     return this.workerNodes[workerNodeKey].tasksQueueSize()
   }
 
-  protected flushTasksQueue (workerNodeKey: number): void {
+  protected flushTasksQueue (workerNodeKey: number): number {
+    let flushedTasks = 0
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       this.executeTask(
         workerNodeKey,
         this.dequeueTask(workerNodeKey) as Task<Data>
       )
+      ++flushedTasks
     }
     this.workerNodes[workerNodeKey].clearTasksQueue()
+    return flushedTasks
   }
 
   private flushTasksQueues (): void {