perf: track dynamic pool full lifecycle via events
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 28 Aug 2024 12:40:34 +0000 (14:40 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 28 Aug 2024 12:40:34 +0000 (14:40 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/cluster/dynamic.ts
src/pools/cluster/fixed.ts
src/pools/pool.ts
src/pools/thread/dynamic.ts
src/pools/thread/fixed.ts
tests/pools/abstract-pool.test.mjs
tests/pools/cluster/dynamic.test.mjs
tests/pools/thread/dynamic.test.mjs

index a0c779c2aac7d933888a542cda46c09d157fc8ab..8d8a750cda17e5cdf86d1775a519bdb1b8905196 100644 (file)
@@ -623,6 +623,11 @@ export abstract class AbstractPool<
    */
   protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
 
+  /**
+   * Emits dynamic worker destruction events.
+   */
+  protected abstract checkAndEmitDynamicWorkerDestructionEvents (): void
+
   /**
    * Creates a new, completely set up dynamic worker node.
    * @returns New, completely set up dynamic worker node key.
@@ -1395,8 +1400,10 @@ export abstract class AbstractPool<
     if (workerNodeKey !== -1) {
       this.workerNodes.splice(workerNodeKey, 1)
       this.workerChoiceStrategiesContext?.remove(workerNodeKey)
+      workerNode.info.dynamic &&
+        this.checkAndEmitDynamicWorkerDestructionEvents()
+      this.checkAndEmitEmptyEvent()
     }
-    this.checkAndEmitEmptyEvent()
   }
 
   private resetTaskSequentiallyStolenStatisticsWorkerUsage (
@@ -1758,8 +1765,6 @@ export abstract class AbstractPool<
       this.emitter.emit(PoolEvents.destroy, this.info)
       this.emitter.emitDestroy()
       this.readyEventEmitted = false
-      this.busyEventEmitted = false
-      this.backPressureEventEmitted = false
     }
     delete this.startTimestamp
     this.destroying = false
index afc97c5cdf9d20494d313361cd1ad3e93106a1d8..acca81ba8c77c54ada5fd810ef17c2b660b66f99 100644 (file)
@@ -16,6 +16,11 @@ export class DynamicClusterPool<
   Data = unknown,
   Response = unknown
 > extends FixedClusterPool<Data, Response> {
+  /**
+   * Whether the pool full event has been emitted or not.
+   */
+  private fullEventEmitted: boolean
+
   /**
    * Constructs a new poolifier dynamic cluster pool.
    * @param min - Minimum number of workers which are always active.
@@ -34,12 +39,22 @@ export class DynamicClusterPool<
       this.minimumNumberOfWorkers,
       this.maximumNumberOfWorkers
     )
+    this.fullEventEmitted = false
   }
 
   /** @inheritDoc */
   protected checkAndEmitDynamicWorkerCreationEvents (): void {
-    if (this.emitter != null && this.full) {
+    if (this.emitter != null && !this.fullEventEmitted && this.full) {
       this.emitter.emit(PoolEvents.full, this.info)
+      this.fullEventEmitted = true
+    }
+  }
+
+  /** @inheritDoc */
+  protected checkAndEmitDynamicWorkerDestructionEvents (): void {
+    if (this.emitter != null && this.fullEventEmitted && !this.full) {
+      this.emitter.emit(PoolEvents.fullEnd, this.info)
+      this.fullEventEmitted = false
     }
   }
 
index 0de2ae6373dffcb36362f18fc9114fbb127a3cf5..e4df768ec3f3583989a75a3d82b7820610fe2c25 100644 (file)
@@ -43,6 +43,11 @@ export class FixedClusterPool<
     /* noop */
   }
 
+  /** @inheritDoc */
+  protected checkAndEmitDynamicWorkerDestructionEvents (): void {
+    /* noop */
+  }
+
   /** @inheritDoc */
   protected deregisterWorkerMessageListener<Message extends Data | Response>(
     workerNodeKey: number,
index 0469e7ddc3ef27b233918e878c82e4fd108a5761..bb772dc8eb9fa7479c1cab73ae25786066f08d61 100644 (file)
@@ -55,6 +55,7 @@ export const PoolEvents: Readonly<{
   empty: 'empty'
   error: 'error'
   full: 'full'
+  fullEnd: 'fullEnd'
   ready: 'ready'
   taskError: 'taskError'
 }> = Object.freeze({
@@ -66,6 +67,7 @@ export const PoolEvents: Readonly<{
   empty: 'empty',
   error: 'error',
   full: 'full',
+  fullEnd: 'fullEnd',
   ready: 'ready',
   taskError: 'taskError',
 } as const)
@@ -287,6 +289,7 @@ export interface IPool<
    * - `'busy'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are executing concurrently their tasks quota.
    * - `'busyEnd'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are no longer executing concurrently their tasks quota.
    * - `'full'`: Emitted when the pool is dynamic and the number of workers created has reached the maximum size expected.
+   * - `'fullEnd'`: Emitted when the pool is dynamic and the number of workers created has no longer reached the maximum size expected.
    * - `'empty'`: Emitted when the pool is dynamic with a minimum number of workers set to zero and the number of workers has reached the minimum size expected.
    * - `'destroy'`: Emitted when the pool is destroyed.
    * - `'error'`: Emitted when an uncaught error occurs.
index 8a62cf64c78ab90ba654ebb4e071f05b0418ac5e..8fd4b675854f218da49a9d0f071c202a36572a3f 100644 (file)
@@ -16,6 +16,11 @@ export class DynamicThreadPool<
   Data = unknown,
   Response = unknown
 > extends FixedThreadPool<Data, Response> {
+  /**
+   * Whether the pool full event has been emitted or not.
+   */
+  private fullEventEmitted: boolean
+
   /**
    * Constructs a new poolifier dynamic thread pool.
    * @param min - Minimum number of threads which are always active.
@@ -34,12 +39,22 @@ export class DynamicThreadPool<
       this.minimumNumberOfWorkers,
       this.maximumNumberOfWorkers
     )
+    this.fullEventEmitted = false
   }
 
   /** @inheritDoc */
   protected checkAndEmitDynamicWorkerCreationEvents (): void {
     if (this.emitter != null && this.full) {
       this.emitter.emit(PoolEvents.full, this.info)
+      this.fullEventEmitted = true
+    }
+  }
+
+  /** @inheritDoc */
+  protected checkAndEmitDynamicWorkerDestructionEvents (): void {
+    if (this.emitter != null && this.fullEventEmitted && !this.full) {
+      this.emitter.emit(PoolEvents.fullEnd, this.info)
+      this.fullEventEmitted = false
     }
   }
 
index 9b89b3fee7be8b9622a164f06ed108e71c125d5f..612a9edfd1ff345fda1a1894c4eb10876098bd86 100644 (file)
@@ -47,6 +47,11 @@ export class FixedThreadPool<
     /* noop */
   }
 
+  /** @inheritDoc */
+  protected checkAndEmitDynamicWorkerDestructionEvents (): void {
+    /* noop */
+  }
+
   /** @inheritDoc */
   protected deregisterWorkerMessageListener<Message extends Data | Response>(
     workerNodeKey: number,
index 255afff8eafea9130492650d7270c74a6e8b5362..77f7dc69ddf37a0e8f67abb96765db80feefe2e1 100644 (file)
@@ -1136,15 +1136,15 @@ describe('Abstract pool test suite', () => {
       executingTasks: expect.any(Number),
       failedTasks: expect.any(Number),
       idleWorkerNodes: expect.any(Number),
-      maxSize: expect.any(Number),
-      minSize: expect.any(Number),
+      maxSize: numberOfWorkers,
+      minSize: Math.floor(numberOfWorkers / 2),
       ready: true,
       started: true,
       strategyRetries: expect.any(Number),
       type: PoolTypes.dynamic,
       version,
       worker: WorkerTypes.cluster,
-      workerNodes: expect.any(Number),
+      workerNodes: Math.floor(numberOfWorkers / 2),
     })
     await pool.destroy()
   })
@@ -1184,15 +1184,15 @@ describe('Abstract pool test suite', () => {
       executingTasks: expect.any(Number),
       failedTasks: expect.any(Number),
       idleWorkerNodes: expect.any(Number),
-      maxSize: expect.any(Number),
-      minSize: expect.any(Number),
+      maxSize: numberOfWorkers,
+      minSize: numberOfWorkers,
       ready: true,
       started: true,
       strategyRetries: expect.any(Number),
       type: PoolTypes.fixed,
       version,
       worker: WorkerTypes.thread,
-      workerNodes: expect.any(Number),
+      workerNodes: numberOfWorkers,
     })
     expect(poolBusyEnd).toBe(1)
     expect(poolBusyEndInfo).toStrictEqual({
@@ -1202,20 +1202,20 @@ describe('Abstract pool test suite', () => {
       executingTasks: expect.any(Number),
       failedTasks: expect.any(Number),
       idleWorkerNodes: expect.any(Number),
-      maxSize: expect.any(Number),
-      minSize: expect.any(Number),
+      maxSize: numberOfWorkers,
+      minSize: numberOfWorkers,
       ready: true,
       started: true,
       strategyRetries: expect.any(Number),
       type: PoolTypes.fixed,
       version,
       worker: WorkerTypes.thread,
-      workerNodes: expect.any(Number),
+      workerNodes: numberOfWorkers,
     })
     await pool.destroy()
   })
 
-  it("Verify that pool event emitter 'full' event can register a callback", async () => {
+  it("Verify that pool event emitter 'full' and 'fullEnd' events can register a callback", async () => {
     const pool = new DynamicClusterPool(
       Math.floor(numberOfWorkers / 2),
       numberOfWorkers,
@@ -1224,33 +1224,61 @@ describe('Abstract pool test suite', () => {
     expect(pool.emitter.eventNames()).toStrictEqual([])
     const promises = new Set()
     let poolFull = 0
-    let poolInfo
+    let poolFullInfo
     pool.emitter.on(PoolEvents.full, info => {
       ++poolFull
-      poolInfo = info
+      poolFullInfo = info
     })
-    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
+    let poolFullEnd = 0
+    let poolFullEndInfo
+    pool.emitter.on(PoolEvents.fullEnd, info => {
+      ++poolFullEnd
+      poolFullEndInfo = info
+    })
+    expect(pool.emitter.eventNames()).toStrictEqual([
+      PoolEvents.full,
+      PoolEvents.fullEnd,
+    ])
     for (let i = 0; i < numberOfWorkers * 2; i++) {
       promises.add(pool.execute())
     }
     await Promise.all(promises)
     expect(poolFull).toBe(1)
-    expect(poolInfo).toStrictEqual({
+    expect(poolFullInfo).toStrictEqual({
       busyWorkerNodes: expect.any(Number),
       defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
       executedTasks: expect.any(Number),
       executingTasks: expect.any(Number),
       failedTasks: expect.any(Number),
       idleWorkerNodes: expect.any(Number),
-      maxSize: expect.any(Number),
-      minSize: expect.any(Number),
+      maxSize: numberOfWorkers,
+      minSize: Math.floor(numberOfWorkers / 2),
       ready: true,
       started: true,
       strategyRetries: expect.any(Number),
       type: PoolTypes.dynamic,
       version,
       worker: WorkerTypes.cluster,
-      workerNodes: expect.any(Number),
+      workerNodes: numberOfWorkers,
+    })
+    await waitPoolEvents(pool, PoolEvents.fullEnd, 1)
+    expect(poolFullEnd).toBe(1)
+    expect(poolFullEndInfo).toStrictEqual({
+      busyWorkerNodes: expect.any(Number),
+      defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+      executedTasks: expect.any(Number),
+      executingTasks: expect.any(Number),
+      failedTasks: expect.any(Number),
+      idleWorkerNodes: expect.any(Number),
+      maxSize: numberOfWorkers,
+      minSize: Math.floor(numberOfWorkers / 2),
+      ready: true,
+      started: true,
+      strategyRetries: expect.any(Number),
+      type: PoolTypes.dynamic,
+      version,
+      worker: WorkerTypes.cluster,
+      workerNodes: Math.floor(numberOfWorkers / 2),
     })
     await pool.destroy()
   })
@@ -1296,8 +1324,8 @@ describe('Abstract pool test suite', () => {
       failedTasks: expect.any(Number),
       idleWorkerNodes: expect.any(Number),
       maxQueuedTasks: expect.any(Number),
-      maxSize: expect.any(Number),
-      minSize: expect.any(Number),
+      maxSize: numberOfWorkers,
+      minSize: numberOfWorkers,
       queuedTasks: expect.any(Number),
       ready: true,
       started: true,
@@ -1307,7 +1335,7 @@ describe('Abstract pool test suite', () => {
       type: PoolTypes.fixed,
       version,
       worker: WorkerTypes.thread,
-      workerNodes: expect.any(Number),
+      workerNodes: numberOfWorkers,
     })
     expect(poolBackPressureEnd).toBe(1)
     expect(poolBackPressureEndInfo).toStrictEqual({
@@ -1320,8 +1348,8 @@ describe('Abstract pool test suite', () => {
       failedTasks: expect.any(Number),
       idleWorkerNodes: expect.any(Number),
       maxQueuedTasks: expect.any(Number),
-      maxSize: expect.any(Number),
-      minSize: expect.any(Number),
+      maxSize: numberOfWorkers,
+      minSize: numberOfWorkers,
       queuedTasks: expect.any(Number),
       ready: true,
       started: true,
@@ -1331,7 +1359,7 @@ describe('Abstract pool test suite', () => {
       type: PoolTypes.fixed,
       version,
       worker: WorkerTypes.thread,
-      workerNodes: expect.any(Number),
+      workerNodes: numberOfWorkers,
     })
     await pool.destroy()
   })
@@ -1364,15 +1392,15 @@ describe('Abstract pool test suite', () => {
       executingTasks: expect.any(Number),
       failedTasks: expect.any(Number),
       idleWorkerNodes: expect.any(Number),
-      maxSize: expect.any(Number),
-      minSize: expect.any(Number),
+      maxSize: numberOfWorkers,
+      minSize: 0,
       ready: true,
       started: true,
       strategyRetries: expect.any(Number),
       type: PoolTypes.dynamic,
       version,
       worker: WorkerTypes.cluster,
-      workerNodes: expect.any(Number),
+      workerNodes: 0,
     })
     await pool.destroy()
   })
index f99245c039d83e4cd5a449554388a23c7fba69c2..1b854a65206717f98a7f0453a1df5abdec9b0ce7 100644 (file)
@@ -82,6 +82,7 @@ describe('Dynamic cluster pool test suite', () => {
       PoolEvents.destroy,
     ])
     expect(pool.readyEventEmitted).toBe(false)
+    expect(pool.fullEventEmitted).toBe(false)
     expect(pool.busyEventEmitted).toBe(false)
     expect(pool.backPressureEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(0)
index b80b216817e6943c0da9099cbf442b8e4fc2a917..93d8b1f0cab6590e60edb061106d4d4b0851463f 100644 (file)
@@ -82,6 +82,7 @@ describe('Dynamic thread pool test suite', () => {
       PoolEvents.destroy,
     ])
     expect(pool.readyEventEmitted).toBe(false)
+    expect(pool.fullEventEmitted).toBe(false)
     expect(pool.busyEventEmitted).toBe(false)
     expect(pool.backPressureEventEmitted).toBe(false)
     expect(pool.workerNodes.length).toBe(0)