refactor: cleanup and type task error messages
authorJérôme Benoit <jerome.benoit@sap.com>
Wed, 7 Jun 2023 20:43:12 +0000 (22:43 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Wed, 7 Jun 2023 20:43:12 +0000 (22:43 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/utility-types.ts
src/worker/abstract-worker.ts
tests/pools/cluster/fixed.test.js
tests/pools/thread/fixed.test.js

index 85a47e38f7c248f7a92ff9e53166e2121048015c..ac7da967eccda63f3cebc2e27d16d6f991b82226 100644 (file)
@@ -472,7 +472,7 @@ export abstract class AbstractPool<
       this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
     --workerTasksUsage.running
     ++workerTasksUsage.ran
-    if (message.error != null) {
+    if (message.taskError != null) {
       ++workerTasksUsage.error
     }
     this.updateRunTimeTasksUsage(workerTasksUsage, message)
@@ -662,13 +662,10 @@ export abstract class AbstractPool<
         // Task execution response received
         const promiseResponse = this.promiseResponseMap.get(message.id)
         if (promiseResponse != null) {
-          if (message.error != null) {
-            promiseResponse.reject(message.error)
+          if (message.taskError != null) {
+            promiseResponse.reject(message.taskError.message)
             if (this.emitter != null) {
-              this.emitter.emit(PoolEvents.taskError, {
-                error: message.error,
-                errorData: message.errorData
-              })
+              this.emitter.emit(PoolEvents.taskError, message.taskError)
             }
           } else {
             promiseResponse.resolve(message.data as Response)
index f22906dfc00f89ba32430f63732492eb986dcfd3..592b3fe7d4234fe3df267074963b10301ee23707 100644 (file)
@@ -18,8 +18,7 @@ export interface ClusterPoolOptions extends PoolOptions<Worker> {
    *
    * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
    */
-  // eslint-disable-next-line @typescript-eslint/no-explicit-any
-  env?: any
+  env?: Record<string, unknown>
   /**
    * Cluster settings.
    *
index faeda0a5a4e4048ed9c16b5e48d8bdecd4b8c690..3917ad2042b920f7509749605d86f073208c71cb 100644 (file)
@@ -11,6 +11,17 @@ import type { IWorker, Task } from './pools/worker'
  */
 export type Draft<T> = { -readonly [P in keyof T]?: T[P] }
 
+export interface TaskError<Data = unknown> {
+  /**
+   * Error message.
+   */
+  message: string
+  /**
+   * Data passed to the worker triggering the error.
+   */
+  data?: Data
+}
+
 /**
  * Task performance.
  */
@@ -51,6 +62,7 @@ export interface WorkerStatistics {
  */
 export interface MessageValue<
   Data = unknown,
+  ErrorData = unknown,
   MainWorker extends ClusterWorker | MessagePort = ClusterWorker | MessagePort
 > extends Task<Data> {
   /**
@@ -60,11 +72,7 @@ export interface MessageValue<
   /**
    * Task error.
    */
-  readonly error?: string
-  /**
-   * Task data triggering task error.
-   */
-  readonly errorData?: unknown
+  readonly taskError?: TaskError<ErrorData>
   /**
    * Task performance.
    */
index 7805514a7b3b083385b96ac4d26218840b625b31..76a7ee70ba96a93e6a92934a6b9cc54d81dbe821 100644 (file)
@@ -146,7 +146,9 @@ export abstract class AbstractWorker<
    *
    * @param message - Message received.
    */
-  protected messageListener (message: MessageValue<Data, MainWorker>): void {
+  protected messageListener (
+    message: MessageValue<Data, Data, MainWorker>
+  ): void {
     if (message.id != null && message.data != null) {
       // Task message received
       const fn = this.getTaskFunction(message.name)
@@ -185,7 +187,9 @@ export abstract class AbstractWorker<
    *
    * @param message - The response message.
    */
-  protected abstract sendToMainWorker (message: MessageValue<Response>): void
+  protected abstract sendToMainWorker (
+    message: MessageValue<Response, Data>
+  ): void
 
   /**
    * Checks if the worker should be terminated, because its living too long.
@@ -231,8 +235,10 @@ export abstract class AbstractWorker<
     } catch (e) {
       const err = this.handleError(e as Error)
       this.sendToMainWorker({
-        error: err,
-        errorData: message.data,
+        taskError: {
+          message: err,
+          data: message.data
+        },
         id: message.id
       })
     } finally {
@@ -264,8 +270,10 @@ export abstract class AbstractWorker<
       .catch(e => {
         const err = this.handleError(e as Error)
         this.sendToMainWorker({
-          error: err,
-          errorData: message.data,
+          taskError: {
+            message: err,
+            data: message.data
+          },
           id: message.id
         })
       })
index c6cdce54617c36ffc05d76cfcfc5ff190cfda194..c879fc689a40cc3edc5e3e360d8cec45b5ba99d8 100644 (file)
@@ -145,8 +145,8 @@ describe('Fixed cluster pool test suite', () => {
     expect(typeof inError === 'string').toBe(true)
     expect(inError).toBe('Error Message from ClusterWorker')
     expect(taskError).toStrictEqual({
-      error: 'Error Message from ClusterWorker',
-      errorData: data
+      message: 'Error Message from ClusterWorker',
+      data
     })
     expect(
       errorPool.workerNodes.some(
@@ -171,8 +171,8 @@ describe('Fixed cluster pool test suite', () => {
     expect(typeof inError === 'string').toBe(true)
     expect(inError).toBe('Error Message from ClusterWorker:async')
     expect(taskError).toStrictEqual({
-      error: 'Error Message from ClusterWorker:async',
-      errorData: data
+      message: 'Error Message from ClusterWorker:async',
+      data
     })
     expect(
       asyncErrorPool.workerNodes.some(
index 4c90207cb9ffb61c7a764688a38536b22b5cb65e..90dd110cd44ce0da7c2d43dd7b5cd0fd442be26b 100644 (file)
@@ -147,8 +147,8 @@ describe('Fixed thread pool test suite', () => {
     expect(typeof inError.message === 'string').toBe(true)
     expect(inError.message).toBe('Error Message from ThreadWorker')
     expect(taskError).toStrictEqual({
-      error: new Error('Error Message from ThreadWorker'),
-      errorData: data
+      message: new Error('Error Message from ThreadWorker'),
+      data
     })
     expect(
       errorPool.workerNodes.some(
@@ -175,8 +175,8 @@ describe('Fixed thread pool test suite', () => {
     expect(typeof inError.message === 'string').toBe(true)
     expect(inError.message).toBe('Error Message from ThreadWorker:async')
     expect(taskError).toStrictEqual({
-      error: new Error('Error Message from ThreadWorker:async'),
-      errorData: data
+      message: new Error('Error Message from ThreadWorker:async'),
+      data
     })
     expect(
       asyncErrorPool.workerNodes.some(