X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Futils.ts;h=619e2061f382cd82e8b30666149fd4ef11578045;hb=9d9fb7b64a28d150583fe1bcd895893f6736d9d4;hp=e534ead2363947b40096c2734517a1d7c1ea3550;hpb=9a38f99e676160c0bc7d28fe88f27b01fa31b5a1;p=poolifier.git diff --git a/src/pools/utils.ts b/src/pools/utils.ts index e534ead2..619e2061 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -1,19 +1,44 @@ import { existsSync } from 'node:fs' -import { isPlainObject } from '../utils' +import cluster from 'node:cluster' +import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads' +import { env } from 'node:process' +import { average, isPlainObject, max, median, min } from '../utils' +import type { MessageValue, Task } from '../utility-types' import { + type MeasurementStatisticsRequirements, WorkerChoiceStrategies, type WorkerChoiceStrategy } from './selection-strategies/selection-strategies-types' import type { TasksQueueOptions } from './pool' -import type { IWorker } from './worker' +import { + type IWorker, + type IWorkerNode, + type MeasurementStatistics, + type WorkerNodeOptions, + type WorkerType, + WorkerTypes, + type WorkerUsage +} from './worker' +import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' + +export const getDefaultTasksQueueOptions = ( + poolMaxSize: number +): Required => { + return { + size: Math.pow(poolMaxSize, 2), + concurrency: 1, + taskStealing: true, + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 + } +} export const checkFilePath = (filePath: string): void => { - if ( - filePath == null || - typeof filePath !== 'string' || - (typeof filePath === 'string' && filePath.trim().length === 0) - ) { - throw new Error('Please specify a file with a worker implementation') + if (filePath == null) { + throw new TypeError('The worker file path must be specified') + } + if (typeof filePath !== 'string') { + throw new TypeError('The worker file path must be a string') } if (!existsSync(filePath)) { throw new Error(`Cannot find the worker file '${filePath}'`) @@ -91,26 +116,244 @@ export const checkValidTasksQueueOptions = ( ) } } -export const checkWorkerNodeArguments = ( - worker: Worker, - tasksQueueBackPressureSize: number + +export const checkWorkerNodeArguments = ( + type: WorkerType, + filePath: string, + opts: WorkerNodeOptions ): void => { - if (worker == null) { - throw new TypeError('Cannot construct a worker node without a worker') + if (type == null) { + throw new TypeError('Cannot construct a worker node without a worker type') + } + if (!Object.values(WorkerTypes).includes(type)) { + throw new TypeError( + `Cannot construct a worker node with an invalid worker type '${type}'` + ) } - if (tasksQueueBackPressureSize == null) { + checkFilePath(filePath) + if (opts == null) { throw new TypeError( - 'Cannot construct a worker node without a tasks queue back pressure size' + 'Cannot construct a worker node without worker node options' ) } - if (!Number.isSafeInteger(tasksQueueBackPressureSize)) { + if (!isPlainObject(opts)) { throw new TypeError( - 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer' + 'Cannot construct a worker node with invalid options: must be a plain object' ) } - if (tasksQueueBackPressureSize <= 0) { + if (opts.tasksQueueBackPressureSize == null) { + throw new TypeError( + 'Cannot construct a worker node without a tasks queue back pressure size option' + ) + } + if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) { + throw new TypeError( + 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer' + ) + } + if (opts.tasksQueueBackPressureSize <= 0) { throw new RangeError( - 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer' + 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' + ) + } +} + +/** + * Updates the given measurement statistics. + * + * @param measurementStatistics - The measurement statistics to update. + * @param measurementRequirements - The measurement statistics requirements. + * @param measurementValue - The measurement value. + * @internal + */ +const updateMeasurementStatistics = ( + measurementStatistics: MeasurementStatistics, + measurementRequirements: MeasurementStatisticsRequirements, + measurementValue: number +): void => { + if (measurementRequirements.aggregate) { + measurementStatistics.aggregate = + (measurementStatistics.aggregate ?? 0) + measurementValue + measurementStatistics.minimum = min( + measurementValue, + measurementStatistics.minimum ?? Infinity + ) + measurementStatistics.maximum = max( + measurementValue, + measurementStatistics.maximum ?? -Infinity ) + if ( + (measurementRequirements.average || measurementRequirements.median) && + measurementValue != null + ) { + measurementStatistics.history.push(measurementValue) + if (measurementRequirements.average) { + measurementStatistics.average = average(measurementStatistics.history) + } else if (measurementStatistics.average != null) { + delete measurementStatistics.average + } + if (measurementRequirements.median) { + measurementStatistics.median = median(measurementStatistics.history) + } else if (measurementStatistics.median != null) { + delete measurementStatistics.median + } + } } } +if (env.NODE_ENV === 'test') { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + exports.updateMeasurementStatistics = updateMeasurementStatistics +} + +export const updateWaitTimeWorkerUsage = < + Worker extends IWorker, + Data = unknown, + Response = unknown +>( + workerChoiceStrategyContext: WorkerChoiceStrategyContext< + Worker, + Data, + Response + >, + workerUsage: WorkerUsage, + task: Task + ): void => { + const timestamp = performance.now() + const taskWaitTime = timestamp - (task.timestamp ?? timestamp) + updateMeasurementStatistics( + workerUsage.waitTime, + workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime, + taskWaitTime + ) +} + +export const updateTaskStatisticsWorkerUsage = ( + workerUsage: WorkerUsage, + message: MessageValue +): void => { + const workerTaskStatistics = workerUsage.tasks + if ( + workerTaskStatistics.executing != null && + workerTaskStatistics.executing > 0 + ) { + --workerTaskStatistics.executing + } + if (message.workerError == null) { + ++workerTaskStatistics.executed + } else { + ++workerTaskStatistics.failed + } +} + +export const updateRunTimeWorkerUsage = < + Worker extends IWorker, + Data = unknown, + Response = unknown +>( + workerChoiceStrategyContext: WorkerChoiceStrategyContext< + Worker, + Data, + Response + >, + workerUsage: WorkerUsage, + message: MessageValue + ): void => { + if (message.workerError != null) { + return + } + updateMeasurementStatistics( + workerUsage.runTime, + workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime, + message.taskPerformance?.runTime ?? 0 + ) +} + +export const updateEluWorkerUsage = < + Worker extends IWorker, + Data = unknown, + Response = unknown +>( + workerChoiceStrategyContext: WorkerChoiceStrategyContext< + Worker, + Data, + Response + >, + workerUsage: WorkerUsage, + message: MessageValue + ): void => { + if (message.workerError != null) { + return + } + const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = + workerChoiceStrategyContext.getTaskStatisticsRequirements().elu + updateMeasurementStatistics( + workerUsage.elu.active, + eluTaskStatisticsRequirements, + message.taskPerformance?.elu?.active ?? 0 + ) + updateMeasurementStatistics( + workerUsage.elu.idle, + eluTaskStatisticsRequirements, + message.taskPerformance?.elu?.idle ?? 0 + ) + if (eluTaskStatisticsRequirements.aggregate) { + if (message.taskPerformance?.elu != null) { + if (workerUsage.elu.utilization != null) { + workerUsage.elu.utilization = + (workerUsage.elu.utilization + + message.taskPerformance.elu.utilization) / + 2 + } else { + workerUsage.elu.utilization = message.taskPerformance.elu.utilization + } + } + } +} + +export const createWorker = ( + type: WorkerType, + filePath: string, + opts: { env?: Record, workerOptions?: WorkerOptions } +): Worker => { + switch (type) { + case WorkerTypes.thread: + return new Worker(filePath, { + env: SHARE_ENV, + ...opts?.workerOptions + }) as unknown as Worker + case WorkerTypes.cluster: + return cluster.fork(opts?.env) as unknown as Worker + default: + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + throw new Error(`Unknown worker type '${type}'`) + } +} + +export const waitWorkerNodeEvents = async < + Worker extends IWorker, + Data = unknown +>( + workerNode: IWorkerNode, + workerNodeEvent: string, + numberOfEventsToWait: number, + timeout: number +): Promise => { + return await new Promise(resolve => { + let events = 0 + if (numberOfEventsToWait === 0) { + resolve(events) + return + } + workerNode.on(workerNodeEvent, () => { + ++events + if (events === numberOfEventsToWait) { + resolve(events) + } + }) + if (timeout >= 0) { + setTimeout(() => { + resolve(events) + }, timeout) + } + }) +}