0
),
busyWorkerNodes: this.workerNodes.reduce(
- (accumulator, workerNode) =>
- workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
+ (accumulator, _workerNode, workerNodeKey) =>
+ this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
0
),
executedTasks: this.workerNodes.reduce(
)
}
+ private isWorkerNodeBusy (workerNodeKey: number): boolean {
+ if (this.opts.enableTasksQueue === true) {
+ return (
+ this.workerNodes[workerNodeKey].usage.tasks.executing >=
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ )
+ }
+ return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
+ }
+
private async sendTaskFunctionOperationToWorker (
workerNodeKey: number,
message: MessageValue<Data>
})
}
+ private handleTask (workerNodeKey: number, task: Task<Data>): void {
+ if (this.shallExecuteTask(workerNodeKey)) {
+ this.executeTask(workerNodeKey, task)
+ } else {
+ this.enqueueTask(workerNodeKey, task)
+ }
+ }
+
private redistributeQueuedTasks (workerNodeKey: number): void {
if (this.workerNodes.length <= 1) {
return
},
0
)
- const task = this.dequeueTask(workerNodeKey) as Task<Data>
- if (this.shallExecuteTask(destinationWorkerNodeKey)) {
- this.executeTask(destinationWorkerNodeKey, task)
- } else {
- this.enqueueTask(destinationWorkerNodeKey, task)
- }
+ this.handleTask(
+ destinationWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
}
}
)
if (sourceWorkerNode != null) {
const task = sourceWorkerNode.popTask() as Task<Data>
- if (this.shallExecuteTask(workerNodeKey)) {
- this.executeTask(workerNodeKey, task)
- } else {
- this.enqueueTask(workerNodeKey, task)
- }
+ this.handleTask(workerNodeKey, task)
this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
(this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
const task = sourceWorkerNode.popTask() as Task<Data>
- if (this.shallExecuteTask(workerNodeKey)) {
- this.executeTask(workerNodeKey, task)
- } else {
- this.enqueueTask(workerNodeKey, task)
- }
+ this.handleTask(workerNodeKey, task)
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
task.name as string