X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Futils.ts;h=df9c256e95390d0731b730ae6726678d5bcbdfb7;hb=71ccb3ae4a6c834371a799746ed7802e8c98de4c;hp=c0e803a2f4cb5a2bd88b3350e98b3a754c4e5b51;hpb=c63a35a04c190989be80f9218d97e0aca739475e;p=poolifier.git diff --git a/src/pools/utils.ts b/src/pools/utils.ts index c0e803a2..df9c256e 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -1,15 +1,21 @@ +import cluster, { Worker as ClusterWorker } from 'node:cluster' import { existsSync } from 'node:fs' -import cluster from 'node:cluster' -import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads' import { env } from 'node:process' -import { average, isPlainObject, max, median, min } from '../utils.js' +import { + SHARE_ENV, + Worker as ThreadWorker, + type WorkerOptions +} from 'node:worker_threads' + import type { MessageValue, Task } from '../utility-types.js' +import { average, isPlainObject, max, median, min } from '../utils.js' +import type { TasksQueueOptions } from './pool.js' import { type MeasurementStatisticsRequirements, WorkerChoiceStrategies, type WorkerChoiceStrategy } from './selection-strategies/selection-strategies-types.js' -import type { TasksQueueOptions } from './pool.js' +import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js' import { type IWorker, type IWorkerNode, @@ -19,7 +25,16 @@ import { WorkerTypes, type WorkerUsage } from './worker.js' -import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' + +/** + * Default measurement statistics requirements. + */ +export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements = + { + aggregate: false, + average: false, + median: false + } export const getDefaultTasksQueueOptions = ( poolMaxSize: number @@ -28,7 +43,7 @@ export const getDefaultTasksQueueOptions = ( size: Math.pow(poolMaxSize, 2), concurrency: 1, taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 } } @@ -72,6 +87,19 @@ export const checkDynamicPoolSize = ( } } +export const checkValidPriority = (priority: number | undefined): void => { + if (priority != null && !Number.isSafeInteger(priority)) { + throw new TypeError(`Invalid property 'priority': '${priority}'`) + } + if ( + priority != null && + Number.isSafeInteger(priority) && + (priority < -20 || priority > 19) + ) { + throw new RangeError("Property 'priority' must be between -20 and 19") + } +} + export const checkValidWorkerChoiceStrategy = ( workerChoiceStrategy: WorkerChoiceStrategy | undefined ): void => { @@ -159,6 +187,21 @@ export const checkWorkerNodeArguments = ( 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' ) } + if (opts.tasksQueueBucketSize == null) { + throw new TypeError( + 'Cannot construct a worker node without a tasks queue bucket size option' + ) + } + if (!Number.isSafeInteger(opts.tasksQueueBucketSize)) { + throw new TypeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer' + ) + } + if (opts.tasksQueueBucketSize <= 0) { + throw new RangeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer' + ) + } } /** @@ -214,8 +257,8 @@ export const updateWaitTimeWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: - | WorkerChoiceStrategyContext + workerChoiceStrategiesContext: + | WorkerChoiceStrategiesContext | undefined, workerUsage: WorkerUsage, task: Task @@ -224,7 +267,7 @@ export const updateWaitTimeWorkerUsage = < const taskWaitTime = timestamp - (task.timestamp ?? timestamp) updateMeasurementStatistics( workerUsage.waitTime, - workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.waitTime, + workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime, taskWaitTime ) } @@ -253,8 +296,8 @@ export const updateRunTimeWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: - | WorkerChoiceStrategyContext + workerChoiceStrategiesContext: + | WorkerChoiceStrategiesContext | undefined, workerUsage: WorkerUsage, message: MessageValue @@ -264,7 +307,7 @@ export const updateRunTimeWorkerUsage = < } updateMeasurementStatistics( workerUsage.runTime, - workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.runTime, + workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime, message.taskPerformance?.runTime ?? 0 ) } @@ -274,8 +317,8 @@ export const updateEluWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: - | WorkerChoiceStrategyContext + workerChoiceStrategiesContext: + | WorkerChoiceStrategiesContext | undefined, workerUsage: WorkerUsage, message: MessageValue @@ -284,7 +327,7 @@ export const updateEluWorkerUsage = < return } const eluTaskStatisticsRequirements = - workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.elu + workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu updateMeasurementStatistics( workerUsage.elu.active, eluTaskStatisticsRequirements, @@ -316,7 +359,7 @@ export const createWorker = ( ): Worker => { switch (type) { case WorkerTypes.thread: - return new Worker(filePath, { + return new ThreadWorker(filePath, { env: SHARE_ENV, ...opts.workerOptions }) as unknown as Worker @@ -328,6 +371,36 @@ export const createWorker = ( } } +/** + * Returns the worker type of the given worker. + * + * @param worker - The worker to get the type of. + * @returns The worker type of the given worker. + * @internal + */ +export const getWorkerType = (worker: IWorker): WorkerType | undefined => { + if (worker instanceof ThreadWorker) { + return WorkerTypes.thread + } else if (worker instanceof ClusterWorker) { + return WorkerTypes.cluster + } +} + +/** + * Returns the worker id of the given worker. + * + * @param worker - The worker to get the id of. + * @returns The worker id of the given worker. + * @internal + */ +export const getWorkerId = (worker: IWorker): number | undefined => { + if (worker instanceof ThreadWorker) { + return worker.threadId + } else if (worker instanceof ClusterWorker) { + return worker.id + } +} + export const waitWorkerNodeEvents = async < Worker extends IWorker, Data = unknown