import { existsSync } from 'node:fs'
+import cluster from 'node:cluster'
+import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
import { average, isPlainObject, max, median, min } from '../utils'
import {
type MeasurementStatisticsRequirements,
type WorkerChoiceStrategy
} from './selection-strategies/selection-strategies-types'
import type { TasksQueueOptions } from './pool'
-import type { IWorker, MeasurementStatistics } from './worker'
+import {
+ type IWorker,
+ type IWorkerNode,
+ type MeasurementStatistics,
+ type WorkerNodeOptions,
+ type WorkerType,
+ WorkerTypes
+} from './worker'
export const checkFilePath = (filePath: string): void => {
- if (
- filePath == null ||
- typeof filePath !== 'string' ||
- (typeof filePath === 'string' && filePath.trim().length === 0)
- ) {
- throw new Error('Please specify a file with a worker implementation')
+ if (filePath == null) {
+ throw new TypeError('The worker file path must be specified')
+ }
+ if (typeof filePath !== 'string') {
+ throw new TypeError('The worker file path must be a string')
}
if (!existsSync(filePath)) {
throw new Error(`Cannot find the worker file '${filePath}'`)
}
}
-export const checkWorkerNodeArguments = <Worker extends IWorker>(
- worker: Worker,
- tasksQueueBackPressureSize: number
+export const checkWorkerNodeArguments = (
+ type: WorkerType,
+ filePath: string,
+ opts: WorkerNodeOptions
): void => {
- if (worker == null) {
- throw new TypeError('Cannot construct a worker node without a worker')
+ if (type == null) {
+ throw new TypeError('Cannot construct a worker node without a worker type')
+ }
+ if (!Object.values(WorkerTypes).includes(type)) {
+ throw new TypeError(
+ `Cannot construct a worker node with an invalid worker type '${type}'`
+ )
+ }
+ checkFilePath(filePath)
+ if (opts == null) {
+ throw new TypeError(
+ 'Cannot construct a worker node without worker node options'
+ )
}
- if (tasksQueueBackPressureSize == null) {
+ if (!isPlainObject(opts)) {
throw new TypeError(
- 'Cannot construct a worker node without a tasks queue back pressure size'
+ 'Cannot construct a worker node with invalid options: must be a plain object'
)
}
- if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
+ if (opts.tasksQueueBackPressureSize == null) {
throw new TypeError(
- 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
+ 'Cannot construct a worker node without a tasks queue back pressure size option'
)
}
- if (tasksQueueBackPressureSize <= 0) {
+ if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
+ throw new TypeError(
+ 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
+ )
+ }
+ if (opts.tasksQueueBackPressureSize <= 0) {
throw new RangeError(
- 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
+ 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
)
}
}
}
}
}
+
+export const createWorker = <Worker extends IWorker>(
+ type: WorkerType,
+ filePath: string,
+ opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
+): Worker => {
+ switch (type) {
+ case WorkerTypes.thread:
+ return new Worker(filePath, {
+ env: SHARE_ENV,
+ ...opts?.workerOptions
+ }) as unknown as Worker
+ case WorkerTypes.cluster:
+ return cluster.fork(opts?.env) as unknown as Worker
+ default:
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ throw new Error(`Unknown worker type '${type}'`)
+ }
+}
+
+export const waitWorkerNodeEvents = async <
+ Worker extends IWorker,
+ Data = unknown
+>(
+ workerNode: IWorkerNode<Worker, Data>,
+ workerNodeEvent: string,
+ numberOfEventsToWait: number
+): Promise<number> => {
+ return await new Promise<number>(resolve => {
+ let events = 0
+ if (numberOfEventsToWait === 0) {
+ resolve(events)
+ return
+ }
+ workerNode.on(workerNodeEvent, () => {
+ ++events
+ if (events === numberOfEventsToWait) {
+ resolve(events)
+ }
+ })
+ })
+}