Merge branch 'master' into combined-prs-branch
[poolifier.git] / src / pools / utils.ts
1 import { existsSync } from 'node:fs'
2 import cluster from 'node:cluster'
3 import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
4 import { env } from 'node:process'
5 import { average, isPlainObject, max, median, min } from '../utils.js'
6 import type { MessageValue, Task } from '../utility-types.js'
7 import {
8 type MeasurementStatisticsRequirements,
9 WorkerChoiceStrategies,
10 type WorkerChoiceStrategy
11 } from './selection-strategies/selection-strategies-types.js'
12 import type { TasksQueueOptions } from './pool.js'
13 import {
14 type IWorker,
15 type IWorkerNode,
16 type MeasurementStatistics,
17 type WorkerNodeOptions,
18 type WorkerType,
19 WorkerTypes,
20 type WorkerUsage
21 } from './worker.js'
22 import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
23
/**
 * Builds the default tasks queue options for a pool.
 *
 * @param poolMaxSize - The pool maximum size.
 * @returns The default tasks queue options, whose queue size is the square of `poolMaxSize`.
 */
export const getDefaultTasksQueueOptions = (
  poolMaxSize: number
): Required<TasksQueueOptions> => ({
  size: poolMaxSize ** 2,
  concurrency: 1,
  taskStealing: true,
  tasksStealingOnBackPressure: true,
  tasksFinishedTimeout: 2000
})
35
/**
 * Validates the worker file path.
 *
 * @param filePath - The worker file path to check.
 * @throws {TypeError} If the path is not specified or is not a string.
 * @throws {Error} If no file system entry exists at the given path.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  } else if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  }
  const workerFileExists = existsSync(filePath)
  if (!workerFileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
47
/**
 * Validates the size bounds of a dynamic pool.
 *
 * The checks run in a fixed order and the first failing one throws.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {TypeError} If `max` is not specified or is not a safe integer.
 * @throws {RangeError} If `max` is lower than `min`, equal to zero or equal to `min`.
 */
export const checkDynamicPoolSize = (
  min: number,
  max: number | undefined
): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
74
/**
 * Validates a worker choice strategy.
 *
 * A nullish strategy is accepted as is; any other value must be one of the
 * values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {Error} If the strategy is not a known worker choice strategy.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
85
/**
 * Validates the tasks queue options.
 *
 * Nullish options are accepted. Otherwise the options must be a plain object
 * and, when set, `concurrency` and `size` must each be a strictly positive
 * safe integer.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {TypeError} If the options are not a plain object, or `concurrency` or `size` is not a safe integer.
 * @throws {RangeError} If `concurrency` or `size` is negative or zero.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions | undefined
): void => {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, size } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
122
/**
 * Validates the arguments used to construct a worker node.
 *
 * The worker type is checked first, then the worker file path (through
 * `checkFilePath`), then the worker node options and their
 * `tasksQueueBackPressureSize` field.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path.
 * @param opts - The worker node options.
 * @throws {TypeError} If an argument is missing or has an invalid type.
 * @throws {RangeError} If the tasks queue back pressure size is negative or zero.
 */
export const checkWorkerNodeArguments = (
  type: WorkerType | undefined,
  filePath: string | undefined,
  opts: WorkerNodeOptions | undefined
): void => {
  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  const supportedWorkerTypes = Object.values(WorkerTypes)
  if (!supportedWorkerTypes.includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }
  checkFilePath(filePath)
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid options: must be a plain object'
    )
  }
  const { tasksQueueBackPressureSize: backPressureSize } = opts
  if (backPressureSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue back pressure size option'
    )
  }
  if (!Number.isSafeInteger(backPressureSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
  }
  if (backPressureSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
  }
}
163
/**
 * Folds a new measurement value into the given measurement statistics.
 *
 * Nothing is updated unless requirements are given, a value is given and the
 * requirements ask for aggregation. When they do, the aggregate, minimum and
 * maximum are always refreshed; the history, average and median are only
 * touched when the average or the median is required.
 *
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements | undefined,
  measurementValue: number | undefined
): void => {
  if (
    measurementRequirements == null ||
    measurementValue == null ||
    !measurementRequirements.aggregate
  ) {
    return
  }
  const stats = measurementStatistics
  stats.aggregate = (stats.aggregate ?? 0) + measurementValue
  stats.minimum = min(measurementValue, stats.minimum ?? Infinity)
  stats.maximum = max(measurementValue, stats.maximum ?? -Infinity)
  const { average: averageRequired, median: medianRequired } =
    measurementRequirements
  if (!averageRequired && !medianRequired) {
    return
  }
  stats.history.push(measurementValue)
  // A statistic that is no longer required is dropped rather than left stale.
  if (averageRequired) {
    stats.average = average(stats.history)
  } else if (stats.average != null) {
    delete stats.average
  }
  if (medianRequired) {
    stats.median = median(stats.history)
  } else if (stats.median != null) {
    delete stats.median
  }
}
// The helper is not exported; it is attached to `exports` only when NODE_ENV is 'test'.
if (env.NODE_ENV === 'test') {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  exports.updateMeasurementStatistics = updateMeasurementStatistics
}
211
/**
 * Updates the wait time statistics of a worker usage for the given task.
 *
 * The wait time is the delay between the task timestamp and now, or zero when
 * the task has no timestamp.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context providing the task statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param task - The task whose wait time is measured.
 */
export const updateWaitTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
    | WorkerChoiceStrategyContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  task: Task<Data>
): void => {
  const now = performance.now()
  const waitedFor = task.timestamp != null ? now - task.timestamp : 0
  updateMeasurementStatistics(
    workerUsage.waitTime,
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime,
    waitedFor
  )
}
231
/**
 * Updates the tasks counters of a worker usage once a task response is received.
 *
 * The executing counter is decremented (never below zero), then either the
 * failed or the executed counter is incremented depending on whether the
 * message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const { tasks } = workerUsage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (tasks.executing != null && tasks.executing > 0) {
    tasks.executing -= 1
  }
  if (message.workerError != null) {
    tasks.failed += 1
  } else {
    tasks.executed += 1
  }
}
250
/**
 * Updates the run time statistics of a worker usage from a task response.
 *
 * Messages carrying a worker error are ignored. A missing run time is
 * recorded as zero.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context providing the task statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateRunTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
    | WorkerChoiceStrategyContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const taskFailed = message.workerError != null
  if (taskFailed) {
    return
  }
  updateMeasurementStatistics(
    workerUsage.runTime,
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime,
    message.taskPerformance?.runTime ?? 0
  )
}
271
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from a
 * task response.
 *
 * Messages carrying a worker error are ignored. Missing active or idle values
 * are recorded as zero. When ELU aggregation is required and the message
 * carries ELU data, the utilization becomes the mean of the previous
 * utilization and the task one, or the task one when none was recorded yet.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context providing the task statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateEluWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext:
    | WorkerChoiceStrategyContext<Worker, Data, Response>
    | undefined,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  const eluRequirements =
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (eluRequirements?.aggregate === true && taskElu != null) {
    const previousUtilization = workerUsage.elu.utilization
    workerUsage.elu.utilization =
      previousUtilization != null
        ? (previousUtilization + taskElu.utilization) / 2
        : taskElu.utilization
  }
}
311
/**
 * Creates a worker of the given type.
 *
 * A thread worker is instantiated from `filePath` with `SHARE_ENV` as its
 * default `env`, which `opts.workerOptions` may override. A cluster worker is
 * forked with `opts.env`.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path (only used for thread workers).
 * @param opts - The worker creation options.
 * @returns The created worker.
 * @throws {Error} If the worker type is unknown.
 */
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
): Worker => {
  if (type === WorkerTypes.thread) {
    const threadOptions: WorkerOptions = {
      env: SHARE_ENV,
      ...opts.workerOptions
    }
    return new Worker(filePath, threadOptions) as unknown as Worker
  }
  if (type === WorkerTypes.cluster) {
    return cluster.fork(opts.env) as unknown as Worker
  }
  // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
  throw new Error(`Unknown worker type '${type}'`)
}
330
/**
 * Waits for a given number of occurrences of a worker node event, with an
 * optional timeout.
 *
 * Once the promise settles, the event listener is detached from the worker
 * node and the pending timer (if any) is cleared, so that neither leaks past
 * the call.
 *
 * @param workerNode - The worker node emitting the event.
 * @param workerNodeEvent - The name of the event to wait for.
 * @param numberOfEventsToWait - The number of event occurrences to wait for. Zero resolves immediately.
 * @param timeout - The maximum time to wait in milliseconds. A negative value disables the timeout.
 * @returns The number of events received when the wait ended.
 */
export const waitWorkerNodeEvents = async <
  Worker extends IWorker,
  Data = unknown
>(
  workerNode: IWorkerNode<Worker, Data>,
  workerNodeEvent: string,
  numberOfEventsToWait: number,
  timeout: number
): Promise<number> => {
  return await new Promise<number>(resolve => {
    let events = 0
    if (numberOfEventsToWait === 0) {
      resolve(events)
      return
    }
    let timer: ReturnType<typeof setTimeout> | undefined
    const onEvent = (): void => {
      ++events
      if (events === numberOfEventsToWait) {
        // Do not let the timer keep the event loop alive once the wait is over.
        clearTimeout(timer)
        workerNode.off(workerNodeEvent, onEvent)
        resolve(events)
      }
    }
    workerNode.on(workerNodeEvent, onEvent)
    if (timeout >= 0) {
      timer = setTimeout(() => {
        // Timed out: stop counting and report what was received so far.
        workerNode.off(workerNodeEvent, onEvent)
        resolve(events)
      }, timeout)
    }
  })
}