1 import cluster
, { Worker
as ClusterWorker
} from
'node:cluster'
2 import { existsSync
} from
'node:fs'
3 import { env
} from
'node:process'
6 Worker
as ThreadWorker
,
8 } from
'node:worker_threads'
10 import type { MessageValue
, Task
} from
'../utility-types.js'
11 import { average
, isPlainObject
, max
, median
, min
} from
'../utils.js'
12 import type { TasksQueueOptions
} from
'./pool.js'
14 type MeasurementStatisticsRequirements
,
15 WorkerChoiceStrategies
,
16 type WorkerChoiceStrategy
17 } from
'./selection-strategies/selection-strategies-types.js'
18 import type { WorkerChoiceStrategiesContext
} from
'./selection-strategies/worker-choice-strategies-context.js'
22 type MeasurementStatistics
,
23 type WorkerNodeOptions
,
30 * Default measurement statistics requirements.
32 export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
: MeasurementStatisticsRequirements
=
39 export const getDefaultTasksQueueOptions
= (
41 ): Required
<TasksQueueOptions
> => {
43 size
: Math.pow(poolMaxSize
, 2),
46 tasksStealingOnBackPressure
: false,
47 tasksFinishedTimeout
: 2000
51 export const checkFilePath
= (filePath
: string | undefined): void => {
52 if (filePath
== null) {
53 throw new TypeError('The worker file path must be specified')
55 if (typeof filePath
!== 'string') {
56 throw new TypeError('The worker file path must be a string')
58 if (!existsSync(filePath
)) {
59 throw new Error(`Cannot find the worker file '${filePath}'`)
63 export const checkDynamicPoolSize
= (
65 max
: number | undefined
69 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
71 } else if (!Number.isSafeInteger(max
)) {
73 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
75 } else if (min
> max
) {
77 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
79 } else if (max
=== 0) {
81 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
83 } else if (min
=== max
) {
85 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
90 export const checkValidPriority
= (priority
: number | undefined): void => {
91 if (priority
!= null && !Number.isSafeInteger(priority
)) {
92 throw new TypeError(`Invalid property 'priority': '${priority}'`)
96 Number.isSafeInteger(priority
) &&
97 (priority
< -20 || priority
> 19)
99 throw new RangeError("Property 'priority' must be between -20 and 19")
103 export const checkValidWorkerChoiceStrategy
= (
104 workerChoiceStrategy
: WorkerChoiceStrategy
| undefined
107 workerChoiceStrategy
!= null &&
108 !Object.values(WorkerChoiceStrategies
).includes(workerChoiceStrategy
)
110 throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
114 export const checkValidTasksQueueOptions
= (
115 tasksQueueOptions
: TasksQueueOptions
| undefined
117 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
118 throw new TypeError('Invalid tasks queue options: must be a plain object')
121 tasksQueueOptions
?.concurrency
!= null &&
122 !Number.isSafeInteger(tasksQueueOptions
.concurrency
)
125 'Invalid worker node tasks concurrency: must be an integer'
129 tasksQueueOptions
?.concurrency
!= null &&
130 tasksQueueOptions
.concurrency
<= 0
132 throw new RangeError(
133 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
137 tasksQueueOptions
?.size
!= null &&
138 !Number.isSafeInteger(tasksQueueOptions
.size
)
141 'Invalid worker node tasks queue size: must be an integer'
144 if (tasksQueueOptions
?.size
!= null && tasksQueueOptions
.size
<= 0) {
145 throw new RangeError(
146 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
151 export const checkWorkerNodeArguments
= (
152 type: WorkerType
| undefined,
153 filePath
: string | undefined,
154 opts
: WorkerNodeOptions
| undefined
157 throw new TypeError('Cannot construct a worker node without a worker type')
159 if (!Object.values(WorkerTypes
).includes(type)) {
161 `Cannot construct a worker node with an invalid worker type '${type}'`
164 checkFilePath(filePath
)
167 'Cannot construct a worker node without worker node options'
170 if (!isPlainObject(opts
)) {
172 'Cannot construct a worker node with invalid worker node options: must be a plain object'
175 if (opts
.tasksQueueBackPressureSize
== null) {
177 'Cannot construct a worker node without a tasks queue back pressure size option'
180 if (!Number.isSafeInteger(opts
.tasksQueueBackPressureSize
)) {
182 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
185 if (opts
.tasksQueueBackPressureSize
<= 0) {
186 throw new RangeError(
187 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
190 if (opts
.tasksQueueBucketSize
== null) {
192 'Cannot construct a worker node without a tasks queue bucket size option'
195 if (!Number.isSafeInteger(opts
.tasksQueueBucketSize
)) {
197 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
200 if (opts
.tasksQueueBucketSize
<= 0) {
201 throw new RangeError(
202 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
205 if (opts
.tasksQueuePriority
== null) {
207 'Cannot construct a worker node without a tasks queue priority option'
210 if (typeof opts
.tasksQueuePriority
!== 'boolean') {
212 'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
218 * Updates the given measurement statistics.
220 * @param measurementStatistics - The measurement statistics to update.
221 * @param measurementRequirements - The measurement statistics requirements.
222 * @param measurementValue - The measurement value.
225 const updateMeasurementStatistics
= (
226 measurementStatistics
: MeasurementStatistics
,
227 measurementRequirements
: MeasurementStatisticsRequirements
| undefined,
228 measurementValue
: number | undefined
231 measurementRequirements
!= null &&
232 measurementValue
!= null &&
233 measurementRequirements
.aggregate
235 measurementStatistics
.aggregate
=
236 (measurementStatistics
.aggregate
?? 0) + measurementValue
237 measurementStatistics
.minimum
= min(
239 measurementStatistics
.minimum
?? Number.POSITIVE_INFINITY
241 measurementStatistics
.maximum
= max(
243 measurementStatistics
.maximum
?? Number.NEGATIVE_INFINITY
245 if (measurementRequirements
.average
|| measurementRequirements
.median
) {
246 measurementStatistics
.history
.put(measurementValue
)
247 if (measurementRequirements
.average
) {
248 measurementStatistics
.average
= average(
249 measurementStatistics
.history
.toArray()
251 } else if (measurementStatistics
.average
!= null) {
252 delete measurementStatistics
.average
254 if (measurementRequirements
.median
) {
255 measurementStatistics
.median
= median(
256 measurementStatistics
.history
.toArray()
258 } else if (measurementStatistics
.median
!= null) {
259 delete measurementStatistics
.median
264 if (env
.NODE_ENV
=== 'test') {
265 // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
266 exports
.updateMeasurementStatistics
= updateMeasurementStatistics
269 export const updateWaitTimeWorkerUsage
= <
270 Worker
extends IWorker
,
274 workerChoiceStrategiesContext
:
275 | WorkerChoiceStrategiesContext
<Worker
, Data
, Response
>
277 workerUsage
: WorkerUsage
,
280 const timestamp
= performance
.now()
281 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
282 updateMeasurementStatistics(
283 workerUsage
.waitTime
,
284 workerChoiceStrategiesContext
?.getTaskStatisticsRequirements().waitTime
,
289 export const updateTaskStatisticsWorkerUsage
= <Response
= unknown
>(
290 workerUsage
: WorkerUsage
,
291 message
: MessageValue
<Response
>
293 const workerTaskStatistics
= workerUsage
.tasks
295 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
296 workerTaskStatistics
.executing
!= null &&
297 workerTaskStatistics
.executing
> 0
299 --workerTaskStatistics
.executing
301 if (message
.workerError
== null) {
302 ++workerTaskStatistics
.executed
304 ++workerTaskStatistics
.failed
308 export const updateRunTimeWorkerUsage
= <
309 Worker
extends IWorker
,
313 workerChoiceStrategiesContext
:
314 | WorkerChoiceStrategiesContext
<Worker
, Data
, Response
>
316 workerUsage
: WorkerUsage
,
317 message
: MessageValue
<Response
>
319 if (message
.workerError
!= null) {
322 updateMeasurementStatistics(
324 workerChoiceStrategiesContext
?.getTaskStatisticsRequirements().runTime
,
325 message
.taskPerformance
?.runTime
?? 0
329 export const updateEluWorkerUsage
= <
330 Worker
extends IWorker
,
334 workerChoiceStrategiesContext
:
335 | WorkerChoiceStrategiesContext
<Worker
, Data
, Response
>
337 workerUsage
: WorkerUsage
,
338 message
: MessageValue
<Response
>
340 if (message
.workerError
!= null) {
343 const eluTaskStatisticsRequirements
=
344 workerChoiceStrategiesContext
?.getTaskStatisticsRequirements().elu
345 updateMeasurementStatistics(
346 workerUsage
.elu
.active
,
347 eluTaskStatisticsRequirements
,
348 message
.taskPerformance
?.elu
?.active
?? 0
350 updateMeasurementStatistics(
351 workerUsage
.elu
.idle
,
352 eluTaskStatisticsRequirements
,
353 message
.taskPerformance
?.elu
?.idle
?? 0
355 if (eluTaskStatisticsRequirements
?.aggregate
=== true) {
356 if (message
.taskPerformance
?.elu
!= null) {
357 if (workerUsage
.elu
.utilization
!= null) {
358 workerUsage
.elu
.utilization
=
359 (workerUsage
.elu
.utilization
+
360 message
.taskPerformance
.elu
.utilization
) /
363 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
369 export const createWorker
= <Worker
extends IWorker
>(
372 opts
: { env
?: Record
<string, unknown
>, workerOptions
?: WorkerOptions
}
375 case WorkerTypes
.thread
:
376 return new ThreadWorker(filePath
, {
378 ...opts
.workerOptions
379 }) as unknown
as Worker
380 case WorkerTypes
.cluster
:
381 return cluster
.fork(opts
.env
) as unknown
as Worker
383 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
384 throw new Error(`Unknown worker type '${type}'`)
389 * Returns the worker type of the given worker.
391 * @param worker - The worker to get the type of.
392 * @returns The worker type of the given worker.
395 export const getWorkerType
= (worker
: IWorker
): WorkerType
| undefined => {
396 if (worker
instanceof ThreadWorker
) {
397 return WorkerTypes
.thread
398 } else if (worker
instanceof ClusterWorker
) {
399 return WorkerTypes
.cluster
404 * Returns the worker id of the given worker.
406 * @param worker - The worker to get the id of.
407 * @returns The worker id of the given worker.
410 export const getWorkerId
= (worker
: IWorker
): number | undefined => {
411 if (worker
instanceof ThreadWorker
) {
412 return worker
.threadId
413 } else if (worker
instanceof ClusterWorker
) {
418 export const waitWorkerNodeEvents
= async <
419 Worker
extends IWorker
,
422 workerNode
: IWorkerNode
<Worker
, Data
>,
423 workerNodeEvent
: string,
424 numberOfEventsToWait
: number,
426 ): Promise
<number> => {
427 return await new Promise
<number>(resolve
=> {
429 if (numberOfEventsToWait
=== 0) {
433 switch (workerNodeEvent
) {
437 workerNode
.on(workerNodeEvent
, () => {
439 if (events
=== numberOfEventsToWait
) {
445 throw new Error('Invalid worker node event')