From a1763c54c962c69b5e02a30c0909724986495fcd Mon Sep 17 00:00:00 2001
From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?=
Date: Sat, 19 Aug 2023 11:04:10 +0200
Subject: [PATCH] fix: fix back pressure detection
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

Signed-off-by: Jérôme Benoit
---
 CHANGELOG.md                               |  2 +-
 src/pools/abstract-pool.ts                 | 27 +++++---
 src/pools/pool.ts                          |  1 +
 src/pools/worker.ts                        |  2 +-
 tests/pools/abstract/abstract-pool.test.js | 74 +++++++++++-----------
 tests/pools/cluster/fixed.test.js          |  1 +
 tests/pools/thread/fixed.test.js           |  1 +
 7 files changed, 61 insertions(+), 47 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bc0afc7a..ae8fefd8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -101,7 +101,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Fixed
 
 - Fix queued tasks redistribution on error task execution starvation.
-- Ensure task queueing per worker condition is untangled from the pool busyness semantic.
+- Ensure tasks queueing per worker condition is untangled from the pool busyness semantic.
 
 ### Changed
 
diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts
index 7ed6ab03..dceef6de 100644
--- a/src/pools/abstract-pool.ts
+++ b/src/pools/abstract-pool.ts
@@ -118,7 +118,10 @@ export abstract class AbstractPool<
     this.executeTask = this.executeTask.bind(this)
     this.enqueueTask = this.enqueueTask.bind(this)
     this.dequeueTask = this.dequeueTask.bind(this)
-    this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
+    this.checkAndEmitTaskExecutionEvents =
+      this.checkAndEmitTaskExecutionEvents.bind(this)
+    this.checkAndEmitTaskQueuingEvents =
+      this.checkAndEmitTaskQueuingEvents.bind(this)
 
     if (this.opts.enableEvents === true) {
       this.emitter = new PoolEmitter()
@@ -368,6 +371,9 @@ export abstract class AbstractPool<
           0
         )
       }),
+      ...(this.opts.enableTasksQueue === true && {
+        backPressure: this.hasBackPressure()
+      }),
       failedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
           accumulator + workerNode.usage.tasks.failed,
@@ -733,7 +739,6 @@ export abstract class AbstractPool<
       } else {
         this.enqueueTask(workerNodeKey, task)
       }
-      this.checkAndEmitEvents()
     })
   }
 
@@ -1221,7 +1226,7 @@ export abstract class AbstractPool<
     }
   }
 
-  private checkAndEmitEvents (): void {
+  private checkAndEmitTaskExecutionEvents (): void {
     if (this.emitter != null) {
       if (this.busy) {
         this.emitter.emit(PoolEvents.busy, this.info)
@@ -1229,9 +1234,12 @@ export abstract class AbstractPool<
       if (this.type === PoolTypes.dynamic && this.full) {
         this.emitter.emit(PoolEvents.full, this.info)
       }
-      if (this.hasBackPressure()) {
-        this.emitter.emit(PoolEvents.backPressure, this.info)
-      }
+    }
+  }
+
+  private checkAndEmitTaskQueuingEvents (): void {
+    if (this.hasBackPressure()) {
+      this.emitter?.emit(PoolEvents.backPressure, this.info)
     }
   }
 
@@ -1296,7 +1304,7 @@ export abstract class AbstractPool<
       this.opts.enableTasksQueue === true &&
       this.workerNodes.findIndex(
         (workerNode) => !workerNode.hasBackPressure()
-      ) !== -1
+      ) === -1
     )
   }
 
@@ -1309,10 +1317,13 @@ export abstract class AbstractPool<
   private executeTask (workerNodeKey: number, task: Task): void {
     this.beforeTaskExecutionHook(workerNodeKey, task)
     this.sendToWorker(workerNodeKey, task, task.transferList)
+    this.checkAndEmitTaskExecutionEvents()
   }
 
   private enqueueTask (workerNodeKey: number, task: Task): number {
-    return this.workerNodes[workerNodeKey].enqueueTask(task)
+    const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
+    this.checkAndEmitTaskQueuingEvents()
+    return tasksQueueSize
   }
 
   private dequeueTask (workerNodeKey: number): Task | undefined {
diff --git a/src/pools/pool.ts b/src/pools/pool.ts
index e65cc115..09d2f229 100644
--- a/src/pools/pool.ts
+++ b/src/pools/pool.ts
@@ -79,6 +79,7 @@ export interface PoolInfo {
   readonly executingTasks: number
   readonly queuedTasks?: number
   readonly maxQueuedTasks?: number
+  readonly backPressure?: boolean
   readonly failedTasks: number
   readonly runTime?: {
     readonly minimum: number
diff --git a/src/pools/worker.ts b/src/pools/worker.ts
index d9dd4386..1d0c4e22 100644
--- a/src/pools/worker.ts
+++ b/src/pools/worker.ts
@@ -229,7 +229,7 @@ export interface IWorkerNode {
    * Enqueue task.
    *
    * @param task - The task to queue.
-   * @returns The task queue size.
+   * @returns The tasks queue size.
    */
   readonly enqueueTask: (task: Task) => number
   /**
diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js
index 0929adc3..ec552e98 100644
--- a/tests/pools/abstract/abstract-pool.test.js
+++ b/tests/pools/abstract/abstract-pool.test.js
@@ -765,31 +765,25 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
-  it("Verify that pool event emitter 'full' event can register a callback", async () => {
-    const pool = new DynamicThreadPool(
+  it("Verify that pool event emitter 'ready' event can register a callback", async () => {
+    const pool = new DynamicClusterPool(
       Math.floor(numberOfWorkers / 2),
       numberOfWorkers,
-      './tests/worker-files/thread/testWorker.js'
+      './tests/worker-files/cluster/testWorker.js'
     )
-    const promises = new Set()
-    let poolFull = 0
     let poolInfo
-    pool.emitter.on(PoolEvents.full, (info) => {
-      ++poolFull
+    let poolReady = 0
+    pool.emitter.on(PoolEvents.ready, (info) => {
+      ++poolReady
       poolInfo = info
     })
-    for (let i = 0; i < numberOfWorkers * 2; i++) {
-      promises.add(pool.execute())
-    }
-    await Promise.all(promises)
-    // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool.
-    // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
-    expect(poolFull).toBe(numberOfWorkers * 2 - 1)
+    await waitPoolEvents(pool, PoolEvents.ready, 1)
+    expect(poolReady).toBe(1)
     expect(poolInfo).toStrictEqual({
       version,
       type: PoolTypes.dynamic,
-      worker: WorkerTypes.thread,
-      ready: expect.any(Boolean),
+      worker: WorkerTypes.cluster,
+      ready: true,
       strategy: WorkerChoiceStrategies.ROUND_ROBIN,
       minSize: expect.any(Number),
       maxSize: expect.any(Number),
@@ -803,25 +797,30 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
-  it("Verify that pool event emitter 'ready' event can register a callback", async () => {
-    const pool = new DynamicClusterPool(
-      Math.floor(numberOfWorkers / 2),
+  it("Verify that pool event emitter 'busy' event can register a callback", async () => {
+    const pool = new FixedThreadPool(
       numberOfWorkers,
-      './tests/worker-files/cluster/testWorker.js'
+      './tests/worker-files/thread/testWorker.js'
     )
+    const promises = new Set()
+    let poolBusy = 0
     let poolInfo
-    let poolReady = 0
-    pool.emitter.on(PoolEvents.ready, (info) => {
-      ++poolReady
+    pool.emitter.on(PoolEvents.busy, (info) => {
+      ++poolBusy
       poolInfo = info
     })
-    await waitPoolEvents(pool, PoolEvents.ready, 1)
-    expect(poolReady).toBe(1)
+    for (let i = 0; i < numberOfWorkers * 2; i++) {
+      promises.add(pool.execute())
+    }
+    await Promise.all(promises)
+    // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
+    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
+    expect(poolBusy).toBe(numberOfWorkers + 1)
     expect(poolInfo).toStrictEqual({
       version,
-      type: PoolTypes.dynamic,
-      worker: WorkerTypes.cluster,
-      ready: true,
+      type: PoolTypes.fixed,
+      worker: WorkerTypes.thread,
+      ready: expect.any(Boolean),
       strategy: WorkerChoiceStrategies.ROUND_ROBIN,
       minSize: expect.any(Number),
       maxSize: expect.any(Number),
@@ -835,28 +834,29 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
-  it("Verify that pool event emitter 'busy' event can register a callback", async () => {
-    const pool = new FixedThreadPool(
+  it("Verify that pool event emitter 'full' event can register a callback", async () => {
+    const pool = new DynamicThreadPool(
+      Math.floor(numberOfWorkers / 2),
       numberOfWorkers,
       './tests/worker-files/thread/testWorker.js'
     )
     const promises = new Set()
-    let poolBusy = 0
+    let poolFull = 0
     let poolInfo
-    pool.emitter.on(PoolEvents.busy, (info) => {
-      ++poolBusy
+    pool.emitter.on(PoolEvents.full, (info) => {
+      ++poolFull
       poolInfo = info
     })
     for (let i = 0; i < numberOfWorkers * 2; i++) {
       promises.add(pool.execute())
     }
     await Promise.all(promises)
-    // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
-    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
-    expect(poolBusy).toBe(numberOfWorkers + 1)
+    // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool.
+    // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
+    expect(poolFull).toBe(numberOfWorkers * 2 - 1)
     expect(poolInfo).toStrictEqual({
       version,
-      type: PoolTypes.fixed,
+      type: PoolTypes.dynamic,
       worker: WorkerTypes.thread,
       ready: expect.any(Boolean),
       strategy: WorkerChoiceStrategies.ROUND_ROBIN,
diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js
index e563b0ba..3baaa98a 100644
--- a/tests/pools/cluster/fixed.test.js
+++ b/tests/pools/cluster/fixed.test.js
@@ -134,6 +134,7 @@ describe('Fixed cluster pool test suite', () => {
       numberOfWorkers *
         (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
     )
+    expect(queuePool.info.backPressure).toBe(false)
     await Promise.all(promises)
     for (const workerNode of queuePool.workerNodes) {
       expect(workerNode.usage.tasks.executing).toBe(0)
diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js
index 1a372a26..aeebc4a7 100644
--- a/tests/pools/thread/fixed.test.js
+++ b/tests/pools/thread/fixed.test.js
@@ -134,6 +134,7 @@ describe('Fixed thread pool test suite', () => {
       numberOfThreads *
         (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
     )
+    expect(queuePool.info.backPressure).toBe(false)
    await Promise.all(promises)
     for (const workerNode of queuePool.workerNodes) {
       expect(workerNode.usage.tasks.executing).toBe(0)
-- 
2.34.1
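
The heart of the fix is the inverted `hasBackPressure()` predicate: the pool now reports back pressure only when no worker node without back pressure can be found (`findIndex(...) === -1`), i.e. when every node's tasks queue has hit its threshold, and the `backPressure` event is emitted from the task queuing path instead of the task submission path. Below is a minimal standalone TypeScript sketch of that semantic; the `MiniPool` and `WorkerNode` shapes are illustrative assumptions for the sketch only, not poolifier's actual internals.

// Illustrative sketch only: MiniPool and WorkerNode are assumed, simplified
// shapes; only the back pressure semantics mirror the patch above.
import { EventEmitter } from 'node:events'

interface WorkerNode {
  // True once this node's tasks queue has reached its back pressure threshold.
  hasBackPressure: () => boolean
  enqueueTask: (task: unknown) => number
}

class MiniPool extends EventEmitter {
  constructor (
    private readonly workerNodes: WorkerNode[],
    private readonly enableTasksQueue: boolean
  ) {
    super()
  }

  // Pool-level back pressure: no worker node without back pressure can be
  // found, i.e. every node is saturated (findIndex(...) === -1).
  hasBackPressure (): boolean {
    return (
      this.enableTasksQueue &&
      this.workerNodes.findIndex(
        (workerNode) => !workerNode.hasBackPressure()
      ) === -1
    )
  }

  // Emit 'backPressure' on the task queuing path, as checkAndEmitTaskQueuingEvents() does.
  enqueueTask (workerNodeKey: number, task: unknown): number {
    const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
    if (this.hasBackPressure()) {
      this.emit('backPressure')
    }
    return tasksQueueSize
  }
}

// Usage: the event only fires once every node reports back pressure.
const saturated: WorkerNode = { hasBackPressure: () => true, enqueueTask: () => 8 }
const idle: WorkerNode = { hasBackPressure: () => false, enqueueTask: () => 1 }
const pool = new MiniPool([saturated, idle], true)
pool.on('backPressure', () => console.log('pool is under back pressure'))
pool.enqueueTask(1, {}) // silent: one node still has spare queue capacity

With the previous `!== -1` check, the pool flipped into back pressure as soon as a single node still had spare queue capacity, which is the opposite of the intended semantic; the tests added above assert `info.backPressure` stays false while queues are below their thresholds.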