}
private redistributeQueuedTasks (workerNodeKey: number): void {
+ const workerNodes = this.workerNodes.filter(
+ (_, workerNodeId) => workerNodeId !== workerNodeKey
+ )
while (this.tasksQueueSize(workerNodeKey) > 0) {
let targetWorkerNodeKey: number = workerNodeKey
let minQueuedTasks = Infinity
let executeTask = false
- for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+ for (const [workerNodeId, workerNode] of workerNodes.entries()) {
if (
this.workerNodes[workerNodeId].usage.tasks.executing <
(this.opts.tasksQueueOptions?.concurrency as number)
) {
executeTask = true
}
- if (
- workerNodeId !== workerNodeKey &&
- workerNode.info.ready &&
- workerNode.usage.tasks.queued === 0
- ) {
+ if (workerNode.info.ready && workerNode.usage.tasks.queued === 0) {
targetWorkerNodeKey = workerNodeId
break
}
if (
- workerNodeId !== workerNodeKey &&
workerNode.info.ready &&
workerNode.usage.tasks.queued < minQueuedTasks
) {
if (
workerNode.info.ready &&
sourceWorkerNode.usage.tasks.queued > 0 &&
- !workerNode.hasBackPressure() &&
- workerNode.usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ !workerNode.hasBackPressure()
) {
- this.executeTask(
- workerNodeKey,
- sourceWorkerNode.popTask() as Task<Data>
- )
- } else if (
- workerNode.info.ready &&
- sourceWorkerNode.usage.tasks.queued > 0 &&
- !workerNode.hasBackPressure() &&
- workerNode.usage.tasks.executing >=
+ if (
+ workerNode.usage.tasks.executing <
(this.opts.tasksQueueOptions?.concurrency as number)
- ) {
- this.enqueueTask(
- workerNodeKey,
- sourceWorkerNode.popTask() as Task<Data>
- )
+ ) {
+ this.executeTask(
+ workerNodeKey,
+ sourceWorkerNode.popTask() as Task<Data>
+ )
+ } else {
+ this.enqueueTask(
+ workerNodeKey,
+ sourceWorkerNode.popTask() as Task<Data>
+ )
+ }
}
}
}