2c021534baddd48b06f3c9b0a9aa7a7fbfa2d0cd
[poolifier.git] / src / pools / utils.ts
1 import cluster, { Worker as ClusterWorker } from 'node:cluster'
2 import { randomInt } from 'node:crypto'
3 import { existsSync } from 'node:fs'
4 import { cpus } from 'node:os'
5 import { env } from 'node:process'
6 import {
7 SHARE_ENV,
8 Worker as ThreadWorker,
9 type WorkerOptions
10 } from 'node:worker_threads'
11
12 import type { MessageValue, Task } from '../utility-types.js'
13 import { average, isPlainObject, max, median, min } from '../utils.js'
14 import type { IPool, TasksQueueOptions } from './pool.js'
15 import {
16 type MeasurementStatisticsRequirements,
17 WorkerChoiceStrategies,
18 type WorkerChoiceStrategy,
19 type WorkerChoiceStrategyOptions
20 } from './selection-strategies/selection-strategies-types.js'
21 import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
22 import {
23 type IWorker,
24 type IWorkerNode,
25 type MeasurementStatistics,
26 type WorkerNodeOptions,
27 type WorkerType,
28 WorkerTypes,
29 type WorkerUsage
30 } from './worker.js'
31
/**
 * Default measurement statistics requirements: aggregate, average and median
 * are all disabled.
 */
export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements =
  { aggregate: false, average: false, median: false }
41
/**
 * Builds the default tasks queue options.
 *
 * @param poolMaxSize - The pool maximum size; the default queue size is its square.
 * @returns The default tasks queue options, with every property set.
 */
export const getDefaultTasksQueueOptions = (
  poolMaxSize: number
): Required<TasksQueueOptions> => ({
  size: poolMaxSize ** 2,
  concurrency: 1,
  taskStealing: true,
  tasksStealingOnBackPressure: true,
  tasksFinishedTimeout: 2000
})
53
/**
 * Computes the number of worker choice strategy retries: the pool maximum
 * size plus the number of entries in the weights map.
 *
 * @param pool - The pool whose maximum size is used.
 * @param opts - The worker choice strategy options; default weights are used when none are given.
 * @returns The number of retries.
 */
export const getWorkerChoiceStrategyRetries = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): number => {
  const { maxSize } = pool.info
  const weights = opts?.weights ?? getDefaultWeights(maxSize)
  return maxSize + Object.keys(weights).length
}
67
/**
 * Builds the worker choice strategy options from the given ones, without
 * mutating them.
 *
 * @param pool - The pool whose maximum size sizes the default weights.
 * @param opts - The user supplied worker choice strategy options.
 * @returns The options with defaults for `runTime`, `waitTime`, `elu` and `weights` filled in.
 */
export const buildWorkerChoiceStrategyOptions = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): WorkerChoiceStrategyOptions => {
  // Work on a deep copy so the caller's object is never modified.
  const strategyOpts: WorkerChoiceStrategyOptions = clone(opts ?? {})
  strategyOpts.weights =
    strategyOpts.weights ?? getDefaultWeights(pool.info.maxSize)
  const defaults: WorkerChoiceStrategyOptions = {
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  // Explicit options win over the defaults (shallow merge).
  return { ...defaults, ...strategyOpts }
}
87
/**
 * Deep copies the given value with `structuredClone()`.
 *
 * @param object - The value to copy.
 * @returns The copy.
 * @internal
 */
const clone = <T>(object: T): T => structuredClone<T>(object)
91
/**
 * Builds a weights map giving every worker node key the same weight.
 *
 * @param poolMaxSize - The pool maximum size; keys range from `0` to `poolMaxSize - 1`.
 * @param defaultWorkerWeight - The weight of each worker node; computed from the CPUs when omitted.
 * @returns The weights indexed by worker node key.
 * @internal
 */
const getDefaultWeights = (
  poolMaxSize: number,
  defaultWorkerWeight?: number
): Record<number, number> => {
  const workerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
  const weights: Record<number, number> = {}
  let workerNodeKey = 0
  while (workerNodeKey < poolMaxSize) {
    weights[workerNodeKey] = workerWeight
    workerNodeKey++
  }
  return weights
}
103
/**
 * Computes the default worker weight from the estimated cycle time of the
 * host CPUs.
 *
 * A CPU that reports no usable speed borrows the speed of the first CPU that
 * reports one, or a random speed in [500, 2500) when none does. When no CPU
 * information is available at all, a single CPU running at that fallback
 * speed is assumed, so the result is never `NaN`.
 *
 * @returns The rounded average of the CPUs cycle time weights.
 * @internal
 */
const getDefaultWorkerWeight = (): number => {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const isUsable = (speed: number): boolean => speed != null && speed !== 0
  // Query the CPUs once instead of on every loop iteration.
  const cpusInfo = cpus()
  const fallbackCpuSpeed =
    cpusInfo.find(cpu => isUsable(cpu.speed))?.speed ?? randomInt(500, 2500)
  // os.cpus() may return an empty array: fall back to one estimated CPU.
  const cpuSpeeds =
    cpusInfo.length > 0
      ? cpusInfo.map(cpu => (isUsable(cpu.speed) ? cpu.speed : fallbackCpuSpeed))
      : [fallbackCpuSpeed]
  let cpusCycleTimeWeight = 0
  for (const cpuSpeed of cpuSpeeds) {
    // CPU estimated cycle time
    const numberOfDigits = cpuSpeed.toString().length - 1
    const cpuCycleTime = 1 / (cpuSpeed / Math.pow(10, numberOfDigits))
    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
  }
  return Math.round(cpusCycleTimeWeight / cpuSpeeds.length)
}
122
/**
 * Checks that the worker file path is specified, is a string and points to an
 * existing file system entry.
 *
 * @param filePath - The worker file path to check.
 * @throws TypeError If the file path is missing or is not a string.
 * @throws Error If nothing exists at the given path.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  } else if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  } else if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
134
/**
 * Checks the minimum and maximum sizes given to a dynamic pool.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws TypeError If the maximum size is missing or is not a safe integer.
 * @throws RangeError If the maximum size is zero, or is lower than or equal to the minimum size.
 */
export const checkDynamicPoolSize = (
  min: number,
  max: number | undefined
): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
161
/**
 * Checks that the given worker choice strategy, when specified, is one of the
 * `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error If the worker choice strategy is not a known one.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
  if (workerChoiceStrategy == null) {
    // Nothing to validate.
    return
  }
  const validStrategies = Object.values(WorkerChoiceStrategies)
  if (!validStrategies.includes(workerChoiceStrategy)) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
172
/**
 * Checks the given tasks queue options, when specified.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError If the options are not a plain object, or if `concurrency` or `size` is not a safe integer.
 * @throws RangeError If `concurrency` or `size` is negative or zero.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions | undefined
): void => {
  if (tasksQueueOptions == null) {
    // Nothing to validate.
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, size } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
209
/**
 * Checks the arguments used to construct a worker node.
 *
 * @param type - The worker type; must be one of the `WorkerTypes` values.
 * @param filePath - The worker file path; validated by `checkFilePath()`.
 * @param opts - The worker node options; must be a plain object with a positive integer `tasksQueueBackPressureSize`.
 * @throws TypeError If an argument is missing or has an invalid type.
 * @throws RangeError If `tasksQueueBackPressureSize` is negative or zero.
 */
export const checkWorkerNodeArguments = (
  type: WorkerType | undefined,
  filePath: string | undefined,
  opts: WorkerNodeOptions | undefined
): void => {
  // Worker type
  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  const validWorkerTypes = Object.values(WorkerTypes)
  if (!validWorkerTypes.includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }
  // Worker file path
  checkFilePath(filePath)
  // Worker node options
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid options: must be a plain object'
    )
  }
  const { tasksQueueBackPressureSize } = opts
  if (tasksQueueBackPressureSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue back pressure size option'
    )
  }
  if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
  }
  if (tasksQueueBackPressureSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
  }
}
250
/**
 * Updates the given measurement statistics with a new measurement value.
 *
 * Nothing is updated unless the requirements enable `aggregate` and a value is
 * given. The history, average and median are only maintained when the
 * requirements enable `average` or `median`.
 *
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements | undefined,
  measurementValue: number | undefined
): void => {
  if (
    measurementRequirements == null ||
    measurementValue == null ||
    !measurementRequirements.aggregate
  ) {
    return
  }
  const { average: averageRequired, median: medianRequired } =
    measurementRequirements
  measurementStatistics.aggregate =
    (measurementStatistics.aggregate ?? 0) + measurementValue
  measurementStatistics.minimum = min(
    measurementValue,
    measurementStatistics.minimum ?? Infinity
  )
  measurementStatistics.maximum = max(
    measurementValue,
    measurementStatistics.maximum ?? -Infinity
  )
  if (!averageRequired && !medianRequired) {
    return
  }
  measurementStatistics.history.push(measurementValue)
  // Drop a previously computed statistic once it is no longer required.
  if (averageRequired) {
    measurementStatistics.average = average(measurementStatistics.history)
  } else if (measurementStatistics.average != null) {
    delete measurementStatistics.average
  }
  if (medianRequired) {
    measurementStatistics.median = median(measurementStatistics.history)
  } else if (measurementStatistics.median != null) {
    delete measurementStatistics.median
  }
}
// Expose the internal helper when running with NODE_ENV set to 'test'.
if (env.NODE_ENV === 'test') {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  exports.updateMeasurementStatistics = updateMeasurementStatistics
}
298
/**
 * Updates the wait time statistics of the given worker usage for a task.
 *
 * The wait time is the time elapsed between the task timestamp and now; it is
 * zero when the task has no timestamp.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context providing the wait time requirements.
 * @param workerUsage - The worker usage to update.
 * @param task - The task whose wait time is measured.
 */
export const updateWaitTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
    | WorkerChoiceStrategyContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  task: Task<Data>
): void => {
  const now = performance.now()
  const waitTimeRequirements =
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - (task.timestamp ?? now)
  )
}
318
/**
 * Updates the tasks counters of the given worker usage once a task response
 * message is received.
 *
 * The executing counter is decremented, never below zero, then either the
 * executed or the failed counter is incremented depending on whether the
 * message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const { tasks } = workerUsage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (tasks.executing != null && tasks.executing > 0) {
    tasks.executing--
  }
  if (message.workerError != null) {
    tasks.failed++
  } else {
    tasks.executed++
  }
}
337
/**
 * Updates the run time statistics of the given worker usage from a task
 * response message. Messages carrying a worker error are ignored.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context providing the run time requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateRunTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
    | WorkerChoiceStrategyContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError == null) {
    const runTimeRequirements =
      workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime
    // A missing run time measurement counts as zero.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      taskRunTime
    )
  }
}
358
/**
 * Updates the event loop utilization (ELU) statistics of the given worker
 * usage from a task response message. Messages carrying a worker error are
 * ignored.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context providing the ELU requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateEluWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
    | WorkerChoiceStrategyContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  const eluRequirements =
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing active or idle measurements count as zero.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (eluRequirements?.aggregate === true && taskElu != null) {
    // The first measurement is stored as is; later ones are averaged with the stored value.
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization + taskElu.utilization) / 2
        : taskElu.utilization
  }
}
398
/**
 * Creates a worker of the given type.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path, used for thread workers only.
 * @param opts - The creation options: `workerOptions` for thread workers, `env` for cluster workers.
 * @returns The created worker.
 * @throws Error If the worker type is unknown.
 */
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
): Worker => {
  if (type === WorkerTypes.thread) {
    // User supplied worker options override the shared environment default.
    const threadWorkerOptions = { env: SHARE_ENV, ...opts.workerOptions }
    return new ThreadWorker(filePath, threadWorkerOptions) as unknown as Worker
  }
  if (type === WorkerTypes.cluster) {
    return cluster.fork(opts.env) as unknown as Worker
  }
  // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
  throw new Error(`Unknown worker type '${type}'`)
}
417
/**
 * Returns the worker type of the given worker.
 *
 * @param worker - The worker to get the type of.
 * @returns The worker type of the given worker, or `undefined` when it is neither a thread worker nor a cluster worker.
 * @internal
 */
export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
  if (worker instanceof ThreadWorker) {
    return WorkerTypes.thread
  }
  if (worker instanceof ClusterWorker) {
    return WorkerTypes.cluster
  }
  return undefined
}
432
/**
 * Returns the worker id of the given worker.
 *
 * @param worker - The worker to get the id of.
 * @returns The `threadId` of a thread worker, the `id` of a cluster worker, `undefined` otherwise.
 * @internal
 */
export const getWorkerId = (worker: IWorker): number | undefined => {
  if (worker instanceof ThreadWorker) {
    return worker.threadId
  }
  if (worker instanceof ClusterWorker) {
    return worker.id
  }
  return undefined
}
447
/**
 * Waits for a worker node to emit an event a given number of times.
 *
 * The returned promise resolves as soon as the expected number of events has
 * been received, or when the timeout elapses, whichever comes first. Once
 * settled, the event listener is removed and the pending timer is cleared so
 * that neither leaks nor keeps the event loop alive.
 *
 * @param workerNode - The worker node emitting the events.
 * @param workerNodeEvent - The name of the event to wait for.
 * @param numberOfEventsToWait - The number of events to wait for; `0` resolves immediately.
 * @param timeout - The maximum wait time in milliseconds; a negative value disables the timeout.
 * @returns The number of events received when the promise settled.
 */
export const waitWorkerNodeEvents = async <
  Worker extends IWorker,
  Data = unknown
>(
  workerNode: IWorkerNode<Worker, Data>,
  workerNodeEvent: string,
  numberOfEventsToWait: number,
  timeout: number
): Promise<number> => {
  return await new Promise<number>(resolve => {
    let events = 0
    if (numberOfEventsToWait === 0) {
      resolve(events)
      return
    }
    let timer: ReturnType<typeof setTimeout> | undefined
    const onWorkerNodeEvent = (): void => {
      ++events
      if (events === numberOfEventsToWait) {
        // Done before the timeout: release the timer and the listener.
        clearTimeout(timer)
        workerNode.off(workerNodeEvent, onWorkerNodeEvent)
        resolve(events)
      }
    }
    workerNode.on(workerNodeEvent, onWorkerNodeEvent)
    if (timeout >= 0) {
      timer = setTimeout(() => {
        // Timed out: stop counting and report what was received so far.
        workerNode.off(workerNodeEvent, onWorkerNodeEvent)
        resolve(events)
      }, timeout)
    }
  })
}