From 303c0db0528190df1f9f726e96c49dbb10f09d27 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Tue, 27 Aug 2024 17:54:14 +0200 Subject: [PATCH] perf: track pool back pressure lifecycle via events MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 21 ++++++++++- src/pools/pool.ts | 3 ++ tests/pools/abstract-pool.test.mjs | 55 +++++++++++++++++++++------- tests/pools/cluster/dynamic.test.mjs | 2 + tests/pools/cluster/fixed.test.mjs | 2 + tests/pools/thread/dynamic.test.mjs | 2 + tests/pools/thread/fixed.test.mjs | 2 + 7 files changed, 71 insertions(+), 16 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 339e22cb..848c374d 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -132,6 +132,11 @@ export abstract class AbstractPool< } } + /** + * Whether the pool back pressure event has been emitted or not. + */ + private backPressureEventEmitted: boolean + /** * Whether the pool is destroying or not. */ @@ -473,6 +478,7 @@ export abstract class AbstractPool< this.starting = false this.destroying = false this.readyEventEmitted = false + this.backPressureEventEmitted = false this.startingMinimumNumberOfWorkers = false if (this.opts.startWorkers === true) { this.start() @@ -929,6 +935,13 @@ export abstract class AbstractPool< } } + private checkAndEmitTaskDequeuingEvents (): void { + if (this.backPressureEventEmitted && !this.backPressure) { + this.emitter?.emit(PoolEvents.backPressureEnd, this.info) + this.backPressureEventEmitted = false + } + } + private checkAndEmitTaskExecutionEvents (): void { if (this.busy) { this.emitter?.emit(PoolEvents.busy, this.info) @@ -936,8 +949,9 @@ export abstract class AbstractPool< } private checkAndEmitTaskQueuingEvents (): void { - if (this.backPressure) { + if (!this.backPressureEventEmitted && this.backPressure) { this.emitter?.emit(PoolEvents.backPressure, this.info) + this.backPressureEventEmitted = true } } @@ -1091,7 +1105,9 @@ export abstract class AbstractPool< } private dequeueTask (workerNodeKey: number): Task | undefined { - return this.workerNodes[workerNodeKey].dequeueTask() + const task = this.workerNodes[workerNodeKey].dequeueTask() + this.checkAndEmitTaskDequeuingEvents() + return task } private enqueueTask (workerNodeKey: number, task: Task): number { @@ -1712,6 +1728,7 @@ export abstract class AbstractPool< this.emitter?.emit(PoolEvents.destroy, this.info) this.emitter?.emitDestroy() this.readyEventEmitted = false + this.backPressureEventEmitted = false delete this.startTimestamp this.destroying = false this.started = false diff --git a/src/pools/pool.ts b/src/pools/pool.ts index e4cec742..9ff07c59 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -48,6 +48,7 @@ export type PoolType = keyof typeof PoolTypes */ export const PoolEvents: Readonly<{ backPressure: 'backPressure' + backPressureEnd: 'backPressureEnd' busy: 'busy' destroy: 'destroy' empty: 'empty' @@ -57,6 +58,7 @@ export const PoolEvents: Readonly<{ taskError: 'taskError' }> = Object.freeze({ backPressure: 'backPressure', + backPressureEnd: 'backPressureEnd', busy: 'busy', destroy: 'destroy', empty: 'empty', @@ -287,6 +289,7 @@ export interface IPool< * - `'error'`: Emitted when an uncaught error occurs. * - `'taskError'`: Emitted when an error occurs while executing a task. * - `'backPressure'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are back pressured (i.e. their tasks queue is full: queue size \>= maximum queue size). + * - `'backPressureEnd'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are no longer back pressured (i.e. their tasks queue is no longer full: queue size \< maximum queue size). */ readonly emitter?: EventEmitterAsyncResource /** diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index f236a804..a9b7d3e3 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -5,7 +5,6 @@ import { EventEmitterAsyncResource } from 'node:events' import { readFileSync } from 'node:fs' import { dirname, join } from 'node:path' import { fileURLToPath } from 'node:url' -import { restore, stub } from 'sinon' import { CircularBuffer } from '../../lib/circular-buffer.cjs' import { @@ -38,10 +37,6 @@ describe('Abstract pool test suite', () => { } } - afterEach(() => { - restore() - }) - it('Verify that pool can be created and destroyed', async () => { const pool = new FixedThreadPool( numberOfWorkers, @@ -933,11 +928,13 @@ describe('Abstract pool test suite', () => { expect(pool.info.ready).toBe(false) expect(pool.workerNodes).toStrictEqual([]) expect(pool.readyEventEmitted).toBe(false) + expect(pool.backPressureEventEmitted).toBe(false) pool.start() expect(pool.info.started).toBe(true) expect(pool.info.ready).toBe(true) await waitPoolEvents(pool, PoolEvents.ready, 1) expect(pool.readyEventEmitted).toBe(true) + expect(pool.backPressureEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(numberOfWorkers) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) @@ -1231,7 +1228,7 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) - it("Verify that pool event emitter 'backPressure' event can register a callback", async () => { + it("Verify that pool event emitter 'backPressure' and 'backPressureEnd' events can register a callback", async () => { const pool = new FixedThreadPool( numberOfWorkers, './tests/worker-files/thread/testWorker.mjs', @@ -1239,23 +1236,30 @@ describe('Abstract pool test suite', () => { enableTasksQueue: true, } ) - const backPressureGetterStub = stub().returns(true) - stub(pool, 'backPressure').get(backPressureGetterStub) expect(pool.emitter.eventNames()).toStrictEqual([]) const promises = new Set() let poolBackPressure = 0 - let poolInfo + let backPressurePoolInfo pool.emitter.on(PoolEvents.backPressure, info => { ++poolBackPressure - poolInfo = info + backPressurePoolInfo = info }) - expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure]) - for (let i = 0; i < numberOfWorkers + 1; i++) { + let poolBackPressureEnd = 0 + let backPressureEndPoolInfo + pool.emitter.on(PoolEvents.backPressureEnd, info => { + ++poolBackPressureEnd + backPressureEndPoolInfo = info + }) + expect(pool.emitter.eventNames()).toStrictEqual([ + PoolEvents.backPressure, + PoolEvents.backPressureEnd, + ]) + for (let i = 0; i < numberOfWorkers * 10; i++) { promises.add(pool.execute()) } await Promise.all(promises) expect(poolBackPressure).toBe(1) - expect(poolInfo).toStrictEqual({ + expect(backPressurePoolInfo).toStrictEqual({ backPressure: true, backPressureWorkerNodes: expect.any(Number), busyWorkerNodes: expect.any(Number), @@ -1278,7 +1282,30 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, workerNodes: expect.any(Number), }) - expect(backPressureGetterStub.callCount).toBeGreaterThanOrEqual(7) + expect(poolBackPressureEnd).toBe(1) + expect(backPressureEndPoolInfo).toStrictEqual({ + backPressure: false, + backPressureWorkerNodes: expect.any(Number), + busyWorkerNodes: expect.any(Number), + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + executedTasks: expect.any(Number), + executingTasks: expect.any(Number), + failedTasks: expect.any(Number), + idleWorkerNodes: expect.any(Number), + maxQueuedTasks: expect.any(Number), + maxSize: expect.any(Number), + minSize: expect.any(Number), + queuedTasks: expect.any(Number), + ready: true, + started: true, + stealingWorkerNodes: expect.any(Number), + stolenTasks: expect.any(Number), + strategyRetries: expect.any(Number), + type: PoolTypes.fixed, + version, + worker: WorkerTypes.thread, + workerNodes: expect.any(Number), + }) await pool.destroy() }) diff --git a/tests/pools/cluster/dynamic.test.mjs b/tests/pools/cluster/dynamic.test.mjs index d9b33d40..428706b4 100644 --- a/tests/pools/cluster/dynamic.test.mjs +++ b/tests/pools/cluster/dynamic.test.mjs @@ -76,11 +76,13 @@ describe('Dynamic cluster pool test suite', () => { await pool.destroy() const numberOfExitEvents = await exitPromise expect(pool.info.started).toBe(false) + expect(pool.info.ready).toBe(false) expect(pool.emitter.eventNames()).toStrictEqual([ PoolEvents.busy, PoolEvents.destroy, ]) expect(pool.readyEventEmitted).toBe(false) + expect(pool.backPressureEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(0) expect(numberOfExitEvents).toBe(min) expect(poolDestroy).toBe(1) diff --git a/tests/pools/cluster/fixed.test.mjs b/tests/pools/cluster/fixed.test.mjs index 9204e7e5..bc34bfd5 100644 --- a/tests/pools/cluster/fixed.test.mjs +++ b/tests/pools/cluster/fixed.test.mjs @@ -240,8 +240,10 @@ describe('Fixed cluster pool test suite', () => { await pool.destroy() const numberOfExitEvents = await exitPromise expect(pool.info.started).toBe(false) + expect(pool.info.ready).toBe(false) expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy]) expect(pool.readyEventEmitted).toBe(false) + expect(pool.backPressureEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(0) expect(numberOfExitEvents).toBe(numberOfWorkers) expect(poolDestroy).toBe(1) diff --git a/tests/pools/thread/dynamic.test.mjs b/tests/pools/thread/dynamic.test.mjs index 9a7082a5..c4cc6e11 100644 --- a/tests/pools/thread/dynamic.test.mjs +++ b/tests/pools/thread/dynamic.test.mjs @@ -76,11 +76,13 @@ describe('Dynamic thread pool test suite', () => { await pool.destroy() const numberOfExitEvents = await exitPromise expect(pool.info.started).toBe(false) + expect(pool.info.ready).toBe(false) expect(pool.emitter.eventNames()).toStrictEqual([ PoolEvents.busy, PoolEvents.destroy, ]) expect(pool.readyEventEmitted).toBe(false) + expect(pool.backPressureEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(0) expect(numberOfExitEvents).toBe(min) expect(poolDestroy).toBe(1) diff --git a/tests/pools/thread/fixed.test.mjs b/tests/pools/thread/fixed.test.mjs index 622a5101..e0e55670 100644 --- a/tests/pools/thread/fixed.test.mjs +++ b/tests/pools/thread/fixed.test.mjs @@ -270,8 +270,10 @@ describe('Fixed thread pool test suite', () => { await pool.destroy() const numberOfExitEvents = await exitPromise expect(pool.info.started).toBe(false) + expect(pool.info.ready).toBe(false) expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy]) expect(pool.readyEventEmitted).toBe(false) + expect(pool.backPressureEventEmitted).toBe(false) expect(pool.workerNodes.length).toBe(0) expect(numberOfExitEvents).toBe(numberOfThreads) expect(poolDestroy).toBe(1) -- 2.34.1