public readonly emitter?: PoolEmitter
/**
- * The promise response map.
+ * The execution response promise map.
*
* - `key`: The message id of each submitted task.
- * - `value`: An object that contains the worker, the promise resolve and reject callbacks.
+ * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
*
- * When we receive a message from the worker we get a map entry with the promise resolve/reject bound to the message.
+ * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
*/
protected promiseResponseMap: Map<
string,
): void {
this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
this.opts.workerChoiceStrategy = workerChoiceStrategy
- for (const [index, workerNode] of this.workerNodes.entries()) {
- this.setWorkerNode(
- index,
- workerNode.worker,
- {
- run: 0,
- running: 0,
- runTime: 0,
- runTimeHistory: new CircularArray(),
- avgRunTime: 0,
- medRunTime: 0,
- error: 0
- },
- workerNode.tasksQueue
- )
+ for (const workerNode of this.workerNodes) {
+ this.setWorkerNodeTasksUsage(workerNode, {
+ run: 0,
+ running: 0,
+ runTime: 0,
+ runTimeHistory: new CircularArray(),
+ avgRunTime: 0,
+ medRunTime: 0,
+ error: 0
+ })
}
this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
workerChoiceStrategy
id: crypto.randomUUID()
}
const res = this.internalExecute(workerNodeKey, workerNode, submittedTask)
- let currentTask: Task<Data> = submittedTask
if (
this.opts.enableTasksQueue === true &&
- (this.busy || this.tasksQueueSize(workerNodeKey) > 0)
+ (this.busy || this.workerNodes[workerNodeKey].tasksUsage.running > 0)
) {
- currentTask = this.enqueueDequeueTask(
- workerNodeKey,
- submittedTask
- ) as Task<Data>
+ this.enqueueTask(workerNodeKey, submittedTask)
+ } else {
+ this.sendToWorker(workerNode.worker, submittedTask)
}
- this.sendToWorker(workerNode.worker, currentTask)
this.checkAndEmitEvents()
// eslint-disable-next-line @typescript-eslint/return-await
return res
protected workerListener (): (message: MessageValue<Response>) => void {
return message => {
if (message.id != null) {
- // Task response received
+ // Task execution response received
const promiseResponse = this.promiseResponseMap.get(message.id)
if (promiseResponse != null) {
if (message.error != null) {
}
}
+ /**
+ * Sets the given worker node its tasks usage in the pool.
+ *
+ * @param workerNode - The worker node.
+ * @param tasksUsage - The worker node tasks usage.
+ */
+ private setWorkerNodeTasksUsage (
+ workerNode: WorkerNode<Worker, Data>,
+ tasksUsage: TasksUsage
+ ): void {
+ workerNode.tasksUsage = tasksUsage
+ }
+
/**
* Gets the given worker its tasks usage in the pool.
*
this.workerChoiceStrategyContext.remove(workerNodeKey)
}
- private enqueueDequeueTask (
- workerNodeKey: number,
- task: Task<Data>
- ): Task<Data> | undefined {
- this.enqueueTask(workerNodeKey, task)
- return this.dequeueTask(workerNodeKey)
- }
-
private enqueueTask (workerNodeKey: number, task: Task<Data>): void {
this.workerNodes[workerNodeKey].tasksQueue.push(task)
}