- if (destinationWorkerNodeKey != null) {
- const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
- const task = {
- ...(this.dequeueTask(workerNodeKey) as Task<Data>),
- workerId: destinationWorkerNode.info.id as number
- }
- if (
- this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
- destinationWorkerNode.usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
- ) {
- this.executeTask(destinationWorkerNodeKey, task)
- } else {
- this.enqueueTask(destinationWorkerNodeKey, task)
- }
+ }
+ }
+
+ private updateTaskStolenStatisticsWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (workerNode?.usage != null) {
+ ++workerNode.usage.tasks.stolen
+ }
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ workerNode.getTaskFunctionWorkerUsage(taskName) != null
+ ) {
+ const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ ) as WorkerUsage
+ ++taskFunctionWorkerUsage.tasks.stolen
+ }
+ }
+
+ private updateTaskSequentiallyStolenStatisticsWorkerUsage (
+ workerNodeKey: number
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (workerNode?.usage != null) {
+ ++workerNode.usage.tasks.sequentiallyStolen
+ }
+ }
+
+ private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ workerNode.getTaskFunctionWorkerUsage(taskName) != null
+ ) {
+ const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ ) as WorkerUsage
+ ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+ }
+ }
+
+ private resetTaskSequentiallyStolenStatisticsWorkerUsage (
+ workerNodeKey: number
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (workerNode?.usage != null) {
+ workerNode.usage.tasks.sequentiallyStolen = 0
+ }
+ }
+
+ private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ workerNode.getTaskFunctionWorkerUsage(taskName) != null
+ ) {
+ const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ ) as WorkerUsage
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+ }
+ }
+
+ private readonly handleIdleWorkerNodeEvent = (
+ eventDetail: WorkerNodeEventDetail,
+ previousStolenTask?: Task<Data>
+ ): void => {
+ if (this.workerNodes.length <= 1) {
+ return
+ }
+ const { workerNodeKey } = eventDetail
+ if (workerNodeKey == null) {
+ throw new Error(
+ 'WorkerNode event detail workerNodeKey attribute must be defined'
+ )
+ }
+ const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
+ if (
+ previousStolenTask != null &&
+ workerNodeTasksUsage.sequentiallyStolen > 0 &&
+ (workerNodeTasksUsage.executing > 0 ||
+ this.tasksQueueSize(workerNodeKey) > 0)
+ ) {
+ for (const taskName of this.workerNodes[workerNodeKey].info
+ .taskFunctionNames as string[]) {
+ this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ workerNodeKey,
+ taskName
+ )