1 import { existsSync
} from
'node:fs'
2 import cluster from
'node:cluster'
3 import { SHARE_ENV
, Worker
, type WorkerOptions
} from
'node:worker_threads'
4 import { env
} from
'node:process'
5 import { average
, isPlainObject
, max
, median
, min
} from
'../utils'
6 import type { MessageValue
, Task
} from
'../utility-types'
8 type MeasurementStatisticsRequirements
,
9 WorkerChoiceStrategies
,
10 type WorkerChoiceStrategy
11 } from
'./selection-strategies/selection-strategies-types'
12 import type { TasksQueueOptions
} from
'./pool'
16 type MeasurementStatistics
,
17 type WorkerNodeOptions
,
22 import type { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
24 export const checkFilePath
= (filePath
: string): void => {
25 if (filePath
== null) {
26 throw new TypeError('The worker file path must be specified')
28 if (typeof filePath
!== 'string') {
29 throw new TypeError('The worker file path must be a string')
31 if (!existsSync(filePath
)) {
32 throw new Error(`Cannot find the worker file '${filePath}'`)
36 export const checkDynamicPoolSize
= (min
: number, max
: number): void => {
39 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
41 } else if (!Number.isSafeInteger(max
)) {
43 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
45 } else if (min
> max
) {
47 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
49 } else if (max
=== 0) {
51 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
53 } else if (min
=== max
) {
55 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
60 export const checkValidWorkerChoiceStrategy
= (
61 workerChoiceStrategy
: WorkerChoiceStrategy
64 workerChoiceStrategy
!= null &&
65 !Object.values(WorkerChoiceStrategies
).includes(workerChoiceStrategy
)
67 throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
71 export const checkValidTasksQueueOptions
= (
72 tasksQueueOptions
: TasksQueueOptions
74 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
75 throw new TypeError('Invalid tasks queue options: must be a plain object')
78 tasksQueueOptions
?.concurrency
!= null &&
79 !Number.isSafeInteger(tasksQueueOptions
.concurrency
)
82 'Invalid worker node tasks concurrency: must be an integer'
86 tasksQueueOptions
?.concurrency
!= null &&
87 tasksQueueOptions
.concurrency
<= 0
90 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
94 tasksQueueOptions
?.size
!= null &&
95 !Number.isSafeInteger(tasksQueueOptions
.size
)
98 'Invalid worker node tasks queue size: must be an integer'
101 if (tasksQueueOptions
?.size
!= null && tasksQueueOptions
.size
<= 0) {
102 throw new RangeError(
103 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
108 export const checkWorkerNodeArguments
= (
111 opts
: WorkerNodeOptions
114 throw new TypeError('Cannot construct a worker node without a worker type')
116 if (!Object.values(WorkerTypes
).includes(type)) {
118 `Cannot construct a worker node with an invalid worker type '${type}'`
121 checkFilePath(filePath
)
124 'Cannot construct a worker node without worker node options'
127 if (!isPlainObject(opts
)) {
129 'Cannot construct a worker node with invalid options: must be a plain object'
132 if (opts
.tasksQueueBackPressureSize
== null) {
134 'Cannot construct a worker node without a tasks queue back pressure size option'
137 if (!Number.isSafeInteger(opts
.tasksQueueBackPressureSize
)) {
139 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
142 if (opts
.tasksQueueBackPressureSize
<= 0) {
143 throw new RangeError(
144 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
150 * Updates the given measurement statistics.
152 * @param measurementStatistics - The measurement statistics to update.
153 * @param measurementRequirements - The measurement statistics requirements.
154 * @param measurementValue - The measurement value.
155 * @param numberOfMeasurements - The number of measurements.
158 const updateMeasurementStatistics
= (
159 measurementStatistics
: MeasurementStatistics
,
160 measurementRequirements
: MeasurementStatisticsRequirements
,
161 measurementValue
: number
163 if (measurementRequirements
.aggregate
) {
164 measurementStatistics
.aggregate
=
165 (measurementStatistics
.aggregate
?? 0) + measurementValue
166 measurementStatistics
.minimum
= min(
168 measurementStatistics
.minimum
?? Infinity
170 measurementStatistics
.maximum
= max(
172 measurementStatistics
.maximum
?? -Infinity
175 (measurementRequirements
.average
|| measurementRequirements
.median
) &&
176 measurementValue
!= null
178 measurementStatistics
.history
.push(measurementValue
)
179 if (measurementRequirements
.average
) {
180 measurementStatistics
.average
= average(measurementStatistics
.history
)
181 } else if (measurementStatistics
.average
!= null) {
182 delete measurementStatistics
.average
184 if (measurementRequirements
.median
) {
185 measurementStatistics
.median
= median(measurementStatistics
.history
)
186 } else if (measurementStatistics
.median
!= null) {
187 delete measurementStatistics
.median
192 if (env
.NODE_ENV
=== 'test') {
193 // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
194 exports
.updateMeasurementStatistics
= updateMeasurementStatistics
197 export const updateWaitTimeWorkerUsage
= <
198 Worker
extends IWorker
,
202 workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
207 workerUsage
: WorkerUsage
,
210 const timestamp
= performance
.now()
211 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
212 updateMeasurementStatistics(
213 workerUsage
.waitTime
,
214 workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
219 export const updateTaskStatisticsWorkerUsage
= <Response
= unknown
>(
220 workerUsage
: WorkerUsage
,
221 message
: MessageValue
<Response
>
223 const workerTaskStatistics
= workerUsage
.tasks
225 workerTaskStatistics
.executing
!= null &&
226 workerTaskStatistics
.executing
> 0
228 --workerTaskStatistics
.executing
230 if (message
.workerError
== null) {
231 ++workerTaskStatistics
.executed
233 ++workerTaskStatistics
.failed
237 export const updateRunTimeWorkerUsage
= <
238 Worker
extends IWorker
,
242 workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
247 workerUsage
: WorkerUsage
,
248 message
: MessageValue
<Response
>
250 if (message
.workerError
!= null) {
253 updateMeasurementStatistics(
255 workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
,
256 message
.taskPerformance
?.runTime
?? 0
260 export const updateEluWorkerUsage
= <
261 Worker
extends IWorker
,
265 workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
270 workerUsage
: WorkerUsage
,
271 message
: MessageValue
<Response
>
273 if (message
.workerError
!= null) {
276 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
277 workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
278 updateMeasurementStatistics(
279 workerUsage
.elu
.active
,
280 eluTaskStatisticsRequirements
,
281 message
.taskPerformance
?.elu
?.active
?? 0
283 updateMeasurementStatistics(
284 workerUsage
.elu
.idle
,
285 eluTaskStatisticsRequirements
,
286 message
.taskPerformance
?.elu
?.idle
?? 0
288 if (eluTaskStatisticsRequirements
.aggregate
) {
289 if (message
.taskPerformance
?.elu
!= null) {
290 if (workerUsage
.elu
.utilization
!= null) {
291 workerUsage
.elu
.utilization
=
292 (workerUsage
.elu
.utilization
+
293 message
.taskPerformance
.elu
.utilization
) /
296 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
302 export const createWorker
= <Worker
extends IWorker
>(
305 opts
: { env
?: Record
<string, unknown
>, workerOptions
?: WorkerOptions
}
308 case WorkerTypes
.thread
:
309 return new Worker(filePath
, {
311 ...opts
?.workerOptions
312 }) as unknown
as Worker
313 case WorkerTypes
.cluster
:
314 return cluster
.fork(opts
?.env
) as unknown
as Worker
316 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
317 throw new Error(`Unknown worker type '${type}'`)
321 export const waitWorkerNodeEvents
= async <
322 Worker
extends IWorker
,
325 workerNode
: IWorkerNode
<Worker
, Data
>,
326 workerNodeEvent
: string,
327 numberOfEventsToWait
: number
328 ): Promise
<number> => {
329 return await new Promise
<number>(resolve
=> {
331 if (numberOfEventsToWait
=== 0) {
335 workerNode
.on(workerNodeEvent
, () => {
337 if (events
=== numberOfEventsToWait
) {