build(deps-dev): apply updates
[poolifier.git] / src / pools / worker.ts
1 import type { EventEmitter } from 'node:events'
2 import type { MessageChannel, WorkerOptions } from 'node:worker_threads'
3
4 import type { CircularBuffer } from '../circular-buffer.js'
5 import type { Task, TaskFunctionProperties } from '../utility-types.js'
6
/**
 * Callback invoked when the worker has started successfully.
 * The handler takes no arguments; `this` is bound to the worker.
 * @typeParam Worker - Type of worker.
 */
export type OnlineHandler<Worker extends IWorker> = (this: Worker) => void
12
/**
 * Callback invoked if the worker has received a message.
 * `this` is bound to the worker.
 * @typeParam Worker - Type of worker.
 * @param message - The received message. It is typed `unknown`, so it must be narrowed before use.
 */
export type MessageHandler<Worker extends IWorker> = (
  this: Worker,
  message: unknown
) => void
21
/**
 * Callback invoked if the worker raised an error.
 * `this` is bound to the worker.
 * @typeParam Worker - Type of worker.
 * @param error - The error raised by the worker.
 */
export type ErrorHandler<Worker extends IWorker> = (
  this: Worker,
  error: Error
) => void
30
/**
 * Callback invoked when the worker exits.
 * `this` is bound to the worker.
 * @typeParam Worker - Type of worker.
 * @param exitCode - The worker exit code.
 */
export type ExitHandler<Worker extends IWorker> = (
  this: Worker,
  exitCode: number
) => void
39
/**
 * Worker event handler: any one of the online, message, error or exit
 * handler signatures.
 * @typeParam Worker - Type of worker.
 */
export type EventHandler<Worker extends IWorker> =
  | OnlineHandler<Worker>
  | MessageHandler<Worker>
  | ErrorHandler<Worker>
  | ExitHandler<Worker>
49
/**
 * Measurement history size.
 *
 * NOTE(review): presumably the capacity used for the measurement `history`
 * buffers; confirm against the call sites.
 */
export const MeasurementHistorySize = 386
54
/**
 * Measurement statistics.
 * Every computed figure is optional; only the history buffer is always present.
 * @internal
 */
export interface MeasurementStatistics {
  /**
   * Measurement aggregate.
   */
  aggregate?: number
  /**
   * Measurement minimum.
   */
  minimum?: number
  /**
   * Measurement maximum.
   */
  maximum?: number
  /**
   * Measurement average.
   */
  average?: number
  /**
   * Measurement median.
   */
  median?: number
  /**
   * Measurement history.
   */
  readonly history: CircularBuffer
}
85
/**
 * Event loop utilization measurement statistics.
 * @internal
 */
export interface EventLoopUtilizationMeasurementStatistics {
  /**
   * Idle measurement statistics.
   */
  readonly idle: MeasurementStatistics
  /**
   * Active measurement statistics.
   */
  readonly active: MeasurementStatistics
  /**
   * Utilization value, if available.
   */
  utilization?: number
}
95
/**
 * Task statistics.
 * The queue-related counters (`queued`, `maxQueued`) are read-only; the other
 * counters are mutable.
 * @internal
 */
export interface TaskStatistics {
  /**
   * Number of executed tasks.
   */
  executed: number
  /**
   * Number of executing tasks.
   */
  executing: number
  /**
   * Number of queued tasks.
   */
  readonly queued: number
  /**
   * Maximum number of queued tasks.
   */
  readonly maxQueued?: number
  /**
   * Number of sequentially stolen tasks.
   */
  sequentiallyStolen: number
  /**
   * Number of stolen tasks.
   */
  stolen: number
  /**
   * Number of failed tasks.
   */
  failed: number
}
130
/**
 * Enumeration of worker types.
 * The object is frozen, so its members cannot be reassigned at runtime.
 */
export const WorkerTypes: Readonly<{ thread: 'thread'; cluster: 'cluster' }> =
  Object.freeze({
    thread: 'thread',
    cluster: 'cluster',
  } as const)
139
/**
 * Worker type: one of the keys of `WorkerTypes`.
 */
export type WorkerType = keyof typeof WorkerTypes
144
/**
 * Worker information.
 * @internal
 */
export interface WorkerInfo {
  /**
   * Worker id. May be `undefined`.
   */
  readonly id: number | undefined
  /**
   * Worker type.
   */
  readonly type: WorkerType
  /**
   * Dynamic flag.
   */
  dynamic: boolean
  /**
   * Ready flag.
   */
  ready: boolean
  /**
   * Stealing flag.
   * This flag is set to `true` when worker node is stealing tasks from another worker node.
   */
  stealing: boolean
  /**
   * Back pressure flag.
   * This flag is set to `true` when worker node tasks queue has back pressure.
   */
  backPressure: boolean
  /**
   * Task functions properties.
   */
  taskFunctionsProperties?: TaskFunctionProperties[]
}
181
/**
 * Worker usage statistics.
 * @internal
 */
export interface WorkerUsage {
  /**
   * Tasks statistics.
   */
  readonly tasks: TaskStatistics
  /**
   * Tasks runtime statistics.
   */
  readonly runTime: MeasurementStatistics
  /**
   * Tasks wait time statistics.
   */
  readonly waitTime: MeasurementStatistics
  /**
   * Tasks event loop utilization statistics.
   */
  readonly elu: EventLoopUtilizationMeasurementStatistics
}
204
/**
 * Worker choice strategy data.
 * @internal
 */
export interface StrategyData {
  /**
   * Virtual task end timestamp.
   * NOTE(review): units and the strategy that consumes this value are not
   * visible in this file; confirm against the worker choice strategies.
   */
  virtualTaskEndTimestamp?: number
}
212
/**
 * Worker interface.
 * Extends `EventEmitter`. The identifier and lifecycle members are optional
 * and are documented as belonging to either cluster workers or worker threads.
 */
export interface IWorker extends EventEmitter {
  /**
   * Cluster worker id.
   */
  readonly id?: number
  /**
   * Worker thread worker id.
   */
  readonly threadId?: number
  /**
   * Registers an event handler.
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly on: (event: string, handler: EventHandler<this>) => this
  /**
   * Registers once an event handler.
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly once: (event: string, handler: EventHandler<this>) => this
  /**
   * Calling `unref()` on a worker allows the thread to exit if this is the only
   * active handle in the event system. If the worker is already `unref()`ed calling `unref()` again has no effect.
   * @since v10.5.0
   */
  readonly unref?: () => void
  /**
   * Stop all JavaScript execution in the worker thread as soon as possible.
   * Returns a Promise for the exit code that is fulfilled when the `'exit' event` is emitted.
   */
  readonly terminate?: () => Promise<number>
  /**
   * Cluster worker disconnect.
   */
  readonly disconnect?: () => void
  /**
   * Cluster worker kill.
   * @param signal - Optional signal name.
   */
  readonly kill?: (signal?: string) => void
}
257
/**
 * Worker node options.
 * The three `tasksQueue*` members are required keys whose value may be
 * `undefined`; `workerOptions` and `env` may be omitted entirely.
 * @internal
 */
export interface WorkerNodeOptions {
  /**
   * Worker options (`WorkerOptions` from `node:worker_threads`).
   */
  workerOptions?: WorkerOptions
  /**
   * Environment key/value pairs.
   * NOTE(review): presumably passed to the spawned worker; confirm against the caller.
   */
  env?: Record<string, unknown>
  /**
   * Tasks queue back pressure size.
   */
  tasksQueueBackPressureSize: number | undefined
  /**
   * Tasks queue bucket size.
   */
  tasksQueueBucketSize: number | undefined
  /**
   * Tasks queue priority flag.
   */
  tasksQueuePriority: boolean | undefined
}
269
/**
 * Worker node interface.
 * Extends `EventEmitter` and groups a worker with its info, usage statistics
 * and tasks queue operations.
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @internal
 */
export interface IWorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter {
  /**
   * Worker.
   */
  readonly worker: Worker
  /**
   * Worker info.
   */
  readonly info: WorkerInfo
  /**
   * Worker usage statistics.
   */
  readonly usage: WorkerUsage
  /**
   * Worker choice strategy data.
   * This is used to store data that are specific to the worker choice strategy.
   */
  strategyData?: StrategyData
  /**
   * Message channel (worker thread only).
   */
  readonly messageChannel?: MessageChannel
  /**
   * Tasks queue back pressure size.
   * This is the number of tasks that can be enqueued before the worker node has back pressure.
   */
  tasksQueueBackPressureSize: number
  /**
   * Sets tasks queue priority.
   * @param enablePriority - Whether to enable tasks queue priority.
   */
  readonly setTasksQueuePriority: (enablePriority: boolean) => void
  /**
   * Tasks queue size.
   * @returns The tasks queue size.
   */
  readonly tasksQueueSize: () => number
  /**
   * Enqueue task.
   * @param task - The task to queue.
   * @returns The tasks queue size.
   */
  readonly enqueueTask: (task: Task<Data>) => number
  /**
   * Dequeue task.
   * @param bucket - The prioritized bucket to dequeue from. @defaultValue 0
   * @returns The dequeued task.
   */
  readonly dequeueTask: (bucket?: number) => Task<Data> | undefined
  /**
   * Dequeue last prioritized task.
   * @returns The dequeued task.
   */
  readonly dequeueLastPrioritizedTask: () => Task<Data> | undefined
  /**
   * Clears tasks queue.
   */
  readonly clearTasksQueue: () => void
  /**
   * Whether the worker node has back pressure (i.e. its tasks queue is full).
   * @returns `true` if the worker node has back pressure, `false` otherwise.
   */
  readonly hasBackPressure: () => boolean
  /**
   * Terminates the worker node.
   * @returns A promise that resolves without a value.
   */
  readonly terminate: () => Promise<void>
  /**
   * Registers a worker event handler.
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly registerWorkerEventHandler: (
    event: string,
    handler: EventHandler<Worker>
  ) => void
  /**
   * Registers once a worker event handler.
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly registerOnceWorkerEventHandler: (
    event: string,
    handler: EventHandler<Worker>
  ) => void
  /**
   * Gets task function worker usage statistics.
   * @param name - The task function name.
   * @returns The task function worker usage statistics if the task function worker usage statistics are initialized, `undefined` otherwise.
   */
  readonly getTaskFunctionWorkerUsage: (name: string) => WorkerUsage | undefined
  /**
   * Deletes task function worker usage statistics.
   * @param name - The task function name.
   * @returns `true` if the task function worker usage statistics were deleted, `false` otherwise.
   */
  readonly deleteTaskFunctionWorkerUsage: (name: string) => boolean
}
375
/**
 * Worker node event detail.
 * Both members are optional.
 * @internal
 */
export interface WorkerNodeEventDetail {
  /**
   * Worker id.
   */
  workerId?: number
  /**
   * Worker node key.
   * NOTE(review): presumably the index of the worker node within the pool;
   * confirm against the emitter.
   */
  workerNodeKey?: number
}