X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Futils.ts;h=99584d8528d65f5538d3f89e309a6dd77d4fde79;hb=4302d959ff7e8eb6f6bfcecf1e911fc627e98a9c;hp=ae09e8a9f5f7a5a0b2e22a1266c84534e4329e41;hpb=c329fd41c48904770df633b6d5ea2b3d37f3eafd;p=poolifier.git diff --git a/src/pools/utils.ts b/src/pools/utils.ts index ae09e8a9..99584d85 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -2,14 +2,14 @@ import { existsSync } from 'node:fs' import cluster from 'node:cluster' import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads' import { env } from 'node:process' -import { average, isPlainObject, max, median, min } from '../utils' -import type { MessageValue, Task } from '../utility-types' +import { average, isPlainObject, max, median, min } from '../utils.js' +import type { MessageValue, Task } from '../utility-types.js' import { type MeasurementStatisticsRequirements, WorkerChoiceStrategies, type WorkerChoiceStrategy -} from './selection-strategies/selection-strategies-types' -import type { TasksQueueOptions } from './pool' +} from './selection-strategies/selection-strategies-types.js' +import type { TasksQueueOptions } from './pool.js' import { type IWorker, type IWorkerNode, @@ -18,8 +18,20 @@ import { type WorkerType, WorkerTypes, type WorkerUsage -} from './worker' -import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' +} from './worker.js' +import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' + +export const getDefaultTasksQueueOptions = ( + poolMaxSize: number +): Required => { + return { + size: Math.pow(poolMaxSize, 2), + concurrency: 1, + taskStealing: true, + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 + } +} export const checkFilePath = (filePath: string): void => { if (filePath == null) { @@ -152,7 +164,6 @@ export const checkWorkerNodeArguments = ( * @param measurementStatistics - The measurement statistics to update. * @param measurementRequirements - The measurement statistics requirements. * @param measurementValue - The measurement value. - * @param numberOfMeasurements - The number of measurements. * @internal */ const updateMeasurementStatistics = ( @@ -324,7 +335,8 @@ export const waitWorkerNodeEvents = async < >( workerNode: IWorkerNode, workerNodeEvent: string, - numberOfEventsToWait: number + numberOfEventsToWait: number, + timeout: number ): Promise => { return await new Promise(resolve => { let events = 0 @@ -338,5 +350,10 @@ export const waitWorkerNodeEvents = async < resolve(events) } }) + if (timeout >= 0) { + setTimeout(() => { + resolve(events) + }, timeout) + } }) }