refactor: move worker setup into worker node constructor
[poolifier.git] / src / pools / utils.ts
index eb800c4ab50f9d39e2b6fc98ebb4cf39bfc134b5..11c30ad1da9b5cf875be570116fdf240cb0af89b 100644 (file)
@@ -1,4 +1,6 @@
 import { existsSync } from 'node:fs'
+import cluster from 'node:cluster'
+import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
 import { average, isPlainObject, max, median, min } from '../utils'
 import {
   type MeasurementStatisticsRequirements,
@@ -6,9 +8,21 @@ import {
   type WorkerChoiceStrategy
 } from './selection-strategies/selection-strategies-types'
 import type { TasksQueueOptions } from './pool'
-import type { IWorker, MeasurementStatistics } from './worker'
+import {
+  type IWorker,
+  type MeasurementStatistics,
+  type WorkerNodeOptions,
+  type WorkerType,
+  WorkerTypes
+} from './worker'
 
 export const checkFilePath = (filePath: string): void => {
+  if (filePath == null) {
+    throw new TypeError('The worker file path must be specified')
+  }
+  if (typeof filePath !== 'string') {
+    throw new TypeError('The worker file path must be a string')
+  }
   if (!existsSync(filePath)) {
     throw new Error(`Cannot find the worker file '${filePath}'`)
   }
@@ -86,26 +100,43 @@ export const checkValidTasksQueueOptions = (
   }
 }
 
-export const checkWorkerNodeArguments = <Worker extends IWorker>(
-  worker: Worker,
-  tasksQueueBackPressureSize: number
+export const checkWorkerNodeArguments = (
+  type: WorkerType,
+  filePath: string,
+  opts: WorkerNodeOptions
 ): void => {
-  if (worker == null) {
-    throw new TypeError('Cannot construct a worker node without a worker')
+  if (type == null) {
+    throw new TypeError('Cannot construct a worker node without a worker type')
+  }
+  if (!Object.values(WorkerTypes).includes(type)) {
+    throw new TypeError(
+      `Cannot construct a worker node with an invalid worker type '${type}'`
+    )
   }
-  if (tasksQueueBackPressureSize == null) {
+  checkFilePath(filePath)
+  if (opts == null) {
     throw new TypeError(
-      'Cannot construct a worker node without a tasks queue back pressure size'
+      'Cannot construct a worker node without worker node options'
     )
   }
-  if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
+  if (opts != null && !isPlainObject(opts)) {
     throw new TypeError(
-      'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
+      'Cannot construct a worker node with invalid options: must be a plain object'
     )
   }
-  if (tasksQueueBackPressureSize <= 0) {
+  if (opts.tasksQueueBackPressureSize == null) {
+    throw new TypeError(
+      'Cannot construct a worker node without a tasks queue back pressure size option'
+    )
+  }
+  if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
+    throw new TypeError(
+      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
+    )
+  }
+  if (opts.tasksQueueBackPressureSize <= 0) {
     throw new RangeError(
-      'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
+      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
     )
   }
 }
@@ -153,3 +184,22 @@ export const updateMeasurementStatistics = (
     }
   }
 }
+
+export const createWorker = <Worker extends IWorker>(
+  type: WorkerType,
+  filePath: string,
+  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
+): Worker => {
+  switch (type) {
+    case WorkerTypes.thread:
+      return new Worker(filePath, {
+        env: SHARE_ENV,
+        ...opts?.workerOptions
+      }) as unknown as Worker
+    case WorkerTypes.cluster:
+      return cluster.fork(opts?.env) as unknown as Worker
+    default:
+      // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+      throw new Error(`Unknown worker type '${type}'`)
+  }
+}