refactor: factor out worker communication channel closing
[poolifier.git] / src / pools / worker.ts
1 import type { MessageChannel } from 'node:worker_threads'
2 import type { CircularArray } from '../circular-array'
3 import type { Task } from '../utility-types'
4
5 /**
6 * Callback invoked if the worker has received a message.
7 */
8 export type MessageHandler<Worker extends IWorker> = (
9 this: Worker,
10 message: unknown
11 ) => void
12
13 /**
14 * Callback invoked if the worker raised an error.
15 */
16 export type ErrorHandler<Worker extends IWorker> = (
17 this: Worker,
18 error: Error
19 ) => void
20
21 /**
22 * Callback invoked when the worker has started successfully.
23 */
24 export type OnlineHandler<Worker extends IWorker> = (this: Worker) => void
25
26 /**
27 * Callback invoked when the worker exits successfully.
28 */
29 export type ExitHandler<Worker extends IWorker> = (
30 this: Worker,
31 exitCode: number
32 ) => void
33
34 /**
35 * Measurement statistics.
36 *
37 * @internal
38 */
39 export interface MeasurementStatistics {
40 /**
41 * Measurement aggregate.
42 */
43 aggregate?: number
44 /**
45 * Measurement minimum.
46 */
47 minimum?: number
48 /**
49 * Measurement maximum.
50 */
51 maximum?: number
52 /**
53 * Measurement average.
54 */
55 average?: number
56 /**
57 * Measurement median.
58 */
59 median?: number
60 /**
61 * Measurement history.
62 */
63 readonly history: CircularArray<number>
64 }
65
66 /**
67 * Event loop utilization measurement statistics.
68 *
69 * @internal
70 */
71 export interface EventLoopUtilizationMeasurementStatistics {
72 readonly idle: MeasurementStatistics
73 readonly active: MeasurementStatistics
74 utilization?: number
75 }
76
77 /**
78 * Task statistics.
79 *
80 * @internal
81 */
82 export interface TaskStatistics {
83 /**
84 * Number of executed tasks.
85 */
86 executed: number
87 /**
88 * Number of executing tasks.
89 */
90 executing: number
91 /**
92 * Number of queued tasks.
93 */
94 readonly queued: number
95 /**
96 * Maximum number of queued tasks.
97 */
98 readonly maxQueued?: number
99 /**
100 * Number of failed tasks.
101 */
102 failed: number
103 }
104
105 /**
106 * Enumeration of worker types.
107 */
108 export const WorkerTypes = Object.freeze({
109 cluster: 'cluster',
110 thread: 'thread'
111 } as const)
112
113 /**
114 * Worker type.
115 */
116 export type WorkerType = keyof typeof WorkerTypes
117
118 /**
119 * Worker information.
120 *
121 * @internal
122 */
123 export interface WorkerInfo {
124 /**
125 * Worker id.
126 */
127 readonly id: number | undefined
128 /**
129 * Worker type.
130 */
131 type: WorkerType
132 /**
133 * Dynamic flag.
134 */
135 dynamic: boolean
136 /**
137 * Ready flag.
138 */
139 ready: boolean
140 /**
141 * Message channel.
142 */
143 messageChannel?: MessageChannel
144 }
145
146 /**
147 * Worker usage statistics.
148 *
149 * @internal
150 */
151 export interface WorkerUsage {
152 /**
153 * Tasks statistics.
154 */
155 readonly tasks: TaskStatistics
156 /**
157 * Tasks runtime statistics.
158 */
159 readonly runTime: MeasurementStatistics
160 /**
161 * Tasks wait time statistics.
162 */
163 readonly waitTime: MeasurementStatistics
164 /**
165 * Tasks event loop utilization statistics.
166 */
167 readonly elu: EventLoopUtilizationMeasurementStatistics
168 }
169
170 /**
171 * Worker interface.
172 */
173 export interface IWorker {
174 /**
175 * Worker id.
176 */
177 readonly id?: number
178 readonly threadId?: number
179 /**
180 * Registers an event listener.
181 *
182 * @param event - The event.
183 * @param handler - The event handler.
184 */
185 readonly on: ((event: 'message', handler: MessageHandler<this>) => void) &
186 ((event: 'error', handler: ErrorHandler<this>) => void) &
187 ((event: 'online', handler: OnlineHandler<this>) => void) &
188 ((event: 'exit', handler: ExitHandler<this>) => void)
189 /**
190 * Registers a listener to the exit event that will only be performed once.
191 *
192 * @param event - `'exit'`.
193 * @param handler - The exit handler.
194 */
195 readonly once: (event: 'exit', handler: ExitHandler<this>) => void
196 }
197
198 /**
199 * Worker node interface.
200 *
201 * @typeParam Worker - Type of worker.
202 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
203 * @internal
204 */
205 export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
206 /**
207 * Worker.
208 */
209 readonly worker: Worker
210 /**
211 * Worker info.
212 */
213 readonly info: WorkerInfo
214 /**
215 * Worker usage statistics.
216 */
217 usage: WorkerUsage
218 /**
219 * Tasks queue size.
220 *
221 * @returns The tasks queue size.
222 */
223 readonly tasksQueueSize: () => number
224 /**
225 * Enqueue task.
226 *
227 * @param task - The task to queue.
228 * @returns The task queue size.
229 */
230 readonly enqueueTask: (task: Task<Data>) => number
231 /**
232 * Dequeue task.
233 *
234 * @returns The dequeued task.
235 */
236 readonly dequeueTask: () => Task<Data> | undefined
237 /**
238 * Clears tasks queue.
239 */
240 readonly clearTasksQueue: () => void
241 /**
242 * Resets usage statistics .
243 */
244 readonly resetUsage: () => void
245 /**
246 * Close communication channel.
247 */
248 readonly closeChannel: () => void
249 /**
250 * Gets task worker usage statistics.
251 */
252 readonly getTaskWorkerUsage: (name: string) => WorkerUsage | undefined
253 }