From 920278a2effe40d24c8832ccee0110f0bab1db19 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 3 Jul 2023 16:55:46 +0200 Subject: [PATCH] fix: wait for worker exit at pool destroy MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- README.md | 2 +- src/pools/abstract-pool.ts | 18 +++++++++++------- src/pools/worker.ts | 4 ++++ tests/pools/cluster/dynamic.test.js | 2 +- tests/pools/thread/dynamic.test.js | 2 +- 5 files changed, 18 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 53ea832d..3001f098 100644 --- a/README.md +++ b/README.md @@ -244,7 +244,7 @@ This method will call the terminate method on each worker. - `maxInactiveTime` (optional) - Max time to wait tasks to work on in milliseconds, after this period the new worker will die. The last active time of your worker unit will be updated when a task is submitted to a worker or when a worker terminate a task. - If `killBehavior` is set to `KillBehaviors.HARD` this value represents also the timeout for the tasks that you submit to the pool, when this timeout expires your tasks is interrupted and the worker is killed if is not part of the minimum size of the pool. + If `killBehavior` is set to `KillBehaviors.HARD` this value represents also the timeout for the tasks that you submit to the pool, when this timeout expires your tasks is interrupted before completion and removed. The worker is killed if is not part of the minimum size of the pool. If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is completed. Default: `60000` diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 232b0cc1..e9be3476 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -505,7 +505,13 @@ export abstract class AbstractPool< this.workerNodes.map(async (workerNode, workerNodeKey) => { this.flushTasksQueue(workerNodeKey) // FIXME: wait for tasks to be finished + const workerExitPromise = new Promise(resolve => { + workerNode.worker.on('exit', () => { + resolve() + }) + }) await this.destroyWorker(workerNode.worker) + await workerExitPromise }) ) } @@ -987,13 +993,11 @@ export abstract class AbstractPool< } private flushTasksQueue (workerNodeKey: number): void { - if (this.tasksQueueSize(workerNodeKey) > 0) { - for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) { - this.executeTask( - workerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) - } + while (this.tasksQueueSize(workerNodeKey) > 0) { + this.executeTask( + workerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) } this.workerNodes[workerNodeKey].tasksQueue.clear() } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 379d350b..93e4eda4 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -132,6 +132,10 @@ export interface WorkerInfo { * Started flag. */ started: boolean + /** + * Shared buffer. + */ + readonly sharedBuffer?: Int32Array } /** diff --git a/tests/pools/cluster/dynamic.test.js b/tests/pools/cluster/dynamic.test.js index 78bdc214..753f59af 100644 --- a/tests/pools/cluster/dynamic.test.js +++ b/tests/pools/cluster/dynamic.test.js @@ -125,7 +125,7 @@ describe('Dynamic cluster pool test suite', () => { longRunningPool.execute() } expect(longRunningPool.workerNodes.length).toBe(max) - await sleep(1500) + await sleep(1000) // Here we expect the workerNodes to be at the max size since the task is still executing expect(longRunningPool.workerNodes.length).toBe(max) // We need to clean up the resources after our test diff --git a/tests/pools/thread/dynamic.test.js b/tests/pools/thread/dynamic.test.js index 1de66446..65d16239 100644 --- a/tests/pools/thread/dynamic.test.js +++ b/tests/pools/thread/dynamic.test.js @@ -125,7 +125,7 @@ describe('Dynamic thread pool test suite', () => { longRunningPool.execute() } expect(longRunningPool.workerNodes.length).toBe(max) - await sleep(1500) + await sleep(1000) // Here we expect the workerNodes to be at the max size since the task is still executing expect(longRunningPool.workerNodes.length).toBe(max) // We need to clean up the resources after our test -- 2.34.1