From 9d2d0da193405976471817f6b0d048006bd01418 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 26 Aug 2023 20:06:28 +0200 Subject: [PATCH] fix: fix pool `execute()` arguments checking MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 +++ src/pools/abstract-pool.ts | 13 +++----- src/worker/abstract-worker.ts | 38 +++++++++++----------- src/worker/cluster-worker.ts | 2 +- tests/pools/abstract/abstract-pool.test.js | 23 +++++++++++++ 5 files changed, 51 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fd00387..391be208 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fix pool `execute()` arguments check. + ### Changed - Make continuous tasks stealing algorithm less aggressive. diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index d415ac9c..afcf611e 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -743,9 +743,11 @@ export abstract class AbstractPool< return await new Promise((resolve, reject) => { if (!this.started) { reject(new Error('Cannot execute a task on destroyed pool')) + return } if (name != null && typeof name !== 'string') { reject(new TypeError('name argument must be a string')) + return } if ( name != null && @@ -753,22 +755,15 @@ export abstract class AbstractPool< name.trim().length === 0 ) { reject(new TypeError('name argument must not be an empty string')) + return } if (transferList != null && !Array.isArray(transferList)) { reject(new TypeError('transferList argument must be an array')) + return } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo - if ( - name != null && - Array.isArray(workerInfo.taskFunctions) && - !workerInfo.taskFunctions.includes(name) - ) { - reject( - new Error(`Task function '${name}' is not registered in the pool`) - ) - } const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index a00e8f8f..033ed2a5 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -88,10 +88,13 @@ export abstract class AbstractWorker< protected opts: WorkerOptions = DEFAULT_WORKER_OPTIONS ) { super(type) - this.checkWorkerOptions(this.opts) + if (this.isMain == null) { + throw new Error('isMain parameter is mandatory') + } this.checkTaskFunctions(taskFunctions) + this.checkWorkerOptions(this.opts) if (!this.isMain) { - this.getMainWorker()?.on('message', this.handleReadyMessage.bind(this)) + this.getMainWorker().on('message', this.handleReadyMessage.bind(this)) } } @@ -463,7 +466,20 @@ export abstract class AbstractWorker< * @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found. */ protected run (task: Task): void { - const fn = this.getTaskFunction(task.name) + const { name, taskId, data } = task + const fn = this.taskFunctions.get(name ?? DEFAULT_TASK_NAME) + if (fn == null) { + this.sendToMainWorker({ + taskError: { + name: name as string, + message: `Task function '${name as string}' not found`, + data + }, + workerId: this.id, + taskId + }) + return + } if (isAsyncFunction(fn)) { this.runInAsyncScope(this.runAsync.bind(this), this, fn, task) } else { @@ -549,22 +565,6 @@ export abstract class AbstractWorker< .catch(EMPTY_FUNCTION) } - /** - * Gets the task function with the given name. - * - * @param name - Name of the task function that will be returned. - * @returns The task function. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found. - */ - private getTaskFunction (name?: string): TaskFunction { - name = name ?? DEFAULT_TASK_NAME - const fn = this.taskFunctions.get(name) - if (fn == null) { - throw new Error(`Task function '${name}' not found`) - } - return fn - } - private beginTaskPerformance (name?: string): TaskPerformance { this.checkStatistics() return { diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index 35899e79..26964aa4 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -45,7 +45,7 @@ export class ClusterWorker< protected handleReadyMessage (message: MessageValue): void { if (message.workerId === this.id && message.ready === false) { try { - this.getMainWorker()?.on('message', this.messageListener.bind(this)) + this.getMainWorker().on('message', this.messageListener.bind(this)) this.sendToMainWorker({ ready: true, taskFunctions: this.listTaskFunctions(), diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index b777e341..2fa22471 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -697,6 +697,29 @@ describe('Abstract pool test suite', () => { } }) + it('Verify that pool execute() arguments are checked', async () => { + const pool = new FixedClusterPool( + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js' + ) + await expect(pool.execute(undefined, 0)).rejects.toThrowError( + new TypeError('name argument must be a string') + ) + await expect(pool.execute(undefined, '')).rejects.toThrowError( + new TypeError('name argument must not be an empty string') + ) + await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError( + new TypeError('transferList argument must be an array') + ) + await expect(pool.execute(undefined, 'unknown')).rejects.toBe( + "Task function 'unknown' not found" + ) + await pool.destroy() + await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError( + new Error('Cannot execute a task on destroyed pool') + ) + }) + it('Verify that pool worker tasks usage are computed', async () => { const pool = new FixedClusterPool( numberOfWorkers, -- 2.34.1