## [Unreleased]
+### Fixed
+
+- Fix pool `execute()` arguments check.
+
### Changed
- Make continuous tasks stealing algorithm less aggressive.
return await new Promise<Response>((resolve, reject) => {
if (!this.started) {
reject(new Error('Cannot execute a task on destroyed pool'))
+ return
}
if (name != null && typeof name !== 'string') {
reject(new TypeError('name argument must be a string'))
+ return
}
if (
name != null &&
name.trim().length === 0
) {
reject(new TypeError('name argument must not be an empty string'))
+ return
}
if (transferList != null && !Array.isArray(transferList)) {
reject(new TypeError('transferList argument must be an array'))
+ return
}
const timestamp = performance.now()
const workerNodeKey = this.chooseWorkerNode()
const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
- if (
- name != null &&
- Array.isArray(workerInfo.taskFunctions) &&
- !workerInfo.taskFunctions.includes(name)
- ) {
- reject(
- new Error(`Task function '${name}' is not registered in the pool`)
- )
- }
const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
protected opts: WorkerOptions = DEFAULT_WORKER_OPTIONS
) {
super(type)
- this.checkWorkerOptions(this.opts)
+ if (this.isMain == null) {
+ throw new Error('isMain parameter is mandatory')
+ }
this.checkTaskFunctions(taskFunctions)
+ this.checkWorkerOptions(this.opts)
if (!this.isMain) {
- this.getMainWorker()?.on('message', this.handleReadyMessage.bind(this))
+ this.getMainWorker().on('message', this.handleReadyMessage.bind(this))
}
}
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found.
*/
protected run (task: Task<Data>): void {
- const fn = this.getTaskFunction(task.name)
+ const { name, taskId, data } = task
+ const fn = this.taskFunctions.get(name ?? DEFAULT_TASK_NAME)
+ if (fn == null) {
+ this.sendToMainWorker({
+ taskError: {
+ name: name as string,
+ message: `Task function '${name as string}' not found`,
+ data
+ },
+ workerId: this.id,
+ taskId
+ })
+ return
+ }
if (isAsyncFunction(fn)) {
this.runInAsyncScope(this.runAsync.bind(this), this, fn, task)
} else {
.catch(EMPTY_FUNCTION)
}
- /**
- * Gets the task function with the given name.
- *
- * @param name - Name of the task function that will be returned.
- * @returns The task function.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found.
- */
- private getTaskFunction (name?: string): TaskFunction<Data, Response> {
- name = name ?? DEFAULT_TASK_NAME
- const fn = this.taskFunctions.get(name)
- if (fn == null) {
- throw new Error(`Task function '${name}' not found`)
- }
- return fn
- }
-
private beginTaskPerformance (name?: string): TaskPerformance {
this.checkStatistics()
return {
protected handleReadyMessage (message: MessageValue<Data>): void {
if (message.workerId === this.id && message.ready === false) {
try {
- this.getMainWorker()?.on('message', this.messageListener.bind(this))
+ this.getMainWorker().on('message', this.messageListener.bind(this))
this.sendToMainWorker({
ready: true,
taskFunctions: this.listTaskFunctions(),
}
})
+ it('Verify that pool execute() arguments are checked', async () => {
+ const pool = new FixedClusterPool(
+ numberOfWorkers,
+ './tests/worker-files/cluster/testWorker.js'
+ )
+ await expect(pool.execute(undefined, 0)).rejects.toThrowError(
+ new TypeError('name argument must be a string')
+ )
+ await expect(pool.execute(undefined, '')).rejects.toThrowError(
+ new TypeError('name argument must not be an empty string')
+ )
+ await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
+ new TypeError('transferList argument must be an array')
+ )
+ await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
+ "Task function 'unknown' not found"
+ )
+ await pool.destroy()
+ await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
+ new Error('Cannot execute a task on destroyed pool')
+ )
+ })
+
it('Verify that pool worker tasks usage are computed', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,