feat: add per task function strategy support
[poolifier.git] / src / pools / utils.ts
CommitLineData
e9ed6eee 1import cluster, { Worker as ClusterWorker } from 'node:cluster'
ded253e2 2import { existsSync } from 'node:fs'
ded253e2 3import { env } from 'node:process'
e9ed6eee
JB
4import {
5 SHARE_ENV,
6 Worker as ThreadWorker,
7 type WorkerOptions
8} from 'node:worker_threads'
ded253e2 9
d35e5717 10import type { MessageValue, Task } from '../utility-types.js'
ded253e2 11import { average, isPlainObject, max, median, min } from '../utils.js'
bcfb06ce 12import type { TasksQueueOptions } from './pool.js'
bde6b5d7 13import {
bfc75cca 14 type MeasurementStatisticsRequirements,
bde6b5d7 15 WorkerChoiceStrategies,
bcfb06ce 16 type WorkerChoiceStrategy
d35e5717 17} from './selection-strategies/selection-strategies-types.js'
bcfb06ce 18import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
c3719753
JB
19import {
20 type IWorker,
d41a44de 21 type IWorkerNode,
c3719753
JB
22 type MeasurementStatistics,
23 type WorkerNodeOptions,
24 type WorkerType,
c329fd41
JB
25 WorkerTypes,
26 type WorkerUsage
d35e5717 27} from './worker.js'
bde6b5d7 28
e9ed6eee
JB
/**
 * Default measurement statistics requirements.
 *
 * Every statistic (aggregate, average and median) is disabled.
 */
export const DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS: MeasurementStatisticsRequirements =
  {
    aggregate: false,
    average: false,
    median: false
  }
38
32b141fd
JB
/**
 * Builds the default tasks queue options for a pool.
 *
 * @param poolMaxSize - The pool maximum size; the default queue size is its square.
 * @returns The fully populated default tasks queue options.
 */
export const getDefaultTasksQueueOptions = (
  poolMaxSize: number
): Required<TasksQueueOptions> => {
  return {
    size: poolMaxSize ** 2,
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  }
}
50
/**
 * Validates the worker file path.
 *
 * @param filePath - The worker file path to check.
 * @throws {TypeError} If the file path is `null`/`undefined` or is not a string.
 * @throws {Error} If no file exists at the given path.
 */
export const checkFilePath = (filePath: string | undefined): void => {
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  }
  if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
62
c63a35a0
JB
/**
 * Validates the minimum/maximum sizes of a dynamic pool.
 *
 * Checks are evaluated in order and the first failing one throws.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {TypeError} If the maximum size is missing or is not a safe integer.
 * @throws {RangeError} If the maximum size is lower than the minimum size,
 * equal to zero, or equal to the minimum size.
 */
export const checkDynamicPoolSize = (
  min: number,
  max: number | undefined
): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
89
/**
 * Validates a worker choice strategy.
 *
 * A `null`/`undefined` strategy is accepted as is.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {Error} If the strategy is not one of the `WorkerChoiceStrategies` values.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
  if (
    workerChoiceStrategy != null &&
    !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
  ) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
100
/**
 * Validates tasks queue options.
 *
 * `null`/`undefined` options, and unset `concurrency` / `size` fields, are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {TypeError} If the options are not a plain object, or if `concurrency`
 * or `size` is not a safe integer.
 * @throws {RangeError} If `concurrency` or `size` is negative or zero.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions | undefined
): void => {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  if (
    tasksQueueOptions?.concurrency != null &&
    !Number.isSafeInteger(tasksQueueOptions.concurrency)
  ) {
    throw new TypeError(
      'Invalid worker node tasks concurrency: must be an integer'
    )
  }
  if (
    tasksQueueOptions?.concurrency != null &&
    tasksQueueOptions.concurrency <= 0
  ) {
    throw new RangeError(
      `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
    )
  }
  if (
    tasksQueueOptions?.size != null &&
    !Number.isSafeInteger(tasksQueueOptions.size)
  ) {
    throw new TypeError(
      'Invalid worker node tasks queue size: must be an integer'
    )
  }
  if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
    throw new RangeError(
      `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
    )
  }
}
bfc75cca 137
/**
 * Validates the arguments used to construct a worker node.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path, validated by `checkFilePath`.
 * @param opts - The worker node options.
 * @throws {TypeError} If the worker type is missing or invalid, if the options
 * are missing or not a plain object, or if `tasksQueueBackPressureSize` is
 * missing or not a safe integer.
 * @throws {RangeError} If `tasksQueueBackPressureSize` is negative or zero.
 */
export const checkWorkerNodeArguments = (
  type: WorkerType | undefined,
  filePath: string | undefined,
  opts: WorkerNodeOptions | undefined
): void => {
  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  if (!Object.values(WorkerTypes).includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }
  checkFilePath(filePath)
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid options: must be a plain object'
    )
  }
  if (opts.tasksQueueBackPressureSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue back pressure size option'
    )
  }
  if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
  }
  if (opts.tasksQueueBackPressureSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
  }
}
bfc75cca
JB
178
/**
 * Updates the given measurement statistics.
 *
 * Nothing is updated unless the requirements and the value are both defined
 * and the `aggregate` requirement is enabled. The history, average and median
 * are only maintained when the `average` or `median` requirement is enabled.
 *
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements | undefined,
  measurementValue: number | undefined
): void => {
  if (
    measurementRequirements != null &&
    measurementValue != null &&
    measurementRequirements.aggregate
  ) {
    measurementStatistics.aggregate =
      (measurementStatistics.aggregate ?? 0) + measurementValue
    measurementStatistics.minimum = min(
      measurementValue,
      measurementStatistics.minimum ?? Infinity
    )
    measurementStatistics.maximum = max(
      measurementValue,
      measurementStatistics.maximum ?? -Infinity
    )
    if (measurementRequirements.average || measurementRequirements.median) {
      measurementStatistics.history.push(measurementValue)
      // Keep only the statistics that are currently required: a previously
      // computed value for a statistic that is no longer required is removed.
      if (measurementRequirements.average) {
        measurementStatistics.average = average(measurementStatistics.history)
      } else if (measurementStatistics.average != null) {
        delete measurementStatistics.average
      }
      if (measurementRequirements.median) {
        measurementStatistics.median = median(measurementStatistics.history)
      } else if (measurementStatistics.median != null) {
        delete measurementStatistics.median
      }
    }
  }
}
// Expose the internal helper on the module exports when NODE_ENV is 'test'.
if (env.NODE_ENV === 'test') {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  exports.updateMeasurementStatistics = updateMeasurementStatistics
}
226
/**
 * Updates the wait time statistics of a worker usage for the given task.
 *
 * The wait time is the delay between the task timestamp and now; a task
 * without a timestamp yields a wait time of zero.
 *
 * @param workerChoiceStrategyContext - The worker choice strategies context providing the wait time statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param task - The task whose wait time is measured.
 */
export const updateWaitTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
    workerChoiceStrategyContext:
    | WorkerChoiceStrategiesContext<Worker, Data, Response>
    | undefined,
    workerUsage: WorkerUsage,
    task: Task<Data>
  ): void => {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  updateMeasurementStatistics(
    workerUsage.waitTime,
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().waitTime,
    taskWaitTime
  )
}
246
/**
 * Updates the tasks counters of a worker usage from a task response message.
 *
 * The executing counter is decremented (never below zero), then either the
 * executed or the failed counter is incremented depending on whether the
 * message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received for the finished task.
 */
export const updateTaskStatisticsWorkerUsage = <Response = unknown>(
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void => {
  const workerTaskStatistics = workerUsage.tasks
  if (
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerTaskStatistics.executing != null &&
    workerTaskStatistics.executing > 0
  ) {
    --workerTaskStatistics.executing
  }
  if (message.workerError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
265
/**
 * Updates the run time statistics of a worker usage from a task response message.
 *
 * Messages carrying a worker error are ignored. A message without a run time
 * measurement is accounted as a run time of zero.
 *
 * @param workerChoiceStrategyContext - The worker choice strategies context providing the run time statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received for the finished task.
 */
export const updateRunTimeWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
    workerChoiceStrategyContext:
    | WorkerChoiceStrategiesContext<Worker, Data, Response>
    | undefined,
    workerUsage: WorkerUsage,
    message: MessageValue<Response>
  ): void => {
  if (message.workerError != null) {
    return
  }
  updateMeasurementStatistics(
    workerUsage.runTime,
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime,
    message.taskPerformance?.runTime ?? 0
  )
}
286
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from
 * a task response message.
 *
 * Messages carrying a worker error are ignored. Missing active/idle
 * measurements are accounted as zero.
 *
 * @param workerChoiceStrategyContext - The worker choice strategies context providing the ELU statistics requirements.
 * @param workerUsage - The worker usage to update.
 * @param message - The message received for the finished task.
 */
export const updateEluWorkerUsage = <
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>(
    workerChoiceStrategyContext:
    | WorkerChoiceStrategiesContext<Worker, Data, Response>
    | undefined,
    workerUsage: WorkerUsage,
    message: MessageValue<Response>
  ): void => {
  if (message.workerError != null) {
    return
  }
  const eluTaskStatisticsRequirements =
    workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    message.taskPerformance?.elu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    message.taskPerformance?.elu?.idle ?? 0
  )
  if (
    eluTaskStatisticsRequirements?.aggregate === true &&
    message.taskPerformance?.elu != null
  ) {
    // The first measurement is stored as is; each later one is averaged with
    // the currently stored utilization.
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization +
            message.taskPerformance.elu.utilization) /
          2
        : message.taskPerformance.elu.utilization
  }
}
c3719753
JB
326
/**
 * Creates a worker of the given type.
 *
 * Thread workers are created with `env: SHARE_ENV` unless overridden by
 * `opts.workerOptions`; `opts.env` is only used for cluster workers, where it
 * is passed to `cluster.fork()`.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path (only used for thread workers).
 * @param opts - The worker creation options.
 * @returns The created worker.
 * @throws {Error} If the worker type is unknown.
 */
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
): Worker => {
  switch (type) {
    case WorkerTypes.thread:
      return new ThreadWorker(filePath, {
        env: SHARE_ENV,
        ...opts.workerOptions
      }) as unknown as Worker
    case WorkerTypes.cluster:
      return cluster.fork(opts.env) as unknown as Worker
    default:
      // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
      throw new Error(`Unknown worker type '${type}'`)
  }
}
d41a44de 345
e9ed6eee
JB
/**
 * Returns the worker type of the given worker.
 *
 * @param worker - The worker to get the type of.
 * @returns The worker type of the given worker, or `undefined` if the worker
 * is neither a thread worker nor a cluster worker.
 * @internal
 */
export const getWorkerType = (worker: IWorker): WorkerType | undefined => {
  if (worker instanceof ThreadWorker) {
    return WorkerTypes.thread
  } else if (worker instanceof ClusterWorker) {
    return WorkerTypes.cluster
  }
  return undefined
}
360
/**
 * Returns the worker id of the given worker.
 *
 * @param worker - The worker to get the id of.
 * @returns The `threadId` of a thread worker, the `id` of a cluster worker,
 * or `undefined` if the worker is neither.
 * @internal
 */
export const getWorkerId = (worker: IWorker): number | undefined => {
  if (worker instanceof ThreadWorker) {
    return worker.threadId
  } else if (worker instanceof ClusterWorker) {
    return worker.id
  }
  return undefined
}
375
d41a44de
JB
/**
 * Waits for a number of occurrences of a worker node event, or for a timeout.
 *
 * @param workerNode - The worker node emitting the event.
 * @param workerNodeEvent - The name of the event to wait for.
 * @param numberOfEventsToWait - The number of events to wait for; zero resolves immediately.
 * @param timeout - The maximum time to wait in milliseconds; a negative value disables the timeout.
 * @returns A promise resolving to the number of events received when the
 * promise settled.
 */
export const waitWorkerNodeEvents = async <
  Worker extends IWorker,
  Data = unknown
>(
  workerNode: IWorkerNode<Worker, Data>,
  workerNodeEvent: string,
  numberOfEventsToWait: number,
  timeout: number
): Promise<number> => {
  return await new Promise<number>(resolve => {
    let events = 0
    if (numberOfEventsToWait === 0) {
      resolve(events)
      return
    }
    let timer: ReturnType<typeof setTimeout> | undefined
    // NOTE(review): the listener stays registered after the promise settles;
    // consider removing it if IWorkerNode exposes a way to unsubscribe.
    workerNode.on(workerNodeEvent, () => {
      ++events
      if (events === numberOfEventsToWait) {
        // Cancel the pending timeout so it does not keep the event loop alive
        // once the expected number of events has been received.
        if (timer != null) {
          clearTimeout(timer)
        }
        resolve(events)
      }
    })
    if (timeout >= 0) {
      timer = setTimeout(() => {
        resolve(events)
      }, timeout)
    }
  })
}