refactor: emit worker node event at task end
[poolifier.git] / src / pools / utils.ts
1 import { existsSync } from 'node:fs'
2 import cluster from 'node:cluster'
3 import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
4 import { average, isPlainObject, max, median, min } from '../utils'
5 import {
6 type MeasurementStatisticsRequirements,
7 WorkerChoiceStrategies,
8 type WorkerChoiceStrategy
9 } from './selection-strategies/selection-strategies-types'
10 import type { TasksQueueOptions } from './pool'
11 import {
12 type IWorker,
13 type IWorkerNode,
14 type MeasurementStatistics,
15 type WorkerNodeOptions,
16 type WorkerType,
17 WorkerTypes
18 } from './worker'
19
/**
 * Validates a worker file path.
 *
 * @param filePath - The worker file path to validate.
 * @throws {TypeError} If the path is `null`/`undefined`, or is not a string.
 * @throws {Error} If nothing exists on disk at the given path.
 */
export const checkFilePath = (filePath: string): void => {
  // Loose equality on purpose: matches both null and undefined.
  const isMissing = filePath == null
  if (isMissing || typeof filePath !== 'string') {
    throw new TypeError(
      isMissing
        ? 'The worker file path must be specified'
        : 'The worker file path must be a string'
    )
  }
  const fileFound = existsSync(filePath)
  if (!fileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
31
/**
 * Validates the size bounds of a dynamic pool.
 *
 * The checks run in a fixed order and the first failing one throws.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {TypeError} If `max` is `null`/`undefined` or is not a safe integer.
 * @throws {RangeError} If `min` is greater than `max`, `max` is zero, or `min` equals `max`.
 */
export const checkDynamicPoolSize = (min: number, max: number): void => {
  // Each guard throws, so they are written as independent early exits.
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
55
/**
 * Validates a worker choice strategy.
 *
 * A `null`/`undefined` strategy is accepted without further checks.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws {Error} If the strategy is not one of the `WorkerChoiceStrategies` values.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy
): void => {
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
66
/**
 * Validates tasks queue options.
 *
 * `null`/`undefined` options, and `null`/`undefined` individual fields, are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws {TypeError} If the options are not a plain object, or if `concurrency` or `size` is not a safe integer.
 * @throws {RangeError} If `concurrency` or `size` is zero or negative.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions
): void => {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }

  // Concurrency is checked before size; for each, type comes before range.
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }

  const size = tasksQueueOptions?.size
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
103
/**
 * Validates the arguments used to construct a worker node.
 *
 * Checks run in order: worker type, worker file path (via `checkFilePath`),
 * then the options object and its `tasksQueueBackPressureSize` field.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path.
 * @param opts - The worker node options.
 * @throws {TypeError} If the type is missing or unknown, the options are missing or not a plain object, or the back pressure size is missing or not a safe integer.
 * @throws {RangeError} If the back pressure size is zero or negative.
 */
export const checkWorkerNodeArguments = (
  type: WorkerType,
  filePath: string,
  opts: WorkerNodeOptions
): void => {
  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  const supportedTypes = Object.values(WorkerTypes)
  if (!supportedTypes.includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }

  checkFilePath(filePath)

  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid options: must be a plain object'
    )
  }

  // Presence, then integer-ness, then sign.
  const backPressureSize = opts.tasksQueueBackPressureSize
  if (backPressureSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue back pressure size option'
    )
  }
  if (!Number.isSafeInteger(backPressureSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
  }
  if (backPressureSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
  }
}
144
/**
 * Updates the given measurement statistics.
 *
 * Nothing is recorded unless aggregation is required and a measurement value
 * is provided. When it is, the aggregate, minimum and maximum are updated;
 * the value is appended to the history only if the average or the median is
 * required.
 *
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
export const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements,
  measurementValue: number
): void => {
  // Reject a nullish value before any arithmetic: adding `undefined` to the
  // aggregate would turn it into NaN and corrupt every later update.
  if (!measurementRequirements.aggregate || measurementValue == null) {
    return
  }
  measurementStatistics.aggregate =
    (measurementStatistics.aggregate ?? 0) + measurementValue
  measurementStatistics.minimum = min(
    measurementValue,
    measurementStatistics.minimum ?? Infinity
  )
  measurementStatistics.maximum = max(
    measurementValue,
    measurementStatistics.maximum ?? -Infinity
  )
  if (measurementRequirements.average || measurementRequirements.median) {
    measurementStatistics.history.push(measurementValue)
    // Recompute each required statistic from the history; drop a previously
    // computed one that is no longer required.
    if (measurementRequirements.average) {
      measurementStatistics.average = average(measurementStatistics.history)
    } else if (measurementStatistics.average != null) {
      delete measurementStatistics.average
    }
    if (measurementRequirements.median) {
      measurementStatistics.median = median(measurementStatistics.history)
    } else if (measurementStatistics.median != null) {
      delete measurementStatistics.median
    }
  }
}
188
/**
 * Creates a worker of the requested type.
 *
 * For a thread worker, `SHARE_ENV` is the default `env` and any `env` in
 * `opts.workerOptions` overrides it. For a cluster worker, `opts.env` is
 * passed to `cluster.fork()`.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path (used for thread workers only).
 * @param opts - The optional environment and worker options.
 * @returns The created worker.
 * @throws {Error} If the worker type is neither thread nor cluster.
 */
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
): Worker => {
  if (type === WorkerTypes.thread) {
    // Spread last so caller-supplied worker options win over the default env.
    const threadOptions: WorkerOptions = {
      env: SHARE_ENV,
      ...opts?.workerOptions
    }
    return new Worker(filePath, threadOptions) as unknown as Worker
  }
  if (type === WorkerTypes.cluster) {
    return cluster.fork(opts?.env) as unknown as Worker
  }
  // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
  throw new Error(`Unknown worker type '${type}'`)
}
207
/**
 * Waits until a worker node has emitted an event a given number of times.
 *
 * When `numberOfEventsToWait` is zero the promise resolves with `0` and no
 * listener is registered. Otherwise the promise resolves once the count of
 * received events is exactly `numberOfEventsToWait`. The listener is not
 * removed after the promise resolves.
 *
 * @param workerNode - The worker node to listen on.
 * @param workerNodeEvent - The name of the event to count.
 * @param numberOfEventsToWait - The number of events to wait for.
 * @returns A promise resolving with the number of events received.
 */
export const waitWorkerNodeEvents = async <
  Worker extends IWorker,
  Data = unknown
>(
  workerNode: IWorkerNode<Worker, Data>,
  workerNodeEvent: string,
  numberOfEventsToWait: number
): Promise<number> => {
  // Runs synchronously as the promise executor, so the listener is attached
  // before this function yields to the caller.
  const countEvents = (resolve: (count: number) => void): void => {
    if (numberOfEventsToWait === 0) {
      resolve(0)
      return
    }
    let received = 0
    workerNode.on(workerNodeEvent, () => {
      received += 1
      if (received === numberOfEventsToWait) {
        resolve(received)
      }
    })
  }
  return await new Promise<number>(countEvents)
}