fix: validate worker node event to wait
[poolifier.git] / src / pools / pool.ts
1 import type { ClusterSettings } from 'node:cluster'
2 import type { EventEmitterAsyncResource } from 'node:events'
3 import type { TransferListItem, WorkerOptions } from 'node:worker_threads'
4
5 import type { TaskFunctionProperties } from '../utility-types.js'
6 import type {
7 TaskFunction,
8 TaskFunctionObject
9 } from '../worker/task-functions.js'
10 import type {
11 WorkerChoiceStrategy,
12 WorkerChoiceStrategyOptions
13 } from './selection-strategies/selection-strategies-types.js'
14 import type {
15 ErrorHandler,
16 ExitHandler,
17 IWorker,
18 IWorkerNode,
19 MessageHandler,
20 OnlineHandler,
21 WorkerType
22 } from './worker.js'
23
24 /**
25 * Enumeration of pool types.
26 */
27 export const PoolTypes: Readonly<{
28 fixed: 'fixed'
29 dynamic: 'dynamic'
30 }> = Object.freeze({
31 /**
32 * Fixed pool type.
33 */
34 fixed: 'fixed',
35 /**
36 * Dynamic pool type.
37 */
38 dynamic: 'dynamic'
39 } as const)
40
41 /**
42 * Pool type.
43 */
44 export type PoolType = keyof typeof PoolTypes
45
46 /**
47 * Enumeration of pool events.
48 */
49 export const PoolEvents: Readonly<{
50 ready: 'ready'
51 busy: 'busy'
52 full: 'full'
53 empty: 'empty'
54 destroy: 'destroy'
55 error: 'error'
56 taskError: 'taskError'
57 backPressure: 'backPressure'
58 }> = Object.freeze({
59 ready: 'ready',
60 busy: 'busy',
61 full: 'full',
62 empty: 'empty',
63 destroy: 'destroy',
64 error: 'error',
65 taskError: 'taskError',
66 backPressure: 'backPressure'
67 } as const)
68
69 /**
70 * Pool event.
71 */
72 export type PoolEvent = keyof typeof PoolEvents
73
74 /**
75 * Pool information.
76 */
77 export interface PoolInfo {
78 readonly version: string
79 readonly type: PoolType
80 readonly worker: WorkerType
81 readonly started: boolean
82 readonly ready: boolean
83 readonly defaultStrategy: WorkerChoiceStrategy
84 readonly strategyRetries: number
85 readonly minSize: number
86 readonly maxSize: number
87 /** Pool utilization. */
88 readonly utilization?: number
89 /** Pool total worker nodes. */
90 readonly workerNodes: number
91 /** Pool stealing worker nodes. */
92 readonly stealingWorkerNodes?: number
93 /** Pool idle worker nodes. */
94 readonly idleWorkerNodes: number
95 /** Pool busy worker nodes. */
96 readonly busyWorkerNodes: number
97 readonly executedTasks: number
98 readonly executingTasks: number
99 readonly queuedTasks?: number
100 readonly maxQueuedTasks?: number
101 readonly backPressure?: boolean
102 readonly stolenTasks?: number
103 readonly failedTasks: number
104 readonly runTime?: {
105 readonly minimum: number
106 readonly maximum: number
107 readonly average?: number
108 readonly median?: number
109 }
110 readonly waitTime?: {
111 readonly minimum: number
112 readonly maximum: number
113 readonly average?: number
114 readonly median?: number
115 }
116 readonly elu?: {
117 idle: {
118 readonly minimum: number
119 readonly maximum: number
120 readonly average?: number
121 readonly median?: number
122 }
123 active: {
124 readonly minimum: number
125 readonly maximum: number
126 readonly average?: number
127 readonly median?: number
128 }
129 }
130 }
131
132 /**
133 * Worker node tasks queue options.
134 */
135 export interface TasksQueueOptions {
136 /**
137 * Maximum tasks queue size per worker node flagging it as back pressured.
138 *
139 * @defaultValue (pool maximum size)^2
140 */
141 readonly size?: number
142 /**
143 * Maximum number of tasks that can be executed concurrently on a worker node.
144 *
145 * @defaultValue 1
146 */
147 readonly concurrency?: number
148 /**
149 * Whether to enable task stealing on idle.
150 *
151 * @defaultValue true
152 */
153 readonly taskStealing?: boolean
154 /**
155 * Whether to enable tasks stealing under back pressure.
156 *
157 * @defaultValue false
158 */
159 readonly tasksStealingOnBackPressure?: boolean
160 /**
161 * Queued tasks finished timeout in milliseconds at worker node termination.
162 *
163 * @defaultValue 2000
164 */
165 readonly tasksFinishedTimeout?: number
166 }
167
168 /**
169 * Options for a poolifier pool.
170 *
171 * @typeParam Worker - Type of worker.
172 */
173 export interface PoolOptions<Worker extends IWorker> {
174 /**
175 * A function that will listen for online event on each worker.
176 *
177 * @defaultValue `() => {}`
178 */
179 onlineHandler?: OnlineHandler<Worker>
180 /**
181 * A function that will listen for message event on each worker.
182 *
183 * @defaultValue `() => {}`
184 */
185 messageHandler?: MessageHandler<Worker>
186 /**
187 * A function that will listen for error event on each worker.
188 *
189 * @defaultValue `() => {}`
190 */
191 errorHandler?: ErrorHandler<Worker>
192 /**
193 * A function that will listen for exit event on each worker.
194 *
195 * @defaultValue `() => {}`
196 */
197 exitHandler?: ExitHandler<Worker>
198 /**
199 * Whether to start the minimum number of workers at pool initialization.
200 *
201 * @defaultValue true
202 */
203 startWorkers?: boolean
204 /**
205 * The default worker choice strategy to use in this pool.
206 *
207 * @defaultValue WorkerChoiceStrategies.ROUND_ROBIN
208 */
209 workerChoiceStrategy?: WorkerChoiceStrategy
210 /**
211 * The worker choice strategy options.
212 */
213 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
214 /**
215 * Restart worker on error.
216 */
217 restartWorkerOnError?: boolean
218 /**
219 * Pool events integrated with async resource emission.
220 *
221 * @defaultValue true
222 */
223 enableEvents?: boolean
224 /**
225 * Pool worker node tasks queue.
226 *
227 * @defaultValue false
228 */
229 enableTasksQueue?: boolean
230 /**
231 * Pool worker node tasks queue options.
232 */
233 tasksQueueOptions?: TasksQueueOptions
234 /**
235 * Worker options.
236 *
237 * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
238 */
239 workerOptions?: WorkerOptions
240 /**
241 * Key/value pairs to add to worker process environment.
242 *
243 * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
244 */
245 env?: Record<string, unknown>
246 /**
247 * Cluster settings.
248 *
249 * @see https://nodejs.org/api/cluster.html#cluster_cluster_settings
250 */
251 settings?: ClusterSettings
252 }
253
254 /**
255 * Contract definition for a poolifier pool.
256 *
257 * @typeParam Worker - Type of worker which manages this pool.
258 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
259 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
260 */
261 export interface IPool<
262 Worker extends IWorker,
263 Data = unknown,
264 Response = unknown
265 > {
266 /**
267 * Pool information.
268 */
269 readonly info: PoolInfo
270 /**
271 * Pool worker nodes.
272 *
273 * @internal
274 */
275 readonly workerNodes: Array<IWorkerNode<Worker, Data>>
276 /**
277 * Pool event emitter integrated with async resource.
278 * The async tracking tooling identifier is `poolifier:<PoolType>-<WorkerType>-pool`.
279 *
280 * Events that can currently be listened to:
281 *
282 * - `'ready'`: Emitted when the number of workers created in the pool has reached the minimum size expected and are ready. If the pool is dynamic with a minimum number of workers is set to zero, this event is emitted when at least one dynamic worker is ready.
283 * - `'busy'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are executing concurrently their tasks quota.
284 * - `'full'`: Emitted when the pool is dynamic and the number of workers created has reached the maximum size expected.
285 * - `'empty'`: Emitted when the pool is dynamic with a minimum number of workers set to zero and the number of workers has reached the minimum size expected.
286 * - `'destroy'`: Emitted when the pool is destroyed.
287 * - `'error'`: Emitted when an uncaught error occurs.
288 * - `'taskError'`: Emitted when an error occurs while executing a task.
289 * - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full: queue size \>= maximum queue size).
290 */
291 readonly emitter?: EventEmitterAsyncResource
292 /**
293 * Executes the specified function in the worker constructor with the task data input parameter.
294 *
295 * @param data - The optional task input data for the specified task function. This can only be structured-cloneable data.
296 * @param name - The optional name of the task function to execute. If not specified, the default task function will be executed.
297 * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards.
298 * @returns Promise that will be fulfilled when the task is completed.
299 */
300 readonly execute: (
301 data?: Data,
302 name?: string,
303 transferList?: readonly TransferListItem[]
304 ) => Promise<Response>
305 /**
306 * Starts the minimum number of workers in this pool.
307 */
308 readonly start: () => void
309 /**
310 * Terminates all workers in this pool.
311 */
312 readonly destroy: () => Promise<void>
313 /**
314 * Whether the specified task function exists in this pool.
315 *
316 * @param name - The name of the task function.
317 * @returns `true` if the task function exists, `false` otherwise.
318 */
319 readonly hasTaskFunction: (name: string) => boolean
320 /**
321 * Adds a task function to this pool.
322 * If a task function with the same name already exists, it will be overwritten.
323 *
324 * @param name - The name of the task function.
325 * @param fn - The task function.
326 * @returns `true` if the task function was added, `false` otherwise.
327 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
328 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `fn` parameter is not a function or task function object.
329 */
330 readonly addTaskFunction: (
331 name: string,
332 fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
333 ) => Promise<boolean>
334 /**
335 * Removes a task function from this pool.
336 *
337 * @param name - The name of the task function.
338 * @returns `true` if the task function was removed, `false` otherwise.
339 */
340 readonly removeTaskFunction: (name: string) => Promise<boolean>
341 /**
342 * Lists the properties of task functions available in this pool.
343 *
344 * @returns The properties of task functions available in this pool.
345 */
346 readonly listTaskFunctionsProperties: () => TaskFunctionProperties[]
347 /**
348 * Sets the default task function in this pool.
349 *
350 * @param name - The name of the task function.
351 * @returns `true` if the default task function was set, `false` otherwise.
352 */
353 readonly setDefaultTaskFunction: (name: string) => Promise<boolean>
354 /**
355 * Sets the default worker choice strategy in this pool.
356 *
357 * @param workerChoiceStrategy - The default worker choice strategy.
358 * @param workerChoiceStrategyOptions - The worker choice strategy options.
359 */
360 readonly setWorkerChoiceStrategy: (
361 workerChoiceStrategy: WorkerChoiceStrategy,
362 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
363 ) => void
364 /**
365 * Sets the worker choice strategy options in this pool.
366 *
367 * @param workerChoiceStrategyOptions - The worker choice strategy options.
368 * @returns `true` if the worker choice strategy options were set, `false` otherwise.
369 */
370 readonly setWorkerChoiceStrategyOptions: (
371 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
372 ) => boolean
373 /**
374 * Enables/disables the worker node tasks queue in this pool.
375 *
376 * @param enable - Whether to enable or disable the worker node tasks queue.
377 * @param tasksQueueOptions - The worker node tasks queue options.
378 */
379 readonly enableTasksQueue: (
380 enable: boolean,
381 tasksQueueOptions?: TasksQueueOptions
382 ) => void
383 /**
384 * Sets the worker node tasks queue options in this pool.
385 *
386 * @param tasksQueueOptions - The worker node tasks queue options.
387 */
388 readonly setTasksQueueOptions: (tasksQueueOptions: TasksQueueOptions) => void
389 }