private redistributeQueuedTasks (workerNodeKey: number): void {
while (this.tasksQueueSize(workerNodeKey) > 0) {
- let destinationWorkerNodeKey: number = workerNodeKey
+ let destinationWorkerNodeKey!: number
let minQueuedTasks = Infinity
let executeTask = false
for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
}
}
}
- const task = {
- ...(this.dequeueTask(workerNodeKey) as Task<Data>),
- workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo)
- .id as number
- }
- if (executeTask) {
- this.executeTask(destinationWorkerNodeKey, task)
- } else {
- this.enqueueTask(destinationWorkerNodeKey, task)
+ if (destinationWorkerNodeKey != null) {
+ const task = {
+ ...(this.dequeueTask(workerNodeKey) as Task<Data>),
+ workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo)
+ .id as number
+ }
+ if (executeTask) {
+ this.executeTask(destinationWorkerNodeKey, task)
+ } else {
+ this.enqueueTask(destinationWorkerNodeKey, task)
+ }
}
}
}
workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
)
for (const sourceWorkerNode of workerNodes) {
+ if (sourceWorkerNode.usage.tasks.queued === 0) {
+ break
+ }
if (
sourceWorkerNode.info.ready &&
sourceWorkerNode.info.id !== workerId &&
)
for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
if (
+ sourceWorkerNode.usage.tasks.queued > 0 &&
workerNode.info.ready &&
workerNode.info.id !== workerId &&
- sourceWorkerNode.usage.tasks.queued > 0 &&
- !workerNode.hasBackPressure()
+ workerNode.usage.tasks.queued <
+ (this.opts.tasksQueueOptions?.size as number) - 1
) {
const task = {
...(sourceWorkerNode.popTask() as Task<Data>),