feat: add `queueMaxSize` option to tasks queue options
[poolifier.git] / src / pools / abstract-pool.ts
index 289d6b35698303a556ced4eb037abfa2c8bb1d8a..41e7bb24cd4f69653e0e11b7d74b636bbda8a25c 100644 (file)
@@ -295,7 +295,7 @@ export abstract class AbstractPool<
       !Number.isSafeInteger(tasksQueueOptions.concurrency)
     ) {
       throw new TypeError(
-        'Invalid worker tasks concurrency: must be an integer'
+        'Invalid worker node tasks concurrency: must be an integer'
       )
     }
     if (
@@ -303,7 +303,23 @@ export abstract class AbstractPool<
       tasksQueueOptions.concurrency <= 0
     ) {
       throw new RangeError(
-        `Invalid worker tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
+        `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
+      )
+    }
+    if (
+      tasksQueueOptions?.queueMaxSize != null &&
+      !Number.isSafeInteger(tasksQueueOptions.queueMaxSize)
+    ) {
+      throw new TypeError(
+        'Invalid worker node tasks queue max size: must be an integer'
+      )
+    }
+    if (
+      tasksQueueOptions?.queueMaxSize != null &&
+      tasksQueueOptions.queueMaxSize <= 0
+    ) {
+      throw new RangeError(
+        `Invalid worker node tasks queue max size: ${tasksQueueOptions.queueMaxSize} is a negative integer or zero`
       )
     }
   }
@@ -620,16 +636,29 @@ export abstract class AbstractPool<
       this.checkValidTasksQueueOptions(tasksQueueOptions)
       this.opts.tasksQueueOptions =
         this.buildTasksQueueOptions(tasksQueueOptions)
+      this.setTasksQueueMaxSize(
+        this.opts.tasksQueueOptions.queueMaxSize as number
+      )
     } else if (this.opts.tasksQueueOptions != null) {
       delete this.opts.tasksQueueOptions
     }
   }
 
+  private setTasksQueueMaxSize (queueMaxSize: number): void {
+    for (const workerNode of this.workerNodes) {
+      workerNode.tasksQueueBackPressureSize = queueMaxSize
+    }
+  }
+
   private buildTasksQueueOptions (
     tasksQueueOptions: TasksQueueOptions
   ): TasksQueueOptions {
     return {
-      concurrency: tasksQueueOptions?.concurrency ?? 1
+      ...{
+        queueMaxSize: Math.pow(this.maxSize, 2),
+        concurrency: 1
+      },
+      ...tasksQueueOptions
     }
   }
 
@@ -1292,7 +1321,7 @@ export abstract class AbstractPool<
     const workerNode = new WorkerNode<Worker, Data>(
       worker,
       this.worker,
-      this.maxSize
+      this.opts.tasksQueueOptions?.queueMaxSize ?? Math.pow(this.maxSize, 2)
     )
     // Flag the worker node as ready at pool startup.
     if (this.starting) {