From 87347ea858fc66b283f747102113d82204613a52 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 17 Dec 2023 20:32:16 +0100 Subject: [PATCH] fix: wait for queued tasks to end at worker termination MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 ++++ src/pools/abstract-pool.ts | 18 ++++++++---------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 107be86a..642e0d52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Wait for queued tasks to end at worker termination. + ## [3.1.1] - 2023-12-16 ### Fixed diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 8319a33e..beacddf3 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -56,8 +56,8 @@ import { checkFilePath, checkValidTasksQueueOptions, checkValidWorkerChoiceStrategy, - updateMeasurementStatistics - // waitWorkerNodeEvents + updateMeasurementStatistics, + waitWorkerNodeEvents } from './utils' /** @@ -1046,14 +1046,9 @@ export abstract class AbstractPool< */ protected async destroyWorkerNode (workerNodeKey: number): Promise { this.flagWorkerNodeAsNotReady(workerNodeKey) - this.flushTasksQueue(workerNodeKey) + const flushedTasks = this.flushTasksQueue(workerNodeKey) const workerNode = this.workerNodes[workerNodeKey] - // FIXME: wait for tasks to be finished - // await waitWorkerNodeEvents( - // workerNode, - // 'taskFinished', - // workerNode.usage.tasks.executing - // ) + await waitWorkerNodeEvents(workerNode, 'taskFinished', flushedTasks) await this.sendKillMessageToWorker(workerNodeKey) await workerNode.terminate() } @@ -1919,14 +1914,17 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].tasksQueueSize() } - protected flushTasksQueue (workerNodeKey: number): void { + protected flushTasksQueue (workerNodeKey: number): number { + let flushedTasks = 0 while (this.tasksQueueSize(workerNodeKey) > 0) { this.executeTask( workerNodeKey, this.dequeueTask(workerNodeKey) as Task ) + ++flushedTasks } this.workerNodes[workerNodeKey].clearTasksQueue() + return flushedTasks } private flushTasksQueues (): void { -- 2.34.1