fix: fix back pressure detection
[poolifier.git] / src / pools / abstract-pool.ts
index bb5f02d49923c99c4b21dd068b431524e5b8ce38..dceef6decb534f083510d90a947a05861c2ec2df 100644 (file)
@@ -118,7 +118,10 @@ export abstract class AbstractPool<
     this.executeTask = this.executeTask.bind(this)
     this.enqueueTask = this.enqueueTask.bind(this)
     this.dequeueTask = this.dequeueTask.bind(this)
-    this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
+    this.checkAndEmitTaskExecutionEvents =
+      this.checkAndEmitTaskExecutionEvents.bind(this)
+    this.checkAndEmitTaskQueuingEvents =
+      this.checkAndEmitTaskQueuingEvents.bind(this)
 
     if (this.opts.enableEvents === true) {
       this.emitter = new PoolEmitter()
@@ -300,7 +303,7 @@ export abstract class AbstractPool<
       tasksQueueOptions.concurrency <= 0
     ) {
       throw new Error(
-        `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
+        `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}' is a negative integer or zero`
       )
     }
   }
@@ -368,6 +371,9 @@ export abstract class AbstractPool<
           0
         )
       }),
+      ...(this.opts.enableTasksQueue === true && {
+        backPressure: this.hasBackPressure()
+      }),
       failedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
           accumulator + workerNode.usage.tasks.failed,
@@ -733,7 +739,6 @@ export abstract class AbstractPool<
       } else {
         this.enqueueTask(workerNodeKey, task)
       }
-      this.checkAndEmitEvents()
     })
   }
 
@@ -864,7 +869,7 @@ export abstract class AbstractPool<
       workerTaskStatistics.executing < 0
     ) {
       throw new Error(
-        'Worker usage statistics for tasks executing cannot be negative'
+        'Worker usage statistic for tasks executing cannot be negative'
       )
     }
     if (message.taskError == null) {
@@ -1221,7 +1226,7 @@ export abstract class AbstractPool<
     }
   }
 
-  private checkAndEmitEvents (): void {
+  private checkAndEmitTaskExecutionEvents (): void {
     if (this.emitter != null) {
       if (this.busy) {
         this.emitter.emit(PoolEvents.busy, this.info)
@@ -1232,6 +1237,12 @@ export abstract class AbstractPool<
     }
   }
 
+  private checkAndEmitTaskQueuingEvents (): void {
+    if (this.hasBackPressure()) {
+      this.emitter?.emit(PoolEvents.backPressure, this.info)
+    }
+  }
+
   /**
    * Gets the worker information given its worker node key.
    *
@@ -1293,7 +1304,7 @@ export abstract class AbstractPool<
       this.opts.enableTasksQueue === true &&
       this.workerNodes.findIndex(
         (workerNode) => !workerNode.hasBackPressure()
-      ) !== -1
+      ) === -1
     )
   }
 
@@ -1306,13 +1317,12 @@ export abstract class AbstractPool<
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
     this.beforeTaskExecutionHook(workerNodeKey, task)
     this.sendToWorker(workerNodeKey, task, task.transferList)
+    this.checkAndEmitTaskExecutionEvents()
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
     const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
-    if (this.hasBackPressure()) {
-      this.emitter?.emit(PoolEvents.backPressure, this.info)
-    }
+    this.checkAndEmitTaskQueuingEvents()
     return tasksQueueSize
   }