X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fpools%2Futils.ts;h=c0e803a2f4cb5a2bd88b3350e98b3a754c4e5b51;hb=07a85ff14ba3b9c2363c775358c373c5d4d51e8f;hp=ae09e8a9f5f7a5a0b2e22a1266c84534e4329e41;hpb=c329fd41c48904770df633b6d5ea2b3d37f3eafd;p=poolifier.git diff --git a/src/pools/utils.ts b/src/pools/utils.ts index ae09e8a9..c0e803a2 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -2,14 +2,14 @@ import { existsSync } from 'node:fs' import cluster from 'node:cluster' import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads' import { env } from 'node:process' -import { average, isPlainObject, max, median, min } from '../utils' -import type { MessageValue, Task } from '../utility-types' +import { average, isPlainObject, max, median, min } from '../utils.js' +import type { MessageValue, Task } from '../utility-types.js' import { type MeasurementStatisticsRequirements, WorkerChoiceStrategies, type WorkerChoiceStrategy -} from './selection-strategies/selection-strategies-types' -import type { TasksQueueOptions } from './pool' +} from './selection-strategies/selection-strategies-types.js' +import type { TasksQueueOptions } from './pool.js' import { type IWorker, type IWorkerNode, @@ -18,10 +18,22 @@ import { type WorkerType, WorkerTypes, type WorkerUsage -} from './worker' -import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' +} from './worker.js' +import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' -export const checkFilePath = (filePath: string): void => { +export const getDefaultTasksQueueOptions = ( + poolMaxSize: number +): Required => { + return { + size: Math.pow(poolMaxSize, 2), + concurrency: 1, + taskStealing: true, + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 + } +} + +export const checkFilePath = (filePath: string | undefined): void => { if (filePath == null) { throw new TypeError('The worker file path must be specified') } @@ -33,7 +45,10 @@ export const checkFilePath = (filePath: string): void => { } } -export const checkDynamicPoolSize = (min: number, max: number): void => { +export const checkDynamicPoolSize = ( + min: number, + max: number | undefined +): void => { if (max == null) { throw new TypeError( 'Cannot instantiate a dynamic pool without specifying the maximum pool size' @@ -58,7 +73,7 @@ export const checkDynamicPoolSize = (min: number, max: number): void => { } export const checkValidWorkerChoiceStrategy = ( - workerChoiceStrategy: WorkerChoiceStrategy + workerChoiceStrategy: WorkerChoiceStrategy | undefined ): void => { if ( workerChoiceStrategy != null && @@ -69,7 +84,7 @@ export const checkValidWorkerChoiceStrategy = ( } export const checkValidTasksQueueOptions = ( - tasksQueueOptions: TasksQueueOptions + tasksQueueOptions: TasksQueueOptions | undefined ): void => { if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) { throw new TypeError('Invalid tasks queue options: must be a plain object') @@ -106,9 +121,9 @@ export const checkValidTasksQueueOptions = ( } export const checkWorkerNodeArguments = ( - type: WorkerType, - filePath: string, - opts: WorkerNodeOptions + type: WorkerType | undefined, + filePath: string | undefined, + opts: WorkerNodeOptions | undefined ): void => { if (type == null) { throw new TypeError('Cannot construct a worker node without a worker type') @@ -152,15 +167,18 @@ export const checkWorkerNodeArguments = ( * @param measurementStatistics - The measurement statistics to update. * @param measurementRequirements - The measurement statistics requirements. * @param measurementValue - The measurement value. - * @param numberOfMeasurements - The number of measurements. * @internal */ const updateMeasurementStatistics = ( measurementStatistics: MeasurementStatistics, - measurementRequirements: MeasurementStatisticsRequirements, - measurementValue: number + measurementRequirements: MeasurementStatisticsRequirements | undefined, + measurementValue: number | undefined ): void => { - if (measurementRequirements.aggregate) { + if ( + measurementRequirements != null && + measurementValue != null && + measurementRequirements.aggregate + ) { measurementStatistics.aggregate = (measurementStatistics.aggregate ?? 0) + measurementValue measurementStatistics.minimum = min( @@ -171,10 +189,7 @@ const updateMeasurementStatistics = ( measurementValue, measurementStatistics.maximum ?? -Infinity ) - if ( - (measurementRequirements.average || measurementRequirements.median) && - measurementValue != null - ) { + if (measurementRequirements.average || measurementRequirements.median) { measurementStatistics.history.push(measurementValue) if (measurementRequirements.average) { measurementStatistics.average = average(measurementStatistics.history) @@ -199,11 +214,9 @@ export const updateWaitTimeWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: WorkerChoiceStrategyContext< - Worker, - Data, - Response - >, + workerChoiceStrategyContext: + | WorkerChoiceStrategyContext + | undefined, workerUsage: WorkerUsage, task: Task ): void => { @@ -211,7 +224,7 @@ export const updateWaitTimeWorkerUsage = < const taskWaitTime = timestamp - (task.timestamp ?? timestamp) updateMeasurementStatistics( workerUsage.waitTime, - workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime, + workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.waitTime, taskWaitTime ) } @@ -222,6 +235,7 @@ export const updateTaskStatisticsWorkerUsage = ( ): void => { const workerTaskStatistics = workerUsage.tasks if ( + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition workerTaskStatistics.executing != null && workerTaskStatistics.executing > 0 ) { @@ -239,11 +253,9 @@ export const updateRunTimeWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: WorkerChoiceStrategyContext< - Worker, - Data, - Response - >, + workerChoiceStrategyContext: + | WorkerChoiceStrategyContext + | undefined, workerUsage: WorkerUsage, message: MessageValue ): void => { @@ -252,7 +264,7 @@ export const updateRunTimeWorkerUsage = < } updateMeasurementStatistics( workerUsage.runTime, - workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime, + workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.runTime, message.taskPerformance?.runTime ?? 0 ) } @@ -262,19 +274,17 @@ export const updateEluWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: WorkerChoiceStrategyContext< - Worker, - Data, - Response - >, + workerChoiceStrategyContext: + | WorkerChoiceStrategyContext + | undefined, workerUsage: WorkerUsage, message: MessageValue ): void => { if (message.workerError != null) { return } - const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = - workerChoiceStrategyContext.getTaskStatisticsRequirements().elu + const eluTaskStatisticsRequirements = + workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.elu updateMeasurementStatistics( workerUsage.elu.active, eluTaskStatisticsRequirements, @@ -285,7 +295,7 @@ export const updateEluWorkerUsage = < eluTaskStatisticsRequirements, message.taskPerformance?.elu?.idle ?? 0 ) - if (eluTaskStatisticsRequirements.aggregate) { + if (eluTaskStatisticsRequirements?.aggregate === true) { if (message.taskPerformance?.elu != null) { if (workerUsage.elu.utilization != null) { workerUsage.elu.utilization = @@ -308,10 +318,10 @@ export const createWorker = ( case WorkerTypes.thread: return new Worker(filePath, { env: SHARE_ENV, - ...opts?.workerOptions + ...opts.workerOptions }) as unknown as Worker case WorkerTypes.cluster: - return cluster.fork(opts?.env) as unknown as Worker + return cluster.fork(opts.env) as unknown as Worker default: // eslint-disable-next-line @typescript-eslint/restrict-template-expressions throw new Error(`Unknown worker type '${type}'`) @@ -324,7 +334,8 @@ export const waitWorkerNodeEvents = async < >( workerNode: IWorkerNode, workerNodeEvent: string, - numberOfEventsToWait: number + numberOfEventsToWait: number, + timeout: number ): Promise => { return await new Promise(resolve => { let events = 0 @@ -338,5 +349,10 @@ export const waitWorkerNodeEvents = async < resolve(events) } }) + if (timeout >= 0) { + setTimeout(() => { + resolve(events) + }, timeout) + } }) }