From 8735b4e51c0cfabc9612d57417834d42042cab4e Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 19 Aug 2023 14:46:17 +0200 Subject: [PATCH] fix: ensure worker node cannot be instantiaed without proper arguments MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 1 + src/pools/abstract-pool.ts | 22 +++--- src/pools/cluster/dynamic.ts | 5 -- src/pools/cluster/fixed.ts | 10 --- src/pools/thread/dynamic.ts | 5 -- src/pools/thread/fixed.ts | 10 --- src/pools/worker-node.ts | 18 ++++- tests/pools/abstract/abstract-pool.test.js | 92 +++++++++++++++++++--- 8 files changed, 110 insertions(+), 53 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a5cac15..ccd7e49f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Ensure pool event `backPressure` is emitted. - Ensure pool event `full` is emitted only once. +- Ensure worker node cannot be instantiated without proper arguments. ## [2.6.29] - 2023-08-18 diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 67086606..183344f8 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -84,6 +84,11 @@ export abstract class AbstractPool< Response > + /** + * Dynamic pool maximum size property placeholder. + */ + protected readonly max?: number + /** * Whether the pool is starting or not. */ @@ -117,13 +122,6 @@ export abstract class AbstractPool< this.chooseWorkerNode = this.chooseWorkerNode.bind(this) this.executeTask = this.executeTask.bind(this) this.enqueueTask = this.enqueueTask.bind(this) - this.dequeueTask = this.dequeueTask.bind(this) - this.checkAndEmitTaskExecutionEvents = - this.checkAndEmitTaskExecutionEvents.bind(this) - this.checkAndEmitTaskQueuingEvents = - this.checkAndEmitTaskQueuingEvents.bind(this) - this.checkAndEmitDynamicWorkerCreationEvents = - this.checkAndEmitDynamicWorkerCreationEvents.bind(this) if (this.opts.enableEvents === true) { this.emitter = new PoolEmitter() @@ -305,7 +303,7 @@ export abstract class AbstractPool< tasksQueueOptions.concurrency <= 0 ) { throw new Error( - `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}' is a negative integer or zero` + `Invalid worker tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` ) } } @@ -517,12 +515,16 @@ export abstract class AbstractPool< /** * The pool minimum size. */ - protected abstract get minSize (): number + protected get minSize (): number { + return this.numberOfWorkers + } /** * The pool maximum size. */ - protected abstract get maxSize (): number + protected get maxSize (): number { + return this.max ?? this.numberOfWorkers + } /** * Checks if the worker id sent in the received message from a worker is valid. diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index 75296f7a..b93d7e87 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -39,11 +39,6 @@ export class DynamicClusterPool< return PoolTypes.dynamic } - /** @inheritDoc */ - protected get maxSize (): number { - return this.max - } - /** @inheritDoc */ protected get busy (): boolean { return this.full && this.internalBusy() diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 9022c702..c7c95c4b 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -116,16 +116,6 @@ export class FixedClusterPool< return WorkerTypes.cluster } - /** @inheritDoc */ - protected get minSize (): number { - return this.numberOfWorkers - } - - /** @inheritDoc */ - protected get maxSize (): number { - return this.numberOfWorkers - } - /** @inheritDoc */ protected get busy (): boolean { return this.internalBusy() diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index 1d3f3f5d..a34f0c25 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -39,11 +39,6 @@ export class DynamicThreadPool< return PoolTypes.dynamic } - /** @inheritDoc */ - protected get maxSize (): number { - return this.max - } - /** @inheritDoc */ protected get busy (): boolean { return this.full && this.internalBusy() diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 8dd4e7e3..32107e48 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -128,16 +128,6 @@ export class FixedThreadPool< return WorkerTypes.thread } - /** @inheritDoc */ - protected get minSize (): number { - return this.numberOfWorkers - } - - /** @inheritDoc */ - protected get maxSize (): number { - return this.numberOfWorkers - } - /** @inheritDoc */ protected get busy (): boolean { return this.internalBusy() diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 7667dd16..09e4a7b5 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -40,6 +40,22 @@ implements IWorkerNode { * @param poolMaxSize - The pool maximum size. */ constructor (worker: Worker, workerType: WorkerType, poolMaxSize: number) { + if (worker == null) { + throw new Error('Cannot construct a worker node without a worker') + } + if (workerType == null) { + throw new Error('Cannot construct a worker node without a worker type') + } + if (poolMaxSize == null) { + throw new Error( + 'Cannot construct a worker node without a pool maximum size' + ) + } + if (isNaN(poolMaxSize)) { + throw new Error( + 'Cannot construct a worker node with a NaN pool maximum size' + ) + } this.worker = worker this.info = this.initWorkerInfo(worker, workerType) if (workerType === WorkerTypes.thread) { @@ -82,7 +98,7 @@ implements IWorkerNode { /** @inheritdoc */ public hasBackPressure (): boolean { - return this.tasksQueueSize() >= this.tasksQueueBackPressureSize + return this.tasksQueue.size >= this.tasksQueueBackPressureSize } /** @inheritdoc */ diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 0ecd3944..4fd76d03 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -242,7 +242,9 @@ describe('Abstract pool test suite', () => { workerChoiceStrategy: 'invalidStrategy' } ) - ).toThrowError("Invalid worker choice strategy 'invalidStrategy'") + ).toThrowError( + new Error("Invalid worker choice strategy 'invalidStrategy'") + ) expect( () => new FixedThreadPool( @@ -253,7 +255,9 @@ describe('Abstract pool test suite', () => { } ) ).toThrowError( - 'Invalid worker choice strategy options: must have a weight for each worker node' + new Error( + 'Invalid worker choice strategy options: must have a weight for each worker node' + ) ) expect( () => @@ -265,7 +269,9 @@ describe('Abstract pool test suite', () => { } ) ).toThrowError( - "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" + new Error( + "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" + ) ) expect( () => @@ -277,7 +283,11 @@ describe('Abstract pool test suite', () => { tasksQueueOptions: { concurrency: 0 } } ) - ).toThrowError("Invalid worker tasks concurrency '0'") + ).toThrowError( + new TypeError( + 'Invalid worker tasks concurrency: 0 is a negative integer or zero' + ) + ) expect( () => new FixedThreadPool( @@ -288,7 +298,9 @@ describe('Abstract pool test suite', () => { tasksQueueOptions: 'invalidTasksQueueOptions' } ) - ).toThrowError('Invalid tasks queue options: must be a plain object') + ).toThrowError( + new TypeError('Invalid tasks queue options: must be a plain object') + ) expect( () => new FixedThreadPool( @@ -299,7 +311,9 @@ describe('Abstract pool test suite', () => { tasksQueueOptions: { concurrency: 0.2 } } ) - ).toThrowError('Invalid worker tasks concurrency: must be an integer') + ).toThrowError( + new TypeError('Invalid worker tasks concurrency: must be an integer') + ) }) it('Verify that pool worker choice strategy options can be set', async () => { @@ -439,17 +453,23 @@ describe('Abstract pool test suite', () => { expect(() => pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions') ).toThrowError( - 'Invalid worker choice strategy options: must be a plain object' + new TypeError( + 'Invalid worker choice strategy options: must be a plain object' + ) ) expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} }) ).toThrowError( - 'Invalid worker choice strategy options: must have a weight for each worker node' + new Error( + 'Invalid worker choice strategy options: must have a weight for each worker node' + ) ) expect(() => pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' }) ).toThrowError( - "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" + new Error( + "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" + ) ) await pool.destroy() }) @@ -484,12 +504,21 @@ describe('Abstract pool test suite', () => { expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 }) expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions') - ).toThrowError('Invalid tasks queue options: must be a plain object') + ).toThrowError( + new TypeError('Invalid tasks queue options: must be a plain object') + ) expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError( - "Invalid worker tasks concurrency '0'" + new Error( + 'Invalid worker tasks concurrency: 0 is a negative integer or zero' + ) + ) + expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError( + new Error( + 'Invalid worker tasks concurrency: -1 is a negative integer or zero' + ) ) expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError( - 'Invalid worker tasks concurrency: must be an integer' + new TypeError('Invalid worker tasks concurrency: must be an integer') ) await pool.destroy() }) @@ -870,6 +899,45 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) + it.skip("Verify that pool event emitter 'backPressure' event can register a callback", async () => { + const pool = new DynamicThreadPool( + Math.floor(numberOfWorkers / 2), + numberOfWorkers, + './tests/worker-files/thread/testWorker.js', + { + enableTasksQueue: true + } + ) + const promises = new Set() + let poolBackPressure = 0 + let poolInfo + pool.emitter.on(PoolEvents.backPressure, (info) => { + ++poolBackPressure + poolInfo = info + }) + for (let i = 0; i < Math.pow(numberOfWorkers, 2); i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + expect(poolBackPressure).toBe(1) + expect(poolInfo).toStrictEqual({ + version, + type: PoolTypes.dynamic, + worker: WorkerTypes.thread, + ready: expect.any(Boolean), + strategy: WorkerChoiceStrategies.ROUND_ROBIN, + minSize: expect.any(Number), + maxSize: expect.any(Number), + workerNodes: expect.any(Number), + idleWorkerNodes: expect.any(Number), + busyWorkerNodes: expect.any(Number), + executedTasks: expect.any(Number), + executingTasks: expect.any(Number), + failedTasks: expect.any(Number) + }) + await pool.destroy() + }) + it('Verify that listTaskFunctions() is working', async () => { const dynamicThreadPool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), -- 2.34.1