*
* @param workerNodeKey - The worker node key.
*/
- protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
+ protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
+ this.flagWorkerNodeAsNotReady(workerNodeKey)
+ this.flushTasksQueue(workerNodeKey)
+ // FIXME: wait for tasks to be finished
+ const workerNode = this.workerNodes[workerNodeKey]
+ await this.sendKillMessageToWorker(workerNodeKey)
+ await workerNode.terminate()
+ }
/**
* Setup hook to execute code before worker nodes are created in the abstract constructor.
)
workerNode.registerWorkerEventHandler('error', (error: Error) => {
const workerNodeKey = this.getWorkerNodeKeyByWorker(workerNode.worker)
- this.flagWorkerNodeAsNotReady(workerNodeKey)
- const workerInfo = this.getWorkerInfo(workerNodeKey)
+ workerNode.info.ready = false
this.emitter?.emit(PoolEvents.error, error)
- this.workerNodes[workerNodeKey].closeChannel()
if (
this.started &&
!this.starting &&
!this.destroying &&
this.opts.restartWorkerOnError === true
) {
- if (workerInfo.dynamic) {
+ if (workerNode.info.dynamic) {
this.createAndSetupDynamicWorkerNode()
} else {
this.createAndSetupWorkerNode()
if (this.started && this.opts.enableTasksQueue === true) {
this.redistributeQueuedTasks(workerNodeKey)
}
+ workerNode.terminate().catch(error => {
+ this.emitter?.emit(PoolEvents.error, error)
+ })
})
workerNode.registerWorkerEventHandler(
'exit',
}
/**
- * Removes the worker node associated to the give given worker from the pool worker nodes.
+ * Removes the worker node associated to the given worker from the pool worker nodes.
*
* @param worker - The worker.
*/
return cluster.isPrimary
}
- /** @inheritDoc */
- protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
- this.flagWorkerNodeAsNotReady(workerNodeKey)
- this.flushTasksQueue(workerNodeKey)
- // FIXME: wait for tasks to be finished
- const workerNode = this.workerNodes[workerNodeKey]
- const waitWorkerExit = new Promise<void>(resolve => {
- workerNode.registerOnceWorkerEventHandler('exit', () => {
- resolve()
- })
- })
- workerNode.registerOnceWorkerEventHandler('disconnect', () => {
- workerNode.worker.kill()
- })
- await this.sendKillMessageToWorker(workerNodeKey)
- workerNode.removeAllListeners()
- workerNode.worker.disconnect()
- await waitWorkerExit
- }
-
/** @inheritDoc */
protected sendToWorker (
workerNodeKey: number,
return isMainThread
}
- /** @inheritDoc */
- protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
- this.flagWorkerNodeAsNotReady(workerNodeKey)
- this.flushTasksQueue(workerNodeKey)
- // FIXME: wait for tasks to be finished
- const workerNode = this.workerNodes[workerNodeKey]
- const waitWorkerExit = new Promise<void>(resolve => {
- workerNode.registerOnceWorkerEventHandler('exit', () => {
- resolve()
- })
- })
- await this.sendKillMessageToWorker(workerNodeKey)
- workerNode.closeChannel()
- workerNode.removeAllListeners()
- await workerNode.worker.terminate()
- await waitWorkerExit
- }
-
/** @inheritDoc */
protected sendToWorker (
workerNodeKey: number,
}
/** @inheritdoc */
- public closeChannel (): void {
- if (this.messageChannel != null) {
- this.messageChannel.port1.unref()
- this.messageChannel.port2.unref()
- this.messageChannel.port1.close()
- this.messageChannel.port2.close()
- delete this.messageChannel
+ public async terminate (): Promise<void> {
+ const waitWorkerExit = new Promise<void>(resolve => {
+ this.registerOnceWorkerEventHandler('exit', () => {
+ resolve()
+ })
+ })
+ this.closeMessageChannel()
+ this.removeAllListeners()
+ if (this.info.type === WorkerTypes.thread) {
+ await this.worker.terminate?.()
+ } else if (this.info.type === WorkerTypes.cluster) {
+ this.registerOnceWorkerEventHandler('disconnect', () => {
+ this.worker.kill?.()
+ })
+ this.worker.disconnect?.()
}
+ await waitWorkerExit
}
/** @inheritdoc */
return this.taskFunctionsUsage.delete(name)
}
+ private closeMessageChannel (): void {
+ if (this.messageChannel != null) {
+ this.messageChannel.port1.unref()
+ this.messageChannel.port2.unref()
+ this.messageChannel.port1.close()
+ this.messageChannel.port2.close()
+ delete this.messageChannel
+ }
+ }
+
private initWorkerInfo (worker: Worker): WorkerInfo {
return {
id: getWorkerId(worker),
*/
export interface IWorker {
/**
- * Worker id.
+ * Cluster worker id.
*/
readonly id?: number
+ /**
+ * Worker thread worker id.
+ */
readonly threadId?: number
/**
* Registers an event listener.
| ErrorHandler<this>
| ExitHandler<this>
) => void
+ /**
+ * Stop all JavaScript execution in the worker thread as soon as possible.
+ * Returns a Promise for the exit code that is fulfilled when the `'exit' event` is emitted.
+ */
+ readonly terminate?: () => Promise<number>
+ /**
+ * Cluster worker disconnect.
+ */
+ readonly disconnect?: () => void
+ /**
+ * Cluster worker kill.
+ */
+ readonly kill?: (signal?: string) => void
}
/**
*/
strategyData?: StrategyData
/**
- * Message channel (worker_threads only).
+ * Message channel (worker thread only).
*/
readonly messageChannel?: MessageChannel
/**
*/
readonly resetUsage: () => void
/**
- * Closes communication channel.
+ * Terminates the worker node.
*/
- readonly closeChannel: () => void
+ readonly terminate: () => Promise<void>
/**
* Registers a worker event handler.
*