fix: disable `tasksStealingOnBackPressure` by default
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 8 May 2024 20:44:42 +0000 (22:44 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 8 May 2024 20:44:42 +0000 (22:44 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
docs/api.md
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/utils.ts
src/pools/worker-node.ts
src/pools/worker.ts
tests/pools/abstract-pool.test.mjs
tests/pools/utils.test.mjs
tests/pools/worker-node.test.mjs

index 66877534f7c562d491a0368ef92b14c131f47a1e..a5b67be082d581fe2e85198f372ed9d24ceeaaf5 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Disable `tasksStealingOnBackPressure` by default until performance issues under heavy load are sorted out.
+
 ## [4.0.3] - 2024-05-08
 
 ### Changed
@@ -410,7 +414,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - Add `startWorkers` to pool options to whether start the minimum number of workers at pool initialization or not.
 - Add `start()` method to pool API to start the minimum number of workers.
-- Add `taskStealing` and `tasksStealingOnPressure` to tasks queue options to whether enable task stealing or not and whether enable tasks stealing under back pressure or not.
+- Add `taskStealing` and `tasksStealingOnBackPressure` to tasks queue options to whether enable task stealing or not and whether enable tasks stealing under back pressure or not.
 - Continuous internal benchmarking: [https://poolifier.github.io/benchmark-results/dev/bench](https://poolifier.github.io/benchmark-results/dev/bench).
 
 ## [2.6.44] - 2023-09-08
index 634ab08bbf12b8b319007d3de176b46e26e47b17..458017a18d79ff284a479e16de94580befd44709 100644 (file)
@@ -137,7 +137,7 @@ An object with these properties:
   - `tasksStealingOnBackPressure` (optional) - Tasks stealing enablement under back pressure.
   - `tasksFinishedTimeout` (optional) - Queued tasks finished timeout in milliseconds at worker termination.
 
-  Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true, tasksFinishedTimeout: 2000 }`
+  Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }`
 
 - `workerOptions` (optional) - An object with the worker options to pass to worker. See [worker_threads](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for more details.
 
index 065d39964f793b489e283bf7c2e047b708348300..0de96a411d0273d63604dc41dfc7ad3fc92cfed8 100644 (file)
@@ -1817,7 +1817,7 @@ export abstract class AbstractPool<
     )
     if (sourceWorkerNode != null) {
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      const task = sourceWorkerNode.dequeueLastBucketTask()!
+      const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
       this.handleTask(workerNodeKey, task)
       this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
@@ -1831,6 +1831,7 @@ export abstract class AbstractPool<
   ): void => {
     if (
       this.cannotStealTask() ||
+      this.hasBackPressure() ||
       (this.info.stealingWorkerNodes ?? 0) >
         Math.floor(this.workerNodes.length / 2)
     ) {
@@ -1868,7 +1869,7 @@ export abstract class AbstractPool<
         }
         workerInfo.stealing = true
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        const task = sourceWorkerNode.dequeueLastBucketTask()!
+        const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
         this.handleTask(workerNodeKey, task)
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
         this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
index ccb4b5b94ef858ad00c0eb5a28c490651854489e..66815bf90bdab3f2eb8a54baec75147b8aa08341 100644 (file)
@@ -137,7 +137,7 @@ export interface TasksQueueOptions {
   /**
    * Whether to enable tasks stealing under back pressure.
    *
-   * @defaultValue true
+   * @defaultValue false
    */
   readonly tasksStealingOnBackPressure?: boolean
   /**
index b7b38ac4a874f279f316929a80514f306b60390a..df9c256e95390d0731b730ae6726678d5bcbdfb7 100644 (file)
@@ -43,7 +43,7 @@ export const getDefaultTasksQueueOptions = (
     size: Math.pow(poolMaxSize, 2),
     concurrency: 1,
     taskStealing: true,
-    tasksStealingOnBackPressure: true,
+    tasksStealingOnBackPressure: false,
     tasksFinishedTimeout: 2000
   }
 }
index 5d28802c12769c1cbaf0cd519e22708918788016..772a839b1c077d23ae53f73b7a1475daa4c8bddc 100644 (file)
@@ -45,7 +45,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   /** @inheritdoc */
   public tasksQueueBackPressureSize: number
   private readonly tasksQueue: PriorityQueue<Task<Data>>
-  private onBackPressureStarted: boolean
+  private setBackPressureFlag: boolean
   private readonly taskFunctionsUsage: Map<string, WorkerUsage>
 
   /**
@@ -70,7 +70,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
     this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
     this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
-    this.onBackPressureStarted = false
+    this.setBackPressureFlag = false
     this.taskFunctionsUsage = new Map<string, WorkerUsage>()
   }
 
@@ -82,23 +82,38 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   /** @inheritdoc */
   public enqueueTask (task: Task<Data>): number {
     const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
-    if (this.hasBackPressure() && !this.onBackPressureStarted) {
-      this.onBackPressureStarted = true
+    if (
+      !this.setBackPressureFlag &&
+      this.hasBackPressure() &&
+      !this.info.backPressure
+    ) {
+      this.setBackPressureFlag = true
+      this.info.backPressure = true
       this.emit('backPressure', { workerId: this.info.id })
-      this.onBackPressureStarted = false
     }
+    this.setBackPressureFlag = false
     return tasksQueueSize
   }
 
   /** @inheritdoc */
   public dequeueTask (bucket?: number): Task<Data> | undefined {
-    return this.tasksQueue.dequeue(bucket)
+    const task = this.tasksQueue.dequeue(bucket)
+    if (
+      !this.setBackPressureFlag &&
+      !this.hasBackPressure() &&
+      this.info.backPressure
+    ) {
+      this.setBackPressureFlag = true
+      this.info.backPressure = false
+    }
+    this.setBackPressureFlag = false
+    return task
   }
 
   /** @inheritdoc */
-  public dequeueLastBucketTask (): Task<Data> | undefined {
+  public dequeueLastPrioritizedTask (): Task<Data> | undefined {
     // Start from the last empty or partially filled bucket
-    return this.tasksQueue.dequeue(this.tasksQueue.buckets + 1)
+    return this.dequeueTask(this.tasksQueue.buckets + 1)
   }
 
   /** @inheritdoc */
@@ -197,6 +212,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
       type: getWorkerType(worker)!,
       dynamic: false,
       ready: false,
+      backPressure: false,
       stealing: false
     }
   }
index 1a9973ba8a8ba53bcb9aedb59f4ac246d2e4045e..99c0d9a6feeea35815a520ff95a160d927bcc4fc 100644 (file)
@@ -172,6 +172,11 @@ export interface WorkerInfo {
    * This flag is set to `true` when worker node is stealing tasks from another worker node.
    */
   stealing: boolean
+  /**
+   * Back pressure flag.
+   * This flag is set to `true` when worker node tasks queue has back pressure.
+   */
+  backPressure: boolean
   /**
    * Task functions properties.
    */
@@ -326,11 +331,11 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown>
    */
   readonly dequeueTask: (bucket?: number) => Task<Data> | undefined
   /**
-   * Dequeue last bucket task.
+   * Dequeue last prioritized task.
    *
    * @returns The dequeued task.
    */
-  readonly dequeueLastBucketTask: () => Task<Data> | undefined
+  readonly dequeueLastPrioritizedTask: () => Task<Data> | undefined
   /**
    * Clears tasks queue.
    */
index 6a65f3803d0825b29052942873e13ff5edbe7460..8cbbc0a5dc59e522f3328271dbfd8cd812586388 100644 (file)
@@ -275,7 +275,7 @@ describe('Abstract pool test suite', () => {
         concurrency: 2,
         size: Math.pow(numberOfWorkers, 2),
         taskStealing: true,
-        tasksStealingOnBackPressure: true,
+        tasksStealingOnBackPressure: false,
         tasksFinishedTimeout: 2000
       },
       workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
@@ -591,7 +591,7 @@ describe('Abstract pool test suite', () => {
       concurrency: 1,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true,
+      tasksStealingOnBackPressure: false,
       tasksFinishedTimeout: 2000
     })
     pool.enableTasksQueue(true, { concurrency: 2 })
@@ -600,7 +600,7 @@ describe('Abstract pool test suite', () => {
       concurrency: 2,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true,
+      tasksStealingOnBackPressure: false,
       tasksFinishedTimeout: 2000
     })
     pool.enableTasksQueue(false)
@@ -619,7 +619,7 @@ describe('Abstract pool test suite', () => {
       concurrency: 1,
       size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true,
+      tasksStealingOnBackPressure: false,
       tasksFinishedTimeout: 2000
     })
     for (const workerNode of pool.workerNodes) {
@@ -819,6 +819,7 @@ describe('Abstract pool test suite', () => {
         type: WorkerTypes.cluster,
         dynamic: false,
         ready: true,
+        backPressure: false,
         stealing: false
       })
     }
@@ -835,6 +836,7 @@ describe('Abstract pool test suite', () => {
         type: WorkerTypes.thread,
         dynamic: false,
         ready: true,
+        backPressure: false,
         stealing: false
       })
     }
index cfa378e16aa37d81b1a4cbb51bfbe7c2a57babc7..3842c8c5e223e3e89fdea92e581b0d7f8e41fb52 100644 (file)
@@ -32,7 +32,7 @@ describe('Pool utils test suite', () => {
       concurrency: 1,
       size: Math.pow(poolMaxSize, 2),
       taskStealing: true,
-      tasksStealingOnBackPressure: true,
+      tasksStealingOnBackPressure: false,
       tasksFinishedTimeout: 2000
     })
   })
index d6339080b7b001862609ce1fe48366e9791f84f1..27ce3d2d4c66e27a6385a360c6efb12544099ac9 100644 (file)
@@ -192,6 +192,7 @@ describe('Worker node test suite', () => {
       type: WorkerTypes.thread,
       dynamic: false,
       ready: false,
+      backPressure: false,
       stealing: false
     })
     expect(threadWorkerNode.usage).toStrictEqual({
@@ -227,7 +228,7 @@ describe('Worker node test suite', () => {
     expect(threadWorkerNode.tasksQueueSize()).toBe(
       threadWorkerNode.tasksQueue.size
     )
-    expect(threadWorkerNode.onBackPressureStarted).toBe(false)
+    expect(threadWorkerNode.setBackPressureFlag).toBe(false)
     expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
 
     expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
@@ -237,6 +238,7 @@ describe('Worker node test suite', () => {
       type: WorkerTypes.cluster,
       dynamic: false,
       ready: false,
+      backPressure: false,
       stealing: false
     })
     expect(clusterWorkerNode.usage).toStrictEqual({
@@ -272,7 +274,7 @@ describe('Worker node test suite', () => {
     expect(clusterWorkerNode.tasksQueueSize()).toBe(
       clusterWorkerNode.tasksQueue.size
     )
-    expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
+    expect(clusterWorkerNode.setBackPressureFlag).toBe(false)
     expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
   })