X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Futils.ts;h=ff2709826076459a8ef4ee417221b333b3ca70e3;hb=eceaf36f69c97b0376194a849e77f6ea15fb8f7b;hp=c0e803a2f4cb5a2bd88b3350e98b3a754c4e5b51;hpb=c63a35a04c190989be80f9218d97e0aca739475e;p=poolifier.git diff --git a/src/pools/utils.ts b/src/pools/utils.ts index c0e803a2..ff270982 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -1,15 +1,22 @@ import { existsSync } from 'node:fs' -import cluster from 'node:cluster' -import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads' +import cluster, { Worker as ClusterWorker } from 'node:cluster' +import { + SHARE_ENV, + Worker as ThreadWorker, + type WorkerOptions +} from 'node:worker_threads' import { env } from 'node:process' +import { randomInt } from 'node:crypto' +import { cpus } from 'node:os' import { average, isPlainObject, max, median, min } from '../utils.js' import type { MessageValue, Task } from '../utility-types.js' import { type MeasurementStatisticsRequirements, WorkerChoiceStrategies, - type WorkerChoiceStrategy + type WorkerChoiceStrategy, + type WorkerChoiceStrategyOptions } from './selection-strategies/selection-strategies-types.js' -import type { TasksQueueOptions } from './pool.js' +import type { IPool, TasksQueueOptions } from './pool.js' import { type IWorker, type IWorkerNode, @@ -21,6 +28,16 @@ import { } from './worker.js' import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' +/** + * Default measurement statistics requirements. + */ +export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements = + { + aggregate: false, + average: false, + median: false + } + export const getDefaultTasksQueueOptions = ( poolMaxSize: number ): Required => { @@ -33,6 +50,75 @@ export const getDefaultTasksQueueOptions = ( } } +export const getWorkerChoiceStrategyRetries = < + Worker extends IWorker, + Data, + Response +>( + pool: IPool, + opts?: WorkerChoiceStrategyOptions + ): number => { + return ( + pool.info.maxSize + + Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length + ) +} + +export const buildWorkerChoiceStrategyOptions = < + Worker extends IWorker, + Data, + Response +>( + pool: IPool, + opts?: WorkerChoiceStrategyOptions + ): WorkerChoiceStrategyOptions => { + opts = clone(opts ?? {}) + opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize) + return { + ...{ + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }, + ...opts + } +} + +const clone = (object: T): T => { + return structuredClone(object) +} + +const getDefaultWeights = ( + poolMaxSize: number, + defaultWorkerWeight?: number +): Record => { + defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight() + const weights: Record = {} + for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) { + weights[workerNodeKey] = defaultWorkerWeight + } + return weights +} + +const getDefaultWorkerWeight = (): number => { + const cpuSpeed = randomInt(500, 2500) + let cpusCycleTimeWeight = 0 + for (const cpu of cpus()) { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (cpu.speed == null || cpu.speed === 0) { + cpu.speed = + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + cpus().find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ?? + cpuSpeed + } + // CPU estimated cycle time + const numberOfDigits = cpu.speed.toString().length - 1 + const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits)) + cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits) + } + return Math.round(cpusCycleTimeWeight / cpus().length) +} + export const checkFilePath = (filePath: string | undefined): void => { if (filePath == null) { throw new TypeError('The worker file path must be specified') @@ -224,7 +310,7 @@ export const updateWaitTimeWorkerUsage = < const taskWaitTime = timestamp - (task.timestamp ?? timestamp) updateMeasurementStatistics( workerUsage.waitTime, - workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.waitTime, + workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime, taskWaitTime ) } @@ -264,7 +350,7 @@ export const updateRunTimeWorkerUsage = < } updateMeasurementStatistics( workerUsage.runTime, - workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.runTime, + workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime, message.taskPerformance?.runTime ?? 0 ) } @@ -284,7 +370,7 @@ export const updateEluWorkerUsage = < return } const eluTaskStatisticsRequirements = - workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.elu + workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu updateMeasurementStatistics( workerUsage.elu.active, eluTaskStatisticsRequirements, @@ -316,7 +402,7 @@ export const createWorker = ( ): Worker => { switch (type) { case WorkerTypes.thread: - return new Worker(filePath, { + return new ThreadWorker(filePath, { env: SHARE_ENV, ...opts.workerOptions }) as unknown as Worker @@ -328,6 +414,36 @@ export const createWorker = ( } } +/** + * Returns the worker type of the given worker. + * + * @param worker - The worker to get the type of. + * @returns The worker type of the given worker. + * @internal + */ +export const getWorkerType = (worker: IWorker): WorkerType | undefined => { + if (worker instanceof ThreadWorker) { + return WorkerTypes.thread + } else if (worker instanceof ClusterWorker) { + return WorkerTypes.cluster + } +} + +/** + * Returns the worker id of the given worker. + * + * @param worker - The worker to get the id of. + * @returns The worker id of the given worker. + * @internal + */ +export const getWorkerId = (worker: IWorker): number | undefined => { + if (worker instanceof ThreadWorker) { + return worker.threadId + } else if (worker instanceof ClusterWorker) { + return worker.id + } +} + export const waitWorkerNodeEvents = async < Worker extends IWorker, Data = unknown