c1af1cf79c99d05e7c803e7d072529301b6d678a
[poolifier.git] / src / pools / worker.ts
1 import type { EventEmitter } from 'node:events'
2 import type { MessageChannel, WorkerOptions } from 'node:worker_threads'
3
4 import type { CircularBuffer } from '../circular-buffer.js'
5 import type { Task, TaskFunctionProperties } from '../utility-types.js'
6
/**
 * Handler called once the worker has started successfully.
 * It takes no arguments and is typed to run with the worker as `this`.
 *
 * @typeParam Worker - Type of worker.
 */
export type OnlineHandler<Worker extends IWorker> = (
  this: Worker
) => void
13
/**
 * Handler called when the worker has received a message.
 * The message is typed `unknown`, so it must be narrowed before use.
 *
 * @typeParam Worker - Type of worker.
 */
export type MessageHandler<Worker extends IWorker> = (
  this: Worker,
  message: unknown
) => void
23
/**
 * Handler called when the worker raised an error.
 * The raised error is passed as the only argument.
 *
 * @typeParam Worker - Type of worker.
 */
export type ErrorHandler<Worker extends IWorker> = (
  this: Worker,
  error: Error
) => void
33
/**
 * Handler called when the worker exits.
 * The worker exit code is passed as the only argument.
 *
 * @typeParam Worker - Type of worker.
 */
export type ExitHandler<Worker extends IWorker> = (
  this: Worker,
  exitCode: number
) => void
43
/**
 * Any worker event handler: union of the online, message, error and exit
 * handler types.
 *
 * @typeParam Worker - Type of worker.
 */
export type EventHandler<Worker extends IWorker> =
  | OnlineHandler<Worker>
  | MessageHandler<Worker>
  | ErrorHandler<Worker>
  | ExitHandler<Worker>
54
/**
 * Size of the measurement history.
 * NOTE(review): presumably the capacity of the `history` buffer kept in
 * measurement statistics - confirm against the usage sites.
 */
export const MeasurementHistorySize = 386
59
/**
 * Statistics kept for one measurement.
 * Every computed value is optional; only the history buffer is always present.
 *
 * @internal
 */
export interface MeasurementStatistics {
  /**
   * Aggregate of the measurement.
   */
  aggregate?: number
  /**
   * Minimum of the measurement.
   */
  minimum?: number
  /**
   * Maximum of the measurement.
   */
  maximum?: number
  /**
   * Average of the measurement.
   */
  average?: number
  /**
   * Median of the measurement.
   */
  median?: number
  /**
   * Buffer holding the measurement history.
   */
  readonly history: CircularBuffer
}
91
/**
 * Measurement statistics for the event loop utilization.
 *
 * @internal
 */
export interface EventLoopUtilizationMeasurementStatistics {
  /**
   * Idle measurement statistics.
   */
  readonly idle: MeasurementStatistics
  /**
   * Active measurement statistics.
   */
  readonly active: MeasurementStatistics
  /**
   * Utilization value, when available.
   * NOTE(review): presumably the event loop utilization ratio - confirm.
   */
  utilization?: number
}
102
/**
 * Counters describing the tasks handled by a worker.
 *
 * @internal
 */
export interface TaskStatistics {
  /**
   * Count of tasks already executed.
   */
  executed: number
  /**
   * Count of tasks currently executing.
   */
  executing: number
  /**
   * Count of tasks currently queued (read-only).
   */
  readonly queued: number
  /**
   * Maximum count of queued tasks (read-only, optional).
   */
  readonly maxQueued?: number
  /**
   * Count of tasks stolen sequentially.
   */
  sequentiallyStolen: number
  /**
   * Count of stolen tasks.
   */
  stolen: number
  /**
   * Count of failed tasks.
   */
  failed: number
}
138
/**
 * Enumeration of the supported worker types.
 * The object is frozen: each key maps to the string literal of the same name.
 */
export const WorkerTypes: {
  readonly thread: 'thread'
  readonly cluster: 'cluster'
} = Object.freeze({
  thread: 'thread',
  cluster: 'cluster'
} as const)
147
/**
 * Worker type: one of the keys of the `WorkerTypes` enumeration.
 */
export type WorkerType = keyof typeof WorkerTypes
152
/**
 * Information describing a worker.
 *
 * @internal
 */
export interface WorkerInfo {
  /**
   * Id of the worker, possibly `undefined`.
   */
  readonly id: number | undefined
  /**
   * Type of the worker.
   */
  readonly type: WorkerType
  /**
   * Flag telling whether the worker is dynamic.
   */
  dynamic: boolean
  /**
   * Flag telling whether the worker is ready.
   */
  ready: boolean
  /**
   * Stealing flag.
   * It is `true` while the worker node is stealing tasks from another worker node.
   */
  stealing: boolean
  /**
   * Back pressure flag.
   * It is `true` while the worker node tasks queue has back pressure.
   */
  backPressure: boolean
  /**
   * Properties of the task functions, when known.
   */
  taskFunctionsProperties?: TaskFunctionProperties[]
}
190
/**
 * Usage statistics of a worker.
 *
 * @internal
 */
export interface WorkerUsage {
  /**
   * Statistics about the tasks.
   */
  readonly tasks: TaskStatistics
  /**
   * Statistics about the tasks runtime.
   */
  readonly runTime: MeasurementStatistics
  /**
   * Statistics about the tasks wait time.
   */
  readonly waitTime: MeasurementStatistics
  /**
   * Statistics about the tasks event loop utilization.
   */
  readonly elu: EventLoopUtilizationMeasurementStatistics
}
214
/**
 * Data specific to the worker choice strategy.
 *
 * @internal
 */
export interface StrategyData {
  /**
   * Virtual task end timestamp, when set.
   * NOTE(review): unit and producer are not visible here - confirm against
   * the worker choice strategies.
   */
  virtualTaskEndTimestamp?: number
}
223
/**
 * Worker interface.
 * It is an event emitter exposing the members of cluster workers and worker
 * threads as optional members, since each worker type only provides some of
 * them.
 */
export interface IWorker extends EventEmitter {
  /**
   * Id of the cluster worker.
   */
  readonly id?: number
  /**
   * Id of the worker thread.
   */
  readonly threadId?: number
  /**
   * Registers an event handler.
   *
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly on: (event: string, handler: EventHandler<this>) => this
  /**
   * Registers an event handler to be invoked only once.
   *
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly once: (event: string, handler: EventHandler<this>) => this
  /**
   * Calling `unref()` on a worker allows the thread to exit if this is the only
   * active handle in the event system. If the worker is already `unref()`ed,
   * calling `unref()` again has no effect.
   * @since v10.5.0
   */
  readonly unref?: () => void
  /**
   * Stops all JavaScript execution in the worker thread as soon as possible.
   * Returns a Promise for the exit code that is fulfilled when the `'exit'` event is emitted.
   */
  readonly terminate?: () => Promise<number>
  /**
   * Disconnects the cluster worker.
   */
  readonly disconnect?: () => void
  /**
   * Kills the cluster worker.
   */
  readonly kill?: (signal?: string) => void
}
270
/**
 * Options of a worker node.
 * The tasks queue settings are required keys whose value may be `undefined`.
 *
 * @internal
 */
export interface WorkerNodeOptions {
  /**
   * Worker options, typed as `WorkerOptions` from `node:worker_threads`.
   */
  workerOptions?: WorkerOptions
  /**
   * Environment key/value pairs.
   * NOTE(review): presumably passed to the worker at creation - confirm.
   */
  env?: Record<string, unknown>
  /**
   * Tasks queue back pressure size.
   */
  tasksQueueBackPressureSize: number | undefined
  /**
   * Tasks queue bucket size.
   */
  tasksQueueBucketSize: number | undefined
  /**
   * Tasks queue priority flag.
   */
  tasksQueuePriority: boolean | undefined
}
283
/**
 * Worker node interface.
 * A worker node is an event emitter wrapping a worker together with its
 * information, usage statistics and tasks queue operations.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @internal
 */
export interface IWorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter {
  /**
   * The wrapped worker.
   */
  readonly worker: Worker
  /**
   * Information about the worker.
   */
  readonly info: WorkerInfo
  /**
   * Usage statistics of the worker.
   */
  readonly usage: WorkerUsage
  /**
   * Worker choice strategy data.
   * It stores data that are specific to the worker choice strategy.
   */
  strategyData?: StrategyData
  /**
   * Message channel (worker thread only).
   */
  readonly messageChannel?: MessageChannel
  /**
   * Tasks queue back pressure size.
   * It is the number of tasks that can be enqueued before the worker node has back pressure.
   */
  tasksQueueBackPressureSize: number
  /**
   * Enables or disables the tasks queue priority.
   *
   * @param enablePriority - Whether to enable tasks queue priority.
   */
  readonly setTasksQueuePriority: (enablePriority: boolean) => void
  /**
   * Gets the tasks queue size.
   *
   * @returns The tasks queue size.
   */
  readonly tasksQueueSize: () => number
  /**
   * Enqueues a task.
   *
   * @param task - The task to queue.
   * @returns The tasks queue size.
   */
  readonly enqueueTask: (task: Task<Data>) => number
  /**
   * Dequeues a task.
   *
   * @param bucket - The prioritized bucket to dequeue from. @defaultValue 0
   * @returns The dequeued task, if any.
   */
  readonly dequeueTask: (bucket?: number) => Task<Data> | undefined
  /**
   * Dequeues the last prioritized task.
   *
   * @returns The dequeued task, if any.
   */
  readonly dequeueLastPrioritizedTask: () => Task<Data> | undefined
  /**
   * Clears the tasks queue.
   */
  readonly clearTasksQueue: () => void
  /**
   * Tells whether the worker node has back pressure (i.e. its tasks queue is full).
   *
   * @returns `true` if the worker node has back pressure, `false` otherwise.
   */
  readonly hasBackPressure: () => boolean
  /**
   * Terminates the worker node.
   *
   * @returns A promise settled once the termination is done.
   */
  readonly terminate: () => Promise<void>
  /**
   * Registers a worker event handler.
   *
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly registerWorkerEventHandler: (
    event: string,
    handler: EventHandler<Worker>
  ) => void
  /**
   * Registers a worker event handler to be invoked only once.
   *
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly registerOnceWorkerEventHandler: (
    event: string,
    handler: EventHandler<Worker>
  ) => void
  /**
   * Gets the worker usage statistics of a task function.
   *
   * @param name - The task function name.
   * @returns The task function worker usage statistics if the task function worker usage statistics are initialized, `undefined` otherwise.
   */
  readonly getTaskFunctionWorkerUsage: (name: string) => WorkerUsage | undefined
  /**
   * Deletes the worker usage statistics of a task function.
   *
   * @param name - The task function name.
   * @returns `true` if the task function worker usage statistics were deleted, `false` otherwise.
   */
  readonly deleteTaskFunctionWorkerUsage: (name: string) => boolean
}
400
/**
 * Detail attached to a worker node event.
 * Both members are optional.
 *
 * @internal
 */
export interface WorkerNodeEventDetail {
  /**
   * Id of the worker.
   */
  workerId?: number
  /**
   * Key of the worker node.
   */
  workerNodeKey?: number
}