build(deps-dev): apply updates
[poolifier.git] / src / pools / utils.ts
CommitLineData
e9ed6eee 1import cluster, { Worker as ClusterWorker } from 'node:cluster'
ded253e2
JB
2import { randomInt } from 'node:crypto'
3import { existsSync } from 'node:fs'
4import { cpus } from 'node:os'
5import { env } from 'node:process'
e9ed6eee
JB
6import {
7 SHARE_ENV,
8 Worker as ThreadWorker,
9 type WorkerOptions
10} from 'node:worker_threads'
ded253e2 11
d35e5717 12import type { MessageValue, Task } from '../utility-types.js'
ded253e2
JB
13import { average, isPlainObject, max, median, min } from '../utils.js'
14import type { IPool, TasksQueueOptions } from './pool.js'
bde6b5d7 15import {
bfc75cca 16 type MeasurementStatisticsRequirements,
bde6b5d7 17 WorkerChoiceStrategies,
e9ed6eee
JB
18 type WorkerChoiceStrategy,
19 type WorkerChoiceStrategyOptions
d35e5717 20} from './selection-strategies/selection-strategies-types.js'
ded253e2 21import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
c3719753
JB
22import {
23 type IWorker,
d41a44de 24 type IWorkerNode,
c3719753
JB
25 type MeasurementStatistics,
26 type WorkerNodeOptions,
27 type WorkerType,
c329fd41
JB
28 WorkerTypes,
29 type WorkerUsage
d35e5717 30} from './worker.js'
bde6b5d7 31
e9ed6eee
JB
/**
 * Measurement statistics requirements used by default:
 * no aggregate, no average and no median are required.
 */
export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements = {
  aggregate: false,
  average: false,
  median: false
}
41
32b141fd
JB
/**
 * Builds the default tasks queue options for a pool.
 *
 * @param poolMaxSize - The pool maximum size; the default queue size is its square.
 * @returns The fully populated default tasks queue options.
 */
export const getDefaultTasksQueueOptions = (
  poolMaxSize: number
): Required<TasksQueueOptions> => ({
  size: poolMaxSize ** 2,
  concurrency: 1,
  taskStealing: true,
  tasksStealingOnBackPressure: true,
  tasksFinishedTimeout: 2000
})
53
e9ed6eee
JB
/**
 * Computes the number of retries granted to a worker choice strategy:
 * the pool maximum size plus the number of configured worker weights
 * (the default weights when none are given in the options).
 *
 * @param pool - The pool whose `info.maxSize` is used.
 * @param opts - The worker choice strategy options, possibly carrying weights.
 * @returns The number of retries.
 */
export const getWorkerChoiceStrategyRetries = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): number => {
  const poolMaxSize = pool.info.maxSize
  const weights = opts?.weights ?? getDefaultWeights(poolMaxSize)
  return poolMaxSize + Object.keys(weights).length
}
67
/**
 * Builds the effective worker choice strategy options.
 *
 * The given options are deep-cloned (the caller's object is never mutated),
 * missing weights are filled with the default weights for the pool maximum
 * size, and the result is layered over defaults that disable the median for
 * run time, wait time and ELU.
 *
 * @param pool - The pool whose `info.maxSize` sizes the default weights.
 * @param opts - The user supplied worker choice strategy options.
 * @returns The merged worker choice strategy options.
 */
export const buildWorkerChoiceStrategyOptions = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): WorkerChoiceStrategyOptions => {
  const clonedOpts = clone<WorkerChoiceStrategyOptions>(opts ?? {})
  if (clonedOpts.weights == null) {
    clonedOpts.weights = getDefaultWeights(pool.info.maxSize)
  }
  return {
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false },
    ...clonedOpts
  }
}
87
/**
 * Deep-copies a value using the structured clone algorithm.
 *
 * @param object - The value to copy.
 * @returns A structured clone of the value.
 */
const clone = <T>(object: T): T => structuredClone<T>(object)
91
/**
 * Builds the default weights map: one entry per worker node key in
 * `[0, poolMaxSize)`, all set to the same weight.
 *
 * @param poolMaxSize - The pool maximum size.
 * @param defaultWorkerWeight - The weight to assign; computed via `getDefaultWorkerWeight()` when omitted.
 * @returns The weights keyed by worker node key.
 */
const getDefaultWeights = (
  poolMaxSize: number,
  defaultWorkerWeight?: number
): Record<number, number> => {
  const workerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
  const weights: Record<number, number> = {}
  let workerNodeKey = 0
  while (workerNodeKey < poolMaxSize) {
    weights[workerNodeKey] = workerWeight
    workerNodeKey += 1
  }
  return weights
}
103
/**
 * Computes the default worker weight from the estimated cycle time of the
 * host CPUs, averaged over all CPUs and rounded to an integer.
 *
 * CPUs that report no speed (nullish or zero) use the speed of the first CPU
 * reporting a usable one, or a random speed in `[500, 2500)` when none does.
 * When `os.cpus()` returns an empty list, the weight is computed for a single
 * CPU at that fallback speed instead of dividing by zero.
 *
 * @returns The default worker weight.
 */
const getDefaultWorkerWeight = (): number => {
  // Take a single snapshot: os.cpus() builds a fresh array on every call.
  const availableCpus = cpus()
  const fallbackCpuSpeed =
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    availableCpus.find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ??
    randomInt(500, 2500)
  const cpuSpeeds =
    availableCpus.length > 0
      ? availableCpus.map(cpu =>
        // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
        cpu.speed == null || cpu.speed === 0 ? fallbackCpuSpeed : cpu.speed
      )
      : [fallbackCpuSpeed]
  let cpusCycleTimeWeight = 0
  for (const cpuSpeed of cpuSpeeds) {
    // CPU estimated cycle time
    const numberOfDigits = cpuSpeed.toString().length - 1
    const cpuCycleTime = 1 / (cpuSpeed / Math.pow(10, numberOfDigits))
    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
  }
  return Math.round(cpusCycleTimeWeight / cpuSpeeds.length)
}
122
/**
 * Validates a worker file path.
 *
 * @param filePath - The worker file path to validate.
 * @throws TypeError If the path is nullish or is not a string.
 * @throws Error If no file exists at the given path.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  }
  if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  }
  const workerFileExists = existsSync(filePath)
  if (!workerFileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
134
c63a35a0
JB
/**
 * Validates the size bounds of a dynamic pool.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws TypeError If the maximum size is missing or is not a safe integer.
 * @throws RangeError If the maximum size is lower than the minimum size, is zero, or equals the minimum size.
 */
export const checkDynamicPoolSize = (
  min: number,
  max: number | undefined
): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
161
/**
 * Validates a worker choice strategy.
 * A nullish strategy is accepted as is.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error If the strategy is not one of the `WorkerChoiceStrategies` values.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
172
/**
 * Validates tasks queue options.
 * Nullish options, and nullish `concurrency` / `size` fields, are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError If the options are not a plain object, or `concurrency` / `size` is not a safe integer.
 * @throws RangeError If `concurrency` / `size` is zero or negative.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions | undefined
): void => {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  const size = tasksQueueOptions?.size
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
bfc75cca 209
/**
 * Validates the arguments used to construct a worker node.
 *
 * @param type - The worker type; must be one of the `WorkerTypes` values.
 * @param filePath - The worker file path; validated with `checkFilePath()`.
 * @param opts - The worker node options; must be a plain object with a positive safe integer `tasksQueueBackPressureSize`.
 * @throws TypeError If an argument is missing or has the wrong type.
 * @throws RangeError If `tasksQueueBackPressureSize` is zero or negative.
 * @throws Error If the worker file cannot be found.
 */
export const checkWorkerNodeArguments = (
  type: WorkerType | undefined,
  filePath: string | undefined,
  opts: WorkerNodeOptions | undefined
): void => {
  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  const knownWorkerTypes = Object.values(WorkerTypes)
  if (!knownWorkerTypes.includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }
  checkFilePath(filePath)
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid options: must be a plain object'
    )
  }
  const backPressureSize = opts.tasksQueueBackPressureSize
  if (backPressureSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue back pressure size option'
    )
  }
  if (!Number.isSafeInteger(backPressureSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
  }
  if (backPressureSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
  }
}
bfc75cca
JB
250
/**
 * Updates the given measurement statistics in place with a new value.
 *
 * Nothing happens unless requirements and value are both defined and the
 * aggregate is required. Otherwise aggregate, minimum and maximum are
 * updated; when the average or the median is required, the value is also
 * appended to the history and the required statistic is recomputed from it,
 * while the non-required one is removed.
 *
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements | undefined,
  measurementValue: number | undefined
): void => {
  if (
    measurementRequirements == null ||
    measurementValue == null ||
    !measurementRequirements.aggregate
  ) {
    return
  }
  measurementStatistics.aggregate =
    (measurementStatistics.aggregate ?? 0) + measurementValue
  measurementStatistics.minimum = min(
    measurementValue,
    measurementStatistics.minimum ?? Infinity
  )
  measurementStatistics.maximum = max(
    measurementValue,
    measurementStatistics.maximum ?? -Infinity
  )
  const { average: averageRequired, median: medianRequired } =
    measurementRequirements
  if (!averageRequired && !medianRequired) {
    return
  }
  measurementStatistics.history.push(measurementValue)
  if (averageRequired) {
    measurementStatistics.average = average(measurementStatistics.history)
  } else if (measurementStatistics.average != null) {
    delete measurementStatistics.average
  }
  if (medianRequired) {
    measurementStatistics.median = median(measurementStatistics.history)
  } else if (measurementStatistics.median != null) {
    delete measurementStatistics.median
  }
}
// Expose the internal helper on the module exports when running under test.
if (env.NODE_ENV === 'test') {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  exports.updateMeasurementStatistics = updateMeasurementStatistics
}
298
/**
 * Updates the wait time statistics of a worker usage for the given task.
 * The wait time is the delay between the task timestamp and now; it is zero
 * when the task carries no timestamp.
 *
 * @param workerChoiceStrategyContext - The context providing the wait time statistics requirements, if any.
 * @param workerUsage - The worker usage to update.
 * @param task - The task whose wait time is measured.
 */
export const updateWaitTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
  | WorkerChoiceStrategyContext<Worker, Data, Response>
  | undefined,
  workerUsage: WorkerUsage,
  task: Task<Data>
): void => {
  const now = performance.now()
  const waitTime = now - (task.timestamp ?? now)
  const waitTimeStatistics = workerUsage.waitTime
  const waitTimeRequirements =
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    waitTimeStatistics,
    waitTimeRequirements,
    waitTime
  )
}
318
/**
 * Updates the task counters of a worker usage once a task response arrived:
 * the executing count is decremented (never below zero), then either the
 * executed or the failed count is incremented depending on whether the
 * message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const { tasks } = workerUsage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (tasks.executing != null && tasks.executing > 0) {
    tasks.executing -= 1
  }
  if (message.workerError != null) {
    tasks.failed += 1
  } else {
    tasks.executed += 1
  }
}
337
/**
 * Updates the run time statistics of a worker usage from a task response.
 * Messages carrying a worker error are ignored; a missing run time counts as zero.
 *
 * @param workerChoiceStrategyContext - The context providing the run time statistics requirements, if any.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateRunTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
  | WorkerChoiceStrategyContext<Worker, Data, Response>
  | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError == null) {
    const runTimeStatistics = workerUsage.runTime
    const runTimeRequirements =
      workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime
    const runTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      runTimeStatistics,
      runTimeRequirements,
      runTime
    )
  }
}
358
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from
 * a task response. Messages carrying a worker error are ignored.
 *
 * Active and idle statistics are updated with the reported values (zero when
 * missing). When the ELU aggregate is required and the message reports an
 * ELU, the stored utilization becomes the mean of the previous utilization
 * and the reported one, or the reported one when none was stored yet.
 *
 * @param workerChoiceStrategyContext - The context providing the ELU statistics requirements, if any.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateEluWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
  | WorkerChoiceStrategyContext<Worker, Data, Response>
  | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  const eluRequirements =
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    message.taskPerformance?.elu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    message.taskPerformance?.elu?.idle ?? 0
  )
  const taskElu = message.taskPerformance?.elu
  if (eluRequirements?.aggregate === true && taskElu != null) {
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization + taskElu.utilization) / 2
        : taskElu.utilization
  }
}
c3719753
JB
398
/**
 * Creates a worker of the given type.
 *
 * A thread worker is instantiated on the file path with `env` set to
 * `SHARE_ENV`, which the given worker options may override. A cluster worker
 * is forked with the given environment.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path (used for thread workers).
 * @param opts - The creation options: `env` for cluster workers, `workerOptions` for thread workers.
 * @returns The created worker.
 * @throws Error If the worker type is unknown.
 */
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
): Worker => {
  if (type === WorkerTypes.thread) {
    const threadWorkerOptions = { env: SHARE_ENV, ...opts.workerOptions }
    return new ThreadWorker(filePath, threadWorkerOptions) as unknown as Worker
  }
  if (type === WorkerTypes.cluster) {
    return cluster.fork(opts.env) as unknown as Worker
  }
  // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
  throw new Error(`Unknown worker type '${type}'`)
}
d41a44de 417
e9ed6eee
JB
/**
 * Determines the worker type of a worker from its class.
 *
 * @param worker - The worker to inspect.
 * @returns `WorkerTypes.thread` for a worker thread, `WorkerTypes.cluster` for a cluster worker, `undefined` otherwise.
 * @internal
 */
export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
  if (worker instanceof ThreadWorker) {
    return WorkerTypes.thread
  }
  if (worker instanceof ClusterWorker) {
    return WorkerTypes.cluster
  }
  return undefined
}
432
/**
 * Determines the id of a worker from its class.
 *
 * @param worker - The worker to inspect.
 * @returns The `threadId` of a worker thread, the `id` of a cluster worker, `undefined` otherwise.
 * @internal
 */
export const getWorkerId = (worker: IWorker): number | undefined => {
  if (worker instanceof ThreadWorker) {
    return worker.threadId
  }
  if (worker instanceof ClusterWorker) {
    return worker.id
  }
  return undefined
}
447
d41a44de
JB
/**
 * Waits until a worker node has emitted a given event a given number of
 * times, or until the timeout elapses, whichever comes first.
 *
 * Once the promise settles, the event listener is removed from the worker
 * node and the pending timeout timer is cleared, so repeated calls neither
 * accumulate listeners nor keep the event loop alive.
 *
 * @param workerNode - The worker node to listen on.
 * @param workerNodeEvent - The name of the event to count.
 * @param numberOfEventsToWait - The number of events to wait for; `0` resolves immediately.
 * @param timeout - The maximum wait in milliseconds; a negative value disables the timeout.
 * @returns The number of events received when the wait ended.
 */
export const waitWorkerNodeEvents = async <
  Worker extends IWorker,
  Data = unknown
>(
  workerNode: IWorkerNode<Worker, Data>,
  workerNodeEvent: string,
  numberOfEventsToWait: number,
  timeout: number
): Promise<number> => {
  return await new Promise<number>(resolve => {
    let events = 0
    if (numberOfEventsToWait === 0) {
      resolve(events)
      return
    }
    let timer: ReturnType<typeof setTimeout> | undefined
    // Single exit path: release the timer and the listener, then resolve.
    const settle = (): void => {
      if (timer != null) {
        clearTimeout(timer)
      }
      workerNode.off(workerNodeEvent, onEvent)
      resolve(events)
    }
    const onEvent = (): void => {
      ++events
      if (events === numberOfEventsToWait) {
        settle()
      }
    }
    workerNode.on(workerNodeEvent, onEvent)
    if (timeout >= 0) {
      timer = setTimeout(settle, timeout)
    }
  })
}