+ workerInfo.stealing = true
+ const stolenTask = this.workerNodeStealTask(workerNodeKey)
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ stolenTask != null
+ ) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const taskFunctionTasksWorkerUsage = this.workerNodes[
+ workerNodeKey
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
+ if (
+ taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
+ (previousStolenTask != null &&
+ previousStolenTask.name === stolenTask.name &&
+ taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
+ ) {
+ this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ workerNodeKey,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ stolenTask.name!
+ )
+ } else {
+ this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ workerNodeKey,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ stolenTask.name!
+ )
+ }
+ }
+ sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
+ .then(() => {
+ this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
+ return undefined
+ })
+ .catch(error => {
+ this.emitter?.emit(PoolEvents.error, error)
+ })
+ }
+
+ private readonly workerNodeStealTask = (
+ workerNodeKey: number
+ ): Task<Data> | undefined => {
+ const workerNodes = this.workerNodes
+ .slice()
+ .sort(
+ (workerNodeA, workerNodeB) =>
+ workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
+ )
+ const sourceWorkerNode = workerNodes.find(
+ (sourceWorkerNode, sourceWorkerNodeKey) =>
+ sourceWorkerNode.info.ready &&
+ !sourceWorkerNode.info.stealing &&
+ sourceWorkerNodeKey !== workerNodeKey &&
+ sourceWorkerNode.usage.tasks.queued > 0
+ )
+ if (sourceWorkerNode != null) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const task = sourceWorkerNode.popTask()!
+ this.handleTask(workerNodeKey, task)
+ this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
+ return task
+ }
+ }
+
+ private readonly handleWorkerNodeBackPressureEvent = (
+ eventDetail: WorkerNodeEventDetail
+ ): void => {
+ if (
+ this.cannotStealTask() ||
+ (this.info.stealingWorkerNodes ?? 0) >
+ Math.floor(this.workerNodes.length / 2)
+ ) {
+ return
+ }
+ const { workerId } = eventDetail
+ const sizeOffset = 1
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
+ return
+ }
+ const sourceWorkerNode =
+ this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ const workerNodes = this.workerNodes
+ .slice()
+ .sort(
+ (workerNodeA, workerNodeB) =>
+ workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
+ )
+ for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
+ if (
+ sourceWorkerNode.usage.tasks.queued > 0 &&
+ workerNode.info.ready &&
+ !workerNode.info.stealing &&
+ workerNode.info.id !== workerId &&
+ workerNode.usage.tasks.queued <
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.size! - sizeOffset
+ ) {
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey}' not found in pool`
+ )
+ }
+ workerInfo.stealing = true
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const task = sourceWorkerNode.popTask()!
+ this.handleTask(workerNodeKey, task)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
+ workerInfo.stealing = false
+ }
+ }