const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
workerNodeInfo.continuousStealing &&
- (workerNodeTasksUsage.executing > 0 ||
- this.tasksQueueSize(workerNodeKey) > 0)
+ !this.isWorkerNodeIdle(workerNodeKey)
) {
workerNodeInfo.continuousStealing = false
if (workerNodeTasksUsage.sequentiallyStolen > 0) {
* @returns Worker nodes busyness boolean status.
*/
protected internalBusy (): boolean {
- if (this.opts.enableTasksQueue === true) {
- return (
- this.workerNodes.findIndex(
- workerNode =>
- workerNode.info.ready &&
- workerNode.usage.tasks.executing <
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.opts.tasksQueueOptions!.concurrency!
- ) === -1
- )
- }
return (
- this.workerNodes.findIndex(
- workerNode =>
- workerNode.info.ready && workerNode.usage.tasks.executing === 0
- ) === -1
+ this.workerNodes.reduce(
+ (accumulator, _, workerNodeKey) =>
+ this.isWorkerNodeIdle(workerNodeKey) ? accumulator + 1 : accumulator,
+ 0
+ ) === 0
)
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.promiseResponseMap.delete(taskId!)
if (this.opts.enableTasksQueue === true && !this.destroying) {
- const workerNodeTasksUsage = workerNode.usage.tasks
if (
- this.tasksQueueSize(workerNodeKey) > 0 &&
- workerNodeTasksUsage.executing <
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.opts.tasksQueueOptions!.concurrency!
+ !this.isWorkerNodeBusy(workerNodeKey) &&
+ this.tasksQueueSize(workerNodeKey) > 0
) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
private isWorkerNodeBusy (workerNodeKey: number): boolean {
if (this.opts.enableTasksQueue === true) {
return (
+ this.workerNodes[workerNodeKey].info.ready &&
this.workerNodes[workerNodeKey].usage.tasks.executing >=
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.opts.tasksQueueOptions!.concurrency!
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
)
}
- return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
+ return (
+ this.workerNodes[workerNodeKey].info.ready &&
+ this.workerNodes[workerNodeKey].usage.tasks.executing > 0
+ )
}
private isWorkerNodeIdle (workerNodeKey: number): boolean {
if (this.opts.enableTasksQueue === true) {
return (
+ this.workerNodes[workerNodeKey].info.ready &&
this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
this.tasksQueueSize(workerNodeKey) === 0
)
}
- return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
+ return (
+ this.workerNodes[workerNodeKey].info.ready &&
+ this.workerNodes[workerNodeKey].usage.tasks.executing === 0
+ )
}
private redistributeQueuedTasks (sourceWorkerNodeKey: number): void {
if (!this.started) {
return false
}
- if (this.empty) {
- return true
- }
return (
this.workerNodes.reduce(
(accumulator, workerNode) =>