*
* @param workerNodeKey - The worker node key.
*/
- protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
+ protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
+ this.flagWorkerNodeAsNotReady(workerNodeKey)
+ this.flushTasksQueue(workerNodeKey)
+ // FIXME: wait for tasks to be finished
+ const workerNode = this.workerNodes[workerNodeKey]
+ await this.sendKillMessageToWorker(workerNodeKey)
+ await workerNode.terminate()
+ }
/**
* Setup hook to execute code before worker nodes are created in the abstract constructor.
this.opts.errorHandler ?? EMPTY_FUNCTION
)
workerNode.registerWorkerEventHandler('error', (error: Error) => {
- const workerNodeKey = this.getWorkerNodeKeyByWorker(workerNode.worker)
- this.flagWorkerNodeAsNotReady(workerNodeKey)
- const workerInfo = this.getWorkerInfo(workerNodeKey)
+ workerNode.info.ready = false
this.emitter?.emit(PoolEvents.error, error)
- this.workerNodes[workerNodeKey].closeChannel()
if (
this.started &&
!this.starting &&
!this.destroying &&
this.opts.restartWorkerOnError === true
) {
- if (workerInfo.dynamic) {
+ if (workerNode.info.dynamic) {
this.createAndSetupDynamicWorkerNode()
} else {
this.createAndSetupWorkerNode()
}
}
if (this.started && this.opts.enableTasksQueue === true) {
- this.redistributeQueuedTasks(workerNodeKey)
+ this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
+ workerNode.terminate().catch(error => {
+ this.emitter?.emit(PoolEvents.error, error)
+ })
})
workerNode.registerWorkerEventHandler(
'exit',
this.opts.exitHandler ?? EMPTY_FUNCTION
)
workerNode.registerOnceWorkerEventHandler('exit', () => {
- this.removeWorkerNode(workerNode.worker)
+ this.removeWorkerNode(workerNode)
})
const workerNodeKey = this.addWorkerNode(workerNode)
this.afterWorkerNodeSetup(workerNodeKey)
}
/**
- * Removes the worker node associated to the give given worker from the pool worker nodes.
+ * Removes the worker node from the pool worker nodes.
*
- * @param worker - The worker.
+ * @param workerNode - The worker node.
*/
- private removeWorkerNode (worker: Worker): void {
- const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+ private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
+ const workerNodeKey = this.workerNodes.indexOf(workerNode)
if (workerNodeKey !== -1) {
this.workerNodes.splice(workerNodeKey, 1)
this.workerChoiceStrategyContext.remove(workerNodeKey)