refactor: cleanup worker node back pressure API
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 18 Aug 2023 12:24:14 +0000 (14:24 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 18 Aug 2023 12:24:14 +0000 (14:24 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/worker-node.ts
src/pools/worker.ts

index eb2c2f7818a687ac8f8016eb27489895cadfaaee..768114e4cafcce5249f04e448240f0e9eb7bc5c6 100644 (file)
@@ -1242,6 +1242,17 @@ export abstract class AbstractPool<
     }
   }
 
+  /** @inheritDoc */
+  public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
+    if (
+      this.opts.enableTasksQueue === true &&
+      this.workerNodes[workerNodeKey].hasBackPressure()
+    ) {
+      return true
+    }
+    return false
+  }
+
   /**
    * Executes the given task on the worker given its worker node key.
    *
@@ -1254,16 +1265,14 @@ export abstract class AbstractPool<
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
-    if (
-      this.opts.enableTasksQueue === true &&
-      this.workerNodes[workerNodeKey].hasBackPressure()
-    ) {
+    const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
+    if (this.hasWorkerNodeBackPressure(workerNodeKey)) {
       this.emitter?.emit(PoolEvents.backPressure, {
         workerId: this.getWorkerInfo(workerNodeKey).id,
         ...this.info
       })
     }
-    return this.workerNodes[workerNodeKey].enqueueTask(task)
+    return tasksQueueSize
   }
 
   private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
index 6f3d9561a158c7815c007a2a2ebf631ee1e163bc..a0d3a9418cd6fd9632c5dec2ed382f2548822a57 100644 (file)
@@ -180,6 +180,13 @@ export interface IPool<
    * Pool worker nodes.
    */
   readonly workerNodes: Array<IWorkerNode<Worker, Data>>
+  /**
+   * Whether the worker node has back pressure (i.e. its tasks queue is full).
+   *
+   * @param workerNodeKey - The worker node key.
+   * @returns `true` if the worker node has back pressure, `false` otherwise.
+   */
+  readonly hasWorkerNodeBackPressure: (workerNodeKey: number) => boolean
   /**
    * Emitter on which events can be listened to.
    *
index 9c20c5bea63143f3b3e556f2629b635026e32a2a..d4fcd8d0a95bae029d23ef386fd7cbd2d17a7dde 100644 (file)
@@ -114,6 +114,16 @@ export abstract class AbstractWorkerChoiceStrategy<
     return this.pool.workerNodes[workerNodeKey].info.ready
   }
 
+  /**
+   * Whether the worker node has back pressure or not (i.e. its tasks queue is full).
+   *
+   * @param workerNodeKey - The worker node key.
+   * @returns `true` if the worker node has back pressure, `false` otherwise.
+   */
+  protected hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
+    return this.pool.hasWorkerNodeBackPressure(workerNodeKey)
+  }
+
   /**
    * Gets the worker task runtime.
    * If the task statistics require the average runtime, the average runtime is returned.
index 64880fcb9820c098e37fb103bd27b8563073833a..812d14448f6fc4c7b221a74a23958920627f94ac 100644 (file)
@@ -30,7 +30,7 @@ implements IWorkerNode<Worker, Data> {
   public usage: WorkerUsage
   private readonly tasksUsage: Map<string, WorkerUsage>
   private readonly tasksQueue: Queue<Task<Data>>
-  private readonly tasksQueueBackPressureMaxSize: number
+  private readonly tasksQueueBackPressureSize: number
 
   /**
    * Constructs a new worker node.
@@ -48,7 +48,7 @@ implements IWorkerNode<Worker, Data> {
     this.usage = this.initWorkerUsage()
     this.tasksUsage = new Map<string, WorkerUsage>()
     this.tasksQueue = new Queue<Task<Data>>()
-    this.tasksQueueBackPressureMaxSize = Math.pow(poolMaxSize, 2)
+    this.tasksQueueBackPressureSize = Math.pow(poolMaxSize, 2)
   }
 
   /** @inheritdoc */
@@ -82,7 +82,7 @@ implements IWorkerNode<Worker, Data> {
 
   /** @inheritdoc */
   public hasBackPressure (): boolean {
-    return this.tasksQueueSize() >= this.tasksQueueBackPressureMaxSize
+    return this.tasksQueueSize() >= this.tasksQueueBackPressureSize
   }
 
   /** @inheritdoc */
index 3c44f05eae148883c5c83f91304f894d2297e5fc..d6c236591341c2fb73086d62cdb111cecd2a012e 100644 (file)
@@ -243,7 +243,7 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
    */
   readonly clearTasksQueue: () => void
   /**
-   * Whether the worker node has back pressure.
+   * Whether the worker node has back pressure (i.e. its tasks queue is full).
    *
    * @returns `true` if the worker node has back pressure, `false` otherwise.
    */