perf: update worker choice strategies internal if needed
[poolifier.git] / src / pools / utils.ts
1 import { existsSync } from 'node:fs'
2 import cluster from 'node:cluster'
3 import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
4 import { env } from 'node:process'
5 import { average, isPlainObject, max, median, min } from '../utils'
6 import type { MessageValue, Task } from '../utility-types'
7 import {
8 type MeasurementStatisticsRequirements,
9 WorkerChoiceStrategies,
10 type WorkerChoiceStrategy
11 } from './selection-strategies/selection-strategies-types'
12 import type { TasksQueueOptions } from './pool'
13 import {
14 type IWorker,
15 type IWorkerNode,
16 type MeasurementStatistics,
17 type WorkerNodeOptions,
18 type WorkerType,
19 WorkerTypes,
20 type WorkerUsage
21 } from './worker'
22 import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
23
24 export const checkFilePath = (filePath: string): void => {
25 if (filePath == null) {
26 throw new TypeError('The worker file path must be specified')
27 }
28 if (typeof filePath !== 'string') {
29 throw new TypeError('The worker file path must be a string')
30 }
31 if (!existsSync(filePath)) {
32 throw new Error(`Cannot find the worker file '${filePath}'`)
33 }
34 }
35
36 export const checkDynamicPoolSize = (min: number, max: number): void => {
37 if (max == null) {
38 throw new TypeError(
39 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
40 )
41 } else if (!Number.isSafeInteger(max)) {
42 throw new TypeError(
43 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
44 )
45 } else if (min > max) {
46 throw new RangeError(
47 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
48 )
49 } else if (max === 0) {
50 throw new RangeError(
51 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
52 )
53 } else if (min === max) {
54 throw new RangeError(
55 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
56 )
57 }
58 }
59
60 export const checkValidWorkerChoiceStrategy = (
61 workerChoiceStrategy: WorkerChoiceStrategy
62 ): void => {
63 if (
64 workerChoiceStrategy != null &&
65 !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
66 ) {
67 throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
68 }
69 }
70
71 export const checkValidTasksQueueOptions = (
72 tasksQueueOptions: TasksQueueOptions
73 ): void => {
74 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
75 throw new TypeError('Invalid tasks queue options: must be a plain object')
76 }
77 if (
78 tasksQueueOptions?.concurrency != null &&
79 !Number.isSafeInteger(tasksQueueOptions.concurrency)
80 ) {
81 throw new TypeError(
82 'Invalid worker node tasks concurrency: must be an integer'
83 )
84 }
85 if (
86 tasksQueueOptions?.concurrency != null &&
87 tasksQueueOptions.concurrency <= 0
88 ) {
89 throw new RangeError(
90 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
91 )
92 }
93 if (
94 tasksQueueOptions?.size != null &&
95 !Number.isSafeInteger(tasksQueueOptions.size)
96 ) {
97 throw new TypeError(
98 'Invalid worker node tasks queue size: must be an integer'
99 )
100 }
101 if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
102 throw new RangeError(
103 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
104 )
105 }
106 }
107
108 export const checkWorkerNodeArguments = (
109 type: WorkerType,
110 filePath: string,
111 opts: WorkerNodeOptions
112 ): void => {
113 if (type == null) {
114 throw new TypeError('Cannot construct a worker node without a worker type')
115 }
116 if (!Object.values(WorkerTypes).includes(type)) {
117 throw new TypeError(
118 `Cannot construct a worker node with an invalid worker type '${type}'`
119 )
120 }
121 checkFilePath(filePath)
122 if (opts == null) {
123 throw new TypeError(
124 'Cannot construct a worker node without worker node options'
125 )
126 }
127 if (!isPlainObject(opts)) {
128 throw new TypeError(
129 'Cannot construct a worker node with invalid options: must be a plain object'
130 )
131 }
132 if (opts.tasksQueueBackPressureSize == null) {
133 throw new TypeError(
134 'Cannot construct a worker node without a tasks queue back pressure size option'
135 )
136 }
137 if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
138 throw new TypeError(
139 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
140 )
141 }
142 if (opts.tasksQueueBackPressureSize <= 0) {
143 throw new RangeError(
144 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
145 )
146 }
147 }
148
149 /**
150 * Updates the given measurement statistics.
151 *
152 * @param measurementStatistics - The measurement statistics to update.
153 * @param measurementRequirements - The measurement statistics requirements.
154 * @param measurementValue - The measurement value.
155 * @param numberOfMeasurements - The number of measurements.
156 * @internal
157 */
158 const updateMeasurementStatistics = (
159 measurementStatistics: MeasurementStatistics,
160 measurementRequirements: MeasurementStatisticsRequirements,
161 measurementValue: number
162 ): void => {
163 if (measurementRequirements.aggregate) {
164 measurementStatistics.aggregate =
165 (measurementStatistics.aggregate ?? 0) + measurementValue
166 measurementStatistics.minimum = min(
167 measurementValue,
168 measurementStatistics.minimum ?? Infinity
169 )
170 measurementStatistics.maximum = max(
171 measurementValue,
172 measurementStatistics.maximum ?? -Infinity
173 )
174 if (
175 (measurementRequirements.average || measurementRequirements.median) &&
176 measurementValue != null
177 ) {
178 measurementStatistics.history.push(measurementValue)
179 if (measurementRequirements.average) {
180 measurementStatistics.average = average(measurementStatistics.history)
181 } else if (measurementStatistics.average != null) {
182 delete measurementStatistics.average
183 }
184 if (measurementRequirements.median) {
185 measurementStatistics.median = median(measurementStatistics.history)
186 } else if (measurementStatistics.median != null) {
187 delete measurementStatistics.median
188 }
189 }
190 }
191 }
192 if (env.NODE_ENV === 'test') {
193 // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
194 exports.updateMeasurementStatistics = updateMeasurementStatistics
195 }
196
197 export const updateWaitTimeWorkerUsage = <
198 Worker extends IWorker,
199 Data = unknown,
200 Response = unknown
201 >(
202 workerChoiceStrategyContext: WorkerChoiceStrategyContext<
203 Worker,
204 Data,
205 Response
206 >,
207 workerUsage: WorkerUsage,
208 task: Task<Data>
209 ): void => {
210 const timestamp = performance.now()
211 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
212 updateMeasurementStatistics(
213 workerUsage.waitTime,
214 workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
215 taskWaitTime
216 )
217 }
218
219 export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
220 workerUsage: WorkerUsage,
221 message: MessageValue<Response>
222 ): void => {
223 const workerTaskStatistics = workerUsage.tasks
224 if (
225 workerTaskStatistics.executing != null &&
226 workerTaskStatistics.executing > 0
227 ) {
228 --workerTaskStatistics.executing
229 }
230 if (message.workerError == null) {
231 ++workerTaskStatistics.executed
232 } else {
233 ++workerTaskStatistics.failed
234 }
235 }
236
237 export const updateRunTimeWorkerUsage = <
238 Worker extends IWorker,
239 Data = unknown,
240 Response = unknown
241 >(
242 workerChoiceStrategyContext: WorkerChoiceStrategyContext<
243 Worker,
244 Data,
245 Response
246 >,
247 workerUsage: WorkerUsage,
248 message: MessageValue<Response>
249 ): void => {
250 if (message.workerError != null) {
251 return
252 }
253 updateMeasurementStatistics(
254 workerUsage.runTime,
255 workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
256 message.taskPerformance?.runTime ?? 0
257 )
258 }
259
260 export const updateEluWorkerUsage = <
261 Worker extends IWorker,
262 Data = unknown,
263 Response = unknown
264 >(
265 workerChoiceStrategyContext: WorkerChoiceStrategyContext<
266 Worker,
267 Data,
268 Response
269 >,
270 workerUsage: WorkerUsage,
271 message: MessageValue<Response>
272 ): void => {
273 if (message.workerError != null) {
274 return
275 }
276 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
277 workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
278 updateMeasurementStatistics(
279 workerUsage.elu.active,
280 eluTaskStatisticsRequirements,
281 message.taskPerformance?.elu?.active ?? 0
282 )
283 updateMeasurementStatistics(
284 workerUsage.elu.idle,
285 eluTaskStatisticsRequirements,
286 message.taskPerformance?.elu?.idle ?? 0
287 )
288 if (eluTaskStatisticsRequirements.aggregate) {
289 if (message.taskPerformance?.elu != null) {
290 if (workerUsage.elu.utilization != null) {
291 workerUsage.elu.utilization =
292 (workerUsage.elu.utilization +
293 message.taskPerformance.elu.utilization) /
294 2
295 } else {
296 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
297 }
298 }
299 }
300 }
301
302 export const createWorker = <Worker extends IWorker>(
303 type: WorkerType,
304 filePath: string,
305 opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
306 ): Worker => {
307 switch (type) {
308 case WorkerTypes.thread:
309 return new Worker(filePath, {
310 env: SHARE_ENV,
311 ...opts?.workerOptions
312 }) as unknown as Worker
313 case WorkerTypes.cluster:
314 return cluster.fork(opts?.env) as unknown as Worker
315 default:
316 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
317 throw new Error(`Unknown worker type '${type}'`)
318 }
319 }
320
321 export const waitWorkerNodeEvents = async <
322 Worker extends IWorker,
323 Data = unknown
324 >(
325 workerNode: IWorkerNode<Worker, Data>,
326 workerNodeEvent: string,
327 numberOfEventsToWait: number
328 ): Promise<number> => {
329 return await new Promise<number>(resolve => {
330 let events = 0
331 if (numberOfEventsToWait === 0) {
332 resolve(events)
333 return
334 }
335 workerNode.on(workerNodeEvent, () => {
336 ++events
337 if (events === numberOfEventsToWait) {
338 resolve(events)
339 }
340 })
341 })
342 }