fix: ensure worker node is defined before emitting
[poolifier.git] / src / pools / utils.ts
CommitLineData
bde6b5d7 1import { existsSync } from 'node:fs'
c3719753
JB
2import cluster from 'node:cluster'
3import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
c329fd41 4import { env } from 'node:process'
d35e5717
JB
5import { average, isPlainObject, max, median, min } from '../utils.js'
6import type { MessageValue, Task } from '../utility-types.js'
bde6b5d7 7import {
bfc75cca 8 type MeasurementStatisticsRequirements,
bde6b5d7
JB
9 WorkerChoiceStrategies,
10 type WorkerChoiceStrategy
d35e5717
JB
11} from './selection-strategies/selection-strategies-types.js'
12import type { TasksQueueOptions } from './pool.js'
c3719753
JB
13import {
14 type IWorker,
d41a44de 15 type IWorkerNode,
c3719753
JB
16 type MeasurementStatistics,
17 type WorkerNodeOptions,
18 type WorkerType,
c329fd41
JB
19 WorkerTypes,
20 type WorkerUsage
d35e5717
JB
21} from './worker.js'
22import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
bde6b5d7 23
32b141fd
JB
/**
 * Builds the default tasks queue options for a pool of the given maximum size.
 *
 * @param poolMaxSize - The pool maximum size.
 * @returns The default tasks queue options. The queue size is the square of `poolMaxSize`.
 */
export const getDefaultTasksQueueOptions = (
  poolMaxSize: number
): Required<TasksQueueOptions> => {
  return {
    size: Math.pow(poolMaxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  }
}
35
/**
 * Checks that the given worker file path is specified, is a string and
 * points to an existing file system entry.
 *
 * @param filePath - The worker file path.
 * @throws {TypeError} If the file path is not specified or is not a string.
 * @throws {Error} If nothing exists at the file path.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  }
  // Runtime guard for untyped (JavaScript) callers.
  if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
47
c63a35a0
JB
/**
 * Checks the minimum and maximum sizes given to a dynamic pool.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {TypeError} If `max` is not specified or is not a safe integer.
 * @throws {RangeError} If `max` is lower than `min`, equal to zero or equal to `min`.
 */
export const checkDynamicPoolSize = (
  min: number,
  max: number | undefined
): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  } else if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  } else if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  } else if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  } else if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
74
/**
 * Checks that the given worker choice strategy, when specified, is one of
 * the values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {Error} If the worker choice strategy is specified but unknown.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
  if (
    workerChoiceStrategy != null &&
    !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
  ) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
85
/**
 * Checks the given tasks queue options. Unspecified options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {TypeError} If the options are not a plain object, or if `concurrency` or `size` is not a safe integer.
 * @throws {RangeError} If `concurrency` or `size` is negative or zero.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions | undefined
): void => {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  if (
    tasksQueueOptions?.concurrency != null &&
    !Number.isSafeInteger(tasksQueueOptions.concurrency)
  ) {
    throw new TypeError(
      'Invalid worker node tasks concurrency: must be an integer'
    )
  }
  if (
    tasksQueueOptions?.concurrency != null &&
    tasksQueueOptions.concurrency <= 0
  ) {
    throw new RangeError(
      `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
    )
  }
  if (
    tasksQueueOptions?.size != null &&
    !Number.isSafeInteger(tasksQueueOptions.size)
  ) {
    throw new TypeError(
      'Invalid worker node tasks queue size: must be an integer'
    )
  }
  if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
    throw new RangeError(
      `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
    )
  }
}
bfc75cca 122
/**
 * Checks the arguments used to construct a worker node.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path, validated by `checkFilePath()`.
 * @param opts - The worker node options.
 * @throws {TypeError} If the worker type is missing or invalid, if the options are missing or not a plain object, or if the tasks queue back pressure size is missing or not a safe integer.
 * @throws {RangeError} If the tasks queue back pressure size is negative or zero.
 */
export const checkWorkerNodeArguments = (
  type: WorkerType | undefined,
  filePath: string | undefined,
  opts: WorkerNodeOptions | undefined
): void => {
  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  if (!Object.values(WorkerTypes).includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }
  checkFilePath(filePath)
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid options: must be a plain object'
    )
  }
  if (opts.tasksQueueBackPressureSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue back pressure size option'
    )
  }
  if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
  }
  if (opts.tasksQueueBackPressureSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
  }
}
bfc75cca
JB
163
/**
 * Updates the given measurement statistics.
 *
 * Nothing is updated unless both the requirements and the value are defined
 * and the requirements ask for aggregation. The value is pushed to the
 * history only when the average or the median is required.
 *
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements | undefined,
  measurementValue: number | undefined
): void => {
  if (
    measurementRequirements == null ||
    measurementValue == null ||
    !measurementRequirements.aggregate
  ) {
    return
  }
  measurementStatistics.aggregate =
    (measurementStatistics.aggregate ?? 0) + measurementValue
  measurementStatistics.minimum = min(
    measurementValue,
    measurementStatistics.minimum ?? Infinity
  )
  measurementStatistics.maximum = max(
    measurementValue,
    measurementStatistics.maximum ?? -Infinity
  )
  if (measurementRequirements.average || measurementRequirements.median) {
    measurementStatistics.history.push(measurementValue)
    // A statistic that is no longer required is removed rather than left stale.
    if (measurementRequirements.average) {
      measurementStatistics.average = average(measurementStatistics.history)
    } else if (measurementStatistics.average != null) {
      delete measurementStatistics.average
    }
    if (measurementRequirements.median) {
      measurementStatistics.median = median(measurementStatistics.history)
    } else if (measurementStatistics.median != null) {
      delete measurementStatistics.median
    }
  }
}
c329fd41
JB
// Expose the module private updateMeasurementStatistics() helper only when
// NODE_ENV is 'test'.
if (env.NODE_ENV === 'test') {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  exports.updateMeasurementStatistics = updateMeasurementStatistics
}
211
/**
 * Updates the wait time statistics of the given worker usage for a task.
 *
 * The wait time is the delay between the task timestamp and now. A task
 * without timestamp yields a wait time of zero.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context providing the wait time statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param task - The task.
 */
export const updateWaitTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
    | WorkerChoiceStrategyContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  task: Task<Data>
): void => {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  updateMeasurementStatistics(
    workerUsage.waitTime,
    workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.waitTime,
    taskWaitTime
  )
}
231
/**
 * Updates the tasks statistics of the given worker usage from a task
 * response message.
 *
 * The executing counter is decremented when it is positive, then either the
 * executed or the failed counter is incremented depending on the presence of
 * a worker error in the message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received for the task.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const workerTaskStatistics = workerUsage.tasks
  if (
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerTaskStatistics.executing != null &&
    workerTaskStatistics.executing > 0
  ) {
    --workerTaskStatistics.executing
  }
  if (message.workerError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
250
/**
 * Updates the run time statistics of the given worker usage from a task
 * response message.
 *
 * Nothing is updated when the message carries a worker error. A message
 * without run time measurement counts as a run time of zero.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context providing the run time statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received for the task.
 */
export const updateRunTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
    | WorkerChoiceStrategyContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  updateMeasurementStatistics(
    workerUsage.runTime,
    workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.runTime,
    message.taskPerformance?.runTime ?? 0
  )
}
271
/**
 * Updates the event loop utilization (ELU) statistics of the given worker
 * usage from a task response message.
 *
 * Nothing is updated when the message carries a worker error. Missing active
 * or idle measurements count as zero. When ELU aggregation is required and
 * the message carries an ELU measurement, the stored utilization becomes the
 * mean of the previous stored utilization and the task utilization, or the
 * task utilization if none was stored yet.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context providing the ELU statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received for the task.
 */
export const updateEluWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
    | WorkerChoiceStrategyContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  const eluTaskStatisticsRequirements =
    workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    message.taskPerformance?.elu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    message.taskPerformance?.elu?.idle ?? 0
  )
  if (
    eluTaskStatisticsRequirements?.aggregate === true &&
    message.taskPerformance?.elu != null
  ) {
    const taskUtilization = message.taskPerformance.elu.utilization
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization + taskUtilization) / 2
        : taskUtilization
  }
}
c3719753
JB
311
/**
 * Creates a worker of the given type.
 *
 * For the thread type a `node:worker_threads` `Worker` is created with
 * `env: SHARE_ENV`; `opts.workerOptions` is spread after it, so an `env`
 * given there takes precedence. For the cluster type `cluster.fork()` is
 * called with `opts.env`.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path.
 * @param opts - The worker creation options.
 * @returns The created worker.
 * @throws {Error} If the worker type is unknown.
 */
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
): Worker => {
  switch (type) {
    case WorkerTypes.thread:
      return new Worker(filePath, {
        env: SHARE_ENV,
        ...opts.workerOptions
      }) as unknown as Worker
    case WorkerTypes.cluster:
      return cluster.fork(opts.env) as unknown as Worker
    default:
      // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
      throw new Error(`Unknown worker type '${type}'`)
  }
}
d41a44de
JB
330
/**
 * Waits for a number of occurrences of an event emitted by a worker node.
 *
 * The returned promise resolves as soon as the expected number of events is
 * reached or, when `timeout` is not negative, once the timeout elapses,
 * whichever happens first. When it settles, the pending timer is cleared and
 * the event listener is removed so that neither outlives the wait.
 *
 * @param workerNode - The worker node emitting the event.
 * @param workerNodeEvent - The name of the event to wait for.
 * @param numberOfEventsToWait - The number of events to wait for. Zero resolves immediately.
 * @param timeout - The maximum time to wait in milliseconds. A negative value disables the timeout.
 * @returns The number of events received when the wait ended.
 */
export const waitWorkerNodeEvents = async <
  Worker extends IWorker,
  Data = unknown
>(
  workerNode: IWorkerNode<Worker, Data>,
  workerNodeEvent: string,
  numberOfEventsToWait: number,
  timeout: number
): Promise<number> => {
  return await new Promise<number>(resolve => {
    let events = 0
    if (numberOfEventsToWait === 0) {
      resolve(events)
      return
    }
    let timeoutHandle: ReturnType<typeof setTimeout> | undefined
    // Releases the timer and the listener before resolving.
    const settle = (): void => {
      if (timeoutHandle != null) {
        clearTimeout(timeoutHandle)
      }
      // NOTE(review): assumes IWorkerNode is a Node.js EventEmitter (it
      // already exposes `on`) - confirm it also exposes `off`.
      workerNode.off(workerNodeEvent, onEvent)
      resolve(events)
    }
    const onEvent = (): void => {
      ++events
      if (events === numberOfEventsToWait) {
        settle()
      }
    }
    workerNode.on(workerNodeEvent, onEvent)
    if (timeout >= 0) {
      timeoutHandle = setTimeout(settle, timeout)
    }
  })
}