1 import cluster
, { Worker
as ClusterWorker
} from
'node:cluster'
2 import { existsSync
} from
'node:fs'
3 import { env
} from
'node:process'
6 Worker
as ThreadWorker
,
8 } from
'node:worker_threads'
10 import type { MessageValue
, Task
} from
'../utility-types.js'
11 import { average
, isPlainObject
, max
, median
, min
} from
'../utils.js'
12 import type { TasksQueueOptions
} from
'./pool.js'
14 type MeasurementStatisticsRequirements
,
15 WorkerChoiceStrategies
,
16 type WorkerChoiceStrategy
17 } from
'./selection-strategies/selection-strategies-types.js'
18 import type { WorkerChoiceStrategiesContext
} from
'./selection-strategies/worker-choice-strategies-context.js'
22 type MeasurementStatistics
,
23 type WorkerNodeOptions
,
/**
 * Default measurement statistics requirements.
 */
32 export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
: MeasurementStatisticsRequirements
=
39 export const getDefaultTasksQueueOptions
= (
41 ): Required
<TasksQueueOptions
> => {
43 size
: Math.pow(poolMaxSize
, 2),
46 tasksStealingOnBackPressure
: true,
47 tasksFinishedTimeout
: 2000
/**
 * Validates the worker file path.
 *
 * @param filePath - The worker file path to check.
 * @throws {TypeError} If the file path is `null`/`undefined` or is not a string.
 * @throws {Error} If nothing exists on disk at the given file path.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  }
  // Runtime guard: callers written in plain JavaScript can pass anything.
  if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
63 export const checkDynamicPoolSize
= (
65 max
: number | undefined
69 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
71 } else if (!Number.isSafeInteger(max
)) {
73 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
75 } else if (min
> max
) {
77 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
79 } else if (max
=== 0) {
81 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
83 } else if (min
=== max
) {
85 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
/**
 * Validates a worker choice strategy.
 *
 * A `null`/`undefined` strategy is accepted; any other value must be one of
 * the values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {Error} If the strategy is defined but is not a value of `WorkerChoiceStrategies`.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
  if (
    workerChoiceStrategy != null &&
    !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
  ) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
101 export const checkValidTasksQueueOptions
= (
102 tasksQueueOptions
: TasksQueueOptions
| undefined
104 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
105 throw new TypeError('Invalid tasks queue options: must be a plain object')
108 tasksQueueOptions
?.concurrency
!= null &&
109 !Number.isSafeInteger(tasksQueueOptions
.concurrency
)
112 'Invalid worker node tasks concurrency: must be an integer'
116 tasksQueueOptions
?.concurrency
!= null &&
117 tasksQueueOptions
.concurrency
<= 0
119 throw new RangeError(
120 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
124 tasksQueueOptions
?.size
!= null &&
125 !Number.isSafeInteger(tasksQueueOptions
.size
)
128 'Invalid worker node tasks queue size: must be an integer'
131 if (tasksQueueOptions
?.size
!= null && tasksQueueOptions
.size
<= 0) {
132 throw new RangeError(
133 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
138 export const checkWorkerNodeArguments
= (
139 type: WorkerType
| undefined,
140 filePath
: string | undefined,
141 opts
: WorkerNodeOptions
| undefined
144 throw new TypeError('Cannot construct a worker node without a worker type')
146 if (!Object.values(WorkerTypes
).includes(type)) {
148 `Cannot construct a worker node with an invalid worker type '${type}'`
151 checkFilePath(filePath
)
154 'Cannot construct a worker node without worker node options'
157 if (!isPlainObject(opts
)) {
159 'Cannot construct a worker node with invalid options: must be a plain object'
162 if (opts
.tasksQueueBackPressureSize
== null) {
164 'Cannot construct a worker node without a tasks queue back pressure size option'
167 if (!Number.isSafeInteger(opts
.tasksQueueBackPressureSize
)) {
169 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
172 if (opts
.tasksQueueBackPressureSize
<= 0) {
173 throw new RangeError(
174 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
/**
 * Updates the given measurement statistics.
 *
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 */
187 const updateMeasurementStatistics
= (
188 measurementStatistics
: MeasurementStatistics
,
189 measurementRequirements
: MeasurementStatisticsRequirements
| undefined,
190 measurementValue
: number | undefined
193 measurementRequirements
!= null &&
194 measurementValue
!= null &&
195 measurementRequirements
.aggregate
197 measurementStatistics
.aggregate
=
198 (measurementStatistics
.aggregate
?? 0) + measurementValue
199 measurementStatistics
.minimum
= min(
201 measurementStatistics
.minimum
?? Infinity
203 measurementStatistics
.maximum
= max(
205 measurementStatistics
.maximum
?? -Infinity
207 if (measurementRequirements
.average
|| measurementRequirements
.median
) {
208 measurementStatistics
.history
.push(measurementValue
)
209 if (measurementRequirements
.average
) {
210 measurementStatistics
.average
= average(measurementStatistics
.history
)
211 } else if (measurementStatistics
.average
!= null) {
212 delete measurementStatistics
.average
214 if (measurementRequirements
.median
) {
215 measurementStatistics
.median
= median(measurementStatistics
.history
)
216 } else if (measurementStatistics
.median
!= null) {
217 delete measurementStatistics
.median
222 if (env
.NODE_ENV
=== 'test') {
223 // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
224 exports
.updateMeasurementStatistics
= updateMeasurementStatistics
227 export const updateWaitTimeWorkerUsage
= <
228 Worker
extends IWorker
,
232 workerChoiceStrategyContext
:
233 | WorkerChoiceStrategiesContext
<Worker
, Data
, Response
>
235 workerUsage
: WorkerUsage
,
238 const timestamp
= performance
.now()
239 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
240 updateMeasurementStatistics(
241 workerUsage
.waitTime
,
242 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().waitTime
,
/**
 * Updates the task counters of a worker usage after a task response message.
 *
 * Decrements the `executing` counter when it is set and positive, then
 * increments `executed` if the message carries no worker error, or `failed`
 * otherwise.
 *
 * @param workerUsage - The worker usage whose `tasks` counters are updated in place.
 * @param message - The message received for the finished task.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const workerTaskStatistics = workerUsage.tasks
  if (
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerTaskStatistics.executing != null &&
    workerTaskStatistics.executing > 0
  ) {
    // Never let the in-flight counter go below zero.
    --workerTaskStatistics.executing
  }
  if (message.workerError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
266 export const updateRunTimeWorkerUsage
= <
267 Worker
extends IWorker
,
271 workerChoiceStrategyContext
:
272 | WorkerChoiceStrategiesContext
<Worker
, Data
, Response
>
274 workerUsage
: WorkerUsage
,
275 message
: MessageValue
<Response
>
277 if (message
.workerError
!= null) {
280 updateMeasurementStatistics(
282 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().runTime
,
283 message
.taskPerformance
?.runTime
?? 0
287 export const updateEluWorkerUsage
= <
288 Worker
extends IWorker
,
292 workerChoiceStrategyContext
:
293 | WorkerChoiceStrategiesContext
<Worker
, Data
, Response
>
295 workerUsage
: WorkerUsage
,
296 message
: MessageValue
<Response
>
298 if (message
.workerError
!= null) {
301 const eluTaskStatisticsRequirements
=
302 workerChoiceStrategyContext
?.getTaskStatisticsRequirements().elu
303 updateMeasurementStatistics(
304 workerUsage
.elu
.active
,
305 eluTaskStatisticsRequirements
,
306 message
.taskPerformance
?.elu
?.active
?? 0
308 updateMeasurementStatistics(
309 workerUsage
.elu
.idle
,
310 eluTaskStatisticsRequirements
,
311 message
.taskPerformance
?.elu
?.idle
?? 0
313 if (eluTaskStatisticsRequirements
?.aggregate
=== true) {
314 if (message
.taskPerformance
?.elu
!= null) {
315 if (workerUsage
.elu
.utilization
!= null) {
316 workerUsage
.elu
.utilization
=
317 (workerUsage
.elu
.utilization
+
318 message
.taskPerformance
.elu
.utilization
) /
321 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
327 export const createWorker
= <Worker
extends IWorker
>(
330 opts
: { env
?: Record
<string, unknown
>, workerOptions
?: WorkerOptions
}
333 case WorkerTypes
.thread
:
334 return new ThreadWorker(filePath
, {
336 ...opts
.workerOptions
337 }) as unknown
as Worker
338 case WorkerTypes
.cluster
:
339 return cluster
.fork(opts
.env
) as unknown
as Worker
341 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
342 throw new Error(`Unknown worker type '${type}'`)
/**
 * Returns the worker type of the given worker.
 *
 * @param worker - The worker to get the type of.
 * @returns `WorkerTypes.thread` for a `node:worker_threads` worker,
 * `WorkerTypes.cluster` for a `node:cluster` worker, `undefined` otherwise.
 */
export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
  if (worker instanceof ThreadWorker) {
    return WorkerTypes.thread
  } else if (worker instanceof ClusterWorker) {
    return WorkerTypes.cluster
  }
}
/**
 * Returns the worker id of the given worker.
 *
 * @param worker - The worker to get the id of.
 * @returns The worker id of the given worker.
 */
368 export const getWorkerId
= (worker
: IWorker
): number | undefined => {
369 if (worker
instanceof ThreadWorker
) {
370 return worker
.threadId
371 } else if (worker
instanceof ClusterWorker
) {
376 export const waitWorkerNodeEvents
= async <
377 Worker
extends IWorker
,
380 workerNode
: IWorkerNode
<Worker
, Data
>,
381 workerNodeEvent
: string,
382 numberOfEventsToWait
: number,
384 ): Promise
<number> => {
385 return await new Promise
<number>(resolve
=> {
387 if (numberOfEventsToWait
=== 0) {
391 workerNode
.on(workerNodeEvent
, () => {
393 if (events
=== numberOfEventsToWait
) {