1 import { existsSync
} from
'node:fs'
2 import cluster from
'node:cluster'
3 import { SHARE_ENV
, Worker
, type WorkerOptions
} from
'node:worker_threads'
4 import { env
} from
'node:process'
5 import { average
, isPlainObject
, max
, median
, min
} from
'../utils.js'
6 import type { MessageValue
, Task
} from
'../utility-types.js'
8 type MeasurementStatisticsRequirements
,
9 WorkerChoiceStrategies
,
10 type WorkerChoiceStrategy
11 } from
'./selection-strategies/selection-strategies-types.js'
12 import type { TasksQueueOptions
} from
'./pool.js'
16 type MeasurementStatistics
,
17 type WorkerNodeOptions
,
22 import type { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context.js'
/**
 * Builds the default tasks queue options, typed as `Required<TasksQueueOptions>`.
 *
 * Defaults visible in this span:
 * - `size`: `poolMaxSize` squared.
 * - `tasksStealingOnBackPressure`: `true`.
 * - `tasksFinishedTimeout`: `2000`.
 *
 * NOTE(review): the declaration of `poolMaxSize` is not visible in this span;
 * presumably it is the single parameter of this function - confirm.
 * NOTE(review): the unit of `tasksFinishedTimeout` is not stated here;
 * presumably milliseconds - confirm.
 */
24 export const getDefaultTasksQueueOptions
= (
26 ): Required
<TasksQueueOptions
> => {
// The default queue size grows quadratically with the pool maximum size.
28 size
: Math.pow(poolMaxSize
, 2),
31 tasksStealingOnBackPressure
: true,
32 tasksFinishedTimeout
: 2000
/**
 * Validates the worker file path.
 *
 * The checks run in order: presence, runtime type, then existence on disk.
 *
 * @param filePath - The worker file path to check.
 * @throws {TypeError} If `filePath` is `null` or `undefined`.
 * @throws {TypeError} If `filePath` is not a string.
 * @throws {Error} If nothing exists on disk at `filePath`.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  }
  // Runtime guard: the static type already excludes non-strings, but untyped
  // (plain JavaScript) callers can still pass anything.
  if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
/**
 * Validates the size bounds of a dynamic pool through a single `else if` chain.
 *
 * Rejections visible in this span, in evaluation order:
 * 1. the maximum pool size is not specified,
 * 2. `max` is not a safe integer,
 * 3. `min > max`,
 * 4. `max === 0`,
 * 5. `min === max` (the message advises using a fixed pool instead).
 *
 * NOTE(review): only the `max` parameter declaration is visible here. `min`
 * is presumably a sibling parameter; if it were not, it would resolve to the
 * `min` helper imported from utils - confirm.
 * NOTE(review): the condition guarding the first message and the error
 * classes thrown are not visible in this span - confirm.
 */
48 export const checkDynamicPoolSize
= (
50 max
: number | undefined
54 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
56 } else if (!Number.isSafeInteger(max
)) {
58 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
60 } else if (min
> max
) {
62 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
64 } else if (max
=== 0) {
66 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
// Equal bounds leave no room for dynamic sizing.
68 } else if (min
=== max
) {
70 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
/**
 * Validates a worker choice strategy.
 *
 * A `null` or `undefined` strategy is accepted. Any other value must be one
 * of the values of the `WorkerChoiceStrategies` object.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {Error} If the strategy is set but is not a known strategy value.
 */
75 export const checkValidWorkerChoiceStrategy
= (
76 workerChoiceStrategy
: WorkerChoiceStrategy
| undefined
79 workerChoiceStrategy
!= null &&
80 !Object.values(WorkerChoiceStrategies
).includes(workerChoiceStrategy
)
82 throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
/**
 * Validates tasks queue options.
 *
 * A `null` or `undefined` options value is accepted. Otherwise the checks
 * visible in this span are:
 * - the options must be a plain object (`TypeError`),
 * - `concurrency`, when set, must be a safe integer and must be greater than zero,
 * - `size`, when set, must be a safe integer and must be greater than zero.
 *
 * The `<= 0` checks throw `RangeError`.
 * NOTE(review): the error class used for the two "must be an integer"
 * messages is not visible in this span - confirm.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 */
86 export const checkValidTasksQueueOptions
= (
87 tasksQueueOptions
: TasksQueueOptions
| undefined
89 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
90 throw new TypeError('Invalid tasks queue options: must be a plain object')
// concurrency: integer check first, then range check.
93 tasksQueueOptions
?.concurrency
!= null &&
94 !Number.isSafeInteger(tasksQueueOptions
.concurrency
)
97 'Invalid worker node tasks concurrency: must be an integer'
101 tasksQueueOptions
?.concurrency
!= null &&
102 tasksQueueOptions
.concurrency
<= 0
104 throw new RangeError(
105 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
// size: same two-step validation as concurrency.
109 tasksQueueOptions
?.size
!= null &&
110 !Number.isSafeInteger(tasksQueueOptions
.size
)
113 'Invalid worker node tasks queue size: must be an integer'
116 if (tasksQueueOptions
?.size
!= null && tasksQueueOptions
.size
<= 0) {
117 throw new RangeError(
118 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
/**
 * Validates the arguments used to construct a worker node.
 *
 * Checks visible in this span, in order:
 * - a worker type must be given (`TypeError`),
 * - the worker type must be one of the values of `WorkerTypes`,
 * - the file path is validated by `checkFilePath`,
 * - worker node options must be given and must be a plain object,
 * - `opts.tasksQueueBackPressureSize` must be set, must be a safe integer and
 *   must be greater than zero (`RangeError` for the last check).
 *
 * NOTE(review): several guarding conditions and the error classes of most
 * messages are not visible in this span - confirm.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path.
 * @param opts - The worker node options.
 */
123 export const checkWorkerNodeArguments
= (
124 type: WorkerType
| undefined,
125 filePath
: string | undefined,
126 opts
: WorkerNodeOptions
| undefined
129 throw new TypeError('Cannot construct a worker node without a worker type')
131 if (!Object.values(WorkerTypes
).includes(type)) {
133 `Cannot construct a worker node with an invalid worker type '${type}'`
// Delegates presence, type and existence checks of the path.
136 checkFilePath(filePath
)
139 'Cannot construct a worker node without worker node options'
142 if (!isPlainObject(opts
)) {
144 'Cannot construct a worker node with invalid options: must be a plain object'
// Back pressure size: presence, then integer, then strictly positive.
147 if (opts
.tasksQueueBackPressureSize
== null) {
149 'Cannot construct a worker node without a tasks queue back pressure size option'
152 if (!Number.isSafeInteger(opts
.tasksQueueBackPressureSize
)) {
154 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
157 if (opts
.tasksQueueBackPressureSize
<= 0) {
158 throw new RangeError(
159 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
/**
 * Updates a measurement statistics object in place with one new value.
 *
 * Nothing visible is updated unless the requirements and the value are both
 * set and `measurementRequirements.aggregate` is truthy. When that holds:
 * - `aggregate` accumulates the value (a missing aggregate counts as 0),
 * - `minimum` and `maximum` are recomputed with the `min` and `max` helpers,
 *   falling back to `Infinity` and `-Infinity` when unset,
 * - if `average` or `median` is required, the value is pushed to `history`;
 *   each of the two statistics is then recomputed from `history` when
 *   required, or deleted when it is not required but currently set.
 *
 * NOTE(review): the first argument of the `min(` and `max(` calls is not
 * visible in this span; presumably it is `measurementValue` - confirm.
 */
165 * Updates the given measurement statistics.
167 * @param measurementStatistics - The measurement statistics to update.
168 * @param measurementRequirements - The measurement statistics requirements.
169 * @param measurementValue - The measurement value.
172 const updateMeasurementStatistics
= (
173 measurementStatistics
: MeasurementStatistics
,
174 measurementRequirements
: MeasurementStatisticsRequirements
| undefined,
175 measurementValue
: number | undefined
178 measurementRequirements
!= null &&
179 measurementValue
!= null &&
180 measurementRequirements
.aggregate
// Running total of all recorded values.
182 measurementStatistics
.aggregate
=
183 (measurementStatistics
.aggregate
?? 0) + measurementValue
184 measurementStatistics
.minimum
= min(
186 measurementStatistics
.minimum
?? Infinity
188 measurementStatistics
.maximum
= max(
190 measurementStatistics
.maximum
?? -Infinity
// The history is only fed when a statistic derived from it is required.
192 if (measurementRequirements
.average
|| measurementRequirements
.median
) {
193 measurementStatistics
.history
.push(measurementValue
)
194 if (measurementRequirements
.average
) {
195 measurementStatistics
.average
= average(measurementStatistics
.history
)
// Drop a stale average that is no longer required.
196 } else if (measurementStatistics
.average
!= null) {
197 delete measurementStatistics
.average
199 if (measurementRequirements
.median
) {
200 measurementStatistics
.median
= median(measurementStatistics
.history
)
// Drop a stale median that is no longer required.
201 } else if (measurementStatistics
.median
!= null) {
202 delete measurementStatistics
.median
// Test-only hook: when NODE_ENV is "test", the otherwise non-exported
// updateMeasurementStatistics function is attached to `exports` so tests can
// reach it.
// NOTE(review): this relies on an `exports` binding being in scope, which
// presumably means the module is emitted as CommonJS - confirm.
207 if (env
.NODE_ENV
=== 'test') {
208 // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
209 exports
.updateMeasurementStatistics
= updateMeasurementStatistics
/**
 * Records the wait time of a task into `workerUsage.waitTime`.
 *
 * The wait time is `performance.now()` minus `task.timestamp`. A task
 * without a timestamp yields a wait time of 0, because the current timestamp
 * is used as the fallback. The statistics requirements come from
 * `workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.waitTime`,
 * so they are `undefined` when no context is given.
 *
 * NOTE(review): the `task` parameter declaration and the last argument of
 * the `updateMeasurementStatistics(` call are not visible in this span;
 * presumably the last argument is `taskWaitTime` - confirm.
 */
212 export const updateWaitTimeWorkerUsage
= <
213 Worker
extends IWorker
,
217 workerChoiceStrategyContext
:
218 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
220 workerUsage
: WorkerUsage
,
223 const timestamp
= performance
.now()
// Falls back to the current timestamp, giving a zero wait time.
224 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
225 updateMeasurementStatistics(
226 workerUsage
.waitTime
,
227 workerChoiceStrategyContext
?.getTaskStatisticsRequirements()?.waitTime
,
/**
 * Updates the task counters in `workerUsage.tasks` from a worker message.
 *
 * Visible behavior:
 * - `executing` is decremented only when it is set and greater than zero,
 *   so it never drops below zero here,
 * - `executed` is incremented when `message.workerError` is `null` or
 *   `undefined`,
 * - `failed` is incremented on a line that presumably belongs to the matching
 *   `else` branch; that branch header is not visible in this span - confirm.
 *
 * @param workerUsage - The worker usage whose task counters are updated.
 * @param message - The message received from the worker.
 */
232 export const updateTaskStatisticsWorkerUsage
= <Response
= unknown
>(
233 workerUsage
: WorkerUsage
,
234 message
: MessageValue
<Response
>
236 const workerTaskStatistics
= workerUsage
.tasks
238 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
239 workerTaskStatistics
.executing
!= null &&
240 workerTaskStatistics
.executing
> 0
242 --workerTaskStatistics
.executing
244 if (message
.workerError
== null) {
245 ++workerTaskStatistics
.executed
247 ++workerTaskStatistics
.failed
/**
 * Records the run time of a task from a worker message.
 *
 * The value passed on is `message.taskPerformance?.runTime`, or 0 when that
 * is missing. The statistics requirements come from
 * `workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.runTime`.
 *
 * NOTE(review): the body of the `message.workerError != null` branch is not
 * visible in this span; presumably it returns early so failed tasks are not
 * measured - confirm.
 * NOTE(review): the first argument of the `updateMeasurementStatistics(` call
 * is not visible either; presumably `workerUsage.runTime` - confirm.
 */
251 export const updateRunTimeWorkerUsage
= <
252 Worker
extends IWorker
,
256 workerChoiceStrategyContext
:
257 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
259 workerUsage
: WorkerUsage
,
260 message
: MessageValue
<Response
>
262 if (message
.workerError
!= null) {
265 updateMeasurementStatistics(
267 workerChoiceStrategyContext
?.getTaskStatisticsRequirements()?.runTime
,
268 message
.taskPerformance
?.runTime
?? 0
/**
 * Records event loop utilization (ELU) figures from a worker message.
 *
 * Visible behavior:
 * - `workerUsage.elu.active` and `workerUsage.elu.idle` are each updated via
 *   `updateMeasurementStatistics`, using the `elu` statistics requirements and
 *   the matching value from `message.taskPerformance?.elu` (0 when missing),
 * - when the requirements have `aggregate === true` and the message carries
 *   ELU data, `workerUsage.elu.utilization` is either combined with the
 *   reported utilization (when already set) or initialised to it.
 *
 * NOTE(review): the body of the `message.workerError != null` branch is not
 * visible in this span; presumably it returns early - confirm.
 * NOTE(review): the divisor applied to the summed utilization is not visible
 * in this span - confirm against the full file.
 */
272 export const updateEluWorkerUsage
= <
273 Worker
extends IWorker
,
277 workerChoiceStrategyContext
:
278 | WorkerChoiceStrategyContext
<Worker
, Data
, Response
>
280 workerUsage
: WorkerUsage
,
281 message
: MessageValue
<Response
>
283 if (message
.workerError
!= null) {
// Resolved once and shared by the active and idle updates below.
286 const eluTaskStatisticsRequirements
=
287 workerChoiceStrategyContext
?.getTaskStatisticsRequirements()?.elu
288 updateMeasurementStatistics(
289 workerUsage
.elu
.active
,
290 eluTaskStatisticsRequirements
,
291 message
.taskPerformance
?.elu
?.active
?? 0
293 updateMeasurementStatistics(
294 workerUsage
.elu
.idle
,
295 eluTaskStatisticsRequirements
,
296 message
.taskPerformance
?.elu
?.idle
?? 0
298 if (eluTaskStatisticsRequirements
?.aggregate
=== true) {
299 if (message
.taskPerformance
?.elu
!= null) {
// An existing utilization is combined with the new one; otherwise it is seeded.
300 if (workerUsage
.elu
.utilization
!= null) {
301 workerUsage
.elu
.utilization
=
302 (workerUsage
.elu
.utilization
+
303 message
.taskPerformance
.elu
.utilization
) /
306 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
/**
 * Creates a worker of the requested type.
 *
 * Visible behavior:
 * - `WorkerTypes.thread`: constructs a `Worker` for `filePath`, spreading
 *   `opts.workerOptions` into the constructor options,
 * - `WorkerTypes.cluster`: forks a cluster worker with `cluster.fork(opts.env)`,
 * - otherwise: throws an `Error` reporting the unknown worker type.
 *
 * The generic type parameter is also named `Worker`. In type positions it
 * shadows the `Worker` class imported from worker_threads, which is why both
 * results are cast through `unknown`; `new Worker(` still resolves to the
 * imported class.
 *
 * NOTE(review): the `type` and `filePath` parameter declarations, the switch
 * header and one constructor option line are not visible in this span.
 *
 * @param opts - Optional `env` for cluster workers and `workerOptions` for
 * thread workers.
 */
312 export const createWorker
= <Worker
extends IWorker
>(
315 opts
: { env
?: Record
<string, unknown
>, workerOptions
?: WorkerOptions
}
318 case WorkerTypes
.thread
:
319 return new Worker(filePath
, {
321 ...opts
.workerOptions
322 }) as unknown
as Worker
323 case WorkerTypes
.cluster
:
324 return cluster
.fork(opts
.env
) as unknown
as Worker
326 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
327 throw new Error(`Unknown worker type '${type}'`)
331 export const waitWorkerNodeEvents
= async <
332 Worker
extends IWorker
,
335 workerNode
: IWorkerNode
<Worker
, Data
>,
336 workerNodeEvent
: string,
337 numberOfEventsToWait
: number,
339 ): Promise
<number> => {
340 return await new Promise
<number>(resolve
=> {
342 if (numberOfEventsToWait
=== 0) {
346 workerNode
.on(workerNodeEvent
, () => {
348 if (events
=== numberOfEventsToWait
) {