Merge branch 'master' into combined-prs-branch
[poolifier.git] / src / pools / utils.ts
index db8ef7b7cc5991ea71a3dc0a2b163ac9d7a438c2..c0e803a2f4cb5a2bd88b3350e98b3a754c4e5b51 100644 (file)
@@ -1,22 +1,39 @@
 import { existsSync } from 'node:fs'
 import cluster from 'node:cluster'
 import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
-import { average, isPlainObject, max, median, min } from '../utils'
+import { env } from 'node:process'
+import { average, isPlainObject, max, median, min } from '../utils.js'
+import type { MessageValue, Task } from '../utility-types.js'
 import {
   type MeasurementStatisticsRequirements,
   WorkerChoiceStrategies,
   type WorkerChoiceStrategy
-} from './selection-strategies/selection-strategies-types'
-import type { TasksQueueOptions } from './pool'
+} from './selection-strategies/selection-strategies-types.js'
+import type { TasksQueueOptions } from './pool.js'
 import {
   type IWorker,
+  type IWorkerNode,
   type MeasurementStatistics,
   type WorkerNodeOptions,
   type WorkerType,
-  WorkerTypes
-} from './worker'
+  WorkerTypes,
+  type WorkerUsage
+} from './worker.js'
+import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
 
-export const checkFilePath = (filePath: string): void => {
+export const getDefaultTasksQueueOptions = (
+  poolMaxSize: number
+): Required<TasksQueueOptions> => {
+  return {
+    size: Math.pow(poolMaxSize, 2),
+    concurrency: 1,
+    taskStealing: true,
+    tasksStealingOnBackPressure: true,
+    tasksFinishedTimeout: 2000
+  }
+}
+
+export const checkFilePath = (filePath: string | undefined): void => {
   if (filePath == null) {
     throw new TypeError('The worker file path must be specified')
   }
@@ -28,7 +45,10 @@ export const checkFilePath = (filePath: string): void => {
   }
 }
 
-export const checkDynamicPoolSize = (min: number, max: number): void => {
+export const checkDynamicPoolSize = (
+  min: number,
+  max: number | undefined
+): void => {
   if (max == null) {
     throw new TypeError(
       'Cannot instantiate a dynamic pool without specifying the maximum pool size'
@@ -53,7 +73,7 @@ export const checkDynamicPoolSize = (min: number, max: number): void => {
 }
 
 export const checkValidWorkerChoiceStrategy = (
-  workerChoiceStrategy: WorkerChoiceStrategy
+  workerChoiceStrategy: WorkerChoiceStrategy | undefined
 ): void => {
   if (
     workerChoiceStrategy != null &&
@@ -64,7 +84,7 @@ export const checkValidWorkerChoiceStrategy = (
 }
 
 export const checkValidTasksQueueOptions = (
-  tasksQueueOptions: TasksQueueOptions
+  tasksQueueOptions: TasksQueueOptions | undefined
 ): void => {
   if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
     throw new TypeError('Invalid tasks queue options: must be a plain object')
@@ -101,9 +121,9 @@ export const checkValidTasksQueueOptions = (
 }
 
 export const checkWorkerNodeArguments = (
-  type: WorkerType,
-  filePath: string,
-  opts: WorkerNodeOptions
+  type: WorkerType | undefined,
+  filePath: string | undefined,
+  opts: WorkerNodeOptions | undefined
 ): void => {
   if (type == null) {
     throw new TypeError('Cannot construct a worker node without a worker type')
@@ -147,15 +167,18 @@ export const checkWorkerNodeArguments = (
  * @param measurementStatistics - The measurement statistics to update.
  * @param measurementRequirements - The measurement statistics requirements.
  * @param measurementValue - The measurement value.
- * @param numberOfMeasurements - The number of measurements.
  * @internal
  */
-export const updateMeasurementStatistics = (
+const updateMeasurementStatistics = (
   measurementStatistics: MeasurementStatistics,
-  measurementRequirements: MeasurementStatisticsRequirements,
-  measurementValue: number
+  measurementRequirements: MeasurementStatisticsRequirements | undefined,
+  measurementValue: number | undefined
 ): void => {
-  if (measurementRequirements.aggregate) {
+  if (
+    measurementRequirements != null &&
+    measurementValue != null &&
+    measurementRequirements.aggregate
+  ) {
     measurementStatistics.aggregate =
       (measurementStatistics.aggregate ?? 0) + measurementValue
     measurementStatistics.minimum = min(
@@ -166,10 +189,7 @@ export const updateMeasurementStatistics = (
       measurementValue,
       measurementStatistics.maximum ?? -Infinity
     )
-    if (
-      (measurementRequirements.average || measurementRequirements.median) &&
-      measurementValue != null
-    ) {
+    if (measurementRequirements.average || measurementRequirements.median) {
       measurementStatistics.history.push(measurementValue)
       if (measurementRequirements.average) {
         measurementStatistics.average = average(measurementStatistics.history)
@@ -184,6 +204,110 @@ export const updateMeasurementStatistics = (
     }
   }
 }
+if (env.NODE_ENV === 'test') {
+  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
+  exports.updateMeasurementStatistics = updateMeasurementStatistics
+}
+
+export const updateWaitTimeWorkerUsage = <
+  Worker extends IWorker,
+  Data = unknown,
+  Response = unknown
+>(
+    workerChoiceStrategyContext:
+    | WorkerChoiceStrategyContext<Worker, Data, Response>
+    | undefined,
+    workerUsage: WorkerUsage,
+    task: Task<Data>
+  ): void => {
+  const timestamp = performance.now()
+  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
+  updateMeasurementStatistics(
+    workerUsage.waitTime,
+    workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.waitTime,
+    taskWaitTime
+  )
+}
+
+export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
+  workerUsage: WorkerUsage,
+  message: MessageValue<Response>
+): void => {
+  const workerTaskStatistics = workerUsage.tasks
+  if (
+    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+    workerTaskStatistics.executing != null &&
+    workerTaskStatistics.executing > 0
+  ) {
+    --workerTaskStatistics.executing
+  }
+  if (message.workerError == null) {
+    ++workerTaskStatistics.executed
+  } else {
+    ++workerTaskStatistics.failed
+  }
+}
+
+export const updateRunTimeWorkerUsage = <
+  Worker extends IWorker,
+  Data = unknown,
+  Response = unknown
+>(
+    workerChoiceStrategyContext:
+    | WorkerChoiceStrategyContext<Worker, Data, Response>
+    | undefined,
+    workerUsage: WorkerUsage,
+    message: MessageValue<Response>
+  ): void => {
+  if (message.workerError != null) {
+    return
+  }
+  updateMeasurementStatistics(
+    workerUsage.runTime,
+    workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.runTime,
+    message.taskPerformance?.runTime ?? 0
+  )
+}
+
+export const updateEluWorkerUsage = <
+  Worker extends IWorker,
+  Data = unknown,
+  Response = unknown
+>(
+    workerChoiceStrategyContext:
+    | WorkerChoiceStrategyContext<Worker, Data, Response>
+    | undefined,
+    workerUsage: WorkerUsage,
+    message: MessageValue<Response>
+  ): void => {
+  if (message.workerError != null) {
+    return
+  }
+  const eluTaskStatisticsRequirements =
+    workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.elu
+  updateMeasurementStatistics(
+    workerUsage.elu.active,
+    eluTaskStatisticsRequirements,
+    message.taskPerformance?.elu?.active ?? 0
+  )
+  updateMeasurementStatistics(
+    workerUsage.elu.idle,
+    eluTaskStatisticsRequirements,
+    message.taskPerformance?.elu?.idle ?? 0
+  )
+  if (eluTaskStatisticsRequirements?.aggregate === true) {
+    if (message.taskPerformance?.elu != null) {
+      if (workerUsage.elu.utilization != null) {
+        workerUsage.elu.utilization =
+          (workerUsage.elu.utilization +
+            message.taskPerformance.elu.utilization) /
+          2
+      } else {
+        workerUsage.elu.utilization = message.taskPerformance.elu.utilization
+      }
+    }
+  }
+}
 
 export const createWorker = <Worker extends IWorker>(
   type: WorkerType,
@@ -194,12 +318,41 @@ export const createWorker = <Worker extends IWorker>(
     case WorkerTypes.thread:
       return new Worker(filePath, {
         env: SHARE_ENV,
-        ...opts?.workerOptions
+        ...opts.workerOptions
       }) as unknown as Worker
     case WorkerTypes.cluster:
-      return cluster.fork(opts?.env) as unknown as Worker
+      return cluster.fork(opts.env) as unknown as Worker
     default:
       // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
       throw new Error(`Unknown worker type '${type}'`)
   }
 }
+
+export const waitWorkerNodeEvents = async <
+  Worker extends IWorker,
+  Data = unknown
+>(
+  workerNode: IWorkerNode<Worker, Data>,
+  workerNodeEvent: string,
+  numberOfEventsToWait: number,
+  timeout: number
+): Promise<number> => {
+  return await new Promise<number>(resolve => {
+    let events = 0
+    if (numberOfEventsToWait === 0) {
+      resolve(events)
+      return
+    }
+    workerNode.on(workerNodeEvent, () => {
+      ++events
+      if (events === numberOfEventsToWait) {
+        resolve(events)
+      }
+    })
+    if (timeout >= 0) {
+      setTimeout(() => {
+        resolve(events)
+      }, timeout)
+    }
+  })
+}