build: fix eslint configuration with type checking
[poolifier.git] / src / pools / utils.ts
CommitLineData
e9ed6eee 1import cluster, { Worker as ClusterWorker } from 'node:cluster'
ded253e2 2import { existsSync } from 'node:fs'
ded253e2 3import { env } from 'node:process'
e9ed6eee
JB
4import {
5 SHARE_ENV,
6 Worker as ThreadWorker,
3a502712 7 type WorkerOptions,
e9ed6eee 8} from 'node:worker_threads'
ded253e2 9
d35e5717 10import type { MessageValue, Task } from '../utility-types.js'
ded253e2 11import { average, isPlainObject, max, median, min } from '../utils.js'
bcfb06ce 12import type { TasksQueueOptions } from './pool.js'
bde6b5d7 13import {
bfc75cca 14 type MeasurementStatisticsRequirements,
bde6b5d7 15 WorkerChoiceStrategies,
3a502712 16 type WorkerChoiceStrategy,
d35e5717 17} from './selection-strategies/selection-strategies-types.js'
bcfb06ce 18import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
c3719753
JB
19import {
20 type IWorker,
d41a44de 21 type IWorkerNode,
c3719753
JB
22 type MeasurementStatistics,
23 type WorkerNodeOptions,
24 type WorkerType,
c329fd41 25 WorkerTypes,
3a502712 26 type WorkerUsage,
d35e5717 27} from './worker.js'
bde6b5d7 28
e9ed6eee
JB
29/**
30 * Default measurement statistics requirements.
31 */
32export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements =
33 {
34 aggregate: false,
35 average: false,
3a502712 36 median: false,
e9ed6eee
JB
37 }
38
32b141fd
JB
39export const getDefaultTasksQueueOptions = (
40 poolMaxSize: number
41): Required<TasksQueueOptions> => {
42 return {
43 size: Math.pow(poolMaxSize, 2),
44 concurrency: 1,
45 taskStealing: true,
2eee7220 46 tasksStealingOnBackPressure: false,
3a502712 47 tasksFinishedTimeout: 2000,
32b141fd
JB
48 }
49}
50
c63a35a0 51export const checkFilePath = (filePath: string | undefined): void => {
c3719753
JB
52 if (filePath == null) {
53 throw new TypeError('The worker file path must be specified')
54 }
55 if (typeof filePath !== 'string') {
56 throw new TypeError('The worker file path must be a string')
57 }
bde6b5d7
JB
58 if (!existsSync(filePath)) {
59 throw new Error(`Cannot find the worker file '${filePath}'`)
60 }
61}
62
c63a35a0
JB
63export const checkDynamicPoolSize = (
64 min: number,
65 max: number | undefined
66): void => {
bde6b5d7
JB
67 if (max == null) {
68 throw new TypeError(
69 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
70 )
71 } else if (!Number.isSafeInteger(max)) {
72 throw new TypeError(
73 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
74 )
75 } else if (min > max) {
76 throw new RangeError(
77 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
78 )
79 } else if (max === 0) {
80 throw new RangeError(
81 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
82 )
83 } else if (min === max) {
84 throw new RangeError(
85 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
86 )
87 }
88}
89
d0bd5062
JB
90export const checkValidPriority = (priority: number | undefined): void => {
91 if (priority != null && !Number.isSafeInteger(priority)) {
85bbc7ab 92 throw new TypeError(`Invalid property 'priority': '${priority}'`)
d0bd5062
JB
93 }
94 if (
95 priority != null &&
96 Number.isSafeInteger(priority) &&
97 (priority < -20 || priority > 19)
98 ) {
85bbc7ab 99 throw new RangeError("Property 'priority' must be between -20 and 19")
d0bd5062
JB
100 }
101}
102
bde6b5d7 103export const checkValidWorkerChoiceStrategy = (
c63a35a0 104 workerChoiceStrategy: WorkerChoiceStrategy | undefined
bde6b5d7
JB
105): void => {
106 if (
107 workerChoiceStrategy != null &&
108 !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
109 ) {
110 throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
111 }
112}
113
114export const checkValidTasksQueueOptions = (
c63a35a0 115 tasksQueueOptions: TasksQueueOptions | undefined
bde6b5d7
JB
116): void => {
117 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
118 throw new TypeError('Invalid tasks queue options: must be a plain object')
119 }
120 if (
121 tasksQueueOptions?.concurrency != null &&
122 !Number.isSafeInteger(tasksQueueOptions.concurrency)
123 ) {
124 throw new TypeError(
125 'Invalid worker node tasks concurrency: must be an integer'
126 )
127 }
128 if (
129 tasksQueueOptions?.concurrency != null &&
130 tasksQueueOptions.concurrency <= 0
131 ) {
132 throw new RangeError(
133 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
134 )
135 }
136 if (
137 tasksQueueOptions?.size != null &&
138 !Number.isSafeInteger(tasksQueueOptions.size)
139 ) {
140 throw new TypeError(
141 'Invalid worker node tasks queue size: must be an integer'
142 )
143 }
144 if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
145 throw new RangeError(
146 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
147 )
148 }
149}
bfc75cca 150
c3719753 151export const checkWorkerNodeArguments = (
c63a35a0
JB
152 type: WorkerType | undefined,
153 filePath: string | undefined,
154 opts: WorkerNodeOptions | undefined
9a38f99e 155): void => {
c3719753
JB
156 if (type == null) {
157 throw new TypeError('Cannot construct a worker node without a worker type')
158 }
159 if (!Object.values(WorkerTypes).includes(type)) {
160 throw new TypeError(
161 `Cannot construct a worker node with an invalid worker type '${type}'`
162 )
9a38f99e 163 }
c3719753
JB
164 checkFilePath(filePath)
165 if (opts == null) {
9a38f99e 166 throw new TypeError(
c3719753 167 'Cannot construct a worker node without worker node options'
9a38f99e
JB
168 )
169 }
9974369e 170 if (!isPlainObject(opts)) {
9a38f99e 171 throw new TypeError(
fcfc3353 172 'Cannot construct a worker node with invalid worker node options: must be a plain object'
9a38f99e
JB
173 )
174 }
c3719753
JB
175 if (opts.tasksQueueBackPressureSize == null) {
176 throw new TypeError(
177 'Cannot construct a worker node without a tasks queue back pressure size option'
178 )
179 }
180 if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
181 throw new TypeError(
182 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
183 )
184 }
185 if (opts.tasksQueueBackPressureSize <= 0) {
9a38f99e 186 throw new RangeError(
c3719753 187 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
9a38f99e
JB
188 )
189 }
59ca7cff
JB
190 if (opts.tasksQueueBucketSize == null) {
191 throw new TypeError(
192 'Cannot construct a worker node without a tasks queue bucket size option'
193 )
194 }
195 if (!Number.isSafeInteger(opts.tasksQueueBucketSize)) {
196 throw new TypeError(
197 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
198 )
199 }
200 if (opts.tasksQueueBucketSize <= 0) {
201 throw new RangeError(
202 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
203 )
204 }
fcfc3353
JB
205 if (opts.tasksQueuePriority == null) {
206 throw new TypeError(
207 'Cannot construct a worker node without a tasks queue priority option'
208 )
209 }
210 if (typeof opts.tasksQueuePriority !== 'boolean') {
211 throw new TypeError(
212 'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
213 )
214 }
9a38f99e 215}
bfc75cca
JB
216
217/**
218 * Updates the given measurement statistics.
bfc75cca
JB
219 * @param measurementStatistics - The measurement statistics to update.
220 * @param measurementRequirements - The measurement statistics requirements.
221 * @param measurementValue - The measurement value.
bfc75cca
JB
222 * @internal
223 */
c329fd41 224const updateMeasurementStatistics = (
bfc75cca 225 measurementStatistics: MeasurementStatistics,
c63a35a0
JB
226 measurementRequirements: MeasurementStatisticsRequirements | undefined,
227 measurementValue: number | undefined
bfc75cca 228): void => {
c63a35a0
JB
229 if (
230 measurementRequirements != null &&
231 measurementValue != null &&
232 measurementRequirements.aggregate
233 ) {
bfc75cca
JB
234 measurementStatistics.aggregate =
235 (measurementStatistics.aggregate ?? 0) + measurementValue
236 measurementStatistics.minimum = min(
237 measurementValue,
80115618 238 measurementStatistics.minimum ?? Number.POSITIVE_INFINITY
bfc75cca
JB
239 )
240 measurementStatistics.maximum = max(
241 measurementValue,
80115618 242 measurementStatistics.maximum ?? Number.NEGATIVE_INFINITY
bfc75cca 243 )
c63a35a0 244 if (measurementRequirements.average || measurementRequirements.median) {
f12182ad 245 measurementStatistics.history.put(measurementValue)
bfc75cca 246 if (measurementRequirements.average) {
f12182ad
JB
247 measurementStatistics.average = average(
248 measurementStatistics.history.toArray()
249 )
bfc75cca
JB
250 } else if (measurementStatistics.average != null) {
251 delete measurementStatistics.average
252 }
253 if (measurementRequirements.median) {
f12182ad
JB
254 measurementStatistics.median = median(
255 measurementStatistics.history.toArray()
256 )
bfc75cca
JB
257 } else if (measurementStatistics.median != null) {
258 delete measurementStatistics.median
259 }
260 }
261 }
262}
c329fd41 263if (env.NODE_ENV === 'test') {
c329fd41
JB
264 exports.updateMeasurementStatistics = updateMeasurementStatistics
265}
266
267export const updateWaitTimeWorkerUsage = <
268 Worker extends IWorker,
269 Data = unknown,
270 Response = unknown
271>(
5bdd0e9a 272 workerChoiceStrategiesContext:
bcfb06ce 273 | WorkerChoiceStrategiesContext<Worker, Data, Response>
c63a35a0 274 | undefined,
c329fd41
JB
275 workerUsage: WorkerUsage,
276 task: Task<Data>
277 ): void => {
278 const timestamp = performance.now()
279 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
280 updateMeasurementStatistics(
281 workerUsage.waitTime,
5bdd0e9a 282 workerChoiceStrategiesContext?.getTaskStatisticsRequirements().waitTime,
c329fd41
JB
283 taskWaitTime
284 )
285}
286
287export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
288 workerUsage: WorkerUsage,
289 message: MessageValue<Response>
290): void => {
291 const workerTaskStatistics = workerUsage.tasks
292 if (
293 workerTaskStatistics.executing != null &&
294 workerTaskStatistics.executing > 0
295 ) {
296 --workerTaskStatistics.executing
297 }
298 if (message.workerError == null) {
299 ++workerTaskStatistics.executed
300 } else {
301 ++workerTaskStatistics.failed
302 }
303}
304
305export const updateRunTimeWorkerUsage = <
306 Worker extends IWorker,
307 Data = unknown,
308 Response = unknown
309>(
5bdd0e9a 310 workerChoiceStrategiesContext:
bcfb06ce 311 | WorkerChoiceStrategiesContext<Worker, Data, Response>
c63a35a0 312 | undefined,
c329fd41
JB
313 workerUsage: WorkerUsage,
314 message: MessageValue<Response>
315 ): void => {
316 if (message.workerError != null) {
317 return
318 }
319 updateMeasurementStatistics(
320 workerUsage.runTime,
5bdd0e9a 321 workerChoiceStrategiesContext?.getTaskStatisticsRequirements().runTime,
c329fd41
JB
322 message.taskPerformance?.runTime ?? 0
323 )
324}
325
326export const updateEluWorkerUsage = <
327 Worker extends IWorker,
328 Data = unknown,
329 Response = unknown
330>(
5bdd0e9a 331 workerChoiceStrategiesContext:
bcfb06ce 332 | WorkerChoiceStrategiesContext<Worker, Data, Response>
c63a35a0 333 | undefined,
c329fd41
JB
334 workerUsage: WorkerUsage,
335 message: MessageValue<Response>
336 ): void => {
337 if (message.workerError != null) {
338 return
339 }
c63a35a0 340 const eluTaskStatisticsRequirements =
5bdd0e9a 341 workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
c329fd41
JB
342 updateMeasurementStatistics(
343 workerUsage.elu.active,
344 eluTaskStatisticsRequirements,
345 message.taskPerformance?.elu?.active ?? 0
346 )
347 updateMeasurementStatistics(
348 workerUsage.elu.idle,
349 eluTaskStatisticsRequirements,
350 message.taskPerformance?.elu?.idle ?? 0
351 )
c63a35a0 352 if (eluTaskStatisticsRequirements?.aggregate === true) {
c329fd41
JB
353 if (message.taskPerformance?.elu != null) {
354 if (workerUsage.elu.utilization != null) {
355 workerUsage.elu.utilization =
356 (workerUsage.elu.utilization +
357 message.taskPerformance.elu.utilization) /
358 2
359 } else {
360 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
361 }
362 }
363 }
364}
c3719753
JB
365
366export const createWorker = <Worker extends IWorker>(
367 type: WorkerType,
368 filePath: string,
3a502712 369 opts: { env?: Record<string, unknown>; workerOptions?: WorkerOptions }
c3719753
JB
370): Worker => {
371 switch (type) {
372 case WorkerTypes.thread:
e9ed6eee 373 return new ThreadWorker(filePath, {
c3719753 374 env: SHARE_ENV,
3a502712 375 ...opts.workerOptions,
c3719753
JB
376 }) as unknown as Worker
377 case WorkerTypes.cluster:
c63a35a0 378 return cluster.fork(opts.env) as unknown as Worker
c3719753 379 default:
c3719753
JB
380 throw new Error(`Unknown worker type '${type}'`)
381 }
382}
d41a44de 383
e9ed6eee
JB
384/**
385 * Returns the worker type of the given worker.
e9ed6eee
JB
386 * @param worker - The worker to get the type of.
387 * @returns The worker type of the given worker.
388 * @internal
389 */
390export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
391 if (worker instanceof ThreadWorker) {
392 return WorkerTypes.thread
393 } else if (worker instanceof ClusterWorker) {
394 return WorkerTypes.cluster
395 }
396}
397
398/**
399 * Returns the worker id of the given worker.
e9ed6eee
JB
400 * @param worker - The worker to get the id of.
401 * @returns The worker id of the given worker.
402 * @internal
403 */
404export const getWorkerId = (worker: IWorker): number | undefined => {
405 if (worker instanceof ThreadWorker) {
406 return worker.threadId
407 } else if (worker instanceof ClusterWorker) {
408 return worker.id
409 }
410}
411
d41a44de
JB
412export const waitWorkerNodeEvents = async <
413 Worker extends IWorker,
414 Data = unknown
415>(
416 workerNode: IWorkerNode<Worker, Data>,
417 workerNodeEvent: string,
32b141fd
JB
418 numberOfEventsToWait: number,
419 timeout: number
d41a44de
JB
420): Promise<number> => {
421 return await new Promise<number>(resolve => {
422 let events = 0
423 if (numberOfEventsToWait === 0) {
424 resolve(events)
425 return
426 }
2ef26de4
JB
427 switch (workerNodeEvent) {
428 case 'idle':
429 case 'backPressure':
430 case 'taskFinished':
431 workerNode.on(workerNodeEvent, () => {
432 ++events
433 if (events === numberOfEventsToWait) {
434 resolve(events)
435 }
436 })
437 break
438 default:
439 throw new Error('Invalid worker node event')
440 }
6f3a391b 441 if (timeout >= 0) {
32b141fd
JB
442 setTimeout(() => {
443 resolve(events)
444 }, timeout)
445 }
d41a44de
JB
446 })
447}