1 import { existsSync
} from
'node:fs'
2 import cluster from
'node:cluster'
3 import { SHARE_ENV
, Worker
, type WorkerOptions
} from
'node:worker_threads'
4 import { average
, isPlainObject
, max
, median
, min
} from
'../utils'
6 type MeasurementStatisticsRequirements
,
7 WorkerChoiceStrategies
,
8 type WorkerChoiceStrategy
9 } from
'./selection-strategies/selection-strategies-types'
10 import type { TasksQueueOptions
} from
'./pool'
14 type MeasurementStatistics
,
15 type WorkerNodeOptions
,
/**
 * Validates the worker file path.
 *
 * @param filePath - Path of the worker file.
 * @throws TypeError If `filePath` is `null`/`undefined` or is not a string.
 * @throws Error If no file exists at `filePath`.
 */
export const checkFilePath = (filePath: string): void => {
  // The declared parameter type is erased at runtime, hence the explicit guards.
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  }
  if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
/**
 * Validates the minimum and maximum sizes of a dynamic pool.
 *
 * The checks run in order and the first failing one throws:
 * missing maximum, non safe integer maximum, maximum below minimum,
 * maximum equal to zero, minimum equal to maximum.
 *
 * NOTE(review): the error classes were rebuilt following the convention used
 * by the other checks in this file (TypeError for a missing or wrongly typed
 * value, RangeError for an out-of-range value) - confirm before merging.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws TypeError If `max` is not specified or is not a safe integer.
 * @throws RangeError If `max` is lower than `min`, equal to zero or equal to `min`.
 */
export const checkDynamicPoolSize = (min: number, max: number): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  } else if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  } else if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  } else if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  } else if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
/**
 * Validates a worker choice strategy.
 *
 * A `null`/`undefined` strategy is accepted; any other value must be one of
 * the values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error If the strategy is defined but is not a known strategy.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy
): void => {
  if (
    workerChoiceStrategy != null &&
    !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
  ) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
/**
 * Validates tasks queue options.
 *
 * `null`/`undefined` options are accepted, and so are unset `concurrency`
 * and `size` fields. When set, each field must be a safe integer strictly
 * greater than zero.
 *
 * NOTE(review): apart from the first one, the error classes were rebuilt
 * following the convention used by the other checks in this file (TypeError
 * for a wrongly typed value, RangeError for an out-of-range value) - confirm
 * before merging.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError If the options are not a plain object, or if `concurrency` or `size` is not a safe integer.
 * @throws RangeError If `concurrency` or `size` is negative or zero.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions
): void => {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  if (
    tasksQueueOptions?.concurrency != null &&
    !Number.isSafeInteger(tasksQueueOptions.concurrency)
  ) {
    throw new TypeError(
      'Invalid worker node tasks concurrency: must be an integer'
    )
  }
  if (
    tasksQueueOptions?.concurrency != null &&
    tasksQueueOptions.concurrency <= 0
  ) {
    throw new RangeError(
      `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
    )
  }
  if (
    tasksQueueOptions?.size != null &&
    !Number.isSafeInteger(tasksQueueOptions.size)
  ) {
    throw new TypeError(
      'Invalid worker node tasks queue size: must be an integer'
    )
  }
  if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
    throw new RangeError(
      `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
    )
  }
}
/**
 * Validates the arguments used to construct a worker node.
 *
 * The checks run in order: worker type, worker file path (delegated to
 * `checkFilePath`), then the worker node options and their
 * `tasksQueueBackPressureSize` field.
 *
 * NOTE(review): the `type` and `filePath` parameters are rebuilt from their
 * uses in the body; the `WorkerType` annotation and the error classes of the
 * checks between the first and the last one are assumed from the file's
 * convention - confirm before merging.
 *
 * @param type - The worker type.
 * @param filePath - Path of the worker file.
 * @param opts - The worker node options.
 * @throws TypeError If an argument is missing or has an invalid type.
 * @throws RangeError If `opts.tasksQueueBackPressureSize` is negative or zero.
 */
export const checkWorkerNodeArguments = (
  type: WorkerType,
  filePath: string,
  opts: WorkerNodeOptions
): void => {
  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  if (!Object.values(WorkerTypes).includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }
  checkFilePath(filePath)
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid options: must be a plain object'
    )
  }
  if (opts.tasksQueueBackPressureSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue back pressure size option'
    )
  }
  if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
  }
  if (opts.tasksQueueBackPressureSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
  }
}
/**
 * Updates the given measurement statistics.
 *
 * Nothing is updated unless `measurementRequirements.aggregate` is set. When
 * it is, the aggregate, minimum and maximum are refreshed; the value is
 * additionally pushed to the history when an average or a median is required,
 * and the `average`/`median` fields that are no longer required are deleted.
 *
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 */
export const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements,
  measurementValue: number
): void => {
  if (measurementRequirements.aggregate) {
    measurementStatistics.aggregate =
      (measurementStatistics.aggregate ?? 0) + measurementValue
    // Infinity / -Infinity are neutral seeds for the first recorded value.
    measurementStatistics.minimum = min(
      measurementValue,
      measurementStatistics.minimum ?? Infinity
    )
    measurementStatistics.maximum = max(
      measurementValue,
      measurementStatistics.maximum ?? -Infinity
    )
    if (
      (measurementRequirements.average || measurementRequirements.median) &&
      measurementValue != null
    ) {
      measurementStatistics.history.push(measurementValue)
      if (measurementRequirements.average) {
        measurementStatistics.average = average(measurementStatistics.history)
      } else if (measurementStatistics.average != null) {
        delete measurementStatistics.average
      }
      if (measurementRequirements.median) {
        measurementStatistics.median = median(measurementStatistics.history)
      } else if (measurementStatistics.median != null) {
        delete measurementStatistics.median
      }
    }
  }
}
/**
 * Creates a worker of the given type.
 *
 * For `WorkerTypes.thread` a `node:worker_threads` `Worker` is instantiated
 * on `filePath`; `opts.workerOptions` is spread last, so it overrides the
 * defaults set here. For `WorkerTypes.cluster` a worker is forked through
 * `cluster.fork()` with `opts.env`.
 *
 * NOTE(review): the `type` and `filePath` parameters are rebuilt from their
 * uses in the body; the `WorkerType` annotation and the `env: SHARE_ENV`
 * default are assumed (`SHARE_ENV` is imported by this file and has no other
 * visible use) - confirm before merging.
 *
 * @param type - The worker type.
 * @param filePath - Path of the worker file.
 * @param opts - The environment for a cluster worker and the options for a thread worker.
 * @returns The created worker.
 * @throws Error If the worker type is unknown.
 */
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
): Worker => {
  switch (type) {
    case WorkerTypes.thread:
      // In value position `Worker` is the node:worker_threads class, not the
      // type parameter of the same name.
      return new Worker(filePath, {
        env: SHARE_ENV,
        ...opts?.workerOptions
      }) as unknown as Worker
    case WorkerTypes.cluster:
      return cluster.fork(opts?.env) as unknown as Worker
    default:
      // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
      throw new Error(`Unknown worker type '${type}'`)
  }
}
208 export const waitWorkerNodeEvents
= async <
209 Worker
extends IWorker
,
212 workerNode
: IWorkerNode
<Worker
, Data
>,
213 workerNodeEvent
: string,
214 numberOfEventsToWait
: number
215 ): Promise
<number> => {
216 return await new Promise
<number>(resolve
=> {
218 if (numberOfEventsToWait
=== 0) {
222 workerNode
.on(workerNodeEvent
, () => {
224 if (events
=== numberOfEventsToWait
) {