fix: fix pool back pressure semantic on dynamic pool
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 27 Aug 2024 13:47:38 +0000 (15:47 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 27 Aug 2024 13:47:38 +0000 (15:47 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/cluster/dynamic.ts
src/pools/cluster/fixed.ts
src/pools/pool.ts
src/pools/thread/dynamic.ts
src/pools/thread/fixed.ts
src/pools/worker-node.ts
src/pools/worker.ts
tests/pools/abstract-pool.test.mjs

index 15d7076f09d45bb0a2f39266d40de42407c2a95d..339e22cb2ca6db79d874299377738ed75fa8a29e 100644 (file)
@@ -229,7 +229,7 @@ export abstract class AbstractPool<
   ): void => {
     if (
       this.cannotStealTask() ||
-      this.hasBackPressure() ||
+      this.backPressure ||
       (this.info.stealingWorkerNodes ?? 0) >
         Math.round(
           this.workerNodes.length *
@@ -794,6 +794,21 @@ export abstract class AbstractPool<
     return this.workerNodes[workerNodeKey]?.info
   }
 
+  /**
+   * Whether the worker nodes are back pressured or not.
+   * @returns Worker nodes back pressure boolean status.
+   */
+  protected internalBackPressure (): boolean {
+    return (
+      this.opts.enableTasksQueue === true &&
+      this.workerNodes.reduce(
+        (accumulator, workerNode) =>
+          workerNode.info.backPressure ? accumulator + 1 : accumulator,
+        0
+      ) === this.workerNodes.length
+    )
+  }
+
   /**
    * Whether worker nodes are executing concurrently their tasks quota or not.
    * @returns Worker nodes busyness boolean status.
@@ -802,9 +817,9 @@ export abstract class AbstractPool<
     return (
       this.workerNodes.reduce(
         (accumulator, _, workerNodeKey) =>
-          this.isWorkerNodeIdle(workerNodeKey) ? accumulator + 1 : accumulator,
+          this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
         0
-      ) === 0
+      ) === this.workerNodes.length
     )
   }
 
@@ -921,7 +936,7 @@ export abstract class AbstractPool<
   }
 
   private checkAndEmitTaskQueuingEvents (): void {
-    if (this.hasBackPressure()) {
+    if (this.backPressure) {
       this.emitter?.emit(PoolEvents.backPressure, this.info)
     }
   }
@@ -1187,17 +1202,6 @@ export abstract class AbstractPool<
     this.checkAndEmitReadyEvent()
   }
 
-  private hasBackPressure (): boolean {
-    return (
-      this.opts.enableTasksQueue === true &&
-      this.workerNodes.reduce(
-        (accumulator, workerNode) =>
-          workerNode.info.backPressure ? accumulator + 1 : accumulator,
-        0
-      ) === this.workerNodes.length
-    )
-  }
-
   private initEventEmitter (): void {
     this.emitter = new EventEmitterAsyncResource({
       name: `poolifier:${this.type}-${this.worker}-pool`,
@@ -1948,6 +1952,12 @@ export abstract class AbstractPool<
     this.started = true
   }
 
+  /**
+   * Whether the pool is back pressured or not.
+   * @returns The pool back pressure boolean status.
+   */
+  protected abstract get backPressure (): boolean
+
   /**
    * Whether the pool is busy or not.
    * @returns The pool busyness boolean status.
@@ -1959,7 +1969,10 @@ export abstract class AbstractPool<
    * @returns The pool emptiness boolean status.
    */
   protected get empty (): boolean {
-    return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0
+    return (
+      this.minimumNumberOfWorkers === 0 &&
+      this.workerNodes.length === this.minimumNumberOfWorkers
+    )
   }
 
   /**
@@ -2045,7 +2058,7 @@ export abstract class AbstractPool<
         ),
       }),
       ...(this.opts.enableTasksQueue === true && {
-        backPressure: this.hasBackPressure(),
+        backPressure: this.backPressure,
       }),
       ...(this.opts.enableTasksQueue === true && {
         stolenTasks: this.workerNodes.reduce(
index f13ebf645dde633a4d4db9dee5be0c0a2c949b6d..bb9cf8d98c2a8c01a2a42004dafe9bd90011ba5d 100644 (file)
@@ -48,6 +48,11 @@ export class DynamicClusterPool<
     return (!this.full && this.internalBusy()) || this.empty
   }
 
+  /** @inheritDoc */
+  protected get backPressure (): boolean {
+    return this.full && this.internalBackPressure()
+  }
+
   /** @inheritDoc */
   protected get busy (): boolean {
     return this.full && this.internalBusy()
index 2a12e08054debaf784d769d1e58f3c071bd57e1a..0de2ae6373dffcb36362f18fc9114fbb127a3cf5 100644 (file)
@@ -100,6 +100,11 @@ export class FixedClusterPool<
     return false
   }
 
+  /** @inheritDoc */
+  protected get backPressure (): boolean {
+    return this.internalBackPressure()
+  }
+
   /** @inheritDoc */
   protected get busy (): boolean {
     return this.internalBusy()
index a9493f9ab1f536219b5da0a31809263d0556a3f3..e4cec7422d31448d618775d7ac916bceb5863764 100644 (file)
@@ -286,7 +286,7 @@ export interface IPool<
    * - `'destroy'`: Emitted when the pool is destroyed.
    * - `'error'`: Emitted when an uncaught error occurs.
    * - `'taskError'`: Emitted when an error occurs while executing a task.
-   * - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full: queue size \>= maximum queue size).
+   * - `'backPressure'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are back pressured (i.e. their tasks queue is full: queue size \>= maximum queue size).
    */
   readonly emitter?: EventEmitterAsyncResource
   /**
index fc4115bef8066e3236ef7550ae5e9e33c1f40204..cbf0613037f1b8334a965fe442c53a24db636de3 100644 (file)
@@ -48,6 +48,11 @@ export class DynamicThreadPool<
     return (!this.full && this.internalBusy()) || this.empty
   }
 
+  /** @inheritDoc */
+  protected get backPressure (): boolean {
+    return this.full && this.internalBackPressure()
+  }
+
   /** @inheritDoc */
   protected get busy (): boolean {
     return this.full && this.internalBusy()
index f65a0cd557fce10e6a99bad38a5f5d605db8631c..9b89b3fee7be8b9622a164f06ed108e71c125d5f 100644 (file)
@@ -120,6 +120,11 @@ export class FixedThreadPool<
     return false
   }
 
+  /** @inheritDoc */
+  protected get backPressure (): boolean {
+    return this.internalBackPressure()
+  }
+
   /** @inheritDoc */
   protected get busy (): boolean {
     return this.internalBusy()
index 867640f45fb367d328daf3a4e975f993787b9a7a..1a1f207b67634d48a71f613a143b9567ded4239c 100644 (file)
@@ -88,8 +88,8 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   }
 
   /**
-   * Whether the worker node has back pressure (i.e. its tasks queue is full).
-   * @returns `true` if the worker node has back pressure, `false` otherwise.
+   * Whether the worker node is back pressured or not.
+   * @returns `true` if the worker node is back pressured, `false` otherwise.
    */
   private hasBackPressure (): boolean {
     return this.tasksQueue.size >= this.tasksQueueBackPressureSize
index 3c9c65808ed416cae261faa7c0f467900c79149f..ff8aa6201f4ba645723d3d8fcdd095e4bf5ff53e 100644 (file)
@@ -149,7 +149,7 @@ export type WorkerType = keyof typeof WorkerTypes
 export interface WorkerInfo {
   /**
    * Back pressure flag.
-   * This flag is set to `true` when worker node tasks queue has back pressure.
+   * This flag is set to `true` when worker node tasks queue is back pressured.
    */
   backPressure: boolean
   /**
index f3e544ff581927e7c15a8d362f175928ad54733c..f236a8042645cdb4939f38e28a7ef3d92815f8d1 100644 (file)
@@ -1239,7 +1239,8 @@ describe('Abstract pool test suite', () => {
         enableTasksQueue: true,
       }
     )
-    stub(pool, 'hasBackPressure').returns(true)
+    const backPressureGetterStub = stub().returns(true)
+    stub(pool, 'backPressure').get(backPressureGetterStub)
     expect(pool.emitter.eventNames()).toStrictEqual([])
     const promises = new Set()
     let poolBackPressure = 0
@@ -1277,7 +1278,7 @@ describe('Abstract pool test suite', () => {
       worker: WorkerTypes.thread,
       workerNodes: expect.any(Number),
     })
-    expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
+    expect(backPressureGetterStub.callCount).toBeGreaterThanOrEqual(7)
     await pool.destroy()
   })