fix: fix back pressure detection
author     Jérôme Benoit <jerome.benoit@sap.com>
           Sat, 19 Aug 2023 09:04:10 +0000 (11:04 +0200)
committer  Jérôme Benoit <jerome.benoit@sap.com>
           Sat, 19 Aug 2023 09:04:10 +0000 (11:04 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/worker.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/cluster/fixed.test.js
tests/pools/thread/fixed.test.js

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bc0afc7aeb7fd8828570e1c1085cdf7759b6b23d..ae8fefd89c6e7212003d615f43c68a2def3ee9ca 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -101,7 +101,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Fixed
 
 - Fix queued tasks redistribution on error task execution starvation.
-- Ensure task queueing per worker condition is untangled from the pool busyness semantic.
+- Ensure tasks queueing per worker condition is untangled from the pool busyness semantic.
 
 ### Changed
 
diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts
index 7ed6ab0351f3ad1cf44c6961b1273a2d78fc593f..dceef6decb534f083510d90a947a05861c2ec2df 100644
--- a/src/pools/abstract-pool.ts
+++ b/src/pools/abstract-pool.ts
@@ -118,7 +118,10 @@ export abstract class AbstractPool<
     this.executeTask = this.executeTask.bind(this)
     this.enqueueTask = this.enqueueTask.bind(this)
     this.dequeueTask = this.dequeueTask.bind(this)
-    this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
+    this.checkAndEmitTaskExecutionEvents =
+      this.checkAndEmitTaskExecutionEvents.bind(this)
+    this.checkAndEmitTaskQueuingEvents =
+      this.checkAndEmitTaskQueuingEvents.bind(this)
 
     if (this.opts.enableEvents === true) {
       this.emitter = new PoolEmitter()
@@ -368,6 +371,9 @@ export abstract class AbstractPool<
           0
         )
       }),
+      ...(this.opts.enableTasksQueue === true && {
+        backPressure: this.hasBackPressure()
+      }),
       failedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
           accumulator + workerNode.usage.tasks.failed,
@@ -733,7 +739,6 @@ export abstract class AbstractPool<
       } else {
         this.enqueueTask(workerNodeKey, task)
       }
-      this.checkAndEmitEvents()
     })
   }
 
@@ -1221,7 +1226,7 @@ export abstract class AbstractPool<
     }
   }
 
-  private checkAndEmitEvents (): void {
+  private checkAndEmitTaskExecutionEvents (): void {
     if (this.emitter != null) {
       if (this.busy) {
         this.emitter.emit(PoolEvents.busy, this.info)
@@ -1229,9 +1234,12 @@ export abstract class AbstractPool<
       if (this.type === PoolTypes.dynamic && this.full) {
         this.emitter.emit(PoolEvents.full, this.info)
       }
-      if (this.hasBackPressure()) {
-        this.emitter.emit(PoolEvents.backPressure, this.info)
-      }
+    }
+  }
+
+  private checkAndEmitTaskQueuingEvents (): void {
+    if (this.hasBackPressure()) {
+      this.emitter?.emit(PoolEvents.backPressure, this.info)
     }
   }
 
@@ -1296,7 +1304,7 @@ export abstract class AbstractPool<
       this.opts.enableTasksQueue === true &&
       this.workerNodes.findIndex(
         (workerNode) => !workerNode.hasBackPressure()
-      ) !== -1
+      ) === -1
     )
   }
 
@@ -1309,10 +1317,13 @@ export abstract class AbstractPool<
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
     this.beforeTaskExecutionHook(workerNodeKey, task)
     this.sendToWorker(workerNodeKey, task, task.transferList)
+    this.checkAndEmitTaskExecutionEvents()
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
-    return this.workerNodes[workerNodeKey].enqueueTask(task)
+    const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
+    this.checkAndEmitTaskQueuingEvents()
+    return tasksQueueSize
   }
 
   private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
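
The core of the fix is in hasBackPressure(): the findIndex(...) comparison flips from !== -1 to === -1, so the pool reports back pressure only when no worker node is free of back pressure, i.e. every worker node's tasks queue has reached its back pressure size. Event emission is also split: executeTask() now emits the task execution events (busy, full) while enqueueTask() emits the task queuing event (backPressure). A minimal sketch of the corrected predicate, using a simplified worker node shape instead of poolifier's actual IWorkerNode interface:

interface WorkerNodeLike {
  // Whether this worker node's tasks queue has reached its back pressure size.
  hasBackPressure: () => boolean
}

// Pool-level back pressure: tasks queueing must be enabled and findIndex() must
// not find any worker node without back pressure (=== -1), i.e. all worker
// nodes are under back pressure.
const poolHasBackPressure = (
  enableTasksQueue: boolean,
  workerNodes: WorkerNodeLike[]
): boolean =>
  enableTasksQueue &&
  workerNodes.findIndex((workerNode) => !workerNode.hasBackPressure()) === -1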
diff --git a/src/pools/pool.ts b/src/pools/pool.ts
index e65cc115d2d77c0a0590fa5f936043b16dcb38d2..09d2f229c5bbf02baf00172c494efa3d9df23584 100644
--- a/src/pools/pool.ts
+++ b/src/pools/pool.ts
@@ -79,6 +79,7 @@ export interface PoolInfo {
   readonly executingTasks: number
   readonly queuedTasks?: number
   readonly maxQueuedTasks?: number
+  readonly backPressure?: boolean
   readonly failedTasks: number
   readonly runTime?: {
     readonly minimum: number
diff --git a/src/pools/worker.ts b/src/pools/worker.ts
index d9dd43865bf6d36cb7d4bb6ce808eccf3528a7c8..1d0c4e22e47f073384bdf8c69e3c477ea9cb1d2b 100644
--- a/src/pools/worker.ts
+++ b/src/pools/worker.ts
@@ -229,7 +229,7 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
    * Enqueue task.
    *
    * @param task - The task to queue.
-   * @returns The task queue size.
+   * @returns The tasks queue size.
    */
   readonly enqueueTask: (task: Task<Data>) => number
   /**
diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js
index 0929adc36a60dcfa0422ad6908041484736ee28c..ec552e986003d859f38fe0557d6d8727c23d7ceb 100644
--- a/tests/pools/abstract/abstract-pool.test.js
+++ b/tests/pools/abstract/abstract-pool.test.js
@@ -765,31 +765,25 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
-  it("Verify that pool event emitter 'full' event can register a callback", async () => {
-    const pool = new DynamicThreadPool(
+  it("Verify that pool event emitter 'ready' event can register a callback", async () => {
+    const pool = new DynamicClusterPool(
       Math.floor(numberOfWorkers / 2),
       numberOfWorkers,
-      './tests/worker-files/thread/testWorker.js'
+      './tests/worker-files/cluster/testWorker.js'
     )
-    const promises = new Set()
-    let poolFull = 0
     let poolInfo
-    pool.emitter.on(PoolEvents.full, (info) => {
-      ++poolFull
+    let poolReady = 0
+    pool.emitter.on(PoolEvents.ready, (info) => {
+      ++poolReady
       poolInfo = info
     })
-    for (let i = 0; i < numberOfWorkers * 2; i++) {
-      promises.add(pool.execute())
-    }
-    await Promise.all(promises)
-    // The `full` event is triggered when the number of submitted tasks at once reaches the maximum number of workers in the dynamic pool.
-    // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
-    expect(poolFull).toBe(numberOfWorkers * 2 - 1)
+    await waitPoolEvents(pool, PoolEvents.ready, 1)
+    expect(poolReady).toBe(1)
     expect(poolInfo).toStrictEqual({
       version,
       type: PoolTypes.dynamic,
-      worker: WorkerTypes.thread,
-      ready: expect.any(Boolean),
+      worker: WorkerTypes.cluster,
+      ready: true,
       strategy: WorkerChoiceStrategies.ROUND_ROBIN,
       minSize: expect.any(Number),
       maxSize: expect.any(Number),
@@ -803,25 +797,30 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
-  it("Verify that pool event emitter 'ready' event can register a callback", async () => {
-    const pool = new DynamicClusterPool(
-      Math.floor(numberOfWorkers / 2),
+  it("Verify that pool event emitter 'busy' event can register a callback", async () => {
+    const pool = new FixedThreadPool(
       numberOfWorkers,
-      './tests/worker-files/cluster/testWorker.js'
+      './tests/worker-files/thread/testWorker.js'
     )
+    const promises = new Set()
+    let poolBusy = 0
     let poolInfo
-    let poolReady = 0
-    pool.emitter.on(PoolEvents.ready, (info) => {
-      ++poolReady
+    pool.emitter.on(PoolEvents.busy, (info) => {
+      ++poolBusy
       poolInfo = info
     })
-    await waitPoolEvents(pool, PoolEvents.ready, 1)
-    expect(poolReady).toBe(1)
+    for (let i = 0; i < numberOfWorkers * 2; i++) {
+      promises.add(pool.execute())
+    }
+    await Promise.all(promises)
+    // The `busy` event is triggered when the number of submitted tasks at once reaches the number of fixed pool workers.
+    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
+    expect(poolBusy).toBe(numberOfWorkers + 1)
     expect(poolInfo).toStrictEqual({
       version,
-      type: PoolTypes.dynamic,
-      worker: WorkerTypes.cluster,
-      ready: true,
+      type: PoolTypes.fixed,
+      worker: WorkerTypes.thread,
+      ready: expect.any(Boolean),
       strategy: WorkerChoiceStrategies.ROUND_ROBIN,
       minSize: expect.any(Number),
       maxSize: expect.any(Number),
@@ -835,28 +834,29 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
-  it("Verify that pool event emitter 'busy' event can register a callback", async () => {
-    const pool = new FixedThreadPool(
+  it("Verify that pool event emitter 'full' event can register a callback", async () => {
+    const pool = new DynamicThreadPool(
+      Math.floor(numberOfWorkers / 2),
       numberOfWorkers,
       './tests/worker-files/thread/testWorker.js'
     )
     const promises = new Set()
-    let poolBusy = 0
+    let poolFull = 0
     let poolInfo
-    pool.emitter.on(PoolEvents.busy, (info) => {
-      ++poolBusy
+    pool.emitter.on(PoolEvents.full, (info) => {
+      ++poolFull
       poolInfo = info
     })
     for (let i = 0; i < numberOfWorkers * 2; i++) {
       promises.add(pool.execute())
     }
     await Promise.all(promises)
-    // The `busy` event is triggered when the number of submitted tasks at once reaches the number of fixed pool workers.
-    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
-    expect(poolBusy).toBe(numberOfWorkers + 1)
+    // The `full` event is triggered when the number of submitted tasks at once reaches the maximum number of workers in the dynamic pool.
+    // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
+    expect(poolFull).toBe(numberOfWorkers * 2 - 1)
     expect(poolInfo).toStrictEqual({
       version,
-      type: PoolTypes.fixed,
+      type: PoolTypes.dynamic,
       worker: WorkerTypes.thread,
       ready: expect.any(Boolean),
       strategy: WorkerChoiceStrategies.ROUND_ROBIN,
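
The reordered tests above exercise the ready, busy and full events; the backPressure event now emitted from enqueueTask() is not covered here. A hedged sketch of registering a callback for it, following the same pattern as the busy/full tests (pool size and worker file path are illustrative):

import { FixedThreadPool, PoolEvents } from 'poolifier'

const pool = new FixedThreadPool(
  2,
  './tests/worker-files/thread/testWorker.js',
  { enableTasksQueue: true }
)
let poolBackPressure = 0
pool.emitter.on(PoolEvents.backPressure, (info) => {
  // info is the pool info snapshot, including the new backPressure flag.
  ++poolBackPressure
  console.info('pool back pressure detected:', info.backPressure)
})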
diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js
index e563b0ba520c67f2f11b44221e61b88b098c6496..3baaa98ad04831151e433856bdcacee5682338f9 100644
--- a/tests/pools/cluster/fixed.test.js
+++ b/tests/pools/cluster/fixed.test.js
@@ -134,6 +134,7 @@ describe('Fixed cluster pool test suite', () => {
       numberOfWorkers *
         (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
     )
+    expect(queuePool.info.backPressure).toBe(false)
     await Promise.all(promises)
     for (const workerNode of queuePool.workerNodes) {
       expect(workerNode.usage.tasks.executing).toBe(0)
diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js
index 1a372a26f4e66238f59f64a009a69ddc21982ad8..aeebc4a7a623a1151f37f41aaf7f56ddcfcadbc1 100644
--- a/tests/pools/thread/fixed.test.js
+++ b/tests/pools/thread/fixed.test.js
@@ -134,6 +134,7 @@ describe('Fixed thread pool test suite', () => {
       numberOfThreads *
         (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
     )
+    expect(queuePool.info.backPressure).toBe(false)
     await Promise.all(promises)
     for (const workerNode of queuePool.workerNodes) {
       expect(workerNode.usage.tasks.executing).toBe(0)