- let targetWorkerNodeKey: number = workerNodeKey
- let minQueuedTasks = Infinity
- let executeTask = false
- for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
- const workerInfo = this.getWorkerInfo(workerNodeId)
- if (
- workerNodeId !== workerNodeKey &&
- workerInfo.ready &&
- workerNode.usage.tasks.queued === 0
- ) {
- if (
- this.workerNodes[workerNodeId].usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
- ) {
- executeTask = true
- }
- targetWorkerNodeKey = workerNodeId
- break
- }
- if (
- workerNodeId !== workerNodeKey &&
- workerInfo.ready &&
- workerNode.usage.tasks.queued < minQueuedTasks
- ) {
- minQueuedTasks = workerNode.usage.tasks.queued
- targetWorkerNodeKey = workerNodeId
- }
+ const destinationWorkerNodeKey = this.workerNodes.reduce(
+ (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
+ return workerNode.info.ready &&
+ workerNode.usage.tasks.queued <
+ workerNodes[minWorkerNodeKey].usage.tasks.queued
+ ? workerNodeKey
+ : minWorkerNodeKey
+ },
+ 0
+ )
+ this.handleTask(
+ destinationWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ }
+
+ private updateTaskStolenStatisticsWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (workerNode?.usage != null) {
+ ++workerNode.usage.tasks.stolen
+ }
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ workerNode.getTaskFunctionWorkerUsage(taskName) != null
+ ) {
+ const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ ) as WorkerUsage
+ ++taskFunctionWorkerUsage.tasks.stolen
+ }
+ }
+
+ private updateTaskSequentiallyStolenStatisticsWorkerUsage (
+ workerNodeKey: number
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (workerNode?.usage != null) {
+ ++workerNode.usage.tasks.sequentiallyStolen
+ }
+ }
+
+ private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ workerNode.getTaskFunctionWorkerUsage(taskName) != null
+ ) {
+ const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ ) as WorkerUsage
+ ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+ }
+ }
+
+ private resetTaskSequentiallyStolenStatisticsWorkerUsage (
+ workerNodeKey: number
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (workerNode?.usage != null) {
+ workerNode.usage.tasks.sequentiallyStolen = 0
+ }
+ }
+
+ private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ workerNode.getTaskFunctionWorkerUsage(taskName) != null
+ ) {
+ const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ ) as WorkerUsage
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+ }
+ }
+
+ private readonly handleIdleWorkerNodeEvent = (
+ eventDetail: WorkerNodeEventDetail,
+ previousStolenTask?: Task<Data>
+ ): void => {
+ if (this.workerNodes.length <= 1) {
+ return
+ }
+ const { workerNodeKey } = eventDetail
+ if (workerNodeKey == null) {
+ throw new Error(
+ 'WorkerNode event detail workerNodeKey attribute must be defined'
+ )
+ }
+ const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
+ if (
+ previousStolenTask != null &&
+ workerNodeTasksUsage.sequentiallyStolen > 0 &&
+ (workerNodeTasksUsage.executing > 0 ||
+ this.tasksQueueSize(workerNodeKey) > 0)
+ ) {
+ for (const taskName of this.workerNodes[workerNodeKey].info
+ .taskFunctionNames as string[]) {
+ this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ workerNodeKey,
+ taskName
+ )