// Blame view of src/pools/utils.ts from poolifier.git, at the commit
// "perf: remove unneeded estimated cpu speed computation".
// Each source line was listed with its commit, line number and data.
e9ed6eee 1import cluster, { Worker as ClusterWorker } from 'node:cluster'
ded253e2
JB
2import { existsSync } from 'node:fs'
3import { cpus } from 'node:os'
4import { env } from 'node:process'
e9ed6eee
JB
5import {
6 SHARE_ENV,
7 Worker as ThreadWorker,
8 type WorkerOptions
9} from 'node:worker_threads'
ded253e2 10
d35e5717 11import type { MessageValue, Task } from '../utility-types.js'
ded253e2
JB
12import { average, isPlainObject, max, median, min } from '../utils.js'
13import type { IPool, TasksQueueOptions } from './pool.js'
bde6b5d7 14import {
bfc75cca 15 type MeasurementStatisticsRequirements,
bde6b5d7 16 WorkerChoiceStrategies,
e9ed6eee
JB
17 type WorkerChoiceStrategy,
18 type WorkerChoiceStrategyOptions
d35e5717 19} from './selection-strategies/selection-strategies-types.js'
ded253e2 20import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
c3719753
JB
21import {
22 type IWorker,
d41a44de 23 type IWorkerNode,
c3719753
JB
24 type MeasurementStatistics,
25 type WorkerNodeOptions,
26 type WorkerType,
c329fd41
JB
27 WorkerTypes,
28 type WorkerUsage
d35e5717 29} from './worker.js'
bde6b5d7 30
e9ed6eee
JB
/**
 * Default measurement statistics requirements: neither the aggregate, nor the
 * average, nor the median is required.
 */
export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements =
  {
    aggregate: false,
    average: false,
    median: false
  }
40
32b141fd
JB
/**
 * Builds the default tasks queue options for a pool of the given maximum size.
 *
 * @param poolMaxSize - The pool maximum size.
 * @returns The default tasks queue options. The queue size is the square of
 * the pool maximum size.
 */
export const getDefaultTasksQueueOptions = (
  poolMaxSize: number
): Required<TasksQueueOptions> => {
  return {
    size: Math.pow(poolMaxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  }
}
52
e9ed6eee
JB
/**
 * Computes the number of worker choice retries for the given pool.
 *
 * @param pool - The pool.
 * @param opts - The worker choice strategy options.
 * @returns The pool maximum size plus the number of worker weights (the
 * given ones, or the default ones when none are given).
 */
export const getWorkerChoiceStrategyRetries = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): number => {
  return (
    pool.info.maxSize +
    Object.keys(
      // Only the number of weights is used here: a placeholder weight is
      // passed so that building the default weights does not probe the CPUs.
      opts?.weights ?? getDefaultWeights(pool.info.maxSize, 1)
    ).length
  )
}
66
/**
 * Builds the worker choice strategy options of the given pool.
 *
 * The given options are cloned, not mutated. Missing weights are replaced by
 * the default weights, and the median is disabled for the run time, the wait
 * time and the ELU unless the given options say otherwise.
 *
 * @param pool - The pool.
 * @param opts - The worker choice strategy options to start from.
 * @returns The worker choice strategy options.
 */
export const buildWorkerChoiceStrategyOptions = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): WorkerChoiceStrategyOptions => {
  const clonedOpts = clone(opts ?? {})
  clonedOpts.weights =
    clonedOpts.weights ?? getDefaultWeights(pool.info.maxSize)
  return {
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false },
    // The given options take precedence over the defaults above.
    ...clonedOpts
  }
}
86
/**
 * Deep clones the given object with `structuredClone()`.
 *
 * @param object - The object to clone.
 * @returns The cloned object.
 * @internal
 */
const clone = <T>(object: T): T => {
  return structuredClone<T>(object)
}
90
/**
 * Builds the default worker weights: one entry per worker node key, from `0`
 * to `poolMaxSize - 1`, all set to the same weight.
 *
 * @param poolMaxSize - The pool maximum size.
 * @param defaultWorkerWeight - The weight given to every worker node. When
 * omitted, it is computed by `getDefaultWorkerWeight()`.
 * @returns The default worker weights, indexed by worker node key.
 * @internal
 */
const getDefaultWeights = (
  poolMaxSize: number,
  defaultWorkerWeight?: number
): Record<number, number> => {
  const workerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
  const weights: Record<number, number> = {}
  for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) {
    weights[workerNodeKey] = workerWeight
  }
  return weights
}
102
/**
 * Computes the default worker weight: the estimated CPU cycle time, averaged
 * over all the CPUs.
 *
 * A CPU without a usable speed (missing or zero) borrows the speed of the
 * first CPU that reports one. When no CPU reports a usable speed, or when the
 * CPU list is empty, a fixed default speed is used instead of failing.
 *
 * @param cpuInfos - The CPUs to derive the weight from. Defaults to the
 * CPUs reported by `os.cpus()`, read once.
 * @returns The default worker weight.
 * @internal
 */
const getDefaultWorkerWeight = (
  cpuInfos: ReadonlyArray<{ speed: number }> = cpus()
): number => {
  // Clock speed (in MHz, the unit `os.cpus()` reports) assumed when no CPU
  // reports a usable one.
  const defaultCpuSpeed = 2000
  const hasSpeed = (cpu: { speed: number }): boolean =>
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    cpu.speed != null && cpu.speed !== 0
  // Resolved once, instead of once per CPU without a usable speed.
  const fallbackCpuSpeed = cpuInfos.find(hasSpeed)?.speed ?? defaultCpuSpeed
  const cpuSpeeds =
    cpuInfos.length > 0
      ? cpuInfos.map(cpu => (hasSpeed(cpu) ? cpu.speed : fallbackCpuSpeed))
      : [fallbackCpuSpeed]
  let cpusCycleTimeWeight = 0
  for (const cpuSpeed of cpuSpeeds) {
    // CPU estimated cycle time
    const numberOfDigits = cpuSpeed.toString().length - 1
    const cpuCycleTime = 1 / (cpuSpeed / Math.pow(10, numberOfDigits))
    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
  }
  return Math.round(cpusCycleTimeWeight / cpuSpeeds.length)
}
119
/**
 * Checks that the given worker file path is usable.
 *
 * @param filePath - The worker file path.
 * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypeError} If the file path is not specified or is not a string.
 * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error} If the file does not exist.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  }
  if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
131
c63a35a0
JB
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypeError} If the maximum pool size is not specified or is not a safe integer.
 * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/RangeError} If the maximum pool size is zero, inferior to the minimum pool size or equal to it.
 */
export const checkDynamicPoolSize = (
  min: number,
  max: number | undefined
): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
158
/**
 * Checks that the given worker choice strategy, when specified, is one of
 * the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error} If the worker choice strategy is invalid.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
  if (
    workerChoiceStrategy != null &&
    !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
  ) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
169
/**
 * Checks the given tasks queue options. Unspecified options are valid.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypeError} If the options are not a plain object, or if the concurrency or the size is not a safe integer.
 * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/RangeError} If the concurrency or the size is negative or zero.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions | undefined
): void => {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, size } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
bfc75cca 206
/**
 * Checks the arguments given to construct a worker node.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path, checked with `checkFilePath()`.
 * @param opts - The worker node options.
 * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypeError} If the worker type is missing or invalid, if the options are missing or are not a plain object, or if the tasks queue back pressure size is missing or is not a safe integer.
 * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/RangeError} If the tasks queue back pressure size is negative or zero.
 */
export const checkWorkerNodeArguments = (
  type: WorkerType | undefined,
  filePath: string | undefined,
  opts: WorkerNodeOptions | undefined
): void => {
  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  if (!Object.values(WorkerTypes).includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }
  checkFilePath(filePath)
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid options: must be a plain object'
    )
  }
  if (opts.tasksQueueBackPressureSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue back pressure size option'
    )
  }
  if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
  }
  if (opts.tasksQueueBackPressureSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
  }
}
bfc75cca
JB
247
/**
 * Updates the given measurement statistics.
 *
 * Nothing is updated unless the requirements and the value are defined and
 * the aggregate is required. The value is pushed to the history only when
 * the average or the median is required; within that branch, a statistic
 * that is no longer required is deleted.
 *
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements | undefined,
  measurementValue: number | undefined
): void => {
  if (
    measurementRequirements == null ||
    measurementValue == null ||
    !measurementRequirements.aggregate
  ) {
    return
  }
  measurementStatistics.aggregate =
    (measurementStatistics.aggregate ?? 0) + measurementValue
  measurementStatistics.minimum = min(
    measurementValue,
    measurementStatistics.minimum ?? Infinity
  )
  measurementStatistics.maximum = max(
    measurementValue,
    measurementStatistics.maximum ?? -Infinity
  )
  if (!measurementRequirements.average && !measurementRequirements.median) {
    return
  }
  measurementStatistics.history.push(measurementValue)
  if (measurementRequirements.average) {
    measurementStatistics.average = average(measurementStatistics.history)
  } else if (measurementStatistics.average != null) {
    delete measurementStatistics.average
  }
  if (measurementRequirements.median) {
    measurementStatistics.median = median(measurementStatistics.history)
  } else if (measurementStatistics.median != null) {
    delete measurementStatistics.median
  }
}
c329fd41
JB
// Expose the internal updateMeasurementStatistics() helper when NODE_ENV is
// 'test', without making it a regular named export.
// NOTE(review): `exports` is only defined when this file runs as a CommonJS
// module - confirm the test build target.
if (env.NODE_ENV === 'test') {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  exports.updateMeasurementStatistics = updateMeasurementStatistics
}
295
/**
 * Updates the wait time statistics of the given worker usage with the wait
 * time of the given task.
 *
 * The wait time is the time elapsed since the task timestamp, or zero when
 * the task has no timestamp.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context
 * providing the wait time statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param task - The task.
 */
export const updateWaitTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
  | WorkerChoiceStrategyContext<Worker, Data, Response>
  | undefined,
  workerUsage: WorkerUsage,
  task: Task<Data>
): void => {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  updateMeasurementStatistics(
    workerUsage.waitTime,
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime,
    taskWaitTime
  )
}
315
/**
 * Updates the task counters of the given worker usage from a worker message.
 *
 * The executing counter is decremented when it is positive. Then the executed
 * counter is incremented, or the failed one when the message carries a worker
 * error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const workerTaskStatistics = workerUsage.tasks
  if (
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerTaskStatistics.executing != null &&
    workerTaskStatistics.executing > 0
  ) {
    --workerTaskStatistics.executing
  }
  if (message.workerError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
334
/**
 * Updates the run time statistics of the given worker usage from a worker
 * message. Messages carrying a worker error are ignored.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context
 * providing the run time statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker. A missing run time
 * counts as zero.
 */
export const updateRunTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
  | WorkerChoiceStrategyContext<Worker, Data, Response>
  | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  updateMeasurementStatistics(
    workerUsage.runTime,
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime,
    message.taskPerformance?.runTime ?? 0
  )
}
355
/**
 * Updates the ELU (event loop utilization) statistics of the given worker
 * usage from a worker message. Messages carrying a worker error are ignored.
 *
 * The active and idle statistics are updated with the message values, a
 * missing value counting as zero. When the aggregate is required and the
 * message carries ELU data, the utilization becomes the mean of the previous
 * utilization and the new one, or the new one when there is no previous one.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context
 * providing the ELU statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateEluWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
  | WorkerChoiceStrategyContext<Worker, Data, Response>
  | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  const eluTaskStatisticsRequirements =
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    message.taskPerformance?.elu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    message.taskPerformance?.elu?.idle ?? 0
  )
  if (
    eluTaskStatisticsRequirements?.aggregate === true &&
    message.taskPerformance?.elu != null
  ) {
    const taskUtilization = message.taskPerformance.elu.utilization
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization + taskUtilization) / 2
        : taskUtilization
  }
}
c3719753
JB
395
/**
 * Creates a worker of the given type.
 *
 * A thread worker is created from the file path, sharing the environment of
 * the parent unless `opts.workerOptions` overrides it. A cluster worker is
 * forked with `opts.env`.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path, used by thread workers.
 * @param opts - The worker creation options.
 * @returns The created worker.
 * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error} If the worker type is unknown.
 */
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
): Worker => {
  switch (type) {
    case WorkerTypes.thread:
      return new ThreadWorker(filePath, {
        env: SHARE_ENV,
        ...opts.workerOptions
      }) as unknown as Worker
    case WorkerTypes.cluster:
      return cluster.fork(opts.env) as unknown as Worker
    default:
      // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
      throw new Error(`Unknown worker type '${type}'`)
  }
}
d41a44de 414
e9ed6eee
JB
/**
 * Returns the worker type of the given worker.
 *
 * @param worker - The worker to get the type of.
 * @returns The worker type of the given worker, or `undefined` when it is
 * neither a thread worker nor a cluster worker.
 * @internal
 */
export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
  if (worker instanceof ThreadWorker) {
    return WorkerTypes.thread
  }
  if (worker instanceof ClusterWorker) {
    return WorkerTypes.cluster
  }
  return undefined
}
429
/**
 * Returns the worker id of the given worker.
 *
 * @param worker - The worker to get the id of.
 * @returns The thread id of a thread worker, the id of a cluster worker, or
 * `undefined` when the worker is neither.
 * @internal
 */
export const getWorkerId = (worker: IWorker): number | undefined => {
  if (worker instanceof ThreadWorker) {
    return worker.threadId
  }
  if (worker instanceof ClusterWorker) {
    return worker.id
  }
  return undefined
}
444
d41a44de
JB
/**
 * Waits for a number of events emitted by the given worker node.
 *
 * The returned promise resolves as soon as the expected number of events has
 * been received, or after the timeout, whichever comes first. Once settled,
 * the timeout timer is cleared and the event listener is removed, so that
 * neither keeps the event loop alive nor accumulates on the worker node.
 *
 * @param workerNode - The worker node emitting the events.
 * @param workerNodeEvent - The name of the event to wait for.
 * @param numberOfEventsToWait - The number of events to wait for. Zero
 * resolves immediately.
 * @param timeout - The maximum time to wait in milliseconds. A negative value
 * disables the timeout.
 * @returns The number of events received when the promise settled.
 */
export const waitWorkerNodeEvents = async <
  Worker extends IWorker,
  Data = unknown
>(
  workerNode: IWorkerNode<Worker, Data>,
  workerNodeEvent: string,
  numberOfEventsToWait: number,
  timeout: number
): Promise<number> => {
  return await new Promise<number>(resolve => {
    let events = 0
    if (numberOfEventsToWait === 0) {
      resolve(events)
      return
    }
    let timer: ReturnType<typeof setTimeout> | undefined
    // Releases the timer and the listener before resolving.
    const settle = (): void => {
      if (timer != null) {
        clearTimeout(timer)
      }
      workerNode.off(workerNodeEvent, onEvent)
      resolve(events)
    }
    const onEvent = (): void => {
      ++events
      if (events === numberOfEventsToWait) {
        settle()
      }
    }
    workerNode.on(workerNodeEvent, onEvent)
    if (timeout >= 0) {
      timer = setTimeout(settle, timeout)
    }
  })
}