...(this.opts.enableTasksQueue === true && {
stealingWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
- workerNode.info.continuousStealing ? accumulator + 1 : accumulator,
+ workerNode.info.continuousStealing ||
+ workerNode.info.backPressureStealing
+ ? accumulator + 1
+ : accumulator,
0
),
}),
this.isWorkerNodeIdle(localWorkerNodeKey) &&
workerInfo != null &&
!workerInfo.continuousStealing &&
- !workerInfo.stealing)
+ !workerInfo.backPressureStealing)
) {
// Flag the worker node as not ready immediately
this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.opts.tasksQueueOptions!.size! - sizeOffset
) {
+ if (workerNode.info.backPressureStealing) {
+ continue
+ }
+ workerNode.info.backPressureStealing = true
this.stealTask(sourceWorkerNode, workerNodeKey)
+ workerNode.info.backPressureStealing = false
}
}
}
readonly utilization?: number
/** Pool total worker nodes. */
readonly workerNodes: number
- /** Pool continuous stealing worker nodes. */
+ /** Pool tasks stealing worker nodes. */
readonly stealingWorkerNodes?: number
/** Pool idle worker nodes. */
readonly idleWorkerNodes: number
stealing: false,
stolen: false,
continuousStealing: false,
+ backPressureStealing: false,
backPressure: false,
}
}
stolen: boolean
/**
* Continuous stealing flag.
- * This flag is set to `true` when worker node continuously steal tasks from other worker nodes.
+ * This flag is set to `true` when worker node is continuously stealing tasks from other worker nodes.
*/
continuousStealing: boolean
+ /**
+ * Back pressure stealing flag.
+ * This flag is set to `true` when worker node is stealing one task from another back pressured worker node.
+ */
+ backPressureStealing: boolean
/**
* Back pressure flag.
* This flag is set to `true` when worker node tasks queue has back pressure.
stealing: false,
stolen: false,
continuousStealing: false,
+ backPressureStealing: false,
backPressure: false,
})
}
stealing: false,
stolen: false,
continuousStealing: false,
+ backPressureStealing: false,
backPressure: false,
})
}
stealing: false,
stolen: false,
continuousStealing: false,
+ backPressureStealing: false,
backPressure: false,
})
expect(threadWorkerNode.usage).toStrictEqual({
stealing: false,
stolen: false,
continuousStealing: false,
+ backPressureStealing: false,
backPressure: false,
})
expect(clusterWorkerNode.usage).toStrictEqual({