X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Futils.ts;h=f2ede8b6d2b07d44316cfdaa14abd91cb10204cc;hb=f12182ad6dc553c7a5dfeee01bcde65c0177f671;hp=c7ab2d1e664894bbfba218103ee9d85b15e8b0e8;hpb=6f3a391bc171dd4492a67362b4f1f97a074906ad;p=poolifier.git diff --git a/src/pools/utils.ts b/src/pools/utils.ts index c7ab2d1e..f2ede8b6 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -1,15 +1,21 @@ +import cluster, { Worker as ClusterWorker } from 'node:cluster' import { existsSync } from 'node:fs' -import cluster from 'node:cluster' -import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads' import { env } from 'node:process' -import { average, isPlainObject, max, median, min } from '../utils' -import type { MessageValue, Task } from '../utility-types' +import { + SHARE_ENV, + Worker as ThreadWorker, + type WorkerOptions +} from 'node:worker_threads' + +import type { MessageValue, Task } from '../utility-types.js' +import { average, isPlainObject, max, median, min } from '../utils.js' +import type { TasksQueueOptions } from './pool.js' import { type MeasurementStatisticsRequirements, WorkerChoiceStrategies, type WorkerChoiceStrategy -} from './selection-strategies/selection-strategies-types' -import type { TasksQueueOptions } from './pool' +} from './selection-strategies/selection-strategies-types.js' +import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js' import { type IWorker, type IWorkerNode, @@ -18,8 +24,17 @@ import { type WorkerType, WorkerTypes, type WorkerUsage -} from './worker' -import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' +} from './worker.js' + +/** + * Default measurement statistics requirements. + */ +export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements = + { + aggregate: false, + average: false, + median: false + } export const getDefaultTasksQueueOptions = ( poolMaxSize: number @@ -28,12 +43,12 @@ export const getDefaultTasksQueueOptions = ( size: Math.pow(poolMaxSize, 2), concurrency: 1, taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 } } -export const checkFilePath = (filePath: string): void => { +export const checkFilePath = (filePath: string | undefined): void => { if (filePath == null) { throw new TypeError('The worker file path must be specified') } @@ -45,7 +60,10 @@ export const checkFilePath = (filePath: string): void => { } } -export const checkDynamicPoolSize = (min: number, max: number): void => { +export const checkDynamicPoolSize = ( + min: number, + max: number | undefined +): void => { if (max == null) { throw new TypeError( 'Cannot instantiate a dynamic pool without specifying the maximum pool size' @@ -69,8 +87,21 @@ export const checkDynamicPoolSize = (min: number, max: number): void => { } } +export const checkValidPriority = (priority: number | undefined): void => { + if (priority != null && !Number.isSafeInteger(priority)) { + throw new TypeError(`Invalid property 'priority': '${priority}'`) + } + if ( + priority != null && + Number.isSafeInteger(priority) && + (priority < -20 || priority > 19) + ) { + throw new RangeError("Property 'priority' must be between -20 and 19") + } +} + export const checkValidWorkerChoiceStrategy = ( - workerChoiceStrategy: WorkerChoiceStrategy + workerChoiceStrategy: WorkerChoiceStrategy | undefined ): void => { if ( workerChoiceStrategy != null && @@ -81,7 +112,7 @@ export const checkValidWorkerChoiceStrategy = ( } export const checkValidTasksQueueOptions = ( - tasksQueueOptions: TasksQueueOptions + tasksQueueOptions: TasksQueueOptions | undefined ): void => { if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) { throw new TypeError('Invalid tasks queue options: must be a plain object') @@ -118,9 +149,9 @@ export const checkValidTasksQueueOptions = ( } export const checkWorkerNodeArguments = ( - type: WorkerType, - filePath: string, - opts: WorkerNodeOptions + type: WorkerType | undefined, + filePath: string | undefined, + opts: WorkerNodeOptions | undefined ): void => { if (type == null) { throw new TypeError('Cannot construct a worker node without a worker type') @@ -156,6 +187,21 @@ export const checkWorkerNodeArguments = ( 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' ) } + if (opts.tasksQueueBucketSize == null) { + throw new TypeError( + 'Cannot construct a worker node without a tasks queue bucket size option' + ) + } + if (!Number.isSafeInteger(opts.tasksQueueBucketSize)) { + throw new TypeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer' + ) + } + if (opts.tasksQueueBucketSize <= 0) { + throw new RangeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer' + ) + } } /** @@ -164,37 +210,41 @@ export const checkWorkerNodeArguments = ( * @param measurementStatistics - The measurement statistics to update. * @param measurementRequirements - The measurement statistics requirements. * @param measurementValue - The measurement value. - * @param numberOfMeasurements - The number of measurements. * @internal */ const updateMeasurementStatistics = ( measurementStatistics: MeasurementStatistics, - measurementRequirements: MeasurementStatisticsRequirements, - measurementValue: number + measurementRequirements: MeasurementStatisticsRequirements | undefined, + measurementValue: number | undefined ): void => { - if (measurementRequirements.aggregate) { + if ( + measurementRequirements != null && + measurementValue != null && + measurementRequirements.aggregate + ) { measurementStatistics.aggregate = (measurementStatistics.aggregate ?? 0) + measurementValue measurementStatistics.minimum = min( measurementValue, - measurementStatistics.minimum ?? Infinity + measurementStatistics.minimum ?? Number.POSITIVE_INFINITY ) measurementStatistics.maximum = max( measurementValue, - measurementStatistics.maximum ?? -Infinity + measurementStatistics.maximum ?? Number.NEGATIVE_INFINITY ) - if ( - (measurementRequirements.average || measurementRequirements.median) && - measurementValue != null - ) { - measurementStatistics.history.push(measurementValue) + if (measurementRequirements.average || measurementRequirements.median) { + measurementStatistics.history.put(measurementValue) if (measurementRequirements.average) { - measurementStatistics.average = average(measurementStatistics.history) + measurementStatistics.average = average( + measurementStatistics.history.toArray() + ) } else if (measurementStatistics.average != null) { delete measurementStatistics.average } if (measurementRequirements.median) { - measurementStatistics.median = median(measurementStatistics.history) + measurementStatistics.median = median( + measurementStatistics.history.toArray() + ) } else if (measurementStatistics.median != null) { delete measurementStatistics.median } @@ -211,11 +261,9 @@ export const updateWaitTimeWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: WorkerChoiceStrategyContext< - Worker, - Data, - Response - >, + workerChoiceStrategiesContext: + | WorkerChoiceStrategiesContext + | undefined, workerUsage: WorkerUsage, task: Task ): void => { @@ -223,7 +271,7 @@ export const updateWaitTimeWorkerUsage = < const taskWaitTime = timestamp - (task.timestamp ?? timestamp) updateMeasurementStatistics( workerUsage.waitTime, - workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime, + workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime, taskWaitTime ) } @@ -234,6 +282,7 @@ export const updateTaskStatisticsWorkerUsage = ( ): void => { const workerTaskStatistics = workerUsage.tasks if ( + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition workerTaskStatistics.executing != null && workerTaskStatistics.executing > 0 ) { @@ -251,11 +300,9 @@ export const updateRunTimeWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: WorkerChoiceStrategyContext< - Worker, - Data, - Response - >, + workerChoiceStrategiesContext: + | WorkerChoiceStrategiesContext + | undefined, workerUsage: WorkerUsage, message: MessageValue ): void => { @@ -264,7 +311,7 @@ export const updateRunTimeWorkerUsage = < } updateMeasurementStatistics( workerUsage.runTime, - workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime, + workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime, message.taskPerformance?.runTime ?? 0 ) } @@ -274,19 +321,17 @@ export const updateEluWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: WorkerChoiceStrategyContext< - Worker, - Data, - Response - >, + workerChoiceStrategiesContext: + | WorkerChoiceStrategiesContext + | undefined, workerUsage: WorkerUsage, message: MessageValue ): void => { if (message.workerError != null) { return } - const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = - workerChoiceStrategyContext.getTaskStatisticsRequirements().elu + const eluTaskStatisticsRequirements = + workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu updateMeasurementStatistics( workerUsage.elu.active, eluTaskStatisticsRequirements, @@ -297,7 +342,7 @@ export const updateEluWorkerUsage = < eluTaskStatisticsRequirements, message.taskPerformance?.elu?.idle ?? 0 ) - if (eluTaskStatisticsRequirements.aggregate) { + if (eluTaskStatisticsRequirements?.aggregate === true) { if (message.taskPerformance?.elu != null) { if (workerUsage.elu.utilization != null) { workerUsage.elu.utilization = @@ -318,18 +363,48 @@ export const createWorker = ( ): Worker => { switch (type) { case WorkerTypes.thread: - return new Worker(filePath, { + return new ThreadWorker(filePath, { env: SHARE_ENV, - ...opts?.workerOptions + ...opts.workerOptions }) as unknown as Worker case WorkerTypes.cluster: - return cluster.fork(opts?.env) as unknown as Worker + return cluster.fork(opts.env) as unknown as Worker default: // eslint-disable-next-line @typescript-eslint/restrict-template-expressions throw new Error(`Unknown worker type '${type}'`) } } +/** + * Returns the worker type of the given worker. + * + * @param worker - The worker to get the type of. + * @returns The worker type of the given worker. + * @internal + */ +export const getWorkerType = (worker: IWorker): WorkerType | undefined => { + if (worker instanceof ThreadWorker) { + return WorkerTypes.thread + } else if (worker instanceof ClusterWorker) { + return WorkerTypes.cluster + } +} + +/** + * Returns the worker id of the given worker. + * + * @param worker - The worker to get the id of. + * @returns The worker id of the given worker. + * @internal + */ +export const getWorkerId = (worker: IWorker): number | undefined => { + if (worker instanceof ThreadWorker) { + return worker.threadId + } else if (worker instanceof ClusterWorker) { + return worker.id + } +} + export const waitWorkerNodeEvents = async < Worker extends IWorker, Data = unknown @@ -345,12 +420,20 @@ export const waitWorkerNodeEvents = async < resolve(events) return } - workerNode.on(workerNodeEvent, () => { - ++events - if (events === numberOfEventsToWait) { - resolve(events) - } - }) + switch (workerNodeEvent) { + case 'idle': + case 'backPressure': + case 'taskFinished': + workerNode.on(workerNodeEvent, () => { + ++events + if (events === numberOfEventsToWait) { + resolve(events) + } + }) + break + default: + throw new Error('Invalid worker node event') + } if (timeout >= 0) { setTimeout(() => { resolve(events)