From d41a44de8cc111add35f7daa7834e23055bce558 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 17 Dec 2023 19:58:13 +0100 Subject: [PATCH] refactor: emit worker node event at task end MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 9 ++++++++- src/pools/utils.ts | 24 ++++++++++++++++++++++++ tests/test-utils.js | 2 ++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 6b95be94..8319a33e 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -57,6 +57,7 @@ import { checkValidTasksQueueOptions, checkValidWorkerChoiceStrategy, updateMeasurementStatistics + // waitWorkerNodeEvents } from './utils' /** @@ -1046,8 +1047,13 @@ export abstract class AbstractPool< protected async destroyWorkerNode (workerNodeKey: number): Promise { this.flagWorkerNodeAsNotReady(workerNodeKey) this.flushTasksQueue(workerNodeKey) - // FIXME: wait for tasks to be finished const workerNode = this.workerNodes[workerNodeKey] + // FIXME: wait for tasks to be finished + // await waitWorkerNodeEvents( + // workerNode, + // 'taskFinished', + // workerNode.usage.tasks.executing + // ) await this.sendKillMessageToWorker(workerNodeKey) await workerNode.terminate() } @@ -1757,6 +1763,7 @@ export abstract class AbstractPool< this.afterTaskExecutionHook(workerNodeKey, message) this.workerChoiceStrategyContext.update(workerNodeKey) this.promiseResponseMap.delete(taskId as string) + this.workerNodes[workerNodeKey].emit('taskFinished', taskId) if (this.opts.enableTasksQueue === true) { const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( diff --git a/src/pools/utils.ts b/src/pools/utils.ts index db8ef7b7..5e21c38c 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -10,6 +10,7 @@ import { import type { TasksQueueOptions } from './pool' import { type IWorker, + type IWorkerNode, type MeasurementStatistics, type WorkerNodeOptions, type WorkerType, @@ -203,3 +204,26 @@ export const createWorker = ( throw new Error(`Unknown worker type '${type}'`) } } + +export const waitWorkerNodeEvents = async < + Worker extends IWorker, + Data = unknown +>( + workerNode: IWorkerNode, + workerNodeEvent: string, + numberOfEventsToWait: number +): Promise => { + return await new Promise(resolve => { + let events = 0 + if (numberOfEventsToWait === 0) { + resolve(events) + return + } + workerNode.on(workerNodeEvent, () => { + ++events + if (events === numberOfEventsToWait) { + resolve(events) + } + }) + }) +} diff --git a/tests/test-utils.js b/tests/test-utils.js index 3963adec..ef759824 100644 --- a/tests/test-utils.js +++ b/tests/test-utils.js @@ -5,6 +5,7 @@ const waitWorkerEvents = async (pool, workerEvent, numberOfEventsToWait) => { let events = 0 if (numberOfEventsToWait === 0) { resolve(events) + return } for (const workerNode of pool.workerNodes) { workerNode.worker.on(workerEvent, () => { @@ -22,6 +23,7 @@ const waitPoolEvents = async (pool, poolEvent, numberOfEventsToWait) => { let events = 0 if (numberOfEventsToWait === 0) { resolve(events) + return } pool.emitter?.on(poolEvent, () => { ++events -- 2.34.1