// Source: poolifier.git - src/pools/utils.ts (extracted from a blame view; last commit: "build(deps-dev): apply updates")
import { existsSync } from 'node:fs'
import cluster from 'node:cluster'
import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
import { average, isPlainObject, max, median, min } from '../utils'
import {
  type MeasurementStatisticsRequirements,
  WorkerChoiceStrategies,
  type WorkerChoiceStrategy
} from './selection-strategies/selection-strategies-types'
import type { TasksQueueOptions } from './pool'
import {
  type IWorker,
  type MeasurementStatistics,
  type WorkerNodeOptions,
  type WorkerType,
  WorkerTypes
} from './worker'

/**
 * Validates the worker file path.
 *
 * @param filePath - The path of the worker file.
 * @throws {TypeError} If the file path is `null`/`undefined` or is not a string.
 * @throws {Error} If no file exists at the given path.
 */
export const checkFilePath = (filePath: string): void => {
  // Runtime guards: JavaScript callers can bypass the static `string` type.
  if (filePath == null) {
    throw new TypeError('The worker file path must be specified')
  }
  if (typeof filePath !== 'string') {
    throw new TypeError('The worker file path must be a string')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}

/**
 * Validates the minimum and maximum sizes of a dynamic pool.
 *
 * The checks run in a fixed order and the first failing one throws, so the
 * reported error for a given input is deterministic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {TypeError} If `max` is `null`/`undefined` or is not a safe integer.
 * @throws {RangeError} If `max` is lower than `min`, equal to zero, or equal to `min`.
 */
export const checkDynamicPoolSize = (min: number, max: number): void => {
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  // A dynamic pool whose bounds are equal can never grow or shrink.
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}

/**
 * Validates a worker choice strategy.
 *
 * A `null`/`undefined` strategy is accepted; any other value must be one of
 * the values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws {Error} If the strategy is defined but is not a known strategy.
 */
export const checkValidWorkerChoiceStrategy = (
  workerChoiceStrategy: WorkerChoiceStrategy
): void => {
  if (workerChoiceStrategy == null) {
    return
  }
  if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}

/**
 * Validates tasks queue options.
 *
 * `null`/`undefined` options are accepted. Otherwise the options must be a
 * plain object, and `concurrency` and `size`, when defined, must each be a
 * strictly positive safe integer.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws {TypeError} If the options are not a plain object, or if `concurrency` or `size` is not a safe integer.
 * @throws {RangeError} If `concurrency` or `size` is negative or zero.
 */
export const checkValidTasksQueueOptions = (
  tasksQueueOptions: TasksQueueOptions
): void => {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, size } = tasksQueueOptions
  if (concurrency != null) {
    // The type check precedes the range check so a non-integer is reported as a TypeError.
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}

/**
 * Validates the arguments used to construct a worker node.
 *
 * Checks, in order: the worker type, the worker file path (delegated to
 * `checkFilePath`), the options object and its `tasksQueueBackPressureSize`.
 *
 * @param type - The worker type.
 * @param filePath - The path of the worker file.
 * @param opts - The worker node options.
 * @throws {TypeError} If the type is missing or invalid, if the options are missing or not a plain object, or if `tasksQueueBackPressureSize` is missing or not a safe integer.
 * @throws {RangeError} If `tasksQueueBackPressureSize` is negative or zero.
 */
export const checkWorkerNodeArguments = (
  type: WorkerType,
  filePath: string,
  opts: WorkerNodeOptions
): void => {
  if (type == null) {
    throw new TypeError('Cannot construct a worker node without a worker type')
  }
  if (!Object.values(WorkerTypes).includes(type)) {
    throw new TypeError(
      `Cannot construct a worker node with an invalid worker type '${type}'`
    )
  }
  // Throws on its own if the file path is missing, not a string or not found.
  checkFilePath(filePath)
  if (opts == null) {
    throw new TypeError(
      'Cannot construct a worker node without worker node options'
    )
  }
  if (!isPlainObject(opts)) {
    throw new TypeError(
      'Cannot construct a worker node with invalid options: must be a plain object'
    )
  }
  if (opts.tasksQueueBackPressureSize == null) {
    throw new TypeError(
      'Cannot construct a worker node without a tasks queue back pressure size option'
    )
  }
  if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
    throw new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
  }
  if (opts.tasksQueueBackPressureSize <= 0) {
    throw new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
  }
}

/**
 * Updates the given measurement statistics.
 *
 * Nothing is updated when aggregation is not required or when the measurement
 * value is `null`/`undefined`, so a missing value can never turn the aggregate
 * into `NaN` or reach the minimum/maximum computation.
 *
 * @param measurementStatistics - The measurement statistics to update.
 * @param measurementRequirements - The measurement statistics requirements.
 * @param measurementValue - The measurement value.
 * @internal
 */
export const updateMeasurementStatistics = (
  measurementStatistics: MeasurementStatistics,
  measurementRequirements: MeasurementStatisticsRequirements,
  measurementValue: number
): void => {
  if (!measurementRequirements.aggregate || measurementValue == null) {
    return
  }
  measurementStatistics.aggregate =
    (measurementStatistics.aggregate ?? 0) + measurementValue
  measurementStatistics.minimum = min(
    measurementValue,
    measurementStatistics.minimum ?? Infinity
  )
  measurementStatistics.maximum = max(
    measurementValue,
    measurementStatistics.maximum ?? -Infinity
  )
  if (measurementRequirements.average || measurementRequirements.median) {
    // Average and median are both recomputed from the recorded history.
    measurementStatistics.history.push(measurementValue)
    if (measurementRequirements.average) {
      measurementStatistics.average = average(measurementStatistics.history)
    } else if (measurementStatistics.average != null) {
      // Drop a stale value left over from a time when the average was required.
      delete measurementStatistics.average
    }
    if (measurementRequirements.median) {
      measurementStatistics.median = median(measurementStatistics.history)
    } else if (measurementStatistics.median != null) {
      // Drop a stale value left over from a time when the median was required.
      delete measurementStatistics.median
    }
  }
}

/**
 * Creates a worker of the given type.
 *
 * - `WorkerTypes.thread`: instantiates a `node:worker_threads` `Worker` on the
 *   file path, with `env` set to `SHARE_ENV` unless `opts.workerOptions`
 *   overrides it.
 * - `WorkerTypes.cluster`: forks a cluster worker via `cluster.fork`, passing
 *   `opts.env`; `filePath` is not used in this branch.
 *
 * Note: the type parameter `Worker` shadows the `node:worker_threads` class of
 * the same name in type positions only; `new Worker(...)` below still refers
 * to the imported class.
 *
 * @param type - The worker type.
 * @param filePath - The path of the worker file.
 * @param opts - The worker creation options.
 * @returns The created worker, cast to the requested worker type.
 * @throws {Error} If the worker type is unknown.
 */
export const createWorker = <Worker extends IWorker>(
  type: WorkerType,
  filePath: string,
  opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
): Worker => {
  switch (type) {
    case WorkerTypes.thread:
      return new Worker(filePath, {
        env: SHARE_ENV,
        // Spread last so caller-supplied worker options take precedence.
        ...opts?.workerOptions
      }) as unknown as Worker
    case WorkerTypes.cluster:
      return cluster.fork(opts?.env) as unknown as Worker
    default:
      // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
      throw new Error(`Unknown worker type '${type}'`)
  }
}