]>
Commit | Line | Data |
---|---|---|
e9ed6eee | 1 | import cluster, { Worker as ClusterWorker } from 'node:cluster' |
ded253e2 | 2 | import { existsSync } from 'node:fs' |
ded253e2 | 3 | import { env } from 'node:process' |
e9ed6eee JB |
4 | import { |
5 | SHARE_ENV, | |
6 | Worker as ThreadWorker, | |
3a502712 | 7 | type WorkerOptions, |
e9ed6eee | 8 | } from 'node:worker_threads' |
ded253e2 | 9 | |
d35e5717 | 10 | import type { MessageValue, Task } from '../utility-types.js' |
bcfb06ce | 11 | import type { TasksQueueOptions } from './pool.js' |
97231086 JB |
12 | import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js' |
13 | ||
14 | import { average, isPlainObject, max, median, min } from '../utils.js' | |
bde6b5d7 | 15 | import { |
bfc75cca | 16 | type MeasurementStatisticsRequirements, |
bde6b5d7 | 17 | WorkerChoiceStrategies, |
3a502712 | 18 | type WorkerChoiceStrategy, |
d35e5717 | 19 | } from './selection-strategies/selection-strategies-types.js' |
c3719753 JB |
20 | import { |
21 | type IWorker, | |
d41a44de | 22 | type IWorkerNode, |
c3719753 | 23 | type MeasurementStatistics, |
559c196a | 24 | type WorkerInfo, |
c3719753 JB |
25 | type WorkerNodeOptions, |
26 | type WorkerType, | |
c329fd41 | 27 | WorkerTypes, |
3a502712 | 28 | type WorkerUsage, |
d35e5717 | 29 | } from './worker.js' |
bde6b5d7 | 30 | |
e9ed6eee JB |
31 | /** |
32 | * Default measurement statistics requirements. | |
33 | */ | |
7cec62a4 JB |
34 | export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: Readonly<MeasurementStatisticsRequirements> = |
35 | Object.freeze({ | |
e9ed6eee JB |
36 | aggregate: false, |
37 | average: false, | |
3a502712 | 38 | median: false, |
7cec62a4 | 39 | }) |
e9ed6eee | 40 | |
32b141fd JB |
41 | export const getDefaultTasksQueueOptions = ( |
42 | poolMaxSize: number | |
7cec62a4 JB |
43 | ): Required<Readonly<TasksQueueOptions>> => { |
44 | return Object.freeze({ | |
32b141fd | 45 | concurrency: 1, |
963e6682 | 46 | size: poolMaxSize ** 2, |
97231086 | 47 | tasksFinishedTimeout: 2000, |
f09b1954 | 48 | tasksStealingOnBackPressure: true, |
e25f86b3 | 49 | tasksStealingRatio: 0.6, |
97231086 | 50 | taskStealing: true, |
7cec62a4 | 51 | }) |
32b141fd JB |
52 | } |
53 | ||
c63a35a0 | 54 | export const checkFilePath = (filePath: string | undefined): void => { |
c3719753 JB |
55 | if (filePath == null) { |
56 | throw new TypeError('The worker file path must be specified') | |
57 | } | |
58 | if (typeof filePath !== 'string') { | |
59 | throw new TypeError('The worker file path must be a string') | |
60 | } | |
bde6b5d7 JB |
61 | if (!existsSync(filePath)) { |
62 | throw new Error(`Cannot find the worker file '${filePath}'`) | |
63 | } | |
64 | } | |
65 | ||
c63a35a0 JB |
66 | export const checkDynamicPoolSize = ( |
67 | min: number, | |
68 | max: number | undefined | |
69 | ): void => { | |
bde6b5d7 JB |
70 | if (max == null) { |
71 | throw new TypeError( | |
72 | 'Cannot instantiate a dynamic pool without specifying the maximum pool size' | |
73 | ) | |
963e6682 JB |
74 | } |
75 | if (!Number.isSafeInteger(max)) { | |
bde6b5d7 JB |
76 | throw new TypeError( |
77 | 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size' | |
78 | ) | |
963e6682 JB |
79 | } |
80 | if (min > max) { | |
bde6b5d7 JB |
81 | throw new RangeError( |
82 | 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' | |
83 | ) | |
963e6682 JB |
84 | } |
85 | if (max === 0) { | |
bde6b5d7 JB |
86 | throw new RangeError( |
87 | 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero' | |
88 | ) | |
963e6682 JB |
89 | } |
90 | if (min === max) { | |
bde6b5d7 JB |
91 | throw new RangeError( |
92 | 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead' | |
93 | ) | |
94 | } | |
95 | } | |
96 | ||
d0bd5062 JB |
97 | export const checkValidPriority = (priority: number | undefined): void => { |
98 | if (priority != null && !Number.isSafeInteger(priority)) { | |
6e5d7052 | 99 | throw new TypeError(`Invalid property 'priority': '${priority.toString()}'`) |
d0bd5062 JB |
100 | } |
101 | if ( | |
102 | priority != null && | |
103 | Number.isSafeInteger(priority) && | |
104 | (priority < -20 || priority > 19) | |
105 | ) { | |
85bbc7ab | 106 | throw new RangeError("Property 'priority' must be between -20 and 19") |
d0bd5062 JB |
107 | } |
108 | } | |
109 | ||
bde6b5d7 | 110 | export const checkValidWorkerChoiceStrategy = ( |
97231086 | 111 | workerChoiceStrategy: undefined | WorkerChoiceStrategy |
bde6b5d7 JB |
112 | ): void => { |
113 | if ( | |
114 | workerChoiceStrategy != null && | |
115 | !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy) | |
116 | ) { | |
117 | throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`) | |
118 | } | |
119 | } | |
120 | ||
121 | export const checkValidTasksQueueOptions = ( | |
c63a35a0 | 122 | tasksQueueOptions: TasksQueueOptions | undefined |
bde6b5d7 JB |
123 | ): void => { |
124 | if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) { | |
125 | throw new TypeError('Invalid tasks queue options: must be a plain object') | |
126 | } | |
127 | if ( | |
128 | tasksQueueOptions?.concurrency != null && | |
129 | !Number.isSafeInteger(tasksQueueOptions.concurrency) | |
130 | ) { | |
131 | throw new TypeError( | |
132 | 'Invalid worker node tasks concurrency: must be an integer' | |
133 | ) | |
134 | } | |
135 | if ( | |
136 | tasksQueueOptions?.concurrency != null && | |
137 | tasksQueueOptions.concurrency <= 0 | |
138 | ) { | |
139 | throw new RangeError( | |
6e5d7052 | 140 | `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency.toString()} is a negative integer or zero` |
bde6b5d7 JB |
141 | ) |
142 | } | |
143 | if ( | |
144 | tasksQueueOptions?.size != null && | |
145 | !Number.isSafeInteger(tasksQueueOptions.size) | |
146 | ) { | |
147 | throw new TypeError( | |
148 | 'Invalid worker node tasks queue size: must be an integer' | |
149 | ) | |
150 | } | |
151 | if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) { | |
152 | throw new RangeError( | |
6e5d7052 | 153 | `Invalid worker node tasks queue size: ${tasksQueueOptions.size.toString()} is a negative integer or zero` |
bde6b5d7 JB |
154 | ) |
155 | } | |
e25f86b3 JB |
156 | if ( |
157 | tasksQueueOptions?.tasksStealingRatio != null && | |
158 | typeof tasksQueueOptions.tasksStealingRatio !== 'number' | |
159 | ) { | |
160 | throw new TypeError( | |
161 | 'Invalid worker node tasks stealing ratio: must be a number' | |
162 | ) | |
163 | } | |
164 | if ( | |
165 | tasksQueueOptions?.tasksStealingRatio != null && | |
166 | (tasksQueueOptions.tasksStealingRatio < 0 || | |
167 | tasksQueueOptions.tasksStealingRatio > 1) | |
168 | ) { | |
169 | throw new RangeError( | |
170 | 'Invalid worker node tasks stealing ratio: must be between 0 and 1' | |
171 | ) | |
172 | } | |
bde6b5d7 | 173 | } |
bfc75cca | 174 | |
c3719753 | 175 | export const checkWorkerNodeArguments = ( |
97231086 | 176 | type: undefined | WorkerType, |
c63a35a0 | 177 | filePath: string | undefined, |
97231086 | 178 | opts: undefined | WorkerNodeOptions |
9a38f99e | 179 | ): void => { |
c3719753 JB |
180 | if (type == null) { |
181 | throw new TypeError('Cannot construct a worker node without a worker type') | |
182 | } | |
183 | if (!Object.values(WorkerTypes).includes(type)) { | |
184 | throw new TypeError( | |
185 | `Cannot construct a worker node with an invalid worker type '${type}'` | |
186 | ) | |
9a38f99e | 187 | } |
c3719753 JB |
188 | checkFilePath(filePath) |
189 | if (opts == null) { | |
9a38f99e | 190 | throw new TypeError( |
c3719753 | 191 | 'Cannot construct a worker node without worker node options' |
9a38f99e JB |
192 | ) |
193 | } | |
9974369e | 194 | if (!isPlainObject(opts)) { |
9a38f99e | 195 | throw new TypeError( |
fcfc3353 | 196 | 'Cannot construct a worker node with invalid worker node options: must be a plain object' |
9a38f99e JB |
197 | ) |
198 | } | |
c3719753 JB |
199 | if (opts.tasksQueueBackPressureSize == null) { |
200 | throw new TypeError( | |
201 | 'Cannot construct a worker node without a tasks queue back pressure size option' | |
202 | ) | |
203 | } | |
204 | if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) { | |
205 | throw new TypeError( | |
206 | 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer' | |
207 | ) | |
208 | } | |
209 | if (opts.tasksQueueBackPressureSize <= 0) { | |
9a38f99e | 210 | throw new RangeError( |
c3719753 | 211 | 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' |
9a38f99e JB |
212 | ) |
213 | } | |
59ca7cff JB |
214 | if (opts.tasksQueueBucketSize == null) { |
215 | throw new TypeError( | |
216 | 'Cannot construct a worker node without a tasks queue bucket size option' | |
217 | ) | |
218 | } | |
219 | if (!Number.isSafeInteger(opts.tasksQueueBucketSize)) { | |
220 | throw new TypeError( | |
221 | 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer' | |
222 | ) | |
223 | } | |
224 | if (opts.tasksQueueBucketSize <= 0) { | |
225 | throw new RangeError( | |
226 | 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer' | |
227 | ) | |
228 | } | |
fcfc3353 JB |
229 | if (opts.tasksQueuePriority == null) { |
230 | throw new TypeError( | |
231 | 'Cannot construct a worker node without a tasks queue priority option' | |
232 | ) | |
233 | } | |
234 | if (typeof opts.tasksQueuePriority !== 'boolean') { | |
235 | throw new TypeError( | |
236 | 'Cannot construct a worker node with a tasks queue priority option that is not a boolean' | |
237 | ) | |
238 | } | |
9a38f99e | 239 | } |
bfc75cca JB |
240 | |
241 | /** | |
242 | * Updates the given measurement statistics. | |
bfc75cca JB |
243 | * @param measurementStatistics - The measurement statistics to update. |
244 | * @param measurementRequirements - The measurement statistics requirements. | |
245 | * @param measurementValue - The measurement value. | |
bfc75cca JB |
246 | * @internal |
247 | */ | |
c329fd41 | 248 | const updateMeasurementStatistics = ( |
bfc75cca | 249 | measurementStatistics: MeasurementStatistics, |
c63a35a0 JB |
250 | measurementRequirements: MeasurementStatisticsRequirements | undefined, |
251 | measurementValue: number | undefined | |
bfc75cca | 252 | ): void => { |
c63a35a0 JB |
253 | if ( |
254 | measurementRequirements != null && | |
255 | measurementValue != null && | |
256 | measurementRequirements.aggregate | |
257 | ) { | |
bfc75cca JB |
258 | measurementStatistics.aggregate = |
259 | (measurementStatistics.aggregate ?? 0) + measurementValue | |
260 | measurementStatistics.minimum = min( | |
261 | measurementValue, | |
80115618 | 262 | measurementStatistics.minimum ?? Number.POSITIVE_INFINITY |
bfc75cca JB |
263 | ) |
264 | measurementStatistics.maximum = max( | |
265 | measurementValue, | |
80115618 | 266 | measurementStatistics.maximum ?? Number.NEGATIVE_INFINITY |
bfc75cca | 267 | ) |
c63a35a0 | 268 | if (measurementRequirements.average || measurementRequirements.median) { |
f12182ad | 269 | measurementStatistics.history.put(measurementValue) |
bfc75cca | 270 | if (measurementRequirements.average) { |
f12182ad JB |
271 | measurementStatistics.average = average( |
272 | measurementStatistics.history.toArray() | |
273 | ) | |
bfc75cca | 274 | } else if (measurementStatistics.average != null) { |
f6392d50 | 275 | measurementStatistics.average = undefined |
bfc75cca JB |
276 | } |
277 | if (measurementRequirements.median) { | |
f12182ad JB |
278 | measurementStatistics.median = median( |
279 | measurementStatistics.history.toArray() | |
280 | ) | |
bfc75cca | 281 | } else if (measurementStatistics.median != null) { |
f6392d50 | 282 | measurementStatistics.median = undefined |
bfc75cca JB |
283 | } |
284 | } | |
285 | } | |
286 | } | |
c329fd41 | 287 | if (env.NODE_ENV === 'test') { |
6e5d7052 | 288 | // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access |
c329fd41 JB |
289 | exports.updateMeasurementStatistics = updateMeasurementStatistics |
290 | } | |
291 | ||
292 | export const updateWaitTimeWorkerUsage = < | |
293 | Worker extends IWorker, | |
294 | Data = unknown, | |
295 | Response = unknown | |
296 | >( | |
5bdd0e9a | 297 | workerChoiceStrategiesContext: |
97231086 JB |
298 | | undefined |
299 | | WorkerChoiceStrategiesContext<Worker, Data, Response>, | |
c329fd41 JB |
300 | workerUsage: WorkerUsage, |
301 | task: Task<Data> | |
302 | ): void => { | |
303 | const timestamp = performance.now() | |
304 | const taskWaitTime = timestamp - (task.timestamp ?? timestamp) | |
305 | updateMeasurementStatistics( | |
306 | workerUsage.waitTime, | |
5bdd0e9a | 307 | workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime, |
c329fd41 JB |
308 | taskWaitTime |
309 | ) | |
310 | } | |
311 | ||
312 | export const updateTaskStatisticsWorkerUsage = <Response = unknown>( | |
313 | workerUsage: WorkerUsage, | |
314 | message: MessageValue<Response> | |
315 | ): void => { | |
316 | const workerTaskStatistics = workerUsage.tasks | |
317 | if ( | |
6e5d7052 | 318 | // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition |
c329fd41 JB |
319 | workerTaskStatistics.executing != null && |
320 | workerTaskStatistics.executing > 0 | |
321 | ) { | |
322 | --workerTaskStatistics.executing | |
323 | } | |
324 | if (message.workerError == null) { | |
325 | ++workerTaskStatistics.executed | |
326 | } else { | |
327 | ++workerTaskStatistics.failed | |
328 | } | |
329 | } | |
330 | ||
331 | export const updateRunTimeWorkerUsage = < | |
332 | Worker extends IWorker, | |
333 | Data = unknown, | |
334 | Response = unknown | |
335 | >( | |
5bdd0e9a | 336 | workerChoiceStrategiesContext: |
97231086 JB |
337 | | undefined |
338 | | WorkerChoiceStrategiesContext<Worker, Data, Response>, | |
c329fd41 JB |
339 | workerUsage: WorkerUsage, |
340 | message: MessageValue<Response> | |
341 | ): void => { | |
342 | if (message.workerError != null) { | |
343 | return | |
344 | } | |
345 | updateMeasurementStatistics( | |
346 | workerUsage.runTime, | |
5bdd0e9a | 347 | workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime, |
c329fd41 JB |
348 | message.taskPerformance?.runTime ?? 0 |
349 | ) | |
350 | } | |
351 | ||
352 | export const updateEluWorkerUsage = < | |
353 | Worker extends IWorker, | |
354 | Data = unknown, | |
355 | Response = unknown | |
356 | >( | |
5bdd0e9a | 357 | workerChoiceStrategiesContext: |
97231086 JB |
358 | | undefined |
359 | | WorkerChoiceStrategiesContext<Worker, Data, Response>, | |
c329fd41 JB |
360 | workerUsage: WorkerUsage, |
361 | message: MessageValue<Response> | |
362 | ): void => { | |
363 | if (message.workerError != null) { | |
364 | return | |
365 | } | |
c63a35a0 | 366 | const eluTaskStatisticsRequirements = |
5bdd0e9a | 367 | workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu |
c329fd41 JB |
368 | updateMeasurementStatistics( |
369 | workerUsage.elu.active, | |
370 | eluTaskStatisticsRequirements, | |
371 | message.taskPerformance?.elu?.active ?? 0 | |
372 | ) | |
373 | updateMeasurementStatistics( | |
374 | workerUsage.elu.idle, | |
375 | eluTaskStatisticsRequirements, | |
376 | message.taskPerformance?.elu?.idle ?? 0 | |
377 | ) | |
c63a35a0 | 378 | if (eluTaskStatisticsRequirements?.aggregate === true) { |
c329fd41 | 379 | if (message.taskPerformance?.elu != null) { |
f6392d50 JB |
380 | workerUsage.elu.count = (workerUsage.elu.count ?? 0) + 1 |
381 | workerUsage.elu.utilization = | |
382 | ((workerUsage.elu.utilization ?? 0) * (workerUsage.elu.count - 1) + | |
383 | message.taskPerformance.elu.utilization) / | |
384 | workerUsage.elu.count | |
c329fd41 JB |
385 | } |
386 | } | |
387 | } | |
c3719753 | 388 | |
4a4ffb16 | 389 | // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-parameters |
c3719753 JB |
390 | export const createWorker = <Worker extends IWorker>( |
391 | type: WorkerType, | |
392 | filePath: string, | |
3a502712 | 393 | opts: { env?: Record<string, unknown>; workerOptions?: WorkerOptions } |
c3719753 JB |
394 | ): Worker => { |
395 | switch (type) { | |
97231086 JB |
396 | case WorkerTypes.cluster: |
397 | return cluster.fork(opts.env) as unknown as Worker | |
c3719753 | 398 | case WorkerTypes.thread: |
e9ed6eee | 399 | return new ThreadWorker(filePath, { |
c3719753 | 400 | env: SHARE_ENV, |
3a502712 | 401 | ...opts.workerOptions, |
c3719753 | 402 | }) as unknown as Worker |
c3719753 | 403 | default: |
6e5d7052 | 404 | // eslint-disable-next-line @typescript-eslint/restrict-template-expressions |
c3719753 JB |
405 | throw new Error(`Unknown worker type '${type}'`) |
406 | } | |
407 | } | |
d41a44de | 408 | |
e9ed6eee JB |
409 | /** |
410 | * Returns the worker type of the given worker. | |
e9ed6eee JB |
411 | * @param worker - The worker to get the type of. |
412 | * @returns The worker type of the given worker. | |
413 | * @internal | |
414 | */ | |
559c196a | 415 | const getWorkerType = (worker: IWorker): undefined | WorkerType => { |
e9ed6eee JB |
416 | if (worker instanceof ThreadWorker) { |
417 | return WorkerTypes.thread | |
963e6682 JB |
418 | } |
419 | if (worker instanceof ClusterWorker) { | |
e9ed6eee JB |
420 | return WorkerTypes.cluster |
421 | } | |
422 | } | |
423 | ||
424 | /** | |
425 | * Returns the worker id of the given worker. | |
e9ed6eee JB |
426 | * @param worker - The worker to get the id of. |
427 | * @returns The worker id of the given worker. | |
428 | * @internal | |
429 | */ | |
559c196a | 430 | const getWorkerId = (worker: IWorker): number | undefined => { |
e9ed6eee JB |
431 | if (worker instanceof ThreadWorker) { |
432 | return worker.threadId | |
963e6682 JB |
433 | } |
434 | if (worker instanceof ClusterWorker) { | |
e9ed6eee JB |
435 | return worker.id |
436 | } | |
437 | } | |
438 | ||
559c196a JB |
439 | export const initWorkerInfo = (worker: IWorker): WorkerInfo => { |
440 | return { | |
441 | backPressure: false, | |
442 | backPressureStealing: false, | |
443 | continuousStealing: false, | |
444 | dynamic: false, | |
445 | id: getWorkerId(worker), | |
f4289ecb | 446 | queuedTaskAbortion: false, |
559c196a JB |
447 | ready: false, |
448 | stealing: false, | |
449 | stolen: false, | |
450 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion | |
451 | type: getWorkerType(worker)!, | |
452 | } | |
453 | } | |
454 | ||
d41a44de JB |
455 | export const waitWorkerNodeEvents = async < |
456 | Worker extends IWorker, | |
457 | Data = unknown | |
458 | >( | |
459 | workerNode: IWorkerNode<Worker, Data>, | |
460 | workerNodeEvent: string, | |
32b141fd JB |
461 | numberOfEventsToWait: number, |
462 | timeout: number | |
d41a44de JB |
463 | ): Promise<number> => { |
464 | return await new Promise<number>(resolve => { | |
465 | let events = 0 | |
466 | if (numberOfEventsToWait === 0) { | |
467 | resolve(events) | |
468 | return | |
469 | } | |
2ef26de4 | 470 | switch (workerNodeEvent) { |
2ef26de4 | 471 | case 'backPressure': |
97231086 | 472 | case 'idle': |
2ef26de4 JB |
473 | case 'taskFinished': |
474 | workerNode.on(workerNodeEvent, () => { | |
475 | ++events | |
476 | if (events === numberOfEventsToWait) { | |
477 | resolve(events) | |
478 | } | |
479 | }) | |
480 | break | |
481 | default: | |
482 | throw new Error('Invalid worker node event') | |
483 | } | |
6f3a391b | 484 | if (timeout >= 0) { |
32b141fd JB |
485 | setTimeout(() => { |
486 | resolve(events) | |
487 | }, timeout) | |
488 | } | |
d41a44de JB |
489 | }) |
490 | } |