docs: generate documentation
[poolifier.git] / src / pools / utils.ts
CommitLineData
bde6b5d7 1import { existsSync } from 'node:fs'
c3719753
JB
2import cluster from 'node:cluster'
3import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
bfc75cca 4import { average, isPlainObject, max, median, min } from '../utils'
bde6b5d7 5import {
bfc75cca 6 type MeasurementStatisticsRequirements,
bde6b5d7
JB
7 WorkerChoiceStrategies,
8 type WorkerChoiceStrategy
9} from './selection-strategies/selection-strategies-types'
10import type { TasksQueueOptions } from './pool'
c3719753
JB
11import {
12 type IWorker,
d41a44de 13 type IWorkerNode,
c3719753
JB
14 type MeasurementStatistics,
15 type WorkerNodeOptions,
16 type WorkerType,
17 WorkerTypes
18} from './worker'
bde6b5d7
JB
19
/**
 * Validates the worker file path.
 *
 * @param filePath - The worker file path to validate.
 * @throws TypeError If the file path is `null`/`undefined` or is not a string.
 * @throws Error If no file exists at the given path.
 */
export const checkFilePath = (filePath: string): void => {
  // Presence first, then type, then existence on disk: each failure has its own message.
  const isMissing = filePath == null
  if (isMissing) {
    throw new TypeError('The worker file path must be specified')
  }
  const isString = typeof filePath === 'string'
  if (!isString) {
    throw new TypeError('The worker file path must be a string')
  }
  const fileFound = existsSync(filePath)
  if (!fileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
31
/**
 * Validates the minimum and maximum sizes of a dynamic pool.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws TypeError If `max` is `null`/`undefined` or is not a safe integer.
 * @throws RangeError If `max` is lower than `min`, equal to zero, or equal to `min`.
 */
export const checkDynamicPoolSize = (min: number, max: number): void => {
  // Every check throws, so sequential guards are evaluated in the same order
  // as an if / else-if chain would be.
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
55
/**
 * Validates a worker choice strategy.
 *
 * A `null`/`undefined` strategy is accepted without error.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error If the strategy is not one of the `WorkerChoiceStrategies` values.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy
): void => {
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
66
/**
 * Validates tasks queue options.
 *
 * `null`/`undefined` options are accepted without error, and so is any
 * individual option that is left `null`/`undefined`.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError If the options are not a plain object, or if `concurrency` or `size` is not a safe integer.
 * @throws RangeError If `concurrency` or `size` is negative or zero.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions
): void => {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  // Concurrency is fully validated (type, then range) before size is looked at.
  const concurrency = tasksQueueOptions.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  const size = tasksQueueOptions.size
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
bfc75cca 103
c3719753
JB
/**
 * Validates the arguments used to construct a worker node.
 *
 * Checks run in order: worker type, worker file path (via `checkFilePath`),
 * then the worker node options and their tasks queue back pressure size.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path.
 * @param opts - The worker node options.
 * @throws TypeError If the type is missing or invalid, if the options are missing or not a plain object, or if the back pressure size is missing or not a safe integer.
 * @throws RangeError If the back pressure size is negative or zero.
 */
export const checkWorkerNodeArguments = (
  type: WorkerType,
  filePath: string,
  opts: WorkerNodeOptions
): void => {
  // Worker type
  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  const knownWorkerTypes = Object.values(WorkerTypes)
  if (!knownWorkerTypes.includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }

  // Worker file path
  checkFilePath(filePath)

  // Worker node options
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid options: must be a plain object'
    )
  }
  const backPressureSize = opts.tasksQueueBackPressureSize
  if (backPressureSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue back pressure size option'
    )
  }
  if (!Number.isSafeInteger(backPressureSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
  }
  if (backPressureSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
  }
}
bfc75cca
JB
144
/**
 * Updates the given measurement statistics.
 *
 * Nothing is updated when the measurement value is `null`/`undefined` or when
 * the requirements do not ask for aggregation. Otherwise the aggregate,
 * minimum and maximum are refreshed and, when the average and/or the median
 * is required, the value is appended to the history from which they are
 * recomputed.
 *
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
export const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements,
  measurementValue: number
): void => {
  // Guard the whole update, not only the history push: adding a missing value
  // to the aggregate would turn it into NaN and corrupt every later update.
  if (measurementValue == null || !measurementRequirements.aggregate) {
    return
  }
  measurementStatistics.aggregate =
    (measurementStatistics.aggregate ?? 0) + measurementValue
  measurementStatistics.minimum = min(
    measurementValue,
    measurementStatistics.minimum ?? Infinity
  )
  measurementStatistics.maximum = max(
    measurementValue,
    measurementStatistics.maximum ?? -Infinity
  )
  if (measurementRequirements.average || measurementRequirements.median) {
    measurementStatistics.history.push(measurementValue)
    // Whichever of average/median is not required is dropped so that no stale
    // value lingers next to the freshly computed one.
    if (measurementRequirements.average) {
      measurementStatistics.average = average(measurementStatistics.history)
    } else if (measurementStatistics.average != null) {
      delete measurementStatistics.average
    }
    if (measurementRequirements.median) {
      measurementStatistics.median = median(measurementStatistics.history)
    } else if (measurementStatistics.median != null) {
      delete measurementStatistics.median
    }
  }
}
c3719753
JB
188
/**
 * Creates a worker of the given type.
 *
 * A thread worker is created with `new Worker()` from `node:worker_threads`,
 * using `SHARE_ENV` as the default `env` option, which the given
 * `workerOptions` may override. A cluster worker is created with
 * `cluster.fork()`, passing the given `env`.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path (only used for thread workers).
 * @param opts - The worker creation options.
 * @returns The created worker.
 * @throws Error If the worker type is unknown.
 */
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
): Worker => {
  if (type === WorkerTypes.thread) {
    // The spread comes last so caller-supplied options win over the default env.
    const threadWorkerOptions = {
      env: SHARE_ENV,
      ...opts?.workerOptions
    }
    return new Worker(filePath, threadWorkerOptions) as unknown as Worker
  }
  if (type === WorkerTypes.cluster) {
    return cluster.fork(opts?.env) as unknown as Worker
  }
  // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
  throw new Error(`Unknown worker type '${type}'`)
}
d41a44de
JB
207
/**
 * Waits for a worker node to emit a given event a given number of times.
 *
 * The returned promise resolves immediately with `0` when there is nothing to
 * wait for, i.e. when the number of events to wait is zero, negative or NaN.
 * Otherwise it resolves with the number of events counted once that number
 * reaches the requested one. The listener stays registered on the worker node
 * after the promise has resolved.
 *
 * @param workerNode - The worker node to listen on.
 * @param workerNodeEvent - The name of the worker node event to count.
 * @param numberOfEventsToWait - The number of events to wait for.
 * @returns A promise resolving with the number of events counted.
 */
export const waitWorkerNodeEvents = async <
  Worker extends IWorker,
  Data = unknown
>(
  workerNode: IWorkerNode<Worker, Data>,
  workerNodeEvent: string,
  numberOfEventsToWait: number
): Promise<number> => {
  return await new Promise<number>(resolve => {
    let events = 0
    // `!(n > 0)` rather than `n === 0`: a negative or NaN count can never be
    // reached by counting up, so waiting for it would hang forever.
    if (!(numberOfEventsToWait > 0)) {
      resolve(events)
      return
    }
    workerNode.on(workerNodeEvent, () => {
      ++events
      // `>=` rather than `===`: a fractional count is never hit exactly.
      // Resolving again on later events is a no-op.
      if (events >= numberOfEventsToWait) {
        resolve(events)
      }
    })
  })
}