X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Futils.ts;h=724c4ab960c691e8f4a5d8661f9c442ff14d5b3d;hb=9a453f49fb0ee7c284e6540c840b41835bb4a0bb;hp=eb800c4ab50f9d39e2b6fc98ebb4cf39bfc134b5;hpb=7ef6859d79e3a30918f2c53c898e94cd22104622;p=poolifier.git diff --git a/src/pools/utils.ts b/src/pools/utils.ts index eb800c4a..724c4ab9 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -1,20 +1,156 @@ +import cluster, { Worker as ClusterWorker } from 'node:cluster' import { existsSync } from 'node:fs' -import { average, isPlainObject, max, median, min } from '../utils' +import { cpus } from 'node:os' +import { env } from 'node:process' +import { + SHARE_ENV, + Worker as ThreadWorker, + type WorkerOptions +} from 'node:worker_threads' + +import type { MessageValue, Task } from '../utility-types.js' +import { average, isPlainObject, max, median, min } from '../utils.js' +import type { IPool, TasksQueueOptions } from './pool.js' import { type MeasurementStatisticsRequirements, WorkerChoiceStrategies, - type WorkerChoiceStrategy -} from './selection-strategies/selection-strategies-types' -import type { TasksQueueOptions } from './pool' -import type { IWorker, MeasurementStatistics } from './worker' + type WorkerChoiceStrategy, + type WorkerChoiceStrategyOptions +} from './selection-strategies/selection-strategies-types.js' +import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' +import { + type IWorker, + type IWorkerNode, + type MeasurementStatistics, + type WorkerNodeOptions, + type WorkerType, + WorkerTypes, + type WorkerUsage +} from './worker.js' -export const checkFilePath = (filePath: string): void => { +/** + * Default measurement statistics requirements. + */ +export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements = + { + aggregate: false, + average: false, + median: false + } + +export const getDefaultTasksQueueOptions = ( + poolMaxSize: number +): Required => { + return { + size: Math.pow(poolMaxSize, 2), + concurrency: 1, + taskStealing: true, + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 + } +} + +export const getWorkerChoiceStrategyRetries = < + Worker extends IWorker, + Data, + Response +>( + pool: IPool, + opts?: WorkerChoiceStrategyOptions + ): number => { + return ( + pool.info.maxSize + + Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length + ) +} + +export const buildWorkerChoiceStrategyOptions = < + Worker extends IWorker, + Data, + Response +>( + pool: IPool, + opts?: WorkerChoiceStrategyOptions + ): WorkerChoiceStrategyOptions => { + opts = clone(opts ?? {}) + opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize) + return { + ...{ + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }, + ...opts + } +} + +const clone = (object: T): T => { + return structuredClone(object) +} + +const getDefaultWeights = ( + poolMaxSize: number, + defaultWorkerWeight?: number +): Record => { + defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight() + const weights: Record = {} + for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) { + weights[workerNodeKey] = defaultWorkerWeight + } + return weights +} + +const estimatedCpuSpeed = (): number => { + const runs = 150000000 + const begin = performance.now() + // eslint-disable-next-line no-empty + for (let i = runs; i > 0; i--) {} + const end = performance.now() + const duration = end - begin + return Math.trunc(runs / duration / 1000) // in MHz +} + +const getDefaultWorkerWeight = (): number => { + const currentCpus = cpus() + let estCpuSpeed: number | undefined + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (currentCpus.every(cpu => cpu.speed == null || cpu.speed === 0)) { + estCpuSpeed = estimatedCpuSpeed() + } + let cpusCycleTimeWeight = 0 + for (const cpu of currentCpus) { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (cpu.speed == null || cpu.speed === 0) { + cpu.speed = + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + currentCpus.find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ?? + estCpuSpeed ?? + 2000 + } + // CPU estimated cycle time + const numberOfDigits = cpu.speed.toString().length - 1 + const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits)) + cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits) + } + return Math.round(cpusCycleTimeWeight / currentCpus.length) +} + +export const checkFilePath = (filePath: string | undefined): void => { + if (filePath == null) { + throw new TypeError('The worker file path must be specified') + } + if (typeof filePath !== 'string') { + throw new TypeError('The worker file path must be a string') + } if (!existsSync(filePath)) { throw new Error(`Cannot find the worker file '${filePath}'`) } } -export const checkDynamicPoolSize = (min: number, max: number): void => { +export const checkDynamicPoolSize = ( + min: number, + max: number | undefined +): void => { if (max == null) { throw new TypeError( 'Cannot instantiate a dynamic pool without specifying the maximum pool size' @@ -39,7 +175,7 @@ export const checkDynamicPoolSize = (min: number, max: number): void => { } export const checkValidWorkerChoiceStrategy = ( - workerChoiceStrategy: WorkerChoiceStrategy + workerChoiceStrategy: WorkerChoiceStrategy | undefined ): void => { if ( workerChoiceStrategy != null && @@ -50,7 +186,7 @@ export const checkValidWorkerChoiceStrategy = ( } export const checkValidTasksQueueOptions = ( - tasksQueueOptions: TasksQueueOptions + tasksQueueOptions: TasksQueueOptions | undefined ): void => { if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) { throw new TypeError('Invalid tasks queue options: must be a plain object') @@ -86,26 +222,43 @@ export const checkValidTasksQueueOptions = ( } } -export const checkWorkerNodeArguments = ( - worker: Worker, - tasksQueueBackPressureSize: number +export const checkWorkerNodeArguments = ( + type: WorkerType | undefined, + filePath: string | undefined, + opts: WorkerNodeOptions | undefined ): void => { - if (worker == null) { - throw new TypeError('Cannot construct a worker node without a worker') + if (type == null) { + throw new TypeError('Cannot construct a worker node without a worker type') + } + if (!Object.values(WorkerTypes).includes(type)) { + throw new TypeError( + `Cannot construct a worker node with an invalid worker type '${type}'` + ) + } + checkFilePath(filePath) + if (opts == null) { + throw new TypeError( + 'Cannot construct a worker node without worker node options' + ) } - if (tasksQueueBackPressureSize == null) { + if (!isPlainObject(opts)) { throw new TypeError( - 'Cannot construct a worker node without a tasks queue back pressure size' + 'Cannot construct a worker node with invalid options: must be a plain object' ) } - if (!Number.isSafeInteger(tasksQueueBackPressureSize)) { + if (opts.tasksQueueBackPressureSize == null) { throw new TypeError( - 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer' + 'Cannot construct a worker node without a tasks queue back pressure size option' ) } - if (tasksQueueBackPressureSize <= 0) { + if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) { + throw new TypeError( + 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer' + ) + } + if (opts.tasksQueueBackPressureSize <= 0) { throw new RangeError( - 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer' + 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' ) } } @@ -116,15 +269,18 @@ export const checkWorkerNodeArguments = ( * @param measurementStatistics - The measurement statistics to update. * @param measurementRequirements - The measurement statistics requirements. * @param measurementValue - The measurement value. - * @param numberOfMeasurements - The number of measurements. * @internal */ -export const updateMeasurementStatistics = ( +const updateMeasurementStatistics = ( measurementStatistics: MeasurementStatistics, - measurementRequirements: MeasurementStatisticsRequirements, - measurementValue: number + measurementRequirements: MeasurementStatisticsRequirements | undefined, + measurementValue: number | undefined ): void => { - if (measurementRequirements.aggregate) { + if ( + measurementRequirements != null && + measurementValue != null && + measurementRequirements.aggregate + ) { measurementStatistics.aggregate = (measurementStatistics.aggregate ?? 0) + measurementValue measurementStatistics.minimum = min( @@ -135,10 +291,7 @@ export const updateMeasurementStatistics = ( measurementValue, measurementStatistics.maximum ?? -Infinity ) - if ( - (measurementRequirements.average || measurementRequirements.median) && - measurementValue != null - ) { + if (measurementRequirements.average || measurementRequirements.median) { measurementStatistics.history.push(measurementValue) if (measurementRequirements.average) { measurementStatistics.average = average(measurementStatistics.history) @@ -153,3 +306,185 @@ export const updateMeasurementStatistics = ( } } } +if (env.NODE_ENV === 'test') { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + exports.updateMeasurementStatistics = updateMeasurementStatistics +} + +export const updateWaitTimeWorkerUsage = < + Worker extends IWorker, + Data = unknown, + Response = unknown +>( + workerChoiceStrategyContext: + | WorkerChoiceStrategyContext + | undefined, + workerUsage: WorkerUsage, + task: Task + ): void => { + const timestamp = performance.now() + const taskWaitTime = timestamp - (task.timestamp ?? timestamp) + updateMeasurementStatistics( + workerUsage.waitTime, + workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime, + taskWaitTime + ) +} + +export const updateTaskStatisticsWorkerUsage = ( + workerUsage: WorkerUsage, + message: MessageValue +): void => { + const workerTaskStatistics = workerUsage.tasks + if ( + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + workerTaskStatistics.executing != null && + workerTaskStatistics.executing > 0 + ) { + --workerTaskStatistics.executing + } + if (message.workerError == null) { + ++workerTaskStatistics.executed + } else { + ++workerTaskStatistics.failed + } +} + +export const updateRunTimeWorkerUsage = < + Worker extends IWorker, + Data = unknown, + Response = unknown +>( + workerChoiceStrategyContext: + | WorkerChoiceStrategyContext + | undefined, + workerUsage: WorkerUsage, + message: MessageValue + ): void => { + if (message.workerError != null) { + return + } + updateMeasurementStatistics( + workerUsage.runTime, + workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime, + message.taskPerformance?.runTime ?? 0 + ) +} + +export const updateEluWorkerUsage = < + Worker extends IWorker, + Data = unknown, + Response = unknown +>( + workerChoiceStrategyContext: + | WorkerChoiceStrategyContext + | undefined, + workerUsage: WorkerUsage, + message: MessageValue + ): void => { + if (message.workerError != null) { + return + } + const eluTaskStatisticsRequirements = + workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu + updateMeasurementStatistics( + workerUsage.elu.active, + eluTaskStatisticsRequirements, + message.taskPerformance?.elu?.active ?? 0 + ) + updateMeasurementStatistics( + workerUsage.elu.idle, + eluTaskStatisticsRequirements, + message.taskPerformance?.elu?.idle ?? 0 + ) + if (eluTaskStatisticsRequirements?.aggregate === true) { + if (message.taskPerformance?.elu != null) { + if (workerUsage.elu.utilization != null) { + workerUsage.elu.utilization = + (workerUsage.elu.utilization + + message.taskPerformance.elu.utilization) / + 2 + } else { + workerUsage.elu.utilization = message.taskPerformance.elu.utilization + } + } + } +} + +export const createWorker = ( + type: WorkerType, + filePath: string, + opts: { env?: Record, workerOptions?: WorkerOptions } +): Worker => { + switch (type) { + case WorkerTypes.thread: + return new ThreadWorker(filePath, { + env: SHARE_ENV, + ...opts.workerOptions + }) as unknown as Worker + case WorkerTypes.cluster: + return cluster.fork(opts.env) as unknown as Worker + default: + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + throw new Error(`Unknown worker type '${type}'`) + } +} + +/** + * Returns the worker type of the given worker. + * + * @param worker - The worker to get the type of. + * @returns The worker type of the given worker. + * @internal + */ +export const getWorkerType = (worker: IWorker): WorkerType | undefined => { + if (worker instanceof ThreadWorker) { + return WorkerTypes.thread + } else if (worker instanceof ClusterWorker) { + return WorkerTypes.cluster + } +} + +/** + * Returns the worker id of the given worker. + * + * @param worker - The worker to get the id of. + * @returns The worker id of the given worker. + * @internal + */ +export const getWorkerId = (worker: IWorker): number | undefined => { + if (worker instanceof ThreadWorker) { + return worker.threadId + } else if (worker instanceof ClusterWorker) { + return worker.id + } +} + +export const waitWorkerNodeEvents = async < + Worker extends IWorker, + Data = unknown +>( + workerNode: IWorkerNode, + workerNodeEvent: string, + numberOfEventsToWait: number, + timeout: number +): Promise => { + return await new Promise(resolve => { + let events = 0 + if (numberOfEventsToWait === 0) { + resolve(events) + return + } + workerNode.on(workerNodeEvent, () => { + ++events + if (events === numberOfEventsToWait) { + resolve(events) + } + }) + if (timeout >= 0) { + setTimeout(() => { + resolve(events) + }, timeout) + } + }) +}