+ private updateTaskSequentiallyStolenStatisticsWorkerUsage (
+ workerNodeKey: number
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (workerNode?.usage != null) {
+ ++workerNode.usage.tasks.sequentiallyStolen
+ }
+ }
+
+ private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ workerNode.getTaskFunctionWorkerUsage(taskName) != null
+ ) {
+ const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ ) as WorkerUsage
+ ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+ }
+ }
+
+ private resetTaskSequentiallyStolenStatisticsWorkerUsage (
+ workerNodeKey: number
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (workerNode?.usage != null) {
+ workerNode.usage.tasks.sequentiallyStolen = 0
+ }
+ }
+
+ private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ workerNode.getTaskFunctionWorkerUsage(taskName) != null
+ ) {
+ const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ ) as WorkerUsage
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+ }
+ }
+
+ private readonly handleIdleWorkerNodeEvent = (
+ eventDetail: WorkerNodeEventDetail,
+ previousStolenTask?: Task<Data>
+ ): void => {
+ if (this.workerNodes.length <= 1) {
+ return
+ }
+ const { workerNodeKey } = eventDetail
+ if (workerNodeKey == null) {
+ throw new Error(
+ 'WorkerNode event detail workerNodeKey attribute must be defined'
+ )
+ }
+ const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
+ if (
+ previousStolenTask != null &&
+ workerNodeTasksUsage.sequentiallyStolen > 0 &&
+ (workerNodeTasksUsage.executing > 0 ||
+ this.tasksQueueSize(workerNodeKey) > 0)
+ ) {
+ for (const taskName of this.workerNodes[workerNodeKey].info
+ .taskFunctionNames as string[]) {
+ this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ workerNodeKey,
+ taskName
+ )
+ }
+ this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
+ return
+ }
+ const stolenTask = this.workerNodeStealTask(workerNodeKey)
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ stolenTask != null
+ ) {
+ const taskFunctionTasksWorkerUsage = this.workerNodes[
+ workerNodeKey
+ ].getTaskFunctionWorkerUsage(stolenTask.name as string)
+ ?.tasks as TaskStatistics
+ if (
+ taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
+ (previousStolenTask != null &&
+ previousStolenTask.name === stolenTask.name &&
+ taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
+ ) {
+ this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ workerNodeKey,
+ stolenTask.name as string
+ )
+ } else {
+ this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ workerNodeKey,
+ stolenTask.name as string
+ )
+ }
+ }
+ sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
+ .then(() => {
+ this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
+ return undefined
+ })
+ .catch(EMPTY_FUNCTION)
+ }
+
+ private readonly workerNodeStealTask = (
+ workerNodeKey: number
+ ): Task<Data> | undefined => {