feat: add `queueMaxSize` option to tasks queue options
authorJérôme Benoit <jerome.benoit@sap.com>
Sun, 20 Aug 2023 14:44:12 +0000 (16:44 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sun, 20 Aug 2023 14:44:12 +0000 (16:44 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
docs/api.md
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/worker-node.ts
src/pools/worker.ts
tests/pools/abstract/abstract-pool.test.js

index a9c2033497368a4682b1c29c4ee81fd8ca6bbfdf..d2f3b38b479ef9bbba3cb311ba0e2ffba8a6a4be 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+
+- Add `queueMaxSize` option to tasks queue options.
+
 ## [2.6.31] - 2023-08-20
 
 ### Fixed
index 62d694b6bbb4bf412176705e9f9ffbe2783150c1..0474e2aabea3a2196d7a3a105f46377bfa8bd7b4 100644 (file)
@@ -93,9 +93,10 @@ An object with these properties:
 - `tasksQueueOptions` (optional) - The worker tasks queue options object to use in this pool.  
   Properties:
 
+  - `queueMaxSize` (optional) - The maximum number of tasks that can be queued on a worker before flagging it as back pressured. It must be a positive integer.
   - `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer.
 
-  Default: `{ concurrency: 1 }`
+  Default: `{ queueMaxSize: (pool maximum size)^2, concurrency: 1 }`
 
 #### `ThreadPoolOptions extends PoolOptions`
 
index 289d6b35698303a556ced4eb037abfa2c8bb1d8a..41e7bb24cd4f69653e0e11b7d74b636bbda8a25c 100644 (file)
@@ -295,7 +295,7 @@ export abstract class AbstractPool<
       !Number.isSafeInteger(tasksQueueOptions.concurrency)
     ) {
       throw new TypeError(
-        'Invalid worker tasks concurrency: must be an integer'
+        'Invalid worker node tasks concurrency: must be an integer'
       )
     }
     if (
@@ -303,7 +303,23 @@ export abstract class AbstractPool<
       tasksQueueOptions.concurrency <= 0
     ) {
       throw new RangeError(
-        `Invalid worker tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
+        `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
+      )
+    }
+    if (
+      tasksQueueOptions?.queueMaxSize != null &&
+      !Number.isSafeInteger(tasksQueueOptions.queueMaxSize)
+    ) {
+      throw new TypeError(
+        'Invalid worker node tasks queue max size: must be an integer'
+      )
+    }
+    if (
+      tasksQueueOptions?.queueMaxSize != null &&
+      tasksQueueOptions.queueMaxSize <= 0
+    ) {
+      throw new RangeError(
+        `Invalid worker node tasks queue max size: ${tasksQueueOptions.queueMaxSize} is a negative integer or zero`
       )
     }
   }
@@ -620,16 +636,29 @@ export abstract class AbstractPool<
       this.checkValidTasksQueueOptions(tasksQueueOptions)
       this.opts.tasksQueueOptions =
         this.buildTasksQueueOptions(tasksQueueOptions)
+      this.setTasksQueueMaxSize(
+        this.opts.tasksQueueOptions.queueMaxSize as number
+      )
     } else if (this.opts.tasksQueueOptions != null) {
       delete this.opts.tasksQueueOptions
     }
   }
 
+  private setTasksQueueMaxSize (queueMaxSize: number): void {
+    for (const workerNode of this.workerNodes) {
+      workerNode.tasksQueueBackPressureSize = queueMaxSize
+    }
+  }
+
   private buildTasksQueueOptions (
     tasksQueueOptions: TasksQueueOptions
   ): TasksQueueOptions {
     return {
-      concurrency: tasksQueueOptions?.concurrency ?? 1
+      ...{
+        queueMaxSize: Math.pow(this.maxSize, 2),
+        concurrency: 1
+      },
+      ...tasksQueueOptions
     }
   }
 
@@ -1292,7 +1321,7 @@ export abstract class AbstractPool<
     const workerNode = new WorkerNode<Worker, Data>(
       worker,
       this.worker,
-      this.maxSize
+      this.opts.tasksQueueOptions?.queueMaxSize ?? Math.pow(this.maxSize, 2)
     )
     // Flag the worker node as ready at pool startup.
     if (this.starting) {
index 09d2f229c5bbf02baf00172c494efa3d9df23584..c8aedce756d791bcbcff27afc1e7b795267139e0 100644 (file)
@@ -96,11 +96,17 @@ export interface PoolInfo {
 }
 
 /**
- * Worker tasks queue options.
+ * Worker node tasks queue options.
  */
 export interface TasksQueueOptions {
   /**
-   * Maximum number of tasks that can be executed concurrently on a worker.
+   * Maximum tasks queue size per worker node flagging it as back pressured.
+   *
+   * @defaultValue (pool maximum size)^2
+   */
+  readonly queueMaxSize?: number
+  /**
+   * Maximum number of tasks that can be executed concurrently on a worker node.
    *
    * @defaultValue 1
    */
@@ -150,13 +156,13 @@ export interface PoolOptions<Worker extends IWorker> {
    */
   enableEvents?: boolean
   /**
-   * Pool worker tasks queue.
+   * Pool worker node tasks queue.
    *
    * @defaultValue false
    */
   enableTasksQueue?: boolean
   /**
-   * Pool worker tasks queue options.
+   * Pool worker node tasks queue options.
    */
   tasksQueueOptions?: TasksQueueOptions
 }
@@ -202,7 +208,7 @@ export interface IPool<
    * - '`destroy`': Emitted when the pool is destroyed.
    * - `'error'`: Emitted when an uncaught error occurs.
    * - `'taskError'`: Emitted when an error occurs while executing a task.
-   * - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full: queue size \>= pool maximum size^2).
+   * - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full: queue size \>= max queue size).
    */
   readonly emitter?: PoolEmitter
   /**
@@ -247,19 +253,19 @@ export interface IPool<
     workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
   ) => void
   /**
-   * Enables/disables the worker tasks queue in this pool.
+   * Enables/disables the worker node tasks queue in this pool.
    *
-   * @param enable - Whether to enable or disable the worker tasks queue.
-   * @param tasksQueueOptions - The worker tasks queue options.
+   * @param enable - Whether to enable or disable the worker node tasks queue.
+   * @param tasksQueueOptions - The worker node tasks queue options.
    */
   readonly enableTasksQueue: (
     enable: boolean,
     tasksQueueOptions?: TasksQueueOptions
   ) => void
   /**
-   * Sets the worker tasks queue options in this pool.
+   * Sets the worker node tasks queue options in this pool.
    *
-   * @param tasksQueueOptions - The worker tasks queue options.
+   * @param tasksQueueOptions - The worker node tasks queue options.
    */
   readonly setTasksQueueOptions: (tasksQueueOptions: TasksQueueOptions) => void
 }
index 82e276886c08f56bc4051bc537df88b222600af0..545ee7e8598398d9b5ce11df7ac86a9edc75918f 100644 (file)
@@ -28,18 +28,23 @@ implements IWorkerNode<Worker, Data> {
   public messageChannel?: MessageChannel
   /** @inheritdoc */
   public usage: WorkerUsage
+  /** @inheritdoc */
+  public tasksQueueBackPressureSize: number
   private readonly taskFunctionsUsage: Map<string, WorkerUsage>
   private readonly tasksQueue: Queue<Task<Data>>
-  private readonly tasksQueueBackPressureSize: number
 
   /**
    * Constructs a new worker node.
    *
    * @param worker - The worker.
    * @param workerType - The worker type.
-   * @param poolMaxSize - The pool maximum size.
+   * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
    */
-  constructor (worker: Worker, workerType: WorkerType, poolMaxSize: number) {
+  constructor (
+    worker: Worker,
+    workerType: WorkerType,
+    tasksQueueBackPressureSize: number
+  ) {
     if (worker == null) {
       throw new TypeError('Cannot construct a worker node without a worker')
     }
@@ -48,14 +53,14 @@ implements IWorkerNode<Worker, Data> {
         'Cannot construct a worker node without a worker type'
       )
     }
-    if (poolMaxSize == null) {
+    if (tasksQueueBackPressureSize == null) {
       throw new TypeError(
-        'Cannot construct a worker node without a pool maximum size'
+        'Cannot construct a worker node without a tasks queue back pressure size'
       )
     }
-    if (!Number.isSafeInteger(poolMaxSize)) {
+    if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
       throw new TypeError(
-        'Cannot construct a worker node with a pool maximum size that is not an integer'
+        'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
       )
     }
     this.worker = worker
@@ -66,7 +71,7 @@ implements IWorkerNode<Worker, Data> {
     this.usage = this.initWorkerUsage()
     this.taskFunctionsUsage = new Map<string, WorkerUsage>()
     this.tasksQueue = new Queue<Task<Data>>()
-    this.tasksQueueBackPressureSize = Math.pow(poolMaxSize, 2)
+    this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
   }
 
   /** @inheritdoc */
index e4a31e393fad35e06368e3d33cc5820ca1dbd099..e6dd0fae91cd7b0cfd29b14ba828b89d131854d0 100644 (file)
@@ -219,6 +219,11 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
    * Worker usage statistics.
    */
   usage: WorkerUsage
+  /**
+   * Tasks queue back pressure size.
+   * This is the number of tasks that can be enqueued before the worker node has back pressure.
+   */
+  tasksQueueBackPressureSize: number
   /**
    * Tasks queue size.
    *
index 867c1415c95f0cea27f3b85df83d3d8090f140c2..0ffc9e124d27e5262508dcd66e88fb06e99e1847 100644 (file)
@@ -212,7 +212,10 @@ describe('Abstract pool test suite', () => {
     expect(pool.opts.enableEvents).toBe(false)
     expect(pool.opts.restartWorkerOnError).toBe(false)
     expect(pool.opts.enableTasksQueue).toBe(true)
-    expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
+    expect(pool.opts.tasksQueueOptions).toStrictEqual({
+      concurrency: 2,
+      queueMaxSize: 4
+    })
     expect(pool.opts.workerChoiceStrategy).toBe(
       WorkerChoiceStrategies.LEAST_USED
     )
@@ -290,7 +293,7 @@ describe('Abstract pool test suite', () => {
         )
     ).toThrowError(
       new RangeError(
-        'Invalid worker tasks concurrency: 0 is a negative integer or zero'
+        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
       )
     )
     expect(
@@ -317,7 +320,7 @@ describe('Abstract pool test suite', () => {
           }
         )
     ).toThrowError(
-      new TypeError('Invalid worker tasks concurrency: must be an integer')
+      new TypeError('Invalid worker node tasks concurrency: must be an integer')
     )
   })
 
@@ -488,10 +491,16 @@ describe('Abstract pool test suite', () => {
     expect(pool.opts.tasksQueueOptions).toBeUndefined()
     pool.enableTasksQueue(true)
     expect(pool.opts.enableTasksQueue).toBe(true)
-    expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
+    expect(pool.opts.tasksQueueOptions).toStrictEqual({
+      concurrency: 1,
+      queueMaxSize: 4
+    })
     pool.enableTasksQueue(true, { concurrency: 2 })
     expect(pool.opts.enableTasksQueue).toBe(true)
-    expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
+    expect(pool.opts.tasksQueueOptions).toStrictEqual({
+      concurrency: 2,
+      queueMaxSize: 4
+    })
     pool.enableTasksQueue(false)
     expect(pool.opts.enableTasksQueue).toBe(false)
     expect(pool.opts.tasksQueueOptions).toBeUndefined()
@@ -504,9 +513,15 @@ describe('Abstract pool test suite', () => {
       './tests/worker-files/thread/testWorker.js',
       { enableTasksQueue: true }
     )
-    expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
+    expect(pool.opts.tasksQueueOptions).toStrictEqual({
+      concurrency: 1,
+      queueMaxSize: 4
+    })
     pool.setTasksQueueOptions({ concurrency: 2 })
-    expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
+    expect(pool.opts.tasksQueueOptions).toStrictEqual({
+      concurrency: 2,
+      queueMaxSize: 4
+    })
     expect(() =>
       pool.setTasksQueueOptions('invalidTasksQueueOptions')
     ).toThrowError(
@@ -514,16 +529,31 @@ describe('Abstract pool test suite', () => {
     )
     expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
       new RangeError(
-        'Invalid worker tasks concurrency: 0 is a negative integer or zero'
+        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
       )
     )
     expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
       new RangeError(
-        'Invalid worker tasks concurrency: -1 is a negative integer or zero'
+        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
       )
     )
     expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
-      new TypeError('Invalid worker tasks concurrency: must be an integer')
+      new TypeError('Invalid worker node tasks concurrency: must be an integer')
+    )
+    expect(() => pool.setTasksQueueOptions({ queueMaxSize: 0 })).toThrowError(
+      new RangeError(
+        'Invalid worker node tasks queue max size: 0 is a negative integer or zero'
+      )
+    )
+    expect(() => pool.setTasksQueueOptions({ queueMaxSize: -1 })).toThrowError(
+      new RangeError(
+        'Invalid worker node tasks queue max size: -1 is a negative integer or zero'
+      )
+    )
+    expect(() => pool.setTasksQueueOptions({ queueMaxSize: 0.2 })).toThrowError(
+      new TypeError(
+        'Invalid worker node tasks queue max size: must be an integer'
+      )
     )
     await pool.destroy()
   })