build(deps-dev): apply updates
[poolifier.git] / src / pools / worker.ts
1 import type { EventEmitter } from 'node:events'
2 import type { MessageChannel, WorkerOptions } from 'node:worker_threads'
3
4 import type { CircularArray } from '../circular-array.js'
5 import type { Task } from '../utility-types.js'
6
7 /**
8 * Callback invoked when the worker has started successfully.
9 *
10 * @typeParam Worker - Type of worker.
11 */
12 export type OnlineHandler<Worker extends IWorker> = (this: Worker) => void
13
14 /**
15 * Callback invoked if the worker has received a message.
16 *
17 * @typeParam Worker - Type of worker.
18 */
19 export type MessageHandler<Worker extends IWorker> = (
20 this: Worker,
21 message: unknown
22 ) => void
23
24 /**
25 * Callback invoked if the worker raised an error.
26 *
27 * @typeParam Worker - Type of worker.
28 */
29 export type ErrorHandler<Worker extends IWorker> = (
30 this: Worker,
31 error: Error
32 ) => void
33
34 /**
35 * Callback invoked when the worker exits successfully.
36 *
37 * @typeParam Worker - Type of worker.
38 */
39 export type ExitHandler<Worker extends IWorker> = (
40 this: Worker,
41 exitCode: number
42 ) => void
43
44 /**
45 * Worker event handler.
46 *
47 * @typeParam Worker - Type of worker.
48 */
49 export type EventHandler<Worker extends IWorker> =
50 | OnlineHandler<Worker>
51 | MessageHandler<Worker>
52 | ErrorHandler<Worker>
53 | ExitHandler<Worker>
54
55 /**
56 * Measurement statistics.
57 *
58 * @internal
59 */
60 export interface MeasurementStatistics {
61 /**
62 * Measurement aggregate.
63 */
64 aggregate?: number
65 /**
66 * Measurement minimum.
67 */
68 minimum?: number
69 /**
70 * Measurement maximum.
71 */
72 maximum?: number
73 /**
74 * Measurement average.
75 */
76 average?: number
77 /**
78 * Measurement median.
79 */
80 median?: number
81 /**
82 * Measurement history.
83 */
84 readonly history: CircularArray<number>
85 }
86
87 /**
88 * Event loop utilization measurement statistics.
89 *
90 * @internal
91 */
92 export interface EventLoopUtilizationMeasurementStatistics {
93 readonly idle: MeasurementStatistics
94 readonly active: MeasurementStatistics
95 utilization?: number
96 }
97
98 /**
99 * Task statistics.
100 *
101 * @internal
102 */
103 export interface TaskStatistics {
104 /**
105 * Number of executed tasks.
106 */
107 executed: number
108 /**
109 * Number of executing tasks.
110 */
111 executing: number
112 /**
113 * Number of queued tasks.
114 */
115 readonly queued: number
116 /**
117 * Maximum number of queued tasks.
118 */
119 readonly maxQueued?: number
120 /**
121 * Number of sequentially stolen tasks.
122 */
123 sequentiallyStolen: number
124 /**
125 * Number of stolen tasks.
126 */
127 stolen: number
128 /**
129 * Number of failed tasks.
130 */
131 failed: number
132 }
133
134 /**
135 * Enumeration of worker types.
136 */
137 export const WorkerTypes: Readonly<{ thread: 'thread', cluster: 'cluster' }> =
138 Object.freeze({
139 thread: 'thread',
140 cluster: 'cluster'
141 } as const)
142
143 /**
144 * Worker type.
145 */
146 export type WorkerType = keyof typeof WorkerTypes
147
148 /**
149 * Worker information.
150 *
151 * @internal
152 */
153 export interface WorkerInfo {
154 /**
155 * Worker id.
156 */
157 readonly id: number | undefined
158 /**
159 * Worker type.
160 */
161 readonly type: WorkerType
162 /**
163 * Dynamic flag.
164 */
165 dynamic: boolean
166 /**
167 * Ready flag.
168 */
169 ready: boolean
170 /**
171 * Stealing flag.
172 * This flag is set to `true` when worker node is stealing tasks from another worker node.
173 */
174 stealing: boolean
175 /**
176 * Task function names.
177 */
178 taskFunctionNames?: string[]
179 }
180
181 /**
182 * Worker usage statistics.
183 *
184 * @internal
185 */
186 export interface WorkerUsage {
187 /**
188 * Tasks statistics.
189 */
190 readonly tasks: TaskStatistics
191 /**
192 * Tasks runtime statistics.
193 */
194 readonly runTime: MeasurementStatistics
195 /**
196 * Tasks wait time statistics.
197 */
198 readonly waitTime: MeasurementStatistics
199 /**
200 * Tasks event loop utilization statistics.
201 */
202 readonly elu: EventLoopUtilizationMeasurementStatistics
203 }
204
205 /**
206 * Worker choice strategy data.
207 *
208 * @internal
209 */
210 export interface StrategyData {
211 virtualTaskEndTimestamp?: number
212 }
213
214 /**
215 * Worker interface.
216 */
217 export interface IWorker extends EventEmitter {
218 /**
219 * Cluster worker id.
220 */
221 readonly id?: number
222 /**
223 * Worker thread worker id.
224 */
225 readonly threadId?: number
226 /**
227 * Registers an event handler.
228 *
229 * @param event - The event.
230 * @param handler - The event handler.
231 */
232 readonly on: (event: string, handler: EventHandler<this>) => this
233 /**
234 * Registers once an event handler.
235 *
236 * @param event - The event.
237 * @param handler - The event handler.
238 */
239 readonly once: (event: string, handler: EventHandler<this>) => this
240 /**
241 * Calling `unref()` on a worker allows the thread to exit if this is the only
242 * active handle in the event system. If the worker is already `unref()`ed calling`unref()` again has no effect.
243 * @since v10.5.0
244 */
245 readonly unref?: () => void
246 /**
247 * Stop all JavaScript execution in the worker thread as soon as possible.
248 * Returns a Promise for the exit code that is fulfilled when the `'exit' event` is emitted.
249 */
250 readonly terminate?: () => Promise<number>
251 /**
252 * Cluster worker disconnect.
253 */
254 readonly disconnect?: () => void
255 /**
256 * Cluster worker kill.
257 */
258 readonly kill?: (signal?: string) => void
259 }
260
261 /**
262 * Worker node options.
263 *
264 * @internal
265 */
266 export interface WorkerNodeOptions {
267 workerOptions?: WorkerOptions
268 env?: Record<string, unknown>
269 tasksQueueBackPressureSize: number | undefined
270 }
271
272 /**
273 * Worker node interface.
274 *
275 * @typeParam Worker - Type of worker.
276 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
277 * @internal
278 */
279 export interface IWorkerNode<Worker extends IWorker, Data = unknown>
280 extends EventEmitter {
281 /**
282 * Worker.
283 */
284 readonly worker: Worker
285 /**
286 * Worker info.
287 */
288 readonly info: WorkerInfo
289 /**
290 * Worker usage statistics.
291 */
292 readonly usage: WorkerUsage
293 /**
294 * Worker choice strategy data.
295 * This is used to store data that are specific to the worker choice strategy.
296 */
297 strategyData?: StrategyData
298 /**
299 * Message channel (worker thread only).
300 */
301 readonly messageChannel?: MessageChannel
302 /**
303 * Tasks queue back pressure size.
304 * This is the number of tasks that can be enqueued before the worker node has back pressure.
305 */
306 tasksQueueBackPressureSize: number
307 /**
308 * Tasks queue size.
309 *
310 * @returns The tasks queue size.
311 */
312 readonly tasksQueueSize: () => number
313 /**
314 * Enqueue task.
315 *
316 * @param task - The task to queue.
317 * @returns The tasks queue size.
318 */
319 readonly enqueueTask: (task: Task<Data>) => number
320 /**
321 * Prepends a task to the tasks queue.
322 *
323 * @param task - The task to prepend.
324 * @returns The tasks queue size.
325 */
326 readonly unshiftTask: (task: Task<Data>) => number
327 /**
328 * Dequeue task.
329 *
330 * @returns The dequeued task.
331 */
332 readonly dequeueTask: () => Task<Data> | undefined
333 /**
334 * Pops a task from the tasks queue.
335 *
336 * @returns The popped task.
337 */
338 readonly popTask: () => Task<Data> | undefined
339 /**
340 * Clears tasks queue.
341 */
342 readonly clearTasksQueue: () => void
343 /**
344 * Whether the worker node has back pressure (i.e. its tasks queue is full).
345 *
346 * @returns `true` if the worker node has back pressure, `false` otherwise.
347 */
348 readonly hasBackPressure: () => boolean
349 /**
350 * Resets usage statistics.
351 */
352 readonly resetUsage: () => void
353 /**
354 * Terminates the worker node.
355 */
356 readonly terminate: () => Promise<void>
357 /**
358 * Registers a worker event handler.
359 *
360 * @param event - The event.
361 * @param handler - The event handler.
362 */
363 readonly registerWorkerEventHandler: (
364 event: string,
365 handler: EventHandler<Worker>
366 ) => void
367 /**
368 * Registers once a worker event handler.
369 *
370 * @param event - The event.
371 * @param handler - The event handler.
372 */
373 readonly registerOnceWorkerEventHandler: (
374 event: string,
375 handler: EventHandler<Worker>
376 ) => void
377 /**
378 * Gets task function worker usage statistics.
379 *
380 * @param name - The task function name.
381 * @returns The task function worker usage statistics if the task function worker usage statistics are initialized, `undefined` otherwise.
382 */
383 readonly getTaskFunctionWorkerUsage: (name: string) => WorkerUsage | undefined
384 /**
385 * Deletes task function worker usage statistics.
386 *
387 * @param name - The task function name.
388 * @returns `true` if the task function worker usage statistics were deleted, `false` otherwise.
389 */
390 readonly deleteTaskFunctionWorkerUsage: (name: string) => boolean
391 }
392
393 /**
394 * Worker node event detail.
395 *
396 * @internal
397 */
398 export interface WorkerNodeEventDetail {
399 workerId?: number
400 workerNodeKey?: number
401 }