refactor: cleanup worker node back pressure detection implementation
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 26 Aug 2024 13:25:54 +0000 (15:25 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 26 Aug 2024 13:25:54 +0000 (15:25 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/worker-node.ts
src/pools/worker.ts

index 6bee0367b3756b36a8fd0ea450ee60310576b27e..d572bc0fad93e8b452c2e60658faba920aaece81 100644 (file)
@@ -1192,9 +1192,11 @@ export abstract class AbstractPool<
   private hasBackPressure (): boolean {
     return (
       this.opts.enableTasksQueue === true &&
-      this.workerNodes.findIndex(
-        workerNode => !workerNode.hasBackPressure()
-      ) === -1
+      this.workerNodes.reduce(
+        (accumulator, workerNode) =>
+          workerNode.info.backPressure ? accumulator + 1 : accumulator,
+        0
+      ) === this.workerNodes.length
     )
   }
 
index b29601b303b40d5023b9a4afcaaafce782b1dfb4..867640f45fb367d328daf3a4e975f993787b9a7a 100644 (file)
@@ -87,6 +87,14 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     }
   }
 
+  /**
+   * Whether the worker node has back pressure (i.e. its tasks queue is full).
+   * @returns `true` if the worker node has back pressure, `false` otherwise.
+   */
+  private hasBackPressure (): boolean {
+    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
+  }
+
   private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
     const getTaskFunctionQueueSize = (): number => {
       let taskFunctionQueueSize = 0
@@ -254,11 +262,6 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     return this.taskFunctionsUsage.get(name)
   }
 
-  /** @inheritdoc */
-  public hasBackPressure (): boolean {
-    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
-  }
-
   /** @inheritdoc */
   public registerOnceWorkerEventHandler (
     event: string,
index f67ec2f6501fc927290555018e4d9f1e0ca4b7a0..3c9c65808ed416cae261faa7c0f467900c79149f 100644 (file)
@@ -323,11 +323,6 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown>
    * @returns The task function worker usage statistics if the task function worker usage statistics are initialized, `undefined` otherwise.
    */
   readonly getTaskFunctionWorkerUsage: (name: string) => undefined | WorkerUsage
-  /**
-   * Whether the worker node has back pressure (i.e. its tasks queue is full).
-   * @returns `true` if the worker node has back pressure, `false` otherwise.
-   */
-  readonly hasBackPressure: () => boolean
   /**
    * Worker info.
    */