fix: use estimated cpu speed instead of random one
[poolifier.git] / src / pools / utils.ts
CommitLineData
e9ed6eee 1import cluster, { Worker as ClusterWorker } from 'node:cluster'
ded253e2
JB
2import { existsSync } from 'node:fs'
3import { cpus } from 'node:os'
4import { env } from 'node:process'
e9ed6eee
JB
5import {
6 SHARE_ENV,
7 Worker as ThreadWorker,
8 type WorkerOptions
9} from 'node:worker_threads'
ded253e2 10
d35e5717 11import type { MessageValue, Task } from '../utility-types.js'
ded253e2
JB
12import { average, isPlainObject, max, median, min } from '../utils.js'
13import type { IPool, TasksQueueOptions } from './pool.js'
bde6b5d7 14import {
bfc75cca 15 type MeasurementStatisticsRequirements,
bde6b5d7 16 WorkerChoiceStrategies,
e9ed6eee
JB
17 type WorkerChoiceStrategy,
18 type WorkerChoiceStrategyOptions
d35e5717 19} from './selection-strategies/selection-strategies-types.js'
ded253e2 20import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
c3719753
JB
21import {
22 type IWorker,
d41a44de 23 type IWorkerNode,
c3719753
JB
24 type MeasurementStatistics,
25 type WorkerNodeOptions,
26 type WorkerType,
c329fd41
JB
27 WorkerTypes,
28 type WorkerUsage
d35e5717 29} from './worker.js'
bde6b5d7 30
e9ed6eee
JB
/**
 * Baseline measurement statistics requirements: the aggregate, average and
 * median statistics are all switched off.
 */
export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements =
  { aggregate: false, average: false, median: false }
40
32b141fd
JB
/**
 * Builds the default tasks queue options for a pool.
 *
 * @param poolMaxSize - The pool maximum size; the default queue size is its square.
 * @returns The fully populated default tasks queue options.
 */
export const getDefaultTasksQueueOptions = (
  poolMaxSize: number
): Required<TasksQueueOptions> => {
  const defaultQueueSize = poolMaxSize ** 2
  return {
    size: defaultQueueSize,
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  }
}
52
e9ed6eee
JB
/**
 * Computes the number of retries allowed to a worker choice strategy: the
 * pool maximum size plus the number of configured worker weights.
 *
 * When no weights are given, the default weights hold exactly one entry per
 * worker node key in `[0, maxSize)`, so their count is the pool maximum size.
 * That count is used directly instead of building the default weights (and
 * probing the CPUs) only to count them. This assumes the pool maximum size is
 * a non-negative integer.
 *
 * @param pool - The pool whose `info.maxSize` is used.
 * @param opts - The worker choice strategy options, possibly carrying `weights`.
 * @returns The number of retries.
 */
export const getWorkerChoiceStrategyRetries = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): number => {
  const { maxSize } = pool.info
  const weightsCount =
    opts?.weights != null ? Object.keys(opts.weights).length : maxSize
  return maxSize + weightsCount
}
66
/**
 * Builds the effective worker choice strategy options.
 *
 * The given options are deep-copied, missing weights are filled with the
 * default weights for the pool maximum size, and the result is laid over
 * defaults that disable the median for run time, wait time and ELU.
 *
 * @param pool - The pool whose `info.maxSize` sizes the default weights.
 * @param opts - The user supplied worker choice strategy options.
 * @returns The worker choice strategy options with defaults applied.
 */
export const buildWorkerChoiceStrategyOptions = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): WorkerChoiceStrategyOptions => {
  const givenOpts = clone(opts ?? {})
  if (givenOpts.weights == null) {
    givenOpts.weights = getDefaultWeights(pool.info.maxSize)
  }
  const medianDisabledDefaults: WorkerChoiceStrategyOptions = {
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  // Shallow merge: a top-level key present in the given options wins.
  return { ...medianDisabledDefaults, ...givenOpts }
}
86
/**
 * Deep-copies a value using the structured clone algorithm.
 *
 * @param object - The value to copy.
 * @returns The copy.
 */
const clone = <T>(object: T): T => structuredClone<T>(object)
90
/**
 * Builds the default weights: one entry per worker node key from 0 up to
 * (but excluding) the pool maximum size, all sharing the same weight.
 *
 * @param poolMaxSize - The pool maximum size.
 * @param defaultWorkerWeight - The weight given to every worker node key; computed when omitted.
 * @returns The weights keyed by worker node key.
 */
const getDefaultWeights = (
  poolMaxSize: number,
  defaultWorkerWeight?: number
): Record<number, number> => {
  const workerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
  const weights: Record<number, number> = {}
  let workerNodeKey = 0
  while (workerNodeKey < poolMaxSize) {
    weights[workerNodeKey] = workerWeight
    workerNodeKey += 1
  }
  return weights
}
102
10c74f8a
JB
/**
 * Roughly estimates the CPU speed by timing a fixed number of iterations of
 * an empty loop.
 *
 * @returns The estimated CPU speed in MHz, truncated to an integer.
 */
const estimatedCpuSpeed = (): number => {
  const runs = 150000000
  const startedAt = performance.now()
  // eslint-disable-next-line no-empty
  for (let i = runs; i > 0; i--) {}
  const elapsed = performance.now() - startedAt
  return Math.trunc(runs / elapsed / 1000) // in MHz
}

// Measured once, when the module is loaded, and reused afterwards.
const estCpuSpeed = estimatedCpuSpeed()
114
/**
 * Computes the default worker weight from the CPU speeds.
 *
 * For each CPU the weight is `10^(2d) / speed`, where `d` is the number of
 * digits of the speed minus one; the result is the rounded mean over all CPUs.
 * A CPU with a missing or zero speed borrows the speed of the first CPU that
 * reports one, or the estimated CPU speed when none does.
 *
 * When the CPU list is empty the weight is derived from the estimated CPU
 * speed alone instead of dividing by zero.
 *
 * @param estimatedCpuSpeed - The fallback CPU speed used when no CPU reports a speed.
 * @param cpusInfo - The CPU descriptions to weigh; defaults to `os.cpus()`.
 * @returns The default worker weight.
 */
const getDefaultWorkerWeight = (
  estimatedCpuSpeed = estCpuSpeed,
  cpusInfo: ReadonlyArray<{ speed: number }> = cpus()
): number => {
  // Resolved once instead of re-querying the CPUs for every zero-speed entry.
  const fallbackSpeed =
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    cpusInfo.find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ??
    estimatedCpuSpeed
  // CPU estimated cycle time, scaled by the magnitude of the speed
  const cycleTimeWeight = (speed: number): number => {
    const numberOfDigits = speed.toString().length - 1
    const cpuCycleTime = 1 / (speed / Math.pow(10, numberOfDigits))
    return cpuCycleTime * Math.pow(10, numberOfDigits)
  }
  if (cpusInfo.length === 0) {
    return Math.round(cycleTimeWeight(fallbackSpeed))
  }
  let cpusCycleTimeWeight = 0
  for (const cpu of cpusInfo) {
    const speed =
      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
      cpu.speed == null || cpu.speed === 0 ? fallbackSpeed : cpu.speed
    cpusCycleTimeWeight += cycleTimeWeight(speed)
  }
  return Math.round(cpusCycleTimeWeight / cpusInfo.length)
}
132
/**
 * Validates the worker file path.
 *
 * @param filePath - The worker file path to check.
 * @throws TypeError if the path is missing or is not a string.
 * @throws Error if no file exists at the path.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (typeof filePath === 'string') {
    if (!existsSync(filePath)) {
      throw new Error(`Cannot find the worker file '${filePath}'`)
    }
    return
  }
  throw new TypeError(
    filePath == null
      ? 'The worker file path must be specified'
      : 'The worker file path must be a string'
  )
}
144
c63a35a0
JB
/**
 * Validates the size bounds of a dynamic pool.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws TypeError if the maximum is missing or is not a safe integer.
 * @throws RangeError if the maximum is below the minimum, is zero, or equals the minimum.
 */
export const checkDynamicPoolSize = (
  min: number,
  max: number | undefined
): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
171
/**
 * Validates a worker choice strategy, when one is given.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the strategy is not one of the `WorkerChoiceStrategies` values.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
182
/**
 * Validates tasks queue options, when some are given.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are not a plain object, or if `concurrency` or `size` is not a safe integer.
 * @throws RangeError if `concurrency` or `size` is negative or zero.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions | undefined
): void => {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, size } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
bfc75cca 219
/**
 * Validates the arguments used to construct a worker node.
 *
 * The checks run in order: worker type, worker file path, then worker node
 * options and their tasks queue back pressure size.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path.
 * @param opts - The worker node options.
 * @throws TypeError if the type is missing or unknown, if the options are missing or not a plain object, or if the back pressure size is missing or not a safe integer.
 * @throws RangeError if the back pressure size is not strictly positive.
 */
export const checkWorkerNodeArguments = (
  type: WorkerType | undefined,
  filePath: string | undefined,
  opts: WorkerNodeOptions | undefined
): void => {
  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  const knownWorkerTypes = Object.values(WorkerTypes)
  if (!knownWorkerTypes.includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }
  checkFilePath(filePath)
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid options: must be a plain object'
    )
  }
  const { tasksQueueBackPressureSize } = opts
  if (tasksQueueBackPressureSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue back pressure size option'
    )
  }
  if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
  }
  if (tasksQueueBackPressureSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
  }
}
bfc75cca
JB
260
/**
 * Folds a new measurement value into the given measurement statistics.
 *
 * Nothing is updated unless requirements and a value are given and the
 * requirements ask for the aggregate. The aggregate, minimum and maximum are
 * then refreshed. When the average or the median is required, the value is
 * also appended to the history and each of the two statistics is either
 * recomputed from the history or removed, depending on whether it is required.
 *
 * @param measurementStatistics - The measurement statistics to update in place.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements | undefined,
  measurementValue: number | undefined
): void => {
  if (
    measurementRequirements == null ||
    measurementValue == null ||
    !measurementRequirements.aggregate
  ) {
    return
  }
  const { average: averageRequired, median: medianRequired } =
    measurementRequirements
  measurementStatistics.aggregate =
    (measurementStatistics.aggregate ?? 0) + measurementValue
  measurementStatistics.minimum = min(
    measurementValue,
    measurementStatistics.minimum ?? Infinity
  )
  measurementStatistics.maximum = max(
    measurementValue,
    measurementStatistics.maximum ?? -Infinity
  )
  if (!averageRequired && !medianRequired) {
    return
  }
  measurementStatistics.history.push(measurementValue)
  if (averageRequired) {
    measurementStatistics.average = average(measurementStatistics.history)
  } else if (measurementStatistics.average != null) {
    delete measurementStatistics.average
  }
  if (medianRequired) {
    measurementStatistics.median = median(measurementStatistics.history)
  } else if (measurementStatistics.median != null) {
    delete measurementStatistics.median
  }
}
c329fd41
JB
// Expose the otherwise private helper on the module exports, but only when
// NODE_ENV is 'test'.
const isTestEnvironment = env.NODE_ENV === 'test'
if (isTestEnvironment) {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  exports.updateMeasurementStatistics = updateMeasurementStatistics
}
308
/**
 * Updates the wait time statistics of a worker usage for a task.
 *
 * The wait time is the time elapsed between the task timestamp and now; it is
 * zero when the task carries no timestamp.
 *
 * @param workerChoiceStrategyContext - The context providing the wait time statistics requirements.
 * @param workerUsage - The worker usage to update in place.
 * @param task - The task whose wait time is measured.
 */
export const updateWaitTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
    | WorkerChoiceStrategyContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  task: Task<Data>
): void => {
  const now = performance.now()
  const waitTimeRequirements =
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - (task.timestamp ?? now)
  )
}
328
/**
 * Updates the task counters of a worker usage from a task response message.
 *
 * The executing counter is decremented when it is positive; then the failed
 * counter is incremented if the message carries a worker error, otherwise the
 * executed counter is.
 *
 * @param workerUsage - The worker usage to update in place.
 * @param message - The message received from the worker.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const { tasks } = workerUsage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (tasks.executing != null && tasks.executing > 0) {
    tasks.executing -= 1
  }
  if (message.workerError != null) {
    tasks.failed += 1
  } else {
    tasks.executed += 1
  }
}
347
/**
 * Updates the run time statistics of a worker usage from a task response
 * message. Messages carrying a worker error are ignored.
 *
 * @param workerChoiceStrategyContext - The context providing the run time statistics requirements.
 * @param workerUsage - The worker usage to update in place.
 * @param message - The message received from the worker; a missing run time counts as 0.
 */
export const updateRunTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
    | WorkerChoiceStrategyContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError == null) {
    updateMeasurementStatistics(
      workerUsage.runTime,
      workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime,
      message.taskPerformance?.runTime ?? 0
    )
  }
}
368
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from
 * a task response message. Messages carrying a worker error are ignored.
 *
 * The active and idle statistics are updated with the message values (0 when
 * missing). When the ELU aggregate is required and the message carries ELU
 * data, the utilization becomes the mean of the previous utilization and the
 * message one, or the message one when there was no previous value.
 *
 * @param workerChoiceStrategyContext - The context providing the ELU statistics requirements.
 * @param workerUsage - The worker usage to update in place.
 * @param message - The message received from the worker.
 */
export const updateEluWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
    | WorkerChoiceStrategyContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  const eluRequirements =
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (eluRequirements?.aggregate !== true || taskElu == null) {
    return
  }
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
c3719753
JB
408
/**
 * Creates a worker of the given type.
 *
 * A thread worker is built on the worker file with `SHARE_ENV` as its
 * environment unless the given worker options override it. A cluster worker
 * is forked with the given environment.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path, used for thread workers.
 * @param opts - The environment for cluster workers and the worker options for thread workers.
 * @returns The created worker.
 * @throws Error if the worker type is unknown.
 */
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
): Worker => {
  if (type === WorkerTypes.thread) {
    const threadWorkerOptions: WorkerOptions = {
      env: SHARE_ENV,
      ...opts.workerOptions
    }
    return new ThreadWorker(filePath, threadWorkerOptions) as unknown as Worker
  }
  if (type === WorkerTypes.cluster) {
    return cluster.fork(opts.env) as unknown as Worker
  }
  // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
  throw new Error(`Unknown worker type '${type}'`)
}
d41a44de 427
e9ed6eee
JB
/**
 * Determines the worker type of the given worker from its class.
 *
 * @param worker - The worker to inspect.
 * @returns The thread or cluster worker type, or `undefined` if the worker is neither.
 * @internal
 */
export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
  if (worker instanceof ThreadWorker) {
    return WorkerTypes.thread
  }
  if (worker instanceof ClusterWorker) {
    return WorkerTypes.cluster
  }
  return undefined
}
442
/**
 * Reads the identifier of the given worker.
 *
 * @param worker - The worker to inspect.
 * @returns The `threadId` of a thread worker, the `id` of a cluster worker, or `undefined` if the worker is neither.
 * @internal
 */
export const getWorkerId = (worker: IWorker): number | undefined => {
  if (worker instanceof ThreadWorker) {
    return worker.threadId
  }
  if (worker instanceof ClusterWorker) {
    return worker.id
  }
  return undefined
}
457
d41a44de
JB
/**
 * Waits for a number of occurrences of a worker node event, or for a timeout.
 *
 * The promise resolves with the number of events seen so far as soon as the
 * expected count is reached or the timeout elapses, whichever comes first.
 * It resolves immediately with 0 when no event is expected. A negative
 * timeout disables the timer.
 *
 * Once the promise settles, the event listener is removed and the pending
 * timer is cleared, so nothing stays attached to the worker node and the
 * timer no longer keeps the event loop alive.
 *
 * @param workerNode - The worker node emitting the events.
 * @param workerNodeEvent - The name of the event to count.
 * @param numberOfEventsToWait - The number of events to wait for.
 * @param timeout - The maximum time to wait in milliseconds; negative to wait without limit.
 * @returns The number of events received when the wait ended.
 */
export const waitWorkerNodeEvents = async <
  Worker extends IWorker,
  Data = unknown
>(
  workerNode: IWorkerNode<Worker, Data>,
  workerNodeEvent: string,
  numberOfEventsToWait: number,
  timeout: number
): Promise<number> => {
  return await new Promise<number>(resolve => {
    let events = 0
    if (numberOfEventsToWait === 0) {
      resolve(events)
      return
    }
    let timer: ReturnType<typeof setTimeout> | undefined
    const onEvent = (): void => {
      ++events
      if (events === numberOfEventsToWait) {
        if (timer != null) {
          clearTimeout(timer)
        }
        workerNode.off(workerNodeEvent, onEvent)
        resolve(events)
      }
    }
    workerNode.on(workerNodeEvent, onEvent)
    if (timeout >= 0) {
      timer = setTimeout(() => {
        workerNode.off(workerNodeEvent, onEvent)
        resolve(events)
      }, timeout)
    }
  })
}