refactor: cleanup worker error handling code
[poolifier.git] / src / pools / utils.ts
1 import { existsSync } from 'node:fs'
2 import cluster from 'node:cluster'
3 import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
4 import { average, isPlainObject, max, median, min } from '../utils'
5 import {
6 type MeasurementStatisticsRequirements,
7 WorkerChoiceStrategies,
8 type WorkerChoiceStrategy
9 } from './selection-strategies/selection-strategies-types'
10 import type { TasksQueueOptions } from './pool'
11 import {
12 type IWorker,
13 type MeasurementStatistics,
14 type WorkerNodeOptions,
15 type WorkerType,
16 WorkerTypes
17 } from './worker'
18
/**
 * Validates a worker file path.
 *
 * @param filePath - The worker file path to validate.
 * @throws {TypeError} If the path is `null`/`undefined` or is not a string.
 * @throws {Error} If nothing exists on the file system at the given path.
 * @internal
 */
export const checkFilePath = (filePath: string): void => {
  // Loose equality on purpose: rejects both `null` and `undefined`.
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  }
  if (typeof filePath === 'string') {
    if (existsSync(filePath)) {
      return
    }
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
  throw new TypeError('The worker file path must be a string')
}
30
/**
 * Validates the size bounds of a dynamic pool.
 *
 * Only `max` is checked for presence and integer-ness here; `min` is only
 * compared against `max`.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {TypeError} If `max` is missing or is not a safe integer.
 * @throws {RangeError} If `max` is lower than `min`, equal to zero or equal to `min`.
 * @internal
 */
export const checkDynamicPoolSize = (min: number, max: number): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  // The zero check comes before the equality check so that (0, 0) reports a zero maximum.
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
54
/**
 * Validates a worker choice strategy.
 *
 * A `null`/`undefined` strategy is accepted without further checks.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws {Error} If the strategy is not one of the `WorkerChoiceStrategies` values.
 * @internal
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy
): void => {
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
65
/**
 * Validates tasks queue options.
 *
 * `null`/`undefined` options, and `null`/`undefined` individual settings, are
 * accepted without further checks.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws {TypeError} If the options are not a plain object, or if `concurrency` or `size` is not a safe integer.
 * @throws {RangeError} If `concurrency` or `size` is negative or zero.
 * @internal
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions
): void => {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  // Concurrency is validated first: type check, then range check.
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  // Queue size follows the same two-step validation.
  const size = tasksQueueOptions?.size
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
102
/**
 * Validates the arguments used to construct a worker node.
 *
 * Checks run in order: worker type, worker file path, then worker node options.
 *
 * @param type - The worker type.
 * @param filePath - The worker file path, validated with `checkFilePath()`.
 * @param opts - The worker node options.
 * @throws {TypeError} If the type is missing or invalid, if the options are missing or not a plain object, or if `tasksQueueBackPressureSize` is missing or not a safe integer.
 * @throws {RangeError} If `tasksQueueBackPressureSize` is negative or zero.
 * @internal
 */
export const checkWorkerNodeArguments = (
  type: WorkerType,
  filePath: string,
  opts: WorkerNodeOptions
): void => {
  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  const isKnownType = Object.values(WorkerTypes).includes(type)
  if (!isKnownType) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }

  checkFilePath(filePath)

  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid options: must be a plain object'
    )
  }

  const { tasksQueueBackPressureSize } = opts
  if (tasksQueueBackPressureSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue back pressure size option'
    )
  }
  if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
  }
  if (tasksQueueBackPressureSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
  }
}
143
/**
 * Updates the given measurement statistics with a new measurement value.
 *
 * Nothing is updated unless `measurementRequirements.aggregate` is enabled and
 * a measurement value is provided. When enabled, the aggregate, minimum and
 * maximum are always refreshed; the history, average and median are only
 * touched when the average or the median is required.
 *
 * @param measurementStatistics - The measurement statistics to update (mutated in place).
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
export const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements,
  measurementValue: number
): void => {
  // A missing value must never reach the arithmetic below: adding `undefined`
  // would turn the aggregate into NaN and corrupt every later update.
  if (!measurementRequirements.aggregate || measurementValue == null) {
    return
  }
  measurementStatistics.aggregate =
    (measurementStatistics.aggregate ?? 0) + measurementValue
  measurementStatistics.minimum = min(
    measurementValue,
    measurementStatistics.minimum ?? Infinity
  )
  measurementStatistics.maximum = max(
    measurementValue,
    measurementStatistics.maximum ?? -Infinity
  )
  if (measurementRequirements.average || measurementRequirements.median) {
    measurementStatistics.history.push(measurementValue)
    // Each derived statistic is recomputed when required, and dropped when it
    // is no longer required but a previous value is still present.
    if (measurementRequirements.average) {
      measurementStatistics.average = average(measurementStatistics.history)
    } else if (measurementStatistics.average != null) {
      delete measurementStatistics.average
    }
    if (measurementRequirements.median) {
      measurementStatistics.median = median(measurementStatistics.history)
    } else if (measurementStatistics.median != null) {
      delete measurementStatistics.median
    }
  }
}
187
/**
 * Creates a worker of the given type.
 *
 * @typeParam W - The worker type expected by the caller.
 * @param type - The worker type.
 * @param filePath - The worker file path, passed to the `Worker` constructor for thread workers.
 * @param opts - `workerOptions` is spread into the thread worker options; `env` is passed to `cluster.fork()`.
 * @returns The created worker.
 * @throws {Error} If the worker type is unknown.
 * @internal
 */
export const createWorker = <W extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
): W => {
  if (type === WorkerTypes.thread) {
    // SHARE_ENV is the default; an `env` set in `workerOptions` overrides it.
    const threadOptions: WorkerOptions = {
      env: SHARE_ENV,
      ...opts?.workerOptions
    }
    return new Worker(filePath, threadOptions) as unknown as W
  }
  if (type === WorkerTypes.cluster) {
    return cluster.fork(opts?.env) as unknown as W
  }
  // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
  throw new Error(`Unknown worker type '${type}'`)
}