X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Futils.ts;h=24f2cfb2a6d075b2133473bef04aa9a650607c5b;hb=27469db4f589429f5dd309187b1de2aae0275cb0;hp=6a6c1fa9d3aa9d14946d760e09301ad878ce901a;hpb=3e931141fe4cbbb1221697a1ee4fd26f4c419c82;p=poolifier.git diff --git a/src/pools/utils.ts b/src/pools/utils.ts index 6a6c1fa9..24f2cfb2 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -4,7 +4,7 @@ import { env } from 'node:process' import { SHARE_ENV, Worker as ThreadWorker, - type WorkerOptions + type WorkerOptions, } from 'node:worker_threads' import type { MessageValue, Task } from '../utility-types.js' @@ -13,7 +13,7 @@ import type { TasksQueueOptions } from './pool.js' import { type MeasurementStatisticsRequirements, WorkerChoiceStrategies, - type WorkerChoiceStrategy + type WorkerChoiceStrategy, } from './selection-strategies/selection-strategies-types.js' import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js' import { @@ -23,7 +23,7 @@ import { type WorkerNodeOptions, type WorkerType, WorkerTypes, - type WorkerUsage + type WorkerUsage, } from './worker.js' /** @@ -33,7 +33,7 @@ export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsR { aggregate: false, average: false, - median: false + median: false, } export const getDefaultTasksQueueOptions = ( @@ -43,8 +43,8 @@ export const getDefaultTasksQueueOptions = ( size: Math.pow(poolMaxSize, 2), concurrency: 1, taskStealing: true, - tasksStealingOnBackPressure: true, - tasksFinishedTimeout: 2000 + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 2000, } } @@ -87,6 +87,19 @@ export const checkDynamicPoolSize = ( } } +export const checkValidPriority = (priority: number | undefined): void => { + if (priority != null && !Number.isSafeInteger(priority)) { + throw new TypeError(`Invalid property 'priority': '${priority.toString()}'`) + } + if ( + priority != null && + Number.isSafeInteger(priority) && + (priority < -20 || priority > 19) + ) { + throw new RangeError("Property 'priority' must be between -20 and 19") + } +} + export const checkValidWorkerChoiceStrategy = ( workerChoiceStrategy: WorkerChoiceStrategy | undefined ): void => { @@ -117,7 +130,7 @@ export const checkValidTasksQueueOptions = ( tasksQueueOptions.concurrency <= 0 ) { throw new RangeError( - `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` + `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency.toString()} is a negative integer or zero` ) } if ( @@ -130,7 +143,7 @@ export const checkValidTasksQueueOptions = ( } if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) { throw new RangeError( - `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero` + `Invalid worker node tasks queue size: ${tasksQueueOptions.size.toString()} is a negative integer or zero` ) } } @@ -156,7 +169,7 @@ export const checkWorkerNodeArguments = ( } if (!isPlainObject(opts)) { throw new TypeError( - 'Cannot construct a worker node with invalid options: must be a plain object' + 'Cannot construct a worker node with invalid worker node options: must be a plain object' ) } if (opts.tasksQueueBackPressureSize == null) { @@ -174,11 +187,35 @@ export const checkWorkerNodeArguments = ( 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' ) } + if (opts.tasksQueueBucketSize == null) { + throw new TypeError( + 'Cannot construct a worker node without a tasks queue bucket size option' + ) + } + if (!Number.isSafeInteger(opts.tasksQueueBucketSize)) { + throw new TypeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer' + ) + } + if (opts.tasksQueueBucketSize <= 0) { + throw new RangeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer' + ) + } + if (opts.tasksQueuePriority == null) { + throw new TypeError( + 'Cannot construct a worker node without a tasks queue priority option' + ) + } + if (typeof opts.tasksQueuePriority !== 'boolean') { + throw new TypeError( + 'Cannot construct a worker node with a tasks queue priority option that is not a boolean' + ) + } } /** * Updates the given measurement statistics. - * * @param measurementStatistics - The measurement statistics to update. * @param measurementRequirements - The measurement statistics requirements. * @param measurementValue - The measurement value. @@ -198,21 +235,25 @@ const updateMeasurementStatistics = ( (measurementStatistics.aggregate ?? 0) + measurementValue measurementStatistics.minimum = min( measurementValue, - measurementStatistics.minimum ?? Infinity + measurementStatistics.minimum ?? Number.POSITIVE_INFINITY ) measurementStatistics.maximum = max( measurementValue, - measurementStatistics.maximum ?? -Infinity + measurementStatistics.maximum ?? Number.NEGATIVE_INFINITY ) if (measurementRequirements.average || measurementRequirements.median) { - measurementStatistics.history.push(measurementValue) + measurementStatistics.history.put(measurementValue) if (measurementRequirements.average) { - measurementStatistics.average = average(measurementStatistics.history) + measurementStatistics.average = average( + measurementStatistics.history.toArray() + ) } else if (measurementStatistics.average != null) { delete measurementStatistics.average } if (measurementRequirements.median) { - measurementStatistics.median = median(measurementStatistics.history) + measurementStatistics.median = median( + measurementStatistics.history.toArray() + ) } else if (measurementStatistics.median != null) { delete measurementStatistics.median } @@ -229,7 +270,7 @@ export const updateWaitTimeWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: + workerChoiceStrategiesContext: | WorkerChoiceStrategiesContext | undefined, workerUsage: WorkerUsage, @@ -239,7 +280,7 @@ export const updateWaitTimeWorkerUsage = < const taskWaitTime = timestamp - (task.timestamp ?? timestamp) updateMeasurementStatistics( workerUsage.waitTime, - workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime, + workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime, taskWaitTime ) } @@ -268,7 +309,7 @@ export const updateRunTimeWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: + workerChoiceStrategiesContext: | WorkerChoiceStrategiesContext | undefined, workerUsage: WorkerUsage, @@ -279,7 +320,7 @@ export const updateRunTimeWorkerUsage = < } updateMeasurementStatistics( workerUsage.runTime, - workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime, + workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime, message.taskPerformance?.runTime ?? 0 ) } @@ -289,7 +330,7 @@ export const updateEluWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: + workerChoiceStrategiesContext: | WorkerChoiceStrategiesContext | undefined, workerUsage: WorkerUsage, @@ -299,7 +340,7 @@ export const updateEluWorkerUsage = < return } const eluTaskStatisticsRequirements = - workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu + workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu updateMeasurementStatistics( workerUsage.elu.active, eluTaskStatisticsRequirements, @@ -327,13 +368,13 @@ export const updateEluWorkerUsage = < export const createWorker = ( type: WorkerType, filePath: string, - opts: { env?: Record, workerOptions?: WorkerOptions } + opts: { env?: Record; workerOptions?: WorkerOptions } ): Worker => { switch (type) { case WorkerTypes.thread: return new ThreadWorker(filePath, { env: SHARE_ENV, - ...opts.workerOptions + ...opts.workerOptions, }) as unknown as Worker case WorkerTypes.cluster: return cluster.fork(opts.env) as unknown as Worker @@ -345,7 +386,6 @@ export const createWorker = ( /** * Returns the worker type of the given worker. - * * @param worker - The worker to get the type of. * @returns The worker type of the given worker. * @internal @@ -360,7 +400,6 @@ export const getWorkerType = (worker: IWorker): WorkerType | undefined => { /** * Returns the worker id of the given worker. - * * @param worker - The worker to get the id of. * @returns The worker id of the given worker. * @internal @@ -388,12 +427,20 @@ export const waitWorkerNodeEvents = async < resolve(events) return } - workerNode.on(workerNodeEvent, () => { - ++events - if (events === numberOfEventsToWait) { - resolve(events) - } - }) + switch (workerNodeEvent) { + case 'idle': + case 'backPressure': + case 'taskFinished': + workerNode.on(workerNodeEvent, () => { + ++events + if (events === numberOfEventsToWait) { + resolve(events) + } + }) + break + default: + throw new Error('Invalid worker node event') + } if (timeout >= 0) { setTimeout(() => { resolve(events)