+
+export const checkWorkerNodeArguments = (
+ type: WorkerType,
+ filePath: string,
+ opts: WorkerNodeOptions
+): void => {
+ if (type == null) {
+ throw new TypeError('Cannot construct a worker node without a worker type')
+ }
+ if (!Object.values(WorkerTypes).includes(type)) {
+ throw new TypeError(
+ `Cannot construct a worker node with an invalid worker type '${type}'`
+ )
+ }
+ checkFilePath(filePath)
+ if (opts == null) {
+ throw new TypeError(
+ 'Cannot construct a worker node without worker node options'
+ )
+ }
+ if (!isPlainObject(opts)) {
+ throw new TypeError(
+ 'Cannot construct a worker node with invalid options: must be a plain object'
+ )
+ }
+ if (opts.tasksQueueBackPressureSize == null) {
+ throw new TypeError(
+ 'Cannot construct a worker node without a tasks queue back pressure size option'
+ )
+ }
+ if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
+ throw new TypeError(
+ 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
+ )
+ }
+ if (opts.tasksQueueBackPressureSize <= 0) {
+ throw new RangeError(
+ 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
+ )
+ }
+}
+
+/**
+ * Updates the given measurement statistics.
+ *
+ * @param measurementStatistics - The measurement statistics to update.
+ * @param measurementRequirements - The measurement statistics requirements.
+ * @param measurementValue - The measurement value.
+ * @param numberOfMeasurements - The number of measurements.
+ * @internal
+ */
+export const updateMeasurementStatistics = (
+ measurementStatistics: MeasurementStatistics,
+ measurementRequirements: MeasurementStatisticsRequirements,
+ measurementValue: number
+): void => {
+ if (measurementRequirements.aggregate) {
+ measurementStatistics.aggregate =
+ (measurementStatistics.aggregate ?? 0) + measurementValue
+ measurementStatistics.minimum = min(
+ measurementValue,
+ measurementStatistics.minimum ?? Infinity
+ )
+ measurementStatistics.maximum = max(
+ measurementValue,
+ measurementStatistics.maximum ?? -Infinity
+ )
+ if (
+ (measurementRequirements.average || measurementRequirements.median) &&
+ measurementValue != null
+ ) {
+ measurementStatistics.history.push(measurementValue)
+ if (measurementRequirements.average) {
+ measurementStatistics.average = average(measurementStatistics.history)
+ } else if (measurementStatistics.average != null) {
+ delete measurementStatistics.average
+ }
+ if (measurementRequirements.median) {
+ measurementStatistics.median = median(measurementStatistics.history)
+ } else if (measurementStatistics.median != null) {
+ delete measurementStatistics.median
+ }
+ }
+ }
+}
+
+export const createWorker = <Worker extends IWorker>(
+ type: WorkerType,
+ filePath: string,
+ opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
+): Worker => {
+ switch (type) {
+ case WorkerTypes.thread:
+ return new Worker(filePath, {
+ env: SHARE_ENV,
+ ...opts?.workerOptions
+ }) as unknown as Worker
+ case WorkerTypes.cluster:
+ return cluster.fork(opts?.env) as unknown as Worker
+ default:
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ throw new Error(`Unknown worker type '${type}'`)
+ }
+}
+
+export const waitWorkerNodeEvents = async <
+ Worker extends IWorker,
+ Data = unknown
+>(
+ workerNode: IWorkerNode<Worker, Data>,
+ workerNodeEvent: string,
+ numberOfEventsToWait: number
+): Promise<number> => {
+ return await new Promise<number>(resolve => {
+ let events = 0
+ if (numberOfEventsToWait === 0) {
+ resolve(events)
+ return
+ }
+ workerNode.on(workerNodeEvent, () => {
+ ++events
+ if (events === numberOfEventsToWait) {
+ resolve(events)
+ }
+ })
+ })
+}