feat: introduce worker node queue back pressure detection
[poolifier.git] / src / pools / abstract-pool.ts
index cba92d11f4ed684e8043bb43800cf7351b35a9c0..eb2c2f7818a687ac8f8016eb27489895cadfaaee 100644 (file)
@@ -1212,7 +1212,11 @@ export abstract class AbstractPool<
    * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
    */
   private addWorkerNode (worker: Worker): number {
-    const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
+    const workerNode = new WorkerNode<Worker, Data>(
+      worker,
+      this.worker,
+      this.maxSize
+    )
     // Flag the worker node as ready at pool startup.
     if (this.starting) {
       workerNode.info.ready = true
@@ -1250,6 +1254,15 @@ export abstract class AbstractPool<
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
+    if (
+      this.opts.enableTasksQueue === true &&
+      this.workerNodes[workerNodeKey].hasBackPressure()
+    ) {
+      this.emitter?.emit(PoolEvents.backPressure, {
+        workerId: this.getWorkerInfo(workerNodeKey).id,
+        ...this.info
+      })
+    }
     return this.workerNodes[workerNodeKey].enqueueTask(task)
   }