From: Jérôme Benoit Date: Wed, 28 Aug 2024 12:40:34 +0000 (+0200) Subject: perf: track dynamic pool full lifecycle via events X-Git-Tag: v4.2.7~3 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=5f9e90500ddf396a1862479540a4784bca043a20;p=poolifier.git perf: track dynamic pool full lifecycle via events Signed-off-by: Jérôme Benoit --- diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index a0c779c2..8d8a750c 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -623,6 +623,11 @@ export abstract class AbstractPool< */ protected abstract checkAndEmitDynamicWorkerCreationEvents (): void + /** + * Emits dynamic worker destruction events. + */ + protected abstract checkAndEmitDynamicWorkerDestructionEvents (): void + /** * Creates a new, completely set up dynamic worker node. * @returns New, completely set up dynamic worker node key. @@ -1395,8 +1400,10 @@ export abstract class AbstractPool< if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategiesContext?.remove(workerNodeKey) + workerNode.info.dynamic && + this.checkAndEmitDynamicWorkerDestructionEvents() + this.checkAndEmitEmptyEvent() } - this.checkAndEmitEmptyEvent() } private resetTaskSequentiallyStolenStatisticsWorkerUsage ( @@ -1758,8 +1765,6 @@ export abstract class AbstractPool< this.emitter.emit(PoolEvents.destroy, this.info) this.emitter.emitDestroy() this.readyEventEmitted = false - this.busyEventEmitted = false - this.backPressureEventEmitted = false } delete this.startTimestamp this.destroying = false diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index afc97c5c..acca81ba 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -16,6 +16,11 @@ export class DynamicClusterPool< Data = unknown, Response = unknown > extends FixedClusterPool { + /** + * Whether the pool full event has been emitted or not. + */ + private fullEventEmitted: boolean + /** * Constructs a new poolifier dynamic cluster pool. * @param min - Minimum number of workers which are always active. @@ -34,12 +39,22 @@ export class DynamicClusterPool< this.minimumNumberOfWorkers, this.maximumNumberOfWorkers ) + this.fullEventEmitted = false } /** @inheritDoc */ protected checkAndEmitDynamicWorkerCreationEvents (): void { - if (this.emitter != null && this.full) { + if (this.emitter != null && !this.fullEventEmitted && this.full) { this.emitter.emit(PoolEvents.full, this.info) + this.fullEventEmitted = true + } + } + + /** @inheritDoc */ + protected checkAndEmitDynamicWorkerDestructionEvents (): void { + if (this.emitter != null && this.fullEventEmitted && !this.full) { + this.emitter.emit(PoolEvents.fullEnd, this.info) + this.fullEventEmitted = false } } diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 0de2ae63..e4df768e 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -43,6 +43,11 @@ export class FixedClusterPool< /* noop */ } + /** @inheritDoc */ + protected checkAndEmitDynamicWorkerDestructionEvents (): void { + /* noop */ + } + /** @inheritDoc */ protected deregisterWorkerMessageListener( workerNodeKey: number, diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 0469e7dd..bb772dc8 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -55,6 +55,7 @@ export const PoolEvents: Readonly<{ empty: 'empty' error: 'error' full: 'full' + fullEnd: 'fullEnd' ready: 'ready' taskError: 'taskError' }> = Object.freeze({ @@ -66,6 +67,7 @@ export const PoolEvents: Readonly<{ empty: 'empty', error: 'error', full: 'full', + fullEnd: 'fullEnd', ready: 'ready', taskError: 'taskError', } as const) @@ -287,6 +289,7 @@ export interface IPool< * - `'busy'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are executing concurrently their tasks quota. * - `'busyEnd'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are no longer executing concurrently their tasks quota. * - `'full'`: Emitted when the pool is dynamic and the number of workers created has reached the maximum size expected. + * - `'fullEnd'`: Emitted when the pool is dynamic and the number of workers created has no longer reached the maximum size expected. * - `'empty'`: Emitted when the pool is dynamic with a minimum number of workers set to zero and the number of workers has reached the minimum size expected. * - `'destroy'`: Emitted when the pool is destroyed. * - `'error'`: Emitted when an uncaught error occurs. diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index 8a62cf64..8fd4b675 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -16,6 +16,11 @@ export class DynamicThreadPool< Data = unknown, Response = unknown > extends FixedThreadPool { + /** + * Whether the pool full event has been emitted or not. + */ + private fullEventEmitted: boolean + /** * Constructs a new poolifier dynamic thread pool. * @param min - Minimum number of threads which are always active. @@ -34,12 +39,22 @@ export class DynamicThreadPool< this.minimumNumberOfWorkers, this.maximumNumberOfWorkers ) + this.fullEventEmitted = false } /** @inheritDoc */ protected checkAndEmitDynamicWorkerCreationEvents (): void { if (this.emitter != null && this.full) { this.emitter.emit(PoolEvents.full, this.info) + this.fullEventEmitted = true + } + } + + /** @inheritDoc */ + protected checkAndEmitDynamicWorkerDestructionEvents (): void { + if (this.emitter != null && this.fullEventEmitted && !this.full) { + this.emitter.emit(PoolEvents.fullEnd, this.info) + this.fullEventEmitted = false } } diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 9b89b3fe..612a9edf 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -47,6 +47,11 @@ export class FixedThreadPool< /* noop */ } + /** @inheritDoc */ + protected checkAndEmitDynamicWorkerDestructionEvents (): void { + /* noop */ + } + /** @inheritDoc */ protected deregisterWorkerMessageListener( workerNodeKey: number, diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 255afff8..77f7dc69 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -1136,15 +1136,15 @@ describe('Abstract pool test suite', () => { executingTasks: expect.any(Number), failedTasks: expect.any(Number), idleWorkerNodes: expect.any(Number), - maxSize: expect.any(Number), - minSize: expect.any(Number), + maxSize: numberOfWorkers, + minSize: Math.floor(numberOfWorkers / 2), ready: true, started: true, strategyRetries: expect.any(Number), type: PoolTypes.dynamic, version, worker: WorkerTypes.cluster, - workerNodes: expect.any(Number), + workerNodes: Math.floor(numberOfWorkers / 2), }) await pool.destroy() }) @@ -1184,15 +1184,15 @@ describe('Abstract pool test suite', () => { executingTasks: expect.any(Number), failedTasks: expect.any(Number), idleWorkerNodes: expect.any(Number), - maxSize: expect.any(Number), - minSize: expect.any(Number), + maxSize: numberOfWorkers, + minSize: numberOfWorkers, ready: true, started: true, strategyRetries: expect.any(Number), type: PoolTypes.fixed, version, worker: WorkerTypes.thread, - workerNodes: expect.any(Number), + workerNodes: numberOfWorkers, }) expect(poolBusyEnd).toBe(1) expect(poolBusyEndInfo).toStrictEqual({ @@ -1202,20 +1202,20 @@ describe('Abstract pool test suite', () => { executingTasks: expect.any(Number), failedTasks: expect.any(Number), idleWorkerNodes: expect.any(Number), - maxSize: expect.any(Number), - minSize: expect.any(Number), + maxSize: numberOfWorkers, + minSize: numberOfWorkers, ready: true, started: true, strategyRetries: expect.any(Number), type: PoolTypes.fixed, version, worker: WorkerTypes.thread, - workerNodes: expect.any(Number), + workerNodes: numberOfWorkers, }) await pool.destroy() }) - it("Verify that pool event emitter 'full' event can register a callback", async () => { + it("Verify that pool event emitter 'full' and 'fullEnd' events can register a callback", async () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, @@ -1224,33 +1224,61 @@ describe('Abstract pool test suite', () => { expect(pool.emitter.eventNames()).toStrictEqual([]) const promises = new Set() let poolFull = 0 - let poolInfo + let poolFullInfo pool.emitter.on(PoolEvents.full, info => { ++poolFull - poolInfo = info + poolFullInfo = info }) - expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full]) + let poolFullEnd = 0 + let poolFullEndInfo + pool.emitter.on(PoolEvents.fullEnd, info => { + ++poolFullEnd + poolFullEndInfo = info + }) + expect(pool.emitter.eventNames()).toStrictEqual([ + PoolEvents.full, + PoolEvents.fullEnd, + ]) for (let i = 0; i < numberOfWorkers * 2; i++) { promises.add(pool.execute()) } await Promise.all(promises) expect(poolFull).toBe(1) - expect(poolInfo).toStrictEqual({ + expect(poolFullInfo).toStrictEqual({ busyWorkerNodes: expect.any(Number), defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, executedTasks: expect.any(Number), executingTasks: expect.any(Number), failedTasks: expect.any(Number), idleWorkerNodes: expect.any(Number), - maxSize: expect.any(Number), - minSize: expect.any(Number), + maxSize: numberOfWorkers, + minSize: Math.floor(numberOfWorkers / 2), ready: true, started: true, strategyRetries: expect.any(Number), type: PoolTypes.dynamic, version, worker: WorkerTypes.cluster, - workerNodes: expect.any(Number), + workerNodes: numberOfWorkers, + }) + await waitPoolEvents(pool, PoolEvents.fullEnd, 1) + expect(poolFullEnd).toBe(1) + expect(poolFullEndInfo).toStrictEqual({ + busyWorkerNodes: expect.any(Number), + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + executedTasks: expect.any(Number), + executingTasks: expect.any(Number), + failedTasks: expect.any(Number), + idleWorkerNodes: expect.any(Number), + maxSize: numberOfWorkers, + minSize: Math.floor(numberOfWorkers / 2), + ready: true, + started: true, + strategyRetries: expect.any(Number), + type: PoolTypes.dynamic, + version, + worker: WorkerTypes.cluster, + workerNodes: Math.floor(numberOfWorkers / 2), }) await pool.destroy() }) @@ -1296,8 +1324,8 @@ describe('Abstract pool test suite', () => { failedTasks: expect.any(Number), idleWorkerNodes: expect.any(Number), maxQueuedTasks: expect.any(Number), - maxSize: expect.any(Number), - minSize: expect.any(Number), + maxSize: numberOfWorkers, + minSize: numberOfWorkers, queuedTasks: expect.any(Number), ready: true, started: true, @@ -1307,7 +1335,7 @@ describe('Abstract pool test suite', () => { type: PoolTypes.fixed, version, worker: WorkerTypes.thread, - workerNodes: expect.any(Number), + workerNodes: numberOfWorkers, }) expect(poolBackPressureEnd).toBe(1) expect(poolBackPressureEndInfo).toStrictEqual({ @@ -1320,8 +1348,8 @@ describe('Abstract pool test suite', () => { failedTasks: expect.any(Number), idleWorkerNodes: expect.any(Number), maxQueuedTasks: expect.any(Number), - maxSize: expect.any(Number), - minSize: expect.any(Number), + maxSize: numberOfWorkers, + minSize: numberOfWorkers, queuedTasks: expect.any(Number), ready: true, started: true, @@ -1331,7 +1359,7 @@ describe('Abstract pool test suite', () => { type: PoolTypes.fixed, version, worker: WorkerTypes.thread, - workerNodes: expect.any(Number), + workerNodes: numberOfWorkers, }) await pool.destroy() }) @@ -1364,15 +1392,15 @@ describe('Abstract pool test suite', () => { executingTasks: expect.any(Number), failedTasks: expect.any(Number), idleWorkerNodes: expect.any(Number), - maxSize: expect.any(Number), - minSize: expect.any(Number), + maxSize: numberOfWorkers, + minSize: 0, ready: true, started: true, strategyRetries: expect.any(Number), type: PoolTypes.dynamic, version, worker: WorkerTypes.cluster, - workerNodes: expect.any(Number), + workerNodes: 0, }) await pool.destroy() }) diff --git a/tests/pools/cluster/dynamic.test.mjs b/tests/pools/cluster/dynamic.test.mjs index f99245c0..1b854a65 100644 --- a/tests/pools/cluster/dynamic.test.mjs +++ b/tests/pools/cluster/dynamic.test.mjs @@ -82,6 +82,7 @@ describe('Dynamic cluster pool test suite', () => { PoolEvents.destroy, ]) expect(pool.readyEventEmitted).toBe(false) + expect(pool.fullEventEmitted).toBe(false) expect(pool.busyEventEmitted).toBe(false) expect(pool.backPressureEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(0) diff --git a/tests/pools/thread/dynamic.test.mjs b/tests/pools/thread/dynamic.test.mjs index b80b2168..93d8b1f0 100644 --- a/tests/pools/thread/dynamic.test.mjs +++ b/tests/pools/thread/dynamic.test.mjs @@ -82,6 +82,7 @@ describe('Dynamic thread pool test suite', () => { PoolEvents.destroy, ]) expect(pool.readyEventEmitted).toBe(false) + expect(pool.fullEventEmitted).toBe(false) expect(pool.busyEventEmitted).toBe(false) expect(pool.backPressureEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(0)