checkFilePath,
checkValidTasksQueueOptions,
checkValidWorkerChoiceStrategy,
+ getDefaultTasksQueueOptions,
updateEluWorkerUsage,
updateRunTimeWorkerUsage,
updateTaskStatisticsWorkerUsage,
}
}
- /**
- * Gets the given worker its worker node key.
- *
- * @param worker - The worker.
- * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
- */
- private getWorkerNodeKeyByWorker (worker: Worker): number {
- return this.workerNodes.findIndex(
- workerNode => workerNode.worker === worker
- )
- }
-
/**
* Gets the worker node key given its worker id.
*
tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
return {
- ...{
- size: Math.pow(this.maxSize, 2),
- concurrency: 1,
- taskStealing: true,
- tasksStealingOnBackPressure: true
- },
+ ...getDefaultTasksQueueOptions(this.maxSize),
...tasksQueueOptions
}
}
workerNodeKey: number
): Promise<void> {
await new Promise<void>((resolve, reject) => {
+ if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) {
+ reject(new Error(`Invalid worker node key '${workerNodeKey}'`))
+ return
+ }
const killMessageListener = (message: MessageValue<Response>): void => {
this.checkMessageWorkerId(message)
if (message.kill === 'success') {
this.flagWorkerNodeAsNotReady(workerNodeKey)
const flushedTasks = this.flushTasksQueue(workerNodeKey)
const workerNode = this.workerNodes[workerNodeKey]
- await waitWorkerNodeEvents(workerNode, 'taskFinished', flushedTasks)
+ await waitWorkerNodeEvents(
+ workerNode,
+ 'taskFinished',
+ flushedTasks,
+ this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
+ getDefaultTasksQueueOptions(this.maxSize).tasksFinishedTimeout
+ )
await this.sendKillMessageToWorker(workerNodeKey)
await workerNode.terminate()
}
if (this.started && this.opts.enableTasksQueue === true) {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
- workerNode.terminate().catch(error => {
+ workerNode?.terminate().catch(error => {
this.emitter?.emit(PoolEvents.error, error)
})
})
}
private redistributeQueuedTasks (workerNodeKey: number): void {
+ if (workerNodeKey === -1) {
+ return
+ }
if (this.workerNodes.length <= 1) {
return
}
this.afterTaskExecutionHook(workerNodeKey, message)
this.promiseResponseMap.delete(taskId as string)
workerNode?.emit('taskFinished', taskId)
- if (this.opts.enableTasksQueue === true) {
+ if (this.opts.enableTasksQueue === true && !this.destroying) {
const workerNodeTasksUsage = workerNode.usage.tasks
if (
this.tasksQueueSize(workerNodeKey) > 0 &&
env: this.opts.env,
workerOptions: this.opts.workerOptions,
tasksQueueBackPressureSize:
- this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
+ this.opts.tasksQueueOptions?.size ??
+ getDefaultTasksQueueOptions(this.maxSize).size
}
)
// Flag the worker node as ready at pool startup.