Merge branch 'master' into map-execute
[poolifier.git] / src / pools / utils.ts
1 import cluster, { Worker as ClusterWorker } from 'node:cluster'
2 import { existsSync } from 'node:fs'
3 import { env } from 'node:process'
4 import {
5 SHARE_ENV,
6 Worker as ThreadWorker,
7 type WorkerOptions,
8 } from 'node:worker_threads'
9
10 import type { MessageValue, Task } from '../utility-types.js'
11 import { average, isPlainObject, max, median, min } from '../utils.js'
12 import type { TasksQueueOptions } from './pool.js'
13 import {
14 type MeasurementStatisticsRequirements,
15 WorkerChoiceStrategies,
16 type WorkerChoiceStrategy,
17 } from './selection-strategies/selection-strategies-types.js'
18 import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
19 import {
20 type IWorker,
21 type IWorkerNode,
22 type MeasurementStatistics,
23 type WorkerNodeOptions,
24 type WorkerType,
25 WorkerTypes,
26 type WorkerUsage,
27 } from './worker.js'
28
/**
 * Default measurement statistics requirements, with every statistic flag
 * (aggregate, average, median) disabled.
 */
export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements = {
  aggregate: false,
  average: false,
  median: false,
}
38
/**
 * Builds the default tasks queue options for a pool.
 * @param poolMaxSize - The pool maximum size; the default queue size is its square.
 * @returns A fully populated tasks queue options object.
 */
export const getDefaultTasksQueueOptions = (
  poolMaxSize: number
): Required<TasksQueueOptions> => ({
  size: poolMaxSize ** 2,
  concurrency: 1,
  taskStealing: true,
  tasksStealingOnBackPressure: false,
  tasksFinishedTimeout: 2000,
})
50
/**
 * Validates that a worker file path is given, is a string and exists on disk.
 * @param filePath - The worker file path to validate.
 * @throws {TypeError} When the path is missing or is not a string.
 * @throws {Error} When nothing exists at the given path.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  } else if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  } else if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
62
/**
 * Validates the size bounds of a dynamic pool.
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {TypeError} When `max` is missing or is not a safe integer.
 * @throws {RangeError} When `max` is below `min`, equal to zero, or equal to `min`.
 */
export const checkDynamicPoolSize = (
  min: number,
  max: number | undefined
): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    // A dynamic pool that cannot grow is just a fixed pool.
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
89
/**
 * Validates an optional priority value.
 * @param priority - The priority to validate; `null`/`undefined` is accepted as "not set".
 * @throws {TypeError} When the priority is set but is not a safe integer.
 * @throws {RangeError} When the priority is outside the inclusive range [-20, 19].
 */
export const checkValidPriority = (priority: number | undefined): void => {
  if (priority == null) {
    return
  }
  if (!Number.isSafeInteger(priority)) {
    throw new TypeError(`Invalid property 'priority': '${priority.toString()}'`)
  }
  if (priority < -20 || priority > 19) {
    throw new RangeError("Property 'priority' must be between -20 and 19")
  }
}
102
/**
 * Validates an optional worker choice strategy against the known
 * `WorkerChoiceStrategies` values.
 * @param workerChoiceStrategy - The strategy to validate; `null`/`undefined` is accepted.
 * @throws {Error} When the strategy is set but is not one of `WorkerChoiceStrategies`.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
  if (workerChoiceStrategy == null) {
    return
  }
  const isKnownStrategy = Object.values(WorkerChoiceStrategies).includes(
    workerChoiceStrategy
  )
  if (!isKnownStrategy) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
113
/**
 * Validates optional tasks queue options.
 *
 * Only `concurrency` and `size` are checked; each must, when present, be a
 * strictly positive safe integer.
 * @param tasksQueueOptions - The options to validate; `null`/`undefined` is accepted.
 * @throws {TypeError} When the options are not a plain object, or a checked field is not an integer.
 * @throws {RangeError} When a checked field is zero or negative.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions | undefined
): void => {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, size } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency.toString()} is a negative integer or zero`
      )
    }
  }
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size.toString()} is a negative integer or zero`
      )
    }
  }
}
150
/**
 * Validates the arguments used to construct a worker node.
 * @param type - The worker type; must be one of `WorkerTypes`.
 * @param filePath - The worker file path, validated by `checkFilePath`.
 * @param opts - The worker node options; must be a plain object whose
 * `tasksQueueBackPressureSize` and `tasksQueueBucketSize` are strictly positive
 * safe integers and whose `tasksQueuePriority` is a boolean.
 * @throws {TypeError} When an argument is missing or has the wrong type.
 * @throws {RangeError} When a size option is zero or negative.
 */
export const checkWorkerNodeArguments = (
  type: WorkerType | undefined,
  filePath: string | undefined,
  opts: WorkerNodeOptions | undefined
): void => {
  // The two size options share the same three-step validation and message shape.
  const requirePositiveIntegerOption = (
    value: number | undefined,
    optionLabel: string
  ): void => {
    if (value == null) {
      throw new TypeError(
        `Cannot construct a worker node without a ${optionLabel} option`
      )
    }
    if (!Number.isSafeInteger(value)) {
      throw new TypeError(
        `Cannot construct a worker node with a ${optionLabel} option that is not an integer`
      )
    }
    if (value <= 0) {
      throw new RangeError(
        `Cannot construct a worker node with a ${optionLabel} option that is not a positive integer`
      )
    }
  }

  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  if (!Object.values(WorkerTypes).includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }
  checkFilePath(filePath)
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid worker node options: must be a plain object'
    )
  }
  requirePositiveIntegerOption(
    opts.tasksQueueBackPressureSize,
    'tasks queue back pressure size'
  )
  requirePositiveIntegerOption(opts.tasksQueueBucketSize, 'tasks queue bucket size')
  if (opts.tasksQueuePriority == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue priority option'
    )
  }
  if (typeof opts.tasksQueuePriority !== 'boolean') {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
    )
  }
}
216
/**
 * Folds one measurement value into the given measurement statistics.
 *
 * Nothing happens unless the requirements and the value are both present and
 * `aggregate` is requested. In that case the running aggregate, minimum and
 * maximum are refreshed. If `average` and/or `median` are also requested, the
 * value is pushed into the statistics history and the requested figure(s) are
 * recomputed from it; a figure that is not requested is removed if present.
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements | undefined,
  measurementValue: number | undefined
): void => {
  if (
    measurementRequirements == null ||
    measurementValue == null ||
    !measurementRequirements.aggregate
  ) {
    return
  }
  measurementStatistics.aggregate =
    (measurementStatistics.aggregate ?? 0) + measurementValue
  measurementStatistics.minimum = min(
    measurementValue,
    measurementStatistics.minimum ?? Number.POSITIVE_INFINITY
  )
  measurementStatistics.maximum = max(
    measurementValue,
    measurementStatistics.maximum ?? Number.NEGATIVE_INFINITY
  )
  const wantsAverage = measurementRequirements.average
  const wantsMedian = measurementRequirements.median
  if (!wantsAverage && !wantsMedian) {
    return
  }
  measurementStatistics.history.put(measurementValue)
  if (wantsAverage) {
    measurementStatistics.average = average(
      measurementStatistics.history.toArray()
    )
  } else if (measurementStatistics.average != null) {
    delete measurementStatistics.average
  }
  if (wantsMedian) {
    measurementStatistics.median = median(
      measurementStatistics.history.toArray()
    )
  } else if (measurementStatistics.median != null) {
    delete measurementStatistics.median
  }
}
// Expose the module-private helper to the test suite when NODE_ENV is 'test'.
// NOTE(review): this assigns to a CommonJS `exports` binding, which is not
// defined in native ES modules — confirm the test build runs this module as CommonJS.
if (env.NODE_ENV === 'test') {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  exports.updateMeasurementStatistics = updateMeasurementStatistics
}
267
/**
 * Records how long a task waited before this call into the worker's
 * wait-time statistics.
 *
 * The wait time is `performance.now()` minus the task timestamp; a task
 * without a timestamp yields a wait time of 0.
 * @param workerChoiceStrategiesContext - Source of the wait-time statistics requirements, if any.
 * @param workerUsage - The worker usage whose `waitTime` statistics are updated.
 * @param task - The task whose wait time is measured.
 */
export const updateWaitTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategiesContext:
    | WorkerChoiceStrategiesContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  task: Task<Data>
): void => {
  const now = performance.now()
  const waitTimeRequirements =
    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - (task.timestamp ?? now)
  )
}
287
/**
 * Updates a worker's task counters after a task response message.
 *
 * The `executing` counter is decremented only while it is positive. Then
 * `failed` is incremented if the message carries a `workerError`; otherwise
 * `executed` is incremented.
 * @param workerUsage - The worker usage whose `tasks` counters are updated.
 * @param message - The task response message.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const { tasks } = workerUsage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (tasks.executing != null && tasks.executing > 0) {
    tasks.executing--
  }
  if (message.workerError != null) {
    tasks.failed++
  } else {
    tasks.executed++
  }
}
306
/**
 * Records a task's run time into the worker's run-time statistics.
 *
 * Messages carrying a `workerError` are ignored. A missing
 * `taskPerformance.runTime` is recorded as 0.
 * @param workerChoiceStrategiesContext - Source of the run-time statistics requirements, if any.
 * @param workerUsage - The worker usage whose `runTime` statistics are updated.
 * @param message - The task response message.
 */
export const updateRunTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategiesContext:
    | WorkerChoiceStrategiesContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError == null) {
    const runTimeRequirements =
      workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      message.taskPerformance?.runTime ?? 0
    )
  }
}
327
/**
 * Records a task's event loop utilization (ELU) into the worker's ELU statistics.
 *
 * Messages carrying a `workerError` are ignored. Missing active/idle values
 * are recorded as 0. When aggregation is requested and the message carries
 * ELU data, `utilization` is set to the incoming value the first time, and
 * afterwards to the mean of the previous and incoming values.
 * @param workerChoiceStrategiesContext - Source of the ELU statistics requirements, if any.
 * @param workerUsage - The worker usage whose `elu` statistics are updated.
 * @param message - The task response message.
 */
export const updateEluWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategiesContext:
    | WorkerChoiceStrategiesContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  const eluRequirements =
    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (eluRequirements?.aggregate !== true || taskElu == null) {
    return
  }
  const previousUtilization = workerUsage.elu.utilization
  workerUsage.elu.utilization =
    previousUtilization == null
      ? taskElu.utilization
      : (previousUtilization + taskElu.utilization) / 2
}
367
/**
 * Spawns a worker of the requested type.
 *
 * Thread workers are created with `env: SHARE_ENV` by default; any
 * `opts.workerOptions` (including `env`) override it. Cluster workers are
 * forked with `opts.env` as their environment.
 * @param type - The worker type to create.
 * @param filePath - The worker script path (used for thread workers).
 * @param opts - Cluster environment and/or thread worker options.
 * @returns The spawned worker.
 * @throws {Error} When the worker type is not handled.
 */
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>; workerOptions?: WorkerOptions }
): Worker => {
  if (type === WorkerTypes.thread) {
    const threadWorker = new ThreadWorker(filePath, {
      env: SHARE_ENV,
      ...opts.workerOptions,
    })
    return threadWorker as unknown as Worker
  }
  if (type === WorkerTypes.cluster) {
    return cluster.fork(opts.env) as unknown as Worker
  }
  // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
  throw new Error(`Unknown worker type '${type}'`)
}
386
/**
 * Determines the worker type of a worker from its runtime class.
 * @param worker - The worker to inspect.
 * @returns `WorkerTypes.thread` for a `worker_threads` Worker,
 * `WorkerTypes.cluster` for a cluster Worker, otherwise `undefined`.
 * @internal
 */
export const getWorkerType = (worker: IWorker): WorkerType | undefined =>
  worker instanceof ThreadWorker
    ? WorkerTypes.thread
    : worker instanceof ClusterWorker
      ? WorkerTypes.cluster
      : undefined
400
/**
 * Determines the identifier of a worker from its runtime class.
 * @param worker - The worker to inspect.
 * @returns `threadId` for a `worker_threads` Worker, `id` for a cluster
 * Worker, otherwise `undefined`.
 * @internal
 */
export const getWorkerId = (worker: IWorker): number | undefined =>
  worker instanceof ThreadWorker
    ? worker.threadId
    : worker instanceof ClusterWorker
      ? worker.id
      : undefined
414
/**
 * Waits until a worker node has emitted a given event a given number of times,
 * or until a timeout elapses, whichever comes first.
 *
 * The listener registered on the worker node and the pending timer are both
 * released as soon as the promise settles. The original implementation kept
 * the listener attached forever, which leaked one listener per call. It also
 * left the timer running, which could hold the event loop open for up to
 * `timeout` ms after the wait was satisfied.
 * @param workerNode - The worker node to listen on.
 * @param workerNodeEvent - One of `'idle'`, `'backPressure'` or `'taskFinished'`.
 * @param numberOfEventsToWait - How many events to wait for; zero or a negative
 * count resolves immediately with 0, since it can never be reached.
 * @param timeout - Maximum wait in milliseconds; a negative value disables the timeout.
 * @returns The number of events observed before resolution.
 * @throws {Error} (as a rejection) When `workerNodeEvent` is not a supported event.
 */
export const waitWorkerNodeEvents = async <
  Worker extends IWorker,
  Data = unknown
>(
  workerNode: IWorkerNode<Worker, Data>,
  workerNodeEvent: string,
  numberOfEventsToWait: number,
  timeout: number
): Promise<number> => {
  return await new Promise<number>(resolve => {
    let events = 0
    if (numberOfEventsToWait <= 0) {
      resolve(events)
      return
    }
    switch (workerNodeEvent) {
      case 'idle':
      case 'backPressure':
      case 'taskFinished':
        break
      default:
        throw new Error('Invalid worker node event')
    }
    let timeoutHandle: ReturnType<typeof setTimeout> | undefined
    // Detach everything this wait installed, then settle with the current count.
    const finish = (): void => {
      workerNode.off(workerNodeEvent, onEvent)
      if (timeoutHandle != null) {
        clearTimeout(timeoutHandle)
        timeoutHandle = undefined
      }
      resolve(events)
    }
    const onEvent = (): void => {
      ++events
      if (events >= numberOfEventsToWait) {
        finish()
      }
    }
    workerNode.on(workerNodeEvent, onEvent)
    if (timeout >= 0) {
      timeoutHandle = setTimeout(finish, timeout)
    }
  })
}