...(this.opts.enableTasksQueue === true && {
stealingWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
- workerNode.info.stealing ? accumulator + 1 : accumulator,
- 0
- ),
- }),
- ...(this.opts.enableTasksQueue === true && {
- stolenWorkerNodes: this.workerNodes.reduce(
- (accumulator, workerNode) =>
- workerNode.info.stolen ? accumulator + 1 : accumulator,
+ workerNode.info.continuousStealing ? accumulator + 1 : accumulator,
0
),
}),
workerUsage.tasks.executing === 0 &&
this.tasksQueueSize(localWorkerNodeKey) === 0 &&
workerInfo != null &&
- !workerInfo.stealing)))
+ !workerInfo.continuousStealing)))
) {
// Flag the worker node as not ready immediately
this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
workerNodeKey: number,
- taskName: string,
+ taskName?: string,
previousTaskName?: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
- if (workerNode?.usage != null) {
+ if (workerNode?.usage != null && taskName != null) {
++workerNode.usage.tasks.sequentiallyStolen
}
if (
+ taskName != null &&
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
workerNodeKey: number,
- taskName: string
+ taskName?: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
workerNode.usage.tasks.sequentiallyStolen = 0
}
if (
+ taskName != null &&
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
destinationWorkerInfo.stealing = true
sourceWorkerNode.info.stolen = true
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
+ const stolenTask = sourceWorkerNode.dequeueLastPrioritizedTask()!
sourceWorkerNode.info.stolen = false
destinationWorkerInfo.stealing = false
- this.handleTask(destinationWorkerNodeKey, task)
+ this.handleTask(destinationWorkerNodeKey, stolenTask)
this.updateTaskStolenStatisticsWorkerUsage(
destinationWorkerNodeKey,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- task.name!
+ stolenTask.name!
)
- return task
+ return stolenTask
}
private readonly handleWorkerNodeIdleEvent = (
"WorkerNode event detail 'workerNodeKey' property must be defined"
)
}
+ const workerNodeInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerNodeInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey.toString()}' not found in pool`
+ )
+ }
+ const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
this.cannotStealTask() ||
(this.info.stealingWorkerNodes ?? 0) >
this.opts.tasksQueueOptions!.tasksStealingRatio!
)
) {
- if (previousStolenTask != null) {
+ workerNodeInfo.continuousStealing = false
+ if (workerNodeTasksUsage.sequentiallyStolen > 0) {
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
workerNodeKey,
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- previousStolenTask.name!
+ previousStolenTask?.name
)
}
return
}
- const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
- previousStolenTask != null &&
- (workerNodeTasksUsage.executing > 0 ||
- this.tasksQueueSize(workerNodeKey) > 0)
+ workerNodeInfo.continuousStealing ||
+ workerNodeTasksUsage.executing > 0 ||
+ this.tasksQueueSize(workerNodeKey) > 0
) {
- this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
- workerNodeKey,
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- previousStolenTask.name!
- )
+ workerNodeInfo.continuousStealing = false
+ if (workerNodeTasksUsage.sequentiallyStolen > 0) {
+ this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
+ previousStolenTask?.name
+ )
+ }
return
}
+ workerNodeInfo.continuousStealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
- if (stolenTask != null) {
- this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
- workerNodeKey,
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- stolenTask.name!,
- previousStolenTask?.name
- )
- }
+ this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
+ stolenTask?.name,
+ previousStolenTask?.name
+ )
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
- const { workerId, taskId, workerError, data } = message
+ const { taskId, workerError, data } = message
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const promiseResponse = this.promiseResponseMap.get(taskId!)
if (promiseResponse != null) {
this.afterTaskExecutionHook(workerNodeKey, message)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.promiseResponseMap.delete(taskId!)
- // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
- workerNode?.emit('taskFinished', taskId)
+ workerNode.emit('taskFinished', taskId)
if (
this.opts.enableTasksQueue === true &&
!this.destroying &&
}
if (
workerNodeTasksUsage.executing === 0 &&
- this.tasksQueueSize(workerNodeKey) === 0 &&
- workerNodeTasksUsage.sequentiallyStolen === 0
+ this.tasksQueueSize(workerNodeKey) === 0
) {
workerNode.emit('idle', {
- workerId,
workerNodeKey,
})
}