refactor: cleanup task handling in worker code
authorJérôme Benoit <jerome.benoit@sap.com>
Sun, 9 Jul 2023 21:19:13 +0000 (23:19 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sun, 9 Jul 2023 21:19:13 +0000 (23:19 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/index.ts
src/pools/abstract-pool.ts
src/pools/worker-node.ts
src/pools/worker.ts
src/utility-types.ts
src/worker/abstract-worker.ts

index 54c352fc9f4f4b527201b4ef30198cd1c89e5c28..8a3cb9b1e2ef2521a83bfed53267980d61d8e786 100644 (file)
@@ -24,7 +24,6 @@ export type {
   MeasurementStatistics,
   MessageHandler,
   OnlineHandler,
-  Task,
   TaskStatistics,
   WorkerInfo,
   WorkerType,
@@ -61,6 +60,7 @@ export type {
 export type {
   MessageValue,
   PromiseResponseWrapper,
+  Task,
   TaskError,
   TaskPerformance,
   WorkerStatistics
index 615cf29604cdb1b2309b412be1df8f358d09958e..026bf918a40ce0158c3c159cd7734a6919f6e9c7 100644 (file)
@@ -1,6 +1,10 @@
 import { randomUUID } from 'node:crypto'
 import { performance } from 'node:perf_hooks'
-import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
+import type {
+  MessageValue,
+  PromiseResponseWrapper,
+  Task
+} from '../utility-types'
 import {
   DEFAULT_TASK_NAME,
   DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
@@ -25,7 +29,6 @@ import type {
   IWorker,
   IWorkerNode,
   MessageHandler,
-  Task,
   WorkerInfo,
   WorkerType,
   WorkerUsage
index a5cbbbfb98028b4269c1d4c615a70ea7ea0b741b..5c172d8aad3916e96e44b6424d0526ad0d31afdb 100644 (file)
@@ -1,9 +1,9 @@
 import { CircularArray } from '../circular-array'
 import { Queue } from '../queue'
+import type { Task } from '../utility-types'
 import {
   type IWorker,
   type IWorkerNode,
-  type Task,
   type WorkerInfo,
   type WorkerType,
   WorkerTypes,
index 055a326ad6448add8c87d9a0bdfed8b325066770..58610eaaddacc2beea93809f80675bb8530010c0 100644 (file)
@@ -1,4 +1,5 @@
 import type { CircularArray } from '../circular-array'
+import type { Task } from '../utility-types'
 
 /**
  * Callback invoked if the worker has received a message.
@@ -29,35 +30,6 @@ export type ExitHandler<Worker extends IWorker> = (
   exitCode: number
 ) => void
 
-/**
- * Message object that is passed as a task between main worker and worker.
- *
- * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
- * @internal
- */
-export interface Task<Data = unknown> {
-  /**
-   * Worker id.
-   */
-  readonly workerId: number
-  /**
-   * Task name.
-   */
-  readonly name?: string
-  /**
-   * Task input data that will be passed to the worker.
-   */
-  readonly data?: Data
-  /**
-   * Timestamp.
-   */
-  readonly timestamp?: number
-  /**
-   * Message UUID.
-   */
-  readonly id?: string
-}
-
 /**
  * Measurement statistics.
  *
index fa1c0ffefdf27e6996789cc444f44d81de9076fc..28bf19e256abaef2f619051773ad55456643b842 100644 (file)
@@ -1,6 +1,6 @@
 import type { EventLoopUtilization } from 'node:perf_hooks'
 import type { KillBehavior } from './worker/worker-options'
-import type { IWorker, Task } from './pools/worker'
+import type { IWorker } from './pools/worker'
 
 /**
  * Task error.
@@ -56,6 +56,35 @@ export interface WorkerStatistics {
   elu: boolean
 }
 
+/**
+ * Message object that is passed as a task between main worker and worker.
+ *
+ * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
+ * @internal
+ */
+export interface Task<Data = unknown> {
+  /**
+   * Worker id.
+   */
+  readonly workerId: number
+  /**
+   * Task name.
+   */
+  readonly name?: string
+  /**
+   * Task input data that will be passed to the worker.
+   */
+  readonly data?: Data
+  /**
+   * Timestamp.
+   */
+  readonly timestamp?: number
+  /**
+   * Message UUID.
+   */
+  readonly id?: string
+}
+
 /**
  * Message object that is passed between main worker and worker.
  *
index 5e7144877357806dacae29c1bf28993894616ca1..7aa626f1610a3cd4f4e30ca23232b640a1eca410 100644 (file)
@@ -4,6 +4,7 @@ import type { MessagePort } from 'node:worker_threads'
 import { performance } from 'node:perf_hooks'
 import type {
   MessageValue,
+  Task,
   TaskPerformance,
   WorkerStatistics
 } from '../utility-types'
@@ -280,12 +281,7 @@ export abstract class AbstractWorker<
         message.checkAlive ? this.startCheckAlive() : this.stopCheckAlive()
       } else if (message.id != null && message.data != null) {
         // Task message received
-        const fn = this.getTaskFunction(message.name)
-        if (isAsyncFunction(fn)) {
-          this.runInAsyncScope(this.runAsync.bind(this), this, fn, message)
-        } else {
-          this.runInAsyncScope(this.runSync.bind(this), this, fn, message)
-        }
+        this.run(message)
       } else if (message.kill === true) {
         // Kill message received
         this.stopCheckAlive()
@@ -363,36 +359,51 @@ export abstract class AbstractWorker<
     return e instanceof Error ? e.message : e
   }
 
+  /**
+   * Runs the given task.
+   *
+   * @param task - The task to execute.
+   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found.
+   */
+  protected run (task: Task<Data>): void {
+    const fn = this.getTaskFunction(task.name)
+    if (isAsyncFunction(fn)) {
+      this.runInAsyncScope(this.runAsync.bind(this), this, fn, task)
+    } else {
+      this.runInAsyncScope(this.runSync.bind(this), this, fn, task)
+    }
+  }
+
   /**
    * Runs the given function synchronously.
    *
-   * @param fn - Function that will be executed.
-   * @param message - Input data for the given function.
+   * @param fn - Task function that will be executed.
+   * @param task - Input data for the task function.
    */
   protected runSync (
     fn: WorkerSyncFunction<Data, Response>,
-    message: MessageValue<Data>
+    task: Task<Data>
   ): void {
     try {
-      let taskPerformance = this.beginTaskPerformance(message.name)
-      const res = fn(message.data)
+      let taskPerformance = this.beginTaskPerformance(task.name)
+      const res = fn(task.data)
       taskPerformance = this.endTaskPerformance(taskPerformance)
       this.sendToMainWorker({
         data: res,
         taskPerformance,
         workerId: this.id,
-        id: message.id
+        id: task.id
       })
     } catch (e) {
       const errorMessage = this.handleError(e as Error | string)
       this.sendToMainWorker({
         taskError: {
-          name: message.name ?? DEFAULT_TASK_NAME,
+          name: task.name ?? DEFAULT_TASK_NAME,
           message: errorMessage,
-          data: message.data
+          data: task.data
         },
         workerId: this.id,
-        id: message.id
+        id: task.id
       })
     } finally {
       if (!this.isMain && this.aliveInterval != null) {
@@ -404,22 +415,22 @@ export abstract class AbstractWorker<
   /**
    * Runs the given function asynchronously.
    *
-   * @param fn - Function that will be executed.
-   * @param message - Input data for the given function.
+   * @param fn - Task function that will be executed.
+   * @param task - Input data for the task function.
    */
   protected runAsync (
     fn: WorkerAsyncFunction<Data, Response>,
-    message: MessageValue<Data>
+    task: Task<Data>
   ): void {
-    let taskPerformance = this.beginTaskPerformance(message.name)
-    fn(message.data)
+    let taskPerformance = this.beginTaskPerformance(task.name)
+    fn(task.data)
       .then(res => {
         taskPerformance = this.endTaskPerformance(taskPerformance)
         this.sendToMainWorker({
           data: res,
           taskPerformance,
           workerId: this.id,
-          id: message.id
+          id: task.id
         })
         return null
       })
@@ -427,12 +438,12 @@ export abstract class AbstractWorker<
         const errorMessage = this.handleError(e as Error | string)
         this.sendToMainWorker({
           taskError: {
-            name: message.name ?? DEFAULT_TASK_NAME,
+            name: task.name ?? DEFAULT_TASK_NAME,
             message: errorMessage,
-            data: message.data
+            data: task.data
           },
           workerId: this.id,
-          id: message.id
+          id: task.id
         })
       })
       .finally(() => {
@@ -444,9 +455,11 @@ export abstract class AbstractWorker<
   }
 
   /**
-   * Gets the task function in the given scope.
+   * Gets the task function with the given name.
    *
    * @param name - Name of the task function that will be returned.
+   * @returns The task function.
+   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found.
    */
   private getTaskFunction (name?: string): WorkerFunction<Data, Response> {
     name = name ?? DEFAULT_TASK_NAME