Commit | Line | Data |
---|---|---|
e9ed6eee | 1 | import cluster, { Worker as ClusterWorker } from 'node:cluster' |
ded253e2 JB |
2 | import { existsSync } from 'node:fs' |
3 | import { cpus } from 'node:os' | |
4 | import { env } from 'node:process' | |
e9ed6eee JB |
5 | import { |
6 | SHARE_ENV, | |
7 | Worker as ThreadWorker, | |
8 | type WorkerOptions | |
9 | } from 'node:worker_threads' | |
ded253e2 | 10 | |
d35e5717 | 11 | import type { MessageValue, Task } from '../utility-types.js' |
ded253e2 JB |
12 | import { average, isPlainObject, max, median, min } from '../utils.js' |
13 | import type { IPool, TasksQueueOptions } from './pool.js' | |
bde6b5d7 | 14 | import { |
bfc75cca | 15 | type MeasurementStatisticsRequirements, |
bde6b5d7 | 16 | WorkerChoiceStrategies, |
e9ed6eee JB |
17 | type WorkerChoiceStrategy, |
18 | type WorkerChoiceStrategyOptions | |
d35e5717 | 19 | } from './selection-strategies/selection-strategies-types.js' |
ded253e2 | 20 | import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' |
c3719753 JB |
21 | import { |
22 | type IWorker, | |
d41a44de | 23 | type IWorkerNode, |
c3719753 JB |
24 | type MeasurementStatistics, |
25 | type WorkerNodeOptions, | |
26 | type WorkerType, | |
c329fd41 JB |
27 | WorkerTypes, |
28 | type WorkerUsage | |
d35e5717 | 29 | } from './worker.js' |
bde6b5d7 | 30 | |
e9ed6eee JB |
31 | /** |
32 | * Default measurement statistics requirements. | |
33 | */ | |
34 | export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements = | |
35 | { | |
36 | aggregate: false, | |
37 | average: false, | |
38 | median: false | |
39 | } | |
40 | ||
32b141fd JB |
41 | export const getDefaultTasksQueueOptions = ( |
42 | poolMaxSize: number | |
43 | ): Required<TasksQueueOptions> => { | |
44 | return { | |
45 | size: Math.pow(poolMaxSize, 2), | |
46 | concurrency: 1, | |
47 | taskStealing: true, | |
48 | tasksStealingOnBackPressure: true, | |
568d0075 | 49 | tasksFinishedTimeout: 2000 |
32b141fd JB |
50 | } |
51 | } | |
52 | ||
e9ed6eee JB |
53 | export const getWorkerChoiceStrategyRetries = < |
54 | Worker extends IWorker, | |
55 | Data, | |
56 | Response | |
57 | >( | |
58 | pool: IPool<Worker, Data, Response>, | |
59 | opts?: WorkerChoiceStrategyOptions | |
60 | ): number => { | |
61 | return ( | |
62 | pool.info.maxSize + | |
63 | Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length | |
64 | ) | |
65 | } | |
66 | ||
67 | export const buildWorkerChoiceStrategyOptions = < | |
68 | Worker extends IWorker, | |
69 | Data, | |
70 | Response | |
71 | >( | |
72 | pool: IPool<Worker, Data, Response>, | |
73 | opts?: WorkerChoiceStrategyOptions | |
74 | ): WorkerChoiceStrategyOptions => { | |
75 | opts = clone(opts ?? {}) | |
76 | opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize) | |
77 | return { | |
78 | ...{ | |
79 | runTime: { median: false }, | |
80 | waitTime: { median: false }, | |
81 | elu: { median: false } | |
82 | }, | |
83 | ...opts | |
84 | } | |
85 | } | |
86 | ||
87 | const clone = <T>(object: T): T => { | |
88 | return structuredClone<T>(object) | |
89 | } | |
90 | ||
91 | const getDefaultWeights = ( | |
92 | poolMaxSize: number, | |
93 | defaultWorkerWeight?: number | |
94 | ): Record<number, number> => { | |
95 | defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight() | |
96 | const weights: Record<number, number> = {} | |
97 | for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) { | |
98 | weights[workerNodeKey] = defaultWorkerWeight | |
99 | } | |
100 | return weights | |
101 | } | |
102 | ||
e0ab0e1d JB |
103 | const estimatedCpuSpeed = (): number => { |
104 | const runs = 150000000 | |
105 | const begin = performance.now() | |
106 | // eslint-disable-next-line no-empty | |
107 | for (let i = runs; i > 0; i--) {} | |
108 | const end = performance.now() | |
109 | const duration = end - begin | |
110 | return Math.trunc(runs / duration / 1000) // in MHz | |
111 | } | |
112 | ||
bbbf303c | 113 | const getDefaultWorkerWeight = (): number => { |
e0ab0e1d JB |
114 | const currentCpus = cpus() |
115 | let estCpuSpeed: number | undefined | |
116 | // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition | |
117 | if (currentCpus.every(cpu => cpu.speed == null || cpu.speed === 0)) { | |
118 | estCpuSpeed = estimatedCpuSpeed() | |
119 | } | |
e9ed6eee | 120 | let cpusCycleTimeWeight = 0 |
e0ab0e1d | 121 | for (const cpu of currentCpus) { |
e9ed6eee JB |
122 | // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition |
123 | if (cpu.speed == null || cpu.speed === 0) { | |
124 | cpu.speed = | |
e0ab0e1d JB |
125 | // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition |
126 | currentCpus.find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ?? | |
127 | estCpuSpeed ?? | |
128 | 2000 | |
e9ed6eee JB |
129 | } |
130 | // CPU estimated cycle time | |
131 | const numberOfDigits = cpu.speed.toString().length - 1 | |
132 | const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits)) | |
133 | cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits) | |
134 | } | |
e0ab0e1d | 135 | return Math.round(cpusCycleTimeWeight / currentCpus.length) |
e9ed6eee JB |
136 | } |
137 | ||
c63a35a0 | 138 | export const checkFilePath = (filePath: string | undefined): void => { |
c3719753 JB |
139 | if (filePath == null) { |
140 | throw new TypeError('The worker file path must be specified') | |
141 | } | |
142 | if (typeof filePath !== 'string') { | |
143 | throw new TypeError('The worker file path must be a string') | |
144 | } | |
bde6b5d7 JB |
145 | if (!existsSync(filePath)) { |
146 | throw new Error(`Cannot find the worker file '${filePath}'`) | |
147 | } | |
148 | } | |
149 | ||
c63a35a0 JB |
150 | export const checkDynamicPoolSize = ( |
151 | min: number, | |
152 | max: number | undefined | |
153 | ): void => { | |
bde6b5d7 JB |
154 | if (max == null) { |
155 | throw new TypeError( | |
156 | 'Cannot instantiate a dynamic pool without specifying the maximum pool size' | |
157 | ) | |
158 | } else if (!Number.isSafeInteger(max)) { | |
159 | throw new TypeError( | |
160 | 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size' | |
161 | ) | |
162 | } else if (min > max) { | |
163 | throw new RangeError( | |
164 | 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' | |
165 | ) | |
166 | } else if (max === 0) { | |
167 | throw new RangeError( | |
168 | 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero' | |
169 | ) | |
170 | } else if (min === max) { | |
171 | throw new RangeError( | |
172 | 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead' | |
173 | ) | |
174 | } | |
175 | } | |
176 | ||
177 | export const checkValidWorkerChoiceStrategy = ( | |
c63a35a0 | 178 | workerChoiceStrategy: WorkerChoiceStrategy | undefined |
bde6b5d7 JB |
179 | ): void => { |
180 | if ( | |
181 | workerChoiceStrategy != null && | |
182 | !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy) | |
183 | ) { | |
184 | throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`) | |
185 | } | |
186 | } | |
187 | ||
188 | export const checkValidTasksQueueOptions = ( | |
c63a35a0 | 189 | tasksQueueOptions: TasksQueueOptions | undefined |
bde6b5d7 JB |
190 | ): void => { |
191 | if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) { | |
192 | throw new TypeError('Invalid tasks queue options: must be a plain object') | |
193 | } | |
194 | if ( | |
195 | tasksQueueOptions?.concurrency != null && | |
196 | !Number.isSafeInteger(tasksQueueOptions.concurrency) | |
197 | ) { | |
198 | throw new TypeError( | |
199 | 'Invalid worker node tasks concurrency: must be an integer' | |
200 | ) | |
201 | } | |
202 | if ( | |
203 | tasksQueueOptions?.concurrency != null && | |
204 | tasksQueueOptions.concurrency <= 0 | |
205 | ) { | |
206 | throw new RangeError( | |
207 | `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` | |
208 | ) | |
209 | } | |
210 | if ( | |
211 | tasksQueueOptions?.size != null && | |
212 | !Number.isSafeInteger(tasksQueueOptions.size) | |
213 | ) { | |
214 | throw new TypeError( | |
215 | 'Invalid worker node tasks queue size: must be an integer' | |
216 | ) | |
217 | } | |
218 | if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) { | |
219 | throw new RangeError( | |
220 | `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero` | |
221 | ) | |
222 | } | |
223 | } | |
bfc75cca | 224 | |
c3719753 | 225 | export const checkWorkerNodeArguments = ( |
c63a35a0 JB |
226 | type: WorkerType | undefined, |
227 | filePath: string | undefined, | |
228 | opts: WorkerNodeOptions | undefined | |
9a38f99e | 229 | ): void => { |
c3719753 JB |
230 | if (type == null) { |
231 | throw new TypeError('Cannot construct a worker node without a worker type') | |
232 | } | |
233 | if (!Object.values(WorkerTypes).includes(type)) { | |
234 | throw new TypeError( | |
235 | `Cannot construct a worker node with an invalid worker type '${type}'` | |
236 | ) | |
9a38f99e | 237 | } |
c3719753 JB |
238 | checkFilePath(filePath) |
239 | if (opts == null) { | |
9a38f99e | 240 | throw new TypeError( |
c3719753 | 241 | 'Cannot construct a worker node without worker node options' |
9a38f99e JB |
242 | ) |
243 | } | |
9974369e | 244 | if (!isPlainObject(opts)) { |
9a38f99e | 245 | throw new TypeError( |
c3719753 | 246 | 'Cannot construct a worker node with invalid options: must be a plain object' |
9a38f99e JB |
247 | ) |
248 | } | |
c3719753 JB |
249 | if (opts.tasksQueueBackPressureSize == null) { |
250 | throw new TypeError( | |
251 | 'Cannot construct a worker node without a tasks queue back pressure size option' | |
252 | ) | |
253 | } | |
254 | if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) { | |
255 | throw new TypeError( | |
256 | 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer' | |
257 | ) | |
258 | } | |
259 | if (opts.tasksQueueBackPressureSize <= 0) { | |
9a38f99e | 260 | throw new RangeError( |
c3719753 | 261 | 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' |
9a38f99e JB |
262 | ) |
263 | } | |
264 | } | |
bfc75cca JB |
265 | |
266 | /** | |
267 | * Updates the given measurement statistics. | |
268 | * | |
269 | * @param measurementStatistics - The measurement statistics to update. | |
270 | * @param measurementRequirements - The measurement statistics requirements. | |
271 | * @param measurementValue - The measurement value. | |
bfc75cca JB |
272 | * @internal |
273 | */ | |
c329fd41 | 274 | const updateMeasurementStatistics = ( |
bfc75cca | 275 | measurementStatistics: MeasurementStatistics, |
c63a35a0 JB |
276 | measurementRequirements: MeasurementStatisticsRequirements | undefined, |
277 | measurementValue: number | undefined | |
bfc75cca | 278 | ): void => { |
c63a35a0 JB |
279 | if ( |
280 | measurementRequirements != null && | |
281 | measurementValue != null && | |
282 | measurementRequirements.aggregate | |
283 | ) { | |
bfc75cca JB |
284 | measurementStatistics.aggregate = |
285 | (measurementStatistics.aggregate ?? 0) + measurementValue | |
286 | measurementStatistics.minimum = min( | |
287 | measurementValue, | |
288 | measurementStatistics.minimum ?? Infinity | |
289 | ) | |
290 | measurementStatistics.maximum = max( | |
291 | measurementValue, | |
292 | measurementStatistics.maximum ?? -Infinity | |
293 | ) | |
c63a35a0 | 294 | if (measurementRequirements.average || measurementRequirements.median) { |
bfc75cca JB |
295 | measurementStatistics.history.push(measurementValue) |
296 | if (measurementRequirements.average) { | |
297 | measurementStatistics.average = average(measurementStatistics.history) | |
298 | } else if (measurementStatistics.average != null) { | |
299 | delete measurementStatistics.average | |
300 | } | |
301 | if (measurementRequirements.median) { | |
302 | measurementStatistics.median = median(measurementStatistics.history) | |
303 | } else if (measurementStatistics.median != null) { | |
304 | delete measurementStatistics.median | |
305 | } | |
306 | } | |
307 | } | |
308 | } | |
c329fd41 JB |
309 | if (env.NODE_ENV === 'test') { |
310 | // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access | |
311 | exports.updateMeasurementStatistics = updateMeasurementStatistics | |
312 | } | |
313 | ||
314 | export const updateWaitTimeWorkerUsage = < | |
315 | Worker extends IWorker, | |
316 | Data = unknown, | |
317 | Response = unknown | |
318 | >( | |
c63a35a0 JB |
319 | workerChoiceStrategyContext: |
320 | | WorkerChoiceStrategyContext<Worker, Data, Response> | |
321 | | undefined, | |
c329fd41 JB |
322 | workerUsage: WorkerUsage, |
323 | task: Task<Data> | |
324 | ): void => { | |
325 | const timestamp = performance.now() | |
326 | const taskWaitTime = timestamp - (task.timestamp ?? timestamp) | |
327 | updateMeasurementStatistics( | |
328 | workerUsage.waitTime, | |
f4d0a470 | 329 | workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime, |
c329fd41 JB |
330 | taskWaitTime |
331 | ) | |
332 | } | |
333 | ||
334 | export const updateTaskStatisticsWorkerUsage = <Response = unknown>( | |
335 | workerUsage: WorkerUsage, | |
336 | message: MessageValue<Response> | |
337 | ): void => { | |
338 | const workerTaskStatistics = workerUsage.tasks | |
339 | if ( | |
c63a35a0 | 340 | // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition |
c329fd41 JB |
341 | workerTaskStatistics.executing != null && |
342 | workerTaskStatistics.executing > 0 | |
343 | ) { | |
344 | --workerTaskStatistics.executing | |
345 | } | |
346 | if (message.workerError == null) { | |
347 | ++workerTaskStatistics.executed | |
348 | } else { | |
349 | ++workerTaskStatistics.failed | |
350 | } | |
351 | } | |
352 | ||
353 | export const updateRunTimeWorkerUsage = < | |
354 | Worker extends IWorker, | |
355 | Data = unknown, | |
356 | Response = unknown | |
357 | >( | |
c63a35a0 JB |
358 | workerChoiceStrategyContext: |
359 | | WorkerChoiceStrategyContext<Worker, Data, Response> | |
360 | | undefined, | |
c329fd41 JB |
361 | workerUsage: WorkerUsage, |
362 | message: MessageValue<Response> | |
363 | ): void => { | |
364 | if (message.workerError != null) { | |
365 | return | |
366 | } | |
367 | updateMeasurementStatistics( | |
368 | workerUsage.runTime, | |
f4d0a470 | 369 | workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime, |
c329fd41 JB |
370 | message.taskPerformance?.runTime ?? 0 |
371 | ) | |
372 | } | |
373 | ||
374 | export const updateEluWorkerUsage = < | |
375 | Worker extends IWorker, | |
376 | Data = unknown, | |
377 | Response = unknown | |
378 | >( | |
c63a35a0 JB |
379 | workerChoiceStrategyContext: |
380 | | WorkerChoiceStrategyContext<Worker, Data, Response> | |
381 | | undefined, | |
c329fd41 JB |
382 | workerUsage: WorkerUsage, |
383 | message: MessageValue<Response> | |
384 | ): void => { | |
385 | if (message.workerError != null) { | |
386 | return | |
387 | } | |
c63a35a0 | 388 | const eluTaskStatisticsRequirements = |
f4d0a470 | 389 | workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu |
c329fd41 JB |
390 | updateMeasurementStatistics( |
391 | workerUsage.elu.active, | |
392 | eluTaskStatisticsRequirements, | |
393 | message.taskPerformance?.elu?.active ?? 0 | |
394 | ) | |
395 | updateMeasurementStatistics( | |
396 | workerUsage.elu.idle, | |
397 | eluTaskStatisticsRequirements, | |
398 | message.taskPerformance?.elu?.idle ?? 0 | |
399 | ) | |
c63a35a0 | 400 | if (eluTaskStatisticsRequirements?.aggregate === true) { |
c329fd41 JB |
401 | if (message.taskPerformance?.elu != null) { |
402 | if (workerUsage.elu.utilization != null) { | |
403 | workerUsage.elu.utilization = | |
404 | (workerUsage.elu.utilization + | |
405 | message.taskPerformance.elu.utilization) / | |
406 | 2 | |
407 | } else { | |
408 | workerUsage.elu.utilization = message.taskPerformance.elu.utilization | |
409 | } | |
410 | } | |
411 | } | |
412 | } | |
c3719753 JB |
413 | |
414 | export const createWorker = <Worker extends IWorker>( | |
415 | type: WorkerType, | |
416 | filePath: string, | |
417 | opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions } | |
418 | ): Worker => { | |
419 | switch (type) { | |
420 | case WorkerTypes.thread: | |
e9ed6eee | 421 | return new ThreadWorker(filePath, { |
c3719753 | 422 | env: SHARE_ENV, |
c63a35a0 | 423 | ...opts.workerOptions |
c3719753 JB |
424 | }) as unknown as Worker |
425 | case WorkerTypes.cluster: | |
c63a35a0 | 426 | return cluster.fork(opts.env) as unknown as Worker |
c3719753 JB |
427 | default: |
428 | // eslint-disable-next-line @typescript-eslint/restrict-template-expressions | |
429 | throw new Error(`Unknown worker type '${type}'`) | |
430 | } | |
431 | } | |
d41a44de | 432 | |
e9ed6eee JB |
433 | /** |
434 | * Returns the worker type of the given worker. | |
435 | * | |
436 | * @param worker - The worker to get the type of. | |
437 | * @returns The worker type of the given worker. | |
438 | * @internal | |
439 | */ | |
440 | export const getWorkerType = (worker: IWorker): WorkerType | undefined => { | |
441 | if (worker instanceof ThreadWorker) { | |
442 | return WorkerTypes.thread | |
443 | } else if (worker instanceof ClusterWorker) { | |
444 | return WorkerTypes.cluster | |
445 | } | |
446 | } | |
447 | ||
448 | /** | |
449 | * Returns the worker id of the given worker. | |
450 | * | |
451 | * @param worker - The worker to get the id of. | |
452 | * @returns The worker id of the given worker. | |
453 | * @internal | |
454 | */ | |
455 | export const getWorkerId = (worker: IWorker): number | undefined => { | |
456 | if (worker instanceof ThreadWorker) { | |
457 | return worker.threadId | |
458 | } else if (worker instanceof ClusterWorker) { | |
459 | return worker.id | |
460 | } | |
461 | } | |
462 | ||
d41a44de JB |
463 | export const waitWorkerNodeEvents = async < |
464 | Worker extends IWorker, | |
465 | Data = unknown | |
466 | >( | |
467 | workerNode: IWorkerNode<Worker, Data>, | |
468 | workerNodeEvent: string, | |
32b141fd JB |
469 | numberOfEventsToWait: number, |
470 | timeout: number | |
d41a44de JB |
471 | ): Promise<number> => { |
472 | return await new Promise<number>(resolve => { | |
473 | let events = 0 | |
474 | if (numberOfEventsToWait === 0) { | |
475 | resolve(events) | |
476 | return | |
477 | } | |
478 | workerNode.on(workerNodeEvent, () => { | |
479 | ++events | |
480 | if (events === numberOfEventsToWait) { | |
481 | resolve(events) | |
482 | } | |
483 | }) | |
6f3a391b | 484 | if (timeout >= 0) { |
32b141fd JB |
485 | setTimeout(() => { |
486 | resolve(events) | |
487 | }, timeout) | |
488 | } | |
d41a44de JB |
489 | }) |
490 | } |