From d4aeae5aa9e260c8c2f6d28f3133de368552c108 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 14 Apr 2023 22:00:04 +0200 Subject: [PATCH] fix: fix worker function type definition and validation MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 ++ src/worker/abstract-worker.ts | 15 +++++-- src/worker/cluster-worker.ts | 5 ++- src/worker/thread-worker.ts | 5 ++- tests/pools/abstract/abstract-pool.test.js | 39 ++++++++++++++++--- tests/pools/cluster/dynamic.test.js | 2 +- tests/pools/cluster/fixed.test.js | 2 +- .../selection-strategies.test.js | 4 +- tests/pools/thread/dynamic.test.js | 2 +- tests/pools/thread/fixed.test.js | 2 +- tests/worker/abstract-worker.test.js | 36 ++++++++++------- 11 files changed, 84 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ba7cb1b..5efd1b84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fix worker function type definition and validation. + ## [2.4.8] - 2023-04-12 ### Fixed diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 7563e97b..73eec1c6 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -41,7 +41,7 @@ export abstract class AbstractWorker< public constructor ( type: string, protected readonly isMain: boolean, - fn: (data: Data) => Response, + fn: (data: Data) => Response | Promise, protected mainWorker: MainWorker | undefined | null, protected readonly opts: WorkerOptions = { /** @@ -56,8 +56,8 @@ export abstract class AbstractWorker< } ) { super(type) - this.checkFunctionInput(fn) this.checkWorkerOptions(this.opts) + this.checkFunctionInput(fn) if (!this.isMain) { this.lastTaskTimestamp = performance.now() this.aliveInterval = setInterval( @@ -83,7 +83,7 @@ export abstract class AbstractWorker< */ protected messageListener ( message: MessageValue, - fn: (data: Data) => Response + fn: (data: Data) => Response | Promise ): void { if (message.id != null && message.data != null) { // Task message received @@ -114,11 +114,18 @@ export abstract class AbstractWorker< * * @param fn - The function that should be defined. */ - private checkFunctionInput (fn: (data: Data) => Response): void { + private checkFunctionInput ( + fn: (data: Data) => Response | Promise + ): void { if (fn == null) throw new Error('fn parameter is mandatory') if (typeof fn !== 'function') { throw new TypeError('fn parameter is not a function') } + if (fn.constructor.name === 'AsyncFunction' && this.opts.async === false) { + throw new Error( + 'fn parameter is an async function, please set the async option to true' + ) + } } /** diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index 655520cc..1d272981 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -28,7 +28,10 @@ export class ClusterWorker< * @param fn - Function processed by the worker when the pool's `execution` function is invoked. * @param opts - Options for the worker. */ - public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) { + public constructor ( + fn: (data: Data) => Response | Promise, + opts: WorkerOptions = {} + ) { super( 'worker-cluster-pool:poolifier', cluster.isPrimary, diff --git a/src/worker/thread-worker.ts b/src/worker/thread-worker.ts index ac775a09..ad2bc644 100644 --- a/src/worker/thread-worker.ts +++ b/src/worker/thread-worker.ts @@ -28,7 +28,10 @@ export class ThreadWorker< * @param fn - Function processed by the worker when the pool's `execution` function is invoked. * @param opts - Options for the worker. */ - public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) { + public constructor ( + fn: (data: Data) => Response | Promise, + opts: WorkerOptions = {} + ) { super('worker-thread-pool:poolifier', isMainThread, fn, parentPort, opts) } diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 1a5a75a0..86506f67 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -15,7 +15,7 @@ describe('Abstract pool test suite', () => { ) class StubPoolWithRemoveAllWorker extends FixedThreadPool { removeAllWorker () { - this.workers = [] + this.workerNodes = [] this.promiseResponseMap.clear() } } @@ -35,7 +35,7 @@ describe('Abstract pool test suite', () => { errorHandler: e => console.error(e) } ) - ).toThrowError(new Error('Cannot start a pool from a worker!')) + ).toThrowError('Cannot start a pool from a worker!') }) it('Verify that filePath is checked', () => { @@ -52,9 +52,7 @@ describe('Abstract pool test suite', () => { it('Verify that numberOfWorkers is checked', () => { expect(() => new FixedThreadPool()).toThrowError( - new Error( - 'Cannot instantiate a pool without specifying the number of workers' - ) + 'Cannot instantiate a pool without specifying the number of workers' ) }) @@ -88,6 +86,7 @@ describe('Abstract pool test suite', () => { expect(pool.opts.enableEvents).toBe(true) expect(pool.emitter).toBeDefined() expect(pool.opts.enableTasksQueue).toBe(false) + expect(pool.opts.tasksQueueOptions).toBeUndefined() expect(pool.opts.workerChoiceStrategy).toBe( WorkerChoiceStrategies.ROUND_ROBIN ) @@ -108,6 +107,7 @@ describe('Abstract pool test suite', () => { workerChoiceStrategyOptions: { medRunTime: true }, enableEvents: false, enableTasksQueue: true, + tasksQueueOptions: { concurrency: 2 }, messageHandler: testHandler, errorHandler: testHandler, onlineHandler: testHandler, @@ -117,6 +117,7 @@ describe('Abstract pool test suite', () => { expect(pool.opts.enableEvents).toBe(false) expect(pool.emitter).toBeUndefined() expect(pool.opts.enableTasksQueue).toBe(true) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 }) expect(pool.opts.workerChoiceStrategy).toBe( WorkerChoiceStrategies.LESS_USED ) @@ -130,7 +131,31 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) - it('Simulate worker not found during getWorkerTasksUsage', async () => { + it('Verify that pool options are valid', async () => { + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.js', + { + enableTasksQueue: true, + tasksQueueOptions: { concurrency: 0 } + } + ) + ).toThrowError("Invalid worker tasks concurrency '0'") + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.js', + { + workerChoiceStrategy: 'invalidStrategy' + } + ) + ).toThrowError("Invalid worker choice strategy 'invalidStrategy'") + }) + + it('Simulate worker not found at getWorkerTasksUsage()', async () => { const pool = new StubPoolWithRemoveAllWorker( numberOfWorkers, './tests/worker-files/cluster/testWorker.js', @@ -138,8 +163,10 @@ describe('Abstract pool test suite', () => { errorHandler: e => console.error(e) } ) + expect(pool.workerNodes.length).toBe(numberOfWorkers) // Simulate worker not found. pool.removeAllWorker() + expect(pool.workerNodes.length).toBe(0) expect(() => pool.getWorkerTasksUsage()).toThrowError( workerNotFoundInPoolError ) diff --git a/tests/pools/cluster/dynamic.test.js b/tests/pools/cluster/dynamic.test.js index 0da467ca..23403ab7 100644 --- a/tests/pools/cluster/dynamic.test.js +++ b/tests/pools/cluster/dynamic.test.js @@ -66,7 +66,7 @@ describe('Dynamic cluster pool test suite', () => { it('Validation of inputs test', () => { expect(() => new DynamicClusterPool(min)).toThrowError( - new Error('Please specify a file with a worker implementation') + 'Please specify a file with a worker implementation' ) }) diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index c28b8021..4aaa57c7 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -206,6 +206,6 @@ describe('Fixed cluster pool test suite', () => { expect( () => new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js') - ).toThrowError(new Error('Cannot instantiate a fixed pool with no worker')) + ).toThrowError('Cannot instantiate a fixed pool with no worker') }) }) diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index 7755cc34..2fcbba5a 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -714,8 +714,6 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.js', { workerChoiceStrategy: 'UNKNOWN_STRATEGY' } ) - ).toThrowError( - new Error("Invalid worker choice strategy 'UNKNOWN_STRATEGY'") - ) + ).toThrowError("Invalid worker choice strategy 'UNKNOWN_STRATEGY'") }) }) diff --git a/tests/pools/thread/dynamic.test.js b/tests/pools/thread/dynamic.test.js index 11d91128..29ce0152 100644 --- a/tests/pools/thread/dynamic.test.js +++ b/tests/pools/thread/dynamic.test.js @@ -66,7 +66,7 @@ describe('Dynamic thread pool test suite', () => { it('Validation of inputs test', () => { expect(() => new DynamicThreadPool(min)).toThrowError( - new Error('Please specify a file with a worker implementation') + 'Please specify a file with a worker implementation' ) }) diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index 75c4a935..6cccd6db 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -186,6 +186,6 @@ describe('Fixed thread pool test suite', () => { it('Verify that a pool with zero worker fails', async () => { expect( () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js') - ).toThrowError(new Error('Cannot instantiate a fixed pool with no worker')) + ).toThrowError('Cannot instantiate a fixed pool with no worker') }) }) diff --git a/tests/worker/abstract-worker.test.js b/tests/worker/abstract-worker.test.js index fbc9a06d..2eae1f81 100644 --- a/tests/worker/abstract-worker.test.js +++ b/tests/worker/abstract-worker.test.js @@ -9,18 +9,6 @@ describe('Abstract worker test suite', () => { } } - it('Verify that fn parameter is mandatory', () => { - expect(() => new ClusterWorker()).toThrowError( - new Error('fn parameter is mandatory') - ) - }) - - it('Verify that fn parameter is a function', () => { - expect(() => new ClusterWorker({})).toThrowError( - new TypeError('fn parameter is not a function') - ) - }) - it('Verify worker options default values', () => { const worker = new ThreadWorker(() => {}) expect(worker.opts.maxInactiveTime).toStrictEqual(60000) @@ -39,6 +27,28 @@ describe('Abstract worker test suite', () => { expect(worker.opts.async).toBe(true) }) + it('Verify that fn parameter is mandatory', () => { + expect(() => new ClusterWorker()).toThrowError('fn parameter is mandatory') + }) + + it('Verify that fn parameter is a function', () => { + expect(() => new ClusterWorker({})).toThrowError( + new TypeError('fn parameter is not a function') + ) + expect(() => new ClusterWorker('')).toThrowError( + new TypeError('fn parameter is not a function') + ) + }) + + it('Verify that async fn parameter without async option throw error', () => { + const fn = async () => { + return new Promise() + } + expect(() => new ClusterWorker(fn)).toThrowError( + 'fn parameter is an async function, please set the async option to true' + ) + }) + it('Verify that handleError function is working properly', () => { const error = new Error('My error') const worker = new ThreadWorker(() => {}) @@ -48,6 +58,6 @@ describe('Abstract worker test suite', () => { it('Verify that get main worker throw error if main worker is not set', () => { expect(() => new StubPoolWithIsMainWorker(() => {}).getMainWorker() - ).toThrowError(new Error('Main worker was not set')) + ).toThrowError('Main worker was not set') }) }) -- 2.34.1