1 import cluster
, { Worker
as ClusterWorker
} from
'node:cluster'
2 import { existsSync
} from
'node:fs'
3 import { env
} from
'node:process'
6 Worker
as ThreadWorker
,
8 } from
'node:worker_threads'
10 import type { MessageValue
, Task
} from
'../utility-types.js'
11 import { average
, isPlainObject
, max
, median
, min
} from
'../utils.js'
12 import type { TasksQueueOptions
} from
'./pool.js'
14 type MeasurementStatisticsRequirements
,
15 WorkerChoiceStrategies
,
16 type WorkerChoiceStrategy
17 } from
'./selection-strategies/selection-strategies-types.js'
18 import type { WorkerChoiceStrategiesContext
} from
'./selection-strategies/worker-choice-strategies-context.js'
22 type MeasurementStatistics
,
23 type WorkerNodeOptions
,
30 * Default measurement statistics requirements.
32 export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
: MeasurementStatisticsRequirements
=
39 export const getDefaultTasksQueueOptions
= (
41 ): Required
<TasksQueueOptions
> => {
43 size
: Math.pow(poolMaxSize
, 2),
46 tasksStealingOnBackPressure
: false,
47 tasksFinishedTimeout
: 2000
51 export const checkFilePath
= (filePath
: string | undefined): void => {
52 if (filePath
== null) {
53 throw new TypeError('The worker file path must be specified')
55 if (typeof filePath
!== 'string') {
56 throw new TypeError('The worker file path must be a string')
58 if (!existsSync(filePath
)) {
59 throw new Error(`Cannot find the worker file '${filePath}'`)
63 export const checkDynamicPoolSize
= (
65 max
: number | undefined
69 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
71 } else if (!Number.isSafeInteger(max
)) {
73 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
75 } else if (min
> max
) {
77 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
79 } else if (max
=== 0) {
81 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
83 } else if (min
=== max
) {
85 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
90 export const checkValidPriority
= (priority
: number | undefined): void => {
91 if (priority
!= null && !Number.isSafeInteger(priority
)) {
92 throw new TypeError(`Invalid property 'priority': '${priority}'`)
96 Number.isSafeInteger(priority
) &&
97 (priority
< -20 || priority
> 19)
99 throw new RangeError("Property 'priority' must be between -20 and 19")
103 export const checkValidWorkerChoiceStrategy
= (
104 workerChoiceStrategy
: WorkerChoiceStrategy
| undefined
107 workerChoiceStrategy
!= null &&
108 !Object.values(WorkerChoiceStrategies
).includes(workerChoiceStrategy
)
110 throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
114 export const checkValidTasksQueueOptions
= (
115 tasksQueueOptions
: TasksQueueOptions
| undefined
117 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
118 throw new TypeError('Invalid tasks queue options: must be a plain object')
121 tasksQueueOptions
?.concurrency
!= null &&
122 !Number.isSafeInteger(tasksQueueOptions
.concurrency
)
125 'Invalid worker node tasks concurrency: must be an integer'
129 tasksQueueOptions
?.concurrency
!= null &&
130 tasksQueueOptions
.concurrency
<= 0
132 throw new RangeError(
133 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
137 tasksQueueOptions
?.size
!= null &&
138 !Number.isSafeInteger(tasksQueueOptions
.size
)
141 'Invalid worker node tasks queue size: must be an integer'
144 if (tasksQueueOptions
?.size
!= null && tasksQueueOptions
.size
<= 0) {
145 throw new RangeError(
146 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
151 export const checkWorkerNodeArguments
= (
152 type: WorkerType
| undefined,
153 filePath
: string | undefined,
154 opts
: WorkerNodeOptions
| undefined
157 throw new TypeError('Cannot construct a worker node without a worker type')
159 if (!Object.values(WorkerTypes
).includes(type)) {
161 `Cannot construct a worker node with an invalid worker type '${type}'`
164 checkFilePath(filePath
)
167 'Cannot construct a worker node without worker node options'
170 if (!isPlainObject(opts
)) {
172 'Cannot construct a worker node with invalid options: must be a plain object'
175 if (opts
.tasksQueueBackPressureSize
== null) {
177 'Cannot construct a worker node without a tasks queue back pressure size option'
180 if (!Number.isSafeInteger(opts
.tasksQueueBackPressureSize
)) {
182 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
185 if (opts
.tasksQueueBackPressureSize
<= 0) {
186 throw new RangeError(
187 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
190 if (opts
.tasksQueueBucketSize
== null) {
192 'Cannot construct a worker node without a tasks queue bucket size option'
195 if (!Number.isSafeInteger(opts
.tasksQueueBucketSize
)) {
197 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
200 if (opts
.tasksQueueBucketSize
<= 0) {
201 throw new RangeError(
202 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
208 * Updates the given measurement statistics.
210 * @param measurementStatistics - The measurement statistics to update.
211 * @param measurementRequirements - The measurement statistics requirements.
212 * @param measurementValue - The measurement value.
215 const updateMeasurementStatistics
= (
216 measurementStatistics
: MeasurementStatistics
,
217 measurementRequirements
: MeasurementStatisticsRequirements
| undefined,
218 measurementValue
: number | undefined
221 measurementRequirements
!= null &&
222 measurementValue
!= null &&
223 measurementRequirements
.aggregate
225 measurementStatistics
.aggregate
=
226 (measurementStatistics
.aggregate
?? 0) + measurementValue
227 measurementStatistics
.minimum
= min(
229 measurementStatistics
.minimum
?? Infinity
231 measurementStatistics
.maximum
= max(
233 measurementStatistics
.maximum
?? -Infinity
235 if (measurementRequirements
.average
|| measurementRequirements
.median
) {
236 measurementStatistics
.history
.push(measurementValue
)
237 if (measurementRequirements
.average
) {
238 measurementStatistics
.average
= average(measurementStatistics
.history
)
239 } else if (measurementStatistics
.average
!= null) {
240 delete measurementStatistics
.average
242 if (measurementRequirements
.median
) {
243 measurementStatistics
.median
= median(measurementStatistics
.history
)
244 } else if (measurementStatistics
.median
!= null) {
245 delete measurementStatistics
.median
250 if (env
.NODE_ENV
=== 'test') {
251 // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
252 exports
.updateMeasurementStatistics
= updateMeasurementStatistics
255 export const updateWaitTimeWorkerUsage
= <
256 Worker
extends IWorker
,
260 workerChoiceStrategiesContext
:
261 | WorkerChoiceStrategiesContext
<Worker
, Data
, Response
>
263 workerUsage
: WorkerUsage
,
266 const timestamp
= performance
.now()
267 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
268 updateMeasurementStatistics(
269 workerUsage
.waitTime
,
270 workerChoiceStrategiesContext
?.getTaskStatisticsRequirements().waitTime
,
275 export const updateTaskStatisticsWorkerUsage
= <Response
= unknown
>(
276 workerUsage
: WorkerUsage
,
277 message
: MessageValue
<Response
>
279 const workerTaskStatistics
= workerUsage
.tasks
281 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
282 workerTaskStatistics
.executing
!= null &&
283 workerTaskStatistics
.executing
> 0
285 --workerTaskStatistics
.executing
287 if (message
.workerError
== null) {
288 ++workerTaskStatistics
.executed
290 ++workerTaskStatistics
.failed
294 export const updateRunTimeWorkerUsage
= <
295 Worker
extends IWorker
,
299 workerChoiceStrategiesContext
:
300 | WorkerChoiceStrategiesContext
<Worker
, Data
, Response
>
302 workerUsage
: WorkerUsage
,
303 message
: MessageValue
<Response
>
305 if (message
.workerError
!= null) {
308 updateMeasurementStatistics(
310 workerChoiceStrategiesContext
?.getTaskStatisticsRequirements().runTime
,
311 message
.taskPerformance
?.runTime
?? 0
315 export const updateEluWorkerUsage
= <
316 Worker
extends IWorker
,
320 workerChoiceStrategiesContext
:
321 | WorkerChoiceStrategiesContext
<Worker
, Data
, Response
>
323 workerUsage
: WorkerUsage
,
324 message
: MessageValue
<Response
>
326 if (message
.workerError
!= null) {
329 const eluTaskStatisticsRequirements
=
330 workerChoiceStrategiesContext
?.getTaskStatisticsRequirements().elu
331 updateMeasurementStatistics(
332 workerUsage
.elu
.active
,
333 eluTaskStatisticsRequirements
,
334 message
.taskPerformance
?.elu
?.active
?? 0
336 updateMeasurementStatistics(
337 workerUsage
.elu
.idle
,
338 eluTaskStatisticsRequirements
,
339 message
.taskPerformance
?.elu
?.idle
?? 0
341 if (eluTaskStatisticsRequirements
?.aggregate
=== true) {
342 if (message
.taskPerformance
?.elu
!= null) {
343 if (workerUsage
.elu
.utilization
!= null) {
344 workerUsage
.elu
.utilization
=
345 (workerUsage
.elu
.utilization
+
346 message
.taskPerformance
.elu
.utilization
) /
349 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
355 export const createWorker
= <Worker
extends IWorker
>(
358 opts
: { env
?: Record
<string, unknown
>, workerOptions
?: WorkerOptions
}
361 case WorkerTypes
.thread
:
362 return new ThreadWorker(filePath
, {
364 ...opts
.workerOptions
365 }) as unknown
as Worker
366 case WorkerTypes
.cluster
:
367 return cluster
.fork(opts
.env
) as unknown
as Worker
369 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
370 throw new Error(`Unknown worker type '${type}'`)
375 * Returns the worker type of the given worker.
377 * @param worker - The worker to get the type of.
378 * @returns The worker type of the given worker.
381 export const getWorkerType
= (worker
: IWorker
): WorkerType
| undefined => {
382 if (worker
instanceof ThreadWorker
) {
383 return WorkerTypes
.thread
384 } else if (worker
instanceof ClusterWorker
) {
385 return WorkerTypes
.cluster
390 * Returns the worker id of the given worker.
392 * @param worker - The worker to get the id of.
393 * @returns The worker id of the given worker.
396 export const getWorkerId
= (worker
: IWorker
): number | undefined => {
397 if (worker
instanceof ThreadWorker
) {
398 return worker
.threadId
399 } else if (worker
instanceof ClusterWorker
) {
404 export const waitWorkerNodeEvents
= async <
405 Worker
extends IWorker
,
408 workerNode
: IWorkerNode
<Worker
, Data
>,
409 workerNodeEvent
: string,
410 numberOfEventsToWait
: number,
412 ): Promise
<number> => {
413 return await new Promise
<number>(resolve
=> {
415 if (numberOfEventsToWait
=== 0) {
419 switch (workerNodeEvent
) {
423 workerNode
.on(workerNodeEvent
, () => {
425 if (events
=== numberOfEventsToWait
) {
431 throw new Error('Invalid worker node event')