1 import cluster
, { Worker
as ClusterWorker
} from
'node:cluster'
2 import { existsSync
} from
'node:fs'
3 import { env
} from
'node:process'
6 Worker
as ThreadWorker
,
8 } from
'node:worker_threads'
10 import type { MessageValue
, Task
} from
'../utility-types.js'
11 import { average
, isPlainObject
, max
, median
, min
} from
'../utils.js'
12 import type { TasksQueueOptions
} from
'./pool.js'
14 type MeasurementStatisticsRequirements
,
15 WorkerChoiceStrategies
,
16 type WorkerChoiceStrategy
17 } from
'./selection-strategies/selection-strategies-types.js'
18 import type { WorkerChoiceStrategiesContext
} from
'./selection-strategies/worker-choice-strategies-context.js'
22 type MeasurementStatistics
,
23 type WorkerNodeOptions
,
30 * Default measurement statistics requirements.
32 export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
: MeasurementStatisticsRequirements
=
39 export const getDefaultTasksQueueOptions
= (
41 ): Required
<TasksQueueOptions
> => {
43 size
: Math.pow(poolMaxSize
, 2),
46 tasksStealingOnBackPressure
: true,
47 tasksFinishedTimeout
: 2000
51 export const checkFilePath
= (filePath
: string | undefined): void => {
52 if (filePath
== null) {
53 throw new TypeError('The worker file path must be specified')
55 if (typeof filePath
!== 'string') {
56 throw new TypeError('The worker file path must be a string')
58 if (!existsSync(filePath
)) {
59 throw new Error(`Cannot find the worker file '${filePath}'`)
63 export const checkDynamicPoolSize
= (
65 max
: number | undefined
69 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
71 } else if (!Number.isSafeInteger(max
)) {
73 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
75 } else if (min
> max
) {
77 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
79 } else if (max
=== 0) {
81 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
83 } else if (min
=== max
) {
85 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
90 export const checkValidPriority
= (priority
: number | undefined): void => {
91 if (priority
!= null && !Number.isSafeInteger(priority
)) {
92 throw new TypeError(`Invalid property 'priority': '${priority}'`)
96 Number.isSafeInteger(priority
) &&
97 (priority
< -20 || priority
> 19)
99 throw new RangeError("Property 'priority' must be between -20 and 19")
103 export const checkValidWorkerChoiceStrategy
= (
104 workerChoiceStrategy
: WorkerChoiceStrategy
| undefined
107 workerChoiceStrategy
!= null &&
108 !Object.values(WorkerChoiceStrategies
).includes(workerChoiceStrategy
)
110 throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
114 export const checkValidTasksQueueOptions
= (
115 tasksQueueOptions
: TasksQueueOptions
| undefined
117 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
118 throw new TypeError('Invalid tasks queue options: must be a plain object')
121 tasksQueueOptions
?.concurrency
!= null &&
122 !Number.isSafeInteger(tasksQueueOptions
.concurrency
)
125 'Invalid worker node tasks concurrency: must be an integer'
129 tasksQueueOptions
?.concurrency
!= null &&
130 tasksQueueOptions
.concurrency
<= 0
132 throw new RangeError(
133 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
137 tasksQueueOptions
?.size
!= null &&
138 !Number.isSafeInteger(tasksQueueOptions
.size
)
141 'Invalid worker node tasks queue size: must be an integer'
144 if (tasksQueueOptions
?.size
!= null && tasksQueueOptions
.size
<= 0) {
145 throw new RangeError(
146 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
151 export const checkWorkerNodeArguments
= (
152 type: WorkerType
| undefined,
153 filePath
: string | undefined,
154 opts
: WorkerNodeOptions
| undefined
157 throw new TypeError('Cannot construct a worker node without a worker type')
159 if (!Object.values(WorkerTypes
).includes(type)) {
161 `Cannot construct a worker node with an invalid worker type '${type}'`
164 checkFilePath(filePath
)
167 'Cannot construct a worker node without worker node options'
170 if (!isPlainObject(opts
)) {
172 'Cannot construct a worker node with invalid options: must be a plain object'
175 if (opts
.tasksQueueBackPressureSize
== null) {
177 'Cannot construct a worker node without a tasks queue back pressure size option'
180 if (!Number.isSafeInteger(opts
.tasksQueueBackPressureSize
)) {
182 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
185 if (opts
.tasksQueueBackPressureSize
<= 0) {
186 throw new RangeError(
187 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
193 * Updates the given measurement statistics.
195 * @param measurementStatistics - The measurement statistics to update.
196 * @param measurementRequirements - The measurement statistics requirements.
197 * @param measurementValue - The measurement value.
200 const updateMeasurementStatistics
= (
201 measurementStatistics
: MeasurementStatistics
,
202 measurementRequirements
: MeasurementStatisticsRequirements
| undefined,
203 measurementValue
: number | undefined
206 measurementRequirements
!= null &&
207 measurementValue
!= null &&
208 measurementRequirements
.aggregate
210 measurementStatistics
.aggregate
=
211 (measurementStatistics
.aggregate
?? 0) + measurementValue
212 measurementStatistics
.minimum
= min(
214 measurementStatistics
.minimum
?? Infinity
216 measurementStatistics
.maximum
= max(
218 measurementStatistics
.maximum
?? -Infinity
220 if (measurementRequirements
.average
|| measurementRequirements
.median
) {
221 measurementStatistics
.history
.push(measurementValue
)
222 if (measurementRequirements
.average
) {
223 measurementStatistics
.average
= average(measurementStatistics
.history
)
224 } else if (measurementStatistics
.average
!= null) {
225 delete measurementStatistics
.average
227 if (measurementRequirements
.median
) {
228 measurementStatistics
.median
= median(measurementStatistics
.history
)
229 } else if (measurementStatistics
.median
!= null) {
230 delete measurementStatistics
.median
235 if (env
.NODE_ENV
=== 'test') {
236 // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
237 exports
.updateMeasurementStatistics
= updateMeasurementStatistics
240 export const updateWaitTimeWorkerUsage
= <
241 Worker
extends IWorker
,
245 workerChoiceStrategiesContext
:
246 | WorkerChoiceStrategiesContext
<Worker
, Data
, Response
>
248 workerUsage
: WorkerUsage
,
251 const timestamp
= performance
.now()
252 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
253 updateMeasurementStatistics(
254 workerUsage
.waitTime
,
255 workerChoiceStrategiesContext
?.getTaskStatisticsRequirements().waitTime
,
260 export const updateTaskStatisticsWorkerUsage
= <Response
= unknown
>(
261 workerUsage
: WorkerUsage
,
262 message
: MessageValue
<Response
>
264 const workerTaskStatistics
= workerUsage
.tasks
266 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
267 workerTaskStatistics
.executing
!= null &&
268 workerTaskStatistics
.executing
> 0
270 --workerTaskStatistics
.executing
272 if (message
.workerError
== null) {
273 ++workerTaskStatistics
.executed
275 ++workerTaskStatistics
.failed
279 export const updateRunTimeWorkerUsage
= <
280 Worker
extends IWorker
,
284 workerChoiceStrategiesContext
:
285 | WorkerChoiceStrategiesContext
<Worker
, Data
, Response
>
287 workerUsage
: WorkerUsage
,
288 message
: MessageValue
<Response
>
290 if (message
.workerError
!= null) {
293 updateMeasurementStatistics(
295 workerChoiceStrategiesContext
?.getTaskStatisticsRequirements().runTime
,
296 message
.taskPerformance
?.runTime
?? 0
300 export const updateEluWorkerUsage
= <
301 Worker
extends IWorker
,
305 workerChoiceStrategiesContext
:
306 | WorkerChoiceStrategiesContext
<Worker
, Data
, Response
>
308 workerUsage
: WorkerUsage
,
309 message
: MessageValue
<Response
>
311 if (message
.workerError
!= null) {
314 const eluTaskStatisticsRequirements
=
315 workerChoiceStrategiesContext
?.getTaskStatisticsRequirements().elu
316 updateMeasurementStatistics(
317 workerUsage
.elu
.active
,
318 eluTaskStatisticsRequirements
,
319 message
.taskPerformance
?.elu
?.active
?? 0
321 updateMeasurementStatistics(
322 workerUsage
.elu
.idle
,
323 eluTaskStatisticsRequirements
,
324 message
.taskPerformance
?.elu
?.idle
?? 0
326 if (eluTaskStatisticsRequirements
?.aggregate
=== true) {
327 if (message
.taskPerformance
?.elu
!= null) {
328 if (workerUsage
.elu
.utilization
!= null) {
329 workerUsage
.elu
.utilization
=
330 (workerUsage
.elu
.utilization
+
331 message
.taskPerformance
.elu
.utilization
) /
334 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
340 export const createWorker
= <Worker
extends IWorker
>(
343 opts
: { env
?: Record
<string, unknown
>, workerOptions
?: WorkerOptions
}
346 case WorkerTypes
.thread
:
347 return new ThreadWorker(filePath
, {
349 ...opts
.workerOptions
350 }) as unknown
as Worker
351 case WorkerTypes
.cluster
:
352 return cluster
.fork(opts
.env
) as unknown
as Worker
354 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
355 throw new Error(`Unknown worker type '${type}'`)
360 * Returns the worker type of the given worker.
362 * @param worker - The worker to get the type of.
363 * @returns The worker type of the given worker.
366 export const getWorkerType
= (worker
: IWorker
): WorkerType
| undefined => {
367 if (worker
instanceof ThreadWorker
) {
368 return WorkerTypes
.thread
369 } else if (worker
instanceof ClusterWorker
) {
370 return WorkerTypes
.cluster
375 * Returns the worker id of the given worker.
377 * @param worker - The worker to get the id of.
378 * @returns The worker id of the given worker.
381 export const getWorkerId
= (worker
: IWorker
): number | undefined => {
382 if (worker
instanceof ThreadWorker
) {
383 return worker
.threadId
384 } else if (worker
instanceof ClusterWorker
) {
389 export const waitWorkerNodeEvents
= async <
390 Worker
extends IWorker
,
393 workerNode
: IWorkerNode
<Worker
, Data
>,
394 workerNodeEvent
: string,
395 numberOfEventsToWait
: number,
397 ): Promise
<number> => {
398 return await new Promise
<number>(resolve
=> {
400 if (numberOfEventsToWait
=== 0) {
404 workerNode
.on(workerNodeEvent
, () => {
406 if (events
=== numberOfEventsToWait
) {