776986d5000f2c2d33cdf8f9daf444038df74ef6
[poolifier.git] / src / pools / worker.ts
1 import type { MessageChannel } from 'node:worker_threads'
2 import type { CircularArray } from '../circular-array'
3 import type { Task } from '../utility-types'
4
5 /**
6 * Callback invoked when the worker has started successfully.
7 *
8 * @typeParam Worker - Type of worker.
9 */
10 export type OnlineHandler<Worker extends IWorker> = (this: Worker) => void
11
12 /**
13 * Callback invoked if the worker has received a message.
14 *
15 * @typeParam Worker - Type of worker.
16 */
17 export type MessageHandler<Worker extends IWorker> = (
18 this: Worker,
19 message: unknown
20 ) => void
21
22 /**
23 * Callback invoked if the worker raised an error.
24 *
25 * @typeParam Worker - Type of worker.
26 */
27 export type ErrorHandler<Worker extends IWorker> = (
28 this: Worker,
29 error: Error
30 ) => void
31
32 /**
33 * Callback invoked when the worker exits successfully.
34 *
35 * @typeParam Worker - Type of worker.
36 */
37 export type ExitHandler<Worker extends IWorker> = (
38 this: Worker,
39 exitCode: number
40 ) => void
41
42 /**
43 * Measurement statistics.
44 *
45 * @internal
46 */
47 export interface MeasurementStatistics {
48 /**
49 * Measurement aggregate.
50 */
51 aggregate?: number
52 /**
53 * Measurement minimum.
54 */
55 minimum?: number
56 /**
57 * Measurement maximum.
58 */
59 maximum?: number
60 /**
61 * Measurement average.
62 */
63 average?: number
64 /**
65 * Measurement median.
66 */
67 median?: number
68 /**
69 * Measurement history.
70 */
71 readonly history: CircularArray<number>
72 }
73
74 /**
75 * Event loop utilization measurement statistics.
76 *
77 * @internal
78 */
79 export interface EventLoopUtilizationMeasurementStatistics {
80 readonly idle: MeasurementStatistics
81 readonly active: MeasurementStatistics
82 utilization?: number
83 }
84
85 /**
86 * Task statistics.
87 *
88 * @internal
89 */
90 export interface TaskStatistics {
91 /**
92 * Number of executed tasks.
93 */
94 executed: number
95 /**
96 * Number of executing tasks.
97 */
98 executing: number
99 /**
100 * Number of queued tasks.
101 */
102 readonly queued: number
103 /**
104 * Maximum number of queued tasks.
105 */
106 readonly maxQueued?: number
107 /**
108 * Number of sequentially stolen tasks.
109 */
110 sequentiallyStolen: number
111 /**
112 * Number of stolen tasks.
113 */
114 stolen: number
115 /**
116 * Number of failed tasks.
117 */
118 failed: number
119 }
120
121 /**
122 * Enumeration of worker types.
123 */
124 export const WorkerTypes = Object.freeze({
125 thread: 'thread',
126 cluster: 'cluster'
127 } as const)
128
129 /**
130 * Worker type.
131 */
132 export type WorkerType = keyof typeof WorkerTypes
133
134 /**
135 * Worker information.
136 *
137 * @internal
138 */
139 export interface WorkerInfo {
140 /**
141 * Worker id.
142 */
143 readonly id: number | undefined
144 /**
145 * Worker type.
146 */
147 readonly type: WorkerType
148 /**
149 * Dynamic flag.
150 */
151 dynamic: boolean
152 /**
153 * Ready flag.
154 */
155 ready: boolean
156 /**
157 * Task function names.
158 */
159 taskFunctionNames?: string[]
160 }
161
162 /**
163 * Worker usage statistics.
164 *
165 * @internal
166 */
167 export interface WorkerUsage {
168 /**
169 * Tasks statistics.
170 */
171 readonly tasks: TaskStatistics
172 /**
173 * Tasks runtime statistics.
174 */
175 readonly runTime: MeasurementStatistics
176 /**
177 * Tasks wait time statistics.
178 */
179 readonly waitTime: MeasurementStatistics
180 /**
181 * Tasks event loop utilization statistics.
182 */
183 readonly elu: EventLoopUtilizationMeasurementStatistics
184 }
185
186 /**
187 * Worker choice strategy data.
188 *
189 * @internal
190 */
191 export interface StrategyData {
192 virtualTaskEndTimestamp?: number
193 }
194
195 /**
196 * Worker interface.
197 */
198 export interface IWorker {
199 /**
200 * Worker id.
201 */
202 readonly id?: number
203 readonly threadId?: number
204 /**
205 * Registers an event listener.
206 *
207 * @param event - The event.
208 * @param handler - The event handler.
209 */
210 readonly on: ((event: 'online', handler: OnlineHandler<this>) => void) &
211 ((event: 'message', handler: MessageHandler<this>) => void) &
212 ((event: 'error', handler: ErrorHandler<this>) => void) &
213 ((event: 'exit', handler: ExitHandler<this>) => void)
214 /**
215 * Registers a listener to the exit event that will only be performed once.
216 *
217 * @param event - The `'exit'` event.
218 * @param handler - The exit handler.
219 */
220 readonly once: (event: 'exit', handler: ExitHandler<this>) => void
221 }
222
223 /**
224 * Worker node event detail.
225 *
226 * @internal
227 */
228 export interface WorkerNodeEventDetail {
229 workerId: number
230 workerNodeKey?: number
231 }
232
233 /**
234 * Worker node interface.
235 *
236 * @typeParam Worker - Type of worker.
237 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
238 * @internal
239 */
240 export interface IWorkerNode<Worker extends IWorker, Data = unknown>
241 extends EventTarget {
242 /**
243 * Worker.
244 */
245 readonly worker: Worker
246 /**
247 * Worker info.
248 */
249 readonly info: WorkerInfo
250 /**
251 * Worker usage statistics.
252 */
253 readonly usage: WorkerUsage
254 /**
255 * Worker choice strategy data.
256 * This is used to store data that are specific to the worker choice strategy.
257 */
258 strategyData?: StrategyData
259 /**
260 * Message channel (worker_threads only).
261 */
262 readonly messageChannel?: MessageChannel
263 /**
264 * Tasks queue back pressure size.
265 * This is the number of tasks that can be enqueued before the worker node has back pressure.
266 */
267 tasksQueueBackPressureSize: number
268 /**
269 * Tasks queue size.
270 *
271 * @returns The tasks queue size.
272 */
273 readonly tasksQueueSize: () => number
274 /**
275 * Enqueue task.
276 *
277 * @param task - The task to queue.
278 * @returns The tasks queue size.
279 */
280 readonly enqueueTask: (task: Task<Data>) => number
281 /**
282 * Prepends a task to the tasks queue.
283 *
284 * @param task - The task to prepend.
285 * @returns The tasks queue size.
286 */
287 readonly unshiftTask: (task: Task<Data>) => number
288 /**
289 * Dequeue task.
290 *
291 * @returns The dequeued task.
292 */
293 readonly dequeueTask: () => Task<Data> | undefined
294 /**
295 * Pops a task from the tasks queue.
296 *
297 * @returns The popped task.
298 */
299 readonly popTask: () => Task<Data> | undefined
300 /**
301 * Clears tasks queue.
302 */
303 readonly clearTasksQueue: () => void
304 /**
305 * Whether the worker node has back pressure (i.e. its tasks queue is full).
306 *
307 * @returns `true` if the worker node has back pressure, `false` otherwise.
308 */
309 readonly hasBackPressure: () => boolean
310 /**
311 * Resets usage statistics.
312 */
313 readonly resetUsage: () => void
314 /**
315 * Closes communication channel.
316 */
317 readonly closeChannel: () => void
318 /**
319 * Gets task function worker usage statistics.
320 *
321 * @param name - The task function name.
322 * @returns The task function worker usage statistics if the task function worker usage statistics are initialized, `undefined` otherwise.
323 */
324 readonly getTaskFunctionWorkerUsage: (name: string) => WorkerUsage | undefined
325 /**
326 * Deletes task function worker usage statistics.
327 *
328 * @param name - The task function name.
329 * @returns `true` if the task function worker usage statistics were deleted, `false` otherwise.
330 */
331 readonly deleteTaskFunctionWorkerUsage: (name: string) => boolean
332 }