X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Futils.ts;h=5e21c38c0af657dab99aca255e80f0e39888b6a6;hb=b536cc10e28c013cbb615def4e1f251ddc019e50;hp=e08b31aa838537c1ceee37f284ff39ac53960308;hpb=bfc75ccaf49d915d2b7e73c92360787b3245321a;p=poolifier.git diff --git a/src/pools/utils.ts b/src/pools/utils.ts index e08b31aa..5e21c38c 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -1,4 +1,6 @@ import { existsSync } from 'node:fs' +import cluster from 'node:cluster' +import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads' import { average, isPlainObject, max, median, min } from '../utils' import { type MeasurementStatisticsRequirements, @@ -6,15 +8,21 @@ import { type WorkerChoiceStrategy } from './selection-strategies/selection-strategies-types' import type { TasksQueueOptions } from './pool' -import type { IWorker, MeasurementStatistics } from './worker' +import { + type IWorker, + type IWorkerNode, + type MeasurementStatistics, + type WorkerNodeOptions, + type WorkerType, + WorkerTypes +} from './worker' export const checkFilePath = (filePath: string): void => { - if ( - filePath == null || - typeof filePath !== 'string' || - (typeof filePath === 'string' && filePath.trim().length === 0) - ) { - throw new Error('Please specify a file with a worker implementation') + if (filePath == null) { + throw new TypeError('The worker file path must be specified') + } + if (typeof filePath !== 'string') { + throw new TypeError('The worker file path must be a string') } if (!existsSync(filePath)) { throw new Error(`Cannot find the worker file '${filePath}'`) @@ -93,26 +101,43 @@ export const checkValidTasksQueueOptions = ( } } -export const checkWorkerNodeArguments = ( - worker: Worker, - tasksQueueBackPressureSize: number +export const checkWorkerNodeArguments = ( + type: WorkerType, + filePath: string, + opts: WorkerNodeOptions ): void => { - if (worker == null) { - throw new TypeError('Cannot construct a worker node without a worker') + if (type == null) { + throw new TypeError('Cannot construct a worker node without a worker type') + } + if (!Object.values(WorkerTypes).includes(type)) { + throw new TypeError( + `Cannot construct a worker node with an invalid worker type '${type}'` + ) + } + checkFilePath(filePath) + if (opts == null) { + throw new TypeError( + 'Cannot construct a worker node without worker node options' + ) } - if (tasksQueueBackPressureSize == null) { + if (!isPlainObject(opts)) { throw new TypeError( - 'Cannot construct a worker node without a tasks queue back pressure size' + 'Cannot construct a worker node with invalid options: must be a plain object' ) } - if (!Number.isSafeInteger(tasksQueueBackPressureSize)) { + if (opts.tasksQueueBackPressureSize == null) { throw new TypeError( - 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer' + 'Cannot construct a worker node without a tasks queue back pressure size option' ) } - if (tasksQueueBackPressureSize <= 0) { + if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) { + throw new TypeError( + 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer' + ) + } + if (opts.tasksQueueBackPressureSize <= 0) { throw new RangeError( - 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer' + 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' ) } } @@ -160,3 +185,45 @@ export const updateMeasurementStatistics = ( } } } + +export const createWorker = ( + type: WorkerType, + filePath: string, + opts: { env?: Record, workerOptions?: WorkerOptions } +): Worker => { + switch (type) { + case WorkerTypes.thread: + return new Worker(filePath, { + env: SHARE_ENV, + ...opts?.workerOptions + }) as unknown as Worker + case WorkerTypes.cluster: + return cluster.fork(opts?.env) as unknown as Worker + default: + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + throw new Error(`Unknown worker type '${type}'`) + } +} + +export const waitWorkerNodeEvents = async < + Worker extends IWorker, + Data = unknown +>( + workerNode: IWorkerNode, + workerNodeEvent: string, + numberOfEventsToWait: number +): Promise => { + return await new Promise(resolve => { + let events = 0 + if (numberOfEventsToWait === 0) { + resolve(events) + return + } + workerNode.on(workerNodeEvent, () => { + ++events + if (events === numberOfEventsToWait) { + resolve(events) + } + }) + }) +}