build(deps-dev): apply updates
[poolifier.git] / src / pools / utils.ts
CommitLineData
bde6b5d7 1import { existsSync } from 'node:fs'
e9ed6eee
JB
2import cluster, { Worker as ClusterWorker } from 'node:cluster'
3import {
4 SHARE_ENV,
5 Worker as ThreadWorker,
6 type WorkerOptions
7} from 'node:worker_threads'
c329fd41 8import { env } from 'node:process'
e9ed6eee
JB
9import { randomInt } from 'node:crypto'
10import { cpus } from 'node:os'
d35e5717
JB
11import { average, isPlainObject, max, median, min } from '../utils.js'
12import type { MessageValue, Task } from '../utility-types.js'
bde6b5d7 13import {
bfc75cca 14 type MeasurementStatisticsRequirements,
bde6b5d7 15 WorkerChoiceStrategies,
e9ed6eee
JB
16 type WorkerChoiceStrategy,
17 type WorkerChoiceStrategyOptions
d35e5717 18} from './selection-strategies/selection-strategies-types.js'
e9ed6eee 19import type { IPool, TasksQueueOptions } from './pool.js'
c3719753
JB
20import {
21 type IWorker,
d41a44de 22 type IWorkerNode,
c3719753
JB
23 type MeasurementStatistics,
24 type WorkerNodeOptions,
25 type WorkerType,
c329fd41
JB
26 WorkerTypes,
27 type WorkerUsage
d35e5717
JB
28} from './worker.js'
29import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
bde6b5d7 30
e9ed6eee
JB
/**
 * Default measurement statistics requirements.
 *
 * Every statistic (aggregate, average and median) is disabled by default.
 */
export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements =
  {
    aggregate: false,
    average: false,
    median: false
  }
40
32b141fd
JB
/**
 * Returns the default tasks queue options for a pool of the given maximum size.
 *
 * @param poolMaxSize - The pool maximum size.
 * @returns The default tasks queue options. The queue size is the square of
 * the pool maximum size; every other option is a fixed default.
 */
export const getDefaultTasksQueueOptions = (
  poolMaxSize: number
): Required<TasksQueueOptions> => {
  return {
    size: Math.pow(poolMaxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  }
}
52
e9ed6eee
JB
/**
 * Computes the number of worker choice strategy retries.
 *
 * @param pool - The pool whose `info.maxSize` is used.
 * @param opts - The worker choice strategy options, optionally carrying weights.
 * @returns The pool maximum size plus the number of weight entries. The
 * weights are the ones given in `opts`, or the default weights built for the
 * pool maximum size when none are given.
 */
export const getWorkerChoiceStrategyRetries = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): number => {
  return (
    pool.info.maxSize +
    Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length
  )
}
66
/**
 * Builds the worker choice strategy options from the given ones.
 *
 * The given options are deep cloned, so the caller's object is never mutated.
 * Missing weights are replaced by the default weights for the pool maximum
 * size. `runTime`, `waitTime` and `elu` default to `{ median: false }` unless
 * the given options define them.
 *
 * @param pool - The pool whose `info.maxSize` is used to build default weights.
 * @param opts - The user supplied worker choice strategy options.
 * @returns The built worker choice strategy options.
 */
export const buildWorkerChoiceStrategyOptions = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): WorkerChoiceStrategyOptions => {
  const clonedOpts: WorkerChoiceStrategyOptions = clone(opts ?? {})
  clonedOpts.weights =
    clonedOpts.weights ?? getDefaultWeights(pool.info.maxSize)
  return {
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false },
    // Top-level keys present in the cloned options override the defaults above
    ...clonedOpts
  }
}
86
/**
 * Deep clones the given value with the structured clone algorithm.
 *
 * @param object - The value to clone.
 * @returns The clone of the given value.
 */
const clone = <T>(object: T): T => {
  return structuredClone<T>(object)
}
90
/**
 * Builds the default weights: one entry per worker node key.
 *
 * @param poolMaxSize - The pool maximum size, i.e. the number of entries to build.
 * @param defaultWorkerWeight - The weight given to every worker node key.
 * When omitted it is computed by `getDefaultWorkerWeight()`.
 * @returns A record mapping each key in `[0, poolMaxSize)` to the same weight.
 */
const getDefaultWeights = (
  poolMaxSize: number,
  defaultWorkerWeight?: number
): Record<number, number> => {
  const workerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
  const weights: Record<number, number> = {}
  for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) {
    weights[workerNodeKey] = workerWeight
  }
  return weights
}
102
/**
 * Computes the default worker weight from the CPU speeds reported by the OS.
 *
 * A CPU reporting no speed (or a speed of zero) is given the first non-zero
 * speed found among the CPUs, or a random speed in `[500, 2500)` when no CPU
 * reports one. When the OS reports no CPU at all, a single CPU running at that
 * fallback speed is assumed, so the result is never `NaN`.
 *
 * @returns The rounded mean of the per-CPU estimated cycle time weights.
 */
const getDefaultWorkerWeight = (): number => {
  // Query the OS once instead of on every loop iteration
  const cpuInfos = cpus()
  const fallbackCpuSpeed =
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    cpuInfos.find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ??
    randomInt(500, 2500)
  const cpuSpeeds =
    cpuInfos.length > 0
      ? cpuInfos.map(cpu =>
        // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
        cpu.speed == null || cpu.speed === 0 ? fallbackCpuSpeed : cpu.speed
      )
      : [fallbackCpuSpeed]
  let cpusCycleTimeWeight = 0
  for (const cpuSpeed of cpuSpeeds) {
    // CPU estimated cycle time
    const numberOfDigits = cpuSpeed.toString().length - 1
    const cpuCycleTime = 1 / (cpuSpeed / Math.pow(10, numberOfDigits))
    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
  }
  return Math.round(cpusCycleTimeWeight / cpuSpeeds.length)
}
121
/**
 * Checks that the given worker file path is usable.
 *
 * @param filePath - The worker file path.
 * @throws TypeError If the file path is `null`/`undefined` or is not a string.
 * @throws Error If no file exists at the given path.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  }
  // Runtime guard for callers that bypass the static typing
  if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
133
c63a35a0
JB
/**
 * Checks that the given minimum and maximum sizes are valid for a dynamic pool.
 *
 * The checks run in this order, and the first failing one throws.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws TypeError If `max` is `null`/`undefined` or is not a safe integer.
 * @throws RangeError If `min` is greater than `max`, if `max` is zero, or if
 * `min` equals `max`.
 */
export const checkDynamicPoolSize = (
  min: number,
  max: number | undefined
): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
160
/**
 * Checks that the given worker choice strategy, when defined, is one of the
 * values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error If the worker choice strategy is defined and is not a known one.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
  if (
    workerChoiceStrategy != null &&
    !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
  ) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
171
/**
 * Checks that the given tasks queue options, when defined, are valid.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError If the options are not a plain object, or if `concurrency`
 * or `size` is defined and is not a safe integer.
 * @throws RangeError If `concurrency` or `size` is defined and is negative or zero.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions | undefined
): void => {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  if (
    tasksQueueOptions?.concurrency != null &&
    !Number.isSafeInteger(tasksQueueOptions.concurrency)
  ) {
    throw new TypeError(
      'Invalid worker node tasks concurrency: must be an integer'
    )
  }
  if (
    tasksQueueOptions?.concurrency != null &&
    tasksQueueOptions.concurrency <= 0
  ) {
    throw new RangeError(
      `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
    )
  }
  if (
    tasksQueueOptions?.size != null &&
    !Number.isSafeInteger(tasksQueueOptions.size)
  ) {
    throw new TypeError(
      'Invalid worker node tasks queue size: must be an integer'
    )
  }
  if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
    throw new RangeError(
      `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
    )
  }
}
bfc75cca 208
/**
 * Checks the arguments given to construct a worker node.
 *
 * The worker type is checked first, then the file path (through
 * `checkFilePath`), then the options. The first failing check throws.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path.
 * @param opts - The worker node options.
 * @throws TypeError If the type is missing or is not a value of `WorkerTypes`,
 * if the options are missing or are not a plain object, or if
 * `tasksQueueBackPressureSize` is missing or is not a safe integer.
 * @throws RangeError If `tasksQueueBackPressureSize` is negative or zero.
 */
export const checkWorkerNodeArguments = (
  type: WorkerType | undefined,
  filePath: string | undefined,
  opts: WorkerNodeOptions | undefined
): void => {
  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  if (!Object.values(WorkerTypes).includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }
  checkFilePath(filePath)
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid options: must be a plain object'
    )
  }
  if (opts.tasksQueueBackPressureSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue back pressure size option'
    )
  }
  if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
  }
  if (opts.tasksQueueBackPressureSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
  }
}
bfc75cca
JB
249
/**
 * Updates the given measurement statistics.
 *
 * Nothing is updated unless both the requirements and the value are defined
 * and the requirements ask for the aggregate. The history, average and median
 * are only touched when the average or the median is required.
 *
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements | undefined,
  measurementValue: number | undefined
): void => {
  if (
    measurementRequirements != null &&
    measurementValue != null &&
    measurementRequirements.aggregate
  ) {
    measurementStatistics.aggregate =
      (measurementStatistics.aggregate ?? 0) + measurementValue
    measurementStatistics.minimum = min(
      measurementValue,
      measurementStatistics.minimum ?? Infinity
    )
    measurementStatistics.maximum = max(
      measurementValue,
      measurementStatistics.maximum ?? -Infinity
    )
    if (measurementRequirements.average || measurementRequirements.median) {
      measurementStatistics.history.push(measurementValue)
      if (measurementRequirements.average) {
        measurementStatistics.average = average(measurementStatistics.history)
      } else if (measurementStatistics.average != null) {
        // The average is no longer required: drop the stale value
        delete measurementStatistics.average
      }
      if (measurementRequirements.median) {
        measurementStatistics.median = median(measurementStatistics.history)
      } else if (measurementStatistics.median != null) {
        // The median is no longer required: drop the stale value
        delete measurementStatistics.median
      }
    }
  }
}
// Expose the internal helper when NODE_ENV is 'test'
if (env.NODE_ENV === 'test') {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  exports.updateMeasurementStatistics = updateMeasurementStatistics
}
297
/**
 * Updates the wait time statistics of the given worker usage for a task.
 *
 * The task wait time is the difference between `performance.now()` and the
 * task timestamp. It is zero when the task carries no timestamp.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context
 * giving the wait time statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param task - The task whose wait time is measured.
 */
export const updateWaitTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
  | WorkerChoiceStrategyContext<Worker, Data, Response>
  | undefined,
  workerUsage: WorkerUsage,
  task: Task<Data>
): void => {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  updateMeasurementStatistics(
    workerUsage.waitTime,
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime,
    taskWaitTime
  )
}
317
/**
 * Updates the tasks counters of the given worker usage from a worker message.
 *
 * The executing counter is decremented when it is positive. Then the failed
 * counter is incremented when the message carries a worker error, otherwise
 * the executed counter is incremented.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const workerTaskStatistics = workerUsage.tasks
  if (
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerTaskStatistics.executing != null &&
    workerTaskStatistics.executing > 0
  ) {
    --workerTaskStatistics.executing
  }
  if (message.workerError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
336
/**
 * Updates the run time statistics of the given worker usage from a worker message.
 *
 * Nothing is updated when the message carries a worker error. A message
 * without a run time measurement counts as a run time of zero.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context
 * giving the run time statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateRunTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
  | WorkerChoiceStrategyContext<Worker, Data, Response>
  | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  updateMeasurementStatistics(
    workerUsage.runTime,
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime,
    message.taskPerformance?.runTime ?? 0
  )
}
357
/**
 * Updates the event loop utilization (ELU) statistics of the given worker
 * usage from a worker message.
 *
 * Nothing is updated when the message carries a worker error. The active and
 * idle statistics are updated with the message values, or with zero when the
 * message has none. When the ELU aggregate is required and the message has an
 * ELU measurement, the stored utilization becomes the mean of the stored
 * value and the new one, or the new one when nothing was stored yet.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context
 * giving the ELU statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateEluWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
  | WorkerChoiceStrategyContext<Worker, Data, Response>
  | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  const eluTaskStatisticsRequirements =
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    message.taskPerformance?.elu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    message.taskPerformance?.elu?.idle ?? 0
  )
  if (eluTaskStatisticsRequirements?.aggregate === true) {
    if (message.taskPerformance?.elu != null) {
      if (workerUsage.elu.utilization != null) {
        workerUsage.elu.utilization =
          (workerUsage.elu.utilization +
            message.taskPerformance.elu.utilization) /
          2
      } else {
        workerUsage.elu.utilization = message.taskPerformance.elu.utilization
      }
    }
  }
}
c3719753
JB
397
/**
 * Creates a worker of the given type.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path, used for thread workers.
 * @param opts - The worker creation options. `workerOptions` is passed to the
 * thread worker constructor and `env` to the cluster fork.
 * @returns The created worker.
 * @throws Error If the worker type is unknown.
 */
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
): Worker => {
  switch (type) {
    case WorkerTypes.thread:
      return new ThreadWorker(filePath, {
        // Default that the given worker options can override
        env: SHARE_ENV,
        ...opts.workerOptions
      }) as unknown as Worker
    case WorkerTypes.cluster:
      return cluster.fork(opts.env) as unknown as Worker
    default:
      // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
      throw new Error(`Unknown worker type '${type}'`)
  }
}
d41a44de 416
e9ed6eee
JB
/**
 * Returns the worker type of the given worker.
 *
 * @param worker - The worker to get the type of.
 * @returns The worker type of the given worker, or `undefined` when the worker
 * is neither a thread worker nor a cluster worker.
 * @internal
 */
export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
  if (worker instanceof ThreadWorker) {
    return WorkerTypes.thread
  } else if (worker instanceof ClusterWorker) {
    return WorkerTypes.cluster
  }
}
431
/**
 * Returns the worker id of the given worker.
 *
 * @param worker - The worker to get the id of.
 * @returns The `threadId` of a thread worker, the `id` of a cluster worker, or
 * `undefined` when the worker is neither.
 * @internal
 */
export const getWorkerId = (worker: IWorker): number | undefined => {
  if (worker instanceof ThreadWorker) {
    return worker.threadId
  } else if (worker instanceof ClusterWorker) {
    return worker.id
  }
}
446
d41a44de
JB
/**
 * Waits for a number of occurrences of an event on the given worker node.
 *
 * The promise resolves as soon as the expected number of events has been
 * received, or once the timeout expires, whichever happens first. It resolves
 * immediately when no event is expected. A negative timeout disables the
 * timeout.
 *
 * @param workerNode - The worker node to listen on.
 * @param workerNodeEvent - The name of the event to wait for.
 * @param numberOfEventsToWait - The number of events to wait for.
 * @param timeout - The delay passed to `setTimeout()` after which waiting stops.
 * @returns The number of events received when the promise resolved.
 */
export const waitWorkerNodeEvents = async <
  Worker extends IWorker,
  Data = unknown
>(
  workerNode: IWorkerNode<Worker, Data>,
  workerNodeEvent: string,
  numberOfEventsToWait: number,
  timeout: number
): Promise<number> => {
  return await new Promise<number>(resolve => {
    let events = 0
    if (numberOfEventsToWait === 0) {
      resolve(events)
      return
    }
    let timeoutHandle: ReturnType<typeof setTimeout> | undefined
    // NOTE(review): the listener stays registered after the promise settles
    workerNode.on(workerNodeEvent, () => {
      ++events
      if (events === numberOfEventsToWait) {
        // Cancel the pending timeout so it does not outlive the wait
        clearTimeout(timeoutHandle)
        resolve(events)
      }
    })
    if (timeout >= 0) {
      timeoutHandle = setTimeout(() => {
        resolve(events)
      }, timeout)
    }
  })
}