refactor: refine worker node event type
[poolifier.git] / src / pools / worker.ts
1 import type { MessageChannel, WorkerOptions } from 'node:worker_threads'
2 import type { EventEmitter } from 'node:events'
3 import type { CircularArray } from '../circular-array.js'
4 import type { Task } from '../utility-types.js'
5
6 /**
7 * Callback invoked when the worker has started successfully.
8 *
9 * @typeParam Worker - Type of worker.
10 */
11 export type OnlineHandler<Worker extends IWorker> = (this: Worker) => void
12
13 /**
14 * Callback invoked if the worker has received a message.
15 *
16 * @typeParam Worker - Type of worker.
17 */
18 export type MessageHandler<Worker extends IWorker> = (
19 this: Worker,
20 message: unknown
21 ) => void
22
23 /**
24 * Callback invoked if the worker raised an error.
25 *
26 * @typeParam Worker - Type of worker.
27 */
28 export type ErrorHandler<Worker extends IWorker> = (
29 this: Worker,
30 error: Error
31 ) => void
32
33 /**
34 * Callback invoked when the worker exits successfully.
35 *
36 * @typeParam Worker - Type of worker.
37 */
38 export type ExitHandler<Worker extends IWorker> = (
39 this: Worker,
40 exitCode: number
41 ) => void
42
43 /**
44 * Worker event handler.
45 *
46 * @typeParam Worker - Type of worker.
47 */
48 export type EventHandler<Worker extends IWorker> =
49 | OnlineHandler<Worker>
50 | MessageHandler<Worker>
51 | ErrorHandler<Worker>
52 | ExitHandler<Worker>
53
54 /**
55 * Measurement statistics.
56 *
57 * @internal
58 */
59 export interface MeasurementStatistics {
60 /**
61 * Measurement aggregate.
62 */
63 aggregate?: number
64 /**
65 * Measurement minimum.
66 */
67 minimum?: number
68 /**
69 * Measurement maximum.
70 */
71 maximum?: number
72 /**
73 * Measurement average.
74 */
75 average?: number
76 /**
77 * Measurement median.
78 */
79 median?: number
80 /**
81 * Measurement history.
82 */
83 readonly history: CircularArray<number>
84 }
85
86 /**
87 * Event loop utilization measurement statistics.
88 *
89 * @internal
90 */
91 export interface EventLoopUtilizationMeasurementStatistics {
92 readonly idle: MeasurementStatistics
93 readonly active: MeasurementStatistics
94 utilization?: number
95 }
96
97 /**
98 * Task statistics.
99 *
100 * @internal
101 */
102 export interface TaskStatistics {
103 /**
104 * Number of executed tasks.
105 */
106 executed: number
107 /**
108 * Number of executing tasks.
109 */
110 executing: number
111 /**
112 * Number of queued tasks.
113 */
114 readonly queued: number
115 /**
116 * Maximum number of queued tasks.
117 */
118 readonly maxQueued?: number
119 /**
120 * Number of sequentially stolen tasks.
121 */
122 sequentiallyStolen: number
123 /**
124 * Number of stolen tasks.
125 */
126 stolen: number
127 /**
128 * Number of failed tasks.
129 */
130 failed: number
131 }
132
133 /**
134 * Enumeration of worker types.
135 */
136 export const WorkerTypes = Object.freeze({
137 thread: 'thread',
138 cluster: 'cluster'
139 } as const)
140
141 /**
142 * Worker type.
143 */
144 export type WorkerType = keyof typeof WorkerTypes
145
146 /**
147 * Worker information.
148 *
149 * @internal
150 */
151 export interface WorkerInfo {
152 /**
153 * Worker id.
154 */
155 readonly id: number | undefined
156 /**
157 * Worker type.
158 */
159 readonly type: WorkerType
160 /**
161 * Dynamic flag.
162 */
163 dynamic: boolean
164 /**
165 * Ready flag.
166 */
167 ready: boolean
168 /**
169 * Stealing flag.
170 * This flag is set to `true` when worker node is stealing tasks from another worker node.
171 */
172 stealing: boolean
173 /**
174 * Task function names.
175 */
176 taskFunctionNames?: string[]
177 }
178
179 /**
180 * Worker usage statistics.
181 *
182 * @internal
183 */
184 export interface WorkerUsage {
185 /**
186 * Tasks statistics.
187 */
188 readonly tasks: TaskStatistics
189 /**
190 * Tasks runtime statistics.
191 */
192 readonly runTime: MeasurementStatistics
193 /**
194 * Tasks wait time statistics.
195 */
196 readonly waitTime: MeasurementStatistics
197 /**
198 * Tasks event loop utilization statistics.
199 */
200 readonly elu: EventLoopUtilizationMeasurementStatistics
201 }
202
203 /**
204 * Worker choice strategy data.
205 *
206 * @internal
207 */
208 export interface StrategyData {
209 virtualTaskEndTimestamp?: number
210 }
211
212 /**
213 * Worker interface.
214 */
215 export interface IWorker {
216 /**
217 * Cluster worker id.
218 */
219 readonly id?: number
220 /**
221 * Worker thread worker id.
222 */
223 readonly threadId?: number
224 /**
225 * Registers an event handler.
226 *
227 * @param event - The event.
228 * @param handler - The event handler.
229 */
230 readonly on: (event: string, handler: EventHandler<this>) => void
231 /**
232 * Registers once an event handler.
233 *
234 * @param event - The event.
235 * @param handler - The event handler.
236 */
237 readonly once: (event: string, handler: EventHandler<this>) => void
238 /**
239 * Stop all JavaScript execution in the worker thread as soon as possible.
240 * Returns a Promise for the exit code that is fulfilled when the `'exit' event` is emitted.
241 */
242 readonly terminate?: () => Promise<number>
243 /**
244 * Cluster worker disconnect.
245 */
246 readonly disconnect?: () => void
247 /**
248 * Cluster worker kill.
249 */
250 readonly kill?: (signal?: string) => void
251 }
252
253 /**
254 * Worker node options.
255 *
256 * @internal
257 */
258 export interface WorkerNodeOptions {
259 workerOptions?: WorkerOptions
260 env?: Record<string, unknown>
261 tasksQueueBackPressureSize: number | undefined
262 }
263
264 /**
265 * Worker node interface.
266 *
267 * @typeParam Worker - Type of worker.
268 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
269 * @internal
270 */
271 export interface IWorkerNode<Worker extends IWorker, Data = unknown>
272 extends EventEmitter {
273 /**
274 * Worker.
275 */
276 readonly worker: Worker
277 /**
278 * Worker info.
279 */
280 readonly info: WorkerInfo
281 /**
282 * Worker usage statistics.
283 */
284 readonly usage: WorkerUsage
285 /**
286 * Worker choice strategy data.
287 * This is used to store data that are specific to the worker choice strategy.
288 */
289 strategyData?: StrategyData
290 /**
291 * Message channel (worker thread only).
292 */
293 readonly messageChannel?: MessageChannel
294 /**
295 * Tasks queue back pressure size.
296 * This is the number of tasks that can be enqueued before the worker node has back pressure.
297 */
298 tasksQueueBackPressureSize: number
299 /**
300 * Tasks queue size.
301 *
302 * @returns The tasks queue size.
303 */
304 readonly tasksQueueSize: () => number
305 /**
306 * Enqueue task.
307 *
308 * @param task - The task to queue.
309 * @returns The tasks queue size.
310 */
311 readonly enqueueTask: (task: Task<Data>) => number
312 /**
313 * Prepends a task to the tasks queue.
314 *
315 * @param task - The task to prepend.
316 * @returns The tasks queue size.
317 */
318 readonly unshiftTask: (task: Task<Data>) => number
319 /**
320 * Dequeue task.
321 *
322 * @returns The dequeued task.
323 */
324 readonly dequeueTask: () => Task<Data> | undefined
325 /**
326 * Pops a task from the tasks queue.
327 *
328 * @returns The popped task.
329 */
330 readonly popTask: () => Task<Data> | undefined
331 /**
332 * Clears tasks queue.
333 */
334 readonly clearTasksQueue: () => void
335 /**
336 * Whether the worker node has back pressure (i.e. its tasks queue is full).
337 *
338 * @returns `true` if the worker node has back pressure, `false` otherwise.
339 */
340 readonly hasBackPressure: () => boolean
341 /**
342 * Resets usage statistics.
343 */
344 readonly resetUsage: () => void
345 /**
346 * Terminates the worker node.
347 */
348 readonly terminate: () => Promise<void>
349 /**
350 * Registers a worker event handler.
351 *
352 * @param event - The event.
353 * @param handler - The event handler.
354 */
355 readonly registerWorkerEventHandler: (
356 event: string,
357 handler: EventHandler<Worker>
358 ) => void
359 /**
360 * Registers once a worker event handler.
361 *
362 * @param event - The event.
363 * @param handler - The event handler.
364 */
365 readonly registerOnceWorkerEventHandler: (
366 event: string,
367 handler: EventHandler<Worker>
368 ) => void
369 /**
370 * Gets task function worker usage statistics.
371 *
372 * @param name - The task function name.
373 * @returns The task function worker usage statistics if the task function worker usage statistics are initialized, `undefined` otherwise.
374 */
375 readonly getTaskFunctionWorkerUsage: (name: string) => WorkerUsage | undefined
376 /**
377 * Deletes task function worker usage statistics.
378 *
379 * @param name - The task function name.
380 * @returns `true` if the task function worker usage statistics were deleted, `false` otherwise.
381 */
382 readonly deleteTaskFunctionWorkerUsage: (name: string) => boolean
383 }
384
385 /**
386 * Worker node event detail.
387 *
388 * @internal
389 */
390 export interface WorkerNodeEventDetail {
391 workerId?: number
392 workerNodeKey?: number
393 }