+ this.enqueueTask(destinationWorkerNodeKey, task)
+ }
+ }
+ }
+
+ private updateTaskStolenStatisticsWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (workerNode?.usage != null) {
+ ++workerNode.usage.tasks.stolen
+ }
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ workerNode.getTaskFunctionWorkerUsage(taskName) != null
+ ) {
+ const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ ) as WorkerUsage
+ ++taskFunctionWorkerUsage.tasks.stolen
+ }
+ }
+
+ private taskStealingOnEmptyQueue (workerId: number): void {
+ const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
+ const workerNodes = this.workerNodes
+ .slice()
+ .sort(
+ (workerNodeA, workerNodeB) =>
+ workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
+ )
+ const sourceWorkerNode = workerNodes.find(
+ workerNode =>
+ workerNode.info.ready &&
+ workerNode.info.id !== workerId &&
+ workerNode.usage.tasks.queued > 0
+ )
+ if (sourceWorkerNode != null) {
+ const task = {
+ ...(sourceWorkerNode.popTask() as Task<Data>),
+ workerId: destinationWorkerNode.info.id as number
+ }
+ if (this.shallExecuteTask(destinationWorkerNodeKey)) {
+ this.executeTask(destinationWorkerNodeKey, task)
+ } else {
+ this.enqueueTask(destinationWorkerNodeKey, task)
+ }
+ this.updateTaskStolenStatisticsWorkerUsage(
+ destinationWorkerNodeKey,
+ task.name as string
+ )
+ }
+ }
+
+ private tasksStealingOnBackPressure (workerId: number): void {
+ const sizeOffset = 1
+ if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
+ return
+ }
+ const sourceWorkerNode =
+ this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ const workerNodes = this.workerNodes
+ .slice()
+ .sort(
+ (workerNodeA, workerNodeB) =>
+ workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
+ )
+ for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
+ if (
+ sourceWorkerNode.usage.tasks.queued > 0 &&
+ workerNode.info.ready &&
+ workerNode.info.id !== workerId &&
+ workerNode.usage.tasks.queued <
+ (this.opts.tasksQueueOptions?.size as number) - sizeOffset
+ ) {
+ const task = {
+ ...(sourceWorkerNode.popTask() as Task<Data>),
+ workerId: workerNode.info.id as number
+ }
+ if (this.shallExecuteTask(workerNodeKey)) {
+ this.executeTask(workerNodeKey, task)
+ } else {
+ this.enqueueTask(workerNodeKey, task)
+ }
+ this.updateTaskStolenStatisticsWorkerUsage(
+ workerNodeKey,
+ task.name as string