perf: use optimized circular buffer implementation to store measurements history
[poolifier.git] / src / pools / utils.ts
index c7ab2d1e664894bbfba218103ee9d85b15e8b0e8..f2ede8b6d2b07d44316cfdaa14abd91cb10204cc 100644 (file)
@@ -1,15 +1,21 @@
+import cluster, { Worker as ClusterWorker } from 'node:cluster'
 import { existsSync } from 'node:fs'
-import cluster from 'node:cluster'
-import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
 import { env } from 'node:process'
-import { average, isPlainObject, max, median, min } from '../utils'
-import type { MessageValue, Task } from '../utility-types'
+import {
+  SHARE_ENV,
+  Worker as ThreadWorker,
+  type WorkerOptions
+} from 'node:worker_threads'
+
+import type { MessageValue, Task } from '../utility-types.js'
+import { average, isPlainObject, max, median, min } from '../utils.js'
+import type { TasksQueueOptions } from './pool.js'
 import {
   type MeasurementStatisticsRequirements,
   WorkerChoiceStrategies,
   type WorkerChoiceStrategy
-} from './selection-strategies/selection-strategies-types'
-import type { TasksQueueOptions } from './pool'
+} from './selection-strategies/selection-strategies-types.js'
+import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
 import {
   type IWorker,
   type IWorkerNode,
@@ -18,8 +24,17 @@ import {
   type WorkerType,
   WorkerTypes,
   type WorkerUsage
-} from './worker'
-import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
+} from './worker.js'
+
+/**
+ * Default measurement statistics requirements.
+ */
+export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements =
+  {
+    aggregate: false,
+    average: false,
+    median: false
+  }
 
 export const getDefaultTasksQueueOptions = (
   poolMaxSize: number
@@ -28,12 +43,12 @@ export const getDefaultTasksQueueOptions = (
     size: Math.pow(poolMaxSize, 2),
     concurrency: 1,
     taskStealing: true,
-    tasksStealingOnBackPressure: true,
+    tasksStealingOnBackPressure: false,
     tasksFinishedTimeout: 2000
   }
 }
 
-export const checkFilePath = (filePath: string): void => {
+export const checkFilePath = (filePath: string | undefined): void => {
   if (filePath == null) {
     throw new TypeError('The worker file path must be specified')
   }
@@ -45,7 +60,10 @@ export const checkFilePath = (filePath: string): void => {
   }
 }
 
-export const checkDynamicPoolSize = (min: number, max: number): void => {
+export const checkDynamicPoolSize = (
+  min: number,
+  max: number | undefined
+): void => {
   if (max == null) {
     throw new TypeError(
       'Cannot instantiate a dynamic pool without specifying the maximum pool size'
@@ -69,8 +87,21 @@ export const checkDynamicPoolSize = (min: number, max: number): void => {
   }
 }
 
+export const checkValidPriority = (priority: number | undefined): void => {
+  if (priority != null && !Number.isSafeInteger(priority)) {
+    throw new TypeError(`Invalid property 'priority': '${priority}'`)
+  }
+  if (
+    priority != null &&
+    Number.isSafeInteger(priority) &&
+    (priority < -20 || priority > 19)
+  ) {
+    throw new RangeError("Property 'priority' must be between -20 and 19")
+  }
+}
+
 export const checkValidWorkerChoiceStrategy = (
-  workerChoiceStrategy: WorkerChoiceStrategy
+  workerChoiceStrategy: WorkerChoiceStrategy | undefined
 ): void => {
   if (
     workerChoiceStrategy != null &&
@@ -81,7 +112,7 @@ export const checkValidWorkerChoiceStrategy = (
 }
 
 export const checkValidTasksQueueOptions = (
-  tasksQueueOptions: TasksQueueOptions
+  tasksQueueOptions: TasksQueueOptions | undefined
 ): void => {
   if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
     throw new TypeError('Invalid tasks queue options: must be a plain object')
@@ -118,9 +149,9 @@ export const checkValidTasksQueueOptions = (
 }
 
 export const checkWorkerNodeArguments = (
-  type: WorkerType,
-  filePath: string,
-  opts: WorkerNodeOptions
+  type: WorkerType | undefined,
+  filePath: string | undefined,
+  opts: WorkerNodeOptions | undefined
 ): void => {
   if (type == null) {
     throw new TypeError('Cannot construct a worker node without a worker type')
@@ -156,6 +187,21 @@ export const checkWorkerNodeArguments = (
       'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
     )
   }
+  if (opts.tasksQueueBucketSize == null) {
+    throw new TypeError(
+      'Cannot construct a worker node without a tasks queue bucket size option'
+    )
+  }
+  if (!Number.isSafeInteger(opts.tasksQueueBucketSize)) {
+    throw new TypeError(
+      'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
+    )
+  }
+  if (opts.tasksQueueBucketSize <= 0) {
+    throw new RangeError(
+      'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
+    )
+  }
 }
 
 /**
@@ -164,37 +210,41 @@ export const checkWorkerNodeArguments = (
  * @param measurementStatistics - The measurement statistics to update.
  * @param measurementRequirements - The measurement statistics requirements.
  * @param measurementValue - The measurement value.
- * @param numberOfMeasurements - The number of measurements.
  * @internal
  */
 const updateMeasurementStatistics = (
   measurementStatistics: MeasurementStatistics,
-  measurementRequirements: MeasurementStatisticsRequirements,
-  measurementValue: number
+  measurementRequirements: MeasurementStatisticsRequirements | undefined,
+  measurementValue: number | undefined
 ): void => {
-  if (measurementRequirements.aggregate) {
+  if (
+    measurementRequirements != null &&
+    measurementValue != null &&
+    measurementRequirements.aggregate
+  ) {
     measurementStatistics.aggregate =
       (measurementStatistics.aggregate ?? 0) + measurementValue
     measurementStatistics.minimum = min(
       measurementValue,
-      measurementStatistics.minimum ?? Infinity
+      measurementStatistics.minimum ?? Number.POSITIVE_INFINITY
     )
     measurementStatistics.maximum = max(
       measurementValue,
-      measurementStatistics.maximum ?? -Infinity
+      measurementStatistics.maximum ?? Number.NEGATIVE_INFINITY
     )
-    if (
-      (measurementRequirements.average || measurementRequirements.median) &&
-      measurementValue != null
-    ) {
-      measurementStatistics.history.push(measurementValue)
+    if (measurementRequirements.average || measurementRequirements.median) {
+      measurementStatistics.history.put(measurementValue)
       if (measurementRequirements.average) {
-        measurementStatistics.average = average(measurementStatistics.history)
+        measurementStatistics.average = average(
+          measurementStatistics.history.toArray()
+        )
       } else if (measurementStatistics.average != null) {
         delete measurementStatistics.average
       }
       if (measurementRequirements.median) {
-        measurementStatistics.median = median(measurementStatistics.history)
+        measurementStatistics.median = median(
+          measurementStatistics.history.toArray()
+        )
       } else if (measurementStatistics.median != null) {
         delete measurementStatistics.median
       }
@@ -211,11 +261,9 @@ export const updateWaitTimeWorkerUsage = <
   Data = unknown,
   Response = unknown
 >(
-    workerChoiceStrategyContext: WorkerChoiceStrategyContext<
-    Worker,
-    Data,
-    Response
-    >,
+    workerChoiceStrategiesContext:
+    | WorkerChoiceStrategiesContext<Worker, Data, Response>
+    | undefined,
     workerUsage: WorkerUsage,
     task: Task<Data>
   ): void => {
@@ -223,7 +271,7 @@ export const updateWaitTimeWorkerUsage = <
   const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
   updateMeasurementStatistics(
     workerUsage.waitTime,
-    workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
+    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime,
     taskWaitTime
   )
 }
@@ -234,6 +282,7 @@ export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
 ): void => {
   const workerTaskStatistics = workerUsage.tasks
   if (
+    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
     workerTaskStatistics.executing != null &&
     workerTaskStatistics.executing > 0
   ) {
@@ -251,11 +300,9 @@ export const updateRunTimeWorkerUsage = <
   Data = unknown,
   Response = unknown
 >(
-    workerChoiceStrategyContext: WorkerChoiceStrategyContext<
-    Worker,
-    Data,
-    Response
-    >,
+    workerChoiceStrategiesContext:
+    | WorkerChoiceStrategiesContext<Worker, Data, Response>
+    | undefined,
     workerUsage: WorkerUsage,
     message: MessageValue<Response>
   ): void => {
@@ -264,7 +311,7 @@ export const updateRunTimeWorkerUsage = <
   }
   updateMeasurementStatistics(
     workerUsage.runTime,
-    workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
+    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime,
     message.taskPerformance?.runTime ?? 0
   )
 }
@@ -274,19 +321,17 @@ export const updateEluWorkerUsage = <
   Data = unknown,
   Response = unknown
 >(
-    workerChoiceStrategyContext: WorkerChoiceStrategyContext<
-    Worker,
-    Data,
-    Response
-    >,
+    workerChoiceStrategiesContext:
+    | WorkerChoiceStrategiesContext<Worker, Data, Response>
+    | undefined,
     workerUsage: WorkerUsage,
     message: MessageValue<Response>
   ): void => {
   if (message.workerError != null) {
     return
   }
-  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
-    workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+  const eluTaskStatisticsRequirements =
+    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
   updateMeasurementStatistics(
     workerUsage.elu.active,
     eluTaskStatisticsRequirements,
@@ -297,7 +342,7 @@ export const updateEluWorkerUsage = <
     eluTaskStatisticsRequirements,
     message.taskPerformance?.elu?.idle ?? 0
   )
-  if (eluTaskStatisticsRequirements.aggregate) {
+  if (eluTaskStatisticsRequirements?.aggregate === true) {
     if (message.taskPerformance?.elu != null) {
       if (workerUsage.elu.utilization != null) {
         workerUsage.elu.utilization =
@@ -318,18 +363,48 @@ export const createWorker = <Worker extends IWorker>(
 ): Worker => {
   switch (type) {
     case WorkerTypes.thread:
-      return new Worker(filePath, {
+      return new ThreadWorker(filePath, {
         env: SHARE_ENV,
-        ...opts?.workerOptions
+        ...opts.workerOptions
       }) as unknown as Worker
     case WorkerTypes.cluster:
-      return cluster.fork(opts?.env) as unknown as Worker
+      return cluster.fork(opts.env) as unknown as Worker
     default:
       // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
       throw new Error(`Unknown worker type '${type}'`)
   }
 }
 
+/**
+ * Returns the worker type of the given worker.
+ *
+ * @param worker - The worker to get the type of.
+ * @returns The worker type of the given worker.
+ * @internal
+ */
+export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
+  if (worker instanceof ThreadWorker) {
+    return WorkerTypes.thread
+  } else if (worker instanceof ClusterWorker) {
+    return WorkerTypes.cluster
+  }
+}
+
+/**
+ * Returns the worker id of the given worker.
+ *
+ * @param worker - The worker to get the id of.
+ * @returns The worker id of the given worker.
+ * @internal
+ */
+export const getWorkerId = (worker: IWorker): number | undefined => {
+  if (worker instanceof ThreadWorker) {
+    return worker.threadId
+  } else if (worker instanceof ClusterWorker) {
+    return worker.id
+  }
+}
+
 export const waitWorkerNodeEvents = async <
   Worker extends IWorker,
   Data = unknown
@@ -345,12 +420,20 @@ export const waitWorkerNodeEvents = async <
       resolve(events)
       return
     }
-    workerNode.on(workerNodeEvent, () => {
-      ++events
-      if (events === numberOfEventsToWait) {
-        resolve(events)
-      }
-    })
+    switch (workerNodeEvent) {
+      case 'idle':
+      case 'backPressure':
+      case 'taskFinished':
+        workerNode.on(workerNodeEvent, () => {
+          ++events
+          if (events === numberOfEventsToWait) {
+            resolve(events)
+          }
+        })
+        break
+      default:
+        throw new Error('Invalid worker node event')
+    }
     if (timeout >= 0) {
       setTimeout(() => {
         resolve(events)