Merge dependabot/npm_and_yarn/examples/typescript/http-server-pool/fastify-cluster...
[poolifier.git] / src / pools / utils.ts
1 import { existsSync } from 'node:fs'
2 import cluster from 'node:cluster'
3 import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
4 import { env } from 'node:process'
5 import { average, isPlainObject, max, median, min } from '../utils'
6 import type { MessageValue, Task } from '../utility-types'
7 import {
8 type MeasurementStatisticsRequirements,
9 WorkerChoiceStrategies,
10 type WorkerChoiceStrategy
11 } from './selection-strategies/selection-strategies-types'
12 import type { TasksQueueOptions } from './pool'
13 import {
14 type IWorker,
15 type IWorkerNode,
16 type MeasurementStatistics,
17 type WorkerNodeOptions,
18 type WorkerType,
19 WorkerTypes,
20 type WorkerUsage
21 } from './worker'
22 import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
23
24 export const getDefaultTasksQueueOptions = (
25 poolMaxSize: number
26 ): Required<TasksQueueOptions> => {
27 return {
28 size: Math.pow(poolMaxSize, 2),
29 concurrency: 1,
30 taskStealing: true,
31 tasksStealingOnBackPressure: true,
32 tasksFinishedTimeout: 2000
33 }
34 }
35
36 export const checkFilePath = (filePath: string): void => {
37 if (filePath == null) {
38 throw new TypeError('The worker file path must be specified')
39 }
40 if (typeof filePath !== 'string') {
41 throw new TypeError('The worker file path must be a string')
42 }
43 if (!existsSync(filePath)) {
44 throw new Error(`Cannot find the worker file '${filePath}'`)
45 }
46 }
47
48 export const checkDynamicPoolSize = (min: number, max: number): void => {
49 if (max == null) {
50 throw new TypeError(
51 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
52 )
53 } else if (!Number.isSafeInteger(max)) {
54 throw new TypeError(
55 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
56 )
57 } else if (min > max) {
58 throw new RangeError(
59 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
60 )
61 } else if (max === 0) {
62 throw new RangeError(
63 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
64 )
65 } else if (min === max) {
66 throw new RangeError(
67 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
68 )
69 }
70 }
71
72 export const checkValidWorkerChoiceStrategy = (
73 workerChoiceStrategy: WorkerChoiceStrategy
74 ): void => {
75 if (
76 workerChoiceStrategy != null &&
77 !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
78 ) {
79 throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
80 }
81 }
82
83 export const checkValidTasksQueueOptions = (
84 tasksQueueOptions: TasksQueueOptions
85 ): void => {
86 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
87 throw new TypeError('Invalid tasks queue options: must be a plain object')
88 }
89 if (
90 tasksQueueOptions?.concurrency != null &&
91 !Number.isSafeInteger(tasksQueueOptions.concurrency)
92 ) {
93 throw new TypeError(
94 'Invalid worker node tasks concurrency: must be an integer'
95 )
96 }
97 if (
98 tasksQueueOptions?.concurrency != null &&
99 tasksQueueOptions.concurrency <= 0
100 ) {
101 throw new RangeError(
102 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
103 )
104 }
105 if (
106 tasksQueueOptions?.size != null &&
107 !Number.isSafeInteger(tasksQueueOptions.size)
108 ) {
109 throw new TypeError(
110 'Invalid worker node tasks queue size: must be an integer'
111 )
112 }
113 if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
114 throw new RangeError(
115 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
116 )
117 }
118 }
119
120 export const checkWorkerNodeArguments = (
121 type: WorkerType,
122 filePath: string,
123 opts: WorkerNodeOptions
124 ): void => {
125 if (type == null) {
126 throw new TypeError('Cannot construct a worker node without a worker type')
127 }
128 if (!Object.values(WorkerTypes).includes(type)) {
129 throw new TypeError(
130 `Cannot construct a worker node with an invalid worker type '${type}'`
131 )
132 }
133 checkFilePath(filePath)
134 if (opts == null) {
135 throw new TypeError(
136 'Cannot construct a worker node without worker node options'
137 )
138 }
139 if (!isPlainObject(opts)) {
140 throw new TypeError(
141 'Cannot construct a worker node with invalid options: must be a plain object'
142 )
143 }
144 if (opts.tasksQueueBackPressureSize == null) {
145 throw new TypeError(
146 'Cannot construct a worker node without a tasks queue back pressure size option'
147 )
148 }
149 if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
150 throw new TypeError(
151 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
152 )
153 }
154 if (opts.tasksQueueBackPressureSize <= 0) {
155 throw new RangeError(
156 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
157 )
158 }
159 }
160
161 /**
162 * Updates the given measurement statistics.
163 *
164 * @param measurementStatistics - The measurement statistics to update.
165 * @param measurementRequirements - The measurement statistics requirements.
166 * @param measurementValue - The measurement value.
167 * @internal
168 */
169 const updateMeasurementStatistics = (
170 measurementStatistics: MeasurementStatistics,
171 measurementRequirements: MeasurementStatisticsRequirements,
172 measurementValue: number
173 ): void => {
174 if (measurementRequirements.aggregate) {
175 measurementStatistics.aggregate =
176 (measurementStatistics.aggregate ?? 0) + measurementValue
177 measurementStatistics.minimum = min(
178 measurementValue,
179 measurementStatistics.minimum ?? Infinity
180 )
181 measurementStatistics.maximum = max(
182 measurementValue,
183 measurementStatistics.maximum ?? -Infinity
184 )
185 if (
186 (measurementRequirements.average || measurementRequirements.median) &&
187 measurementValue != null
188 ) {
189 measurementStatistics.history.push(measurementValue)
190 if (measurementRequirements.average) {
191 measurementStatistics.average = average(measurementStatistics.history)
192 } else if (measurementStatistics.average != null) {
193 delete measurementStatistics.average
194 }
195 if (measurementRequirements.median) {
196 measurementStatistics.median = median(measurementStatistics.history)
197 } else if (measurementStatistics.median != null) {
198 delete measurementStatistics.median
199 }
200 }
201 }
202 }
203 if (env.NODE_ENV === 'test') {
204 // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
205 exports.updateMeasurementStatistics = updateMeasurementStatistics
206 }
207
208 export const updateWaitTimeWorkerUsage = <
209 Worker extends IWorker,
210 Data = unknown,
211 Response = unknown
212 >(
213 workerChoiceStrategyContext: WorkerChoiceStrategyContext<
214 Worker,
215 Data,
216 Response
217 >,
218 workerUsage: WorkerUsage,
219 task: Task<Data>
220 ): void => {
221 const timestamp = performance.now()
222 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
223 updateMeasurementStatistics(
224 workerUsage.waitTime,
225 workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
226 taskWaitTime
227 )
228 }
229
230 export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
231 workerUsage: WorkerUsage,
232 message: MessageValue<Response>
233 ): void => {
234 const workerTaskStatistics = workerUsage.tasks
235 if (
236 workerTaskStatistics.executing != null &&
237 workerTaskStatistics.executing > 0
238 ) {
239 --workerTaskStatistics.executing
240 }
241 if (message.workerError == null) {
242 ++workerTaskStatistics.executed
243 } else {
244 ++workerTaskStatistics.failed
245 }
246 }
247
248 export const updateRunTimeWorkerUsage = <
249 Worker extends IWorker,
250 Data = unknown,
251 Response = unknown
252 >(
253 workerChoiceStrategyContext: WorkerChoiceStrategyContext<
254 Worker,
255 Data,
256 Response
257 >,
258 workerUsage: WorkerUsage,
259 message: MessageValue<Response>
260 ): void => {
261 if (message.workerError != null) {
262 return
263 }
264 updateMeasurementStatistics(
265 workerUsage.runTime,
266 workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
267 message.taskPerformance?.runTime ?? 0
268 )
269 }
270
271 export const updateEluWorkerUsage = <
272 Worker extends IWorker,
273 Data = unknown,
274 Response = unknown
275 >(
276 workerChoiceStrategyContext: WorkerChoiceStrategyContext<
277 Worker,
278 Data,
279 Response
280 >,
281 workerUsage: WorkerUsage,
282 message: MessageValue<Response>
283 ): void => {
284 if (message.workerError != null) {
285 return
286 }
287 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
288 workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
289 updateMeasurementStatistics(
290 workerUsage.elu.active,
291 eluTaskStatisticsRequirements,
292 message.taskPerformance?.elu?.active ?? 0
293 )
294 updateMeasurementStatistics(
295 workerUsage.elu.idle,
296 eluTaskStatisticsRequirements,
297 message.taskPerformance?.elu?.idle ?? 0
298 )
299 if (eluTaskStatisticsRequirements.aggregate) {
300 if (message.taskPerformance?.elu != null) {
301 if (workerUsage.elu.utilization != null) {
302 workerUsage.elu.utilization =
303 (workerUsage.elu.utilization +
304 message.taskPerformance.elu.utilization) /
305 2
306 } else {
307 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
308 }
309 }
310 }
311 }
312
313 export const createWorker = <Worker extends IWorker>(
314 type: WorkerType,
315 filePath: string,
316 opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
317 ): Worker => {
318 switch (type) {
319 case WorkerTypes.thread:
320 return new Worker(filePath, {
321 env: SHARE_ENV,
322 ...opts?.workerOptions
323 }) as unknown as Worker
324 case WorkerTypes.cluster:
325 return cluster.fork(opts?.env) as unknown as Worker
326 default:
327 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
328 throw new Error(`Unknown worker type '${type}'`)
329 }
330 }
331
332 export const waitWorkerNodeEvents = async <
333 Worker extends IWorker,
334 Data = unknown
335 >(
336 workerNode: IWorkerNode<Worker, Data>,
337 workerNodeEvent: string,
338 numberOfEventsToWait: number,
339 timeout: number
340 ): Promise<number> => {
341 return await new Promise<number>(resolve => {
342 let events = 0
343 if (numberOfEventsToWait === 0) {
344 resolve(events)
345 return
346 }
347 workerNode.on(workerNodeEvent, () => {
348 ++events
349 if (events === numberOfEventsToWait) {
350 resolve(events)
351 }
352 })
353 if (timeout >= 0) {
354 setTimeout(() => {
355 resolve(events)
356 }, timeout)
357 }
358 })
359 }