perf: update worker choice strategies internal if needed
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 18 Dec 2023 13:07:38 +0000 (14:07 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 18 Dec 2023 13:07:38 +0000 (14:07 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
package.json
pnpm-lock.yaml
src/pools/abstract-pool.ts
src/pools/utils.ts

index 80bf3adf8312f25c8544f845368804a3c656d679..4679669c86a0f1a92bc2388544d4e7b29088aeac 100644 (file)
@@ -22,8 +22,8 @@
     "benchmark": "pnpm build && node --max-old-space-size=8192 --enable-source-maps benchmarks/internal/bench.mjs",
     "benchmark:prod": "pnpm build:prod && node --max-old-space-size=8192 --enable-source-maps benchmarks/internal/bench.mjs",
     "benchmark:debug": "pnpm build && node --max-old-space-size=8192 --enable-source-maps --inspect benchmarks/internal/bench.mjs",
-    "test": "pnpm build --environment SOURCEMAP:false && c8 mocha 'tests/**/*.test.mjs'",
-    "test:debug": "pnpm build && mocha --no-parallel --inspect 'tests/**/*.test.mjs'",
+    "test": "pnpm build --environment SOURCEMAP:false && cross-env NODE_ENV=test c8 mocha 'tests/**/*.test.mjs'",
+    "test:debug": "pnpm build && cross-env NODE_ENV=test mocha --no-parallel --inspect 'tests/**/*.test.mjs'",
     "coverage": "c8 report --reporter=lcov",
     "coverage:html": "c8 report --reporter=html",
     "format": "biome format . --write; ts-standard . --fix",
     "@release-it/keep-a-changelog": "^5.0.0",
     "@rollup/plugin-terser": "^0.4.4",
     "@rollup/plugin-typescript": "^11.1.5",
-    "@types/node": "^20.10.4",
+    "@types/node": "^20.10.5",
     "@typescript-eslint/eslint-plugin": "^6.14.0",
     "@typescript-eslint/parser": "^6.14.0",
     "benchmark": "^2.1.4",
     "c8": "^8.0.1",
+    "cross-env": "^7.0.3",
     "eslint": "^8.56.0",
     "eslint-config-standard": "^17.1.0",
     "eslint-config-standard-with-typescript": "^43.0.0",
index 91a52991855027ef43daf24cac0cc6463e250e2a..d0c70337afbdd69eea91eebbbf72638c81afacb3 100644 (file)
@@ -30,8 +30,8 @@ devDependencies:
     specifier: ^11.1.5
     version: 11.1.5(rollup@4.9.1)(typescript@5.3.3)
   '@types/node':
-    specifier: ^20.10.4
-    version: 20.10.4
+    specifier: ^20.10.5
+    version: 20.10.5
   '@typescript-eslint/eslint-plugin':
     specifier: ^6.14.0
     version: 6.14.0(@typescript-eslint/parser@6.14.0)(eslint@8.56.0)(typescript@5.3.3)
@@ -44,6 +44,9 @@ devDependencies:
   c8:
     specifier: ^8.0.1
     version: 8.0.1
+  cross-env:
+    specifier: ^7.0.3
+    version: 7.0.3
   eslint:
     specifier: ^8.56.0
     version: 8.56.0
@@ -492,7 +495,7 @@ packages:
       '@jest/schemas': 29.6.3
       '@types/istanbul-lib-coverage': 2.0.6
       '@types/istanbul-reports': 3.0.4
-      '@types/node': 20.10.4
+      '@types/node': 20.10.5
       '@types/yargs': 17.0.32
       chalk: 4.1.2
     dev: true
@@ -948,7 +951,7 @@ packages:
     resolution: {integrity: sha512-ZUxbzKl0IfJILTS6t7ip5fQQM/J3TJYubDm3nMbgubNNYS62eXeUpoLUC8/7fJNiFYHTrGPQn7hspDUzIHX3UA==}
     dependencies:
       '@types/minimatch': 5.1.2
-      '@types/node': 20.10.4
+      '@types/node': 20.10.5
     dev: true
 
   /@types/http-cache-semantics@4.0.4:
@@ -993,8 +996,8 @@ packages:
       undici-types: 5.26.5
     dev: true
 
-  /@types/node@20.10.4:
-    resolution: {integrity: sha512-D08YG6rr8X90YB56tSIuBaddy/UXAA9RKJoFvrsnogAum/0pmjkgi4+2nx96A330FmioegBWmEYQ+syqCFaveg==}
+  /@types/node@20.10.5:
+    resolution: {integrity: sha512-nNPsNE65wjMxEKI93yOP+NPGGBJz/PoN3kZsVLee0XMiJolxSekEVD8wRwBUBqkwc7UWop0edW50yrCQW4CyRw==}
     dependencies:
       undici-types: 5.26.5
     dev: true
@@ -1990,6 +1993,14 @@ packages:
       typescript: 5.3.3
     dev: true
 
+  /cross-env@7.0.3:
+    resolution: {integrity: sha512-+/HKd6EgcQCJGh2PSjZuUitQBQynKor4wrFbRg4DtAgS1aWO+gU52xpH7M9ScGgXSYmAVS9bIJ8EzuaGw0oNAw==}
+    engines: {node: '>=10.14', npm: '>=6', yarn: '>=1'}
+    hasBin: true
+    dependencies:
+      cross-spawn: 7.0.3
+    dev: true
+
   /cross-spawn@7.0.3:
     resolution: {integrity: sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==}
     engines: {node: '>= 8'}
@@ -4105,7 +4116,7 @@ packages:
     engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0}
     dependencies:
       '@jest/types': 29.6.3
-      '@types/node': 20.10.4
+      '@types/node': 20.10.5
       chalk: 4.1.2
       ci-info: 3.9.0
       graceful-fs: 4.2.11
index ea07dc8f0061ec0ccfb2a6adb92fc598c413a423..670d490daa18c0cd4e3e03217df69275aa35b824 100644 (file)
@@ -43,7 +43,6 @@ import type {
   WorkerUsage
 } from './worker'
 import {
-  type MeasurementStatisticsRequirements,
   Measurements,
   WorkerChoiceStrategies,
   type WorkerChoiceStrategy,
@@ -56,7 +55,10 @@ import {
   checkFilePath,
   checkValidTasksQueueOptions,
   checkValidWorkerChoiceStrategy,
-  updateMeasurementStatistics,
+  updateEluWorkerUsage,
+  updateRunTimeWorkerUsage,
+  updateTaskStatisticsWorkerUsage,
+  updateWaitTimeWorkerUsage,
   waitWorkerNodeEvents
 } from './utils'
 
@@ -1082,7 +1084,11 @@ export abstract class AbstractPool<
     if (this.workerNodes[workerNodeKey]?.usage != null) {
       const workerUsage = this.workerNodes[workerNodeKey].usage
       ++workerUsage.tasks.executing
-      this.updateWaitTimeWorkerUsage(workerUsage, task)
+      updateWaitTimeWorkerUsage(
+        this.workerChoiceStrategyContext,
+        workerUsage,
+        task
+      )
     }
     if (
       this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
@@ -1094,7 +1100,11 @@ export abstract class AbstractPool<
         workerNodeKey
       ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
       ++taskFunctionWorkerUsage.tasks.executing
-      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
+      updateWaitTimeWorkerUsage(
+        this.workerChoiceStrategyContext,
+        taskFunctionWorkerUsage,
+        task
+      )
     }
   }
 
@@ -1109,11 +1119,21 @@ export abstract class AbstractPool<
     workerNodeKey: number,
     message: MessageValue<Response>
   ): void {
+    let needWorkerChoiceStrategyUpdate = false
     if (this.workerNodes[workerNodeKey]?.usage != null) {
       const workerUsage = this.workerNodes[workerNodeKey].usage
-      this.updateTaskStatisticsWorkerUsage(workerUsage, message)
-      this.updateRunTimeWorkerUsage(workerUsage, message)
-      this.updateEluWorkerUsage(workerUsage, message)
+      updateTaskStatisticsWorkerUsage(workerUsage, message)
+      updateRunTimeWorkerUsage(
+        this.workerChoiceStrategyContext,
+        workerUsage,
+        message
+      )
+      updateEluWorkerUsage(
+        this.workerChoiceStrategyContext,
+        workerUsage,
+        message
+      )
+      needWorkerChoiceStrategyUpdate = true
     }
     if (
       this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
@@ -1126,9 +1146,21 @@ export abstract class AbstractPool<
       ].getTaskFunctionWorkerUsage(
         message.taskPerformance?.name as string
       ) as WorkerUsage
-      this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
-      this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
-      this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
+      updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
+      updateRunTimeWorkerUsage(
+        this.workerChoiceStrategyContext,
+        taskFunctionWorkerUsage,
+        message
+      )
+      updateEluWorkerUsage(
+        this.workerChoiceStrategyContext,
+        taskFunctionWorkerUsage,
+        message
+      )
+      needWorkerChoiceStrategyUpdate = true
+    }
+    if (needWorkerChoiceStrategyUpdate) {
+      this.workerChoiceStrategyContext.update(workerNodeKey)
     }
   }
 
@@ -1147,84 +1179,6 @@ export abstract class AbstractPool<
     )
   }
 
-  private updateTaskStatisticsWorkerUsage (
-    workerUsage: WorkerUsage,
-    message: MessageValue<Response>
-  ): void {
-    const workerTaskStatistics = workerUsage.tasks
-    if (
-      workerTaskStatistics.executing != null &&
-      workerTaskStatistics.executing > 0
-    ) {
-      --workerTaskStatistics.executing
-    }
-    if (message.workerError == null) {
-      ++workerTaskStatistics.executed
-    } else {
-      ++workerTaskStatistics.failed
-    }
-  }
-
-  private updateRunTimeWorkerUsage (
-    workerUsage: WorkerUsage,
-    message: MessageValue<Response>
-  ): void {
-    if (message.workerError != null) {
-      return
-    }
-    updateMeasurementStatistics(
-      workerUsage.runTime,
-      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
-      message.taskPerformance?.runTime ?? 0
-    )
-  }
-
-  private updateWaitTimeWorkerUsage (
-    workerUsage: WorkerUsage,
-    task: Task<Data>
-  ): void {
-    const timestamp = performance.now()
-    const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
-    updateMeasurementStatistics(
-      workerUsage.waitTime,
-      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
-      taskWaitTime
-    )
-  }
-
-  private updateEluWorkerUsage (
-    workerUsage: WorkerUsage,
-    message: MessageValue<Response>
-  ): void {
-    if (message.workerError != null) {
-      return
-    }
-    const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
-      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
-    updateMeasurementStatistics(
-      workerUsage.elu.active,
-      eluTaskStatisticsRequirements,
-      message.taskPerformance?.elu?.active ?? 0
-    )
-    updateMeasurementStatistics(
-      workerUsage.elu.idle,
-      eluTaskStatisticsRequirements,
-      message.taskPerformance?.elu?.idle ?? 0
-    )
-    if (eluTaskStatisticsRequirements.aggregate) {
-      if (message.taskPerformance?.elu != null) {
-        if (workerUsage.elu.utilization != null) {
-          workerUsage.elu.utilization =
-            (workerUsage.elu.utilization +
-              message.taskPerformance.elu.utilization) /
-            2
-        } else {
-          workerUsage.elu.utilization = message.taskPerformance.elu.utilization
-        }
-      }
-    }
-  }
-
   /**
    * Chooses a worker node for the next task.
    *
@@ -1757,7 +1711,6 @@ export abstract class AbstractPool<
       }
       asyncResource?.emitDestroy()
       this.afterTaskExecutionHook(workerNodeKey, message)
-      this.workerChoiceStrategyContext.update(workerNodeKey)
       this.promiseResponseMap.delete(taskId as string)
       workerNode?.emit('taskFinished', taskId)
       if (this.opts.enableTasksQueue === true) {
index 5e21c38c0af657dab99aca255e80f0e39888b6a6..ae09e8a9f5f7a5a0b2e22a1266c84534e4329e41 100644 (file)
@@ -1,7 +1,9 @@
 import { existsSync } from 'node:fs'
 import cluster from 'node:cluster'
 import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
+import { env } from 'node:process'
 import { average, isPlainObject, max, median, min } from '../utils'
+import type { MessageValue, Task } from '../utility-types'
 import {
   type MeasurementStatisticsRequirements,
   WorkerChoiceStrategies,
@@ -14,8 +16,10 @@ import {
   type MeasurementStatistics,
   type WorkerNodeOptions,
   type WorkerType,
-  WorkerTypes
+  WorkerTypes,
+  type WorkerUsage
 } from './worker'
+import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
 
 export const checkFilePath = (filePath: string): void => {
   if (filePath == null) {
@@ -151,7 +155,7 @@ export const checkWorkerNodeArguments = (
  * @param numberOfMeasurements - The number of measurements.
  * @internal
  */
-export const updateMeasurementStatistics = (
+const updateMeasurementStatistics = (
   measurementStatistics: MeasurementStatistics,
   measurementRequirements: MeasurementStatisticsRequirements,
   measurementValue: number
@@ -185,6 +189,115 @@ export const updateMeasurementStatistics = (
     }
   }
 }
+if (env.NODE_ENV === 'test') {
+  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
+  exports.updateMeasurementStatistics = updateMeasurementStatistics
+}
+
+export const updateWaitTimeWorkerUsage = <
+  Worker extends IWorker,
+  Data = unknown,
+  Response = unknown
+>(
+    workerChoiceStrategyContext: WorkerChoiceStrategyContext<
+    Worker,
+    Data,
+    Response
+    >,
+    workerUsage: WorkerUsage,
+    task: Task<Data>
+  ): void => {
+  const timestamp = performance.now()
+  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
+  updateMeasurementStatistics(
+    workerUsage.waitTime,
+    workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
+    taskWaitTime
+  )
+}
+
+export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
+  workerUsage: WorkerUsage,
+  message: MessageValue<Response>
+): void => {
+  const workerTaskStatistics = workerUsage.tasks
+  if (
+    workerTaskStatistics.executing != null &&
+    workerTaskStatistics.executing > 0
+  ) {
+    --workerTaskStatistics.executing
+  }
+  if (message.workerError == null) {
+    ++workerTaskStatistics.executed
+  } else {
+    ++workerTaskStatistics.failed
+  }
+}
+
+export const updateRunTimeWorkerUsage = <
+  Worker extends IWorker,
+  Data = unknown,
+  Response = unknown
+>(
+    workerChoiceStrategyContext: WorkerChoiceStrategyContext<
+    Worker,
+    Data,
+    Response
+    >,
+    workerUsage: WorkerUsage,
+    message: MessageValue<Response>
+  ): void => {
+  if (message.workerError != null) {
+    return
+  }
+  updateMeasurementStatistics(
+    workerUsage.runTime,
+    workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
+    message.taskPerformance?.runTime ?? 0
+  )
+}
+
+export const updateEluWorkerUsage = <
+  Worker extends IWorker,
+  Data = unknown,
+  Response = unknown
+>(
+    workerChoiceStrategyContext: WorkerChoiceStrategyContext<
+    Worker,
+    Data,
+    Response
+    >,
+    workerUsage: WorkerUsage,
+    message: MessageValue<Response>
+  ): void => {
+  if (message.workerError != null) {
+    return
+  }
+  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
+    workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+  updateMeasurementStatistics(
+    workerUsage.elu.active,
+    eluTaskStatisticsRequirements,
+    message.taskPerformance?.elu?.active ?? 0
+  )
+  updateMeasurementStatistics(
+    workerUsage.elu.idle,
+    eluTaskStatisticsRequirements,
+    message.taskPerformance?.elu?.idle ?? 0
+  )
+  if (eluTaskStatisticsRequirements.aggregate) {
+    if (message.taskPerformance?.elu != null) {
+      if (workerUsage.elu.utilization != null) {
+        workerUsage.elu.utilization =
+          (workerUsage.elu.utilization +
+            message.taskPerformance.elu.utilization) /
+          2
+      } else {
+        workerUsage.elu.utilization = message.taskPerformance.elu.utilization
+      }
+    }
+  }
+}
 
 export const createWorker = <Worker extends IWorker>(
   type: WorkerType,