X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Futils.ts;h=99584d8528d65f5538d3f89e309a6dd77d4fde79;hb=28883f84b5381bb9af4c71e3bccd5297f6ab5fcf;hp=5e21c38c0af657dab99aca255e80f0e39888b6a6;hpb=d41a44de8cc111add35f7daa7834e23055bce558;p=poolifier.git diff --git a/src/pools/utils.ts b/src/pools/utils.ts index 5e21c38c..99584d85 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -1,21 +1,37 @@ import { existsSync } from 'node:fs' import cluster from 'node:cluster' import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads' -import { average, isPlainObject, max, median, min } from '../utils' +import { env } from 'node:process' +import { average, isPlainObject, max, median, min } from '../utils.js' +import type { MessageValue, Task } from '../utility-types.js' import { type MeasurementStatisticsRequirements, WorkerChoiceStrategies, type WorkerChoiceStrategy -} from './selection-strategies/selection-strategies-types' -import type { TasksQueueOptions } from './pool' +} from './selection-strategies/selection-strategies-types.js' +import type { TasksQueueOptions } from './pool.js' import { type IWorker, type IWorkerNode, type MeasurementStatistics, type WorkerNodeOptions, type WorkerType, - WorkerTypes -} from './worker' + WorkerTypes, + type WorkerUsage +} from './worker.js' +import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' + +export const getDefaultTasksQueueOptions = ( + poolMaxSize: number +): Required => { + return { + size: Math.pow(poolMaxSize, 2), + concurrency: 1, + taskStealing: true, + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 + } +} export const checkFilePath = (filePath: string): void => { if (filePath == null) { @@ -148,10 +164,9 @@ export const checkWorkerNodeArguments = ( * @param measurementStatistics - The measurement statistics to update. * @param measurementRequirements - The measurement statistics requirements. * @param measurementValue - The measurement value. - * @param numberOfMeasurements - The number of measurements. * @internal */ -export const updateMeasurementStatistics = ( +const updateMeasurementStatistics = ( measurementStatistics: MeasurementStatistics, measurementRequirements: MeasurementStatisticsRequirements, measurementValue: number @@ -185,6 +200,115 @@ export const updateMeasurementStatistics = ( } } } +if (env.NODE_ENV === 'test') { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + exports.updateMeasurementStatistics = updateMeasurementStatistics +} + +export const updateWaitTimeWorkerUsage = < + Worker extends IWorker, + Data = unknown, + Response = unknown +>( + workerChoiceStrategyContext: WorkerChoiceStrategyContext< + Worker, + Data, + Response + >, + workerUsage: WorkerUsage, + task: Task + ): void => { + const timestamp = performance.now() + const taskWaitTime = timestamp - (task.timestamp ?? timestamp) + updateMeasurementStatistics( + workerUsage.waitTime, + workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime, + taskWaitTime + ) +} + +export const updateTaskStatisticsWorkerUsage = ( + workerUsage: WorkerUsage, + message: MessageValue +): void => { + const workerTaskStatistics = workerUsage.tasks + if ( + workerTaskStatistics.executing != null && + workerTaskStatistics.executing > 0 + ) { + --workerTaskStatistics.executing + } + if (message.workerError == null) { + ++workerTaskStatistics.executed + } else { + ++workerTaskStatistics.failed + } +} + +export const updateRunTimeWorkerUsage = < + Worker extends IWorker, + Data = unknown, + Response = unknown +>( + workerChoiceStrategyContext: WorkerChoiceStrategyContext< + Worker, + Data, + Response + >, + workerUsage: WorkerUsage, + message: MessageValue + ): void => { + if (message.workerError != null) { + return + } + updateMeasurementStatistics( + workerUsage.runTime, + workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime, + message.taskPerformance?.runTime ?? 0 + ) +} + +export const updateEluWorkerUsage = < + Worker extends IWorker, + Data = unknown, + Response = unknown +>( + workerChoiceStrategyContext: WorkerChoiceStrategyContext< + Worker, + Data, + Response + >, + workerUsage: WorkerUsage, + message: MessageValue + ): void => { + if (message.workerError != null) { + return + } + const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = + workerChoiceStrategyContext.getTaskStatisticsRequirements().elu + updateMeasurementStatistics( + workerUsage.elu.active, + eluTaskStatisticsRequirements, + message.taskPerformance?.elu?.active ?? 0 + ) + updateMeasurementStatistics( + workerUsage.elu.idle, + eluTaskStatisticsRequirements, + message.taskPerformance?.elu?.idle ?? 0 + ) + if (eluTaskStatisticsRequirements.aggregate) { + if (message.taskPerformance?.elu != null) { + if (workerUsage.elu.utilization != null) { + workerUsage.elu.utilization = + (workerUsage.elu.utilization + + message.taskPerformance.elu.utilization) / + 2 + } else { + workerUsage.elu.utilization = message.taskPerformance.elu.utilization + } + } + } +} export const createWorker = ( type: WorkerType, @@ -211,7 +335,8 @@ export const waitWorkerNodeEvents = async < >( workerNode: IWorkerNode, workerNodeEvent: string, - numberOfEventsToWait: number + numberOfEventsToWait: number, + timeout: number ): Promise => { return await new Promise(resolve => { let events = 0 @@ -225,5 +350,10 @@ export const waitWorkerNodeEvents = async < resolve(events) } }) + if (timeout >= 0) { + setTimeout(() => { + resolve(events) + }, timeout) + } }) }