feat: add tasks stealing algorithm
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 22 Aug 2023 21:36:42 +0000 (23:36 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 22 Aug 2023 21:36:42 +0000 (23:36 +0200)
reference #901, #887

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
README.md
src/pools/abstract-pool.ts
src/pools/worker-node.ts
src/pools/worker.ts

index 56591cb7652b2f638181428cce753ae7171a1f02..0fe43abdff91300f83c2d96a3fef74b238a534fe 100644 (file)
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - Add `queueMaxSize` option to tasks queue options.
 - Add O(1) deque implementation implemented with doubly linked list and use it for tasks queueing.
+- Add tasks stealing algorithm when a worker node queue is back pressured if tasks queueing is enabled.
 
 ## [2.6.31] - 2023-08-20
 
index e80193ecbe941c65af463d4445f23ff5af725c78..eae0023f7b9f9923971601d425aedd89cf8a0a3d 100644 (file)
--- a/README.md
+++ b/README.md
@@ -46,6 +46,9 @@ Please consult our [general guidelines](#general-guidelines).
 - Support for sync and async task functions :white_check_mark:
 - Tasks distribution strategies :white_check_mark:
 - Lockless tasks queueing :white_check_mark:
+- Queued tasks rescheduling:
+  - Tasks redistribution on worker error :white_check_mark:
+  - Tasks stealing under back pressure :white_check_mark:
 - General guidelines on pool choice :white_check_mark:
 - Error handling out of the box :white_check_mark:
 - Widely tested :white_check_mark:
index 1926a4a426fbcb84724ab55749778ff95c5910ee..f0119f4ed3c8135e596d171a0f6ec093d7b29935 100644 (file)
@@ -1142,6 +1142,10 @@ export abstract class AbstractPool<
     this.sendStartupMessageToWorker(workerNodeKey)
     // Send the statistics message to worker.
     this.sendStatisticsMessageToWorker(workerNodeKey)
+    if (this.opts.enableTasksQueue === true) {
+      this.workerNodes[workerNodeKey].onBackPressure =
+        this.tasksStealingOnBackPressure.bind(this)
+    }
   }
 
   /**
@@ -1175,24 +1179,23 @@ export abstract class AbstractPool<
       let minQueuedTasks = Infinity
       let executeTask = false
       for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
-        const workerInfo = this.getWorkerInfo(workerNodeId) as WorkerInfo
+        if (
+          this.workerNodes[workerNodeId].usage.tasks.executing <
+          (this.opts.tasksQueueOptions?.concurrency as number)
+        ) {
+          executeTask = true
+        }
         if (
           workerNodeId !== workerNodeKey &&
-          workerInfo.ready &&
+          workerNode.info.ready &&
           workerNode.usage.tasks.queued === 0
         ) {
-          if (
-            this.workerNodes[workerNodeId].usage.tasks.executing <
-            (this.opts.tasksQueueOptions?.concurrency as number)
-          ) {
-            executeTask = true
-          }
           targetWorkerNodeKey = workerNodeId
           break
         }
         if (
           workerNodeId !== workerNodeKey &&
-          workerInfo.ready &&
+          workerNode.info.ready &&
           workerNode.usage.tasks.queued < minQueuedTasks
         ) {
           minQueuedTasks = workerNode.usage.tasks.queued
@@ -1202,12 +1205,48 @@ export abstract class AbstractPool<
       if (executeTask) {
         this.executeTask(
           targetWorkerNodeKey,
-          this.dequeueTask(workerNodeKey) as Task<Data>
+          this.popTask(workerNodeKey) as Task<Data>
         )
       } else {
         this.enqueueTask(
           targetWorkerNodeKey,
-          this.dequeueTask(workerNodeKey) as Task<Data>
+          this.popTask(workerNodeKey) as Task<Data>
+        )
+      }
+    }
+  }
+
+  private tasksStealingOnBackPressure (workerId: number): void {
+    const sourceWorkerNode =
+      this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+    const workerNodes = this.workerNodes
+      .filter((workerNode) => workerNode.info.id !== workerId)
+      .sort(
+        (workerNodeA, workerNodeB) =>
+          workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
+      )
+    for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
+      if (
+        workerNode.info.ready &&
+        sourceWorkerNode.usage.tasks.queued > 0 &&
+        !workerNode.hasBackPressure() &&
+        workerNode.usage.tasks.executing <
+          (this.opts.tasksQueueOptions?.concurrency as number)
+      ) {
+        this.executeTask(
+          workerNodeKey,
+          sourceWorkerNode.popTask() as Task<Data>
+        )
+      } else if (
+        workerNode.info.ready &&
+        sourceWorkerNode.usage.tasks.queued > 0 &&
+        !workerNode.hasBackPressure() &&
+        workerNode.usage.tasks.executing >=
+          (this.opts.tasksQueueOptions?.concurrency as number)
+      ) {
+        this.enqueueTask(
+          workerNodeKey,
+          sourceWorkerNode.popTask() as Task<Data>
         )
       }
     }
@@ -1387,6 +1426,10 @@ export abstract class AbstractPool<
     return this.workerNodes[workerNodeKey].dequeueTask()
   }
 
+  private popTask (workerNodeKey: number): Task<Data> | undefined {
+    return this.workerNodes[workerNodeKey].popTask()
+  }
+
   private tasksQueueSize (workerNodeKey: number): number {
     return this.workerNodes[workerNodeKey].tasksQueueSize()
   }
index d7f96e6499522f79d45035150f56b24c392eb2e5..b539cabd9af018bd71c8c8e27eba63eab755333a 100644 (file)
@@ -30,6 +30,8 @@ implements IWorkerNode<Worker, Data> {
   public usage: WorkerUsage
   /** @inheritdoc */
   public tasksQueueBackPressureSize: number
+  /** @inheritdoc */
+  public onBackPressure?: (workerId: number) => void
   private readonly taskFunctionsUsage: Map<string, WorkerUsage>
   private readonly tasksQueue: Deque<Task<Data>>
 
@@ -90,7 +92,20 @@ implements IWorkerNode<Worker, Data> {
 
   /** @inheritdoc */
   public enqueueTask (task: Task<Data>): number {
-    return this.tasksQueue.push(task)
+    const tasksQueueSize = this.tasksQueue.push(task)
+    if (this.onBackPressure != null && this.hasBackPressure()) {
+      this.once(this.onBackPressure)(this.info.id as number)
+    }
+    return tasksQueueSize
+  }
+
+  /** @inheritdoc */
+  public unshiftTask (task: Task<Data>): number {
+    const tasksQueueSize = this.tasksQueue.unshift(task)
+    if (this.onBackPressure != null && this.hasBackPressure()) {
+      this.once(this.onBackPressure)(this.info.id as number)
+    }
+    return tasksQueueSize
   }
 
   /** @inheritdoc */
@@ -98,6 +113,11 @@ implements IWorkerNode<Worker, Data> {
     return this.tasksQueue.shift()
   }
 
+  /** @inheritdoc */
+  public popTask (): Task<Data> | undefined {
+    return this.tasksQueue.pop()
+  }
+
   /** @inheritdoc */
   public clearTasksQueue (): void {
     this.tasksQueue.clear()
@@ -251,4 +271,25 @@ implements IWorkerNode<Worker, Data> {
       return worker.id
     }
   }
+
+  /**
+   * Executes a function once at a time.
+   */
+
+  private once (
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    fn: (...args: any[]) => void,
+    context = this
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+  ): (...args: any[]) => void {
+    let called = false
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    return function (...args: any[]): void {
+      if (!called) {
+        called = true
+        fn.apply(context, args)
+        called = false
+      }
+    }
+  }
 }
index e6dd0fae91cd7b0cfd29b14ba828b89d131854d0..6b387a96088d430d455ed0a4dbb3759c1cbbe86e 100644 (file)
@@ -224,6 +224,12 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
    * This is the number of tasks that can be enqueued before the worker node has back pressure.
    */
   tasksQueueBackPressureSize: number
+  /**
+   * Callback invoked when worker node tasks queue is back pressured.
+   *
+   * @param workerId - The worker id.
+   */
+  onBackPressure?: (workerId: number) => void
   /**
    * Tasks queue size.
    *
@@ -237,12 +243,25 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
    * @returns The tasks queue size.
    */
   readonly enqueueTask: (task: Task<Data>) => number
+  /**
+   * Prepends a task to the tasks queue.
+   *
+   * @param task - The task to prepend.
+   * @returns The tasks queue size.
+   */
+  readonly unshiftTask: (task: Task<Data>) => number
   /**
    * Dequeue task.
    *
    * @returns The dequeued task.
    */
   readonly dequeueTask: () => Task<Data> | undefined
+  /**
+   * Pops a task from the tasks queue.
+   *
+   * @returns The popped task.
+   */
+  readonly popTask: () => Task<Data> | undefined
   /**
    * Clears tasks queue.
    */