fix: properly account worker choice retries for WRR
[poolifier.git] / src / pools / pool.ts
1 import type { TransferListItem, WorkerOptions } from 'node:worker_threads'
2 import type { EventEmitterAsyncResource } from 'node:events'
3 import type { ClusterSettings } from 'node:cluster'
4 import type { TaskFunction } from '../worker/task-functions'
5 import type {
6 ErrorHandler,
7 ExitHandler,
8 IWorker,
9 IWorkerNode,
10 MessageHandler,
11 OnlineHandler,
12 WorkerType
13 } from './worker'
14 import type {
15 WorkerChoiceStrategy,
16 WorkerChoiceStrategyOptions
17 } from './selection-strategies/selection-strategies-types'
18
19 /**
20 * Enumeration of pool types.
21 */
22 export const PoolTypes = Object.freeze({
23 /**
24 * Fixed pool type.
25 */
26 fixed: 'fixed',
27 /**
28 * Dynamic pool type.
29 */
30 dynamic: 'dynamic'
31 } as const)
32
33 /**
34 * Pool type.
35 */
36 export type PoolType = keyof typeof PoolTypes
37
38 /**
39 * Enumeration of pool events.
40 */
41 export const PoolEvents = Object.freeze({
42 ready: 'ready',
43 busy: 'busy',
44 full: 'full',
45 destroy: 'destroy',
46 error: 'error',
47 taskError: 'taskError',
48 backPressure: 'backPressure'
49 } as const)
50
51 /**
52 * Pool event.
53 */
54 export type PoolEvent = keyof typeof PoolEvents
55
56 /**
57 * Pool information.
58 */
59 export interface PoolInfo {
60 readonly version: string
61 readonly type: PoolType
62 readonly worker: WorkerType
63 readonly started: boolean
64 readonly ready: boolean
65 readonly strategy: WorkerChoiceStrategy
66 readonly minSize: number
67 readonly maxSize: number
68 /** Pool utilization. */
69 readonly utilization?: number
70 /** Pool total worker nodes. */
71 readonly workerNodes: number
72 /** Pool idle worker nodes. */
73 readonly idleWorkerNodes: number
74 /** Pool busy worker nodes. */
75 readonly busyWorkerNodes: number
76 readonly executedTasks: number
77 readonly executingTasks: number
78 readonly queuedTasks?: number
79 readonly maxQueuedTasks?: number
80 readonly backPressure?: boolean
81 readonly stolenTasks?: number
82 readonly failedTasks: number
83 readonly runTime?: {
84 readonly minimum: number
85 readonly maximum: number
86 readonly average?: number
87 readonly median?: number
88 }
89 readonly waitTime?: {
90 readonly minimum: number
91 readonly maximum: number
92 readonly average?: number
93 readonly median?: number
94 }
95 }
96
97 /**
98 * Worker node tasks queue options.
99 */
100 export interface TasksQueueOptions {
101 /**
102 * Maximum tasks queue size per worker node flagging it as back pressured.
103 *
104 * @defaultValue (pool maximum size)^2
105 */
106 readonly size?: number
107 /**
108 * Maximum number of tasks that can be executed concurrently on a worker node.
109 *
110 * @defaultValue 1
111 */
112 readonly concurrency?: number
113 /**
114 * Whether to enable task stealing on idle.
115 *
116 * @defaultValue true
117 */
118 readonly taskStealing?: boolean
119 /**
120 * Whether to enable tasks stealing under back pressure.
121 *
122 * @defaultValue true
123 */
124 readonly tasksStealingOnBackPressure?: boolean
125 /**
126 * Queued tasks finished timeout in milliseconds at worker node termination.
127 *
128 * @defaultValue 2000
129 */
130 readonly tasksFinishedTimeout?: number
131 }
132
133 /**
134 * Options for a poolifier pool.
135 *
136 * @typeParam Worker - Type of worker.
137 */
138 export interface PoolOptions<Worker extends IWorker> {
139 /**
140 * A function that will listen for online event on each worker.
141 *
142 * @defaultValue `() => {}`
143 */
144 onlineHandler?: OnlineHandler<Worker>
145 /**
146 * A function that will listen for message event on each worker.
147 *
148 * @defaultValue `() => {}`
149 */
150 messageHandler?: MessageHandler<Worker>
151 /**
152 * A function that will listen for error event on each worker.
153 *
154 * @defaultValue `() => {}`
155 */
156 errorHandler?: ErrorHandler<Worker>
157 /**
158 * A function that will listen for exit event on each worker.
159 *
160 * @defaultValue `() => {}`
161 */
162 exitHandler?: ExitHandler<Worker>
163 /**
164 * Whether to start the minimum number of workers at pool initialization.
165 *
166 * @defaultValue true
167 */
168 startWorkers?: boolean
169 /**
170 * The worker choice strategy to use in this pool.
171 *
172 * @defaultValue WorkerChoiceStrategies.ROUND_ROBIN
173 */
174 workerChoiceStrategy?: WorkerChoiceStrategy
175 /**
176 * The worker choice strategy options.
177 */
178 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
179 /**
180 * Restart worker on error.
181 */
182 restartWorkerOnError?: boolean
183 /**
184 * Pool events integrated with async resource emission.
185 *
186 * @defaultValue true
187 */
188 enableEvents?: boolean
189 /**
190 * Pool worker node tasks queue.
191 *
192 * @defaultValue false
193 */
194 enableTasksQueue?: boolean
195 /**
196 * Pool worker node tasks queue options.
197 */
198 tasksQueueOptions?: TasksQueueOptions
199 /**
200 * Worker options.
201 *
202 * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
203 */
204 workerOptions?: WorkerOptions
205 /**
206 * Key/value pairs to add to worker process environment.
207 *
208 * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
209 */
210 env?: Record<string, unknown>
211 /**
212 * Cluster settings.
213 *
214 * @see https://nodejs.org/api/cluster.html#cluster_cluster_settings
215 */
216 settings?: ClusterSettings
217 }
218
219 /**
220 * Contract definition for a poolifier pool.
221 *
222 * @typeParam Worker - Type of worker which manages this pool.
223 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
224 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
225 */
226 export interface IPool<
227 Worker extends IWorker,
228 Data = unknown,
229 Response = unknown
230 > {
231 /**
232 * Pool information.
233 */
234 readonly info: PoolInfo
235 /**
236 * Pool worker nodes.
237 *
238 * @internal
239 */
240 readonly workerNodes: Array<IWorkerNode<Worker, Data>>
241 /**
242 * Whether the worker node has back pressure (i.e. its tasks queue is full).
243 *
244 * @param workerNodeKey - The worker node key.
245 * @returns `true` if the worker node has back pressure, `false` otherwise.
246 * @internal
247 */
248 readonly hasWorkerNodeBackPressure: (workerNodeKey: number) => boolean
249 /**
250 * Event emitter integrated with async resource on which events can be listened to.
251 * The async tracking tooling identifier is `poolifier:<PoolType>-<WorkerType>-pool`.
252 *
253 * Events that can currently be listened to:
254 *
255 * - `'ready'`: Emitted when the number of workers created in the pool has reached the minimum size expected and are ready.
256 * - `'busy'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are executing concurrently their tasks quota.
257 * - `'full'`: Emitted when the pool is dynamic and the number of workers created has reached the maximum size expected.
258 * - `'destroy'`: Emitted when the pool is destroyed.
259 * - `'error'`: Emitted when an uncaught error occurs.
260 * - `'taskError'`: Emitted when an error occurs while executing a task.
261 * - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full: queue size \>= maximum queue size).
262 */
263 readonly emitter?: EventEmitterAsyncResource
264 /**
265 * Executes the specified function in the worker constructor with the task data input parameter.
266 *
267 * @param data - The optional task input data for the specified task function. This can only be structured-cloneable data.
268 * @param name - The optional name of the task function to execute. If not specified, the default task function will be executed.
269 * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards.
270 * @returns Promise that will be fulfilled when the task is completed.
271 */
272 readonly execute: (
273 data?: Data,
274 name?: string,
275 transferList?: TransferListItem[]
276 ) => Promise<Response>
277 /**
278 * Starts the minimum number of workers in this pool.
279 */
280 readonly start: () => void
281 /**
282 * Terminates all workers in this pool.
283 */
284 readonly destroy: () => Promise<void>
285 /**
286 * Whether the specified task function exists in this pool.
287 *
288 * @param name - The name of the task function.
289 * @returns `true` if the task function exists, `false` otherwise.
290 */
291 readonly hasTaskFunction: (name: string) => boolean
292 /**
293 * Adds a task function to this pool.
294 * If a task function with the same name already exists, it will be overwritten.
295 *
296 * @param name - The name of the task function.
297 * @param fn - The task function.
298 * @returns `true` if the task function was added, `false` otherwise.
299 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
300 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `fn` parameter is not a function.
301 */
302 readonly addTaskFunction: (
303 name: string,
304 fn: TaskFunction<Data, Response>
305 ) => Promise<boolean>
306 /**
307 * Removes a task function from this pool.
308 *
309 * @param name - The name of the task function.
310 * @returns `true` if the task function was removed, `false` otherwise.
311 */
312 readonly removeTaskFunction: (name: string) => Promise<boolean>
313 /**
314 * Lists the names of task function available in this pool.
315 *
316 * @returns The names of task function available in this pool.
317 */
318 readonly listTaskFunctionNames: () => string[]
319 /**
320 * Sets the default task function in this pool.
321 *
322 * @param name - The name of the task function.
323 * @returns `true` if the default task function was set, `false` otherwise.
324 */
325 readonly setDefaultTaskFunction: (name: string) => Promise<boolean>
326 /**
327 * Sets the worker choice strategy in this pool.
328 *
329 * @param workerChoiceStrategy - The worker choice strategy.
330 * @param workerChoiceStrategyOptions - The worker choice strategy options.
331 */
332 readonly setWorkerChoiceStrategy: (
333 workerChoiceStrategy: WorkerChoiceStrategy,
334 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
335 ) => void
336 /**
337 * Sets the worker choice strategy options in this pool.
338 *
339 * @param workerChoiceStrategyOptions - The worker choice strategy options.
340 */
341 readonly setWorkerChoiceStrategyOptions: (
342 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
343 ) => void
344 /**
345 * Enables/disables the worker node tasks queue in this pool.
346 *
347 * @param enable - Whether to enable or disable the worker node tasks queue.
348 * @param tasksQueueOptions - The worker node tasks queue options.
349 */
350 readonly enableTasksQueue: (
351 enable: boolean,
352 tasksQueueOptions?: TasksQueueOptions
353 ) => void
354 /**
355 * Sets the worker node tasks queue options in this pool.
356 *
357 * @param tasksQueueOptions - The worker node tasks queue options.
358 */
359 readonly setTasksQueueOptions: (tasksQueueOptions: TasksQueueOptions) => void
360 }