1 import cluster
, { Worker
as ClusterWorker
} from
'node:cluster'
2 import { randomInt
} from
'node:crypto'
3 import { existsSync
} from
'node:fs'
4 import { cpus
} from
'node:os'
5 import { env
} from
'node:process'
8 Worker
as ThreadWorker
,
10 } from
'node:worker_threads'
12 import type { MessageValue
, Task
} from
'../utility-types.js'
13 import { average
, isPlainObject
, max
, median
, min
} from
'../utils.js'
14 import type { IPool
, TasksQueueOptions
} from
'./pool.js'
16 type MeasurementStatisticsRequirements
,
17 WorkerChoiceStrategies
,
18 type WorkerChoiceStrategy
,
19 type WorkerChoiceStrategyOptions
20 } from
'./selection-strategies/selection-strategies-types.js'
21 import type { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context.js'
25 type MeasurementStatistics
,
26 type WorkerNodeOptions
,
33 * Default measurement statistics requirements.
35 export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
: MeasurementStatisticsRequirements
=
42 export const getDefaultTasksQueueOptions
= (
44 ): Required
<TasksQueueOptions
> => {
46 size
: Math.pow(poolMaxSize
, 2),
49 tasksStealingOnBackPressure
: true,
50 tasksFinishedTimeout
: 2000
54 export const getWorkerChoiceStrategyRetries
= <
55 Worker
extends IWorker
,
59 pool
: IPool
<Worker
, Data
, Response
>,
60 opts
?: WorkerChoiceStrategyOptions
64 Object.keys(opts
?.weights
?? getDefaultWeights(pool
.info
.maxSize
)).length
68 export const buildWorkerChoiceStrategyOptions
= <
69 Worker
extends IWorker
,
73 pool
: IPool
<Worker
, Data
, Response
>,
74 opts
?: WorkerChoiceStrategyOptions
75 ): WorkerChoiceStrategyOptions
=> {
76 opts
= clone(opts
?? {})
77 opts
.weights
= opts
.weights
?? getDefaultWeights(pool
.info
.maxSize
)
80 runTime
: { median
: false },
81 waitTime
: { median
: false },
82 elu
: { median
: false }
/**
 * Produces a deep copy of the given value using the structured clone
 * algorithm.
 *
 * @param object - The value to copy.
 * @returns A deep copy of `object`.
 */
const clone = <T>(object: T): T => structuredClone<T>(object)
/**
 * Builds the default weights map, one entry per worker node key from `0`
 * up to (but excluding) `poolMaxSize`.
 *
 * @param poolMaxSize - The pool maximum size, i.e. the number of entries to generate.
 * @param defaultWorkerWeight - The weight given to every worker node. Computed with `getDefaultWorkerWeight()` when omitted.
 * @returns The weights keyed by worker node key.
 */
const getDefaultWeights = (
  poolMaxSize: number,
  defaultWorkerWeight?: number
): Record<number, number> => {
  // Only fall back to the computed weight when none was provided (0 is kept).
  const workerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
  const weights: Record<number, number> = {}
  let workerNodeKey = 0
  while (workerNodeKey < poolMaxSize) {
    weights[workerNodeKey] = workerWeight
    workerNodeKey += 1
  }
  return weights
}
/**
 * Computes the default worker weight from the speed of the host CPUs.
 *
 * Each CPU contributes an estimated cycle time derived from its reported
 * speed; the weight is the rounded mean of these contributions. A CPU whose
 * speed is missing or zero borrows the speed of the first CPU reporting a
 * usable one, or a random speed in the [500, 2500) range when none does.
 * When no CPU information is available at all (`cpus()` returns an empty
 * array), a single CPU running at that fallback speed is assumed so that the
 * result is never `NaN`.
 *
 * @returns The default worker weight.
 */
const getDefaultWorkerWeight = (): number => {
  // Snapshot the CPUs information once instead of querying the OS repeatedly.
  const cpusInfo = cpus()
  // Speed used for any CPU that does not report a usable one.
  const fallbackCpuSpeed =
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    cpusInfo.find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ??
    randomInt(500, 2500)
  const cpusSpeed =
    cpusInfo.length > 0
      ? cpusInfo.map(cpu =>
        // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
        cpu.speed == null || cpu.speed === 0 ? fallbackCpuSpeed : cpu.speed
      )
      : // No CPU information available: avoid dividing by zero below.
        [fallbackCpuSpeed]
  let cpusCycleTimeWeight = 0
  for (const cpuSpeed of cpusSpeed) {
    // CPU estimated cycle time
    const numberOfDigits = cpuSpeed.toString().length - 1
    const cpuCycleTime = 1 / (cpuSpeed / Math.pow(10, numberOfDigits))
    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
  }
  return Math.round(cpusCycleTimeWeight / cpusSpeed.length)
}
/**
 * Validates the worker file path.
 *
 * @param filePath - The worker file path to validate.
 * @throws {TypeError} If the file path is not specified or is not a string.
 * @throws {Error} If no file exists at the given path.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  const isMissing = filePath == null
  if (isMissing) {
    throw new TypeError('The worker file path must be specified')
  }
  const isString = typeof filePath === 'string'
  if (!isString) {
    throw new TypeError('The worker file path must be a string')
  }
  const isOnDisk = existsSync(filePath)
  if (!isOnDisk) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
135 export const checkDynamicPoolSize
= (
137 max
: number | undefined
141 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
143 } else if (!Number.isSafeInteger(max
)) {
145 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
147 } else if (min
> max
) {
148 throw new RangeError(
149 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
151 } else if (max
=== 0) {
152 throw new RangeError(
153 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
155 } else if (min
=== max
) {
156 throw new RangeError(
157 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
/**
 * Validates the given worker choice strategy.
 *
 * An unspecified strategy is accepted; a specified one must be one of the
 * `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws {Error} If the strategy is specified but is not a known one.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
173 export const checkValidTasksQueueOptions
= (
174 tasksQueueOptions
: TasksQueueOptions
| undefined
176 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
177 throw new TypeError('Invalid tasks queue options: must be a plain object')
180 tasksQueueOptions
?.concurrency
!= null &&
181 !Number.isSafeInteger(tasksQueueOptions
.concurrency
)
184 'Invalid worker node tasks concurrency: must be an integer'
188 tasksQueueOptions
?.concurrency
!= null &&
189 tasksQueueOptions
.concurrency
<= 0
191 throw new RangeError(
192 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
196 tasksQueueOptions
?.size
!= null &&
197 !Number.isSafeInteger(tasksQueueOptions
.size
)
200 'Invalid worker node tasks queue size: must be an integer'
203 if (tasksQueueOptions
?.size
!= null && tasksQueueOptions
.size
<= 0) {
204 throw new RangeError(
205 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
210 export const checkWorkerNodeArguments
= (
211 type: WorkerType
| undefined,
212 filePath
: string | undefined,
213 opts
: WorkerNodeOptions
| undefined
216 throw new TypeError('Cannot construct a worker node without a worker type')
218 if (!Object.values(WorkerTypes
).includes(type)) {
220 `Cannot construct a worker node with an invalid worker type '${type}'`
223 checkFilePath(filePath
)
226 'Cannot construct a worker node without worker node options'
229 if (!isPlainObject(opts
)) {
231 'Cannot construct a worker node with invalid options: must be a plain object'
234 if (opts
.tasksQueueBackPressureSize
== null) {
236 'Cannot construct a worker node without a tasks queue back pressure size option'
239 if (!Number.isSafeInteger(opts
.tasksQueueBackPressureSize
)) {
241 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
244 if (opts
.tasksQueueBackPressureSize
<= 0) {
245 throw new RangeError(
246 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
/**
 * Folds a new measurement value into the given measurement statistics.
 *
 * Nothing is updated unless requirements and a value are both provided and
 * the requirements ask for aggregation. In that case the aggregate, minimum
 * and maximum are always refreshed; the value is recorded in the history and
 * the average and/or median recomputed only when required, while a
 * previously computed average or median that is no longer required is
 * removed.
 *
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements | undefined,
  measurementValue: number | undefined
): void => {
  if (
    measurementRequirements == null ||
    measurementValue == null ||
    !measurementRequirements.aggregate
  ) {
    return
  }
  measurementStatistics.aggregate =
    (measurementStatistics.aggregate ?? 0) + measurementValue
  measurementStatistics.minimum = min(
    measurementValue,
    measurementStatistics.minimum ?? Infinity
  )
  measurementStatistics.maximum = max(
    measurementValue,
    measurementStatistics.maximum ?? -Infinity
  )
  if (!measurementRequirements.average && !measurementRequirements.median) {
    return
  }
  // The history feeds both the average and the median computations.
  measurementStatistics.history.push(measurementValue)
  if (measurementRequirements.average) {
    measurementStatistics.average = average(measurementStatistics.history)
  } else if (measurementStatistics.average != null) {
    delete measurementStatistics.average
  }
  if (measurementRequirements.median) {
    measurementStatistics.median = median(measurementStatistics.history)
  } else if (measurementStatistics.median != null) {
    delete measurementStatistics.median
  }
}
// When NODE_ENV is 'test', attach the non-exported updateMeasurementStatistics
// helper to `exports` (presumably so unit tests can reach it; confirm against the test suite).
if (env.NODE_ENV === 'test') {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  exports.updateMeasurementStatistics = updateMeasurementStatistics
}
299 export const updateWaitTimeWorkerUsage
= <
300 Worker
extends IWorker
,
304 workerChoiceStrategyContext
:
305 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
307 workerUsage
: WorkerUsage
,
310 const timestamp
= performance
.now()
311 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
312 updateMeasurementStatistics(
313 workerUsage
.waitTime
,
314 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().waitTime
,
/**
 * Updates the tasks counters of the given worker usage once a task response
 * message has been received.
 *
 * The executing counter is decremented when it is positive, then either the
 * executed or the failed counter is incremented depending on whether the
 * message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const { tasks } = workerUsage
  const hasExecutingTasks =
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    tasks.executing != null && tasks.executing > 0
  if (hasExecutingTasks) {
    tasks.executing--
  }
  const taskFailed = message.workerError != null
  if (taskFailed) {
    tasks.failed++
  } else {
    tasks.executed++
  }
}
338 export const updateRunTimeWorkerUsage
= <
339 Worker
extends IWorker
,
343 workerChoiceStrategyContext
:
344 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
346 workerUsage
: WorkerUsage
,
347 message
: MessageValue
<Response
>
349 if (message
.workerError
!= null) {
352 updateMeasurementStatistics(
354 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().runTime
,
355 message
.taskPerformance
?.runTime
?? 0
359 export const updateEluWorkerUsage
= <
360 Worker
extends IWorker
,
364 workerChoiceStrategyContext
:
365 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
367 workerUsage
: WorkerUsage
,
368 message
: MessageValue
<Response
>
370 if (message
.workerError
!= null) {
373 const eluTaskStatisticsRequirements
=
374 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().elu
375 updateMeasurementStatistics(
376 workerUsage
.elu
.active
,
377 eluTaskStatisticsRequirements
,
378 message
.taskPerformance
?.elu
?.active
?? 0
380 updateMeasurementStatistics(
381 workerUsage
.elu
.idle
,
382 eluTaskStatisticsRequirements
,
383 message
.taskPerformance
?.elu
?.idle
?? 0
385 if (eluTaskStatisticsRequirements
?.aggregate
=== true) {
386 if (message
.taskPerformance
?.elu
!= null) {
387 if (workerUsage
.elu
.utilization
!= null) {
388 workerUsage
.elu
.utilization
=
389 (workerUsage
.elu
.utilization
+
390 message
.taskPerformance
.elu
.utilization
) /
393 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
399 export const createWorker
= <Worker
extends IWorker
>(
402 opts
: { env
?: Record
<string, unknown
>, workerOptions
?: WorkerOptions
}
405 case WorkerTypes
.thread
:
406 return new ThreadWorker(filePath
, {
408 ...opts
.workerOptions
409 }) as unknown
as Worker
410 case WorkerTypes
.cluster
:
411 return cluster
.fork(opts
.env
) as unknown
as Worker
413 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
414 throw new Error(`Unknown worker type '${type}'`)
/**
 * Determines the worker type of the given worker from its class.
 *
 * @param worker - The worker to get the type of.
 * @returns `WorkerTypes.thread` for a worker thread, `WorkerTypes.cluster` for a cluster worker, `undefined` otherwise.
 */
export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
  if (worker instanceof ThreadWorker) {
    return WorkerTypes.thread
  }
  if (worker instanceof ClusterWorker) {
    return WorkerTypes.cluster
  }
  return undefined
}
434 * Returns the worker id of the given worker.
436 * @param worker - The worker to get the id of.
437 * @returns The worker id of the given worker.
440 export const getWorkerId
= (worker
: IWorker
): number | undefined => {
441 if (worker
instanceof ThreadWorker
) {
442 return worker
.threadId
443 } else if (worker
instanceof ClusterWorker
) {
448 export const waitWorkerNodeEvents
= async <
449 Worker
extends IWorker
,
452 workerNode
: IWorkerNode
<Worker
, Data
>,
453 workerNodeEvent
: string,
454 numberOfEventsToWait
: number,
456 ): Promise
<number> => {
457 return await new Promise
<number>(resolve
=> {
459 if (numberOfEventsToWait
=== 0) {
463 workerNode
.on(workerNodeEvent
, () => {
465 if (events
=== numberOfEventsToWait
) {