refactor: code cleanups
[poolifier.git] / src / pools / abstract-pool.ts
index ad7adce328f2c0c27618d2fd237e412e2a4fa17d..a23c7983bfc652fbfcb8ece4a75853dbed270ac5 100644 (file)
@@ -327,7 +327,7 @@ export abstract class AbstractPool<
         )
       }),
       busyWorkerNodes: this.workerNodes.reduce(
-        (accumulator, _workerNode, workerNodeKey) =>
+        (accumulator, _, workerNodeKey) =>
           this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
         0
       ),
@@ -376,14 +376,16 @@ export abstract class AbstractPool<
           minimum: round(
             min(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.runTime.minimum ?? Infinity
+                workerNode =>
+                  workerNode.usage.runTime.minimum ?? Number.POSITIVE_INFINITY
               )
             )
           ),
           maximum: round(
             max(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.runTime.maximum ?? -Infinity
+                workerNode =>
+                  workerNode.usage.runTime.maximum ?? Number.NEGATIVE_INFINITY
               )
             )
           ),
@@ -419,14 +421,16 @@ export abstract class AbstractPool<
           minimum: round(
             min(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.waitTime.minimum ?? Infinity
+                workerNode =>
+                  workerNode.usage.waitTime.minimum ?? Number.POSITIVE_INFINITY
               )
             )
           ),
           maximum: round(
             max(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
+                workerNode =>
+                  workerNode.usage.waitTime.maximum ?? Number.NEGATIVE_INFINITY
               )
             )
           ),
@@ -455,6 +459,115 @@ export abstract class AbstractPool<
             )
           })
         }
+      }),
+      ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+        .elu.aggregate === true && {
+        elu: {
+          idle: {
+            minimum: round(
+              min(
+                ...this.workerNodes.map(
+                  workerNode =>
+                    workerNode.usage.elu.idle.minimum ??
+                    Number.POSITIVE_INFINITY
+                )
+              )
+            ),
+            maximum: round(
+              max(
+                ...this.workerNodes.map(
+                  workerNode =>
+                    workerNode.usage.elu.idle.maximum ??
+                    Number.NEGATIVE_INFINITY
+                )
+              )
+            ),
+            ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+              .elu.average && {
+              average: round(
+                average(
+                  this.workerNodes.reduce<number[]>(
+                    (accumulator, workerNode) =>
+                      accumulator.concat(workerNode.usage.elu.idle.history),
+                    []
+                  )
+                )
+              )
+            }),
+            ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+              .elu.median && {
+              median: round(
+                median(
+                  this.workerNodes.reduce<number[]>(
+                    (accumulator, workerNode) =>
+                      accumulator.concat(workerNode.usage.elu.idle.history),
+                    []
+                  )
+                )
+              )
+            })
+          },
+          active: {
+            minimum: round(
+              min(
+                ...this.workerNodes.map(
+                  workerNode =>
+                    workerNode.usage.elu.active.minimum ??
+                    Number.POSITIVE_INFINITY
+                )
+              )
+            ),
+            maximum: round(
+              max(
+                ...this.workerNodes.map(
+                  workerNode =>
+                    workerNode.usage.elu.active.maximum ??
+                    Number.NEGATIVE_INFINITY
+                )
+              )
+            ),
+            ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+              .elu.average && {
+              average: round(
+                average(
+                  this.workerNodes.reduce<number[]>(
+                    (accumulator, workerNode) =>
+                      accumulator.concat(workerNode.usage.elu.active.history),
+                    []
+                  )
+                )
+              )
+            }),
+            ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+              .elu.median && {
+              median: round(
+                median(
+                  this.workerNodes.reduce<number[]>(
+                    (accumulator, workerNode) =>
+                      accumulator.concat(workerNode.usage.elu.active.history),
+                    []
+                  )
+                )
+              )
+            })
+          },
+          utilization: {
+            average: round(
+              average(
+                this.workerNodes.map(
+                  workerNode => workerNode.usage.elu.utilization ?? 0
+                )
+              )
+            ),
+            median: round(
+              median(
+                this.workerNodes.map(
+                  workerNode => workerNode.usage.elu.utilization ?? 0
+                )
+              )
+            )
+          }
+        }
       })
     }
   }
@@ -571,7 +684,7 @@ export abstract class AbstractPool<
     }
     if (requireSync) {
       this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
-        this.getWorkerWorkerChoiceStrategies(),
+        this.getWorkerChoiceStrategies(),
         this.opts.workerChoiceStrategyOptions
       )
       for (const workerNodeKey of this.workerNodes.keys()) {
@@ -591,7 +704,7 @@ export abstract class AbstractPool<
         this.opts.workerChoiceStrategyOptions
       )
       this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
-        this.getWorkerWorkerChoiceStrategies(),
+        this.getWorkerChoiceStrategies(),
         this.opts.workerChoiceStrategyOptions
       )
       for (const workerNodeKey of this.workerNodes.keys()) {
@@ -871,7 +984,7 @@ export abstract class AbstractPool<
     })
     this.taskFunctions.set(name, fn)
     this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
-      this.getWorkerWorkerChoiceStrategies()
+      this.getWorkerChoiceStrategies()
     )
     for (const workerNodeKey of this.workerNodes.keys()) {
       this.sendStatisticsMessageToWorker(workerNodeKey)
@@ -898,7 +1011,7 @@ export abstract class AbstractPool<
     }
     this.taskFunctions.delete(name)
     this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
-      this.getWorkerWorkerChoiceStrategies()
+      this.getWorkerChoiceStrategies()
     )
     for (const workerNodeKey of this.workerNodes.keys()) {
       this.sendStatisticsMessageToWorker(workerNodeKey)
@@ -920,20 +1033,48 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Gets task function strategy, if any.
+   * Gets task function worker choice strategy, if any.
    *
    * @param name - The task function name.
    * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
    */
-  private readonly getTaskFunctionWorkerWorkerChoiceStrategy = (
+  private readonly getTaskFunctionWorkerChoiceStrategy = (
     name?: string
   ): WorkerChoiceStrategy | undefined => {
-    if (name != null) {
-      return this.listTaskFunctionsProperties().find(
-        (taskFunctionProperties: TaskFunctionProperties) =>
-          taskFunctionProperties.name === name
-      )?.strategy
+    name = name ?? DEFAULT_TASK_NAME
+    const taskFunctionsProperties = this.listTaskFunctionsProperties()
+    if (name === DEFAULT_TASK_NAME) {
+      name = taskFunctionsProperties[1]?.name
     }
+    return taskFunctionsProperties.find(
+      (taskFunctionProperties: TaskFunctionProperties) =>
+        taskFunctionProperties.name === name
+    )?.strategy
+  }
+
+  /**
+   * Gets worker node task function worker choice strategy, if any.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @param name - The task function name.
+   * @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise.
+   */
+  private readonly getWorkerNodeTaskFunctionWorkerChoiceStrategy = (
+    workerNodeKey: number,
+    name?: string
+  ): WorkerChoiceStrategy | undefined => {
+    const workerInfo = this.getWorkerInfo(workerNodeKey)
+    if (workerInfo == null) {
+      return
+    }
+    name = name ?? DEFAULT_TASK_NAME
+    if (name === DEFAULT_TASK_NAME) {
+      name = workerInfo.taskFunctionsProperties?.[1]?.name
+    }
+    return workerInfo.taskFunctionsProperties?.find(
+      (taskFunctionProperties: TaskFunctionProperties) =>
+        taskFunctionProperties.name === name
+    )?.strategy
   }
 
   /**
@@ -941,18 +1082,24 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    * @param name - The task function name.
-   * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise.
+   * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise.
    */
   private readonly getWorkerNodeTaskFunctionPriority = (
     workerNodeKey: number,
     name?: string
   ): number | undefined => {
-    if (name != null) {
-      return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find(
-        (taskFunctionProperties: TaskFunctionProperties) =>
-          taskFunctionProperties.name === name
-      )?.priority
+    const workerInfo = this.getWorkerInfo(workerNodeKey)
+    if (workerInfo == null) {
+      return
+    }
+    name = name ?? DEFAULT_TASK_NAME
+    if (name === DEFAULT_TASK_NAME) {
+      name = workerInfo.taskFunctionsProperties?.[1]?.name
     }
+    return workerInfo.taskFunctionsProperties?.find(
+      (taskFunctionProperties: TaskFunctionProperties) =>
+        taskFunctionProperties.name === name
+    )?.priority
   }
 
   /**
@@ -960,7 +1107,7 @@ export abstract class AbstractPool<
    *
    * @returns The worker choice strategies.
    */
-  private readonly getWorkerWorkerChoiceStrategies =
+  private readonly getWorkerChoiceStrategies =
     (): Set<WorkerChoiceStrategy> => {
       return new Set([
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
@@ -1028,15 +1175,16 @@ export abstract class AbstractPool<
         return
       }
       const timestamp = performance.now()
-      const taskFunctionStrategy =
-        this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
-      const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy)
+      const workerNodeKey = this.chooseWorkerNode(name)
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
         data: data ?? ({} as Data),
         priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
-        strategy: taskFunctionStrategy,
+        strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy(
+          workerNodeKey,
+          name
+        ),
         transferList,
         timestamp,
         taskId: randomUUID()
@@ -1115,7 +1263,7 @@ export abstract class AbstractPool<
     }
     this.destroying = true
     await Promise.all(
-      this.workerNodes.map(async (_workerNode, workerNodeKey) => {
+      this.workerNodes.map(async (_, workerNodeKey) => {
         await this.destroyWorkerNode(workerNodeKey)
       })
     )
@@ -1306,14 +1454,12 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Chooses a worker node for the next task given the worker choice strategy.
+   * Chooses a worker node for the next task.
    *
-   * @param workerChoiceStrategy - The worker choice strategy.
+   * @param name - The task function name.
    * @returns The chosen worker node key
    */
-  private chooseWorkerNode (
-    workerChoiceStrategy?: WorkerChoiceStrategy
-  ): number {
+  private chooseWorkerNode (name?: string): number {
     if (this.shallCreateDynamicWorker()) {
       const workerNodeKey = this.createAndSetupDynamicWorkerNode()
       if (
@@ -1324,7 +1470,9 @@ export abstract class AbstractPool<
       }
     }
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-    return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy)
+    return this.workerChoiceStrategiesContext!.execute(
+      this.getTaskFunctionWorkerChoiceStrategy(name)
+    )
   }
 
   /**
@@ -1359,7 +1507,8 @@ export abstract class AbstractPool<
     ) {
       workerNode.usage.runTime.aggregate = min(
         ...this.workerNodes.map(
-          workerNode => workerNode.usage.runTime.aggregate ?? Infinity
+          workerNode =>
+            workerNode.usage.runTime.aggregate ?? Number.POSITIVE_INFINITY
         )
       )
     }
@@ -1369,7 +1518,8 @@ export abstract class AbstractPool<
     ) {
       workerNode.usage.waitTime.aggregate = min(
         ...this.workerNodes.map(
-          workerNode => workerNode.usage.waitTime.aggregate ?? Infinity
+          workerNode =>
+            workerNode.usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY
         )
       )
     }
@@ -1379,7 +1529,8 @@ export abstract class AbstractPool<
     ) {
       workerNode.usage.elu.active.aggregate = min(
         ...this.workerNodes.map(
-          workerNode => workerNode.usage.elu.active.aggregate ?? Infinity
+          workerNode =>
+            workerNode.usage.elu.active.aggregate ?? Number.POSITIVE_INFINITY
         )
       )
     }
@@ -1461,6 +1612,7 @@ export abstract class AbstractPool<
       const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
         message.workerId
       )
+      const workerInfo = this.getWorkerInfo(localWorkerNodeKey)
       const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
       // Kill message received from worker
       if (
@@ -1469,6 +1621,8 @@ export abstract class AbstractPool<
           ((this.opts.enableTasksQueue === false &&
             workerUsage.tasks.executing === 0) ||
             (this.opts.enableTasksQueue === true &&
+              workerInfo != null &&
+              !workerInfo.stealing &&
               workerUsage.tasks.executing === 0 &&
               this.tasksQueueSize(localWorkerNodeKey) === 0)))
       ) {
@@ -1619,14 +1773,15 @@ export abstract class AbstractPool<
     }
   }
 
-  private redistributeQueuedTasks (workerNodeKey: number): void {
-    if (workerNodeKey === -1 || this.cannotStealTask()) {
+  private redistributeQueuedTasks (sourceWorkerNodeKey: number): void {
+    if (sourceWorkerNodeKey === -1 || this.cannotStealTask()) {
       return
     }
-    while (this.tasksQueueSize(workerNodeKey) > 0) {
+    while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) {
       const destinationWorkerNodeKey = this.workerNodes.reduce(
         (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
-          return workerNode.info.ready &&
+          return sourceWorkerNodeKey !== workerNodeKey &&
+            workerNode.info.ready &&
             workerNode.usage.tasks.queued <
               workerNodes[minWorkerNodeKey].usage.tasks.queued
             ? workerNodeKey
@@ -1637,7 +1792,7 @@ export abstract class AbstractPool<
       this.handleTask(
         destinationWorkerNodeKey,
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        this.dequeueTask(workerNodeKey)!
+        this.dequeueTask(sourceWorkerNodeKey)!
       )
     }
   }
@@ -1655,28 +1810,21 @@ export abstract class AbstractPool<
       this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
       workerNode.getTaskFunctionWorkerUsage(taskName) != null
     ) {
-      const taskFunctionWorkerUsage =
-        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        workerNode.getTaskFunctionWorkerUsage(taskName)!
-      ++taskFunctionWorkerUsage.tasks.stolen
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      ++workerNode.getTaskFunctionWorkerUsage(taskName)!.tasks.stolen
     }
   }
 
   private updateTaskSequentiallyStolenStatisticsWorkerUsage (
-    workerNodeKey: number
+    workerNodeKey: number,
+    taskName: string,
+    previousTaskName?: string
   ): void {
     const workerNode = this.workerNodes[workerNodeKey]
     // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
     if (workerNode?.usage != null) {
       ++workerNode.usage.tasks.sequentiallyStolen
     }
-  }
-
-  private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
-    workerNodeKey: number,
-    taskName: string
-  ): void {
-    const workerNode = this.workerNodes[workerNodeKey]
     if (
       this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
       workerNode.getTaskFunctionWorkerUsage(taskName) != null
@@ -1684,33 +1832,36 @@ export abstract class AbstractPool<
       const taskFunctionWorkerUsage =
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
         workerNode.getTaskFunctionWorkerUsage(taskName)!
-      ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+      if (
+        taskFunctionWorkerUsage.tasks.sequentiallyStolen === 0 ||
+        (previousTaskName != null &&
+          previousTaskName === taskName &&
+          taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0)
+      ) {
+        ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+      } else if (taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) {
+        taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+      }
     }
   }
 
   private resetTaskSequentiallyStolenStatisticsWorkerUsage (
-    workerNodeKey: number
+    workerNodeKey: number,
+    taskName: string
   ): void {
     const workerNode = this.workerNodes[workerNodeKey]
     // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
     if (workerNode?.usage != null) {
       workerNode.usage.tasks.sequentiallyStolen = 0
     }
-  }
-
-  private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
-    workerNodeKey: number,
-    taskName: string
-  ): void {
-    const workerNode = this.workerNodes[workerNodeKey]
     if (
       this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
       workerNode.getTaskFunctionWorkerUsage(taskName) != null
     ) {
-      const taskFunctionWorkerUsage =
-        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        workerNode.getTaskFunctionWorkerUsage(taskName)!
-      taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      workerNode.getTaskFunctionWorkerUsage(
+        taskName
+      )!.tasks.sequentiallyStolen = 0
     }
   }
 
@@ -1725,69 +1876,49 @@ export abstract class AbstractPool<
       )
     }
     const workerInfo = this.getWorkerInfo(workerNodeKey)
+    if (workerInfo == null) {
+      throw new Error(
+        `Worker node with key '${workerNodeKey}' not found in pool`
+      )
+    }
     if (
       this.cannotStealTask() ||
       (this.info.stealingWorkerNodes ?? 0) >
         Math.floor(this.workerNodes.length / 2)
     ) {
-      if (workerInfo != null && previousStolenTask != null) {
+      if (previousStolenTask != null) {
         workerInfo.stealing = false
+        this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+          workerNodeKey,
+          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+          previousStolenTask.name!
+        )
       }
       return
     }
     const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
     if (
-      workerInfo != null &&
       previousStolenTask != null &&
-      workerNodeTasksUsage.sequentiallyStolen > 0 &&
       (workerNodeTasksUsage.executing > 0 ||
         this.tasksQueueSize(workerNodeKey) > 0)
     ) {
       workerInfo.stealing = false
-      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      for (const taskFunctionProperties of workerInfo.taskFunctionsProperties!) {
-        this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
-          workerNodeKey,
-          taskFunctionProperties.name
-        )
-      }
-      this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
-      return
-    }
-    if (workerInfo == null) {
-      throw new Error(
-        `Worker node with key '${workerNodeKey}' not found in pool`
+      this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+        workerNodeKey,
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        previousStolenTask.name!
       )
+      return
     }
     workerInfo.stealing = true
     const stolenTask = this.workerNodeStealTask(workerNodeKey)
-    if (
-      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
-      stolenTask != null
-    ) {
-      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      const taskFunctionTasksWorkerUsage = this.workerNodes[
-        workerNodeKey
+    if (stolenTask != null) {
+      this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
+        workerNodeKey,
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
-      if (
-        taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
-        (previousStolenTask != null &&
-          previousStolenTask.name === stolenTask.name &&
-          taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
-      ) {
-        this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
-          workerNodeKey,
-          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-          stolenTask.name!
-        )
-      } else {
-        this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
-          workerNodeKey,
-          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-          stolenTask.name!
-        )
-      }
+        stolenTask.name!,
+        previousStolenTask?.name
+      )
     }
     sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
       .then(() => {
@@ -1817,9 +1948,8 @@ export abstract class AbstractPool<
     )
     if (sourceWorkerNode != null) {
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      const task = sourceWorkerNode.dequeueTask(1)!
+      const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
       this.handleTask(workerNodeKey, task)
-      this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
       this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
       return task
@@ -1831,17 +1961,18 @@ export abstract class AbstractPool<
   ): void => {
     if (
       this.cannotStealTask() ||
+      this.hasBackPressure() ||
       (this.info.stealingWorkerNodes ?? 0) >
         Math.floor(this.workerNodes.length / 2)
     ) {
       return
     }
-    const { workerId } = eventDetail
     const sizeOffset = 1
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
     if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
       return
     }
+    const { workerId } = eventDetail
     const sourceWorkerNode =
       this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
     const workerNodes = this.workerNodes
@@ -1868,7 +1999,7 @@ export abstract class AbstractPool<
         }
         workerInfo.stealing = true
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        const task = sourceWorkerNode.dequeueTask(1)!
+        const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
         this.handleTask(workerNodeKey, task)
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
         this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
@@ -2105,11 +2236,8 @@ export abstract class AbstractPool<
     return tasksQueueSize
   }
 
-  private dequeueTask (
-    workerNodeKey: number,
-    bucket?: number
-  ): Task<Data> | undefined {
-    return this.workerNodes[workerNodeKey].dequeueTask(bucket)
+  private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
+    return this.workerNodes[workerNodeKey].dequeueTask()
   }
 
   private tasksQueueSize (workerNodeKey: number): number {