fix: ensure worker error is propagated unchanged if possible (#2634)
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Thu, 24 Oct 2024 20:29:51 +0000 (22:29 +0200)
committerGitHub <noreply@github.com>
Thu, 24 Oct 2024 20:29:51 +0000 (22:29 +0200)
* fix: ensure worker error is propagated unchanged if possible

closes #2633

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: cleanups

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
---------

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/utility-types.ts
src/worker/abstract-worker.ts
src/worker/cluster-worker.ts
src/worker/thread-worker.ts
tests/pools/abstract-pool.test.mjs
tests/pools/cluster/fixed.test.mjs
tests/pools/thread/fixed.test.mjs
tests/worker/cluster-worker.test.mjs
tests/worker/thread-worker.test.mjs

index 72507108c0a1f669a443167f2c10b88263ea22c9..ae4e3f10b027cab3c34e5b5798a05042c37d9126 100644 (file)
@@ -1184,14 +1184,18 @@ export abstract class AbstractPool<
       const { asyncResource, reject, resolve, workerNodeKey } = promiseResponse
       const workerNode = this.workerNodes[workerNodeKey]
       if (workerError != null) {
+        let error: Error
+        if (workerError.error != null) {
+          error = workerError.error
+        } else {
+          const err = new Error(workerError.message)
+          err.stack = workerError.stack
+          error = err
+        }
         this.emitter?.emit(PoolEvents.taskError, workerError)
         asyncResource != null
-          ? asyncResource.runInAsyncScope(
-            reject,
-            this.emitter,
-            workerError.message
-          )
-          : reject(workerError.message)
+          ? asyncResource.runInAsyncScope(reject, this.emitter, error)
+          : reject(error)
       } else {
         asyncResource != null
           ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
@@ -1493,6 +1497,7 @@ export abstract class AbstractPool<
                 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
                 `Task function operation '${message.taskFunctionOperation?.toString()}' failed on worker ${message.workerId?.toString()} with error: '${
                   // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+                  message.workerError?.error?.message ??
                   message.workerError?.message
                 }'`
               )
@@ -1545,6 +1550,7 @@ export abstract class AbstractPool<
                     // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
                   }' failed on worker ${errorResponse?.workerId?.toString()} with error: '${
                     // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+                    errorResponse?.workerError?.error?.message ??
                     errorResponse?.workerError?.message
                   }'`
                 )
index 3c49745785a7c1dc334015d81f40b2994221dfb2..c5caefc1dd00a4ff5556c3d2fc449bda9bc071b6 100644 (file)
@@ -14,10 +14,14 @@ export interface WorkerError<Data = unknown> {
    * Data triggering the error.
    */
   readonly data?: Data
+  /**
+   * Error object.
+   */
+  readonly error?: Error
   /**
    * Error message.
    */
-  readonly message: string
+  readonly message?: string
   /**
    * Task function name triggering the error.
    */
index 3b63c4544eafd6932f831a5af90f99f88c3e6924..61a3e32f34900c0fdcfbc25e412f6c0655647740 100644 (file)
@@ -85,9 +85,11 @@ export abstract class AbstractWorker<
         taskId,
         workerError: {
           data,
-          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-          message: `Task function '${name!}' not found`,
           name,
+          ...this.handleError(
+            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+            new Error(`Task function '${name!}' not found`)
+          ),
         },
       })
       return
@@ -126,9 +128,8 @@ export abstract class AbstractWorker<
           taskId,
           workerError: {
             data,
-            message: this.handleErrorMessage(error as Error | string),
             name,
-            stack: (error as Error).stack,
+            ...this.handleError(error as Error),
           },
         })
       })
@@ -162,9 +163,8 @@ export abstract class AbstractWorker<
         taskId,
         workerError: {
           data,
-          message: this.handleErrorMessage(error as Error | string),
           name,
-          stack: (error as Error).stack,
+          ...this.handleError(error as Error),
         },
       })
     } finally {
@@ -219,13 +219,14 @@ export abstract class AbstractWorker<
   }
 
   /**
-   * Handles an error and convert it if needed to its message string.
-   * Error are not structured-cloneable and cannot be sent to the main worker.
+   * Handles a worker error .
    * @param error - The error raised by the worker.
-   * @returns The error message.
+   * @returns The worker error object.
    */
-  protected handleErrorMessage (error: Error | string): string {
-    return error instanceof Error ? error.message : error
+  protected abstract handleError (error: Error): {
+    error?: Error
+    message?: string
+    stack?: string
   }
 
   /**
@@ -308,9 +309,8 @@ export abstract class AbstractWorker<
       ...(!status &&
         error != null && {
         workerError: {
-          message: this.handleErrorMessage(error as Error | string),
           name: taskFunctionProperties.name,
-          stack: error.stack,
+          ...this.handleError(error),
         },
       }),
     })
index e01e54e5f64e1b48b9d328e7d08c71ff100cee6e..5a611e80c74e49d670a6a801d38c3ce8c7db87f4 100644 (file)
@@ -45,6 +45,13 @@ export class ClusterWorker<
     super(cluster.isPrimary, cluster.worker, taskFunctions, opts)
   }
 
+  /**
+   * @inheritDoc
+   */
+  protected handleError (error: Error): { message: string; stack?: string } {
+    return { message: error.message, stack: error.stack }
+  }
+
   /** @inheritDoc */
   protected handleReadyMessage (message: MessageValue<Data>): void {
     if (message.workerId === this.id && message.ready === false) {
index f1f9602fd7325ecc9665cccec6d4d0774ab6f593..4204d565aa033bd42b9d7303306d9ad11c449365 100644 (file)
@@ -58,8 +58,8 @@ export class ThreadWorker<
   /**
    * @inheritDoc
    */
-  protected override handleErrorMessage (error: Error | string): string {
-    return error as string
+  protected handleError (error: Error): { error: Error } {
+    return { error }
   }
 
   /** @inheritDoc */
index 777fff9f62574be885052bcce2e3d7b2f6215837..d9d82f8ab76d6ec304cb2792ff5769ffcf2403a7 100644 (file)
@@ -959,8 +959,8 @@ describe('Abstract pool test suite', () => {
     await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
       new TypeError('transferList argument must be an array')
     )
-    await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
-      "Task function 'unknown' not found"
+    await expect(pool.execute(undefined, 'unknown')).rejects.toThrow(
+      new Error("Task function 'unknown' not found")
     )
     await pool.destroy()
     await expect(pool.execute()).rejects.toThrow(
@@ -1783,21 +1783,21 @@ describe('Abstract pool test suite', () => {
     const workerId = dynamicThreadPool.workerNodes[0].info.id
     await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
       new Error(
-        `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
+        `Task function operation 'default' failed on worker ${workerId} with error: 'name parameter is not a string'`
       )
     )
     await expect(
       dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
     ).rejects.toThrow(
       new Error(
-        `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
+        `Task function operation 'default' failed on worker ${workerId} with error: 'Cannot set the default task function reserved name as the default task function'`
       )
     )
     await expect(
       dynamicThreadPool.setDefaultTaskFunction('unknown')
     ).rejects.toThrow(
       new Error(
-        `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
+        `Task function operation 'default' failed on worker ${workerId} with error: 'Cannot set the default task function to a non-existing task function'`
       )
     )
     expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
@@ -1918,8 +1918,8 @@ describe('Abstract pool test suite', () => {
     await expect(pool.mapExecute([undefined], undefined, {})).rejects.toThrow(
       new TypeError('transferList argument must be an array')
     )
-    await expect(pool.mapExecute([undefined], 'unknown')).rejects.toBe(
-      "Task function 'unknown' not found"
+    await expect(pool.mapExecute([undefined], 'unknown')).rejects.toThrow(
+      new Error("Task function 'unknown' not found")
     )
     let results = await pool.mapExecute(
       [{}, {}, {}, {}],
index 493399b32ac827a1828f8373108df151a3e415af..748e9b2b26b340ac8b6b2c7be0d2e9a67c250850 100644 (file)
@@ -176,7 +176,9 @@ describe('Fixed cluster pool test suite', () => {
     } catch (e) {
       inError = e
     }
-    expect(inError).toStrictEqual('Error Message from ClusterWorker')
+    expect(inError).toBeInstanceOf(Error)
+    expect(inError.message).toStrictEqual('Error Message from ClusterWorker')
+    expect(typeof inError.stack === 'string').toBe(true)
     expect(taskError).toStrictEqual({
       data,
       message: 'Error Message from ClusterWorker',
@@ -206,7 +208,11 @@ describe('Fixed cluster pool test suite', () => {
     } catch (e) {
       inError = e
     }
-    expect(inError).toStrictEqual('Error Message from ClusterWorker:async')
+    expect(inError).toBeInstanceOf(Error)
+    expect(inError.message).toStrictEqual(
+      'Error Message from ClusterWorker:async'
+    )
+    expect(typeof inError.stack === 'string').toBe(true)
     expect(taskError).toStrictEqual({
       data,
       message: 'Error Message from ClusterWorker:async',
index 904d6a7f0543bc5a7a7ae4651b8578e1c517ab7d..8086bcfcaeee9d255b21ff5cd38b5edb520a0094 100644 (file)
@@ -206,9 +206,8 @@ describe('Fixed thread pool test suite', () => {
     expect(inError.message).toStrictEqual('Error Message from ThreadWorker')
     expect(taskError).toStrictEqual({
       data,
-      message: new Error('Error Message from ThreadWorker'),
+      error: new Error('Error Message from ThreadWorker'),
       name: DEFAULT_TASK_NAME,
-      stack: expect.any(String),
     })
     expect(
       errorPool.workerNodes.some(
@@ -239,9 +238,8 @@ describe('Fixed thread pool test suite', () => {
     )
     expect(taskError).toStrictEqual({
       data,
-      message: new Error('Error Message from ThreadWorker:async'),
+      error: new Error('Error Message from ThreadWorker:async'),
       name: DEFAULT_TASK_NAME,
-      stack: expect.any(String),
     })
     expect(
       asyncErrorPool.workerNodes.some(
index f76b0bfacc43ae95f8e1931e35237ec862877cdc..a7de4671e1cb4b6db6b2d48256e2e28f121b3249 100644 (file)
@@ -90,12 +90,13 @@ describe('Cluster worker test suite', () => {
     expect(worker.getMainWorker().send.calledOnce).toBe(true)
   })
 
-  it('Verify that handleErrorMessage() method is working properly', () => {
+  it('Verify that handleError() method is working properly', () => {
     const error = new Error('Error as an error')
     const worker = new ClusterWorker(() => {})
-    expect(worker.handleErrorMessage(error)).toStrictEqual(error.message)
-    const errorMessage = 'Error as a string'
-    expect(worker.handleErrorMessage(errorMessage)).toStrictEqual(errorMessage)
+    expect(worker.handleError(error)).toStrictEqual({
+      message: error.message,
+      stack: error.stack,
+    })
   })
 
   it('Verify that sendToMainWorker() method invokes the getMainWorker() and send() methods', () => {
index e3cd28daaeeb923018a02ad97ebf0d11154b1e21..2dbc9cb0baf609e1a21410a9c1b06789fda2c64d 100644 (file)
@@ -90,12 +90,10 @@ describe('Thread worker test suite', () => {
     expect(worker.port.postMessage.calledOnce).toBe(true)
   })
 
-  it('Verify that handleErrorMessage() method is working properly', () => {
+  it('Verify that handleError() method is working properly', () => {
     const error = new Error('Error as an error')
     const worker = new ThreadWorker(() => {})
-    expect(worker.handleErrorMessage(error)).toStrictEqual(error)
-    const errorMessage = 'Error as a string'
-    expect(worker.handleErrorMessage(errorMessage)).toStrictEqual(errorMessage)
+    expect(worker.handleError(error)).toStrictEqual({ error })
   })
 
   it('Verify that sendToMainWorker() method invokes the port property postMessage() method', () => {