1 import cluster
, { Worker
as ClusterWorker
} from
'node:cluster'
2 import { existsSync
} from
'node:fs'
3 import { cpus
} from
'node:os'
4 import { env
} from
'node:process'
7 Worker
as ThreadWorker
,
9 } from
'node:worker_threads'
11 import type { MessageValue
, Task
} from
'../utility-types.js'
12 import { average
, isPlainObject
, max
, median
, min
} from
'../utils.js'
13 import type { IPool
, TasksQueueOptions
} from
'./pool.js'
15 type MeasurementStatisticsRequirements
,
16 WorkerChoiceStrategies
,
17 type WorkerChoiceStrategy
,
18 type WorkerChoiceStrategyOptions
19 } from
'./selection-strategies/selection-strategies-types.js'
20 import type { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context.js'
24 type MeasurementStatistics
,
25 type WorkerNodeOptions
,
32 * Default measurement statistics requirements.
34 export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
: MeasurementStatisticsRequirements
=
41 export const getDefaultTasksQueueOptions
= (
43 ): Required
<TasksQueueOptions
> => {
45 size
: Math.pow(poolMaxSize
, 2),
48 tasksStealingOnBackPressure
: true,
49 tasksFinishedTimeout
: 2000
53 export const getWorkerChoiceStrategyRetries
= <
54 Worker
extends IWorker
,
58 pool
: IPool
<Worker
, Data
, Response
>,
59 opts
?: WorkerChoiceStrategyOptions
63 Object.keys(opts
?.weights
?? getDefaultWeights(pool
.info
.maxSize
)).length
67 export const buildWorkerChoiceStrategyOptions
= <
68 Worker
extends IWorker
,
72 pool
: IPool
<Worker
, Data
, Response
>,
73 opts
?: WorkerChoiceStrategyOptions
74 ): WorkerChoiceStrategyOptions
=> {
75 opts
= clone(opts
?? {})
76 opts
.weights
= opts
.weights
?? getDefaultWeights(pool
.info
.maxSize
)
79 runTime
: { median
: false },
80 waitTime
: { median
: false },
81 elu
: { median
: false }
/**
 * Returns a deep copy of the given value.
 *
 * The copy is produced with the platform `structuredClone()` function, so the
 * result shares no nested references with the input.
 *
 * @param object - The value to clone.
 * @returns A structured clone of the given value.
 */
const clone = <T>(object: T): T => {
  return structuredClone<T>(object)
}
/**
 * Builds the default worker weights map.
 *
 * Every worker node key in `[0, poolMaxSize)` is given the same weight.
 *
 * @param poolMaxSize - The number of worker node keys to generate a weight for.
 * @param defaultWorkerWeight - The weight given to every worker node key. When
 * omitted, the weight is obtained from `getDefaultWorkerWeight()`.
 * @returns The weights indexed by worker node key.
 */
const getDefaultWeights = (
  poolMaxSize: number,
  defaultWorkerWeight?: number
): Record<number, number> => {
  // Resolved once, outside of the loop, since the fallback may be costly.
  const workerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
  const weights: Record<number, number> = {}
  for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) {
    weights[workerNodeKey] = workerWeight
  }
  return weights
}
/**
 * Estimates the CPU speed by timing a fixed-size empty loop.
 *
 * @returns The number of loop iterations divided by the elapsed milliseconds
 * and by 1000, truncated to an integer (treated as MHz by the caller).
 */
const estimatedCpuSpeed = (): number => {
  const runs = 150000000
  const begin = performance.now()
  // eslint-disable-next-line no-empty
  for (let i = runs; i > 0; i--) {}
  const end = performance.now()
  const duration = end - begin
  return Math.trunc(runs / duration / 1000) // in MHz
}
/**
 * Computes the default worker weight from the speed of the host CPUs.
 *
 * Each CPU contributes a weight derived from its estimated cycle time and the
 * result is the rounded mean over all CPUs. A CPU that reports no speed
 * (`null`, `undefined` or `0`) borrows the speed of the first CPU that reports
 * one; when no CPU reports a speed, `estimatedCpuSpeed()` is used instead.
 *
 * When `cpus()` returns an empty list the weight is computed from the fallback
 * speed alone, instead of dividing zero by zero and returning `NaN`.
 *
 * @returns The default worker weight.
 */
const getDefaultWorkerWeight = (): number => {
  const currentCpus = cpus()
  const isUsableSpeed = (speed: number | undefined): speed is number =>
    speed != null && speed !== 0
  // The estimation is only run when no CPU reports a usable speed.
  const fallbackSpeed =
    currentCpus.find(cpu => isUsableSpeed(cpu.speed))?.speed ??
    estimatedCpuSpeed()
  const cycleTimeWeight = (speed: number): number => {
    // CPU estimated cycle time
    const numberOfDigits = speed.toString().length - 1
    const cpuCycleTime = 1 / (speed / Math.pow(10, numberOfDigits))
    return cpuCycleTime * Math.pow(10, numberOfDigits)
  }
  if (currentCpus.length === 0) {
    return Math.round(cycleTimeWeight(fallbackSpeed))
  }
  let cpusCycleTimeWeight = 0
  for (const cpu of currentCpus) {
    cpusCycleTimeWeight += cycleTimeWeight(
      isUsableSpeed(cpu.speed) ? cpu.speed : fallbackSpeed
    )
  }
  return Math.round(cpusCycleTimeWeight / currentCpus.length)
}
/**
 * Checks that the given worker file path is usable.
 *
 * @param filePath - The worker file path to check.
 * @throws {TypeError} If the file path is `null` or `undefined`.
 * @throws {TypeError} If the file path is not a string.
 * @throws {Error} If no file exists at the given path.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  }
  // Runtime guard for untyped callers; must run before touching the file system.
  if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
150 export const checkDynamicPoolSize
= (
152 max
: number | undefined
156 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 } else if (!Number.isSafeInteger(max
)) {
160 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
162 } else if (min
> max
) {
163 throw new RangeError(
164 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
166 } else if (max
=== 0) {
167 throw new RangeError(
168 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
170 } else if (min
=== max
) {
171 throw new RangeError(
172 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
/**
 * Checks that the given worker choice strategy, when defined, is one of the
 * values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {Error} If the worker choice strategy is defined but unknown.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
  if (
    workerChoiceStrategy != null &&
    !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
  ) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
188 export const checkValidTasksQueueOptions
= (
189 tasksQueueOptions
: TasksQueueOptions
| undefined
191 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
192 throw new TypeError('Invalid tasks queue options: must be a plain object')
195 tasksQueueOptions
?.concurrency
!= null &&
196 !Number.isSafeInteger(tasksQueueOptions
.concurrency
)
199 'Invalid worker node tasks concurrency: must be an integer'
203 tasksQueueOptions
?.concurrency
!= null &&
204 tasksQueueOptions
.concurrency
<= 0
206 throw new RangeError(
207 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
211 tasksQueueOptions
?.size
!= null &&
212 !Number.isSafeInteger(tasksQueueOptions
.size
)
215 'Invalid worker node tasks queue size: must be an integer'
218 if (tasksQueueOptions
?.size
!= null && tasksQueueOptions
.size
<= 0) {
219 throw new RangeError(
220 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
225 export const checkWorkerNodeArguments
= (
226 type: WorkerType
| undefined,
227 filePath
: string | undefined,
228 opts
: WorkerNodeOptions
| undefined
231 throw new TypeError('Cannot construct a worker node without a worker type')
233 if (!Object.values(WorkerTypes
).includes(type)) {
235 `Cannot construct a worker node with an invalid worker type '${type}'`
238 checkFilePath(filePath
)
241 'Cannot construct a worker node without worker node options'
244 if (!isPlainObject(opts
)) {
246 'Cannot construct a worker node with invalid options: must be a plain object'
249 if (opts
.tasksQueueBackPressureSize
== null) {
251 'Cannot construct a worker node without a tasks queue back pressure size option'
254 if (!Number.isSafeInteger(opts
.tasksQueueBackPressureSize
)) {
256 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
259 if (opts
.tasksQueueBackPressureSize
<= 0) {
260 throw new RangeError(
261 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
/**
 * Updates the given measurement statistics.
 *
 * Nothing is updated unless the requirements and the value are both defined
 * and the requirements enable `aggregate`. In that case the aggregate, minimum
 * and maximum are refreshed; the value is recorded in the history only when
 * the average or the median is required. A statistic that is no longer
 * required is removed while the history is being fed.
 *
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements | undefined,
  measurementValue: number | undefined
): void => {
  if (
    measurementRequirements != null &&
    measurementValue != null &&
    measurementRequirements.aggregate
  ) {
    measurementStatistics.aggregate =
      (measurementStatistics.aggregate ?? 0) + measurementValue
    measurementStatistics.minimum = min(
      measurementValue,
      measurementStatistics.minimum ?? Infinity
    )
    measurementStatistics.maximum = max(
      measurementValue,
      measurementStatistics.maximum ?? -Infinity
    )
    if (measurementRequirements.average || measurementRequirements.median) {
      measurementStatistics.history.push(measurementValue)
      if (measurementRequirements.average) {
        measurementStatistics.average = average(measurementStatistics.history)
      } else if (measurementStatistics.average != null) {
        delete measurementStatistics.average
      }
      if (measurementRequirements.median) {
        measurementStatistics.median = median(measurementStatistics.history)
      } else if (measurementStatistics.median != null) {
        delete measurementStatistics.median
      }
    }
  }
}
// Exposes the private helper to the test suite only.
if (env.NODE_ENV === 'test') {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  exports.updateMeasurementStatistics = updateMeasurementStatistics
}
314 export const updateWaitTimeWorkerUsage
= <
315 Worker
extends IWorker
,
319 workerChoiceStrategyContext
:
320 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
322 workerUsage
: WorkerUsage
,
325 const timestamp
= performance
.now()
326 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
327 updateMeasurementStatistics(
328 workerUsage
.waitTime
,
329 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().waitTime
,
/**
 * Updates the tasks statistics of the given worker usage once a task message
 * has been received.
 *
 * The executing counter is decremented when it is strictly positive, then
 * either the executed counter (no worker error in the message) or the failed
 * counter (worker error in the message) is incremented.
 *
 * @param workerUsage - The worker usage whose tasks statistics are updated.
 * @param message - The message received for the task.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const workerTaskStatistics = workerUsage.tasks
  if (
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerTaskStatistics.executing != null &&
    workerTaskStatistics.executing > 0
  ) {
    --workerTaskStatistics.executing
  }
  if (message.workerError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
353 export const updateRunTimeWorkerUsage
= <
354 Worker
extends IWorker
,
358 workerChoiceStrategyContext
:
359 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
361 workerUsage
: WorkerUsage
,
362 message
: MessageValue
<Response
>
364 if (message
.workerError
!= null) {
367 updateMeasurementStatistics(
369 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().runTime
,
370 message
.taskPerformance
?.runTime
?? 0
374 export const updateEluWorkerUsage
= <
375 Worker
extends IWorker
,
379 workerChoiceStrategyContext
:
380 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
382 workerUsage
: WorkerUsage
,
383 message
: MessageValue
<Response
>
385 if (message
.workerError
!= null) {
388 const eluTaskStatisticsRequirements
=
389 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().elu
390 updateMeasurementStatistics(
391 workerUsage
.elu
.active
,
392 eluTaskStatisticsRequirements
,
393 message
.taskPerformance
?.elu
?.active
?? 0
395 updateMeasurementStatistics(
396 workerUsage
.elu
.idle
,
397 eluTaskStatisticsRequirements
,
398 message
.taskPerformance
?.elu
?.idle
?? 0
400 if (eluTaskStatisticsRequirements
?.aggregate
=== true) {
401 if (message
.taskPerformance
?.elu
!= null) {
402 if (workerUsage
.elu
.utilization
!= null) {
403 workerUsage
.elu
.utilization
=
404 (workerUsage
.elu
.utilization
+
405 message
.taskPerformance
.elu
.utilization
) /
408 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
414 export const createWorker
= <Worker
extends IWorker
>(
417 opts
: { env
?: Record
<string, unknown
>, workerOptions
?: WorkerOptions
}
420 case WorkerTypes
.thread
:
421 return new ThreadWorker(filePath
, {
423 ...opts
.workerOptions
424 }) as unknown
as Worker
425 case WorkerTypes
.cluster
:
426 return cluster
.fork(opts
.env
) as unknown
as Worker
428 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
429 throw new Error(`Unknown worker type '${type}'`)
/**
 * Returns the worker type of the given worker.
 *
 * @param worker - The worker to get the type of.
 * @returns The worker type of the given worker: `WorkerTypes.thread` for a
 * `node:worker_threads` worker, `WorkerTypes.cluster` for a `node:cluster`
 * worker, `undefined` otherwise.
 */
export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
  if (worker instanceof ThreadWorker) {
    return WorkerTypes.thread
  } else if (worker instanceof ClusterWorker) {
    return WorkerTypes.cluster
  }
}
/**
 * Returns the worker id of the given worker.
 *
 * @param worker - The worker to get the id of.
 * @returns The worker id of the given worker: the `threadId` of a
 * `node:worker_threads` worker, the `id` of a `node:cluster` worker,
 * `undefined` otherwise.
 */
export const getWorkerId = (worker: IWorker): number | undefined => {
  if (worker instanceof ThreadWorker) {
    return worker.threadId
  } else if (worker instanceof ClusterWorker) {
    return worker.id
  }
}
463 export const waitWorkerNodeEvents
= async <
464 Worker
extends IWorker
,
467 workerNode
: IWorkerNode
<Worker
, Data
>,
468 workerNodeEvent
: string,
469 numberOfEventsToWait
: number,
471 ): Promise
<number> => {
472 return await new Promise
<number>(resolve
=> {
474 if (numberOfEventsToWait
=== 0) {
478 workerNode
.on(workerNodeEvent
, () => {
480 if (events
=== numberOfEventsToWait
) {