transferList?: TransferListItem[]
): void
- /**
- * Creates a new worker.
- *
- * @returns Newly created worker.
- */
- protected abstract createWorker (): Worker
-
/**
* Creates a new, completely set up worker node.
*
* @returns New, completely set up worker node key.
*/
protected createAndSetupWorkerNode (): number {
- const worker = this.createWorker()
-
- worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
- worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
- worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
- worker.on('error', error => {
- const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+ const workerNode = this.createWorkerNode()
+ workerNode.registerWorkerEventHandler(
+ 'online',
+ this.opts.onlineHandler ?? EMPTY_FUNCTION
+ )
+ workerNode.registerWorkerEventHandler(
+ 'message',
+ this.opts.messageHandler ?? EMPTY_FUNCTION
+ )
+ workerNode.registerWorkerEventHandler(
+ 'error',
+ this.opts.errorHandler ?? EMPTY_FUNCTION
+ )
+ workerNode.registerWorkerEventHandler('error', (error: Error) => {
+ const workerNodeKey = this.getWorkerNodeKeyByWorker(workerNode.worker)
this.flagWorkerNodeAsNotReady(workerNodeKey)
const workerInfo = this.getWorkerInfo(workerNodeKey)
this.emitter?.emit(PoolEvents.error, error)
this.redistributeQueuedTasks(workerNodeKey)
}
})
- worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
- worker.once('exit', () => {
- this.removeWorkerNode(worker)
+ workerNode.registerWorkerEventHandler(
+ 'exit',
+ this.opts.exitHandler ?? EMPTY_FUNCTION
+ )
+ workerNode.registerOnceWorkerEventHandler('exit', () => {
+ this.removeWorkerNode(workerNode.worker)
})
-
- const workerNodeKey = this.addWorkerNode(worker)
-
+ const workerNodeKey = this.addWorkerNode(workerNode)
this.afterWorkerNodeSetup(workerNodeKey)
-
return workerNodeKey
}
}
/**
- * Adds the given worker in the pool worker nodes.
+ * Creates a worker node.
*
- * @param worker - The worker.
- * @returns The added worker node key.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
+ * @returns The created worker node.
*/
- private addWorkerNode (worker: Worker): number {
+ private createWorkerNode (): IWorkerNode<Worker, Data> {
const workerNode = new WorkerNode<Worker, Data>(
- worker,
- this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
+ this.worker,
+ this.filePath,
+ {
+ env: this.opts.env,
+ workerOptions: this.opts.workerOptions,
+ tasksQueueBackPressureSize:
+ this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
+ }
)
// Flag the worker node as ready at pool startup.
if (this.starting) {
workerNode.info.ready = true
}
+ return workerNode
+ }
+
+ /**
+ * Adds the given worker node in the pool worker nodes.
+ *
+ * @param workerNode - The worker node.
+ * @returns The added worker node key.
+ * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
+ */
+ private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
this.workerNodes.push(workerNode)
- const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+ const workerNodeKey = this.workerNodes.indexOf(workerNode)
if (workerNodeKey === -1) {
throw new Error('Worker added not found in worker nodes')
}
}
/**
- * Removes the given worker from the pool worker nodes.
+ * Removes the worker node associated to the give given worker from the pool worker nodes.
*
* @param worker - The worker.
*/