feat: introduce worker node queue back pressure detection
[poolifier.git] / src / pools / abstract-pool.ts
index f89af5acf813afc18eeaaa1b9277039c6692e480..eb2c2f7818a687ac8f8016eb27489895cadfaaee 100644 (file)
@@ -646,14 +646,15 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public listTaskFunctions (): string[] {
-    if (
-      Array.isArray(this.getWorkerInfo(0).taskFunctions) &&
-      (this.getWorkerInfo(0).taskFunctions as string[]).length > 0
-    ) {
-      return this.getWorkerInfo(0).taskFunctions as string[]
-    } else {
-      return []
+    for (const workerNode of this.workerNodes) {
+      if (
+        Array.isArray(workerNode.info.taskFunctions) &&
+        workerNode.info.taskFunctions.length > 0
+      ) {
+        return workerNode.info.taskFunctions
+      }
     }
+    return []
   }
 
   /** @inheritDoc */
@@ -1211,7 +1212,11 @@ export abstract class AbstractPool<
    * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
    */
   private addWorkerNode (worker: Worker): number {
-    const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
+    const workerNode = new WorkerNode<Worker, Data>(
+      worker,
+      this.worker,
+      this.maxSize
+    )
     // Flag the worker node as ready at pool startup.
     if (this.starting) {
       workerNode.info.ready = true
@@ -1249,6 +1254,15 @@ export abstract class AbstractPool<
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
+    if (
+      this.opts.enableTasksQueue === true &&
+      this.workerNodes[workerNodeKey].hasBackPressure()
+    ) {
+      this.emitter?.emit(PoolEvents.backPressure, {
+        workerId: this.getWorkerInfo(workerNodeKey).id,
+        ...this.info
+      })
+    }
     return this.workerNodes[workerNodeKey].enqueueTask(task)
   }