perf: track pool back pressure lifecycle via events
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 27 Aug 2024 15:54:14 +0000 (17:54 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 27 Aug 2024 15:54:14 +0000 (17:54 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/pool.ts
tests/pools/abstract-pool.test.mjs
tests/pools/cluster/dynamic.test.mjs
tests/pools/cluster/fixed.test.mjs
tests/pools/thread/dynamic.test.mjs
tests/pools/thread/fixed.test.mjs

index 339e22cb2ca6db79d874299377738ed75fa8a29e..848c374df86673da633f2092635b384daeece3c0 100644 (file)
@@ -132,6 +132,11 @@ export abstract class AbstractPool<
     }
   }
 
+  /**
+   * Whether the pool back pressure event has been emitted or not.
+   */
+  private backPressureEventEmitted: boolean
+
   /**
    * Whether the pool is destroying or not.
    */
@@ -473,6 +478,7 @@ export abstract class AbstractPool<
     this.starting = false
     this.destroying = false
     this.readyEventEmitted = false
+    this.backPressureEventEmitted = false
     this.startingMinimumNumberOfWorkers = false
     if (this.opts.startWorkers === true) {
       this.start()
@@ -929,6 +935,13 @@ export abstract class AbstractPool<
     }
   }
 
+  private checkAndEmitTaskDequeuingEvents (): void {
+    if (this.backPressureEventEmitted && !this.backPressure) {
+      this.emitter?.emit(PoolEvents.backPressureEnd, this.info)
+      this.backPressureEventEmitted = false
+    }
+  }
+
   private checkAndEmitTaskExecutionEvents (): void {
     if (this.busy) {
       this.emitter?.emit(PoolEvents.busy, this.info)
@@ -936,8 +949,9 @@ export abstract class AbstractPool<
   }
 
   private checkAndEmitTaskQueuingEvents (): void {
-    if (this.backPressure) {
+    if (!this.backPressureEventEmitted && this.backPressure) {
       this.emitter?.emit(PoolEvents.backPressure, this.info)
+      this.backPressureEventEmitted = true
     }
   }
 
@@ -1091,7 +1105,9 @@ export abstract class AbstractPool<
   }
 
   private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
-    return this.workerNodes[workerNodeKey].dequeueTask()
+    const task = this.workerNodes[workerNodeKey].dequeueTask()
+    this.checkAndEmitTaskDequeuingEvents()
+    return task
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
@@ -1712,6 +1728,7 @@ export abstract class AbstractPool<
     this.emitter?.emit(PoolEvents.destroy, this.info)
     this.emitter?.emitDestroy()
     this.readyEventEmitted = false
+    this.backPressureEventEmitted = false
     delete this.startTimestamp
     this.destroying = false
     this.started = false
index e4cec7422d31448d618775d7ac916bceb5863764..9ff07c59065ad3233a346edf027ece16a869c0db 100644 (file)
@@ -48,6 +48,7 @@ export type PoolType = keyof typeof PoolTypes
  */
 export const PoolEvents: Readonly<{
   backPressure: 'backPressure'
+  backPressureEnd: 'backPressureEnd'
   busy: 'busy'
   destroy: 'destroy'
   empty: 'empty'
@@ -57,6 +58,7 @@ export const PoolEvents: Readonly<{
   taskError: 'taskError'
 }> = Object.freeze({
   backPressure: 'backPressure',
+  backPressureEnd: 'backPressureEnd',
   busy: 'busy',
   destroy: 'destroy',
   empty: 'empty',
@@ -287,6 +289,7 @@ export interface IPool<
    * - `'error'`: Emitted when an uncaught error occurs.
    * - `'taskError'`: Emitted when an error occurs while executing a task.
    * - `'backPressure'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are back pressured (i.e. their tasks queue is full: queue size \>= maximum queue size).
+   * - `'backPressureEnd'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are no longer back pressured (i.e. their tasks queue is no longer full: queue size \< maximum queue size).
    */
   readonly emitter?: EventEmitterAsyncResource
   /**
index f236a8042645cdb4939f38e28a7ef3d92815f8d1..a9b7d3e3e737d7fef9fdfc7ede5ece8237c63bb3 100644 (file)
@@ -5,7 +5,6 @@ import { EventEmitterAsyncResource } from 'node:events'
 import { readFileSync } from 'node:fs'
 import { dirname, join } from 'node:path'
 import { fileURLToPath } from 'node:url'
-import { restore, stub } from 'sinon'
 
 import { CircularBuffer } from '../../lib/circular-buffer.cjs'
 import {
@@ -38,10 +37,6 @@ describe('Abstract pool test suite', () => {
     }
   }
 
-  afterEach(() => {
-    restore()
-  })
-
   it('Verify that pool can be created and destroyed', async () => {
     const pool = new FixedThreadPool(
       numberOfWorkers,
@@ -933,11 +928,13 @@ describe('Abstract pool test suite', () => {
     expect(pool.info.ready).toBe(false)
     expect(pool.workerNodes).toStrictEqual([])
     expect(pool.readyEventEmitted).toBe(false)
+    expect(pool.backPressureEventEmitted).toBe(false)
     pool.start()
     expect(pool.info.started).toBe(true)
     expect(pool.info.ready).toBe(true)
     await waitPoolEvents(pool, PoolEvents.ready, 1)
     expect(pool.readyEventEmitted).toBe(true)
+    expect(pool.backPressureEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(numberOfWorkers)
     for (const workerNode of pool.workerNodes) {
       expect(workerNode).toBeInstanceOf(WorkerNode)
@@ -1231,7 +1228,7 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
-  it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
+  it("Verify that pool event emitter 'backPressure' and 'backPressureEnd' events can register a callback", async () => {
     const pool = new FixedThreadPool(
       numberOfWorkers,
       './tests/worker-files/thread/testWorker.mjs',
@@ -1239,23 +1236,30 @@ describe('Abstract pool test suite', () => {
         enableTasksQueue: true,
       }
     )
-    const backPressureGetterStub = stub().returns(true)
-    stub(pool, 'backPressure').get(backPressureGetterStub)
     expect(pool.emitter.eventNames()).toStrictEqual([])
     const promises = new Set()
     let poolBackPressure = 0
-    let poolInfo
+    let backPressurePoolInfo
     pool.emitter.on(PoolEvents.backPressure, info => {
       ++poolBackPressure
-      poolInfo = info
+      backPressurePoolInfo = info
     })
-    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
-    for (let i = 0; i < numberOfWorkers + 1; i++) {
+    let poolBackPressureEnd = 0
+    let backPressureEndPoolInfo
+    pool.emitter.on(PoolEvents.backPressureEnd, info => {
+      ++poolBackPressureEnd
+      backPressureEndPoolInfo = info
+    })
+    expect(pool.emitter.eventNames()).toStrictEqual([
+      PoolEvents.backPressure,
+      PoolEvents.backPressureEnd,
+    ])
+    for (let i = 0; i < numberOfWorkers * 10; i++) {
       promises.add(pool.execute())
     }
     await Promise.all(promises)
     expect(poolBackPressure).toBe(1)
-    expect(poolInfo).toStrictEqual({
+    expect(backPressurePoolInfo).toStrictEqual({
       backPressure: true,
       backPressureWorkerNodes: expect.any(Number),
       busyWorkerNodes: expect.any(Number),
@@ -1278,7 +1282,30 @@ describe('Abstract pool test suite', () => {
       worker: WorkerTypes.thread,
       workerNodes: expect.any(Number),
     })
-    expect(backPressureGetterStub.callCount).toBeGreaterThanOrEqual(7)
+    expect(poolBackPressureEnd).toBe(1)
+    expect(backPressureEndPoolInfo).toStrictEqual({
+      backPressure: false,
+      backPressureWorkerNodes: expect.any(Number),
+      busyWorkerNodes: expect.any(Number),
+      defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+      executedTasks: expect.any(Number),
+      executingTasks: expect.any(Number),
+      failedTasks: expect.any(Number),
+      idleWorkerNodes: expect.any(Number),
+      maxQueuedTasks: expect.any(Number),
+      maxSize: expect.any(Number),
+      minSize: expect.any(Number),
+      queuedTasks: expect.any(Number),
+      ready: true,
+      started: true,
+      stealingWorkerNodes: expect.any(Number),
+      stolenTasks: expect.any(Number),
+      strategyRetries: expect.any(Number),
+      type: PoolTypes.fixed,
+      version,
+      worker: WorkerTypes.thread,
+      workerNodes: expect.any(Number),
+    })
     await pool.destroy()
   })
 
index d9b33d40823c6035c16577741a99f0b7754a6d29..428706b4b7a6f706f5bbd78edf83afbb7e6e8f18 100644 (file)
@@ -76,11 +76,13 @@ describe('Dynamic cluster pool test suite', () => {
     await pool.destroy()
     const numberOfExitEvents = await exitPromise
     expect(pool.info.started).toBe(false)
+    expect(pool.info.ready).toBe(false)
     expect(pool.emitter.eventNames()).toStrictEqual([
       PoolEvents.busy,
       PoolEvents.destroy,
     ])
     expect(pool.readyEventEmitted).toBe(false)
+    expect(pool.backPressureEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(0)
     expect(numberOfExitEvents).toBe(min)
     expect(poolDestroy).toBe(1)
index 9204e7e5705587a5296d133fbe310a63f700a259..bc34bfd59a20cc29a9238a5b6662bddf96eb206b 100644 (file)
@@ -240,8 +240,10 @@ describe('Fixed cluster pool test suite', () => {
     await pool.destroy()
     const numberOfExitEvents = await exitPromise
     expect(pool.info.started).toBe(false)
+    expect(pool.info.ready).toBe(false)
     expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
     expect(pool.readyEventEmitted).toBe(false)
+    expect(pool.backPressureEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(0)
     expect(numberOfExitEvents).toBe(numberOfWorkers)
     expect(poolDestroy).toBe(1)
index 9a7082a50b155f642f72504866f8651f9b74bf6d..c4cc6e11bfb553248cd880c07a98f09c7d1da6e8 100644 (file)
@@ -76,11 +76,13 @@ describe('Dynamic thread pool test suite', () => {
     await pool.destroy()
     const numberOfExitEvents = await exitPromise
     expect(pool.info.started).toBe(false)
+    expect(pool.info.ready).toBe(false)
     expect(pool.emitter.eventNames()).toStrictEqual([
       PoolEvents.busy,
       PoolEvents.destroy,
     ])
     expect(pool.readyEventEmitted).toBe(false)
+    expect(pool.backPressureEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(0)
     expect(numberOfExitEvents).toBe(min)
     expect(poolDestroy).toBe(1)
index 622a5101ec58225dc5dbbcb651bca384ea7d2a27..e0e556704cc8f803664da6efc50b4ebe45db5116 100644 (file)
@@ -270,8 +270,10 @@ describe('Fixed thread pool test suite', () => {
     await pool.destroy()
     const numberOfExitEvents = await exitPromise
     expect(pool.info.started).toBe(false)
+    expect(pool.info.ready).toBe(false)
     expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
     expect(pool.readyEventEmitted).toBe(false)
+    expect(pool.backPressureEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(0)
     expect(numberOfExitEvents).toBe(numberOfThreads)
     expect(poolDestroy).toBe(1)