refactor: cleanup handleWorkerNodeIdleEvent()
[poolifier.git] / src / pools / worker.ts
1 import type { EventEmitter } from 'node:events'
2 import type { MessageChannel, WorkerOptions } from 'node:worker_threads'
3
4 import type { CircularArray } from '../circular-array.js'
5 import type { Task, TaskFunctionProperties } from '../utility-types.js'
6
7 /**
8 * Callback invoked when the worker has started successfully.
9 *
10 * @typeParam Worker - Type of worker.
11 */
12 export type OnlineHandler<Worker extends IWorker> = (this: Worker) => void
13
14 /**
15 * Callback invoked if the worker has received a message.
16 *
17 * @typeParam Worker - Type of worker.
18 */
19 export type MessageHandler<Worker extends IWorker> = (
20 this: Worker,
21 message: unknown
22 ) => void
23
24 /**
25 * Callback invoked if the worker raised an error.
26 *
27 * @typeParam Worker - Type of worker.
28 */
29 export type ErrorHandler<Worker extends IWorker> = (
30 this: Worker,
31 error: Error
32 ) => void
33
34 /**
35 * Callback invoked when the worker exits successfully.
36 *
37 * @typeParam Worker - Type of worker.
38 */
39 export type ExitHandler<Worker extends IWorker> = (
40 this: Worker,
41 exitCode: number
42 ) => void
43
44 /**
45 * Worker event handler.
46 *
47 * @typeParam Worker - Type of worker.
48 */
49 export type EventHandler<Worker extends IWorker> =
50 | OnlineHandler<Worker>
51 | MessageHandler<Worker>
52 | ErrorHandler<Worker>
53 | ExitHandler<Worker>
54
55 /**
56 * Measurement statistics.
57 *
58 * @internal
59 */
60 export interface MeasurementStatistics {
61 /**
62 * Measurement aggregate.
63 */
64 aggregate?: number
65 /**
66 * Measurement minimum.
67 */
68 minimum?: number
69 /**
70 * Measurement maximum.
71 */
72 maximum?: number
73 /**
74 * Measurement average.
75 */
76 average?: number
77 /**
78 * Measurement median.
79 */
80 median?: number
81 /**
82 * Measurement history.
83 */
84 readonly history: CircularArray<number>
85 }
86
87 /**
88 * Event loop utilization measurement statistics.
89 *
90 * @internal
91 */
92 export interface EventLoopUtilizationMeasurementStatistics {
93 readonly idle: MeasurementStatistics
94 readonly active: MeasurementStatistics
95 utilization?: number
96 }
97
98 /**
99 * Task statistics.
100 *
101 * @internal
102 */
103 export interface TaskStatistics {
104 /**
105 * Number of executed tasks.
106 */
107 executed: number
108 /**
109 * Number of executing tasks.
110 */
111 executing: number
112 /**
113 * Number of queued tasks.
114 */
115 readonly queued: number
116 /**
117 * Maximum number of queued tasks.
118 */
119 readonly maxQueued?: number
120 /**
121 * Number of sequentially stolen tasks.
122 */
123 sequentiallyStolen: number
124 /**
125 * Number of stolen tasks.
126 */
127 stolen: number
128 /**
129 * Number of failed tasks.
130 */
131 failed: number
132 }
133
134 /**
135 * Enumeration of worker types.
136 */
137 export const WorkerTypes: Readonly<{ thread: 'thread', cluster: 'cluster' }> =
138 Object.freeze({
139 thread: 'thread',
140 cluster: 'cluster'
141 } as const)
142
143 /**
144 * Worker type.
145 */
146 export type WorkerType = keyof typeof WorkerTypes
147
148 /**
149 * Worker information.
150 *
151 * @internal
152 */
153 export interface WorkerInfo {
154 /**
155 * Worker id.
156 */
157 readonly id: number | undefined
158 /**
159 * Worker type.
160 */
161 readonly type: WorkerType
162 /**
163 * Dynamic flag.
164 */
165 dynamic: boolean
166 /**
167 * Ready flag.
168 */
169 ready: boolean
170 /**
171 * Stealing flag.
172 * This flag is set to `true` when worker node is stealing tasks from another worker node.
173 */
174 stealing: boolean
175 /**
176 * Back pressure flag.
177 * This flag is set to `true` when worker node tasks queue has back pressure.
178 */
179 backPressure: boolean
180 /**
181 * Task functions properties.
182 */
183 taskFunctionsProperties?: TaskFunctionProperties[]
184 }
185
186 /**
187 * Worker usage statistics.
188 *
189 * @internal
190 */
191 export interface WorkerUsage {
192 /**
193 * Tasks statistics.
194 */
195 readonly tasks: TaskStatistics
196 /**
197 * Tasks runtime statistics.
198 */
199 readonly runTime: MeasurementStatistics
200 /**
201 * Tasks wait time statistics.
202 */
203 readonly waitTime: MeasurementStatistics
204 /**
205 * Tasks event loop utilization statistics.
206 */
207 readonly elu: EventLoopUtilizationMeasurementStatistics
208 }
209
210 /**
211 * Worker choice strategy data.
212 *
213 * @internal
214 */
215 export interface StrategyData {
216 virtualTaskEndTimestamp?: number
217 }
218
219 /**
220 * Worker interface.
221 */
222 export interface IWorker extends EventEmitter {
223 /**
224 * Cluster worker id.
225 */
226 readonly id?: number
227 /**
228 * Worker thread worker id.
229 */
230 readonly threadId?: number
231 /**
232 * Registers an event handler.
233 *
234 * @param event - The event.
235 * @param handler - The event handler.
236 */
237 readonly on: (event: string, handler: EventHandler<this>) => this
238 /**
239 * Registers once an event handler.
240 *
241 * @param event - The event.
242 * @param handler - The event handler.
243 */
244 readonly once: (event: string, handler: EventHandler<this>) => this
245 /**
246 * Calling `unref()` on a worker allows the thread to exit if this is the only
247 * active handle in the event system. If the worker is already `unref()`ed calling`unref()` again has no effect.
248 * @since v10.5.0
249 */
250 readonly unref?: () => void
251 /**
252 * Stop all JavaScript execution in the worker thread as soon as possible.
253 * Returns a Promise for the exit code that is fulfilled when the `'exit' event` is emitted.
254 */
255 readonly terminate?: () => Promise<number>
256 /**
257 * Cluster worker disconnect.
258 */
259 readonly disconnect?: () => void
260 /**
261 * Cluster worker kill.
262 */
263 readonly kill?: (signal?: string) => void
264 }
265
266 /**
267 * Worker node options.
268 *
269 * @internal
270 */
271 export interface WorkerNodeOptions {
272 workerOptions?: WorkerOptions
273 env?: Record<string, unknown>
274 tasksQueueBackPressureSize: number | undefined
275 tasksQueueBucketSize: number | undefined
276 }
277
278 /**
279 * Worker node interface.
280 *
281 * @typeParam Worker - Type of worker.
282 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
283 * @internal
284 */
285 export interface IWorkerNode<Worker extends IWorker, Data = unknown>
286 extends EventEmitter {
287 /**
288 * Worker.
289 */
290 readonly worker: Worker
291 /**
292 * Worker info.
293 */
294 readonly info: WorkerInfo
295 /**
296 * Worker usage statistics.
297 */
298 readonly usage: WorkerUsage
299 /**
300 * Worker choice strategy data.
301 * This is used to store data that are specific to the worker choice strategy.
302 */
303 strategyData?: StrategyData
304 /**
305 * Message channel (worker thread only).
306 */
307 readonly messageChannel?: MessageChannel
308 /**
309 * Tasks queue back pressure size.
310 * This is the number of tasks that can be enqueued before the worker node has back pressure.
311 */
312 tasksQueueBackPressureSize: number
313 /**
314 * Tasks queue size.
315 *
316 * @returns The tasks queue size.
317 */
318 readonly tasksQueueSize: () => number
319 /**
320 * Enqueue task.
321 *
322 * @param task - The task to queue.
323 * @returns The tasks queue size.
324 */
325 readonly enqueueTask: (task: Task<Data>) => number
326 /**
327 * Dequeue task.
328 *
329 * @param bucket - The prioritized bucket to dequeue from. @defaultValue 0
330 * @returns The dequeued task.
331 */
332 readonly dequeueTask: (bucket?: number) => Task<Data> | undefined
333 /**
334 * Dequeue last prioritized task.
335 *
336 * @returns The dequeued task.
337 */
338 readonly dequeueLastPrioritizedTask: () => Task<Data> | undefined
339 /**
340 * Clears tasks queue.
341 */
342 readonly clearTasksQueue: () => void
343 /**
344 * Whether the worker node has back pressure (i.e. its tasks queue is full).
345 *
346 * @returns `true` if the worker node has back pressure, `false` otherwise.
347 */
348 readonly hasBackPressure: () => boolean
349 /**
350 * Terminates the worker node.
351 */
352 readonly terminate: () => Promise<void>
353 /**
354 * Registers a worker event handler.
355 *
356 * @param event - The event.
357 * @param handler - The event handler.
358 */
359 readonly registerWorkerEventHandler: (
360 event: string,
361 handler: EventHandler<Worker>
362 ) => void
363 /**
364 * Registers once a worker event handler.
365 *
366 * @param event - The event.
367 * @param handler - The event handler.
368 */
369 readonly registerOnceWorkerEventHandler: (
370 event: string,
371 handler: EventHandler<Worker>
372 ) => void
373 /**
374 * Gets task function worker usage statistics.
375 *
376 * @param name - The task function name.
377 * @returns The task function worker usage statistics if the task function worker usage statistics are initialized, `undefined` otherwise.
378 */
379 readonly getTaskFunctionWorkerUsage: (name: string) => WorkerUsage | undefined
380 /**
381 * Deletes task function worker usage statistics.
382 *
383 * @param name - The task function name.
384 * @returns `true` if the task function worker usage statistics were deleted, `false` otherwise.
385 */
386 readonly deleteTaskFunctionWorkerUsage: (name: string) => boolean
387 }
388
389 /**
390 * Worker node event detail.
391 *
392 * @internal
393 */
394 export interface WorkerNodeEventDetail {
395 workerId?: number
396 workerNodeKey?: number
397 }