From: Jérôme Benoit Date: Tue, 27 Aug 2024 16:56:34 +0000 (+0200) Subject: perf: track pool busy lifecycle via events X-Git-Tag: v4.2.6~2 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=f8a57da1a44233d40635aabba944ed036258f8ab;p=poolifier.git perf: track pool busy lifecycle via events Signed-off-by: Jérôme Benoit --- diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 848c374d..b3b1623f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -137,6 +137,11 @@ export abstract class AbstractPool< */ private backPressureEventEmitted: boolean + /** + * Whether the pool busy event has been emitted or not. + */ + private busyEventEmitted: boolean + /** * Whether the pool is destroying or not. */ @@ -478,6 +483,7 @@ export abstract class AbstractPool< this.starting = false this.destroying = false this.readyEventEmitted = false + this.busyEventEmitted = false this.backPressureEventEmitted = false this.startingMinimumNumberOfWorkers = false if (this.opts.startWorkers === true) { @@ -809,7 +815,9 @@ export abstract class AbstractPool< this.opts.enableTasksQueue === true && this.workerNodes.reduce( (accumulator, workerNode) => - workerNode.info.backPressure ? accumulator + 1 : accumulator, + workerNode.info.ready && workerNode.info.backPressure + ? accumulator + 1 + : accumulator, 0 ) === this.workerNodes.length ) @@ -923,34 +931,50 @@ export abstract class AbstractPool< } private checkAndEmitEmptyEvent (): void { - if (this.empty) { - this.emitter?.emit(PoolEvents.empty, this.info) + if (this.emitter != null && this.empty) { + this.emitter.emit(PoolEvents.empty, this.info) } } private checkAndEmitReadyEvent (): void { - if (!this.readyEventEmitted && this.ready) { - this.emitter?.emit(PoolEvents.ready, this.info) + if (this.emitter != null && !this.readyEventEmitted && this.ready) { + this.emitter.emit(PoolEvents.ready, this.info) this.readyEventEmitted = true } } private checkAndEmitTaskDequeuingEvents (): void { - if (this.backPressureEventEmitted && !this.backPressure) { - this.emitter?.emit(PoolEvents.backPressureEnd, this.info) + if ( + this.emitter != null && + this.backPressureEventEmitted && + !this.backPressure + ) { + this.emitter.emit(PoolEvents.backPressureEnd, this.info) this.backPressureEventEmitted = false } } private checkAndEmitTaskExecutionEvents (): void { - if (this.busy) { - this.emitter?.emit(PoolEvents.busy, this.info) + if (this.emitter != null && !this.busyEventEmitted && this.busy) { + this.emitter.emit(PoolEvents.busy, this.info) + this.busyEventEmitted = true + } + } + + private checkAndEmitTaskExecutionFinishedEvents (): void { + if (this.emitter != null && this.busyEventEmitted && !this.busy) { + this.emitter.emit(PoolEvents.busyEnd, this.info) + this.busyEventEmitted = false } } private checkAndEmitTaskQueuingEvents (): void { - if (!this.backPressureEventEmitted && this.backPressure) { - this.emitter?.emit(PoolEvents.backPressure, this.info) + if ( + this.emitter != null && + !this.backPressureEventEmitted && + this.backPressure + ) { + this.emitter.emit(PoolEvents.backPressure, this.info) this.backPressureEventEmitted = true } } @@ -1181,6 +1205,7 @@ export abstract class AbstractPool< } asyncResource?.emitDestroy() this.afterTaskExecutionHook(workerNodeKey, message) + this.checkAndEmitTaskExecutionFinishedEvents() // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.promiseResponseMap.delete(taskId!) if (this.opts.enableTasksQueue === true && !this.destroying) { @@ -1725,10 +1750,13 @@ export abstract class AbstractPool< await this.destroyWorkerNode(workerNodeKey) }) ) - this.emitter?.emit(PoolEvents.destroy, this.info) - this.emitter?.emitDestroy() - this.readyEventEmitted = false - this.backPressureEventEmitted = false + if (this.emitter != null) { + this.emitter.emit(PoolEvents.destroy, this.info) + this.emitter.emitDestroy() + this.readyEventEmitted = false + this.busyEventEmitted = false + this.backPressureEventEmitted = false + } delete this.startTimestamp this.destroying = false this.started = false diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index bb9cf8d9..afc97c5c 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -38,8 +38,8 @@ export class DynamicClusterPool< /** @inheritDoc */ protected checkAndEmitDynamicWorkerCreationEvents (): void { - if (this.full) { - this.emitter?.emit(PoolEvents.full, this.info) + if (this.emitter != null && this.full) { + this.emitter.emit(PoolEvents.full, this.info) } } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 9ff07c59..0469e7dd 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -50,6 +50,7 @@ export const PoolEvents: Readonly<{ backPressure: 'backPressure' backPressureEnd: 'backPressureEnd' busy: 'busy' + busyEnd: 'busyEnd' destroy: 'destroy' empty: 'empty' error: 'error' @@ -60,6 +61,7 @@ export const PoolEvents: Readonly<{ backPressure: 'backPressure', backPressureEnd: 'backPressureEnd', busy: 'busy', + busyEnd: 'busyEnd', destroy: 'destroy', empty: 'empty', error: 'error', @@ -283,6 +285,7 @@ export interface IPool< * * - `'ready'`: Emitted when the number of workers created in the pool has reached the minimum size expected and are ready. If the pool is dynamic with a minimum number of workers is set to zero, this event is emitted when at least one dynamic worker is ready. * - `'busy'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are executing concurrently their tasks quota. + * - `'busyEnd'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are no longer executing concurrently their tasks quota. * - `'full'`: Emitted when the pool is dynamic and the number of workers created has reached the maximum size expected. * - `'empty'`: Emitted when the pool is dynamic with a minimum number of workers set to zero and the number of workers has reached the minimum size expected. * - `'destroy'`: Emitted when the pool is destroyed. diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index cbf06130..8a62cf64 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -38,8 +38,8 @@ export class DynamicThreadPool< /** @inheritDoc */ protected checkAndEmitDynamicWorkerCreationEvents (): void { - if (this.full) { - this.emitter?.emit(PoolEvents.full, this.info) + if (this.emitter != null && this.full) { + this.emitter.emit(PoolEvents.full, this.info) } } diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index a9b7d3e3..255afff8 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -928,12 +928,14 @@ describe('Abstract pool test suite', () => { expect(pool.info.ready).toBe(false) expect(pool.workerNodes).toStrictEqual([]) expect(pool.readyEventEmitted).toBe(false) + expect(pool.busyEventEmitted).toBe(false) expect(pool.backPressureEventEmitted).toBe(false) pool.start() expect(pool.info.started).toBe(true) expect(pool.info.ready).toBe(true) await waitPoolEvents(pool, PoolEvents.ready, 1) expect(pool.readyEventEmitted).toBe(true) + expect(pool.busyEventEmitted).toBe(false) expect(pool.backPressureEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(numberOfWorkers) for (const workerNode of pool.workerNodes) { @@ -1147,7 +1149,7 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) - it("Verify that pool event emitter 'busy' event can register a callback", async () => { + it("Verify that pool event emitter 'busy' and 'busyEnd' events can register a callback", async () => { const pool = new FixedThreadPool( numberOfWorkers, './tests/worker-files/thread/testWorker.mjs' @@ -1155,20 +1157,45 @@ describe('Abstract pool test suite', () => { expect(pool.emitter.eventNames()).toStrictEqual([]) const promises = new Set() let poolBusy = 0 - let poolInfo + let poolBusyInfo pool.emitter.on(PoolEvents.busy, info => { ++poolBusy - poolInfo = info + poolBusyInfo = info + }) + let poolBusyEnd = 0 + let poolBusyEndInfo + pool.emitter.on(PoolEvents.busyEnd, info => { + ++poolBusyEnd + poolBusyEndInfo = info }) - expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy]) + expect(pool.emitter.eventNames()).toStrictEqual([ + PoolEvents.busy, + PoolEvents.busyEnd, + ]) for (let i = 0; i < numberOfWorkers * 2; i++) { promises.add(pool.execute()) } await Promise.all(promises) - // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers. - // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool. - expect(poolBusy).toBe(numberOfWorkers + 1) - expect(poolInfo).toStrictEqual({ + expect(poolBusy).toBe(1) + expect(poolBusyInfo).toStrictEqual({ + busyWorkerNodes: expect.any(Number), + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + executedTasks: expect.any(Number), + executingTasks: expect.any(Number), + failedTasks: expect.any(Number), + idleWorkerNodes: expect.any(Number), + maxSize: expect.any(Number), + minSize: expect.any(Number), + ready: true, + started: true, + strategyRetries: expect.any(Number), + type: PoolTypes.fixed, + version, + worker: WorkerTypes.thread, + workerNodes: expect.any(Number), + }) + expect(poolBusyEnd).toBe(1) + expect(poolBusyEndInfo).toStrictEqual({ busyWorkerNodes: expect.any(Number), defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, executedTasks: expect.any(Number), @@ -1239,16 +1266,16 @@ describe('Abstract pool test suite', () => { expect(pool.emitter.eventNames()).toStrictEqual([]) const promises = new Set() let poolBackPressure = 0 - let backPressurePoolInfo + let poolBackPressureInfo pool.emitter.on(PoolEvents.backPressure, info => { ++poolBackPressure - backPressurePoolInfo = info + poolBackPressureInfo = info }) let poolBackPressureEnd = 0 - let backPressureEndPoolInfo + let poolBackPressureEndInfo pool.emitter.on(PoolEvents.backPressureEnd, info => { ++poolBackPressureEnd - backPressureEndPoolInfo = info + poolBackPressureEndInfo = info }) expect(pool.emitter.eventNames()).toStrictEqual([ PoolEvents.backPressure, @@ -1259,7 +1286,7 @@ describe('Abstract pool test suite', () => { } await Promise.all(promises) expect(poolBackPressure).toBe(1) - expect(backPressurePoolInfo).toStrictEqual({ + expect(poolBackPressureInfo).toStrictEqual({ backPressure: true, backPressureWorkerNodes: expect.any(Number), busyWorkerNodes: expect.any(Number), @@ -1283,7 +1310,7 @@ describe('Abstract pool test suite', () => { workerNodes: expect.any(Number), }) expect(poolBackPressureEnd).toBe(1) - expect(backPressureEndPoolInfo).toStrictEqual({ + expect(poolBackPressureEndInfo).toStrictEqual({ backPressure: false, backPressureWorkerNodes: expect.any(Number), busyWorkerNodes: expect.any(Number), diff --git a/tests/pools/cluster/dynamic.test.mjs b/tests/pools/cluster/dynamic.test.mjs index 428706b4..f99245c0 100644 --- a/tests/pools/cluster/dynamic.test.mjs +++ b/tests/pools/cluster/dynamic.test.mjs @@ -82,6 +82,7 @@ describe('Dynamic cluster pool test suite', () => { PoolEvents.destroy, ]) expect(pool.readyEventEmitted).toBe(false) + expect(pool.busyEventEmitted).toBe(false) expect(pool.backPressureEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(0) expect(numberOfExitEvents).toBe(min) diff --git a/tests/pools/cluster/fixed.test.mjs b/tests/pools/cluster/fixed.test.mjs index bc34bfd5..975e6e16 100644 --- a/tests/pools/cluster/fixed.test.mjs +++ b/tests/pools/cluster/fixed.test.mjs @@ -243,6 +243,7 @@ describe('Fixed cluster pool test suite', () => { expect(pool.info.ready).toBe(false) expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy]) expect(pool.readyEventEmitted).toBe(false) + expect(pool.busyEventEmitted).toBe(false) expect(pool.backPressureEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(0) expect(numberOfExitEvents).toBe(numberOfWorkers) diff --git a/tests/pools/thread/dynamic.test.mjs b/tests/pools/thread/dynamic.test.mjs index c4cc6e11..b80b2168 100644 --- a/tests/pools/thread/dynamic.test.mjs +++ b/tests/pools/thread/dynamic.test.mjs @@ -82,6 +82,7 @@ describe('Dynamic thread pool test suite', () => { PoolEvents.destroy, ]) expect(pool.readyEventEmitted).toBe(false) + expect(pool.busyEventEmitted).toBe(false) expect(pool.backPressureEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(0) expect(numberOfExitEvents).toBe(min) diff --git a/tests/pools/thread/fixed.test.mjs b/tests/pools/thread/fixed.test.mjs index e0e55670..7473ab54 100644 --- a/tests/pools/thread/fixed.test.mjs +++ b/tests/pools/thread/fixed.test.mjs @@ -273,6 +273,7 @@ describe('Fixed thread pool test suite', () => { expect(pool.info.ready).toBe(false) expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy]) expect(pool.readyEventEmitted).toBe(false) + expect(pool.busyEventEmitted).toBe(false) expect(pool.backPressureEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(0) expect(numberOfExitEvents).toBe(numberOfThreads)