perf: track pool busy lifecycle via events
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 27 Aug 2024 16:56:34 +0000 (18:56 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 27 Aug 2024 16:56:34 +0000 (18:56 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/cluster/dynamic.ts
src/pools/pool.ts
src/pools/thread/dynamic.ts
tests/pools/abstract-pool.test.mjs
tests/pools/cluster/dynamic.test.mjs
tests/pools/cluster/fixed.test.mjs
tests/pools/thread/dynamic.test.mjs
tests/pools/thread/fixed.test.mjs

index 848c374df86673da633f2092635b384daeece3c0..b3b1623f4490346db6286a15fcdb3d82c9f9aba3 100644 (file)
@@ -137,6 +137,11 @@ export abstract class AbstractPool<
    */
   private backPressureEventEmitted: boolean
 
+  /**
+   * Whether the pool busy event has been emitted or not.
+   */
+  private busyEventEmitted: boolean
+
   /**
    * Whether the pool is destroying or not.
    */
@@ -478,6 +483,7 @@ export abstract class AbstractPool<
     this.starting = false
     this.destroying = false
     this.readyEventEmitted = false
+    this.busyEventEmitted = false
     this.backPressureEventEmitted = false
     this.startingMinimumNumberOfWorkers = false
     if (this.opts.startWorkers === true) {
@@ -809,7 +815,9 @@ export abstract class AbstractPool<
       this.opts.enableTasksQueue === true &&
       this.workerNodes.reduce(
         (accumulator, workerNode) =>
-          workerNode.info.backPressure ? accumulator + 1 : accumulator,
+          workerNode.info.ready && workerNode.info.backPressure
+            ? accumulator + 1
+            : accumulator,
         0
       ) === this.workerNodes.length
     )
@@ -923,34 +931,50 @@ export abstract class AbstractPool<
   }
 
   private checkAndEmitEmptyEvent (): void {
-    if (this.empty) {
-      this.emitter?.emit(PoolEvents.empty, this.info)
+    if (this.emitter != null && this.empty) {
+      this.emitter.emit(PoolEvents.empty, this.info)
     }
   }
 
   private checkAndEmitReadyEvent (): void {
-    if (!this.readyEventEmitted && this.ready) {
-      this.emitter?.emit(PoolEvents.ready, this.info)
+    if (this.emitter != null && !this.readyEventEmitted && this.ready) {
+      this.emitter.emit(PoolEvents.ready, this.info)
       this.readyEventEmitted = true
     }
   }
 
   private checkAndEmitTaskDequeuingEvents (): void {
-    if (this.backPressureEventEmitted && !this.backPressure) {
-      this.emitter?.emit(PoolEvents.backPressureEnd, this.info)
+    if (
+      this.emitter != null &&
+      this.backPressureEventEmitted &&
+      !this.backPressure
+    ) {
+      this.emitter.emit(PoolEvents.backPressureEnd, this.info)
       this.backPressureEventEmitted = false
     }
   }
 
   private checkAndEmitTaskExecutionEvents (): void {
-    if (this.busy) {
-      this.emitter?.emit(PoolEvents.busy, this.info)
+    if (this.emitter != null && !this.busyEventEmitted && this.busy) {
+      this.emitter.emit(PoolEvents.busy, this.info)
+      this.busyEventEmitted = true
+    }
+  }
+
+  private checkAndEmitTaskExecutionFinishedEvents (): void {
+    if (this.emitter != null && this.busyEventEmitted && !this.busy) {
+      this.emitter.emit(PoolEvents.busyEnd, this.info)
+      this.busyEventEmitted = false
     }
   }
 
   private checkAndEmitTaskQueuingEvents (): void {
-    if (!this.backPressureEventEmitted && this.backPressure) {
-      this.emitter?.emit(PoolEvents.backPressure, this.info)
+    if (
+      this.emitter != null &&
+      !this.backPressureEventEmitted &&
+      this.backPressure
+    ) {
+      this.emitter.emit(PoolEvents.backPressure, this.info)
       this.backPressureEventEmitted = true
     }
   }
@@ -1181,6 +1205,7 @@ export abstract class AbstractPool<
       }
       asyncResource?.emitDestroy()
       this.afterTaskExecutionHook(workerNodeKey, message)
+      this.checkAndEmitTaskExecutionFinishedEvents()
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
       this.promiseResponseMap.delete(taskId!)
       if (this.opts.enableTasksQueue === true && !this.destroying) {
@@ -1725,10 +1750,13 @@ export abstract class AbstractPool<
         await this.destroyWorkerNode(workerNodeKey)
       })
     )
-    this.emitter?.emit(PoolEvents.destroy, this.info)
-    this.emitter?.emitDestroy()
-    this.readyEventEmitted = false
-    this.backPressureEventEmitted = false
+    if (this.emitter != null) {
+      this.emitter.emit(PoolEvents.destroy, this.info)
+      this.emitter.emitDestroy()
+      this.readyEventEmitted = false
+      this.busyEventEmitted = false
+      this.backPressureEventEmitted = false
+    }
     delete this.startTimestamp
     this.destroying = false
     this.started = false
index bb9cf8d98c2a8c01a2a42004dafe9bd90011ba5d..afc97c5cdf9d20494d313361cd1ad3e93106a1d8 100644 (file)
@@ -38,8 +38,8 @@ export class DynamicClusterPool<
 
   /** @inheritDoc */
   protected checkAndEmitDynamicWorkerCreationEvents (): void {
-    if (this.full) {
-      this.emitter?.emit(PoolEvents.full, this.info)
+    if (this.emitter != null && this.full) {
+      this.emitter.emit(PoolEvents.full, this.info)
     }
   }
 
index 9ff07c59065ad3233a346edf027ece16a869c0db..0469e7ddc3ef27b233918e878c82e4fd108a5761 100644 (file)
@@ -50,6 +50,7 @@ export const PoolEvents: Readonly<{
   backPressure: 'backPressure'
   backPressureEnd: 'backPressureEnd'
   busy: 'busy'
+  busyEnd: 'busyEnd'
   destroy: 'destroy'
   empty: 'empty'
   error: 'error'
@@ -60,6 +61,7 @@ export const PoolEvents: Readonly<{
   backPressure: 'backPressure',
   backPressureEnd: 'backPressureEnd',
   busy: 'busy',
+  busyEnd: 'busyEnd',
   destroy: 'destroy',
   empty: 'empty',
   error: 'error',
@@ -283,6 +285,7 @@ export interface IPool<
    *
    * - `'ready'`: Emitted when the number of workers created in the pool has reached the minimum size expected and are ready. If the pool is dynamic with a minimum number of workers is set to zero, this event is emitted when at least one dynamic worker is ready.
    * - `'busy'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are executing concurrently their tasks quota.
+   * - `'busyEnd'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are no longer executing concurrently their tasks quota.
    * - `'full'`: Emitted when the pool is dynamic and the number of workers created has reached the maximum size expected.
    * - `'empty'`: Emitted when the pool is dynamic with a minimum number of workers set to zero and the number of workers has reached the minimum size expected.
    * - `'destroy'`: Emitted when the pool is destroyed.
index cbf0613037f1b8334a965fe442c53a24db636de3..8a62cf64c78ab90ba654ebb4e071f05b0418ac5e 100644 (file)
@@ -38,8 +38,8 @@ export class DynamicThreadPool<
 
   /** @inheritDoc */
   protected checkAndEmitDynamicWorkerCreationEvents (): void {
-    if (this.full) {
-      this.emitter?.emit(PoolEvents.full, this.info)
+    if (this.emitter != null && this.full) {
+      this.emitter.emit(PoolEvents.full, this.info)
     }
   }
 
index a9b7d3e3e737d7fef9fdfc7ede5ece8237c63bb3..255afff8eafea9130492650d7270c74a6e8b5362 100644 (file)
@@ -928,12 +928,14 @@ describe('Abstract pool test suite', () => {
     expect(pool.info.ready).toBe(false)
     expect(pool.workerNodes).toStrictEqual([])
     expect(pool.readyEventEmitted).toBe(false)
+    expect(pool.busyEventEmitted).toBe(false)
     expect(pool.backPressureEventEmitted).toBe(false)
     pool.start()
     expect(pool.info.started).toBe(true)
     expect(pool.info.ready).toBe(true)
     await waitPoolEvents(pool, PoolEvents.ready, 1)
     expect(pool.readyEventEmitted).toBe(true)
+    expect(pool.busyEventEmitted).toBe(false)
     expect(pool.backPressureEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(numberOfWorkers)
     for (const workerNode of pool.workerNodes) {
@@ -1147,7 +1149,7 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
-  it("Verify that pool event emitter 'busy' event can register a callback", async () => {
+  it("Verify that pool event emitter 'busy' and 'busyEnd' events can register a callback", async () => {
     const pool = new FixedThreadPool(
       numberOfWorkers,
       './tests/worker-files/thread/testWorker.mjs'
@@ -1155,20 +1157,45 @@ describe('Abstract pool test suite', () => {
     expect(pool.emitter.eventNames()).toStrictEqual([])
     const promises = new Set()
     let poolBusy = 0
-    let poolInfo
+    let poolBusyInfo
     pool.emitter.on(PoolEvents.busy, info => {
       ++poolBusy
-      poolInfo = info
+      poolBusyInfo = info
+    })
+    let poolBusyEnd = 0
+    let poolBusyEndInfo
+    pool.emitter.on(PoolEvents.busyEnd, info => {
+      ++poolBusyEnd
+      poolBusyEndInfo = info
     })
-    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
+    expect(pool.emitter.eventNames()).toStrictEqual([
+      PoolEvents.busy,
+      PoolEvents.busyEnd,
+    ])
     for (let i = 0; i < numberOfWorkers * 2; i++) {
       promises.add(pool.execute())
     }
     await Promise.all(promises)
-    // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
-    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
-    expect(poolBusy).toBe(numberOfWorkers + 1)
-    expect(poolInfo).toStrictEqual({
+    expect(poolBusy).toBe(1)
+    expect(poolBusyInfo).toStrictEqual({
+      busyWorkerNodes: expect.any(Number),
+      defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+      executedTasks: expect.any(Number),
+      executingTasks: expect.any(Number),
+      failedTasks: expect.any(Number),
+      idleWorkerNodes: expect.any(Number),
+      maxSize: expect.any(Number),
+      minSize: expect.any(Number),
+      ready: true,
+      started: true,
+      strategyRetries: expect.any(Number),
+      type: PoolTypes.fixed,
+      version,
+      worker: WorkerTypes.thread,
+      workerNodes: expect.any(Number),
+    })
+    expect(poolBusyEnd).toBe(1)
+    expect(poolBusyEndInfo).toStrictEqual({
       busyWorkerNodes: expect.any(Number),
       defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
       executedTasks: expect.any(Number),
@@ -1239,16 +1266,16 @@ describe('Abstract pool test suite', () => {
     expect(pool.emitter.eventNames()).toStrictEqual([])
     const promises = new Set()
     let poolBackPressure = 0
-    let backPressurePoolInfo
+    let poolBackPressureInfo
     pool.emitter.on(PoolEvents.backPressure, info => {
       ++poolBackPressure
-      backPressurePoolInfo = info
+      poolBackPressureInfo = info
     })
     let poolBackPressureEnd = 0
-    let backPressureEndPoolInfo
+    let poolBackPressureEndInfo
     pool.emitter.on(PoolEvents.backPressureEnd, info => {
       ++poolBackPressureEnd
-      backPressureEndPoolInfo = info
+      poolBackPressureEndInfo = info
     })
     expect(pool.emitter.eventNames()).toStrictEqual([
       PoolEvents.backPressure,
@@ -1259,7 +1286,7 @@ describe('Abstract pool test suite', () => {
     }
     await Promise.all(promises)
     expect(poolBackPressure).toBe(1)
-    expect(backPressurePoolInfo).toStrictEqual({
+    expect(poolBackPressureInfo).toStrictEqual({
       backPressure: true,
       backPressureWorkerNodes: expect.any(Number),
       busyWorkerNodes: expect.any(Number),
@@ -1283,7 +1310,7 @@ describe('Abstract pool test suite', () => {
       workerNodes: expect.any(Number),
     })
     expect(poolBackPressureEnd).toBe(1)
-    expect(backPressureEndPoolInfo).toStrictEqual({
+    expect(poolBackPressureEndInfo).toStrictEqual({
       backPressure: false,
       backPressureWorkerNodes: expect.any(Number),
       busyWorkerNodes: expect.any(Number),
index 428706b4b7a6f706f5bbd78edf83afbb7e6e8f18..f99245c039d83e4cd5a449554388a23c7fba69c2 100644 (file)
@@ -82,6 +82,7 @@ describe('Dynamic cluster pool test suite', () => {
       PoolEvents.destroy,
     ])
     expect(pool.readyEventEmitted).toBe(false)
+    expect(pool.busyEventEmitted).toBe(false)
     expect(pool.backPressureEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(0)
     expect(numberOfExitEvents).toBe(min)
index bc34bfd59a20cc29a9238a5b6662bddf96eb206b..975e6e16a60aaf5a46579aad66acf3c9d070e855 100644 (file)
@@ -243,6 +243,7 @@ describe('Fixed cluster pool test suite', () => {
     expect(pool.info.ready).toBe(false)
     expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
     expect(pool.readyEventEmitted).toBe(false)
+    expect(pool.busyEventEmitted).toBe(false)
     expect(pool.backPressureEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(0)
     expect(numberOfExitEvents).toBe(numberOfWorkers)
index c4cc6e11bfb553248cd880c07a98f09c7d1da6e8..b80b216817e6943c0da9099cbf442b8e4fc2a917 100644 (file)
@@ -82,6 +82,7 @@ describe('Dynamic thread pool test suite', () => {
       PoolEvents.destroy,
     ])
     expect(pool.readyEventEmitted).toBe(false)
+    expect(pool.busyEventEmitted).toBe(false)
     expect(pool.backPressureEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(0)
     expect(numberOfExitEvents).toBe(min)
index e0e556704cc8f803664da6efc50b4ebe45db5116..7473ab5490150bd52f992fdff3035cd5adc2d907 100644 (file)
@@ -273,6 +273,7 @@ describe('Fixed thread pool test suite', () => {
     expect(pool.info.ready).toBe(false)
     expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
     expect(pool.readyEventEmitted).toBe(false)
+    expect(pool.busyEventEmitted).toBe(false)
     expect(pool.backPressureEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(0)
     expect(numberOfExitEvents).toBe(numberOfThreads)