1 import cluster
, { Worker
as ClusterWorker
} from
'node:cluster'
2 import { existsSync
} from
'node:fs'
3 import { cpus
} from
'node:os'
4 import { env
} from
'node:process'
7 Worker
as ThreadWorker
,
9 } from
'node:worker_threads'
11 import type { MessageValue
, Task
} from
'../utility-types.js'
12 import { average
, isPlainObject
, max
, median
, min
} from
'../utils.js'
13 import type { IPool
, TasksQueueOptions
} from
'./pool.js'
15 type MeasurementStatisticsRequirements
,
16 WorkerChoiceStrategies
,
17 type WorkerChoiceStrategy
,
18 type WorkerChoiceStrategyOptions
19 } from
'./selection-strategies/selection-strategies-types.js'
20 import type { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context.js'
24 type MeasurementStatistics
,
25 type WorkerNodeOptions
,
32 * Default measurement statistics requirements.
34 export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
: MeasurementStatisticsRequirements
=
41 export const getDefaultTasksQueueOptions
= (
43 ): Required
<TasksQueueOptions
> => {
45 size
: Math.pow(poolMaxSize
, 2),
48 tasksStealingOnBackPressure
: true,
49 tasksFinishedTimeout
: 2000
53 export const getWorkerChoiceStrategyRetries
= <
54 Worker
extends IWorker
,
58 pool
: IPool
<Worker
, Data
, Response
>,
59 opts
?: WorkerChoiceStrategyOptions
63 Object.keys(opts
?.weights
?? getDefaultWeights(pool
.info
.maxSize
)).length
67 export const buildWorkerChoiceStrategyOptions
= <
68 Worker
extends IWorker
,
72 pool
: IPool
<Worker
, Data
, Response
>,
73 opts
?: WorkerChoiceStrategyOptions
74 ): WorkerChoiceStrategyOptions
=> {
75 opts
= clone(opts
?? {})
76 opts
.weights
= opts
.weights
?? getDefaultWeights(pool
.info
.maxSize
)
79 runTime
: { median
: false },
80 waitTime
: { median
: false },
81 elu
: { median
: false }
87 const clone
= <T
>(object
: T
): T
=> {
88 return structuredClone
<T
>(object
)
91 const getDefaultWeights
= (
93 defaultWorkerWeight
?: number
94 ): Record
<number, number> => {
95 defaultWorkerWeight
= defaultWorkerWeight
?? getDefaultWorkerWeight()
96 const weights
: Record
<number, number> = {}
97 for (let workerNodeKey
= 0; workerNodeKey
< poolMaxSize
; workerNodeKey
++) {
98 weights
[workerNodeKey
] = defaultWorkerWeight
103 const estimatedCpuSpeed
= (): number => {
104 const runs
= 150000000
105 const begin
= performance
.now()
106 // eslint-disable-next-line no-empty
107 for (let i
= runs
; i
> 0; i
--) {}
108 const end
= performance
.now()
109 const duration
= end
- begin
110 return Math.trunc(runs
/ duration
/ 1000) // in MHz
113 const estCpuSpeed
= estimatedCpuSpeed()
115 const getDefaultWorkerWeight
= (estimatedCpuSpeed
= estCpuSpeed
): number => {
116 let cpusCycleTimeWeight
= 0
117 for (const cpu
of cpus()) {
118 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
119 if (cpu
.speed
== null || cpu
.speed
=== 0) {
121 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
122 cpus().find(cpu
=> cpu
.speed
!= null && cpu
.speed
!== 0)?.speed
??
125 // CPU estimated cycle time
126 const numberOfDigits
= cpu
.speed
.toString().length
- 1
127 const cpuCycleTime
= 1 / (cpu
.speed
/ Math.pow(10, numberOfDigits
))
128 cpusCycleTimeWeight
+= cpuCycleTime
* Math.pow(10, numberOfDigits
)
130 return Math.round(cpusCycleTimeWeight
/ cpus().length
)
133 export const checkFilePath
= (filePath
: string | undefined): void => {
134 if (filePath
== null) {
135 throw new TypeError('The worker file path must be specified')
137 if (typeof filePath
!== 'string') {
138 throw new TypeError('The worker file path must be a string')
140 if (!existsSync(filePath
)) {
141 throw new Error(`Cannot find the worker file '${filePath}'`)
145 export const checkDynamicPoolSize
= (
147 max
: number | undefined
151 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
153 } else if (!Number.isSafeInteger(max
)) {
155 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
157 } else if (min
> max
) {
158 throw new RangeError(
159 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
161 } else if (max
=== 0) {
162 throw new RangeError(
163 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
165 } else if (min
=== max
) {
166 throw new RangeError(
167 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
172 export const checkValidWorkerChoiceStrategy
= (
173 workerChoiceStrategy
: WorkerChoiceStrategy
| undefined
176 workerChoiceStrategy
!= null &&
177 !Object.values(WorkerChoiceStrategies
).includes(workerChoiceStrategy
)
179 throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
183 export const checkValidTasksQueueOptions
= (
184 tasksQueueOptions
: TasksQueueOptions
| undefined
186 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
187 throw new TypeError('Invalid tasks queue options: must be a plain object')
190 tasksQueueOptions
?.concurrency
!= null &&
191 !Number.isSafeInteger(tasksQueueOptions
.concurrency
)
194 'Invalid worker node tasks concurrency: must be an integer'
198 tasksQueueOptions
?.concurrency
!= null &&
199 tasksQueueOptions
.concurrency
<= 0
201 throw new RangeError(
202 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
206 tasksQueueOptions
?.size
!= null &&
207 !Number.isSafeInteger(tasksQueueOptions
.size
)
210 'Invalid worker node tasks queue size: must be an integer'
213 if (tasksQueueOptions
?.size
!= null && tasksQueueOptions
.size
<= 0) {
214 throw new RangeError(
215 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
220 export const checkWorkerNodeArguments
= (
221 type: WorkerType
| undefined,
222 filePath
: string | undefined,
223 opts
: WorkerNodeOptions
| undefined
226 throw new TypeError('Cannot construct a worker node without a worker type')
228 if (!Object.values(WorkerTypes
).includes(type)) {
230 `Cannot construct a worker node with an invalid worker type '${type}'`
233 checkFilePath(filePath
)
236 'Cannot construct a worker node without worker node options'
239 if (!isPlainObject(opts
)) {
241 'Cannot construct a worker node with invalid options: must be a plain object'
244 if (opts
.tasksQueueBackPressureSize
== null) {
246 'Cannot construct a worker node without a tasks queue back pressure size option'
249 if (!Number.isSafeInteger(opts
.tasksQueueBackPressureSize
)) {
251 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
254 if (opts
.tasksQueueBackPressureSize
<= 0) {
255 throw new RangeError(
256 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
262 * Updates the given measurement statistics.
264 * @param measurementStatistics - The measurement statistics to update.
265 * @param measurementRequirements - The measurement statistics requirements.
266 * @param measurementValue - The measurement value.
269 const updateMeasurementStatistics
= (
270 measurementStatistics
: MeasurementStatistics
,
271 measurementRequirements
: MeasurementStatisticsRequirements
| undefined,
272 measurementValue
: number | undefined
275 measurementRequirements
!= null &&
276 measurementValue
!= null &&
277 measurementRequirements
.aggregate
279 measurementStatistics
.aggregate
=
280 (measurementStatistics
.aggregate
?? 0) + measurementValue
281 measurementStatistics
.minimum
= min(
283 measurementStatistics
.minimum
?? Infinity
285 measurementStatistics
.maximum
= max(
287 measurementStatistics
.maximum
?? -Infinity
289 if (measurementRequirements
.average
|| measurementRequirements
.median
) {
290 measurementStatistics
.history
.push(measurementValue
)
291 if (measurementRequirements
.average
) {
292 measurementStatistics
.average
= average(measurementStatistics
.history
)
293 } else if (measurementStatistics
.average
!= null) {
294 delete measurementStatistics
.average
296 if (measurementRequirements
.median
) {
297 measurementStatistics
.median
= median(measurementStatistics
.history
)
298 } else if (measurementStatistics
.median
!= null) {
299 delete measurementStatistics
.median
304 if (env
.NODE_ENV
=== 'test') {
305 // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
306 exports
.updateMeasurementStatistics
= updateMeasurementStatistics
309 export const updateWaitTimeWorkerUsage
= <
310 Worker
extends IWorker
,
314 workerChoiceStrategyContext
:
315 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
317 workerUsage
: WorkerUsage
,
320 const timestamp
= performance
.now()
321 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
322 updateMeasurementStatistics(
323 workerUsage
.waitTime
,
324 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().waitTime
,
329 export const updateTaskStatisticsWorkerUsage
= <Response
= unknown
>(
330 workerUsage
: WorkerUsage
,
331 message
: MessageValue
<Response
>
333 const workerTaskStatistics
= workerUsage
.tasks
335 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
336 workerTaskStatistics
.executing
!= null &&
337 workerTaskStatistics
.executing
> 0
339 --workerTaskStatistics
.executing
341 if (message
.workerError
== null) {
342 ++workerTaskStatistics
.executed
344 ++workerTaskStatistics
.failed
348 export const updateRunTimeWorkerUsage
= <
349 Worker
extends IWorker
,
353 workerChoiceStrategyContext
:
354 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
356 workerUsage
: WorkerUsage
,
357 message
: MessageValue
<Response
>
359 if (message
.workerError
!= null) {
362 updateMeasurementStatistics(
364 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().runTime
,
365 message
.taskPerformance
?.runTime
?? 0
369 export const updateEluWorkerUsage
= <
370 Worker
extends IWorker
,
374 workerChoiceStrategyContext
:
375 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
377 workerUsage
: WorkerUsage
,
378 message
: MessageValue
<Response
>
380 if (message
.workerError
!= null) {
383 const eluTaskStatisticsRequirements
=
384 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().elu
385 updateMeasurementStatistics(
386 workerUsage
.elu
.active
,
387 eluTaskStatisticsRequirements
,
388 message
.taskPerformance
?.elu
?.active
?? 0
390 updateMeasurementStatistics(
391 workerUsage
.elu
.idle
,
392 eluTaskStatisticsRequirements
,
393 message
.taskPerformance
?.elu
?.idle
?? 0
395 if (eluTaskStatisticsRequirements
?.aggregate
=== true) {
396 if (message
.taskPerformance
?.elu
!= null) {
397 if (workerUsage
.elu
.utilization
!= null) {
398 workerUsage
.elu
.utilization
=
399 (workerUsage
.elu
.utilization
+
400 message
.taskPerformance
.elu
.utilization
) /
403 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
409 export const createWorker
= <Worker
extends IWorker
>(
412 opts
: { env
?: Record
<string, unknown
>, workerOptions
?: WorkerOptions
}
415 case WorkerTypes
.thread
:
416 return new ThreadWorker(filePath
, {
418 ...opts
.workerOptions
419 }) as unknown
as Worker
420 case WorkerTypes
.cluster
:
421 return cluster
.fork(opts
.env
) as unknown
as Worker
423 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
424 throw new Error(`Unknown worker type '${type}'`)
429 * Returns the worker type of the given worker.
431 * @param worker - The worker to get the type of.
432 * @returns The worker type of the given worker.
435 export const getWorkerType
= (worker
: IWorker
): WorkerType
| undefined => {
436 if (worker
instanceof ThreadWorker
) {
437 return WorkerTypes
.thread
438 } else if (worker
instanceof ClusterWorker
) {
439 return WorkerTypes
.cluster
444 * Returns the worker id of the given worker.
446 * @param worker - The worker to get the id of.
447 * @returns The worker id of the given worker.
450 export const getWorkerId
= (worker
: IWorker
): number | undefined => {
451 if (worker
instanceof ThreadWorker
) {
452 return worker
.threadId
453 } else if (worker
instanceof ClusterWorker
) {
458 export const waitWorkerNodeEvents
= async <
459 Worker
extends IWorker
,
462 workerNode
: IWorkerNode
<Worker
, Data
>,
463 workerNodeEvent
: string,
464 numberOfEventsToWait
: number,
466 ): Promise
<number> => {
467 return await new Promise
<number>(resolve
=> {
469 if (numberOfEventsToWait
=== 0) {
473 workerNode
.on(workerNodeEvent
, () => {
475 if (events
=== numberOfEventsToWait
) {