From 20c6f652269cbc14fea53910c8066900f56c2f25 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 20 Aug 2023 16:44:12 +0200 Subject: [PATCH] feat: add `queueMaxSize` option to tasks queue options MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 ++ docs/api.md | 3 +- src/pools/abstract-pool.ts | 37 ++++++++++++++-- src/pools/pool.ts | 26 ++++++----- src/pools/worker-node.ts | 21 +++++---- src/pools/worker.ts | 5 +++ tests/pools/abstract/abstract-pool.test.js | 50 +++++++++++++++++----- 7 files changed, 113 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a9c20334..d2f3b38b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add `queueMaxSize` option to tasks queue options. + ## [2.6.31] - 2023-08-20 ### Fixed diff --git a/docs/api.md b/docs/api.md index 62d694b6..0474e2aa 100644 --- a/docs/api.md +++ b/docs/api.md @@ -93,9 +93,10 @@ An object with these properties: - `tasksQueueOptions` (optional) - The worker tasks queue options object to use in this pool. Properties: + - `queueMaxSize` (optional) - The maximum number of tasks that can be queued on a worker before flagging it as back pressured. It must be a positive integer. - `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer. - Default: `{ concurrency: 1 }` + Default: `{ queueMaxSize: (pool maximum size)^2, concurrency: 1 }` #### `ThreadPoolOptions extends PoolOptions` diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 289d6b35..41e7bb24 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -295,7 +295,7 @@ export abstract class AbstractPool< !Number.isSafeInteger(tasksQueueOptions.concurrency) ) { throw new TypeError( - 'Invalid worker tasks concurrency: must be an integer' + 'Invalid worker node tasks concurrency: must be an integer' ) } if ( @@ -303,7 +303,23 @@ export abstract class AbstractPool< tasksQueueOptions.concurrency <= 0 ) { throw new RangeError( - `Invalid worker tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` + `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` + ) + } + if ( + tasksQueueOptions?.queueMaxSize != null && + !Number.isSafeInteger(tasksQueueOptions.queueMaxSize) + ) { + throw new TypeError( + 'Invalid worker node tasks queue max size: must be an integer' + ) + } + if ( + tasksQueueOptions?.queueMaxSize != null && + tasksQueueOptions.queueMaxSize <= 0 + ) { + throw new RangeError( + `Invalid worker node tasks queue max size: ${tasksQueueOptions.queueMaxSize} is a negative integer or zero` ) } } @@ -620,16 +636,29 @@ export abstract class AbstractPool< this.checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) + this.setTasksQueueMaxSize( + this.opts.tasksQueueOptions.queueMaxSize as number + ) } else if (this.opts.tasksQueueOptions != null) { delete this.opts.tasksQueueOptions } } + private setTasksQueueMaxSize (queueMaxSize: number): void { + for (const workerNode of this.workerNodes) { + workerNode.tasksQueueBackPressureSize = queueMaxSize + } + } + private buildTasksQueueOptions ( tasksQueueOptions: TasksQueueOptions ): TasksQueueOptions { return { - concurrency: tasksQueueOptions?.concurrency ?? 1 + ...{ + queueMaxSize: Math.pow(this.maxSize, 2), + concurrency: 1 + }, + ...tasksQueueOptions } } @@ -1292,7 +1321,7 @@ export abstract class AbstractPool< const workerNode = new WorkerNode( worker, this.worker, - this.maxSize + this.opts.tasksQueueOptions?.queueMaxSize ?? Math.pow(this.maxSize, 2) ) // Flag the worker node as ready at pool startup. if (this.starting) { diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 09d2f229..c8aedce7 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -96,11 +96,17 @@ export interface PoolInfo { } /** - * Worker tasks queue options. + * Worker node tasks queue options. */ export interface TasksQueueOptions { /** - * Maximum number of tasks that can be executed concurrently on a worker. + * Maximum tasks queue size per worker node flagging it as back pressured. + * + * @defaultValue (pool maximum size)^2 + */ + readonly queueMaxSize?: number + /** + * Maximum number of tasks that can be executed concurrently on a worker node. * * @defaultValue 1 */ @@ -150,13 +156,13 @@ export interface PoolOptions { */ enableEvents?: boolean /** - * Pool worker tasks queue. + * Pool worker node tasks queue. * * @defaultValue false */ enableTasksQueue?: boolean /** - * Pool worker tasks queue options. + * Pool worker node tasks queue options. */ tasksQueueOptions?: TasksQueueOptions } @@ -202,7 +208,7 @@ export interface IPool< * - '`destroy`': Emitted when the pool is destroyed. * - `'error'`: Emitted when an uncaught error occurs. * - `'taskError'`: Emitted when an error occurs while executing a task. - * - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full: queue size \>= pool maximum size^2). + * - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full: queue size \>= max queue size). */ readonly emitter?: PoolEmitter /** @@ -247,19 +253,19 @@ export interface IPool< workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ) => void /** - * Enables/disables the worker tasks queue in this pool. + * Enables/disables the worker node tasks queue in this pool. * - * @param enable - Whether to enable or disable the worker tasks queue. - * @param tasksQueueOptions - The worker tasks queue options. + * @param enable - Whether to enable or disable the worker node tasks queue. + * @param tasksQueueOptions - The worker node tasks queue options. */ readonly enableTasksQueue: ( enable: boolean, tasksQueueOptions?: TasksQueueOptions ) => void /** - * Sets the worker tasks queue options in this pool. + * Sets the worker node tasks queue options in this pool. * - * @param tasksQueueOptions - The worker tasks queue options. + * @param tasksQueueOptions - The worker node tasks queue options. */ readonly setTasksQueueOptions: (tasksQueueOptions: TasksQueueOptions) => void } diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 82e27688..545ee7e8 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -28,18 +28,23 @@ implements IWorkerNode { public messageChannel?: MessageChannel /** @inheritdoc */ public usage: WorkerUsage + /** @inheritdoc */ + public tasksQueueBackPressureSize: number private readonly taskFunctionsUsage: Map private readonly tasksQueue: Queue> - private readonly tasksQueueBackPressureSize: number /** * Constructs a new worker node. * * @param worker - The worker. * @param workerType - The worker type. - * @param poolMaxSize - The pool maximum size. + * @param tasksQueueBackPressureSize - The tasks queue back pressure size. */ - constructor (worker: Worker, workerType: WorkerType, poolMaxSize: number) { + constructor ( + worker: Worker, + workerType: WorkerType, + tasksQueueBackPressureSize: number + ) { if (worker == null) { throw new TypeError('Cannot construct a worker node without a worker') } @@ -48,14 +53,14 @@ implements IWorkerNode { 'Cannot construct a worker node without a worker type' ) } - if (poolMaxSize == null) { + if (tasksQueueBackPressureSize == null) { throw new TypeError( - 'Cannot construct a worker node without a pool maximum size' + 'Cannot construct a worker node without a tasks queue back pressure size' ) } - if (!Number.isSafeInteger(poolMaxSize)) { + if (!Number.isSafeInteger(tasksQueueBackPressureSize)) { throw new TypeError( - 'Cannot construct a worker node with a pool maximum size that is not an integer' + 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer' ) } this.worker = worker @@ -66,7 +71,7 @@ implements IWorkerNode { this.usage = this.initWorkerUsage() this.taskFunctionsUsage = new Map() this.tasksQueue = new Queue>() - this.tasksQueueBackPressureSize = Math.pow(poolMaxSize, 2) + this.tasksQueueBackPressureSize = tasksQueueBackPressureSize } /** @inheritdoc */ diff --git a/src/pools/worker.ts b/src/pools/worker.ts index e4a31e39..e6dd0fae 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -219,6 +219,11 @@ export interface IWorkerNode { * Worker usage statistics. */ usage: WorkerUsage + /** + * Tasks queue back pressure size. + * This is the number of tasks that can be enqueued before the worker node has back pressure. + */ + tasksQueueBackPressureSize: number /** * Tasks queue size. * diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 867c1415..0ffc9e12 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -212,7 +212,10 @@ describe('Abstract pool test suite', () => { expect(pool.opts.enableEvents).toBe(false) expect(pool.opts.restartWorkerOnError).toBe(false) expect(pool.opts.enableTasksQueue).toBe(true) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 2, + queueMaxSize: 4 + }) expect(pool.opts.workerChoiceStrategy).toBe( WorkerChoiceStrategies.LEAST_USED ) @@ -290,7 +293,7 @@ describe('Abstract pool test suite', () => { ) ).toThrowError( new RangeError( - 'Invalid worker tasks concurrency: 0 is a negative integer or zero' + 'Invalid worker node tasks concurrency: 0 is a negative integer or zero' ) ) expect( @@ -317,7 +320,7 @@ describe('Abstract pool test suite', () => { } ) ).toThrowError( - new TypeError('Invalid worker tasks concurrency: must be an integer') + new TypeError('Invalid worker node tasks concurrency: must be an integer') ) }) @@ -488,10 +491,16 @@ describe('Abstract pool test suite', () => { expect(pool.opts.tasksQueueOptions).toBeUndefined() pool.enableTasksQueue(true) expect(pool.opts.enableTasksQueue).toBe(true) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 1, + queueMaxSize: 4 + }) pool.enableTasksQueue(true, { concurrency: 2 }) expect(pool.opts.enableTasksQueue).toBe(true) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 2, + queueMaxSize: 4 + }) pool.enableTasksQueue(false) expect(pool.opts.enableTasksQueue).toBe(false) expect(pool.opts.tasksQueueOptions).toBeUndefined() @@ -504,9 +513,15 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.js', { enableTasksQueue: true } ) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 1, + queueMaxSize: 4 + }) pool.setTasksQueueOptions({ concurrency: 2 }) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 2, + queueMaxSize: 4 + }) expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions') ).toThrowError( @@ -514,16 +529,31 @@ describe('Abstract pool test suite', () => { ) expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError( new RangeError( - 'Invalid worker tasks concurrency: 0 is a negative integer or zero' + 'Invalid worker node tasks concurrency: 0 is a negative integer or zero' ) ) expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError( new RangeError( - 'Invalid worker tasks concurrency: -1 is a negative integer or zero' + 'Invalid worker node tasks concurrency: -1 is a negative integer or zero' ) ) expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError( - new TypeError('Invalid worker tasks concurrency: must be an integer') + new TypeError('Invalid worker node tasks concurrency: must be an integer') + ) + expect(() => pool.setTasksQueueOptions({ queueMaxSize: 0 })).toThrowError( + new RangeError( + 'Invalid worker node tasks queue max size: 0 is a negative integer or zero' + ) + ) + expect(() => pool.setTasksQueueOptions({ queueMaxSize: -1 })).toThrowError( + new RangeError( + 'Invalid worker node tasks queue max size: -1 is a negative integer or zero' + ) + ) + expect(() => pool.setTasksQueueOptions({ queueMaxSize: 0.2 })).toThrowError( + new TypeError( + 'Invalid worker node tasks queue max size: must be an integer' + ) ) await pool.destroy() }) -- 2.34.1