fix: readd cpu speed computation but only if necessary
[poolifier.git] / src / pools / utils.ts
1 import cluster, { Worker as ClusterWorker } from 'node:cluster'
2 import { existsSync } from 'node:fs'
3 import { cpus } from 'node:os'
4 import { env } from 'node:process'
5 import {
6 SHARE_ENV,
7 Worker as ThreadWorker,
8 type WorkerOptions
9 } from 'node:worker_threads'
10
11 import type { MessageValue, Task } from '../utility-types.js'
12 import { average, isPlainObject, max, median, min } from '../utils.js'
13 import type { IPool, TasksQueueOptions } from './pool.js'
14 import {
15 type MeasurementStatisticsRequirements,
16 WorkerChoiceStrategies,
17 type WorkerChoiceStrategy,
18 type WorkerChoiceStrategyOptions
19 } from './selection-strategies/selection-strategies-types.js'
20 import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
21 import {
22 type IWorker,
23 type IWorkerNode,
24 type MeasurementStatistics,
25 type WorkerNodeOptions,
26 type WorkerType,
27 WorkerTypes,
28 type WorkerUsage
29 } from './worker.js'
30
/**
 * Default measurement statistics requirements.
 *
 * All three statistics flags (`aggregate`, `average` and `median`) are
 * disabled in this default.
 */
export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements =
  {
    aggregate: false,
    average: false,
    median: false
  }
40
/**
 * Builds the default tasks queue options for a pool.
 *
 * @param poolMaxSize - The pool maximum size.
 * @returns The default tasks queue options. The queue size is the square of the pool maximum size.
 */
export const getDefaultTasksQueueOptions = (
  poolMaxSize: number
): Required<TasksQueueOptions> => ({
  size: poolMaxSize ** 2,
  concurrency: 1,
  taskStealing: true,
  tasksStealingOnBackPressure: true,
  tasksFinishedTimeout: 2000
})
52
/**
 * Computes the number of worker choice strategy retries: the pool maximum
 * size plus the number of configured worker weights.
 *
 * When no weights are given, the default weights map holds one entry per
 * worker node key in `[0, maxSize)`, so its key count is derived directly from
 * the pool maximum size. This avoids building the default weights map (which
 * needs the CPU information and possibly a CPU speed estimation) only to count
 * its keys.
 *
 * @param pool - The pool whose `info.maxSize` is used.
 * @param opts - The worker choice strategy options, optionally holding the weights.
 * @returns The number of retries.
 */
export const getWorkerChoiceStrategyRetries = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): number => {
  const poolMaxSize = pool.info.maxSize
  const weightsCount =
    opts?.weights != null
      ? Object.keys(opts.weights).length
      : // Number of integer keys in [0, poolMaxSize), i.e. the size of the default weights map
      Math.max(Math.ceil(poolMaxSize), 0)
  return poolMaxSize + weightsCount
}
66
/**
 * Builds the effective worker choice strategy options.
 *
 * The given options are deep cloned, missing weights are filled with the
 * default weights computed from the pool maximum size, and the result is
 * layered on top of the default `runTime`, `waitTime` and `elu` settings.
 *
 * @param pool - The pool whose `info.maxSize` sizes the default weights.
 * @param opts - The user provided worker choice strategy options.
 * @returns The worker choice strategy options with defaults applied.
 */
export const buildWorkerChoiceStrategyOptions = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): WorkerChoiceStrategyOptions => {
  const providedOpts = clone(opts ?? {})
  if (providedOpts.weights == null) {
    providedOpts.weights = getDefaultWeights(pool.info.maxSize)
  }
  const defaultOpts: WorkerChoiceStrategyOptions = {
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  // Provided options take precedence over the defaults (shallow merge)
  return { ...defaultOpts, ...providedOpts }
}
86
/**
 * Deep clones the given value with `structuredClone()`.
 *
 * @param object - The value to clone.
 * @returns The cloned value.
 */
const clone = <T>(object: T): T => structuredClone<T>(object)
90
/**
 * Builds the default worker weights map: one entry per worker node key in
 * `[0, poolMaxSize)`, all sharing the same weight.
 *
 * @param poolMaxSize - The pool maximum size.
 * @param defaultWorkerWeight - The weight of every worker. Computed with `getDefaultWorkerWeight()` when omitted.
 * @returns The worker weights keyed by worker node key.
 */
const getDefaultWeights = (
  poolMaxSize: number,
  defaultWorkerWeight?: number
): Record<number, number> => {
  const workerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
  const defaultWeights: Record<number, number> = {}
  let workerNodeKey = 0
  while (workerNodeKey < poolMaxSize) {
    defaultWeights[workerNodeKey] = workerWeight
    workerNodeKey += 1
  }
  return defaultWeights
}
102
/**
 * Estimates the CPU speed by timing a fixed number of empty loop iterations.
 *
 * NOTE(review): the conversion presumably assumes about one loop iteration per
 * CPU cycle, so the value is only a rough estimate.
 *
 * @returns The estimated CPU speed in MHz.
 */
const estimatedCpuSpeed = (): number => {
  const runs = 150000000
  const begin = performance.now()
  // eslint-disable-next-line no-empty
  for (let i = runs; i > 0; i--) {}
  const end = performance.now()
  // Elapsed time in milliseconds
  const duration = end - begin
  return Math.trunc(runs / duration / 1000) // in MHz
}

/**
 * Computes the default worker weight from the CPU speeds reported by
 * `os.cpus()`: the rounded average, over all CPUs, of a value derived from the
 * estimated CPU cycle time.
 *
 * A CPU whose speed is missing or zero uses the first usable reported speed.
 * When no CPU reports a usable speed, the speed is estimated once with
 * `estimatedCpuSpeed()`, and 2000 MHz is used if that estimate is not a
 * positive finite number. When `os.cpus()` returns no CPU at all, the weight
 * is computed for a single CPU running at the fallback speed instead of
 * yielding `NaN`.
 *
 * @returns The default worker weight.
 */
const getDefaultWorkerWeight = (): number => {
  const isUsableSpeed = (speed: number | null | undefined): speed is number =>
    speed != null && speed !== 0
  // Work on a copy of the speeds so the objects returned by cpus() are not mutated
  const reportedSpeeds: number[] = cpus().map(cpu => cpu.speed)
  // Resolve the fallback speed once instead of once per CPU lacking a speed
  const firstUsableSpeed = reportedSpeeds.find(speed => isUsableSpeed(speed))
  let fallbackSpeed = 2000
  if (firstUsableSpeed != null) {
    fallbackSpeed = firstUsableSpeed
  } else {
    // Only run the costly estimation when no usable speed is reported
    const estCpuSpeed = estimatedCpuSpeed()
    if (Number.isFinite(estCpuSpeed) && estCpuSpeed > 0) {
      fallbackSpeed = estCpuSpeed
    }
  }
  // An empty CPU list would otherwise lead to a division by zero
  const cpuSpeeds =
    reportedSpeeds.length > 0 ? reportedSpeeds : [fallbackSpeed]
  let cpusCycleTimeWeight = 0
  for (const reportedSpeed of cpuSpeeds) {
    const cpuSpeed = isUsableSpeed(reportedSpeed) ? reportedSpeed : fallbackSpeed
    // CPU estimated cycle time
    const numberOfDigits = cpuSpeed.toString().length - 1
    const cpuCycleTime = 1 / (cpuSpeed / Math.pow(10, numberOfDigits))
    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
  }
  return Math.round(cpusCycleTimeWeight / cpuSpeeds.length)
}
137
/**
 * Validates the worker file path.
 *
 * @param filePath - The worker file path to check.
 * @throws {TypeError} If the file path is not specified or is not a string.
 * @throws {Error} If no file exists at the given path.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (typeof filePath !== 'string') {
    // A nullish path is reported as missing, any other non-string as mistyped
    throw new TypeError(
      filePath == null
        ? 'The worker file path must be specified'
        : 'The worker file path must be a string'
    )
  }
  const fileFound = existsSync(filePath)
  if (!fileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
149
/**
 * Validates the minimum and maximum sizes of a dynamic pool.
 *
 * The checks run in order and the first failing one throws.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {TypeError} If the maximum size is missing or is not a safe integer.
 * @throws {RangeError} If the maximum size is lower than the minimum size, equal to zero, or equal to the minimum size.
 */
export const checkDynamicPoolSize = (
  min: number,
  max: number | undefined
): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
176
/**
 * Validates a worker choice strategy. A nullish strategy is accepted.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {Error} If the strategy is not one of the `WorkerChoiceStrategies` values.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
187
/**
 * Validates the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {TypeError} If the options are not a plain object, or if `concurrency` or `size` is not a safe integer.
 * @throws {RangeError} If `concurrency` or `size` is a negative integer or zero.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions | undefined
): void => {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, size } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
224
/**
 * Validates the arguments used to construct a worker node.
 *
 * The worker type is checked first, then the file path (with
 * `checkFilePath()`), then the options and their
 * `tasksQueueBackPressureSize` value.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path.
 * @param opts - The worker node options.
 * @throws {TypeError} If the type is missing or invalid, the options are missing or not a plain object, or the back pressure size is missing or not a safe integer.
 * @throws {RangeError} If the back pressure size is not a positive integer.
 */
export const checkWorkerNodeArguments = (
  type: WorkerType | undefined,
  filePath: string | undefined,
  opts: WorkerNodeOptions | undefined
): void => {
  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  const knownWorkerTypes = Object.values(WorkerTypes)
  if (!knownWorkerTypes.includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }
  checkFilePath(filePath)
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid options: must be a plain object'
    )
  }
  const backPressureSize = opts.tasksQueueBackPressureSize
  if (backPressureSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue back pressure size option'
    )
  }
  if (!Number.isSafeInteger(backPressureSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
  }
  if (backPressureSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
  }
}
265
/**
 * Updates the given measurement statistics with a new measurement value.
 *
 * Nothing is updated unless the requirements are defined, the value is
 * defined and the `aggregate` requirement is enabled. Otherwise the aggregate,
 * minimum and maximum are refreshed. When the average or the median is
 * required, the value is appended to the history and the required statistic is
 * recomputed from it, while a statistic that is not required is removed.
 *
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements | undefined,
  measurementValue: number | undefined
): void => {
  if (
    measurementRequirements == null ||
    measurementValue == null ||
    !measurementRequirements.aggregate
  ) {
    return
  }
  const previousAggregate = measurementStatistics.aggregate ?? 0
  measurementStatistics.aggregate = previousAggregate + measurementValue
  measurementStatistics.minimum = min(
    measurementValue,
    measurementStatistics.minimum ?? Infinity
  )
  measurementStatistics.maximum = max(
    measurementValue,
    measurementStatistics.maximum ?? -Infinity
  )
  const { average: averageRequired, median: medianRequired } =
    measurementRequirements
  if (!averageRequired && !medianRequired) {
    return
  }
  measurementStatistics.history.push(measurementValue)
  if (averageRequired) {
    measurementStatistics.average = average(measurementStatistics.history)
  } else if (measurementStatistics.average != null) {
    delete measurementStatistics.average
  }
  if (medianRequired) {
    measurementStatistics.median = median(measurementStatistics.history)
  } else if (measurementStatistics.median != null) {
    delete measurementStatistics.median
  }
}
// Test-only hook: when NODE_ENV is 'test', the module-private
// updateMeasurementStatistics function is attached to `exports`.
// NOTE(review): `exports` is a CommonJS binding and is presumably undefined
// when this module is loaded as native ESM - confirm against the build output.
if (env.NODE_ENV === 'test') {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  exports.updateMeasurementStatistics = updateMeasurementStatistics
}
313
/**
 * Updates the wait time statistics of a worker usage for the given task.
 *
 * The wait time is the time elapsed between the task timestamp and now. It is
 * zero when the task has no timestamp.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context providing the wait time statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param task - The task whose timestamp is read.
 */
export const updateWaitTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
    | WorkerChoiceStrategyContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  task: Task<Data>
): void => {
  const now = performance.now()
  // Without a task timestamp the wait time is zero
  const taskQueuedAt = task.timestamp ?? now
  updateMeasurementStatistics(
    workerUsage.waitTime,
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime,
    now - taskQueuedAt
  )
}
333
/**
 * Updates the tasks counters of a worker usage from a worker message.
 *
 * The executing counter is decremented when it is positive. Then the failed
 * counter is incremented when the message carries a worker error, otherwise
 * the executed counter is incremented.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const { tasks } = workerUsage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (tasks.executing != null && tasks.executing > 0) {
    tasks.executing--
  }
  const taskFailed = message.workerError != null
  if (taskFailed) {
    tasks.failed++
  } else {
    tasks.executed++
  }
}
352
/**
 * Updates the run time statistics of a worker usage from a worker message.
 *
 * Nothing is updated when the message carries a worker error. A missing run
 * time is counted as zero.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context providing the run time statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateRunTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
    | WorkerChoiceStrategyContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const taskSucceeded = message.workerError == null
  if (taskSucceeded) {
    updateMeasurementStatistics(
      workerUsage.runTime,
      workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime,
      message.taskPerformance?.runTime ?? 0
    )
  }
}
373
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from
 * a worker message.
 *
 * Nothing is updated when the message carries a worker error. The active and
 * idle statistics are updated with the message values, a missing value being
 * counted as zero. When the ELU `aggregate` requirement is enabled and the
 * message holds ELU data, the utilization becomes the mean of its previous
 * value and the message value, or the message value when there is no previous
 * one.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context providing the ELU statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateEluWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
    | WorkerChoiceStrategyContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  const eluRequirements =
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (eluRequirements?.aggregate !== true || taskElu == null) {
    return
  }
  const previousUtilization = workerUsage.elu.utilization
  workerUsage.elu.utilization =
    previousUtilization != null
      ? (previousUtilization + taskElu.utilization) / 2
      : taskElu.utilization
}
413
/**
 * Creates a worker of the given type.
 *
 * A thread worker is created from the file path with `SHARE_ENV` as default
 * `env`, which the given worker options may override. A cluster worker is
 * forked with the given environment.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path, used for thread workers.
 * @param opts - The environment passed to `cluster.fork()` and the thread worker options.
 * @returns The created worker.
 * @throws {Error} If the worker type is unknown.
 */
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
): Worker => {
  if (type === WorkerTypes.thread) {
    const threadWorkerOptions: WorkerOptions = {
      env: SHARE_ENV,
      ...opts.workerOptions
    }
    return new ThreadWorker(filePath, threadWorkerOptions) as unknown as Worker
  }
  if (type === WorkerTypes.cluster) {
    return cluster.fork(opts.env) as unknown as Worker
  }
  // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
  throw new Error(`Unknown worker type '${type}'`)
}
432
/**
 * Returns the worker type of the given worker.
 *
 * @param worker - The worker to get the type of.
 * @returns The worker type of the given worker, or `undefined` if it is neither a thread worker nor a cluster worker.
 * @internal
 */
export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
  if (worker instanceof ThreadWorker) {
    return WorkerTypes.thread
  }
  if (worker instanceof ClusterWorker) {
    return WorkerTypes.cluster
  }
  return undefined
}
447
/**
 * Returns the worker id of the given worker.
 *
 * @param worker - The worker to get the id of.
 * @returns The `threadId` of a thread worker, the `id` of a cluster worker, or `undefined` for any other worker.
 * @internal
 */
export const getWorkerId = (worker: IWorker): number | undefined => {
  if (worker instanceof ThreadWorker) {
    return worker.threadId
  }
  if (worker instanceof ClusterWorker) {
    return worker.id
  }
  return undefined
}
462
/**
 * Waits for a worker node to emit a given event a given number of times, or
 * for a timeout to expire, whichever comes first.
 *
 * @param workerNode - The worker node emitting the events.
 * @param workerNodeEvent - The name of the event to wait for.
 * @param numberOfEventsToWait - The number of events to wait for. Zero resolves immediately.
 * @param timeout - The maximum time to wait in milliseconds. A negative value disables the timeout.
 * @returns A promise resolving to the number of events received when it settled.
 */
export const waitWorkerNodeEvents = async <
  Worker extends IWorker,
  Data = unknown
>(
  workerNode: IWorkerNode<Worker, Data>,
  workerNodeEvent: string,
  numberOfEventsToWait: number,
  timeout: number
): Promise<number> => {
  return await new Promise<number>(resolve => {
    let events = 0
    if (numberOfEventsToWait === 0) {
      resolve(events)
      return
    }
    let timer: ReturnType<typeof setTimeout> | undefined
    workerNode.on(workerNodeEvent, () => {
      ++events
      if (events === numberOfEventsToWait) {
        // Cancel the pending timeout so it does not keep the event loop
        // alive once the awaited events have all been received
        if (timer != null) {
          clearTimeout(timer)
          timer = undefined
        }
        resolve(events)
      }
    })
    if (timeout >= 0) {
      timer = setTimeout(() => {
        timer = undefined
        resolve(events)
      }, timeout)
    }
  })
}