*
* When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
*/
- protected promiseResponseMap: Map<
- string,
- PromiseResponseWrapper<Worker, Response>
- > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
+ protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
+ new Map<string, PromiseResponseWrapper<Response>>()
/**
* Worker choice strategy context referencing a worker choice algorithm implementation.
return await new Promise<Response>((resolve, reject) => {
const timestamp = performance.now()
const workerNodeKey = this.chooseWorkerNode()
- const submittedTask: Task<Data> = {
+ const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
workerId: this.getWorkerInfo(workerNodeKey).id as number,
id: randomUUID()
}
- this.promiseResponseMap.set(submittedTask.id as string, {
+ this.promiseResponseMap.set(task.id as string, {
resolve,
reject,
- worker: this.workerNodes[workerNodeKey].worker
+ workerNodeKey
})
if (
this.opts.enableTasksQueue === true &&
((this.opts.tasksQueueOptions as TasksQueueOptions)
.concurrency as number))
) {
- this.enqueueTask(workerNodeKey, submittedTask)
+ this.enqueueTask(workerNodeKey, task)
} else {
- this.executeTask(workerNodeKey, submittedTask)
+ this.executeTask(workerNodeKey, task)
}
this.checkAndEmitEvents()
})
* Hook executed after the worker task execution.
* Can be overridden.
*
- * @param worker - The worker.
+ * @param workerNodeKey - The worker node key.
* @param message - The received message.
*/
protected afterTaskExecutionHook (
- worker: Worker,
+ workerNodeKey: number,
message: MessageValue<Response>
): void {
- const workerNodeKey = this.getWorkerNodeKey(worker)
const workerUsage = this.workerNodes[workerNodeKey].usage
this.updateTaskStatisticsWorkerUsage(workerUsage, message)
this.updateRunTimeWorkerUsage(workerUsage, message)
} else {
promiseResponse.resolve(message.data as Response)
}
- this.afterTaskExecutionHook(promiseResponse.worker, message)
+ const workerNodeKey = promiseResponse.workerNodeKey
+ this.afterTaskExecutionHook(workerNodeKey, message)
this.promiseResponseMap.delete(message.id as string)
- const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
if (
this.opts.enableTasksQueue === true &&
this.tasksQueueSize(workerNodeKey) > 0
import type { EventLoopUtilization } from 'node:perf_hooks'
import type { MessagePort } from 'node:worker_threads'
import type { KillBehavior } from './worker/worker-options'
-import type { IWorker } from './pools/worker'
/**
* Task error.
/**
* An object holding the execution response promise resolve/reject callbacks.
*
- * @typeParam Worker - Type of worker.
* @typeParam Response - Type of execution response. This can only be structured-cloneable data.
* @internal
*/
-export interface PromiseResponseWrapper<
- Worker extends IWorker,
- Response = unknown
-> {
+export interface PromiseResponseWrapper<Response = unknown> {
/**
* Resolve callback to fulfill the promise.
*/
*/
readonly reject: (reason?: unknown) => void
/**
- * The worker handling the execution.
+ * The worker node key handling the execution.
*/
- readonly worker: Worker
+ readonly workerNodeKey: number
}