1 import cluster
, { Worker
as ClusterWorker
} from
'node:cluster'
2 import { existsSync
} from
'node:fs'
3 import { cpus
} from
'node:os'
4 import { env
} from
'node:process'
7 Worker
as ThreadWorker
,
9 } from
'node:worker_threads'
11 import type { MessageValue
, Task
} from
'../utility-types.js'
12 import { average
, isPlainObject
, max
, median
, min
} from
'../utils.js'
13 import type { IPool
, TasksQueueOptions
} from
'./pool.js'
15 type MeasurementStatisticsRequirements
,
16 WorkerChoiceStrategies
,
17 type WorkerChoiceStrategy
,
18 type WorkerChoiceStrategyOptions
19 } from
'./selection-strategies/selection-strategies-types.js'
20 import type { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context.js'
24 type MeasurementStatistics
,
25 type WorkerNodeOptions
,
32 * Default measurement statistics requirements.
34 export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
: MeasurementStatisticsRequirements
=
41 export const getDefaultTasksQueueOptions
= (
43 ): Required
<TasksQueueOptions
> => {
45 size
: Math.pow(poolMaxSize
, 2),
48 tasksStealingOnBackPressure
: true,
49 tasksFinishedTimeout
: 2000
53 export const getWorkerChoiceStrategyRetries
= <
54 Worker
extends IWorker
,
58 pool
: IPool
<Worker
, Data
, Response
>,
59 opts
?: WorkerChoiceStrategyOptions
63 Object.keys(opts
?.weights
?? getDefaultWeights(pool
.info
.maxSize
)).length
67 export const buildWorkerChoiceStrategyOptions
= <
68 Worker
extends IWorker
,
72 pool
: IPool
<Worker
, Data
, Response
>,
73 opts
?: WorkerChoiceStrategyOptions
74 ): WorkerChoiceStrategyOptions
=> {
75 opts
= clone(opts
?? {})
76 opts
.weights
= opts
.weights
?? getDefaultWeights(pool
.info
.maxSize
)
79 runTime
: { median
: false },
80 waitTime
: { median
: false },
81 elu
: { median
: false }
/**
 * Returns a deep copy of the given value.
 *
 * The copy is produced by the platform `structuredClone()` function, so the
 * result shares no nested references with the input.
 *
 * @param object - The value to copy.
 * @returns The deep copy of `object`.
 * @internal
 */
const clone = <T>(object: T): T => {
  return structuredClone<T>(object)
}
/**
 * Builds the default weights record: one entry per worker node key in
 * `[0, poolMaxSize)`, every entry holding the same weight.
 *
 * @param poolMaxSize - The number of worker node keys to create an entry for.
 * @param defaultWorkerWeight - The weight assigned to every key. When omitted, the value returned by `getDefaultWorkerWeight()` is used.
 * @returns The weights indexed by worker node key.
 * @internal
 */
const getDefaultWeights = (
  poolMaxSize: number,
  defaultWorkerWeight?: number
): Record<number, number> => {
  // Resolve the weight once into a local instead of reassigning the parameter
  const workerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
  const weights: Record<number, number> = {}
  for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) {
    weights[workerNodeKey] = workerWeight
  }
  return weights
}
/**
 * Computes the default worker weight as the rounded average, over all CPUs
 * reported by `os.cpus()`, of a value derived from each CPU estimated cycle
 * time.
 *
 * A CPU reporting no usable speed borrows the speed of the first CPU that
 * reports one. When no CPU reports a usable speed, or when the CPU list is
 * empty, a fixed fallback speed is used so that the result is always a finite
 * integer instead of a thrown `TypeError` or `NaN`.
 *
 * @returns The default worker weight.
 * @internal
 */
const getDefaultWorkerWeight = (): number => {
  // Fallback CPU speed in MHz, used only when the platform reports none
  const fallbackCpuSpeed = 2000
  const hasUsableSpeed = (speed: number | undefined): speed is number =>
    speed != null && speed > 0
  // Query the CPU list once: every call to cpus() rebuilds the whole array
  const availableCpus = cpus()
  const referenceCpuSpeed =
    availableCpus.map(cpu => cpu.speed).find(hasUsableSpeed) ??
    fallbackCpuSpeed
  // Never iterate over an empty list, the final division would yield NaN
  const cpuSpeeds =
    availableCpus.length > 0
      ? availableCpus.map(cpu =>
        hasUsableSpeed(cpu.speed) ? cpu.speed : referenceCpuSpeed
      )
      : [referenceCpuSpeed]
  let cpusCycleTimeWeight = 0
  for (const cpuSpeed of cpuSpeeds) {
    // CPU estimated cycle time
    const numberOfDigits = cpuSpeed.toString().length - 1
    const cpuCycleTime = 1 / (cpuSpeed / Math.pow(10, numberOfDigits))
    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
  }
  return Math.round(cpusCycleTimeWeight / cpuSpeeds.length)
}
/**
 * Checks that the given worker file path is usable.
 *
 * @param filePath - The worker file path to check.
 * @throws {TypeError} If `filePath` is `null` or `undefined`.
 * @throws {TypeError} If `filePath` is not a string.
 * @throws {Error} If no file exists at `filePath`.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  }
  // Runtime guard for untyped callers: the static type already excludes this
  if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
/**
 * Checks that the given sizes are valid for a dynamic pool.
 *
 * The checks run in order and the first failing one throws.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {TypeError} If `max` is `null`, `undefined` or not a safe integer.
 * @throws {RangeError} If `max` is lower than `min`, equal to zero or equal to `min`.
 */
export const checkDynamicPoolSize = (
  min: number,
  max: number | undefined
): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  // A pool that can neither grow nor shrink is a fixed pool
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
/**
 * Checks that the given worker choice strategy, when provided, is one of the
 * values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check. `null` and `undefined` are accepted.
 * @throws {Error} If the strategy is provided and is not a value of `WorkerChoiceStrategies`.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
  if (
    workerChoiceStrategy != null &&
    !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
  ) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
/**
 * Checks that the given tasks queue options, when provided, are valid.
 *
 * @param tasksQueueOptions - The tasks queue options to check. `null` and `undefined` are accepted.
 * @throws {TypeError} If the options are not a plain object, or if `concurrency` or `size` is provided and is not a safe integer.
 * @throws {RangeError} If `concurrency` or `size` is provided and is negative or zero.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions | undefined
): void => {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  if (
    tasksQueueOptions?.concurrency != null &&
    !Number.isSafeInteger(tasksQueueOptions.concurrency)
  ) {
    throw new TypeError(
      'Invalid worker node tasks concurrency: must be an integer'
    )
  }
  if (
    tasksQueueOptions?.concurrency != null &&
    tasksQueueOptions.concurrency <= 0
  ) {
    throw new RangeError(
      `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
    )
  }
  if (
    tasksQueueOptions?.size != null &&
    !Number.isSafeInteger(tasksQueueOptions.size)
  ) {
    throw new TypeError(
      'Invalid worker node tasks queue size: must be an integer'
    )
  }
  if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
    throw new RangeError(
      `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
    )
  }
}
207 export const checkWorkerNodeArguments
= (
208 type: WorkerType
| undefined,
209 filePath
: string | undefined,
210 opts
: WorkerNodeOptions
| undefined
213 throw new TypeError('Cannot construct a worker node without a worker type')
215 if (!Object.values(WorkerTypes
).includes(type)) {
217 `Cannot construct a worker node with an invalid worker type '${type}'`
220 checkFilePath(filePath
)
223 'Cannot construct a worker node without worker node options'
226 if (!isPlainObject(opts
)) {
228 'Cannot construct a worker node with invalid options: must be a plain object'
231 if (opts
.tasksQueueBackPressureSize
== null) {
233 'Cannot construct a worker node without a tasks queue back pressure size option'
236 if (!Number.isSafeInteger(opts
.tasksQueueBackPressureSize
)) {
238 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
241 if (opts
.tasksQueueBackPressureSize
<= 0) {
242 throw new RangeError(
243 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
/**
 * Updates the given measurement statistics.
 *
 * Nothing is updated unless both the requirements and the value are provided
 * and the requirements ask for aggregation. When they do, the aggregate,
 * minimum and maximum are refreshed. The value is additionally recorded in the
 * history when the average or the median is required, and each of those two
 * statistics is either recomputed from the history or removed when it is no
 * longer required.
 *
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements | undefined,
  measurementValue: number | undefined
): void => {
  if (
    measurementRequirements != null &&
    measurementValue != null &&
    measurementRequirements.aggregate
  ) {
    measurementStatistics.aggregate =
      (measurementStatistics.aggregate ?? 0) + measurementValue
    // Infinity / -Infinity are neutral seeds for the first recorded value
    measurementStatistics.minimum = min(
      measurementValue,
      measurementStatistics.minimum ?? Infinity
    )
    measurementStatistics.maximum = max(
      measurementValue,
      measurementStatistics.maximum ?? -Infinity
    )
    if (measurementRequirements.average || measurementRequirements.median) {
      measurementStatistics.history.push(measurementValue)
      if (measurementRequirements.average) {
        measurementStatistics.average = average(measurementStatistics.history)
      } else if (measurementStatistics.average != null) {
        delete measurementStatistics.average
      }
      if (measurementRequirements.median) {
        measurementStatistics.median = median(measurementStatistics.history)
      } else if (measurementStatistics.median != null) {
        delete measurementStatistics.median
      }
    }
  }
}
291 if (env
.NODE_ENV
=== 'test') {
292 // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
293 exports
.updateMeasurementStatistics
= updateMeasurementStatistics
296 export const updateWaitTimeWorkerUsage
= <
297 Worker
extends IWorker
,
301 workerChoiceStrategyContext
:
302 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
304 workerUsage
: WorkerUsage
,
307 const timestamp
= performance
.now()
308 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
309 updateMeasurementStatistics(
310 workerUsage
.waitTime
,
311 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().waitTime
,
/**
 * Updates the tasks counters of the given worker usage from a worker message.
 *
 * The executing counter is decremented when it is greater than zero. Then the
 * executed counter is incremented when the message carries no worker error,
 * otherwise the failed counter is incremented.
 *
 * @param workerUsage - The worker usage whose tasks counters are updated.
 * @param message - The message received from the worker.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const workerTaskStatistics = workerUsage.tasks
  if (
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerTaskStatistics.executing != null &&
    workerTaskStatistics.executing > 0
  ) {
    --workerTaskStatistics.executing
  }
  if (message.workerError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
335 export const updateRunTimeWorkerUsage
= <
336 Worker
extends IWorker
,
340 workerChoiceStrategyContext
:
341 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
343 workerUsage
: WorkerUsage
,
344 message
: MessageValue
<Response
>
346 if (message
.workerError
!= null) {
349 updateMeasurementStatistics(
351 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().runTime
,
352 message
.taskPerformance
?.runTime
?? 0
356 export const updateEluWorkerUsage
= <
357 Worker
extends IWorker
,
361 workerChoiceStrategyContext
:
362 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
364 workerUsage
: WorkerUsage
,
365 message
: MessageValue
<Response
>
367 if (message
.workerError
!= null) {
370 const eluTaskStatisticsRequirements
=
371 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().elu
372 updateMeasurementStatistics(
373 workerUsage
.elu
.active
,
374 eluTaskStatisticsRequirements
,
375 message
.taskPerformance
?.elu
?.active
?? 0
377 updateMeasurementStatistics(
378 workerUsage
.elu
.idle
,
379 eluTaskStatisticsRequirements
,
380 message
.taskPerformance
?.elu
?.idle
?? 0
382 if (eluTaskStatisticsRequirements
?.aggregate
=== true) {
383 if (message
.taskPerformance
?.elu
!= null) {
384 if (workerUsage
.elu
.utilization
!= null) {
385 workerUsage
.elu
.utilization
=
386 (workerUsage
.elu
.utilization
+
387 message
.taskPerformance
.elu
.utilization
) /
390 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
396 export const createWorker
= <Worker
extends IWorker
>(
399 opts
: { env
?: Record
<string, unknown
>, workerOptions
?: WorkerOptions
}
402 case WorkerTypes
.thread
:
403 return new ThreadWorker(filePath
, {
405 ...opts
.workerOptions
406 }) as unknown
as Worker
407 case WorkerTypes
.cluster
:
408 return cluster
.fork(opts
.env
) as unknown
as Worker
410 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
411 throw new Error(`Unknown worker type '${type}'`)
/**
 * Returns the worker type of the given worker.
 *
 * @param worker - The worker to get the type of.
 * @returns The worker type of the given worker, or `undefined` when the worker is neither a thread worker nor a cluster worker.
 * @internal
 */
export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
  if (worker instanceof ThreadWorker) {
    return WorkerTypes.thread
  } else if (worker instanceof ClusterWorker) {
    return WorkerTypes.cluster
  }
  return undefined
}
/**
 * Returns the worker id of the given worker.
 *
 * @param worker - The worker to get the id of.
 * @returns The thread id of a thread worker, the id of a cluster worker, or `undefined` when the worker is neither.
 * @internal
 */
export const getWorkerId = (worker: IWorker): number | undefined => {
  if (worker instanceof ThreadWorker) {
    return worker.threadId
  } else if (worker instanceof ClusterWorker) {
    return worker.id
  }
  return undefined
}
445 export const waitWorkerNodeEvents
= async <
446 Worker
extends IWorker
,
449 workerNode
: IWorkerNode
<Worker
, Data
>,
450 workerNodeEvent
: string,
451 numberOfEventsToWait
: number,
453 ): Promise
<number> => {
454 return await new Promise
<number>(resolve
=> {
456 if (numberOfEventsToWait
=== 0) {
460 workerNode
.on(workerNodeEvent
, () => {
462 if (events
=== numberOfEventsToWait
) {