fix: fix pool `execute()` arguments checking
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 26 Aug 2023 18:06:28 +0000 (20:06 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 26 Aug 2023 18:06:28 +0000 (20:06 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
src/pools/abstract-pool.ts
src/worker/abstract-worker.ts
src/worker/cluster-worker.ts
tests/pools/abstract/abstract-pool.test.js

index 7fd003879795606a2256d2b1a215594ff6510c27..391be208cd908e6e97303176105cf7abcf132067 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Fix pool `execute()` arguments check.
+
 ### Changed
 
 - Make continuous tasks stealing algorithm less aggressive.
index d415ac9cd827debe42315ff81580b94ef301ea70..afcf611eb65225449cf83c9440f5d1af81a24b6b 100644 (file)
@@ -743,9 +743,11 @@ export abstract class AbstractPool<
     return await new Promise<Response>((resolve, reject) => {
       if (!this.started) {
         reject(new Error('Cannot execute a task on destroyed pool'))
+        return
       }
       if (name != null && typeof name !== 'string') {
         reject(new TypeError('name argument must be a string'))
+        return
       }
       if (
         name != null &&
@@ -753,22 +755,15 @@ export abstract class AbstractPool<
         name.trim().length === 0
       ) {
         reject(new TypeError('name argument must not be an empty string'))
+        return
       }
       if (transferList != null && !Array.isArray(transferList)) {
         reject(new TypeError('transferList argument must be an array'))
+        return
       }
       const timestamp = performance.now()
       const workerNodeKey = this.chooseWorkerNode()
       const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
-      if (
-        name != null &&
-        Array.isArray(workerInfo.taskFunctions) &&
-        !workerInfo.taskFunctions.includes(name)
-      ) {
-        reject(
-          new Error(`Task function '${name}' is not registered in the pool`)
-        )
-      }
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
index a00e8f8faadd4e912de558c5d4f8ca980db76036..033ed2a5e663fbcd7c88b4317adc240e20b2d298 100644 (file)
@@ -88,10 +88,13 @@ export abstract class AbstractWorker<
     protected opts: WorkerOptions = DEFAULT_WORKER_OPTIONS
   ) {
     super(type)
-    this.checkWorkerOptions(this.opts)
+    if (this.isMain == null) {
+      throw new Error('isMain parameter is mandatory')
+    }
     this.checkTaskFunctions(taskFunctions)
+    this.checkWorkerOptions(this.opts)
     if (!this.isMain) {
-      this.getMainWorker()?.on('message', this.handleReadyMessage.bind(this))
+      this.getMainWorker().on('message', this.handleReadyMessage.bind(this))
     }
   }
 
@@ -463,7 +466,20 @@ export abstract class AbstractWorker<
    * @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found.
    */
   protected run (task: Task<Data>): void {
-    const fn = this.getTaskFunction(task.name)
+    const { name, taskId, data } = task
+    const fn = this.taskFunctions.get(name ?? DEFAULT_TASK_NAME)
+    if (fn == null) {
+      this.sendToMainWorker({
+        taskError: {
+          name: name as string,
+          message: `Task function '${name as string}' not found`,
+          data
+        },
+        workerId: this.id,
+        taskId
+      })
+      return
+    }
     if (isAsyncFunction(fn)) {
       this.runInAsyncScope(this.runAsync.bind(this), this, fn, task)
     } else {
@@ -549,22 +565,6 @@ export abstract class AbstractWorker<
       .catch(EMPTY_FUNCTION)
   }
 
-  /**
-   * Gets the task function with the given name.
-   *
-   * @param name - Name of the task function that will be returned.
-   * @returns The task function.
-   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found.
-   */
-  private getTaskFunction (name?: string): TaskFunction<Data, Response> {
-    name = name ?? DEFAULT_TASK_NAME
-    const fn = this.taskFunctions.get(name)
-    if (fn == null) {
-      throw new Error(`Task function '${name}' not found`)
-    }
-    return fn
-  }
-
   private beginTaskPerformance (name?: string): TaskPerformance {
     this.checkStatistics()
     return {
index 35899e794f5b373aa5bca3ec5b3dbe2d1f5cd4c3..26964aa489f83fff90f8e4306c3dc1bbafc36f23 100644 (file)
@@ -45,7 +45,7 @@ export class ClusterWorker<
   protected handleReadyMessage (message: MessageValue<Data>): void {
     if (message.workerId === this.id && message.ready === false) {
       try {
-        this.getMainWorker()?.on('message', this.messageListener.bind(this))
+        this.getMainWorker().on('message', this.messageListener.bind(this))
         this.sendToMainWorker({
           ready: true,
           taskFunctions: this.listTaskFunctions(),
index b777e34153d077c08293f3d79f9f54fafac14c39..2fa224716461bc928c65cb2b7d07f48eb0cffd47 100644 (file)
@@ -697,6 +697,29 @@ describe('Abstract pool test suite', () => {
     }
   })
 
+  it('Verify that pool execute() arguments are checked', async () => {
+    const pool = new FixedClusterPool(
+      numberOfWorkers,
+      './tests/worker-files/cluster/testWorker.js'
+    )
+    await expect(pool.execute(undefined, 0)).rejects.toThrowError(
+      new TypeError('name argument must be a string')
+    )
+    await expect(pool.execute(undefined, '')).rejects.toThrowError(
+      new TypeError('name argument must not be an empty string')
+    )
+    await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
+      new TypeError('transferList argument must be an array')
+    )
+    await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
+      "Task function 'unknown' not found"
+    )
+    await pool.destroy()
+    await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
+      new Error('Cannot execute a task on destroyed pool')
+    )
+  })
+
   it('Verify that pool worker tasks usage are computed', async () => {
     const pool = new FixedClusterPool(
       numberOfWorkers,