ee54c6acc3f17216361f3c20b55b61f74376f260
[poolifier.git] / src / pools / worker.ts
1 import type { EventEmitter } from 'node:events'
2 import type { MessageChannel, WorkerOptions } from 'node:worker_threads'
3
4 import type { CircularArray } from '../circular-array.js'
5 import type { Task } from '../utility-types.js'
6
/**
 * Callback invoked when the worker has started successfully.
 * The handler is called with `this` bound to the worker and takes no arguments.
 *
 * @typeParam Worker - Type of worker.
 */
export type OnlineHandler<Worker extends IWorker> = (this: Worker) => void
13
/**
 * Callback invoked if the worker has received a message.
 * The handler is called with `this` bound to the worker. The message is typed
 * `unknown`, so it must be narrowed before use.
 *
 * @typeParam Worker - Type of worker.
 */
export type MessageHandler<Worker extends IWorker> = (
  this: Worker,
  message: unknown
) => void
23
/**
 * Callback invoked if the worker raised an error.
 * The handler is called with `this` bound to the worker and receives the error.
 *
 * @typeParam Worker - Type of worker.
 */
export type ErrorHandler<Worker extends IWorker> = (
  this: Worker,
  error: Error
) => void
33
/**
 * Callback invoked when the worker exits.
 * The handler is called with `this` bound to the worker and receives the
 * worker exit code.
 *
 * @typeParam Worker - Type of worker.
 */
export type ExitHandler<Worker extends IWorker> = (
  this: Worker,
  exitCode: number
) => void
43
/**
 * Worker event handler.
 * Union of every handler signature that can be registered on a worker:
 * online, message, error and exit.
 *
 * @typeParam Worker - Type of worker.
 */
export type EventHandler<Worker extends IWorker> =
  | OnlineHandler<Worker>
  | MessageHandler<Worker>
  | ErrorHandler<Worker>
  | ExitHandler<Worker>
54
/**
 * Measurement statistics.
 * Every computed value is optional; only the history container is always
 * present.
 *
 * @internal
 */
export interface MeasurementStatistics {
  /**
   * Measurement aggregate.
   */
  aggregate?: number
  /**
   * Measurement minimum.
   */
  minimum?: number
  /**
   * Measurement maximum.
   */
  maximum?: number
  /**
   * Measurement average.
   */
  average?: number
  /**
   * Measurement median.
   */
  median?: number
  /**
   * Measurement history.
   */
  readonly history: CircularArray<number>
}
86
/**
 * Event loop utilization measurement statistics.
 *
 * @internal
 */
export interface EventLoopUtilizationMeasurementStatistics {
  /**
   * Idle measurement statistics.
   */
  readonly idle: MeasurementStatistics
  /**
   * Active measurement statistics.
   */
  readonly active: MeasurementStatistics
  /**
   * Utilization value, if available.
   */
  utilization?: number
}
97
/**
 * Task statistics.
 * `queued` and `maxQueued` are read-only in this view; the other counters are
 * mutable.
 *
 * @internal
 */
export interface TaskStatistics {
  /**
   * Number of executed tasks.
   */
  executed: number
  /**
   * Number of executing tasks.
   */
  executing: number
  /**
   * Number of queued tasks.
   */
  readonly queued: number
  /**
   * Maximum number of queued tasks.
   */
  readonly maxQueued?: number
  /**
   * Number of sequentially stolen tasks.
   */
  sequentiallyStolen: number
  /**
   * Number of stolen tasks.
   */
  stolen: number
  /**
   * Number of failed tasks.
   */
  failed: number
}
133
/**
 * Enumeration of worker types.
 * The object is frozen, and each key maps to the identical string literal.
 */
export const WorkerTypes = Object.freeze({
  thread: 'thread',
  cluster: 'cluster'
} as const)
141
/**
 * Worker type.
 * Derived from the keys of {@link WorkerTypes}.
 */
export type WorkerType = keyof typeof WorkerTypes
146
/**
 * Worker information.
 *
 * @internal
 */
export interface WorkerInfo {
  /**
   * Worker id.
   * The property is always present but its value may be `undefined`.
   */
  readonly id: number | undefined
  /**
   * Worker type.
   */
  readonly type: WorkerType
  /**
   * Dynamic flag.
   */
  dynamic: boolean
  /**
   * Ready flag.
   */
  ready: boolean
  /**
   * Stealing flag.
   * This flag is set to `true` when worker node is stealing tasks from another worker node.
   */
  stealing: boolean
  /**
   * Task function names.
   */
  taskFunctionNames?: string[]
}
179
/**
 * Worker usage statistics.
 *
 * @internal
 */
export interface WorkerUsage {
  /**
   * Tasks statistics.
   */
  readonly tasks: TaskStatistics
  /**
   * Tasks runtime statistics.
   */
  readonly runTime: MeasurementStatistics
  /**
   * Tasks wait time statistics.
   */
  readonly waitTime: MeasurementStatistics
  /**
   * Tasks event loop utilization statistics.
   */
  readonly elu: EventLoopUtilizationMeasurementStatistics
}
203
/**
 * Worker choice strategy data.
 *
 * @internal
 */
export interface StrategyData {
  /**
   * Virtual task end timestamp, if set.
   */
  virtualTaskEndTimestamp?: number
}
212
/**
 * Worker interface.
 * Common shape covering both worker kinds: members specific to one kind
 * (`id`, `threadId`, `unref`, `terminate`, `disconnect`, `kill`) are optional.
 */
export interface IWorker extends EventEmitter {
  /**
   * Cluster worker id.
   */
  readonly id?: number
  /**
   * Worker thread worker id.
   */
  readonly threadId?: number
  /**
   * Registers an event handler.
   *
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly on: (event: string, handler: EventHandler<this>) => this
  /**
   * Registers once an event handler.
   *
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly once: (event: string, handler: EventHandler<this>) => this
  /**
   * Calling `unref()` on a worker allows the thread to exit if this is the only
   * active handle in the event system. If the worker is already `unref()`ed, calling `unref()` again has no effect.
   * @since v10.5.0
   */
  readonly unref?: () => void
  /**
   * Stop all JavaScript execution in the worker thread as soon as possible.
   * Returns a Promise for the exit code that is fulfilled when the `'exit'` event is emitted.
   */
  readonly terminate?: () => Promise<number>
  /**
   * Cluster worker disconnect.
   */
  readonly disconnect?: () => void
  /**
   * Cluster worker kill.
   */
  readonly kill?: (signal?: string) => void
}
259
/**
 * Worker node options.
 *
 * @internal
 */
export interface WorkerNodeOptions {
  /**
   * Worker options (`WorkerOptions` from `node:worker_threads`).
   */
  workerOptions?: WorkerOptions
  /**
   * Environment record.
   */
  env?: Record<string, unknown>
  /**
   * Tasks queue back pressure size.
   * The property is required, but its value may be `undefined`.
   */
  tasksQueueBackPressureSize: number | undefined
}
270
/**
 * Worker node interface.
 * Pairs a worker with its info, usage statistics and tasks queue operations.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @internal
 */
export interface IWorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter {
  /**
   * Worker.
   */
  readonly worker: Worker
  /**
   * Worker info.
   */
  readonly info: WorkerInfo
  /**
   * Worker usage statistics.
   */
  readonly usage: WorkerUsage
  /**
   * Worker choice strategy data.
   * This is used to store data that are specific to the worker choice strategy.
   */
  strategyData?: StrategyData
  /**
   * Message channel (worker thread only).
   */
  readonly messageChannel?: MessageChannel
  /**
   * Tasks queue back pressure size.
   * This is the number of tasks that can be enqueued before the worker node has back pressure.
   */
  tasksQueueBackPressureSize: number
  /**
   * Tasks queue size.
   *
   * @returns The tasks queue size.
   */
  readonly tasksQueueSize: () => number
  /**
   * Enqueue task.
   *
   * @param task - The task to queue.
   * @returns The tasks queue size.
   */
  readonly enqueueTask: (task: Task<Data>) => number
  /**
   * Prepends a task to the tasks queue.
   *
   * @param task - The task to prepend.
   * @returns The tasks queue size.
   */
  readonly unshiftTask: (task: Task<Data>) => number
  /**
   * Dequeue task.
   *
   * @returns The dequeued task.
   */
  readonly dequeueTask: () => Task<Data> | undefined
  /**
   * Pops a task from the tasks queue.
   *
   * @returns The popped task.
   */
  readonly popTask: () => Task<Data> | undefined
  /**
   * Clears tasks queue.
   */
  readonly clearTasksQueue: () => void
  /**
   * Whether the worker node has back pressure (i.e. its tasks queue is full).
   *
   * @returns `true` if the worker node has back pressure, `false` otherwise.
   */
  readonly hasBackPressure: () => boolean
  /**
   * Resets usage statistics.
   */
  readonly resetUsage: () => void
  /**
   * Terminates the worker node.
   */
  readonly terminate: () => Promise<void>
  /**
   * Registers a worker event handler.
   *
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly registerWorkerEventHandler: (
    event: string,
    handler: EventHandler<Worker>
  ) => void
  /**
   * Registers once a worker event handler.
   *
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly registerOnceWorkerEventHandler: (
    event: string,
    handler: EventHandler<Worker>
  ) => void
  /**
   * Gets task function worker usage statistics.
   *
   * @param name - The task function name.
   * @returns The task function worker usage statistics if the task function worker usage statistics are initialized, `undefined` otherwise.
   */
  readonly getTaskFunctionWorkerUsage: (name: string) => WorkerUsage | undefined
  /**
   * Deletes task function worker usage statistics.
   *
   * @param name - The task function name.
   * @returns `true` if the task function worker usage statistics were deleted, `false` otherwise.
   */
  readonly deleteTaskFunctionWorkerUsage: (name: string) => boolean
}
391
/**
 * Worker node event detail.
 * Both members are optional.
 *
 * @internal
 */
export interface WorkerNodeEventDetail {
  /**
   * Worker id.
   */
  workerId?: number
  /**
   * Worker node key.
   */
  workerNodeKey?: number
}