checkFilePath,
checkValidTasksQueueOptions,
checkValidWorkerChoiceStrategy,
- updateMeasurementStatistics
+ updateMeasurementStatistics,
+ waitWorkerNodeEvents
} from './utils'
/**
*/
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
this.flagWorkerNodeAsNotReady(workerNodeKey)
- this.flushTasksQueue(workerNodeKey)
- // FIXME: wait for tasks to be finished
+ const flushedTasks = this.flushTasksQueue(workerNodeKey)
const workerNode = this.workerNodes[workerNodeKey]
+ await waitWorkerNodeEvents(workerNode, 'taskFinished', flushedTasks)
await this.sendKillMessageToWorker(workerNodeKey)
await workerNode.terminate()
}
const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
+ const workerNode = this.workerNodes[workerNodeKey]
if (workerError != null) {
this.emitter?.emit(PoolEvents.taskError, workerError)
asyncResource != null
this.afterTaskExecutionHook(workerNodeKey, message)
this.workerChoiceStrategyContext.update(workerNodeKey)
this.promiseResponseMap.delete(taskId as string)
+ workerNode.emit('taskFinished', taskId)
if (this.opts.enableTasksQueue === true) {
- const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
+ const workerNodeTasksUsage = workerNode.usage.tasks
if (
this.tasksQueueSize(workerNodeKey) > 0 &&
workerNodeTasksUsage.executing <
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
- this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
+ workerNode.emit('idleWorkerNode', {
workerId: workerId as number,
workerNodeKey
})
return this.workerNodes[workerNodeKey].tasksQueueSize()
}
- protected flushTasksQueue (workerNodeKey: number): void {
+ protected flushTasksQueue (workerNodeKey: number): number {
+ let flushedTasks = 0
while (this.tasksQueueSize(workerNodeKey) > 0) {
this.executeTask(
workerNodeKey,
this.dequeueTask(workerNodeKey) as Task<Data>
)
+ ++flushedTasks
}
this.workerNodes[workerNodeKey].clearTasksQueue()
+ return flushedTasks
}
private flushTasksQueues (): void {