Commit | Line | Data |
---|---|---|
e9ed6eee | 1 | import cluster, { Worker as ClusterWorker } from 'node:cluster' |
ded253e2 | 2 | import { existsSync } from 'node:fs' |
ded253e2 | 3 | import { env } from 'node:process' |
e9ed6eee JB |
4 | import { |
5 | SHARE_ENV, | |
6 | Worker as ThreadWorker, | |
3a502712 | 7 | type WorkerOptions, |
e9ed6eee | 8 | } from 'node:worker_threads' |
ded253e2 | 9 | |
d35e5717 | 10 | import type { MessageValue, Task } from '../utility-types.js' |
ded253e2 | 11 | import { average, isPlainObject, max, median, min } from '../utils.js' |
bcfb06ce | 12 | import type { TasksQueueOptions } from './pool.js' |
bde6b5d7 | 13 | import { |
bfc75cca | 14 | type MeasurementStatisticsRequirements, |
bde6b5d7 | 15 | WorkerChoiceStrategies, |
3a502712 | 16 | type WorkerChoiceStrategy, |
d35e5717 | 17 | } from './selection-strategies/selection-strategies-types.js' |
bcfb06ce | 18 | import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js' |
c3719753 JB |
19 | import { |
20 | type IWorker, | |
d41a44de | 21 | type IWorkerNode, |
c3719753 JB |
22 | type MeasurementStatistics, |
23 | type WorkerNodeOptions, | |
24 | type WorkerType, | |
c329fd41 | 25 | WorkerTypes, |
3a502712 | 26 | type WorkerUsage, |
d35e5717 | 27 | } from './worker.js' |
bde6b5d7 | 28 | |
/**
 * Measurement statistics requirements used by default: neither the
 * aggregate, nor the average, nor the median is required.
 */
export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements =
  { aggregate: false, average: false, median: false }
38 | ||
/**
 * Builds the default tasks queue options for a pool.
 * @param poolMaxSize - The pool maximum size.
 * @returns The default tasks queue options. The queue size is the square of
 * the given pool maximum size; every other field is a fixed default.
 */
export const getDefaultTasksQueueOptions = (
  poolMaxSize: number
): Required<TasksQueueOptions> => ({
  size: poolMaxSize ** 2,
  concurrency: 1,
  taskStealing: true,
  tasksStealingOnBackPressure: false,
  tasksFinishedTimeout: 2000,
})
50 | ||
/**
 * Validates the worker file path.
 * @param filePath - The worker file path.
 * @throws {TypeError} If the file path is `null`/`undefined` or is not a string.
 * @throws {Error} If nothing exists on the file system at the given path.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  } else if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
62 | ||
/**
 * Validates the size boundaries of a dynamic pool.
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {TypeError} If the maximum pool size is missing or is not a safe integer.
 * @throws {RangeError} If the maximum pool size is lower than the minimum one,
 * equal to zero, or equal to the minimum one.
 */
export const checkDynamicPoolSize = (
  min: number,
  max: number | undefined
): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
89 | ||
/**
 * Validates a priority value.
 * @param priority - The priority to check. `null`/`undefined` is accepted as "not set".
 * @throws {TypeError} If the priority is set but is not a safe integer.
 * @throws {RangeError} If the priority is a safe integer outside of [-20, 19].
 */
export const checkValidPriority = (priority: number | undefined): void => {
  if (priority == null) {
    return
  }
  if (!Number.isSafeInteger(priority)) {
    throw new TypeError(`Invalid property 'priority': '${priority.toString()}'`)
  }
  if (priority < -20 || priority > 19) {
    throw new RangeError("Property 'priority' must be between -20 and 19")
  }
}
102 | ||
/**
 * Validates a worker choice strategy.
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * `null`/`undefined` is accepted as "not set".
 * @throws {Error} If the strategy is set but is not one of the values of
 * `WorkerChoiceStrategies`.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
113 | ||
/**
 * Validates tasks queue options.
 * @param tasksQueueOptions - The tasks queue options to check.
 * `null`/`undefined` is accepted as "not set".
 * @throws {TypeError} If the options are not a plain object, or if
 * `concurrency` or `size` is set but is not a safe integer.
 * @throws {RangeError} If `concurrency` or `size` is set and is lower than or
 * equal to zero.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions | undefined
): void => {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, size } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency.toString()} is a negative integer or zero`
      )
    }
  }
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size.toString()} is a negative integer or zero`
      )
    }
  }
}
bfc75cca | 150 | |
/**
 * Validates the arguments used to construct a worker node.
 *
 * Checks run in this order: worker type, worker file path, worker node
 * options object, then the `tasksQueueBackPressureSize`,
 * `tasksQueueBucketSize` and `tasksQueuePriority` options.
 * @param type - The worker type.
 * @param filePath - The worker file path.
 * @param opts - The worker node options.
 * @throws {TypeError} If an argument or option is missing or has the wrong type.
 * @throws {RangeError} If a size option is an integer lower than or equal to zero.
 * @throws {Error} If the worker file cannot be found (raised by `checkFilePath`).
 */
export const checkWorkerNodeArguments = (
  type: WorkerType | undefined,
  filePath: string | undefined,
  opts: WorkerNodeOptions | undefined
): void => {
  // Shared validation for the options that must be strictly positive safe integers.
  const checkPositiveIntegerOption = (
    optionValue: number | undefined,
    optionLabel: string
  ): void => {
    if (optionValue == null) {
      throw new TypeError(
        `Cannot construct a worker node without a ${optionLabel} option`
      )
    }
    if (!Number.isSafeInteger(optionValue)) {
      throw new TypeError(
        `Cannot construct a worker node with a ${optionLabel} option that is not an integer`
      )
    }
    if (optionValue <= 0) {
      throw new RangeError(
        `Cannot construct a worker node with a ${optionLabel} option that is not a positive integer`
      )
    }
  }

  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  const knownWorkerTypes = Object.values(WorkerTypes)
  if (!knownWorkerTypes.includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }
  checkFilePath(filePath)
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid worker node options: must be a plain object'
    )
  }
  checkPositiveIntegerOption(
    opts.tasksQueueBackPressureSize,
    'tasks queue back pressure size'
  )
  checkPositiveIntegerOption(
    opts.tasksQueueBucketSize,
    'tasks queue bucket size'
  )
  if (opts.tasksQueuePriority == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue priority option'
    )
  }
  if (typeof opts.tasksQueuePriority !== 'boolean') {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
    )
  }
}
bfc75cca JB |
216 | |
/**
 * Updates the given measurement statistics with a new measurement value.
 *
 * Nothing is updated unless the requirements are defined, a measurement value
 * is given and the `aggregate` requirement is set. When it is, the aggregate,
 * minimum and maximum are always refreshed. The value is recorded in the
 * history only if the average or the median is required; in that case the
 * statistic that is not required is removed from the measurement statistics.
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements | undefined,
  measurementValue: number | undefined
): void => {
  if (
    measurementRequirements == null ||
    measurementValue == null ||
    !measurementRequirements.aggregate
  ) {
    return
  }
  measurementStatistics.aggregate =
    (measurementStatistics.aggregate ?? 0) + measurementValue
  measurementStatistics.minimum = min(
    measurementValue,
    measurementStatistics.minimum ?? Number.POSITIVE_INFINITY
  )
  measurementStatistics.maximum = max(
    measurementValue,
    measurementStatistics.maximum ?? Number.NEGATIVE_INFINITY
  )
  if (!measurementRequirements.average && !measurementRequirements.median) {
    return
  }
  measurementStatistics.history.put(measurementValue)
  // Materialize the history once and share it between the average and the
  // median computations instead of converting it for each of them.
  // NOTE(review): the average is computed first, so an in-place reordering
  // done by median() would not affect it — confirm average() does not mutate
  // its argument.
  const measurementValues = measurementStatistics.history.toArray()
  if (measurementRequirements.average) {
    measurementStatistics.average = average(measurementValues)
  } else if (measurementStatistics.average != null) {
    delete measurementStatistics.average
  }
  if (measurementRequirements.median) {
    measurementStatistics.median = median(measurementValues)
  } else if (measurementStatistics.median != null) {
    delete measurementStatistics.median
  }
}
// Test-only hook: when NODE_ENV is 'test', the module-private helper is
// attached to the module `exports` object so that tests can reach it.
if (env.NODE_ENV === 'test') {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  exports.updateMeasurementStatistics = updateMeasurementStatistics
}
267 | ||
/**
 * Updates the wait time statistics of a worker usage for the given task.
 *
 * The wait time is the delta between now (`performance.now()`) and the task
 * timestamp; a task without timestamp counts for a wait time of zero.
 * @param workerChoiceStrategiesContext - The worker choice strategies context
 * providing the wait time statistics requirements, if any.
 * @param workerUsage - The worker usage to update.
 * @param task - The task whose wait time is measured.
 */
export const updateWaitTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategiesContext:
    | WorkerChoiceStrategiesContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  task: Task<Data>
): void => {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeStatistics = workerUsage.waitTime
  const waitTimeRequirements =
    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    waitTimeStatistics,
    waitTimeRequirements,
    taskWaitTime
  )
}
287 | ||
/**
 * Updates the tasks counters of a worker usage once a task response message
 * has been received.
 *
 * The executing counter is decremented when it is strictly positive, then
 * either the failed counter (the message carries a worker error) or the
 * executed counter (no worker error) is incremented.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received for the task.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const { tasks } = workerUsage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
306 | ||
/**
 * Updates the run time statistics of a worker usage from a task response
 * message.
 *
 * Messages carrying a worker error are ignored. A message without run time
 * performance data counts for a run time of zero.
 * @param workerChoiceStrategiesContext - The worker choice strategies context
 * providing the run time statistics requirements, if any.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received for the task.
 */
export const updateRunTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategiesContext:
    | WorkerChoiceStrategiesContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError == null) {
    updateMeasurementStatistics(
      workerUsage.runTime,
      workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime,
      message.taskPerformance?.runTime ?? 0
    )
  }
}
327 | ||
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from
 * a task response message.
 *
 * Messages carrying a worker error are ignored. The active and idle
 * measurement statistics are updated first (missing values count for zero).
 * Then, if the ELU `aggregate` requirement is set and the message carries ELU
 * data, the utilization becomes the mean of the previous utilization and the
 * one from the message, or the one from the message when there was none yet.
 * @param workerChoiceStrategiesContext - The worker choice strategies context
 * providing the ELU statistics requirements, if any.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received for the task.
 */
export const updateEluWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategiesContext:
    | WorkerChoiceStrategiesContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  const eluTaskStatisticsRequirements =
    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
  const { elu: workerElu } = workerUsage
  updateMeasurementStatistics(
    workerElu.active,
    eluTaskStatisticsRequirements,
    message.taskPerformance?.elu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerElu.idle,
    eluTaskStatisticsRequirements,
    message.taskPerformance?.elu?.idle ?? 0
  )
  if (eluTaskStatisticsRequirements?.aggregate !== true) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu == null) {
    return
  }
  workerElu.utilization =
    workerElu.utilization != null
      ? (workerElu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
c3719753 JB |
367 | |
/**
 * Creates a worker of the given type.
 *
 * A thread worker is constructed on `filePath` with `env: SHARE_ENV` as a
 * default that `opts.workerOptions` may override. A cluster worker is forked
 * with `opts.env`.
 * @param type - The worker type.
 * @param filePath - The worker file path (used by thread workers).
 * @param opts - The worker creation options.
 * @returns The created worker.
 * @throws {Error} If the worker type is unknown.
 */
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>; workerOptions?: WorkerOptions }
): Worker => {
  if (type === WorkerTypes.thread) {
    const threadWorkerOptions: WorkerOptions = {
      env: SHARE_ENV,
      ...opts.workerOptions,
    }
    return new ThreadWorker(filePath, threadWorkerOptions) as unknown as Worker
  }
  if (type === WorkerTypes.cluster) {
    return cluster.fork(opts.env) as unknown as Worker
  }
  // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
  throw new Error(`Unknown worker type '${type}'`)
}
d41a44de | 386 | |
/**
 * Gets the worker type of the given worker.
 * @param worker - The worker to get the type of.
 * @returns `WorkerTypes.thread` for a `node:worker_threads` worker,
 * `WorkerTypes.cluster` for a `node:cluster` worker, `undefined` otherwise.
 * @internal
 */
export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
  if (worker instanceof ThreadWorker) {
    return WorkerTypes.thread
  }
  if (worker instanceof ClusterWorker) {
    return WorkerTypes.cluster
  }
  return undefined
}
400 | ||
/**
 * Gets the worker id of the given worker.
 * @param worker - The worker to get the id of.
 * @returns The `threadId` of a `node:worker_threads` worker, the `id` of a
 * `node:cluster` worker, `undefined` otherwise.
 * @internal
 */
export const getWorkerId = (worker: IWorker): number | undefined => {
  if (worker instanceof ThreadWorker) {
    return worker.threadId
  }
  if (worker instanceof ClusterWorker) {
    return worker.id
  }
  return undefined
}
414 | ||
/**
 * Waits until a worker node has emitted a given event a given number of
 * times, or until a timeout expires, whichever comes first.
 *
 * The returned promise rejects with an `Error` when the event name is not one
 * of `'idle'`, `'backPressure'` or `'taskFinished'`.
 * @param workerNode - The worker node emitting the events.
 * @param workerNodeEvent - The worker node event to wait for.
 * @param numberOfEventsToWait - The number of events to wait for. Zero
 * resolves immediately.
 * @param timeout - The timeout in milliseconds. A negative value disables it.
 * @returns The number of events received when the wait ended.
 * @internal
 */
export const waitWorkerNodeEvents = async <
  Worker extends IWorker,
  Data = unknown
>(
  workerNode: IWorkerNode<Worker, Data>,
  workerNodeEvent: string,
  numberOfEventsToWait: number,
  timeout: number
): Promise<number> => {
  return await new Promise<number>(resolve => {
    let events = 0
    if (numberOfEventsToWait === 0) {
      resolve(events)
      return
    }
    let timeoutHandle: ReturnType<typeof setTimeout> | undefined
    switch (workerNodeEvent) {
      case 'idle':
      case 'backPressure':
      case 'taskFinished':
        // NOTE(review): the listener stays registered after the wait ends;
        // only `on` is visible on the worker node from here. If the worker
        // node exposes a way to detach listeners, it should be used as well.
        workerNode.on(workerNodeEvent, () => {
          ++events
          if (events === numberOfEventsToWait) {
            // The wait is over: cancel the pending timer so it does not keep
            // the event loop alive until the timeout expires.
            if (timeoutHandle != null) {
              clearTimeout(timeoutHandle)
            }
            resolve(events)
          }
        })
        break
      default:
        // Thrown inside the promise executor: surfaces as a rejection.
        throw new Error('Invalid worker node event')
    }
    if (timeout >= 0) {
      timeoutHandle = setTimeout(() => {
        resolve(events)
      }, timeout)
    }
  })
}