X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Futils.ts;h=c3ca698cac24da378dddf9451031a304b1a5c5a3;hb=2ef26de4e268829f5d0b591c135a44f4d16ae5bb;hp=6a6c1fa9d3aa9d14946d760e09301ad878ce901a;hpb=3e931141fe4cbbb1221697a1ee4fd26f4c419c82;p=poolifier.git diff --git a/src/pools/utils.ts b/src/pools/utils.ts index 6a6c1fa9..c3ca698c 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -43,7 +43,7 @@ export const getDefaultTasksQueueOptions = ( size: Math.pow(poolMaxSize, 2), concurrency: 1, taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 } } @@ -87,6 +87,19 @@ export const checkDynamicPoolSize = ( } } +export const checkValidPriority = (priority: number | undefined): void => { + if (priority != null && !Number.isSafeInteger(priority)) { + throw new TypeError(`Invalid property 'priority': '${priority}'`) + } + if ( + priority != null && + Number.isSafeInteger(priority) && + (priority < -20 || priority > 19) + ) { + throw new RangeError("Property 'priority' must be between -20 and 19") + } +} + export const checkValidWorkerChoiceStrategy = ( workerChoiceStrategy: WorkerChoiceStrategy | undefined ): void => { @@ -174,6 +187,21 @@ export const checkWorkerNodeArguments = ( 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' ) } + if (opts.tasksQueueBucketSize == null) { + throw new TypeError( + 'Cannot construct a worker node without a tasks queue bucket size option' + ) + } + if (!Number.isSafeInteger(opts.tasksQueueBucketSize)) { + throw new TypeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer' + ) + } + if (opts.tasksQueueBucketSize <= 0) { + throw new RangeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer' + ) + } } /** @@ -229,7 +257,7 @@ export const updateWaitTimeWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: + workerChoiceStrategiesContext: | WorkerChoiceStrategiesContext | undefined, workerUsage: WorkerUsage, @@ -239,7 +267,7 @@ export const updateWaitTimeWorkerUsage = < const taskWaitTime = timestamp - (task.timestamp ?? timestamp) updateMeasurementStatistics( workerUsage.waitTime, - workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime, + workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime, taskWaitTime ) } @@ -268,7 +296,7 @@ export const updateRunTimeWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: + workerChoiceStrategiesContext: | WorkerChoiceStrategiesContext | undefined, workerUsage: WorkerUsage, @@ -279,7 +307,7 @@ export const updateRunTimeWorkerUsage = < } updateMeasurementStatistics( workerUsage.runTime, - workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime, + workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime, message.taskPerformance?.runTime ?? 0 ) } @@ -289,7 +317,7 @@ export const updateEluWorkerUsage = < Data = unknown, Response = unknown >( - workerChoiceStrategyContext: + workerChoiceStrategiesContext: | WorkerChoiceStrategiesContext | undefined, workerUsage: WorkerUsage, @@ -299,7 +327,7 @@ export const updateEluWorkerUsage = < return } const eluTaskStatisticsRequirements = - workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu + workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu updateMeasurementStatistics( workerUsage.elu.active, eluTaskStatisticsRequirements, @@ -388,12 +416,20 @@ export const waitWorkerNodeEvents = async < resolve(events) return } - workerNode.on(workerNodeEvent, () => { - ++events - if (events === numberOfEventsToWait) { - resolve(events) - } - }) + switch (workerNodeEvent) { + case 'idle': + case 'backPressure': + case 'taskFinished': + workerNode.on(workerNodeEvent, () => { + ++events + if (events === numberOfEventsToWait) { + resolve(events) + } + }) + break + default: + throw new Error('Invalid worker node event') + } if (timeout >= 0) { setTimeout(() => { resolve(events)