## [Unreleased]
+### Fixed
+
+- Fix worker function type definition and validation.
+
## [2.4.8] - 2023-04-12
### Fixed
public constructor (
type: string,
protected readonly isMain: boolean,
- fn: (data: Data) => Response,
+ fn: (data: Data) => Response | Promise<Response>,
protected mainWorker: MainWorker | undefined | null,
protected readonly opts: WorkerOptions = {
/**
}
) {
super(type)
- this.checkFunctionInput(fn)
this.checkWorkerOptions(this.opts)
+ this.checkFunctionInput(fn)
if (!this.isMain) {
this.lastTaskTimestamp = performance.now()
this.aliveInterval = setInterval(
*/
protected messageListener (
message: MessageValue<Data, MainWorker>,
- fn: (data: Data) => Response
+ fn: (data: Data) => Response | Promise<Response>
): void {
if (message.id != null && message.data != null) {
// Task message received
*
* @param fn - The function that should be defined.
*/
- private checkFunctionInput (fn: (data: Data) => Response): void {
+ private checkFunctionInput (
+ fn: (data: Data) => Response | Promise<Response>
+ ): void {
if (fn == null) throw new Error('fn parameter is mandatory')
if (typeof fn !== 'function') {
throw new TypeError('fn parameter is not a function')
}
+ if (fn.constructor.name === 'AsyncFunction' && this.opts.async === false) {
+ throw new Error(
+ 'fn parameter is an async function, please set the async option to true'
+ )
+ }
}
/**
* @param fn - Function processed by the worker when the pool's `execution` function is invoked.
* @param opts - Options for the worker.
*/
- public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) {
+ public constructor (
+ fn: (data: Data) => Response | Promise<Response>,
+ opts: WorkerOptions = {}
+ ) {
super(
'worker-cluster-pool:poolifier',
cluster.isPrimary,
* @param fn - Function processed by the worker when the pool's `execution` function is invoked.
* @param opts - Options for the worker.
*/
- public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) {
+ public constructor (
+ fn: (data: Data) => Response | Promise<Response>,
+ opts: WorkerOptions = {}
+ ) {
super('worker-thread-pool:poolifier', isMainThread, fn, parentPort, opts)
}
)
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
removeAllWorker () {
- this.workers = []
+ this.workerNodes = []
this.promiseResponseMap.clear()
}
}
errorHandler: e => console.error(e)
}
)
- ).toThrowError(new Error('Cannot start a pool from a worker!'))
+ ).toThrowError('Cannot start a pool from a worker!')
})
it('Verify that filePath is checked', () => {
it('Verify that numberOfWorkers is checked', () => {
expect(() => new FixedThreadPool()).toThrowError(
- new Error(
- 'Cannot instantiate a pool without specifying the number of workers'
- )
+ 'Cannot instantiate a pool without specifying the number of workers'
)
})
expect(pool.opts.enableEvents).toBe(true)
expect(pool.emitter).toBeDefined()
expect(pool.opts.enableTasksQueue).toBe(false)
+ expect(pool.opts.tasksQueueOptions).toBeUndefined()
expect(pool.opts.workerChoiceStrategy).toBe(
WorkerChoiceStrategies.ROUND_ROBIN
)
workerChoiceStrategyOptions: { medRunTime: true },
enableEvents: false,
enableTasksQueue: true,
+ tasksQueueOptions: { concurrency: 2 },
messageHandler: testHandler,
errorHandler: testHandler,
onlineHandler: testHandler,
expect(pool.opts.enableEvents).toBe(false)
expect(pool.emitter).toBeUndefined()
expect(pool.opts.enableTasksQueue).toBe(true)
+ expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
expect(pool.opts.workerChoiceStrategy).toBe(
WorkerChoiceStrategies.LESS_USED
)
await pool.destroy()
})
- it('Simulate worker not found during getWorkerTasksUsage', async () => {
+ it('Verify that pool options are valid', async () => {
+ expect(
+ () =>
+ new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js',
+ {
+ enableTasksQueue: true,
+ tasksQueueOptions: { concurrency: 0 }
+ }
+ )
+ ).toThrowError("Invalid worker tasks concurrency '0'")
+ expect(
+ () =>
+ new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js',
+ {
+ workerChoiceStrategy: 'invalidStrategy'
+ }
+ )
+ ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
+ })
+
+ it('Simulate worker not found at getWorkerTasksUsage()', async () => {
const pool = new StubPoolWithRemoveAllWorker(
numberOfWorkers,
'./tests/worker-files/cluster/testWorker.js',
errorHandler: e => console.error(e)
}
)
+ expect(pool.workerNodes.length).toBe(numberOfWorkers)
// Simulate worker not found.
pool.removeAllWorker()
+ expect(pool.workerNodes.length).toBe(0)
expect(() => pool.getWorkerTasksUsage()).toThrowError(
workerNotFoundInPoolError
)
it('Validation of inputs test', () => {
expect(() => new DynamicClusterPool(min)).toThrowError(
- new Error('Please specify a file with a worker implementation')
+ 'Please specify a file with a worker implementation'
)
})
expect(
() =>
new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
- ).toThrowError(new Error('Cannot instantiate a fixed pool with no worker'))
+ ).toThrowError('Cannot instantiate a fixed pool with no worker')
})
})
'./tests/worker-files/thread/testWorker.js',
{ workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
)
- ).toThrowError(
- new Error("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
- )
+ ).toThrowError("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
})
})
it('Validation of inputs test', () => {
expect(() => new DynamicThreadPool(min)).toThrowError(
- new Error('Please specify a file with a worker implementation')
+ 'Please specify a file with a worker implementation'
)
})
it('Verify that a pool with zero worker fails', async () => {
expect(
() => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
- ).toThrowError(new Error('Cannot instantiate a fixed pool with no worker'))
+ ).toThrowError('Cannot instantiate a fixed pool with no worker')
})
})
}
}
- it('Verify that fn parameter is mandatory', () => {
- expect(() => new ClusterWorker()).toThrowError(
- new Error('fn parameter is mandatory')
- )
- })
-
- it('Verify that fn parameter is a function', () => {
- expect(() => new ClusterWorker({})).toThrowError(
- new TypeError('fn parameter is not a function')
- )
- })
-
it('Verify worker options default values', () => {
const worker = new ThreadWorker(() => {})
expect(worker.opts.maxInactiveTime).toStrictEqual(60000)
expect(worker.opts.async).toBe(true)
})
+ it('Verify that fn parameter is mandatory', () => {
+ expect(() => new ClusterWorker()).toThrowError('fn parameter is mandatory')
+ })
+
+ it('Verify that fn parameter is a function', () => {
+ expect(() => new ClusterWorker({})).toThrowError(
+ new TypeError('fn parameter is not a function')
+ )
+ expect(() => new ClusterWorker('')).toThrowError(
+ new TypeError('fn parameter is not a function')
+ )
+ })
+
+ it('Verify that async fn parameter without async option throw error', () => {
+ const fn = async () => {
+ return new Promise()
+ }
+ expect(() => new ClusterWorker(fn)).toThrowError(
+ 'fn parameter is an async function, please set the async option to true'
+ )
+ })
+
it('Verify that handleError function is working properly', () => {
const error = new Error('My error')
const worker = new ThreadWorker(() => {})
it('Verify that get main worker throw error if main worker is not set', () => {
expect(() =>
new StubPoolWithIsMainWorker(() => {}).getMainWorker()
- ).toThrowError(new Error('Main worker was not set'))
+ ).toThrowError('Main worker was not set')
})
})