Commit | Line | Data |
---|---|---|
bde6b5d7 | 1 | import { existsSync } from 'node:fs' |
c3719753 JB |
2 | import cluster from 'node:cluster' |
3 | import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads' | |
bfc75cca | 4 | import { average, isPlainObject, max, median, min } from '../utils' |
bde6b5d7 | 5 | import { |
bfc75cca | 6 | type MeasurementStatisticsRequirements, |
bde6b5d7 JB |
7 | WorkerChoiceStrategies, |
8 | type WorkerChoiceStrategy | |
9 | } from './selection-strategies/selection-strategies-types' | |
10 | import type { TasksQueueOptions } from './pool' | |
c3719753 JB |
11 | import { |
12 | type IWorker, | |
13 | type MeasurementStatistics, | |
14 | type WorkerNodeOptions, | |
15 | type WorkerType, | |
16 | WorkerTypes | |
17 | } from './worker' | |
bde6b5d7 JB |
18 | |
19 | export const checkFilePath = (filePath: string): void => { | |
c3719753 JB |
20 | if (filePath == null) { |
21 | throw new TypeError('The worker file path must be specified') | |
22 | } | |
23 | if (typeof filePath !== 'string') { | |
24 | throw new TypeError('The worker file path must be a string') | |
25 | } | |
bde6b5d7 JB |
26 | if (!existsSync(filePath)) { |
27 | throw new Error(`Cannot find the worker file '${filePath}'`) | |
28 | } | |
29 | } | |
30 | ||
31 | export const checkDynamicPoolSize = (min: number, max: number): void => { | |
32 | if (max == null) { | |
33 | throw new TypeError( | |
34 | 'Cannot instantiate a dynamic pool without specifying the maximum pool size' | |
35 | ) | |
36 | } else if (!Number.isSafeInteger(max)) { | |
37 | throw new TypeError( | |
38 | 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size' | |
39 | ) | |
40 | } else if (min > max) { | |
41 | throw new RangeError( | |
42 | 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' | |
43 | ) | |
44 | } else if (max === 0) { | |
45 | throw new RangeError( | |
46 | 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero' | |
47 | ) | |
48 | } else if (min === max) { | |
49 | throw new RangeError( | |
50 | 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead' | |
51 | ) | |
52 | } | |
53 | } | |
54 | ||
55 | export const checkValidWorkerChoiceStrategy = ( | |
56 | workerChoiceStrategy: WorkerChoiceStrategy | |
57 | ): void => { | |
58 | if ( | |
59 | workerChoiceStrategy != null && | |
60 | !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy) | |
61 | ) { | |
62 | throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`) | |
63 | } | |
64 | } | |
65 | ||
66 | export const checkValidTasksQueueOptions = ( | |
67 | tasksQueueOptions: TasksQueueOptions | |
68 | ): void => { | |
69 | if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) { | |
70 | throw new TypeError('Invalid tasks queue options: must be a plain object') | |
71 | } | |
72 | if ( | |
73 | tasksQueueOptions?.concurrency != null && | |
74 | !Number.isSafeInteger(tasksQueueOptions.concurrency) | |
75 | ) { | |
76 | throw new TypeError( | |
77 | 'Invalid worker node tasks concurrency: must be an integer' | |
78 | ) | |
79 | } | |
80 | if ( | |
81 | tasksQueueOptions?.concurrency != null && | |
82 | tasksQueueOptions.concurrency <= 0 | |
83 | ) { | |
84 | throw new RangeError( | |
85 | `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` | |
86 | ) | |
87 | } | |
88 | if ( | |
89 | tasksQueueOptions?.size != null && | |
90 | !Number.isSafeInteger(tasksQueueOptions.size) | |
91 | ) { | |
92 | throw new TypeError( | |
93 | 'Invalid worker node tasks queue size: must be an integer' | |
94 | ) | |
95 | } | |
96 | if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) { | |
97 | throw new RangeError( | |
98 | `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero` | |
99 | ) | |
100 | } | |
101 | } | |
bfc75cca | 102 | |
c3719753 JB |
103 | export const checkWorkerNodeArguments = ( |
104 | type: WorkerType, | |
105 | filePath: string, | |
106 | opts: WorkerNodeOptions | |
9a38f99e | 107 | ): void => { |
c3719753 JB |
108 | if (type == null) { |
109 | throw new TypeError('Cannot construct a worker node without a worker type') | |
110 | } | |
111 | if (!Object.values(WorkerTypes).includes(type)) { | |
112 | throw new TypeError( | |
113 | `Cannot construct a worker node with an invalid worker type '${type}'` | |
114 | ) | |
9a38f99e | 115 | } |
c3719753 JB |
116 | checkFilePath(filePath) |
117 | if (opts == null) { | |
9a38f99e | 118 | throw new TypeError( |
c3719753 | 119 | 'Cannot construct a worker node without worker node options' |
9a38f99e JB |
120 | ) |
121 | } | |
9974369e | 122 | if (!isPlainObject(opts)) { |
9a38f99e | 123 | throw new TypeError( |
c3719753 | 124 | 'Cannot construct a worker node with invalid options: must be a plain object' |
9a38f99e JB |
125 | ) |
126 | } | |
c3719753 JB |
127 | if (opts.tasksQueueBackPressureSize == null) { |
128 | throw new TypeError( | |
129 | 'Cannot construct a worker node without a tasks queue back pressure size option' | |
130 | ) | |
131 | } | |
132 | if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) { | |
133 | throw new TypeError( | |
134 | 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer' | |
135 | ) | |
136 | } | |
137 | if (opts.tasksQueueBackPressureSize <= 0) { | |
9a38f99e | 138 | throw new RangeError( |
c3719753 | 139 | 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' |
9a38f99e JB |
140 | ) |
141 | } | |
142 | } | |
bfc75cca JB |
143 | |
144 | /** | |
145 | * Updates the given measurement statistics. | |
146 | * | |
147 | * @param measurementStatistics - The measurement statistics to update. | |
148 | * @param measurementRequirements - The measurement statistics requirements. | |
149 | * @param measurementValue - The measurement value. | |
150 | * @param numberOfMeasurements - The number of measurements. | |
151 | * @internal | |
152 | */ | |
153 | export const updateMeasurementStatistics = ( | |
154 | measurementStatistics: MeasurementStatistics, | |
155 | measurementRequirements: MeasurementStatisticsRequirements, | |
156 | measurementValue: number | |
157 | ): void => { | |
158 | if (measurementRequirements.aggregate) { | |
159 | measurementStatistics.aggregate = | |
160 | (measurementStatistics.aggregate ?? 0) + measurementValue | |
161 | measurementStatistics.minimum = min( | |
162 | measurementValue, | |
163 | measurementStatistics.minimum ?? Infinity | |
164 | ) | |
165 | measurementStatistics.maximum = max( | |
166 | measurementValue, | |
167 | measurementStatistics.maximum ?? -Infinity | |
168 | ) | |
169 | if ( | |
170 | (measurementRequirements.average || measurementRequirements.median) && | |
171 | measurementValue != null | |
172 | ) { | |
173 | measurementStatistics.history.push(measurementValue) | |
174 | if (measurementRequirements.average) { | |
175 | measurementStatistics.average = average(measurementStatistics.history) | |
176 | } else if (measurementStatistics.average != null) { | |
177 | delete measurementStatistics.average | |
178 | } | |
179 | if (measurementRequirements.median) { | |
180 | measurementStatistics.median = median(measurementStatistics.history) | |
181 | } else if (measurementStatistics.median != null) { | |
182 | delete measurementStatistics.median | |
183 | } | |
184 | } | |
185 | } | |
186 | } | |
c3719753 JB |
187 | |
188 | export const createWorker = <Worker extends IWorker>( | |
189 | type: WorkerType, | |
190 | filePath: string, | |
191 | opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions } | |
192 | ): Worker => { | |
193 | switch (type) { | |
194 | case WorkerTypes.thread: | |
195 | return new Worker(filePath, { | |
196 | env: SHARE_ENV, | |
197 | ...opts?.workerOptions | |
198 | }) as unknown as Worker | |
199 | case WorkerTypes.cluster: | |
200 | return cluster.fork(opts?.env) as unknown as Worker | |
201 | default: | |
202 | // eslint-disable-next-line @typescript-eslint/restrict-template-expressions | |
203 | throw new Error(`Unknown worker type '${type}'`) | |
204 | } | |
205 | } |