X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Futils.ts;h=df9c256e95390d0731b730ae6726678d5bcbdfb7;hb=2eee72204bc851f616ded11cb5381f96c6dc5cbf;hp=7c1b7866e6d958687ba6dc2e2c29827706caab4f;hpb=10c74f8a996cd15f294bdb31ad31d59ab0177db1;p=poolifier.git diff --git a/src/pools/utils.ts b/src/pools/utils.ts index 7c1b7866..df9c256e 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -1,6 +1,5 @@ import cluster, { Worker as ClusterWorker } from 'node:cluster' import { existsSync } from 'node:fs' -import { cpus } from 'node:os' import { env } from 'node:process' import { SHARE_ENV, @@ -10,14 +9,13 @@ import { import type { MessageValue, Task } from '../utility-types.js' import { average, isPlainObject, max, median, min } from '../utils.js' -import type { IPool, TasksQueueOptions } from './pool.js' +import type { TasksQueueOptions } from './pool.js' import { type MeasurementStatisticsRequirements, WorkerChoiceStrategies, - type WorkerChoiceStrategy, - type WorkerChoiceStrategyOptions + type WorkerChoiceStrategy } from './selection-strategies/selection-strategies-types.js' -import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' +import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js' import { type IWorker, type IWorkerNode, @@ -45,91 +43,11 @@ export const getDefaultTasksQueueOptions = ( size: Math.pow(poolMaxSize, 2), concurrency: 1, taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 } } -export const getWorkerChoiceStrategyRetries = < - Worker extends IWorker, - Data, - Response ->( - pool: IPool, - opts?: WorkerChoiceStrategyOptions - ): number => { - return ( - pool.info.maxSize + - Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length - ) -} - -export const buildWorkerChoiceStrategyOptions = < - Worker extends IWorker, - Data, - Response ->( - pool: IPool, - opts?: WorkerChoiceStrategyOptions - ): WorkerChoiceStrategyOptions => { - opts = clone(opts ?? {}) - opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize) - return { - ...{ - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }, - ...opts - } -} - -const clone = (object: T): T => { - return structuredClone(object) -} - -const getDefaultWeights = ( - poolMaxSize: number, - defaultWorkerWeight?: number -): Record => { - defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight() - const weights: Record = {} - for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) { - weights[workerNodeKey] = defaultWorkerWeight - } - return weights -} - -const estimatedCpuSpeed = (): number => { - const runs = 150000000 - const begin = performance.now() - // eslint-disable-next-line no-empty - for (let i = runs; i > 0; i--) {} - const end = performance.now() - const duration = end - begin - return Math.trunc(runs / duration / 1000) // in MHz -} - -const estCpuSpeed = estimatedCpuSpeed() - -const getDefaultWorkerWeight = (estimatedCpuSpeed = estCpuSpeed): number => { - let cpusCycleTimeWeight = 0 - for (const cpu of cpus()) { - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (cpu.speed == null || cpu.speed === 0) { - cpu.speed = - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - cpus().find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ?? - estimatedCpuSpeed - } - // CPU estimated cycle time - const numberOfDigits = cpu.speed.toString().length - 1 - const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits)) - cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits) - } - return Math.round(cpusCycleTimeWeight / cpus().length) -} - export const checkFilePath = (filePath: string | undefined): void => { if (filePath == null) { throw new TypeError('The worker file path must be specified') @@ -169,6 +87,19 @@ export const checkDynamicPoolSize = ( } } +export const checkValidPriority = (priority: number | undefined): void => { + if (priority != null && !Number.isSafeInteger(priority)) { + throw new TypeError(`Invalid property 'priority': '${priority}'`) + } + if ( + priority != null && + Number.isSafeInteger(priority) && + (priority < -20 || priority > 19) + ) { + throw new RangeError("Property 'priority' must be between -20 and 19") + } +} + export const checkValidWorkerChoiceStrategy = ( workerChoiceStrategy: WorkerChoiceStrategy | undefined ): void => { @@ -256,6 +187,21 @@ export const checkWorkerNodeArguments = ( 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' ) } + if (opts.tasksQueueBucketSize == null) { + throw new TypeError( + 'Cannot construct a worker node without a tasks queue bucket size option' + ) + } + if (!Number.isSafeInteger(opts.tasksQueueBucketSize)) { + throw new TypeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer' + ) + } + if (opts.tasksQueueBucketSize <= 0) { + throw new RangeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer' + ) + } } /** @@ -311,8 +257,8 @@ export const updateWaitTimeWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: - | WorkerChoiceStrategyContext + workerChoiceStrategiesContext: + | WorkerChoiceStrategiesContext | undefined, workerUsage: WorkerUsage, task: Task @@ -321,7 +267,7 @@ export const updateWaitTimeWorkerUsage = < const taskWaitTime = timestamp - (task.timestamp ?? timestamp) updateMeasurementStatistics( workerUsage.waitTime, - workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime, + workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime, taskWaitTime ) } @@ -350,8 +296,8 @@ export const updateRunTimeWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: - | WorkerChoiceStrategyContext + workerChoiceStrategiesContext: + | WorkerChoiceStrategiesContext | undefined, workerUsage: WorkerUsage, message: MessageValue @@ -361,7 +307,7 @@ export const updateRunTimeWorkerUsage = < } updateMeasurementStatistics( workerUsage.runTime, - workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime, + workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime, message.taskPerformance?.runTime ?? 0 ) } @@ -371,8 +317,8 @@ export const updateEluWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: - | WorkerChoiceStrategyContext + workerChoiceStrategiesContext: + | WorkerChoiceStrategiesContext | undefined, workerUsage: WorkerUsage, message: MessageValue @@ -381,7 +327,7 @@ export const updateEluWorkerUsage = < return } const eluTaskStatisticsRequirements = - workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu + workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu updateMeasurementStatistics( workerUsage.elu.active, eluTaskStatisticsRequirements,