Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / utils.ts
CommitLineData
bde6b5d7 1import { existsSync } from 'node:fs'
c3719753
JB
2import cluster from 'node:cluster'
3import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
c329fd41 4import { env } from 'node:process'
bfc75cca 5import { average, isPlainObject, max, median, min } from '../utils'
c329fd41 6import type { MessageValue, Task } from '../utility-types'
bde6b5d7 7import {
bfc75cca 8 type MeasurementStatisticsRequirements,
bde6b5d7
JB
9 WorkerChoiceStrategies,
10 type WorkerChoiceStrategy
11} from './selection-strategies/selection-strategies-types'
12import type { TasksQueueOptions } from './pool'
c3719753
JB
13import {
14 type IWorker,
d41a44de 15 type IWorkerNode,
c3719753
JB
16 type MeasurementStatistics,
17 type WorkerNodeOptions,
18 type WorkerType,
c329fd41
JB
19 WorkerTypes,
20 type WorkerUsage
c3719753 21} from './worker'
c329fd41 22import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
bde6b5d7 23
32b141fd
JB
24export const getDefaultTasksQueueOptions = (
25 poolMaxSize: number
26): Required<TasksQueueOptions> => {
27 return {
28 size: Math.pow(poolMaxSize, 2),
29 concurrency: 1,
30 taskStealing: true,
31 tasksStealingOnBackPressure: true,
568d0075 32 tasksFinishedTimeout: 2000
32b141fd
JB
33 }
34}
35
bde6b5d7 36export const checkFilePath = (filePath: string): void => {
c3719753
JB
37 if (filePath == null) {
38 throw new TypeError('The worker file path must be specified')
39 }
40 if (typeof filePath !== 'string') {
41 throw new TypeError('The worker file path must be a string')
42 }
bde6b5d7
JB
43 if (!existsSync(filePath)) {
44 throw new Error(`Cannot find the worker file '${filePath}'`)
45 }
46}
47
48export const checkDynamicPoolSize = (min: number, max: number): void => {
49 if (max == null) {
50 throw new TypeError(
51 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
52 )
53 } else if (!Number.isSafeInteger(max)) {
54 throw new TypeError(
55 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
56 )
57 } else if (min > max) {
58 throw new RangeError(
59 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
60 )
61 } else if (max === 0) {
62 throw new RangeError(
63 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
64 )
65 } else if (min === max) {
66 throw new RangeError(
67 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
68 )
69 }
70}
71
72export const checkValidWorkerChoiceStrategy = (
73 workerChoiceStrategy: WorkerChoiceStrategy
74): void => {
75 if (
76 workerChoiceStrategy != null &&
77 !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
78 ) {
79 throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
80 }
81}
82
83export const checkValidTasksQueueOptions = (
84 tasksQueueOptions: TasksQueueOptions
85): void => {
86 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
87 throw new TypeError('Invalid tasks queue options: must be a plain object')
88 }
89 if (
90 tasksQueueOptions?.concurrency != null &&
91 !Number.isSafeInteger(tasksQueueOptions.concurrency)
92 ) {
93 throw new TypeError(
94 'Invalid worker node tasks concurrency: must be an integer'
95 )
96 }
97 if (
98 tasksQueueOptions?.concurrency != null &&
99 tasksQueueOptions.concurrency <= 0
100 ) {
101 throw new RangeError(
102 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
103 )
104 }
105 if (
106 tasksQueueOptions?.size != null &&
107 !Number.isSafeInteger(tasksQueueOptions.size)
108 ) {
109 throw new TypeError(
110 'Invalid worker node tasks queue size: must be an integer'
111 )
112 }
113 if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
114 throw new RangeError(
115 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
116 )
117 }
118}
bfc75cca 119
c3719753
JB
120export const checkWorkerNodeArguments = (
121 type: WorkerType,
122 filePath: string,
123 opts: WorkerNodeOptions
9a38f99e 124): void => {
c3719753
JB
125 if (type == null) {
126 throw new TypeError('Cannot construct a worker node without a worker type')
127 }
128 if (!Object.values(WorkerTypes).includes(type)) {
129 throw new TypeError(
130 `Cannot construct a worker node with an invalid worker type '${type}'`
131 )
9a38f99e 132 }
c3719753
JB
133 checkFilePath(filePath)
134 if (opts == null) {
9a38f99e 135 throw new TypeError(
c3719753 136 'Cannot construct a worker node without worker node options'
9a38f99e
JB
137 )
138 }
9974369e 139 if (!isPlainObject(opts)) {
9a38f99e 140 throw new TypeError(
c3719753 141 'Cannot construct a worker node with invalid options: must be a plain object'
9a38f99e
JB
142 )
143 }
c3719753
JB
144 if (opts.tasksQueueBackPressureSize == null) {
145 throw new TypeError(
146 'Cannot construct a worker node without a tasks queue back pressure size option'
147 )
148 }
149 if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
150 throw new TypeError(
151 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
152 )
153 }
154 if (opts.tasksQueueBackPressureSize <= 0) {
9a38f99e 155 throw new RangeError(
c3719753 156 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
9a38f99e
JB
157 )
158 }
159}
bfc75cca
JB
160
161/**
162 * Updates the given measurement statistics.
163 *
164 * @param measurementStatistics - The measurement statistics to update.
165 * @param measurementRequirements - The measurement statistics requirements.
166 * @param measurementValue - The measurement value.
bfc75cca
JB
167 * @internal
168 */
c329fd41 169const updateMeasurementStatistics = (
bfc75cca
JB
170 measurementStatistics: MeasurementStatistics,
171 measurementRequirements: MeasurementStatisticsRequirements,
172 measurementValue: number
173): void => {
174 if (measurementRequirements.aggregate) {
175 measurementStatistics.aggregate =
176 (measurementStatistics.aggregate ?? 0) + measurementValue
177 measurementStatistics.minimum = min(
178 measurementValue,
179 measurementStatistics.minimum ?? Infinity
180 )
181 measurementStatistics.maximum = max(
182 measurementValue,
183 measurementStatistics.maximum ?? -Infinity
184 )
185 if (
186 (measurementRequirements.average || measurementRequirements.median) &&
187 measurementValue != null
188 ) {
189 measurementStatistics.history.push(measurementValue)
190 if (measurementRequirements.average) {
191 measurementStatistics.average = average(measurementStatistics.history)
192 } else if (measurementStatistics.average != null) {
193 delete measurementStatistics.average
194 }
195 if (measurementRequirements.median) {
196 measurementStatistics.median = median(measurementStatistics.history)
197 } else if (measurementStatistics.median != null) {
198 delete measurementStatistics.median
199 }
200 }
201 }
202}
c329fd41
JB
203if (env.NODE_ENV === 'test') {
204 // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
205 exports.updateMeasurementStatistics = updateMeasurementStatistics
206}
207
208export const updateWaitTimeWorkerUsage = <
209 Worker extends IWorker,
210 Data = unknown,
211 Response = unknown
212>(
213 workerChoiceStrategyContext: WorkerChoiceStrategyContext<
214 Worker,
215 Data,
216 Response
217 >,
218 workerUsage: WorkerUsage,
219 task: Task<Data>
220 ): void => {
221 const timestamp = performance.now()
222 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
223 updateMeasurementStatistics(
224 workerUsage.waitTime,
225 workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
226 taskWaitTime
227 )
228}
229
230export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
231 workerUsage: WorkerUsage,
232 message: MessageValue<Response>
233): void => {
234 const workerTaskStatistics = workerUsage.tasks
235 if (
236 workerTaskStatistics.executing != null &&
237 workerTaskStatistics.executing > 0
238 ) {
239 --workerTaskStatistics.executing
240 }
241 if (message.workerError == null) {
242 ++workerTaskStatistics.executed
243 } else {
244 ++workerTaskStatistics.failed
245 }
246}
247
248export const updateRunTimeWorkerUsage = <
249 Worker extends IWorker,
250 Data = unknown,
251 Response = unknown
252>(
253 workerChoiceStrategyContext: WorkerChoiceStrategyContext<
254 Worker,
255 Data,
256 Response
257 >,
258 workerUsage: WorkerUsage,
259 message: MessageValue<Response>
260 ): void => {
261 if (message.workerError != null) {
262 return
263 }
264 updateMeasurementStatistics(
265 workerUsage.runTime,
266 workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
267 message.taskPerformance?.runTime ?? 0
268 )
269}
270
271export const updateEluWorkerUsage = <
272 Worker extends IWorker,
273 Data = unknown,
274 Response = unknown
275>(
276 workerChoiceStrategyContext: WorkerChoiceStrategyContext<
277 Worker,
278 Data,
279 Response
280 >,
281 workerUsage: WorkerUsage,
282 message: MessageValue<Response>
283 ): void => {
284 if (message.workerError != null) {
285 return
286 }
287 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
288 workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
289 updateMeasurementStatistics(
290 workerUsage.elu.active,
291 eluTaskStatisticsRequirements,
292 message.taskPerformance?.elu?.active ?? 0
293 )
294 updateMeasurementStatistics(
295 workerUsage.elu.idle,
296 eluTaskStatisticsRequirements,
297 message.taskPerformance?.elu?.idle ?? 0
298 )
299 if (eluTaskStatisticsRequirements.aggregate) {
300 if (message.taskPerformance?.elu != null) {
301 if (workerUsage.elu.utilization != null) {
302 workerUsage.elu.utilization =
303 (workerUsage.elu.utilization +
304 message.taskPerformance.elu.utilization) /
305 2
306 } else {
307 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
308 }
309 }
310 }
311}
c3719753
JB
312
313export const createWorker = <Worker extends IWorker>(
314 type: WorkerType,
315 filePath: string,
316 opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
317): Worker => {
318 switch (type) {
319 case WorkerTypes.thread:
320 return new Worker(filePath, {
321 env: SHARE_ENV,
322 ...opts?.workerOptions
323 }) as unknown as Worker
324 case WorkerTypes.cluster:
325 return cluster.fork(opts?.env) as unknown as Worker
326 default:
327 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
328 throw new Error(`Unknown worker type '${type}'`)
329 }
330}
d41a44de
JB
331
332export const waitWorkerNodeEvents = async <
333 Worker extends IWorker,
334 Data = unknown
335>(
336 workerNode: IWorkerNode<Worker, Data>,
337 workerNodeEvent: string,
32b141fd
JB
338 numberOfEventsToWait: number,
339 timeout: number
d41a44de
JB
340): Promise<number> => {
341 return await new Promise<number>(resolve => {
342 let events = 0
343 if (numberOfEventsToWait === 0) {
344 resolve(events)
345 return
346 }
347 workerNode.on(workerNodeEvent, () => {
348 ++events
349 if (events === numberOfEventsToWait) {
350 resolve(events)
351 }
352 })
6f3a391b 353 if (timeout >= 0) {
32b141fd
JB
354 setTimeout(() => {
355 resolve(events)
356 }, timeout)
357 }
d41a44de
JB
358 })
359}