fix: wait for worker exit at pool destroy
author Jérôme Benoit <jerome.benoit@sap.com>
Mon, 3 Jul 2023 14:55:46 +0000 (16:55 +0200)
committer Jérôme Benoit <jerome.benoit@sap.com>
Mon, 3 Jul 2023 14:55:46 +0000 (16:55 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
README.md
src/pools/abstract-pool.ts
src/pools/worker.ts
tests/pools/cluster/dynamic.test.js
tests/pools/thread/dynamic.test.js

index 53ea832dffd985bd05f8d714370be03c994e71f7..3001f098fdb0ba551ea6416bbabe5ab4b8d408f1 100644 (file)
--- a/README.md
+++ b/README.md
@@ -244,7 +244,7 @@ This method will call the terminate method on each worker.
 
 - `maxInactiveTime` (optional) - Max time to wait tasks to work on in milliseconds, after this period the new worker will die.  
   The last active time of your worker unit will be updated when a task is submitted to a worker or when a worker terminate a task.  
-  If `killBehavior` is set to `KillBehaviors.HARD` this value represents also the timeout for the tasks that you submit to the pool, when this timeout expires your tasks is interrupted and the worker is killed if is not part of the minimum size of the pool.  
+  If `killBehavior` is set to `KillBehaviors.HARD` this value also represents the timeout for the tasks that you submit to the pool: when this timeout expires your task is interrupted before completion and removed. The worker is killed if it is not part of the minimum size of the pool.  
   If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is completed.  
   Default: `60000`
 
index 232b0cc17e702e95c12bb0b083e351af3f624161..e9be347603df21b2f56c025438d81e78a0790d31 100644 (file)
@@ -505,7 +505,13 @@ export abstract class AbstractPool<
       this.workerNodes.map(async (workerNode, workerNodeKey) => {
         this.flushTasksQueue(workerNodeKey)
         // FIXME: wait for tasks to be finished
+        const workerExitPromise = new Promise<void>(resolve => {
+          workerNode.worker.on('exit', () => {
+            resolve()
+          })
+        })
         await this.destroyWorker(workerNode.worker)
+        await workerExitPromise
       })
     )
   }
@@ -987,13 +993,11 @@ export abstract class AbstractPool<
   }
 
   private flushTasksQueue (workerNodeKey: number): void {
-    if (this.tasksQueueSize(workerNodeKey) > 0) {
-      for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
-        this.executeTask(
-          workerNodeKey,
-          this.dequeueTask(workerNodeKey) as Task<Data>
-        )
-      }
+    while (this.tasksQueueSize(workerNodeKey) > 0) {
+      this.executeTask(
+        workerNodeKey,
+        this.dequeueTask(workerNodeKey) as Task<Data>
+      )
     }
     this.workerNodes[workerNodeKey].tasksQueue.clear()
   }
index 379d350b9c095b69304bd591e87b0bf4457b74e2..93e4eda4c8e80aabfc1b13503e87ef4b0dc7173b 100644 (file)
@@ -132,6 +132,10 @@ export interface WorkerInfo {
    * Started flag.
    */
   started: boolean
+  /**
+   * Shared buffer.
+   */
+  readonly sharedBuffer?: Int32Array
 }
 
 /**
index 78bdc2149740b1acc02bff301f7f75bb64e7627b..753f59af309dfd1e83053f0e1496682f118bb7dd 100644 (file)
@@ -125,7 +125,7 @@ describe('Dynamic cluster pool test suite', () => {
       longRunningPool.execute()
     }
     expect(longRunningPool.workerNodes.length).toBe(max)
-    await sleep(1500)
+    await sleep(1000)
     // Here we expect the workerNodes to be at the max size since the task is still executing
     expect(longRunningPool.workerNodes.length).toBe(max)
     // We need to clean up the resources after our test
index 1de664463edf589f50c0a46ed58bb159209a37fd..65d162399ab798aad102f6978de8a37a5c36a015 100644 (file)
@@ -125,7 +125,7 @@ describe('Dynamic thread pool test suite', () => {
       longRunningPool.execute()
     }
     expect(longRunningPool.workerNodes.length).toBe(max)
-    await sleep(1500)
+    await sleep(1000)
     // Here we expect the workerNodes to be at the max size since the task is still executing
     expect(longRunningPool.workerNodes.length).toBe(max)
     // We need to clean up the resources after our test