fix: refine possible null exception fix at task response handling
[poolifier.git] / src / pools / utils.ts
index 4aac5cda47d361fd723198a53ad44f254235b56c..5e21c38c0af657dab99aca255e80f0e39888b6a6 100644 (file)
@@ -1,18 +1,28 @@
 import { existsSync } from 'node:fs'
-import { isPlainObject } from '../utils'
+import cluster from 'node:cluster'
+import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
+import { average, isPlainObject, max, median, min } from '../utils'
 import {
+  type MeasurementStatisticsRequirements,
   WorkerChoiceStrategies,
   type WorkerChoiceStrategy
 } from './selection-strategies/selection-strategies-types'
 import type { TasksQueueOptions } from './pool'
+import {
+  type IWorker,
+  type IWorkerNode,
+  type MeasurementStatistics,
+  type WorkerNodeOptions,
+  type WorkerType,
+  WorkerTypes
+} from './worker'
 
 export const checkFilePath = (filePath: string): void => {
-  if (
-    filePath == null ||
-    typeof filePath !== 'string' ||
-    (typeof filePath === 'string' && filePath.trim().length === 0)
-  ) {
-    throw new Error('Please specify a file with a worker implementation')
+  if (filePath == null) {
+    throw new TypeError('The worker file path must be specified')
+  }
+  if (typeof filePath !== 'string') {
+    throw new TypeError('The worker file path must be a string')
   }
   if (!existsSync(filePath)) {
     throw new Error(`Cannot find the worker file '${filePath}'`)
@@ -90,3 +100,130 @@ export const checkValidTasksQueueOptions = (
     )
   }
 }
+
+export const checkWorkerNodeArguments = (
+  type: WorkerType,
+  filePath: string,
+  opts: WorkerNodeOptions
+): void => {
+  if (type == null) {
+    throw new TypeError('Cannot construct a worker node without a worker type')
+  }
+  if (!Object.values(WorkerTypes).includes(type)) {
+    throw new TypeError(
+      `Cannot construct a worker node with an invalid worker type '${type}'`
+    )
+  }
+  checkFilePath(filePath)
+  if (opts == null) {
+    throw new TypeError(
+      'Cannot construct a worker node without worker node options'
+    )
+  }
+  if (!isPlainObject(opts)) {
+    throw new TypeError(
+      'Cannot construct a worker node with invalid options: must be a plain object'
+    )
+  }
+  if (opts.tasksQueueBackPressureSize == null) {
+    throw new TypeError(
+      'Cannot construct a worker node without a tasks queue back pressure size option'
+    )
+  }
+  if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
+    throw new TypeError(
+      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
+    )
+  }
+  if (opts.tasksQueueBackPressureSize <= 0) {
+    throw new RangeError(
+      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
+    )
+  }
+}
+
+/**
+ * Updates the given measurement statistics.
+ *
+ * @param measurementStatistics - The measurement statistics to update.
+ * @param measurementRequirements - The measurement statistics requirements.
+ * @param measurementValue - The measurement value.
+ * @param numberOfMeasurements - The number of measurements.
+ * @internal
+ */
+export const updateMeasurementStatistics = (
+  measurementStatistics: MeasurementStatistics,
+  measurementRequirements: MeasurementStatisticsRequirements,
+  measurementValue: number
+): void => {
+  if (measurementRequirements.aggregate) {
+    measurementStatistics.aggregate =
+      (measurementStatistics.aggregate ?? 0) + measurementValue
+    measurementStatistics.minimum = min(
+      measurementValue,
+      measurementStatistics.minimum ?? Infinity
+    )
+    measurementStatistics.maximum = max(
+      measurementValue,
+      measurementStatistics.maximum ?? -Infinity
+    )
+    if (
+      (measurementRequirements.average || measurementRequirements.median) &&
+      measurementValue != null
+    ) {
+      measurementStatistics.history.push(measurementValue)
+      if (measurementRequirements.average) {
+        measurementStatistics.average = average(measurementStatistics.history)
+      } else if (measurementStatistics.average != null) {
+        delete measurementStatistics.average
+      }
+      if (measurementRequirements.median) {
+        measurementStatistics.median = median(measurementStatistics.history)
+      } else if (measurementStatistics.median != null) {
+        delete measurementStatistics.median
+      }
+    }
+  }
+}
+
+export const createWorker = <Worker extends IWorker>(
+  type: WorkerType,
+  filePath: string,
+  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
+): Worker => {
+  switch (type) {
+    case WorkerTypes.thread:
+      return new Worker(filePath, {
+        env: SHARE_ENV,
+        ...opts?.workerOptions
+      }) as unknown as Worker
+    case WorkerTypes.cluster:
+      return cluster.fork(opts?.env) as unknown as Worker
+    default:
+      // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+      throw new Error(`Unknown worker type '${type}'`)
+  }
+}
+
+export const waitWorkerNodeEvents = async <
+  Worker extends IWorker,
+  Data = unknown
+>(
+  workerNode: IWorkerNode<Worker, Data>,
+  workerNodeEvent: string,
+  numberOfEventsToWait: number
+): Promise<number> => {
+  return await new Promise<number>(resolve => {
+    let events = 0
+    if (numberOfEventsToWait === 0) {
+      resolve(events)
+      return
+    }
+    workerNode.on(workerNodeEvent, () => {
+      ++events
+      if (events === numberOfEventsToWait) {
+        resolve(events)
+      }
+    })
+  })
+}