public readonly emitter?: PoolEmitter
/**
- * The execution response promise map.
+ * The task execution response promise map.
*
* - `key`: The message id of each submitted task.
* - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
* @param worker - The worker.
* @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
*/
- private getWorkerNodeKey (worker: Worker): number {
+ protected getWorkerNodeKey (worker: Worker): number {
return this.workerNodes.findIndex(
workerNode => workerNode.worker === worker
)
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
- const timestamp = performance.now()
- const workerNodeKey = this.chooseWorkerNode()
- const submittedTask: Task<Data> = {
- name: name ?? DEFAULT_TASK_NAME,
- // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
- data: data ?? ({} as Data),
- timestamp,
- workerId: this.getWorkerInfo(workerNodeKey).id as number,
- id: randomUUID()
- }
- const res = new Promise<Response>((resolve, reject) => {
+ return await new Promise<Response>((resolve, reject) => {
+ const timestamp = performance.now()
+ const workerNodeKey = this.chooseWorkerNode()
+ const submittedTask: Task<Data> = {
+ name: name ?? DEFAULT_TASK_NAME,
+ // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
+ data: data ?? ({} as Data),
+ timestamp,
+ workerId: this.getWorkerInfo(workerNodeKey).id as number,
+ id: randomUUID()
+ }
this.promiseResponseMap.set(submittedTask.id as string, {
resolve,
reject,
worker: this.workerNodes[workerNodeKey].worker
})
+ if (
+ this.opts.enableTasksQueue === true &&
+ (this.busy ||
+ this.workerNodes[workerNodeKey].usage.tasks.executing >=
+ ((this.opts.tasksQueueOptions as TasksQueueOptions)
+ .concurrency as number))
+ ) {
+ this.enqueueTask(workerNodeKey, submittedTask)
+ } else {
+ this.executeTask(workerNodeKey, submittedTask)
+ }
+ this.checkAndEmitEvents()
})
- if (
- this.opts.enableTasksQueue === true &&
- (this.busy ||
- this.workerNodes[workerNodeKey].usage.tasks.executing >=
- ((this.opts.tasksQueueOptions as TasksQueueOptions)
- .concurrency as number))
- ) {
- this.enqueueTask(workerNodeKey, submittedTask)
- } else {
- this.executeTask(workerNodeKey, submittedTask)
- }
- this.checkAndEmitEvents()
- // eslint-disable-next-line @typescript-eslint/return-await
- return res
}
/** @inheritDoc */
const workerNodeKey = this.getWorkerNodeKey(worker)
const workerInfo = this.getWorkerInfo(workerNodeKey)
workerInfo.ready = false
+ this.workerNodes[workerNodeKey].closeChannel()
this.emitter?.emit(PoolEvents.error, error)
if (this.opts.restartWorkerOnError === true && !this.starting) {
if (workerInfo.dynamic) {
worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
worker.once('exit', () => {
- const workerInfo = this.getWorkerInfoByWorker(worker)
- if (workerInfo.messageChannel != null) {
- workerInfo.messageChannel?.port1.close()
- workerInfo.messageChannel?.port1.close()
- }
this.removeWorkerNode(worker)
})
* Gets the worker information from the given worker node key.
*
* @param workerNodeKey - The worker node key.
+ * @returns The worker information.
*/
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
return this.workerNodes[workerNodeKey].info
* Gets the worker information from the given worker.
*
* @param worker - The worker.
+ * @returns The worker information.
+ * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker is not found.
*/
protected getWorkerInfoByWorker (worker: Worker): WorkerInfo {
- return this.workerNodes[this.getWorkerNodeKey(worker)].info
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ if (workerNodeKey === -1) {
+ throw new Error('Worker not found')
+ }
+ return this.workerNodes[workerNodeKey].info
}
/**