fix: fix back pressure event emission semantic
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 18 Aug 2023 16:59:13 +0000 (18:59 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 18 Aug 2023 16:59:13 +0000 (18:59 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
src/pools/abstract-pool.ts
src/pools/pool.ts

index 3fb91510df4f0dab9b69005e38252a83b82825c9..74aac8821afdd12907fcb5e78fbbf71820cd0c84 100644 (file)
@@ -15,7 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 
-- Add back pressure detection on the worker node queue. Event `backPressure` is emitted when the worker node queue is full (size > poolMaxSize^2).
+- Add back pressure detection on the worker node queue. Event `backPressure` is emitted all worker node queues are full (size > poolMaxSize^2).
 - Use back pressure detection in worker choice strategies.
 - Add worker choice strategies retries mechanism if no worker is eligible.
 
index 06d09e6b72cc8937a950a3f129e7a97699d4f415..53aad1fdfcb548f56c9b1e87e93e86799b11271d 100644 (file)
@@ -1270,13 +1270,19 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
-    if (
+    return (
       this.opts.enableTasksQueue === true &&
       this.workerNodes[workerNodeKey].hasBackPressure()
-    ) {
-      return true
-    }
-    return false
+    )
+  }
+
+  private hasBackPressure (): boolean {
+    return (
+      this.opts.enableTasksQueue === true &&
+      this.workerNodes.findIndex(
+        (workerNode) => !workerNode.hasBackPressure()
+      ) !== -1
+    )
   }
 
   /**
@@ -1292,11 +1298,8 @@ export abstract class AbstractPool<
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
     const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
-    if (this.hasWorkerNodeBackPressure(workerNodeKey)) {
-      this.emitter?.emit(PoolEvents.backPressure, {
-        workerId: this.getWorkerInfo(workerNodeKey).id,
-        ...this.info
-      })
+    if (this.hasBackPressure()) {
+      this.emitter?.emit(PoolEvents.backPressure, this.info)
     }
     return tasksQueueSize
   }
index c7b878ae05814dec641298ca1630633ebc93b4b9..427dfd1c56f5ff0f60caa61b7aeb8e6c74062935 100644 (file)
@@ -201,7 +201,7 @@ export interface IPool<
    * - '`destroy`': Emitted when the pool is destroyed.
    * - `'error'`: Emitted when an uncaught error occurs.
    * - `'taskError'`: Emitted when an error occurs while executing a task.
-   * - `'backPressure'`: Emitted when a worker node has back pressure (i.e. its tasks queue is full).
+   * - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full).
    */
   readonly emitter?: PoolEmitter
   /**