1 import { existsSync
} from
'node:fs'
2 import cluster
, { Worker
as ClusterWorker
} from
'node:cluster'
5 Worker
as ThreadWorker
,
7 } from
'node:worker_threads'
8 import { env
} from
'node:process'
9 import { randomInt
} from
'node:crypto'
10 import { cpus
} from
'node:os'
11 import { average
, isPlainObject
, max
, median
, min
} from
'../utils.js'
12 import type { MessageValue
, Task
} from
'../utility-types.js'
14 type MeasurementStatisticsRequirements
,
15 WorkerChoiceStrategies
,
16 type WorkerChoiceStrategy
,
17 type WorkerChoiceStrategyOptions
18 } from
'./selection-strategies/selection-strategies-types.js'
19 import type { IPool
, TasksQueueOptions
} from
'./pool.js'
23 type MeasurementStatistics
,
24 type WorkerNodeOptions
,
29 import type { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context.js'
32 * Default measurement statistics requirements.
34 export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
: MeasurementStatisticsRequirements
=
41 export const getDefaultTasksQueueOptions
= (
43 ): Required
<TasksQueueOptions
> => {
45 size
: Math.pow(poolMaxSize
, 2),
48 tasksStealingOnBackPressure
: true,
49 tasksFinishedTimeout
: 2000
53 export const getWorkerChoiceStrategyRetries
= <
54 Worker
extends IWorker
,
58 pool
: IPool
<Worker
, Data
, Response
>,
59 opts
?: WorkerChoiceStrategyOptions
63 Object.keys(opts
?.weights
?? getDefaultWeights(pool
.info
.maxSize
)).length
67 export const buildWorkerChoiceStrategyOptions
= <
68 Worker
extends IWorker
,
72 pool
: IPool
<Worker
, Data
, Response
>,
73 opts
?: WorkerChoiceStrategyOptions
74 ): WorkerChoiceStrategyOptions
=> {
75 opts
= clone(opts
?? {})
76 opts
.weights
= opts
.weights
?? getDefaultWeights(pool
.info
.maxSize
)
79 runTime
: { median
: false },
80 waitTime
: { median
: false },
81 elu
: { median
: false }
87 const clone
= <T
>(object
: T
): T
=> {
88 return structuredClone
<T
>(object
)
91 const getDefaultWeights
= (
93 defaultWorkerWeight
?: number
94 ): Record
<number, number> => {
95 defaultWorkerWeight
= defaultWorkerWeight
?? getDefaultWorkerWeight()
96 const weights
: Record
<number, number> = {}
97 for (let workerNodeKey
= 0; workerNodeKey
< poolMaxSize
; workerNodeKey
++) {
98 weights
[workerNodeKey
] = defaultWorkerWeight
103 const getDefaultWorkerWeight
= (): number => {
104 const cpuSpeed
= randomInt(500, 2500)
105 let cpusCycleTimeWeight
= 0
106 for (const cpu
of cpus()) {
107 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
108 if (cpu
.speed
== null || cpu
.speed
=== 0) {
110 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
111 cpus().find(cpu
=> cpu
.speed
!= null && cpu
.speed
!== 0)?.speed
??
114 // CPU estimated cycle time
115 const numberOfDigits
= cpu
.speed
.toString().length
- 1
116 const cpuCycleTime
= 1 / (cpu
.speed
/ Math.pow(10, numberOfDigits
))
117 cpusCycleTimeWeight
+= cpuCycleTime
* Math.pow(10, numberOfDigits
)
119 return Math.round(cpusCycleTimeWeight
/ cpus().length
)
122 export const checkFilePath
= (filePath
: string | undefined): void => {
123 if (filePath
== null) {
124 throw new TypeError('The worker file path must be specified')
126 if (typeof filePath
!== 'string') {
127 throw new TypeError('The worker file path must be a string')
129 if (!existsSync(filePath
)) {
130 throw new Error(`Cannot find the worker file '${filePath}'`)
134 export const checkDynamicPoolSize
= (
136 max
: number | undefined
140 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
142 } else if (!Number.isSafeInteger(max
)) {
144 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
146 } else if (min
> max
) {
147 throw new RangeError(
148 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
150 } else if (max
=== 0) {
151 throw new RangeError(
152 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
154 } else if (min
=== max
) {
155 throw new RangeError(
156 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
161 export const checkValidWorkerChoiceStrategy
= (
162 workerChoiceStrategy
: WorkerChoiceStrategy
| undefined
165 workerChoiceStrategy
!= null &&
166 !Object.values(WorkerChoiceStrategies
).includes(workerChoiceStrategy
)
168 throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
172 export const checkValidTasksQueueOptions
= (
173 tasksQueueOptions
: TasksQueueOptions
| undefined
175 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
176 throw new TypeError('Invalid tasks queue options: must be a plain object')
179 tasksQueueOptions
?.concurrency
!= null &&
180 !Number.isSafeInteger(tasksQueueOptions
.concurrency
)
183 'Invalid worker node tasks concurrency: must be an integer'
187 tasksQueueOptions
?.concurrency
!= null &&
188 tasksQueueOptions
.concurrency
<= 0
190 throw new RangeError(
191 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
195 tasksQueueOptions
?.size
!= null &&
196 !Number.isSafeInteger(tasksQueueOptions
.size
)
199 'Invalid worker node tasks queue size: must be an integer'
202 if (tasksQueueOptions
?.size
!= null && tasksQueueOptions
.size
<= 0) {
203 throw new RangeError(
204 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
209 export const checkWorkerNodeArguments
= (
210 type: WorkerType
| undefined,
211 filePath
: string | undefined,
212 opts
: WorkerNodeOptions
| undefined
215 throw new TypeError('Cannot construct a worker node without a worker type')
217 if (!Object.values(WorkerTypes
).includes(type)) {
219 `Cannot construct a worker node with an invalid worker type '${type}'`
222 checkFilePath(filePath
)
225 'Cannot construct a worker node without worker node options'
228 if (!isPlainObject(opts
)) {
230 'Cannot construct a worker node with invalid options: must be a plain object'
233 if (opts
.tasksQueueBackPressureSize
== null) {
235 'Cannot construct a worker node without a tasks queue back pressure size option'
238 if (!Number.isSafeInteger(opts
.tasksQueueBackPressureSize
)) {
240 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
243 if (opts
.tasksQueueBackPressureSize
<= 0) {
244 throw new RangeError(
245 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
251 * Updates the given measurement statistics.
253 * @param measurementStatistics - The measurement statistics to update.
254 * @param measurementRequirements - The measurement statistics requirements.
255 * @param measurementValue - The measurement value.
258 const updateMeasurementStatistics
= (
259 measurementStatistics
: MeasurementStatistics
,
260 measurementRequirements
: MeasurementStatisticsRequirements
| undefined,
261 measurementValue
: number | undefined
264 measurementRequirements
!= null &&
265 measurementValue
!= null &&
266 measurementRequirements
.aggregate
268 measurementStatistics
.aggregate
=
269 (measurementStatistics
.aggregate
?? 0) + measurementValue
270 measurementStatistics
.minimum
= min(
272 measurementStatistics
.minimum
?? Infinity
274 measurementStatistics
.maximum
= max(
276 measurementStatistics
.maximum
?? -Infinity
278 if (measurementRequirements
.average
|| measurementRequirements
.median
) {
279 measurementStatistics
.history
.push(measurementValue
)
280 if (measurementRequirements
.average
) {
281 measurementStatistics
.average
= average(measurementStatistics
.history
)
282 } else if (measurementStatistics
.average
!= null) {
283 delete measurementStatistics
.average
285 if (measurementRequirements
.median
) {
286 measurementStatistics
.median
= median(measurementStatistics
.history
)
287 } else if (measurementStatistics
.median
!= null) {
288 delete measurementStatistics
.median
293 if (env
.NODE_ENV
=== 'test') {
294 // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
295 exports
.updateMeasurementStatistics
= updateMeasurementStatistics
298 export const updateWaitTimeWorkerUsage
= <
299 Worker
extends IWorker
,
303 workerChoiceStrategyContext
:
304 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
306 workerUsage
: WorkerUsage
,
309 const timestamp
= performance
.now()
310 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
311 updateMeasurementStatistics(
312 workerUsage
.waitTime
,
313 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().waitTime
,
318 export const updateTaskStatisticsWorkerUsage
= <Response
= unknown
>(
319 workerUsage
: WorkerUsage
,
320 message
: MessageValue
<Response
>
322 const workerTaskStatistics
= workerUsage
.tasks
324 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
325 workerTaskStatistics
.executing
!= null &&
326 workerTaskStatistics
.executing
> 0
328 --workerTaskStatistics
.executing
330 if (message
.workerError
== null) {
331 ++workerTaskStatistics
.executed
333 ++workerTaskStatistics
.failed
337 export const updateRunTimeWorkerUsage
= <
338 Worker
extends IWorker
,
342 workerChoiceStrategyContext
:
343 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
345 workerUsage
: WorkerUsage
,
346 message
: MessageValue
<Response
>
348 if (message
.workerError
!= null) {
351 updateMeasurementStatistics(
353 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().runTime
,
354 message
.taskPerformance
?.runTime
?? 0
358 export const updateEluWorkerUsage
= <
359 Worker
extends IWorker
,
363 workerChoiceStrategyContext
:
364 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
366 workerUsage
: WorkerUsage
,
367 message
: MessageValue
<Response
>
369 if (message
.workerError
!= null) {
372 const eluTaskStatisticsRequirements
=
373 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().elu
374 updateMeasurementStatistics(
375 workerUsage
.elu
.active
,
376 eluTaskStatisticsRequirements
,
377 message
.taskPerformance
?.elu
?.active
?? 0
379 updateMeasurementStatistics(
380 workerUsage
.elu
.idle
,
381 eluTaskStatisticsRequirements
,
382 message
.taskPerformance
?.elu
?.idle
?? 0
384 if (eluTaskStatisticsRequirements
?.aggregate
=== true) {
385 if (message
.taskPerformance
?.elu
!= null) {
386 if (workerUsage
.elu
.utilization
!= null) {
387 workerUsage
.elu
.utilization
=
388 (workerUsage
.elu
.utilization
+
389 message
.taskPerformance
.elu
.utilization
) /
392 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
398 export const createWorker
= <Worker
extends IWorker
>(
401 opts
: { env
?: Record
<string, unknown
>, workerOptions
?: WorkerOptions
}
404 case WorkerTypes
.thread
:
405 return new ThreadWorker(filePath
, {
407 ...opts
.workerOptions
408 }) as unknown
as Worker
409 case WorkerTypes
.cluster
:
410 return cluster
.fork(opts
.env
) as unknown
as Worker
412 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
413 throw new Error(`Unknown worker type '${type}'`)
418 * Returns the worker type of the given worker.
420 * @param worker - The worker to get the type of.
421 * @returns The worker type of the given worker.
424 export const getWorkerType
= (worker
: IWorker
): WorkerType
| undefined => {
425 if (worker
instanceof ThreadWorker
) {
426 return WorkerTypes
.thread
427 } else if (worker
instanceof ClusterWorker
) {
428 return WorkerTypes
.cluster
433 * Returns the worker id of the given worker.
435 * @param worker - The worker to get the id of.
436 * @returns The worker id of the given worker.
439 export const getWorkerId
= (worker
: IWorker
): number | undefined => {
440 if (worker
instanceof ThreadWorker
) {
441 return worker
.threadId
442 } else if (worker
instanceof ClusterWorker
) {
447 export const waitWorkerNodeEvents
= async <
448 Worker
extends IWorker
,
451 workerNode
: IWorkerNode
<Worker
, Data
>,
452 workerNodeEvent
: string,
453 numberOfEventsToWait
: number,
455 ): Promise
<number> => {
456 return await new Promise
<number>(resolve
=> {
458 if (numberOfEventsToWait
=== 0) {
462 workerNode
.on(workerNodeEvent
, () => {
464 if (events
=== numberOfEventsToWait
) {