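// Piment Noir Git Repositories - poolifier.git - blame view of src/pools/utils.ts
// Commit shown by the page: "docs: publish documentation".
// NOTE: the commit hashes, "JB" lines and fused line numbers that follow appear
// to be residue of the blame table (columns: Commit, Line, Data), not source code.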
e9ed6eee 1import cluster, { Worker as ClusterWorker } from 'node:cluster'
ded253e2 2import { existsSync } from 'node:fs'
ded253e2 3import { env } from 'node:process'
e9ed6eee
JB
4import {
5 SHARE_ENV,
6 Worker as ThreadWorker,
3a502712 7 type WorkerOptions,
e9ed6eee 8} from 'node:worker_threads'
ded253e2 9
d35e5717 10import type { MessageValue, Task } from '../utility-types.js'
bcfb06ce 11import type { TasksQueueOptions } from './pool.js'
97231086
JB
12import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
13
14import { average, isPlainObject, max, median, min } from '../utils.js'
bde6b5d7 15import {
bfc75cca 16 type MeasurementStatisticsRequirements,
bde6b5d7 17 WorkerChoiceStrategies,
3a502712 18 type WorkerChoiceStrategy,
d35e5717 19} from './selection-strategies/selection-strategies-types.js'
c3719753
JB
20import {
21 type IWorker,
d41a44de 22 type IWorkerNode,
c3719753 23 type MeasurementStatistics,
559c196a 24 type WorkerInfo,
c3719753
JB
25 type WorkerNodeOptions,
26 type WorkerType,
c329fd41 27 WorkerTypes,
3a502712 28 type WorkerUsage,
d35e5717 29} from './worker.js'
bde6b5d7 30
e9ed6eee
JB
/**
 * Default measurement statistics requirements: aggregate, average and median
 * are all disabled. The object is frozen, so it cannot be mutated.
 */
export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: Readonly<MeasurementStatisticsRequirements> =
  Object.freeze({
    aggregate: false,
    average: false,
    median: false,
  })
e9ed6eee 40
32b141fd
JB
/**
 * Builds the default tasks queue options for the given pool maximum size.
 * @param poolMaxSize - The pool maximum size; the default queue size is its square.
 * @returns A frozen tasks queue options object with every option set.
 */
export const getDefaultTasksQueueOptions = (
  poolMaxSize: number
): Required<Readonly<TasksQueueOptions>> => {
  return Object.freeze({
    concurrency: 1,
    size: poolMaxSize ** 2,
    tasksFinishedTimeout: 2000,
    tasksStealingOnBackPressure: true,
    tasksStealingRatio: 0.6,
    taskStealing: true,
  })
}
53
/**
 * Checks that the worker file path is specified, is a string and points to an
 * existing file system entry.
 * @param filePath - The worker file path to check.
 * @throws TypeError If the file path is `null`/`undefined` or is not a string.
 * @throws Error If nothing exists at the given file path.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  }
  if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
65
c63a35a0
JB
/**
 * Checks that the minimum and maximum sizes of a dynamic pool are consistent.
 * The checks run in order and the first failing one throws.
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws TypeError If the maximum size is missing or is not a safe integer.
 * @throws RangeError If the maximum size is lower than the minimum size, equal
 * to zero, or equal to the minimum size.
 */
export const checkDynamicPoolSize = (
  min: number,
  max: number | undefined
): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
96
d0bd5062
JB
/**
 * Checks that the given priority is valid.
 * @param priority - The priority to check; `null`/`undefined` is accepted and skips the checks.
 * @throws TypeError If the priority is not a safe integer.
 * @throws RangeError If the priority is lower than -20 or greater than 19.
 */
export const checkValidPriority = (priority: number | undefined): void => {
  if (priority == null) {
    return
  }
  if (!Number.isSafeInteger(priority)) {
    throw new TypeError(`Invalid property 'priority': '${priority.toString()}'`)
  }
  // Reaching this point implies a safe integer, so only the range is left to check.
  if (priority < -20 || priority > 19) {
    throw new RangeError("Property 'priority' must be between -20 and 19")
  }
}
109
/**
 * Checks that the given worker choice strategy is one of the values of
 * `WorkerChoiceStrategies`.
 * @param workerChoiceStrategy - The worker choice strategy to check; `null`/`undefined` is accepted.
 * @throws Error If the worker choice strategy is not a known value.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: undefined | WorkerChoiceStrategy
): void => {
  if (
    workerChoiceStrategy != null &&
    !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
  ) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
120
/**
 * Checks that the given tasks queue options are valid. Only the options that
 * are set are checked: `concurrency`, `size` and `tasksStealingRatio`.
 * @param tasksQueueOptions - The tasks queue options to check; `null`/`undefined` is accepted.
 * @throws TypeError If the options are not a plain object, if `concurrency` or
 * `size` is not a safe integer, or if `tasksStealingRatio` is not a number.
 * @throws RangeError If `concurrency` or `size` is negative or zero, or if
 * `tasksStealingRatio` is `NaN` or outside of the [0, 1] range.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions | undefined
): void => {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  if (
    tasksQueueOptions?.concurrency != null &&
    !Number.isSafeInteger(tasksQueueOptions.concurrency)
  ) {
    throw new TypeError(
      'Invalid worker node tasks concurrency: must be an integer'
    )
  }
  if (
    tasksQueueOptions?.concurrency != null &&
    tasksQueueOptions.concurrency <= 0
  ) {
    throw new RangeError(
      `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency.toString()} is a negative integer or zero`
    )
  }
  if (
    tasksQueueOptions?.size != null &&
    !Number.isSafeInteger(tasksQueueOptions.size)
  ) {
    throw new TypeError(
      'Invalid worker node tasks queue size: must be an integer'
    )
  }
  if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
    throw new RangeError(
      `Invalid worker node tasks queue size: ${tasksQueueOptions.size.toString()} is a negative integer or zero`
    )
  }
  if (
    tasksQueueOptions?.tasksStealingRatio != null &&
    typeof tasksQueueOptions.tasksStealingRatio !== 'number'
  ) {
    throw new TypeError(
      'Invalid worker node tasks stealing ratio: must be a number'
    )
  }
  if (
    tasksQueueOptions?.tasksStealingRatio != null &&
    // NaN is a number and compares false against both bounds, so it has to be
    // rejected explicitly.
    (Number.isNaN(tasksQueueOptions.tasksStealingRatio) ||
      tasksQueueOptions.tasksStealingRatio < 0 ||
      tasksQueueOptions.tasksStealingRatio > 1)
  ) {
    throw new RangeError(
      'Invalid worker node tasks stealing ratio: must be between 0 and 1'
    )
  }
}
bfc75cca 174
/**
 * Checks the arguments used to construct a worker node. The checks run in
 * order (worker type, file path, options) and the first failing one throws.
 * @param type - The worker type; must be one of the values of `WorkerTypes`.
 * @param filePath - The worker file path; checked with `checkFilePath`.
 * @param opts - The worker node options; `tasksQueueBackPressureSize`,
 * `tasksQueueBucketSize` and `tasksQueuePriority` are all mandatory.
 * @throws TypeError If an argument or a mandatory option is missing or has the wrong type.
 * @throws RangeError If `tasksQueueBackPressureSize` or `tasksQueueBucketSize` is negative or zero.
 */
export const checkWorkerNodeArguments = (
  type: undefined | WorkerType,
  filePath: string | undefined,
  opts: undefined | WorkerNodeOptions
): void => {
  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  if (!Object.values(WorkerTypes).includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }
  checkFilePath(filePath)
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid worker node options: must be a plain object'
    )
  }
  if (opts.tasksQueueBackPressureSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue back pressure size option'
    )
  }
  if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
  }
  if (opts.tasksQueueBackPressureSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
  }
  if (opts.tasksQueueBucketSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue bucket size option'
    )
  }
  if (!Number.isSafeInteger(opts.tasksQueueBucketSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
    )
  }
  if (opts.tasksQueueBucketSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
    )
  }
  if (opts.tasksQueuePriority == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue priority option'
    )
  }
  if (typeof opts.tasksQueuePriority !== 'boolean') {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
    )
  }
}
bfc75cca
JB
240
/**
 * Updates the given measurement statistics.
 *
 * Nothing is updated unless the requirements are defined, the value is defined
 * and the `aggregate` requirement is enabled. The value is pushed to the
 * history only when the average or the median is required.
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements | undefined,
  measurementValue: number | undefined
): void => {
  if (
    measurementRequirements != null &&
    measurementValue != null &&
    measurementRequirements.aggregate
  ) {
    measurementStatistics.aggregate =
      (measurementStatistics.aggregate ?? 0) + measurementValue
    measurementStatistics.minimum = min(
      measurementValue,
      measurementStatistics.minimum ?? Number.POSITIVE_INFINITY
    )
    measurementStatistics.maximum = max(
      measurementValue,
      measurementStatistics.maximum ?? Number.NEGATIVE_INFINITY
    )
    if (measurementRequirements.average || measurementRequirements.median) {
      measurementStatistics.history.put(measurementValue)
      if (measurementRequirements.average) {
        measurementStatistics.average = average(
          measurementStatistics.history.toArray()
        )
      } else if (measurementStatistics.average != null) {
        // The average is no longer required: drop the stale value.
        measurementStatistics.average = undefined
      }
      if (measurementRequirements.median) {
        measurementStatistics.median = median(
          measurementStatistics.history.toArray()
        )
      } else if (measurementStatistics.median != null) {
        // The median is no longer required: drop the stale value.
        measurementStatistics.median = undefined
      }
    }
  }
}
// The helper is not exported; it is attached to `exports` only when NODE_ENV is 'test'.
if (env.NODE_ENV === 'test') {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  exports.updateMeasurementStatistics = updateMeasurementStatistics
}
291
/**
 * Updates the wait time statistics of the given worker usage for a task.
 *
 * The wait time is the difference between `performance.now()` and the task
 * timestamp; it is zero when the task has no timestamp.
 * @param workerChoiceStrategiesContext - The context providing the task statistics requirements, if any.
 * @param workerUsage - The worker usage whose `waitTime` statistics are updated.
 * @param task - The task whose timestamp is used.
 */
export const updateWaitTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategiesContext:
    | undefined
    | WorkerChoiceStrategiesContext<Worker, Data, Response>,
  workerUsage: WorkerUsage,
  task: Task<Data>
): void => {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  updateMeasurementStatistics(
    workerUsage.waitTime,
    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime,
    taskWaitTime
  )
}
311
/**
 * Updates the tasks counters of the given worker usage once a task response
 * message is received: `executing` is decremented when it is positive, then
 * `executed` or `failed` is incremented depending on the message.
 * @param workerUsage - The worker usage whose `tasks` counters are updated.
 * @param message - The received message; a defined `workerError` marks the task as failed.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const workerTaskStatistics = workerUsage.tasks
  if (
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerTaskStatistics.executing != null &&
    workerTaskStatistics.executing > 0
  ) {
    --workerTaskStatistics.executing
  }
  if (message.workerError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
330
/**
 * Updates the run time statistics of the given worker usage from a task
 * response message. Nothing is updated when the message carries a worker error.
 * @param workerChoiceStrategiesContext - The context providing the task statistics requirements, if any.
 * @param workerUsage - The worker usage whose `runTime` statistics are updated.
 * @param message - The received message; a missing run time is counted as zero.
 */
export const updateRunTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategiesContext:
    | undefined
    | WorkerChoiceStrategiesContext<Worker, Data, Response>,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  updateMeasurementStatistics(
    workerUsage.runTime,
    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime,
    message.taskPerformance?.runTime ?? 0
  )
}
351
/**
 * Updates the event loop utilization (ELU) statistics of the given worker
 * usage from a task response message. Nothing is updated when the message
 * carries a worker error.
 *
 * The active and idle statistics are updated first (missing values count as
 * zero). When the `aggregate` requirement is enabled and the message carries
 * ELU data, `count` is incremented and `utilization` becomes the running mean
 * of the received utilization values.
 * @param workerChoiceStrategiesContext - The context providing the task statistics requirements, if any.
 * @param workerUsage - The worker usage whose `elu` statistics are updated.
 * @param message - The received message.
 */
export const updateEluWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategiesContext:
    | undefined
    | WorkerChoiceStrategiesContext<Worker, Data, Response>,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  const eluTaskStatisticsRequirements =
    workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    message.taskPerformance?.elu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    message.taskPerformance?.elu?.idle ?? 0
  )
  if (
    eluTaskStatisticsRequirements?.aggregate === true &&
    message.taskPerformance?.elu != null
  ) {
    workerUsage.elu.count = (workerUsage.elu.count ?? 0) + 1
    // Incremental mean: previous mean weighted by the previous count, plus the new value.
    workerUsage.elu.utilization =
      ((workerUsage.elu.utilization ?? 0) * (workerUsage.elu.count - 1) +
        message.taskPerformance.elu.utilization) /
      workerUsage.elu.count
  }
}
c3719753 388
/**
 * Creates a worker of the given type.
 *
 * A cluster worker is forked with `opts.env`; the file path is not used in
 * that case. A thread worker is created from the file path with `SHARE_ENV` as
 * its default `env`, which `opts.workerOptions` may override.
 * @param type - The worker type.
 * @param filePath - The worker file path (used for thread workers).
 * @param opts - The cluster environment and/or the thread worker options.
 * @returns The created worker.
 * @throws Error If the worker type is unknown.
 */
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-parameters
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>; workerOptions?: WorkerOptions }
): Worker => {
  switch (type) {
    case WorkerTypes.cluster:
      return cluster.fork(opts.env) as unknown as Worker
    case WorkerTypes.thread:
      return new ThreadWorker(filePath, {
        env: SHARE_ENV,
        ...opts.workerOptions,
      }) as unknown as Worker
    default:
      // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
      throw new Error(`Unknown worker type '${type}'`)
  }
}
d41a44de 408
e9ed6eee
JB
/**
 * Returns the worker type of the given worker.
 * @param worker - The worker to get the type of.
 * @returns The worker type of the given worker, or `undefined` if it is
 * neither a thread worker nor a cluster worker instance.
 * @internal
 */
const getWorkerType = (worker: IWorker): undefined | WorkerType => {
  if (worker instanceof ThreadWorker) {
    return WorkerTypes.thread
  }
  if (worker instanceof ClusterWorker) {
    return WorkerTypes.cluster
  }
}
423
/**
 * Returns the worker id of the given worker.
 * @param worker - The worker to get the id of.
 * @returns The `threadId` of a thread worker, the `id` of a cluster worker, or
 * `undefined` if the worker is an instance of neither.
 * @internal
 */
const getWorkerId = (worker: IWorker): number | undefined => {
  if (worker instanceof ThreadWorker) {
    return worker.threadId
  }
  if (worker instanceof ClusterWorker) {
    return worker.id
  }
}
438
559c196a
JB
/**
 * Builds the initial worker information of the given worker: every flag is
 * `false`, and the id and type are derived from the worker instance.
 * @param worker - The worker to build the information for.
 * @returns The initial worker information.
 */
export const initWorkerInfo = (worker: IWorker): WorkerInfo => {
  return {
    backPressure: false,
    backPressureStealing: false,
    continuousStealing: false,
    dynamic: false,
    id: getWorkerId(worker),
    queuedTaskAbortion: false,
    ready: false,
    stealing: false,
    stolen: false,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    type: getWorkerType(worker)!,
  }
}
454
d41a44de
JB
/**
 * Waits until the given worker node has emitted an event a given number of
 * times, or until the timeout expires, whichever comes first.
 *
 * Once the wait ends, the event listener is removed and the pending timeout
 * timer (if any) is cleared, so that neither outlives the returned promise.
 * @param workerNode - The worker node emitting the events.
 * @param workerNodeEvent - The event to wait for: 'backPressure', 'idle' or 'taskFinished'.
 * @param numberOfEventsToWait - The number of events to wait for; zero resolves immediately.
 * @param timeout - The maximum time to wait, in milliseconds; a negative value disables the timeout.
 * @returns A promise resolving to the number of events received when the wait
 * ended. It rejects with an `Error` if the event name is invalid.
 */
export const waitWorkerNodeEvents = async <
  Worker extends IWorker,
  Data = unknown
>(
  workerNode: IWorkerNode<Worker, Data>,
  workerNodeEvent: string,
  numberOfEventsToWait: number,
  timeout: number
): Promise<number> => {
  return await new Promise<number>(resolve => {
    let events = 0
    if (numberOfEventsToWait === 0) {
      resolve(events)
      return
    }
    switch (workerNodeEvent) {
      case 'backPressure':
      case 'idle':
      case 'taskFinished':
        break
      default:
        // Thrown from the promise executor: surfaces as a rejection.
        throw new Error('Invalid worker node event')
    }
    let timeoutHandle: ReturnType<typeof setTimeout> | undefined
    const listener = (): void => {
      ++events
      if (events === numberOfEventsToWait) {
        // Do not keep the timer (and thus the event loop) alive once done.
        clearTimeout(timeoutHandle)
        workerNode.off(workerNodeEvent, listener)
        resolve(events)
      }
    }
    workerNode.on(workerNodeEvent, listener)
    if (timeout >= 0) {
      timeoutHandle = setTimeout(() => {
        workerNode.off(workerNodeEvent, listener)
        resolve(events)
      }, timeout)
    }
  })
}