checkValidTasksQueueOptions,
checkValidWorkerChoiceStrategy,
updateMeasurementStatistics
+ // waitWorkerNodeEvents
} from './utils'
/**
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
this.flagWorkerNodeAsNotReady(workerNodeKey)
this.flushTasksQueue(workerNodeKey)
- // FIXME: wait for tasks to be finished
const workerNode = this.workerNodes[workerNodeKey]
+ // FIXME: wait for tasks to be finished
+ // await waitWorkerNodeEvents(
+ // workerNode,
+ // 'taskFinished',
+ // workerNode.usage.tasks.executing
+ // )
await this.sendKillMessageToWorker(workerNodeKey)
await workerNode.terminate()
}
this.afterTaskExecutionHook(workerNodeKey, message)
this.workerChoiceStrategyContext.update(workerNodeKey)
this.promiseResponseMap.delete(taskId as string)
+ this.workerNodes[workerNodeKey].emit('taskFinished', taskId)
if (this.opts.enableTasksQueue === true) {
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
import type { TasksQueueOptions } from './pool'
import {
type IWorker,
+ type IWorkerNode,
type MeasurementStatistics,
type WorkerNodeOptions,
type WorkerType,
throw new Error(`Unknown worker type '${type}'`)
}
}
+
+export const waitWorkerNodeEvents = async <
+ Worker extends IWorker,
+ Data = unknown
+>(
+ workerNode: IWorkerNode<Worker, Data>,
+ workerNodeEvent: string,
+ numberOfEventsToWait: number
+): Promise<number> => {
+ return await new Promise<number>(resolve => {
+ let events = 0
+ if (numberOfEventsToWait === 0) {
+ resolve(events)
+ return
+ }
+ workerNode.on(workerNodeEvent, () => {
+ ++events
+ if (events === numberOfEventsToWait) {
+ resolve(events)
+ }
+ })
+ })
+}
let events = 0
if (numberOfEventsToWait === 0) {
resolve(events)
+ return
}
for (const workerNode of pool.workerNodes) {
workerNode.worker.on(workerEvent, () => {
let events = 0
if (numberOfEventsToWait === 0) {
resolve(events)
+ return
}
pool.emitter?.on(poolEvent, () => {
++events