]> Piment Noir Git Repositories - poolifier.git/blame_incremental - src/pools/utils.ts
docs: publish documentation
[poolifier.git] / src / pools / utils.ts
... / ...
CommitLineData
1import cluster, { Worker as ClusterWorker } from 'node:cluster'
2import { existsSync } from 'node:fs'
3import { env } from 'node:process'
4import {
5 SHARE_ENV,
6 Worker as ThreadWorker,
7 type WorkerOptions,
8} from 'node:worker_threads'
9
10import type { MessageValue, Task } from '../utility-types.js'
11import type { TasksQueueOptions } from './pool.js'
12import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
13
14import { average, isPlainObject, max, median, min } from '../utils.js'
15import {
16 type MeasurementStatisticsRequirements,
17 WorkerChoiceStrategies,
18 type WorkerChoiceStrategy,
19} from './selection-strategies/selection-strategies-types.js'
20import {
21 type IWorker,
22 type IWorkerNode,
23 type MeasurementStatistics,
24 type WorkerInfo,
25 type WorkerNodeOptions,
26 type WorkerType,
27 WorkerTypes,
28 type WorkerUsage,
29} from './worker.js'
30
/**
 * Default measurement statistics requirements: the aggregate, average and
 * median flags are all disabled. The object is frozen.
 */
export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: Readonly<MeasurementStatisticsRequirements> =
  Object.freeze({ aggregate: false, average: false, median: false })
40
/**
 * Builds the default tasks queue options.
 * @param poolMaxSize - The pool maximum size; the default queue size is its square.
 * @returns The frozen default tasks queue options.
 */
export const getDefaultTasksQueueOptions = (
  poolMaxSize: number
): Required<Readonly<TasksQueueOptions>> => {
  const defaultTasksQueueOptions = {
    concurrency: 1,
    size: poolMaxSize ** 2,
    tasksFinishedTimeout: 2000,
    tasksStealingOnBackPressure: true,
    tasksStealingRatio: 0.6,
    taskStealing: true,
  }
  return Object.freeze(defaultTasksQueueOptions)
}
53
/**
 * Validates a worker file path.
 * @param filePath - The worker file path to check.
 * @throws {TypeError} If the path is null/undefined or is not a string.
 * @throws {Error} If nothing exists at the given path.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  } else if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  }
  const workerFileExists = existsSync(filePath)
  if (!workerFileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
65
/**
 * Validates the size bounds of a dynamic pool.
 * The checks run in order and the first failing one throws.
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {TypeError} If `max` is null/undefined or is not a safe integer.
 * @throws {RangeError} If `min` is greater than `max`, `max` is zero, or `min` equals `max`.
 */
export const checkDynamicPoolSize = (
  min: number,
  max: number | undefined
): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  } else if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  } else if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  } else if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  } else if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
96
/**
 * Validates a priority value. A null/undefined priority is accepted as is.
 * @param priority - The priority to check.
 * @throws {TypeError} If the priority is not a safe integer.
 * @throws {RangeError} If the priority is outside the [-20, 19] range.
 */
export const checkValidPriority = (priority: number | undefined): void => {
  if (priority == null) {
    return
  }
  if (!Number.isSafeInteger(priority)) {
    throw new TypeError(`Invalid property 'priority': '${priority.toString()}'`)
  }
  const outOfRange = priority < -20 || priority > 19
  if (outOfRange) {
    throw new RangeError("Property 'priority' must be between -20 and 19")
  }
}
109
/**
 * Validates a worker choice strategy. A null/undefined strategy is accepted as is.
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {Error} If the strategy is not one of the `WorkerChoiceStrategies` values.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: undefined | WorkerChoiceStrategy
): void => {
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
120
/**
 * Validates tasks queue options. Null/undefined options are accepted as is.
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {TypeError} If the options are not a plain object, if `concurrency` or `size` is not a safe integer, or if `tasksStealingRatio` is not a number.
 * @throws {RangeError} If `concurrency` or `size` is not strictly positive, or if `tasksStealingRatio` is not within [0, 1] (NaN included).
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions | undefined
): void => {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, size, tasksStealingRatio } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency.toString()} is a negative integer or zero`
      )
    }
  }
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size.toString()} is a negative integer or zero`
      )
    }
  }
  if (tasksStealingRatio != null) {
    if (typeof tasksStealingRatio !== 'number') {
      throw new TypeError(
        'Invalid worker node tasks stealing ratio: must be a number'
      )
    }
    // Expressed as a negated inclusion test so that NaN, for which every
    // comparison is false, is rejected instead of slipping through.
    if (!(tasksStealingRatio >= 0 && tasksStealingRatio <= 1)) {
      throw new RangeError(
        'Invalid worker node tasks stealing ratio: must be between 0 and 1'
      )
    }
  }
}
174
/**
 * Validates the arguments used to construct a worker node.
 * The checks run in order and the first failing one throws.
 * @param type - The worker type; must be one of the `WorkerTypes` values.
 * @param filePath - The worker file path, validated with `checkFilePath`.
 * @param opts - The worker node options; must be a plain object holding valid
 * `tasksQueueBackPressureSize`, `tasksQueueBucketSize` and `tasksQueuePriority` values.
 * @throws {TypeError} If an argument or option is missing or has the wrong type.
 * @throws {RangeError} If a size option is not strictly positive.
 */
export const checkWorkerNodeArguments = (
  type: undefined | WorkerType,
  filePath: string | undefined,
  opts: undefined | WorkerNodeOptions
): void => {
  // Shared validation for the size options that must be strictly positive safe integers.
  const checkPositiveIntegerOption = (
    value: number | undefined,
    label: string
  ): void => {
    if (value == null) {
      throw new TypeError(
        `Cannot construct a worker node without a ${label} option`
      )
    }
    if (!Number.isSafeInteger(value)) {
      throw new TypeError(
        `Cannot construct a worker node with a ${label} option that is not an integer`
      )
    }
    if (value <= 0) {
      throw new RangeError(
        `Cannot construct a worker node with a ${label} option that is not a positive integer`
      )
    }
  }

  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  const knownWorkerTypes = Object.values(WorkerTypes)
  if (!knownWorkerTypes.includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }
  checkFilePath(filePath)
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid worker node options: must be a plain object'
    )
  }
  checkPositiveIntegerOption(
    opts.tasksQueueBackPressureSize,
    'tasks queue back pressure size'
  )
  checkPositiveIntegerOption(
    opts.tasksQueueBucketSize,
    'tasks queue bucket size'
  )
  if (opts.tasksQueuePriority == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue priority option'
    )
  }
  if (typeof opts.tasksQueuePriority !== 'boolean') {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
    )
  }
}
240
/**
 * Updates the given measurement statistics with a new measurement value.
 *
 * Nothing is updated unless both the requirements and the value are defined
 * and the `aggregate` requirement is enabled. When it is, the aggregate,
 * minimum and maximum are refreshed; the value is additionally recorded in the
 * history when the average or the median is required.
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements | undefined,
  measurementValue: number | undefined
): void => {
  if (
    measurementRequirements == null ||
    measurementValue == null ||
    !measurementRequirements.aggregate
  ) {
    return
  }
  const previousAggregate = measurementStatistics.aggregate ?? 0
  measurementStatistics.aggregate = previousAggregate + measurementValue
  measurementStatistics.minimum = min(
    measurementValue,
    measurementStatistics.minimum ?? Number.POSITIVE_INFINITY
  )
  measurementStatistics.maximum = max(
    measurementValue,
    measurementStatistics.maximum ?? Number.NEGATIVE_INFINITY
  )
  const needsAverage = measurementRequirements.average
  const needsMedian = measurementRequirements.median
  if (!needsAverage && !needsMedian) {
    return
  }
  measurementStatistics.history.put(measurementValue)
  if (needsAverage) {
    measurementStatistics.average = average(
      measurementStatistics.history.toArray()
    )
  } else if (measurementStatistics.average != null) {
    // Drop a stale average that is no longer required.
    measurementStatistics.average = undefined
  }
  if (needsMedian) {
    measurementStatistics.median = median(
      measurementStatistics.history.toArray()
    )
  } else if (measurementStatistics.median != null) {
    // Drop a stale median that is no longer required.
    measurementStatistics.median = undefined
  }
}
// Expose the internal helper only when NODE_ENV is 'test'.
if (env.NODE_ENV === 'test') {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  exports.updateMeasurementStatistics = updateMeasurementStatistics
}
291
/**
 * Updates the wait time statistics of a worker usage for the given task.
 * The wait time is the delta between now and the task timestamp, or zero when
 * the task carries no timestamp.
 * @param workerChoiceStrategiesContext - The worker choice strategies context providing the wait time requirements.
 * @param workerUsage - The worker usage to update.
 * @param task - The task whose wait time is measured.
 */
export const updateWaitTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategiesContext:
    | undefined
    | WorkerChoiceStrategiesContext<Worker, Data, Response>,
  workerUsage: WorkerUsage,
  task: Task<Data>
): void => {
  const now = performance.now()
  const elapsedSinceQueued = now - (task.timestamp ?? now)
  const waitTimeStatistics = workerUsage.waitTime
  const waitTimeRequirements =
    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    waitTimeStatistics,
    waitTimeRequirements,
    elapsedSinceQueued
  )
}
311
/**
 * Updates the task counters of a worker usage after a task response.
 * The executing counter is decremented when it is strictly positive, then
 * either the executed or the failed counter is incremented depending on
 * whether the message carries a worker error.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const { tasks } = workerUsage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const hasExecutingTasks = tasks.executing != null && tasks.executing > 0
  if (hasExecutingTasks) {
    --tasks.executing
  }
  const succeeded = message.workerError == null
  if (succeeded) {
    ++tasks.executed
  } else {
    ++tasks.failed
  }
}
330
/**
 * Updates the run time statistics of a worker usage from a task response.
 * Nothing is updated when the message carries a worker error; a missing run
 * time measurement is recorded as zero.
 * @param workerChoiceStrategiesContext - The worker choice strategies context providing the run time requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateRunTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategiesContext:
    | undefined
    | WorkerChoiceStrategiesContext<Worker, Data, Response>,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const taskFailed = message.workerError != null
  if (!taskFailed) {
    updateMeasurementStatistics(
      workerUsage.runTime,
      workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime,
      message.taskPerformance?.runTime ?? 0
    )
  }
}
351
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from
 * a task response.
 * Nothing is updated when the message carries a worker error. Missing active
 * or idle measurements are recorded as zero. When the ELU `aggregate`
 * requirement is enabled and the message carries an ELU measurement, the
 * sample count is incremented and the utilization is updated as a running
 * mean over that count.
 * @param workerChoiceStrategiesContext - The worker choice strategies context providing the ELU requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateEluWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategiesContext:
    | undefined
    | WorkerChoiceStrategiesContext<Worker, Data, Response>,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  const eluRequirements =
    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
  const measuredElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    measuredElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    measuredElu?.idle ?? 0
  )
  if (eluRequirements?.aggregate !== true || measuredElu == null) {
    return
  }
  const eluUsage = workerUsage.elu
  const sampleCount = (eluUsage.count ?? 0) + 1
  eluUsage.count = sampleCount
  eluUsage.utilization =
    ((eluUsage.utilization ?? 0) * (sampleCount - 1) +
      measuredElu.utilization) /
    sampleCount
}
388
/**
 * Creates a worker of the given type.
 * A cluster worker is forked with `opts.env`; a thread worker is instantiated
 * on `filePath` with `SHARE_ENV` as default environment, which
 * `opts.workerOptions` may override.
 * @param type - The worker type.
 * @param filePath - The worker file path, used for thread workers.
 * @param opts - The worker creation options.
 * @returns The created worker.
 * @throws {Error} If the worker type is unknown.
 */
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-parameters
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>; workerOptions?: WorkerOptions }
): Worker => {
  if (type === WorkerTypes.cluster) {
    const forkedWorker = cluster.fork(opts.env)
    return forkedWorker as unknown as Worker
  }
  if (type === WorkerTypes.thread) {
    const threadWorkerOptions = { env: SHARE_ENV, ...opts.workerOptions }
    return new ThreadWorker(filePath, threadWorkerOptions) as unknown as Worker
  }
  // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
  throw new Error(`Unknown worker type '${type}'`)
}
408
/**
 * Returns the worker type of the given worker.
 * @param worker - The worker to get the type of.
 * @returns `WorkerTypes.thread` for a worker_threads worker, `WorkerTypes.cluster` for a cluster worker, `undefined` otherwise.
 * @internal
 */
const getWorkerType = (worker: IWorker): undefined | WorkerType => {
  const isThreadWorker = worker instanceof ThreadWorker
  if (isThreadWorker) {
    return WorkerTypes.thread
  }
  return worker instanceof ClusterWorker ? WorkerTypes.cluster : undefined
}
423
/**
 * Returns the worker id of the given worker.
 * @param worker - The worker to get the id of.
 * @returns The `threadId` of a worker_threads worker, the `id` of a cluster worker, `undefined` otherwise.
 * @internal
 */
const getWorkerId = (worker: IWorker): number | undefined => {
  const isThreadWorker = worker instanceof ThreadWorker
  if (isThreadWorker) {
    return worker.threadId
  }
  return worker instanceof ClusterWorker ? worker.id : undefined
}
438
/**
 * Builds the initial worker information of the given worker.
 * Every boolean flag starts as `false`; the id and type are derived from the
 * worker instance.
 * @param worker - The worker to describe.
 * @returns The initial worker information.
 */
export const initWorkerInfo = (worker: IWorker): WorkerInfo => {
  const id = getWorkerId(worker)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const type = getWorkerType(worker)!
  const workerInfo: WorkerInfo = {
    backPressure: false,
    backPressureStealing: false,
    continuousStealing: false,
    dynamic: false,
    id,
    queuedTaskAbortion: false,
    ready: false,
    stealing: false,
    stolen: false,
    type,
  }
  return workerInfo
}
454
/**
 * Waits for a number of occurrences of a worker node event, or for a timeout.
 *
 * The returned promise resolves with the number of events seen so far as soon
 * as either `numberOfEventsToWait` events have been emitted or `timeout`
 * milliseconds have elapsed. Once settled, the event listener is removed and
 * the pending timer is cleared so that neither outlives the call.
 * @param workerNode - The worker node to listen on.
 * @param workerNodeEvent - The event name: 'backPressure', 'idle' or 'taskFinished'.
 * @param numberOfEventsToWait - The number of events to wait for; zero resolves immediately with 0.
 * @param timeout - The timeout in milliseconds; a negative value disables it.
 * @returns The number of events received when the promise settled.
 * @throws {Error} Rejects with 'Invalid worker node event' for any other event name (unless `numberOfEventsToWait` is zero).
 */
export const waitWorkerNodeEvents = async <
  Worker extends IWorker,
  Data = unknown
>(
  workerNode: IWorkerNode<Worker, Data>,
  workerNodeEvent: string,
  numberOfEventsToWait: number,
  timeout: number
): Promise<number> => {
  return await new Promise<number>(resolve => {
    let events = 0
    if (numberOfEventsToWait === 0) {
      resolve(events)
      return
    }
    switch (workerNodeEvent) {
      case 'backPressure':
      case 'idle':
      case 'taskFinished':
        break
      default:
        // Thrown inside the executor, hence surfaced as a promise rejection.
        throw new Error('Invalid worker node event')
    }
    let timeoutHandle: ReturnType<typeof setTimeout> | undefined
    // Releases the listener and the timer before resolving.
    // NOTE(review): relies on the worker node exposing EventEmitter's `off()`
    // alongside the `on()` already used here — confirm against IWorkerNode.
    const settle = (): void => {
      clearTimeout(timeoutHandle)
      workerNode.off(workerNodeEvent, onEvent)
      resolve(events)
    }
    const onEvent = (): void => {
      ++events
      if (events === numberOfEventsToWait) {
        settle()
      }
    }
    workerNode.on(workerNodeEvent, onEvent)
    if (timeout >= 0) {
      timeoutHandle = setTimeout(settle, timeout)
    }
  })
}