Commit | Line | Data |
---|---|---|
bde6b5d7 | 1 | import { existsSync } from 'node:fs' |
c3719753 JB |
2 | import cluster from 'node:cluster' |
3 | import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads' | |
c329fd41 | 4 | import { env } from 'node:process' |
d35e5717 JB |
5 | import { average, isPlainObject, max, median, min } from '../utils.js' |
6 | import type { MessageValue, Task } from '../utility-types.js' | |
bde6b5d7 | 7 | import { |
bfc75cca | 8 | type MeasurementStatisticsRequirements, |
bde6b5d7 JB |
9 | WorkerChoiceStrategies, |
10 | type WorkerChoiceStrategy | |
d35e5717 JB |
11 | } from './selection-strategies/selection-strategies-types.js' |
12 | import type { TasksQueueOptions } from './pool.js' | |
c3719753 JB |
13 | import { |
14 | type IWorker, | |
d41a44de | 15 | type IWorkerNode, |
c3719753 JB |
16 | type MeasurementStatistics, |
17 | type WorkerNodeOptions, | |
18 | type WorkerType, | |
c329fd41 JB |
19 | WorkerTypes, |
20 | type WorkerUsage | |
d35e5717 JB |
21 | } from './worker.js' |
22 | import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' | |
bde6b5d7 | 23 | |
32b141fd JB |
24 | export const getDefaultTasksQueueOptions = ( |
25 | poolMaxSize: number | |
26 | ): Required<TasksQueueOptions> => { | |
27 | return { | |
28 | size: Math.pow(poolMaxSize, 2), | |
29 | concurrency: 1, | |
30 | taskStealing: true, | |
31 | tasksStealingOnBackPressure: true, | |
568d0075 | 32 | tasksFinishedTimeout: 2000 |
32b141fd JB |
33 | } |
34 | } | |
35 | ||
c63a35a0 | 36 | export const checkFilePath = (filePath: string | undefined): void => { |
c3719753 JB |
37 | if (filePath == null) { |
38 | throw new TypeError('The worker file path must be specified') | |
39 | } | |
40 | if (typeof filePath !== 'string') { | |
41 | throw new TypeError('The worker file path must be a string') | |
42 | } | |
bde6b5d7 JB |
43 | if (!existsSync(filePath)) { |
44 | throw new Error(`Cannot find the worker file '${filePath}'`) | |
45 | } | |
46 | } | |
47 | ||
c63a35a0 JB |
48 | export const checkDynamicPoolSize = ( |
49 | min: number, | |
50 | max: number | undefined | |
51 | ): void => { | |
bde6b5d7 JB |
52 | if (max == null) { |
53 | throw new TypeError( | |
54 | 'Cannot instantiate a dynamic pool without specifying the maximum pool size' | |
55 | ) | |
56 | } else if (!Number.isSafeInteger(max)) { | |
57 | throw new TypeError( | |
58 | 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size' | |
59 | ) | |
60 | } else if (min > max) { | |
61 | throw new RangeError( | |
62 | 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' | |
63 | ) | |
64 | } else if (max === 0) { | |
65 | throw new RangeError( | |
66 | 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero' | |
67 | ) | |
68 | } else if (min === max) { | |
69 | throw new RangeError( | |
70 | 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead' | |
71 | ) | |
72 | } | |
73 | } | |
74 | ||
75 | export const checkValidWorkerChoiceStrategy = ( | |
c63a35a0 | 76 | workerChoiceStrategy: WorkerChoiceStrategy | undefined |
bde6b5d7 JB |
77 | ): void => { |
78 | if ( | |
79 | workerChoiceStrategy != null && | |
80 | !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy) | |
81 | ) { | |
82 | throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`) | |
83 | } | |
84 | } | |
85 | ||
86 | export const checkValidTasksQueueOptions = ( | |
c63a35a0 | 87 | tasksQueueOptions: TasksQueueOptions | undefined |
bde6b5d7 JB |
88 | ): void => { |
89 | if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) { | |
90 | throw new TypeError('Invalid tasks queue options: must be a plain object') | |
91 | } | |
92 | if ( | |
93 | tasksQueueOptions?.concurrency != null && | |
94 | !Number.isSafeInteger(tasksQueueOptions.concurrency) | |
95 | ) { | |
96 | throw new TypeError( | |
97 | 'Invalid worker node tasks concurrency: must be an integer' | |
98 | ) | |
99 | } | |
100 | if ( | |
101 | tasksQueueOptions?.concurrency != null && | |
102 | tasksQueueOptions.concurrency <= 0 | |
103 | ) { | |
104 | throw new RangeError( | |
105 | `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` | |
106 | ) | |
107 | } | |
108 | if ( | |
109 | tasksQueueOptions?.size != null && | |
110 | !Number.isSafeInteger(tasksQueueOptions.size) | |
111 | ) { | |
112 | throw new TypeError( | |
113 | 'Invalid worker node tasks queue size: must be an integer' | |
114 | ) | |
115 | } | |
116 | if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) { | |
117 | throw new RangeError( | |
118 | `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero` | |
119 | ) | |
120 | } | |
121 | } | |
bfc75cca | 122 | |
c3719753 | 123 | export const checkWorkerNodeArguments = ( |
c63a35a0 JB |
124 | type: WorkerType | undefined, |
125 | filePath: string | undefined, | |
126 | opts: WorkerNodeOptions | undefined | |
9a38f99e | 127 | ): void => { |
c3719753 JB |
128 | if (type == null) { |
129 | throw new TypeError('Cannot construct a worker node without a worker type') | |
130 | } | |
131 | if (!Object.values(WorkerTypes).includes(type)) { | |
132 | throw new TypeError( | |
133 | `Cannot construct a worker node with an invalid worker type '${type}'` | |
134 | ) | |
9a38f99e | 135 | } |
c3719753 JB |
136 | checkFilePath(filePath) |
137 | if (opts == null) { | |
9a38f99e | 138 | throw new TypeError( |
c3719753 | 139 | 'Cannot construct a worker node without worker node options' |
9a38f99e JB |
140 | ) |
141 | } | |
9974369e | 142 | if (!isPlainObject(opts)) { |
9a38f99e | 143 | throw new TypeError( |
c3719753 | 144 | 'Cannot construct a worker node with invalid options: must be a plain object' |
9a38f99e JB |
145 | ) |
146 | } | |
c3719753 JB |
147 | if (opts.tasksQueueBackPressureSize == null) { |
148 | throw new TypeError( | |
149 | 'Cannot construct a worker node without a tasks queue back pressure size option' | |
150 | ) | |
151 | } | |
152 | if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) { | |
153 | throw new TypeError( | |
154 | 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer' | |
155 | ) | |
156 | } | |
157 | if (opts.tasksQueueBackPressureSize <= 0) { | |
9a38f99e | 158 | throw new RangeError( |
c3719753 | 159 | 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' |
9a38f99e JB |
160 | ) |
161 | } | |
162 | } | |
bfc75cca JB |
163 | |
164 | /** | |
165 | * Updates the given measurement statistics. | |
166 | * | |
167 | * @param measurementStatistics - The measurement statistics to update. | |
168 | * @param measurementRequirements - The measurement statistics requirements. | |
169 | * @param measurementValue - The measurement value. | |
bfc75cca JB |
170 | * @internal |
171 | */ | |
c329fd41 | 172 | const updateMeasurementStatistics = ( |
bfc75cca | 173 | measurementStatistics: MeasurementStatistics, |
c63a35a0 JB |
174 | measurementRequirements: MeasurementStatisticsRequirements | undefined, |
175 | measurementValue: number | undefined | |
bfc75cca | 176 | ): void => { |
c63a35a0 JB |
177 | if ( |
178 | measurementRequirements != null && | |
179 | measurementValue != null && | |
180 | measurementRequirements.aggregate | |
181 | ) { | |
bfc75cca JB |
182 | measurementStatistics.aggregate = |
183 | (measurementStatistics.aggregate ?? 0) + measurementValue | |
184 | measurementStatistics.minimum = min( | |
185 | measurementValue, | |
186 | measurementStatistics.minimum ?? Infinity | |
187 | ) | |
188 | measurementStatistics.maximum = max( | |
189 | measurementValue, | |
190 | measurementStatistics.maximum ?? -Infinity | |
191 | ) | |
c63a35a0 | 192 | if (measurementRequirements.average || measurementRequirements.median) { |
bfc75cca JB |
193 | measurementStatistics.history.push(measurementValue) |
194 | if (measurementRequirements.average) { | |
195 | measurementStatistics.average = average(measurementStatistics.history) | |
196 | } else if (measurementStatistics.average != null) { | |
197 | delete measurementStatistics.average | |
198 | } | |
199 | if (measurementRequirements.median) { | |
200 | measurementStatistics.median = median(measurementStatistics.history) | |
201 | } else if (measurementStatistics.median != null) { | |
202 | delete measurementStatistics.median | |
203 | } | |
204 | } | |
205 | } | |
206 | } | |
c329fd41 JB |
207 | if (env.NODE_ENV === 'test') { |
208 | // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access | |
209 | exports.updateMeasurementStatistics = updateMeasurementStatistics | |
210 | } | |
211 | ||
212 | export const updateWaitTimeWorkerUsage = < | |
213 | Worker extends IWorker, | |
214 | Data = unknown, | |
215 | Response = unknown | |
216 | >( | |
c63a35a0 JB |
217 | workerChoiceStrategyContext: |
218 | | WorkerChoiceStrategyContext<Worker, Data, Response> | |
219 | | undefined, | |
c329fd41 JB |
220 | workerUsage: WorkerUsage, |
221 | task: Task<Data> | |
222 | ): void => { | |
223 | const timestamp = performance.now() | |
224 | const taskWaitTime = timestamp - (task.timestamp ?? timestamp) | |
225 | updateMeasurementStatistics( | |
226 | workerUsage.waitTime, | |
c63a35a0 | 227 | workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.waitTime, |
c329fd41 JB |
228 | taskWaitTime |
229 | ) | |
230 | } | |
231 | ||
232 | export const updateTaskStatisticsWorkerUsage = <Response = unknown>( | |
233 | workerUsage: WorkerUsage, | |
234 | message: MessageValue<Response> | |
235 | ): void => { | |
236 | const workerTaskStatistics = workerUsage.tasks | |
237 | if ( | |
c63a35a0 | 238 | // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition |
c329fd41 JB |
239 | workerTaskStatistics.executing != null && |
240 | workerTaskStatistics.executing > 0 | |
241 | ) { | |
242 | --workerTaskStatistics.executing | |
243 | } | |
244 | if (message.workerError == null) { | |
245 | ++workerTaskStatistics.executed | |
246 | } else { | |
247 | ++workerTaskStatistics.failed | |
248 | } | |
249 | } | |
250 | ||
251 | export const updateRunTimeWorkerUsage = < | |
252 | Worker extends IWorker, | |
253 | Data = unknown, | |
254 | Response = unknown | |
255 | >( | |
c63a35a0 JB |
256 | workerChoiceStrategyContext: |
257 | | WorkerChoiceStrategyContext<Worker, Data, Response> | |
258 | | undefined, | |
c329fd41 JB |
259 | workerUsage: WorkerUsage, |
260 | message: MessageValue<Response> | |
261 | ): void => { | |
262 | if (message.workerError != null) { | |
263 | return | |
264 | } | |
265 | updateMeasurementStatistics( | |
266 | workerUsage.runTime, | |
c63a35a0 | 267 | workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.runTime, |
c329fd41 JB |
268 | message.taskPerformance?.runTime ?? 0 |
269 | ) | |
270 | } | |
271 | ||
272 | export const updateEluWorkerUsage = < | |
273 | Worker extends IWorker, | |
274 | Data = unknown, | |
275 | Response = unknown | |
276 | >( | |
c63a35a0 JB |
277 | workerChoiceStrategyContext: |
278 | | WorkerChoiceStrategyContext<Worker, Data, Response> | |
279 | | undefined, | |
c329fd41 JB |
280 | workerUsage: WorkerUsage, |
281 | message: MessageValue<Response> | |
282 | ): void => { | |
283 | if (message.workerError != null) { | |
284 | return | |
285 | } | |
c63a35a0 JB |
286 | const eluTaskStatisticsRequirements = |
287 | workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.elu | |
c329fd41 JB |
288 | updateMeasurementStatistics( |
289 | workerUsage.elu.active, | |
290 | eluTaskStatisticsRequirements, | |
291 | message.taskPerformance?.elu?.active ?? 0 | |
292 | ) | |
293 | updateMeasurementStatistics( | |
294 | workerUsage.elu.idle, | |
295 | eluTaskStatisticsRequirements, | |
296 | message.taskPerformance?.elu?.idle ?? 0 | |
297 | ) | |
c63a35a0 | 298 | if (eluTaskStatisticsRequirements?.aggregate === true) { |
c329fd41 JB |
299 | if (message.taskPerformance?.elu != null) { |
300 | if (workerUsage.elu.utilization != null) { | |
301 | workerUsage.elu.utilization = | |
302 | (workerUsage.elu.utilization + | |
303 | message.taskPerformance.elu.utilization) / | |
304 | 2 | |
305 | } else { | |
306 | workerUsage.elu.utilization = message.taskPerformance.elu.utilization | |
307 | } | |
308 | } | |
309 | } | |
310 | } | |
c3719753 JB |
311 | |
312 | export const createWorker = <Worker extends IWorker>( | |
313 | type: WorkerType, | |
314 | filePath: string, | |
315 | opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions } | |
316 | ): Worker => { | |
317 | switch (type) { | |
318 | case WorkerTypes.thread: | |
319 | return new Worker(filePath, { | |
320 | env: SHARE_ENV, | |
c63a35a0 | 321 | ...opts.workerOptions |
c3719753 JB |
322 | }) as unknown as Worker |
323 | case WorkerTypes.cluster: | |
c63a35a0 | 324 | return cluster.fork(opts.env) as unknown as Worker |
c3719753 JB |
325 | default: |
326 | // eslint-disable-next-line @typescript-eslint/restrict-template-expressions | |
327 | throw new Error(`Unknown worker type '${type}'`) | |
328 | } | |
329 | } | |
d41a44de JB |
330 | |
331 | export const waitWorkerNodeEvents = async < | |
332 | Worker extends IWorker, | |
333 | Data = unknown | |
334 | >( | |
335 | workerNode: IWorkerNode<Worker, Data>, | |
336 | workerNodeEvent: string, | |
32b141fd JB |
337 | numberOfEventsToWait: number, |
338 | timeout: number | |
d41a44de JB |
339 | ): Promise<number> => { |
340 | return await new Promise<number>(resolve => { | |
341 | let events = 0 | |
342 | if (numberOfEventsToWait === 0) { | |
343 | resolve(events) | |
344 | return | |
345 | } | |
346 | workerNode.on(workerNodeEvent, () => { | |
347 | ++events | |
348 | if (events === numberOfEventsToWait) { | |
349 | resolve(events) | |
350 | } | |
351 | }) | |
6f3a391b | 352 | if (timeout >= 0) { |
32b141fd JB |
353 | setTimeout(() => { |
354 | resolve(events) | |
355 | }, timeout) | |
356 | } | |
d41a44de JB |
357 | }) |
358 | } |