1 import cluster
, { Worker
as ClusterWorker
} from
'node:cluster'
2 import { existsSync
} from
'node:fs'
3 import { env
} from
'node:process'
6 Worker
as ThreadWorker
,
8 } from
'node:worker_threads'
10 import type { MessageValue
, Task
} from
'../utility-types.js'
11 import { average
, isPlainObject
, max
, median
, min
} from
'../utils.js'
12 import type { TasksQueueOptions
} from
'./pool.js'
14 type MeasurementStatisticsRequirements
,
15 WorkerChoiceStrategies
,
16 type WorkerChoiceStrategy
,
17 } from
'./selection-strategies/selection-strategies-types.js'
18 import type { WorkerChoiceStrategiesContext
} from
'./selection-strategies/worker-choice-strategies-context.js'
22 type MeasurementStatistics
,
23 type WorkerNodeOptions
,
30 * Default measurement statistics requirements.
32 export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
: MeasurementStatisticsRequirements
=
39 export const getDefaultTasksQueueOptions
= (
41 ): Required
<TasksQueueOptions
> => {
43 size
: Math.pow(poolMaxSize
, 2),
46 tasksStealingOnBackPressure
: false,
47 tasksFinishedTimeout
: 2000,
51 export const checkFilePath
= (filePath
: string | undefined): void => {
52 if (filePath
== null) {
53 throw new TypeError('The worker file path must be specified')
55 if (typeof filePath
!== 'string') {
56 throw new TypeError('The worker file path must be a string')
58 if (!existsSync(filePath
)) {
59 throw new Error(`Cannot find the worker file '${filePath}'`)
63 export const checkDynamicPoolSize
= (
65 max
: number | undefined
69 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
71 } else if (!Number.isSafeInteger(max
)) {
73 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
75 } else if (min
> max
) {
77 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
79 } else if (max
=== 0) {
81 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
83 } else if (min
=== max
) {
85 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
90 export const checkValidPriority
= (priority
: number | undefined): void => {
91 if (priority
!= null && !Number.isSafeInteger(priority
)) {
92 throw new TypeError(`Invalid property 'priority': '${priority.toString()}'`)
96 Number.isSafeInteger(priority
) &&
97 (priority
< -20 || priority
> 19)
99 throw new RangeError("Property 'priority' must be between -20 and 19")
103 export const checkValidWorkerChoiceStrategy
= (
104 workerChoiceStrategy
: WorkerChoiceStrategy
| undefined
107 workerChoiceStrategy
!= null &&
108 !Object.values(WorkerChoiceStrategies
).includes(workerChoiceStrategy
)
110 throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
114 export const checkValidTasksQueueOptions
= (
115 tasksQueueOptions
: TasksQueueOptions
| undefined
117 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
118 throw new TypeError('Invalid tasks queue options: must be a plain object')
121 tasksQueueOptions
?.concurrency
!= null &&
122 !Number.isSafeInteger(tasksQueueOptions
.concurrency
)
125 'Invalid worker node tasks concurrency: must be an integer'
129 tasksQueueOptions
?.concurrency
!= null &&
130 tasksQueueOptions
.concurrency
<= 0
132 throw new RangeError(
133 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency.toString()} is a negative integer or zero`
137 tasksQueueOptions
?.size
!= null &&
138 !Number.isSafeInteger(tasksQueueOptions
.size
)
141 'Invalid worker node tasks queue size: must be an integer'
144 if (tasksQueueOptions
?.size
!= null && tasksQueueOptions
.size
<= 0) {
145 throw new RangeError(
146 `Invalid worker node tasks queue size: ${tasksQueueOptions.size.toString()} is a negative integer or zero`
151 export const checkWorkerNodeArguments
= (
152 type: WorkerType
| undefined,
153 filePath
: string | undefined,
154 opts
: WorkerNodeOptions
| undefined
157 throw new TypeError('Cannot construct a worker node without a worker type')
159 if (!Object.values(WorkerTypes
).includes(type)) {
161 `Cannot construct a worker node with an invalid worker type '${type}'`
164 checkFilePath(filePath
)
167 'Cannot construct a worker node without worker node options'
170 if (!isPlainObject(opts
)) {
172 'Cannot construct a worker node with invalid worker node options: must be a plain object'
175 if (opts
.tasksQueueBackPressureSize
== null) {
177 'Cannot construct a worker node without a tasks queue back pressure size option'
180 if (!Number.isSafeInteger(opts
.tasksQueueBackPressureSize
)) {
182 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
185 if (opts
.tasksQueueBackPressureSize
<= 0) {
186 throw new RangeError(
187 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
190 if (opts
.tasksQueueBucketSize
== null) {
192 'Cannot construct a worker node without a tasks queue bucket size option'
195 if (!Number.isSafeInteger(opts
.tasksQueueBucketSize
)) {
197 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
200 if (opts
.tasksQueueBucketSize
<= 0) {
201 throw new RangeError(
202 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
205 if (opts
.tasksQueuePriority
== null) {
207 'Cannot construct a worker node without a tasks queue priority option'
210 if (typeof opts
.tasksQueuePriority
!== 'boolean') {
212 'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
218 * Updates the given measurement statistics.
219 * @param measurementStatistics - The measurement statistics to update.
220 * @param measurementRequirements - The measurement statistics requirements.
221 * @param measurementValue - The measurement value.
224 const updateMeasurementStatistics
= (
225 measurementStatistics
: MeasurementStatistics
,
226 measurementRequirements
: MeasurementStatisticsRequirements
| undefined,
227 measurementValue
: number | undefined
230 measurementRequirements
!= null &&
231 measurementValue
!= null &&
232 measurementRequirements
.aggregate
234 measurementStatistics
.aggregate
=
235 (measurementStatistics
.aggregate
?? 0) + measurementValue
236 measurementStatistics
.minimum
= min(
238 measurementStatistics
.minimum
?? Number.POSITIVE_INFINITY
240 measurementStatistics
.maximum
= max(
242 measurementStatistics
.maximum
?? Number.NEGATIVE_INFINITY
244 if (measurementRequirements
.average
|| measurementRequirements
.median
) {
245 measurementStatistics
.history
.put(measurementValue
)
246 if (measurementRequirements
.average
) {
247 measurementStatistics
.average
= average(
248 measurementStatistics
.history
.toArray()
250 } else if (measurementStatistics
.average
!= null) {
251 delete measurementStatistics
.average
253 if (measurementRequirements
.median
) {
254 measurementStatistics
.median
= median(
255 measurementStatistics
.history
.toArray()
257 } else if (measurementStatistics
.median
!= null) {
258 delete measurementStatistics
.median
263 if (env
.NODE_ENV
=== 'test') {
264 // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
265 exports
.updateMeasurementStatistics
= updateMeasurementStatistics
268 export const updateWaitTimeWorkerUsage
= <
269 Worker
extends IWorker
,
273 workerChoiceStrategiesContext
:
274 | WorkerChoiceStrategiesContext
<Worker
, Data
, Response
>
276 workerUsage
: WorkerUsage
,
279 const timestamp
= performance
.now()
280 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
281 updateMeasurementStatistics(
282 workerUsage
.waitTime
,
283 workerChoiceStrategiesContext
?.getTaskStatisticsRequirements().waitTime
,
288 export const updateTaskStatisticsWorkerUsage
= <Response
= unknown
>(
289 workerUsage
: WorkerUsage
,
290 message
: MessageValue
<Response
>
292 const workerTaskStatistics
= workerUsage
.tasks
294 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
295 workerTaskStatistics
.executing
!= null &&
296 workerTaskStatistics
.executing
> 0
298 --workerTaskStatistics
.executing
300 if (message
.workerError
== null) {
301 ++workerTaskStatistics
.executed
303 ++workerTaskStatistics
.failed
307 export const updateRunTimeWorkerUsage
= <
308 Worker
extends IWorker
,
312 workerChoiceStrategiesContext
:
313 | WorkerChoiceStrategiesContext
<Worker
, Data
, Response
>
315 workerUsage
: WorkerUsage
,
316 message
: MessageValue
<Response
>
318 if (message
.workerError
!= null) {
321 updateMeasurementStatistics(
323 workerChoiceStrategiesContext
?.getTaskStatisticsRequirements().runTime
,
324 message
.taskPerformance
?.runTime
?? 0
328 export const updateEluWorkerUsage
= <
329 Worker
extends IWorker
,
333 workerChoiceStrategiesContext
:
334 | WorkerChoiceStrategiesContext
<Worker
, Data
, Response
>
336 workerUsage
: WorkerUsage
,
337 message
: MessageValue
<Response
>
339 if (message
.workerError
!= null) {
342 const eluTaskStatisticsRequirements
=
343 workerChoiceStrategiesContext
?.getTaskStatisticsRequirements().elu
344 updateMeasurementStatistics(
345 workerUsage
.elu
.active
,
346 eluTaskStatisticsRequirements
,
347 message
.taskPerformance
?.elu
?.active
?? 0
349 updateMeasurementStatistics(
350 workerUsage
.elu
.idle
,
351 eluTaskStatisticsRequirements
,
352 message
.taskPerformance
?.elu
?.idle
?? 0
354 if (eluTaskStatisticsRequirements
?.aggregate
=== true) {
355 if (message
.taskPerformance
?.elu
!= null) {
356 if (workerUsage
.elu
.utilization
!= null) {
357 workerUsage
.elu
.utilization
=
358 (workerUsage
.elu
.utilization
+
359 message
.taskPerformance
.elu
.utilization
) /
362 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
368 export const createWorker
= <Worker
extends IWorker
>(
371 opts
: { env
?: Record
<string, unknown
>; workerOptions
?: WorkerOptions
}
374 case WorkerTypes
.thread
:
375 return new ThreadWorker(filePath
, {
377 ...opts
.workerOptions
,
378 }) as unknown
as Worker
379 case WorkerTypes
.cluster
:
380 return cluster
.fork(opts
.env
) as unknown
as Worker
382 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
383 throw new Error(`Unknown worker type '${type}'`)
388 * Returns the worker type of the given worker.
389 * @param worker - The worker to get the type of.
390 * @returns The worker type of the given worker.
393 export const getWorkerType
= (worker
: IWorker
): WorkerType
| undefined => {
394 if (worker
instanceof ThreadWorker
) {
395 return WorkerTypes
.thread
396 } else if (worker
instanceof ClusterWorker
) {
397 return WorkerTypes
.cluster
402 * Returns the worker id of the given worker.
403 * @param worker - The worker to get the id of.
404 * @returns The worker id of the given worker.
407 export const getWorkerId
= (worker
: IWorker
): number | undefined => {
408 if (worker
instanceof ThreadWorker
) {
409 return worker
.threadId
410 } else if (worker
instanceof ClusterWorker
) {
415 export const waitWorkerNodeEvents
= async <
416 Worker
extends IWorker
,
419 workerNode
: IWorkerNode
<Worker
, Data
>,
420 workerNodeEvent
: string,
421 numberOfEventsToWait
: number,
423 ): Promise
<number> => {
424 return await new Promise
<number>(resolve
=> {
426 if (numberOfEventsToWait
=== 0) {
430 switch (workerNodeEvent
) {
434 workerNode
.on(workerNodeEvent
, () => {
436 if (events
=== numberOfEventsToWait
) {
442 throw new Error('Invalid worker node event')