utilization: round(this.utilization),
}),
workerNodes: this.workerNodes.length,
- idleWorkerNodes: this.workerNodes.reduce(
- (accumulator, workerNode) =>
- workerNode.usage.tasks.executing === 0
- ? accumulator + 1
- : accumulator,
- 0
- ),
...(this.opts.enableTasksQueue === true && {
stealingWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
0
),
}),
+ idleWorkerNodes: this.workerNodes.reduce(
+ (accumulator, _, workerNodeKey) =>
+ this.isWorkerNodeIdle(workerNodeKey) ? accumulator + 1 : accumulator,
+ 0
+ ),
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, _, workerNodeKey) =>
this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
)
}
+ private isWorkerNodeIdle (workerNodeKey: number): boolean {
+ if (this.opts.enableTasksQueue === true) {
+ return (
+ this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
+ this.tasksQueueSize(workerNodeKey) === 0
+ )
+ }
+ return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
+ }
+
private isWorkerNodeBusy (workerNodeKey: number): boolean {
if (this.opts.enableTasksQueue === true) {
return (
message.workerId
)
const workerInfo = this.getWorkerInfo(localWorkerNodeKey)
- const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
// Kill message received from worker
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
(isKillBehavior(KillBehaviors.SOFT, message.kill) &&
- ((this.opts.enableTasksQueue === false &&
- workerUsage.tasks.executing === 0) ||
- (this.opts.enableTasksQueue === true &&
- workerUsage.tasks.executing === 0 &&
- this.tasksQueueSize(localWorkerNodeKey) === 0 &&
- workerInfo != null &&
- !workerInfo.continuousStealing)))
+ this.isWorkerNodeIdle(localWorkerNodeKey) &&
+ workerInfo != null &&
+ !workerInfo.continuousStealing &&
+ !workerInfo.stealing)
) {
// Flag the worker node as not ready immediately
this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
}
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
- this.cannotStealTask() ||
- (this.info.stealingWorkerNodes ?? 0) >
- Math.round(
- this.workerNodes.length *
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.opts.tasksQueueOptions!.tasksStealingRatio!
- )
+ !workerNodeInfo.continuousStealing &&
+ (this.cannotStealTask() ||
+ (this.info.stealingWorkerNodes ?? 0) >
+ Math.round(
+ this.workerNodes.length *
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.tasksStealingRatio!
+ ))
) {
- workerNodeInfo.continuousStealing = false
- if (workerNodeTasksUsage.sequentiallyStolen > 0) {
- this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
- workerNodeKey,
- previousStolenTask?.name
- )
- }
return
}
if (
- workerNodeInfo.continuousStealing ||
- workerNodeTasksUsage.executing > 0 ||
- this.tasksQueueSize(workerNodeKey) > 0
+ workerNodeInfo.continuousStealing &&
+ (workerNodeTasksUsage.executing > 0 ||
+ this.tasksQueueSize(workerNodeKey) > 0)
) {
workerNodeInfo.continuousStealing = false
if (workerNodeTasksUsage.sequentiallyStolen > 0) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
}
- if (
- workerNodeTasksUsage.executing === 0 &&
- this.tasksQueueSize(workerNodeKey) === 0
- ) {
+ if (this.isWorkerNodeIdle(workerNodeKey)) {
workerNode.emit('idle', {
workerNodeKey,
})