Merge pull request #1250 from poolifier/combined-prs-branch
[poolifier.git] / src / pools / utils.ts
1 import { existsSync } from 'node:fs'
2 import { average, isPlainObject, max, median, min } from '../utils'
3 import {
4 type MeasurementStatisticsRequirements,
5 WorkerChoiceStrategies,
6 type WorkerChoiceStrategy
7 } from './selection-strategies/selection-strategies-types'
8 import type { TasksQueueOptions } from './pool'
9 import type { IWorker, MeasurementStatistics } from './worker'
10
11 export const checkFilePath = (filePath: string): void => {
12 if (
13 filePath == null ||
14 typeof filePath !== 'string' ||
15 (typeof filePath === 'string' && filePath.trim().length === 0)
16 ) {
17 throw new Error('Please specify a file with a worker implementation')
18 }
19 if (!existsSync(filePath)) {
20 throw new Error(`Cannot find the worker file '${filePath}'`)
21 }
22 }
23
24 export const checkDynamicPoolSize = (min: number, max: number): void => {
25 if (max == null) {
26 throw new TypeError(
27 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
28 )
29 } else if (!Number.isSafeInteger(max)) {
30 throw new TypeError(
31 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
32 )
33 } else if (min > max) {
34 throw new RangeError(
35 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
36 )
37 } else if (max === 0) {
38 throw new RangeError(
39 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
40 )
41 } else if (min === max) {
42 throw new RangeError(
43 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
44 )
45 }
46 }
47
48 export const checkValidWorkerChoiceStrategy = (
49 workerChoiceStrategy: WorkerChoiceStrategy
50 ): void => {
51 if (
52 workerChoiceStrategy != null &&
53 !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
54 ) {
55 throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
56 }
57 }
58
59 export const checkValidTasksQueueOptions = (
60 tasksQueueOptions: TasksQueueOptions
61 ): void => {
62 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
63 throw new TypeError('Invalid tasks queue options: must be a plain object')
64 }
65 if (
66 tasksQueueOptions?.concurrency != null &&
67 !Number.isSafeInteger(tasksQueueOptions.concurrency)
68 ) {
69 throw new TypeError(
70 'Invalid worker node tasks concurrency: must be an integer'
71 )
72 }
73 if (
74 tasksQueueOptions?.concurrency != null &&
75 tasksQueueOptions.concurrency <= 0
76 ) {
77 throw new RangeError(
78 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
79 )
80 }
81 if (
82 tasksQueueOptions?.size != null &&
83 !Number.isSafeInteger(tasksQueueOptions.size)
84 ) {
85 throw new TypeError(
86 'Invalid worker node tasks queue size: must be an integer'
87 )
88 }
89 if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
90 throw new RangeError(
91 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
92 )
93 }
94 }
95
96 export const checkWorkerNodeArguments = <Worker extends IWorker>(
97 worker: Worker,
98 tasksQueueBackPressureSize: number
99 ): void => {
100 if (worker == null) {
101 throw new TypeError('Cannot construct a worker node without a worker')
102 }
103 if (tasksQueueBackPressureSize == null) {
104 throw new TypeError(
105 'Cannot construct a worker node without a tasks queue back pressure size'
106 )
107 }
108 if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
109 throw new TypeError(
110 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
111 )
112 }
113 if (tasksQueueBackPressureSize <= 0) {
114 throw new RangeError(
115 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
116 )
117 }
118 }
119
120 /**
121 * Updates the given measurement statistics.
122 *
123 * @param measurementStatistics - The measurement statistics to update.
124 * @param measurementRequirements - The measurement statistics requirements.
125 * @param measurementValue - The measurement value.
126 * @param numberOfMeasurements - The number of measurements.
127 * @internal
128 */
129 export const updateMeasurementStatistics = (
130 measurementStatistics: MeasurementStatistics,
131 measurementRequirements: MeasurementStatisticsRequirements,
132 measurementValue: number
133 ): void => {
134 if (measurementRequirements.aggregate) {
135 measurementStatistics.aggregate =
136 (measurementStatistics.aggregate ?? 0) + measurementValue
137 measurementStatistics.minimum = min(
138 measurementValue,
139 measurementStatistics.minimum ?? Infinity
140 )
141 measurementStatistics.maximum = max(
142 measurementValue,
143 measurementStatistics.maximum ?? -Infinity
144 )
145 if (
146 (measurementRequirements.average || measurementRequirements.median) &&
147 measurementValue != null
148 ) {
149 measurementStatistics.history.push(measurementValue)
150 if (measurementRequirements.average) {
151 measurementStatistics.average = average(measurementStatistics.history)
152 } else if (measurementStatistics.average != null) {
153 delete measurementStatistics.average
154 }
155 if (measurementRequirements.median) {
156 measurementStatistics.median = median(measurementStatistics.history)
157 } else if (measurementStatistics.median != null) {
158 delete measurementStatistics.median
159 }
160 }
161 }
162 }