refactor: use task performance data structure in messages
authorJérôme Benoit <jerome.benoit@sap.com>
Wed, 7 Jun 2023 19:53:49 +0000 (21:53 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Wed, 7 Jun 2023 19:53:49 +0000 (21:53 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/index.ts
src/pools/abstract-pool.ts
src/utility-types.ts
src/worker/abstract-worker.ts

index 0d9491572fa292dc3381aefaea3a6730dcee5b66..1895d34877410bdca90ca30b60acf13c48db7299 100644 (file)
@@ -39,7 +39,7 @@ export {
   type ThreadPoolOptions,
   type ThreadWorkerWithMessageChannel
 } from './pools/thread/fixed'
-export type { AbstractWorker, TaskPerformance } from './worker/abstract-worker'
+export type { AbstractWorker } from './worker/abstract-worker'
 export { ClusterWorker } from './worker/cluster-worker'
 export { ThreadWorker } from './worker/thread-worker'
 export { KillBehaviors } from './worker/worker-options'
@@ -54,6 +54,7 @@ export type {
   Draft,
   MessageValue,
   PromiseResponseWrapper,
+  TaskPerformance,
   WorkerStatistics
 } from './utility-types'
 export type { CircularArray } from './circular-array'
index b93915c48e31111b93eb2199ce50fd26dae222b6..85a47e38f7c248f7a92ff9e53166e2121048015c 100644 (file)
@@ -485,7 +485,7 @@ export abstract class AbstractPool<
     message: MessageValue<Response>
   ): void {
     if (this.workerChoiceStrategyContext.getTaskStatistics().runTime) {
-      workerTasksUsage.runTime += message.runTime ?? 0
+      workerTasksUsage.runTime += message.taskPerformance?.runTime ?? 0
       if (
         this.workerChoiceStrategyContext.getTaskStatistics().avgRunTime &&
         workerTasksUsage.ran !== 0
@@ -495,9 +495,9 @@ export abstract class AbstractPool<
       }
       if (
         this.workerChoiceStrategyContext.getTaskStatistics().medRunTime &&
-        message.runTime != null
+        message.taskPerformance?.runTime != null
       ) {
-        workerTasksUsage.runTimeHistory.push(message.runTime)
+        workerTasksUsage.runTimeHistory.push(message.taskPerformance.runTime)
         workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
       }
     }
@@ -508,7 +508,7 @@ export abstract class AbstractPool<
     message: MessageValue<Response>
   ): void {
     if (this.workerChoiceStrategyContext.getTaskStatistics().waitTime) {
-      workerTasksUsage.waitTime += message.waitTime ?? 0
+      workerTasksUsage.waitTime += message.taskPerformance?.waitTime ?? 0
       if (
         this.workerChoiceStrategyContext.getTaskStatistics().avgWaitTime &&
         workerTasksUsage.ran !== 0
@@ -518,9 +518,9 @@ export abstract class AbstractPool<
       }
       if (
         this.workerChoiceStrategyContext.getTaskStatistics().medWaitTime &&
-        message.waitTime != null
+        message.taskPerformance?.waitTime != null
       ) {
-        workerTasksUsage.waitTimeHistory.push(message.waitTime)
+        workerTasksUsage.waitTimeHistory.push(message.taskPerformance.waitTime)
         workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
       }
     }
@@ -531,15 +531,21 @@ export abstract class AbstractPool<
     message: MessageValue<Response>
   ): void {
     if (this.workerChoiceStrategyContext.getTaskStatistics().elu) {
-      if (workerTasksUsage.elu != null && message.elu != null) {
+      if (
+        workerTasksUsage.elu != null &&
+        message.taskPerformance?.elu != null
+      ) {
         workerTasksUsage.elu = {
-          idle: workerTasksUsage.elu.idle + message.elu.idle,
-          active: workerTasksUsage.elu.active + message.elu.active,
+          idle: workerTasksUsage.elu.idle + message.taskPerformance.elu.idle,
+          active:
+            workerTasksUsage.elu.active + message.taskPerformance.elu.active,
           utilization:
-            (workerTasksUsage.elu.utilization + message.elu.utilization) / 2
+            (workerTasksUsage.elu.utilization +
+              message.taskPerformance.elu.utilization) /
+            2
         }
-      } else if (message.elu != null) {
-        workerTasksUsage.elu = message.elu
+      } else if (message.taskPerformance?.elu != null) {
+        workerTasksUsage.elu = message.taskPerformance.elu
       }
     }
   }
index 9cdf6b0a7e82f9d1e3da5fb434537ca8e2d943f9..faeda0a5a4e4048ed9c16b5e48d8bdecd4b8c690 100644 (file)
@@ -11,6 +11,28 @@ import type { IWorker, Task } from './pools/worker'
  */
 export type Draft<T> = { -readonly [P in keyof T]?: T[P] }
 
+/**
+ * Task performance.
+ */
+export interface TaskPerformance {
+  /**
+   * Task performance timestamp.
+   */
+  timestamp: number
+  /**
+   * Task runtime.
+   */
+  runTime?: number
+  /**
+   * Task wait time.
+   */
+  waitTime?: number
+  /**
+   * Task event loop utilization.
+   */
+  elu?: EventLoopUtilization
+}
+
 /**
  * Performance statistics computation.
  */
@@ -44,17 +66,9 @@ export interface MessageValue<
    */
   readonly errorData?: unknown
   /**
-   * Runtime.
-   */
-  readonly runTime?: number
-  /**
-   * Wait time.
-   */
-  readonly waitTime?: number
-  /**
-   * Event loop utilization.
+   * Task performance.
    */
-  readonly elu?: EventLoopUtilization
+  readonly taskPerformance?: TaskPerformance
   /**
    * Reference to main worker.
    */
index b7c542aa469f40bf4feb50aa99599b52bb7bc44e..7805514a7b3b083385b96ac4d26218840b625b31 100644 (file)
@@ -1,8 +1,12 @@
 import { AsyncResource } from 'node:async_hooks'
 import type { Worker } from 'node:cluster'
 import type { MessagePort } from 'node:worker_threads'
-import { type EventLoopUtilization, performance } from 'node:perf_hooks'
-import type { MessageValue, WorkerStatistics } from '../utility-types'
+import { performance } from 'node:perf_hooks'
+import type {
+  MessageValue,
+  TaskPerformance,
+  WorkerStatistics
+} from '../utility-types'
 import { EMPTY_FUNCTION, isPlainObject } from '../utils'
 import {
   type KillBehavior,
@@ -20,16 +24,6 @@ const DEFAULT_FUNCTION_NAME = 'default'
 const DEFAULT_MAX_INACTIVE_TIME = 60000
 const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT
 
-/**
- * Task performance.
- */
-export interface TaskPerformance {
-  timestamp: number
-  waitTime?: number
-  runTime?: number
-  elu?: EventLoopUtilization
-}
-
 /**
  * Base class that implements some shared logic for all poolifier workers.
  *
@@ -226,15 +220,12 @@ export abstract class AbstractWorker<
     message: MessageValue<Data>
   ): void {
     try {
-      const taskPerformance = this.beginTaskPerformance(message)
+      let taskPerformance = this.beginTaskPerformance(message)
       const res = fn(message.data)
-      const { runTime, waitTime, elu } =
-        this.endTaskPerformance(taskPerformance)
+      taskPerformance = this.endTaskPerformance(taskPerformance)
       this.sendToMainWorker({
         data: res,
-        runTime,
-        waitTime,
-        elu,
+        taskPerformance,
         id: message.id
       })
     } catch (e) {
@@ -259,16 +250,13 @@ export abstract class AbstractWorker<
     fn: WorkerAsyncFunction<Data, Response>,
     message: MessageValue<Data>
   ): void {
-    const taskPerformance = this.beginTaskPerformance(message)
+    let taskPerformance = this.beginTaskPerformance(message)
     fn(message.data)
       .then(res => {
-        const { runTime, waitTime, elu } =
-          this.endTaskPerformance(taskPerformance)
+        taskPerformance = this.endTaskPerformance(taskPerformance)
         this.sendToMainWorker({
           data: res,
-          runTime,
-          waitTime,
-          elu,
+          taskPerformance,
           id: message.id
         })
         return null