const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
message.workerId
)
- const workerInfo = this.getWorkerInfo(localWorkerNodeKey)
// Kill message received from worker
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
(isKillBehavior(KillBehaviors.SOFT, message.kill) &&
this.isWorkerNodeIdle(localWorkerNodeKey) &&
- workerInfo != null &&
- !workerInfo.continuousStealing &&
- !workerInfo.backPressureStealing)
+ !this.isWorkerNodeStealing(localWorkerNodeKey))
) {
// Flag the worker node as not ready immediately
this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
return workerNode.info.ready && workerNode.usage.tasks.executing === 0
}
+ private isWorkerNodeStealing (workerNodeKey: number): boolean {
+ const workerNode = this.workerNodes[workerNodeKey]
+ return (
+ workerNode.info.ready &&
+ (workerNode.info.continuousStealing ||
+ workerNode.info.backPressureStealing)
+ )
+ }
+
private redistributeQueuedTasks (sourceWorkerNodeKey: number): void {
if (sourceWorkerNodeKey === -1 || this.cannotStealTask()) {
return
0
),
stealingWorkerNodes: this.workerNodes.reduce(
- (accumulator, workerNode) =>
- workerNode.info.continuousStealing ||
- workerNode.info.backPressureStealing
+ (accumulator, _, workerNodeKey) =>
+ this.isWorkerNodeStealing(workerNodeKey)
? accumulator + 1
: accumulator,
0