feat: add initial infrastructure to track per task statistics
authorJérôme Benoit <jerome.benoit@sap.com>
Sun, 9 Jul 2023 13:37:28 +0000 (15:37 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sun, 9 Jul 2023 13:37:28 +0000 (15:37 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/worker-node.ts
src/pools/worker.ts
src/utility-types.ts
src/utils.ts
src/worker/abstract-worker.ts
tests/pools/cluster/fixed.test.js
tests/pools/thread/fixed.test.js

index c699b5282a7c0b862a4cc16b63ee5dd34637a643..bb37dbb70fe4aac89115fc5da929270f0a39dd8d 100644 (file)
@@ -2,6 +2,7 @@ import { randomUUID } from 'node:crypto'
 import { performance } from 'node:perf_hooks'
 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
 import {
+  DEFAULT_TASK_NAME,
   DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
   EMPTY_FUNCTION,
   isKillBehavior,
@@ -588,7 +589,7 @@ export abstract class AbstractPool<
     const timestamp = performance.now()
     const workerNodeKey = this.chooseWorkerNode()
     const submittedTask: Task<Data> = {
-      name,
+      name: name ?? DEFAULT_TASK_NAME,
       // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
       data: data ?? ({} as Data),
       timestamp,
@@ -671,6 +672,11 @@ export abstract class AbstractPool<
     const workerUsage = this.workerNodes[workerNodeKey].usage
     ++workerUsage.tasks.executing
     this.updateWaitTimeWorkerUsage(workerUsage, task)
+    const tasksWorkerUsage = this.workerNodes[
+      workerNodeKey
+    ].getTasksWorkerUsage(task.name as string) as WorkerUsage
+    ++tasksWorkerUsage.tasks.executing
+    this.updateWaitTimeWorkerUsage(tasksWorkerUsage, task)
   }
 
   /**
@@ -684,10 +690,17 @@ export abstract class AbstractPool<
     worker: Worker,
     message: MessageValue<Response>
   ): void {
-    const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
+    const workerNodeKey = this.getWorkerNodeKey(worker)
+    const workerUsage = this.workerNodes[workerNodeKey].usage
     this.updateTaskStatisticsWorkerUsage(workerUsage, message)
     this.updateRunTimeWorkerUsage(workerUsage, message)
     this.updateEluWorkerUsage(workerUsage, message)
+    const tasksWorkerUsage = this.workerNodes[
+      workerNodeKey
+    ].getTasksWorkerUsage(message.name as string) as WorkerUsage
+    this.updateTaskStatisticsWorkerUsage(tasksWorkerUsage, message)
+    this.updateRunTimeWorkerUsage(tasksWorkerUsage, message)
+    this.updateEluWorkerUsage(tasksWorkerUsage, message)
   }
 
   private updateTaskStatisticsWorkerUsage (
index bbca79135b5d7cb3df575abccebd70c0719296f2..7a7fc237180d32c28b74250d5943867dcf80d9de 100644 (file)
@@ -21,6 +21,7 @@ implements IWorkerNode<Worker, Data> {
   public readonly worker: Worker
   public readonly info: WorkerInfo
   public usage: WorkerUsage
+  private readonly tasksUsage: Map<string, WorkerUsage>
   private readonly tasksQueue: Queue<Task<Data>>
 
   /**
@@ -33,6 +34,7 @@ implements IWorkerNode<Worker, Data> {
     this.worker = worker
     this.info = this.initWorkerInfo(worker, workerType)
     this.usage = this.initWorkerUsage()
+    this.tasksUsage = new Map<string, WorkerUsage>()
     this.tasksQueue = new Queue<Task<Data>>()
   }
 
@@ -65,8 +67,18 @@ implements IWorkerNode<Worker, Data> {
     this.tasksQueue.clear()
   }
 
+  /** @inheritdoc */
   public resetUsage (): void {
     this.usage = this.initWorkerUsage()
+    this.tasksUsage.clear()
+  }
+
+  /** @inheritdoc */
+  public getTasksWorkerUsage (name: string): WorkerUsage | undefined {
+    if (!this.tasksUsage.has(name)) {
+      this.tasksUsage.set(name, this.initWorkerUsage())
+    }
+    return this.tasksUsage.get(name)
   }
 
   private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
index c6303221c9d2da4badc9af17ac6e8a80fda27d98..c0f84ce8819bc04e9078cb5d264a2ab4d9acd39f 100644 (file)
@@ -265,4 +265,8 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
    * Worker node reset usage statistics .
    */
   readonly resetUsage: () => void
+  /**
+   * Worker node get tasks usage statistics.
+   */
+  readonly getTasksWorkerUsage: (name: string) => WorkerUsage | undefined
 }
index d74773190f4c1e477e86d4a4f4c984d7d61d9f40..fa1c0ffefdf27e6996789cc444f44d81de9076fc 100644 (file)
@@ -8,12 +8,16 @@ import type { IWorker, Task } from './pools/worker'
  * @typeParam Data - Type of data sent to the worker triggering an error. This can only be structured-cloneable data.
  */
 export interface TaskError<Data = unknown> {
+  /**
+   * Task name triggering the error.
+   */
+  readonly name: string
   /**
    * Error message.
    */
   readonly message: string
   /**
-   * Data passed to the worker triggering the error.
+   * Data triggering the error.
    */
   readonly data?: Data
 }
index 128b65da06f6c17b9fd12659521db3d8437a655b..0becc5ed9aed9147cfaa5351d60a5e757ba0554b 100644 (file)
@@ -5,6 +5,11 @@ import type {
 } from './pools/selection-strategies/selection-strategies-types'
 import type { KillBehavior } from './worker/worker-options'
 
+/**
+ * Default task name.
+ */
+export const DEFAULT_TASK_NAME = 'default'
+
 /**
  * An intentional empty function.
  */
index a01541156b2041d28faa135dcef7694b496962c0..1eac74272554b622086ffc20ad8738fa9874fd62 100644 (file)
@@ -7,7 +7,12 @@ import type {
   TaskPerformance,
   WorkerStatistics
 } from '../utility-types'
-import { EMPTY_FUNCTION, isAsyncFunction, isPlainObject } from '../utils'
+import {
+  DEFAULT_TASK_NAME,
+  EMPTY_FUNCTION,
+  isAsyncFunction,
+  isPlainObject
+} from '../utils'
 import {
   type KillBehavior,
   KillBehaviors,
@@ -20,7 +25,6 @@ import type {
   WorkerSyncFunction
 } from './worker-functions'
 
-const DEFAULT_FUNCTION_NAME = 'default'
 const DEFAULT_MAX_INACTIVE_TIME = 60000
 const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT
 
@@ -114,7 +118,7 @@ export abstract class AbstractWorker<
     }
     this.taskFunctions = new Map<string, WorkerFunction<Data, Response>>()
     if (typeof taskFunctions === 'function') {
-      this.taskFunctions.set(DEFAULT_FUNCTION_NAME, taskFunctions.bind(this))
+      this.taskFunctions.set(DEFAULT_TASK_NAME, taskFunctions.bind(this))
     } else if (isPlainObject(taskFunctions)) {
       let firstEntry = true
       for (const [name, fn] of Object.entries(taskFunctions)) {
@@ -125,7 +129,7 @@ export abstract class AbstractWorker<
         }
         this.taskFunctions.set(name, fn.bind(this))
         if (firstEntry) {
-          this.taskFunctions.set(DEFAULT_FUNCTION_NAME, fn.bind(this))
+          this.taskFunctions.set(DEFAULT_TASK_NAME, fn.bind(this))
           firstEntry = false
         }
       }
@@ -264,6 +268,7 @@ export abstract class AbstractWorker<
       const errorMessage = this.handleError(e as Error | string)
       this.sendToMainWorker({
         taskError: {
+          name: message.name ?? DEFAULT_TASK_NAME,
           message: errorMessage,
           data: message.data
         },
@@ -303,6 +308,7 @@ export abstract class AbstractWorker<
         const errorMessage = this.handleError(e as Error | string)
         this.sendToMainWorker({
           taskError: {
+            name: message.name ?? DEFAULT_TASK_NAME,
             message: errorMessage,
             data: message.data
           },
@@ -321,10 +327,10 @@ export abstract class AbstractWorker<
   /**
    * Gets the task function in the given scope.
    *
-   * @param name - Name of the function that will be returned.
+   * @param name - Name of the task function that will be returned.
    */
   private getTaskFunction (name?: string): WorkerFunction<Data, Response> {
-    name = name ?? DEFAULT_FUNCTION_NAME
+    name = name ?? DEFAULT_TASK_NAME
     const fn = this.taskFunctions.get(name)
     if (fn == null) {
       throw new Error(`Task function '${name}' not found`)
@@ -335,7 +341,7 @@ export abstract class AbstractWorker<
   private beginTaskPerformance (name?: string): TaskPerformance {
     this.checkStatistics()
     return {
-      name: name ?? DEFAULT_FUNCTION_NAME,
+      name: name ?? DEFAULT_TASK_NAME,
       timestamp: performance.now(),
       ...(this.statistics.elu && { elu: performance.eventLoopUtilization() })
     }
index c638663f01c01ecdfde4b8c3c9b3ecaefe73539e..1d0f7b8c29fc2992f7415fc71cb8ad53e58bc1a9 100644 (file)
@@ -161,6 +161,7 @@ describe('Fixed cluster pool test suite', () => {
     expect(typeof inError === 'string').toBe(true)
     expect(inError).toBe('Error Message from ClusterWorker')
     expect(taskError).toStrictEqual({
+      name: 'default',
       message: 'Error Message from ClusterWorker',
       data
     })
@@ -187,6 +188,7 @@ describe('Fixed cluster pool test suite', () => {
     expect(typeof inError === 'string').toBe(true)
     expect(inError).toBe('Error Message from ClusterWorker:async')
     expect(taskError).toStrictEqual({
+      name: 'default',
       message: 'Error Message from ClusterWorker:async',
       data
     })
index b2842f6e367eb7839053ce55844dc57bedf6ad12..de81cb68c304e8ef703ff4bf819c4e5b20829237 100644 (file)
@@ -163,6 +163,7 @@ describe('Fixed thread pool test suite', () => {
     expect(typeof inError.message === 'string').toBe(true)
     expect(inError.message).toBe('Error Message from ThreadWorker')
     expect(taskError).toStrictEqual({
+      name: 'default',
       message: new Error('Error Message from ThreadWorker'),
       data
     })
@@ -191,6 +192,7 @@ describe('Fixed thread pool test suite', () => {
     expect(typeof inError.message === 'string').toBe(true)
     expect(inError.message).toBe('Error Message from ThreadWorker:async')
     expect(taskError).toStrictEqual({
+      name: 'default',
       message: new Error('Error Message from ThreadWorker:async'),
       data
     })