feat: add queued tasks end timeout support to worker node termination
[poolifier.git] / src / pools / utils.ts
1 import { existsSync } from 'node:fs'
2 import cluster from 'node:cluster'
3 import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
4 import { env } from 'node:process'
5 import { average, isPlainObject, max, median, min } from '../utils'
6 import type { MessageValue, Task } from '../utility-types'
7 import {
8 type MeasurementStatisticsRequirements,
9 WorkerChoiceStrategies,
10 type WorkerChoiceStrategy
11 } from './selection-strategies/selection-strategies-types'
12 import type { TasksQueueOptions } from './pool'
13 import {
14 type IWorker,
15 type IWorkerNode,
16 type MeasurementStatistics,
17 type WorkerNodeOptions,
18 type WorkerType,
19 WorkerTypes,
20 type WorkerUsage
21 } from './worker'
22 import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
23
/**
 * Builds the default tasks queue options of a pool.
 *
 * @param poolMaxSize - The pool maximum size, from which the default queue size is derived.
 * @returns The default tasks queue options, with every option set.
 */
export const getDefaultTasksQueueOptions = (
  poolMaxSize: number
): Required<TasksQueueOptions> => {
  // The default queue size is the square of the pool maximum size.
  const defaultQueueSize = poolMaxSize ** 2
  const defaultOptions: Required<TasksQueueOptions> = {
    size: defaultQueueSize,
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 1000
  }
  return defaultOptions
}
35
/**
 * Validates a worker file path.
 *
 * @param filePath - The worker file path to validate.
 * @throws TypeError If the path is nullish or is not a string.
 * @throws Error If nothing exists at the given path.
 */
export const checkFilePath = (filePath: string): void => {
  const isMissing = filePath == null
  if (isMissing) {
    throw new TypeError('The worker file path must be specified')
  }
  const isString = typeof filePath === 'string'
  if (!isString) {
    throw new TypeError('The worker file path must be a string')
  }
  // Synchronous file system lookup: the path must already exist.
  const isFound = existsSync(filePath)
  if (!isFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
47
/**
 * Validates the minimum and maximum sizes of a dynamic pool.
 *
 * The checks run in order and the first failing one throws.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws TypeError If the maximum size is nullish or is not a safe integer.
 * @throws RangeError If the maximum size is lower than the minimum size, equal to zero or equal to the minimum size.
 */
export const checkDynamicPoolSize = (min: number, max: number): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
71
/**
 * Validates a worker choice strategy.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error If the strategy is defined but is not one of the `WorkerChoiceStrategies` values.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy
): void => {
  // A nullish strategy is not validated.
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
82
/**
 * Validates tasks queue options.
 *
 * Only the `concurrency` and `size` options are checked, and only when they are defined.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError If the options are not a plain object, or if `concurrency` or `size` is not a safe integer.
 * @throws RangeError If `concurrency` or `size` is negative or zero.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions
): void => {
  // Nullish options carry nothing to validate.
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  const size = tasksQueueOptions.size
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
119
/**
 * Validates the arguments used to construct a worker node.
 *
 * The checks run in order: worker type, worker file path, then worker node options.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path, validated with `checkFilePath()`.
 * @param opts - The worker node options.
 * @throws TypeError If the type is nullish or unknown, if the options are nullish or not a plain object, or if `tasksQueueBackPressureSize` is nullish or not a safe integer.
 * @throws RangeError If `tasksQueueBackPressureSize` is negative or zero.
 */
export const checkWorkerNodeArguments = (
  type: WorkerType,
  filePath: string,
  opts: WorkerNodeOptions
): void => {
  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  const isKnownWorkerType = Object.values(WorkerTypes).includes(type)
  if (!isKnownWorkerType) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }
  checkFilePath(filePath)
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid options: must be a plain object'
    )
  }
  const backPressureSize = opts.tasksQueueBackPressureSize
  if (backPressureSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue back pressure size option'
    )
  }
  if (!Number.isSafeInteger(backPressureSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
  }
  if (backPressureSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
  }
}
160
/**
 * Folds a new measurement value into the given measurement statistics.
 *
 * Nothing is updated unless the requirements ask for the aggregate. The
 * history, average and median are only touched when the average or the
 * median is required.
 *
 * @param measurementStatistics - The measurement statistics to update in place.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements,
  measurementValue: number
): void => {
  if (!measurementRequirements.aggregate) {
    return
  }
  const previousAggregate = measurementStatistics.aggregate ?? 0
  measurementStatistics.aggregate = previousAggregate + measurementValue
  measurementStatistics.minimum = min(
    measurementValue,
    measurementStatistics.minimum ?? Infinity
  )
  measurementStatistics.maximum = max(
    measurementValue,
    measurementStatistics.maximum ?? -Infinity
  )
  const needsHistory =
    measurementRequirements.average || measurementRequirements.median
  if (!needsHistory || measurementValue == null) {
    return
  }
  measurementStatistics.history.push(measurementValue)
  // Drop a previously computed statistic that is no longer required.
  if (measurementRequirements.average) {
    measurementStatistics.average = average(measurementStatistics.history)
  } else if (measurementStatistics.average != null) {
    delete measurementStatistics.average
  }
  if (measurementRequirements.median) {
    measurementStatistics.median = median(measurementStatistics.history)
  } else if (measurementStatistics.median != null) {
    delete measurementStatistics.median
  }
}
// Expose the internal helper on the module exports when NODE_ENV is 'test'.
if (env.NODE_ENV === 'test') {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  exports.updateMeasurementStatistics = updateMeasurementStatistics
}
208
/**
 * Updates the wait time statistics of a worker usage with the wait time of the given task.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context providing the task statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param task - The task whose wait time is measured.
 */
export const updateWaitTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >,
  workerUsage: WorkerUsage,
  task: Task<Data>
): void => {
  const now = performance.now()
  // A task without a timestamp yields a wait time of zero.
  const taskTimestamp = task.timestamp ?? now
  const { waitTime: waitTimeRequirements } =
    workerChoiceStrategyContext.getTaskStatisticsRequirements()
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - taskTimestamp
  )
}
230
/**
 * Updates the tasks counters of a worker usage from a worker message.
 *
 * The executing counter is decremented when it is positive, then either the
 * executed or the failed counter is incremented.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const { tasks } = workerUsage
  const executing = tasks.executing
  // Never let the executing counter drop below zero.
  if (executing != null && executing > 0) {
    tasks.executing = executing - 1
  }
  // A message carrying a worker error counts as a failed task.
  if (message.workerError != null) {
    tasks.failed += 1
  } else {
    tasks.executed += 1
  }
}
248
/**
 * Updates the run time statistics of a worker usage from a worker message.
 *
 * Messages carrying a worker error are ignored.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context providing the task statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateRunTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError == null) {
    const { runTime: runTimeRequirements } =
      workerChoiceStrategyContext.getTaskStatisticsRequirements()
    // A missing run time measurement is recorded as zero.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      taskRunTime
    )
  }
}
271
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from a worker message.
 *
 * Messages carrying a worker error are ignored.
 *
 * @param workerChoiceStrategyContext - The worker choice strategy context providing the task statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
export const updateEluWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
  workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >,
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing active/idle measurements are recorded as zero.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The stored utilization is the mean of the previous value and the new one,
  // or the new one when there is no previous value.
  const previousUtilization = workerUsage.elu.utilization
  workerUsage.elu.utilization =
    previousUtilization != null
      ? (previousUtilization + taskElu.utilization) / 2
      : taskElu.utilization
}
313
/**
 * Creates a worker of the given type.
 *
 * @param type - The worker type: thread or cluster.
 * @param filePath - The worker file path, used for thread workers.
 * @param opts - The creation options: `workerOptions` for thread workers, `env` for cluster workers.
 * @returns The created worker.
 * @throws Error If the worker type is unknown.
 */
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
): Worker => {
  if (type === WorkerTypes.thread) {
    // The environment is shared by default; the given worker options may override it.
    const threadWorkerOptions = {
      env: SHARE_ENV,
      ...opts?.workerOptions
    }
    return new Worker(filePath, threadWorkerOptions) as unknown as Worker
  }
  if (type === WorkerTypes.cluster) {
    return cluster.fork(opts?.env) as unknown as Worker
  }
  // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
  throw new Error(`Unknown worker type '${type}'`)
}
332
/**
 * Waits for a number of events to be emitted by a worker node, or for a timeout to elapse.
 *
 * The listener is detached and the pending timer is cleared as soon as the
 * promise settles, so repeated calls do not accumulate listeners on the worker
 * node nor leave timers running.
 *
 * @param workerNode - The worker node emitting the events.
 * @param workerNodeEvent - The name of the event to wait for.
 * @param numberOfEventsToWait - The number of events to wait for. Zero resolves immediately.
 * @param timeout - The maximum time to wait in milliseconds. No timeout is armed if it is not strictly positive.
 * @returns The number of events received when the promise settled.
 */
export const waitWorkerNodeEvents = async <
  Worker extends IWorker,
  Data = unknown
>(
  workerNode: IWorkerNode<Worker, Data>,
  workerNodeEvent: string,
  numberOfEventsToWait: number,
  timeout: number
): Promise<number> => {
  return await new Promise<number>(resolve => {
    let events = 0
    if (numberOfEventsToWait === 0) {
      resolve(events)
      return
    }
    let timer: ReturnType<typeof setTimeout> | undefined
    const onEvent = (): void => {
      ++events
      if (events === numberOfEventsToWait) {
        // Done: detach the listener and cancel the timeout, if any.
        workerNode.off(workerNodeEvent, onEvent)
        clearTimeout(timer)
        resolve(events)
      }
    }
    workerNode.on(workerNodeEvent, onEvent)
    if (timeout > 0) {
      timer = setTimeout(() => {
        // Timed out: stop counting and resolve with what was received so far.
        workerNode.off(workerNodeEvent, onEvent)
        resolve(events)
      }, timeout)
    }
  })
}