refactor: emit worker node event at task end
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 17 Dec 2023 18:58:13 +0000 (19:58 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 17 Dec 2023 18:58:13 +0000 (19:58 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/utils.ts
tests/test-utils.js

index 6b95be9482664deaa1131eced2ccbe3ccb48ca8a..8319a33eca58ec7954f987ef36289580622a8410 100644 (file)
@@ -57,6 +57,7 @@ import {
   checkValidTasksQueueOptions,
   checkValidWorkerChoiceStrategy,
   updateMeasurementStatistics
+  // waitWorkerNodeEvents
 } from './utils'
 
 /**
@@ -1046,8 +1047,13 @@ export abstract class AbstractPool<
   protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
     this.flagWorkerNodeAsNotReady(workerNodeKey)
     this.flushTasksQueue(workerNodeKey)
-    // FIXME: wait for tasks to be finished
     const workerNode = this.workerNodes[workerNodeKey]
+    // FIXME: wait for tasks to be finished
+    // await waitWorkerNodeEvents(
+    //   workerNode,
+    //   'taskFinished',
+    //   workerNode.usage.tasks.executing
+    // )
     await this.sendKillMessageToWorker(workerNodeKey)
     await workerNode.terminate()
   }
@@ -1757,6 +1763,7 @@ export abstract class AbstractPool<
       this.afterTaskExecutionHook(workerNodeKey, message)
       this.workerChoiceStrategyContext.update(workerNodeKey)
       this.promiseResponseMap.delete(taskId as string)
+      this.workerNodes[workerNodeKey].emit('taskFinished', taskId)
       if (this.opts.enableTasksQueue === true) {
         const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
         if (
index db8ef7b7cc5991ea71a3dc0a2b163ac9d7a438c2..5e21c38c0af657dab99aca255e80f0e39888b6a6 100644 (file)
@@ -10,6 +10,7 @@ import {
 import type { TasksQueueOptions } from './pool'
 import {
   type IWorker,
+  type IWorkerNode,
   type MeasurementStatistics,
   type WorkerNodeOptions,
   type WorkerType,
@@ -203,3 +204,26 @@ export const createWorker = <Worker extends IWorker>(
       throw new Error(`Unknown worker type '${type}'`)
   }
 }
+
+export const waitWorkerNodeEvents = async <
+  Worker extends IWorker,
+  Data = unknown
+>(
+  workerNode: IWorkerNode<Worker, Data>,
+  workerNodeEvent: string,
+  numberOfEventsToWait: number
+): Promise<number> => {
+  return await new Promise<number>(resolve => {
+    let events = 0
+    if (numberOfEventsToWait === 0) {
+      resolve(events)
+      return
+    }
+    workerNode.on(workerNodeEvent, () => {
+      ++events
+      if (events === numberOfEventsToWait) {
+        resolve(events)
+      }
+    })
+  })
+}
index 3963adecdba419b1d9d09437b08312ed02f631f9..ef759824ce7e9b4d0294d84f2fa6dff02aba493a 100644 (file)
@@ -5,6 +5,7 @@ const waitWorkerEvents = async (pool, workerEvent, numberOfEventsToWait) => {
     let events = 0
     if (numberOfEventsToWait === 0) {
       resolve(events)
+      return
     }
     for (const workerNode of pool.workerNodes) {
       workerNode.worker.on(workerEvent, () => {
@@ -22,6 +23,7 @@ const waitPoolEvents = async (pool, poolEvent, numberOfEventsToWait) => {
     let events = 0
     if (numberOfEventsToWait === 0) {
       resolve(events)
+      return
     }
     pool.emitter?.on(poolEvent, () => {
       ++events