// Piment Noir Git Repositories - poolifier.git/blob - src/pools/worker.ts
// Blob: edc659d37169e3833d99da62314bc0b3e5430baa
// Path: [poolifier.git] / src / pools / worker.ts
import type { EventEmitter } from 'node:events'
import type { MessageChannel, WorkerOptions } from 'node:worker_threads'

import type { CircularBuffer } from '../circular-buffer.js'
import type { PriorityQueue } from '../queues/priority-queue.js'
import type { Task, TaskFunctionProperties } from '../utility-types.js'
7
/**
 * Callback invoked if the worker raised an error.
 *
 * The `this` parameter types the receiver as the worker itself; the raised
 * error is passed as the only argument.
 * @typeParam Worker - Type of worker.
 */
export type ErrorHandler<Worker extends IWorker> = (
  this: Worker,
  error: Error
) => void
16
/**
 * Worker event handler.
 *
 * Union of every handler signature that can be registered on a worker:
 * error, exit, message and online handlers.
 * @typeParam Worker - Type of worker.
 */
export type EventHandler<Worker extends IWorker> =
  | ErrorHandler<Worker>
  | ExitHandler<Worker>
  | MessageHandler<Worker>
  | OnlineHandler<Worker>
26
/**
 * Callback invoked when the worker exits.
 *
 * The `this` parameter types the receiver as the worker itself; the exit code
 * is passed as the only argument. The signature does not restrict the exit
 * code to a success value.
 * @typeParam Worker - Type of worker.
 */
export type ExitHandler<Worker extends IWorker> = (
  this: Worker,
  exitCode: number
) => void
35
/**
 * Callback invoked if the worker has received a message.
 *
 * The message is typed as `unknown`: handlers must narrow it before use.
 * @typeParam Worker - Type of worker.
 */
export type MessageHandler<Worker extends IWorker> = (
  this: Worker,
  message: unknown
) => void
44
/**
 * Callback invoked when the worker has started successfully.
 *
 * Takes no argument; the `this` parameter types the receiver as the worker.
 * @typeParam Worker - Type of worker.
 */
export type OnlineHandler<Worker extends IWorker> = (this: Worker) => void
50
/**
 * Measurement history size.
 *
 * NOTE(review): presumably the capacity of the `history` buffer kept in
 * measurement statistics - confirm at the usage sites.
 */
export const MeasurementHistorySize = 386
55
/**
 * Event loop utilization measurement statistics.
 * @internal
 */
export interface EventLoopUtilizationMeasurementStatistics {
  /**
   * Active measurement statistics.
   */
  readonly active: MeasurementStatistics
  /**
   * Idle measurement statistics.
   */
  readonly idle: MeasurementStatistics
  /**
   * Utilization value, left undefined until it is set.
   */
  utilization?: number
}
65
/**
 * Measurement statistics.
 *
 * Every numeric field is optional; only the history buffer is always present.
 * @internal
 */
export interface MeasurementStatistics {
  /**
   * Measurement aggregate.
   */
  aggregate?: number
  /**
   * Measurement average.
   */
  average?: number
  /**
   * Measurement history.
   */
  readonly history: CircularBuffer
  /**
   * Measurement maximum.
   */
  maximum?: number
  /**
   * Measurement median.
   */
  median?: number
  /**
   * Measurement minimum.
   */
  minimum?: number
}
96
/**
 * Task statistics.
 *
 * The queue-related counters (`queued`, `maxQueued`) are read-only on this
 * interface; the other counters are writable.
 * @internal
 */
export interface TaskStatistics {
  /**
   * Number of executed tasks.
   */
  executed: number
  /**
   * Number of executing tasks.
   */
  executing: number
  /**
   * Number of failed tasks.
   */
  failed: number
  /**
   * Maximum number of queued tasks.
   */
  readonly maxQueued?: number
  /**
   * Number of queued tasks.
   */
  readonly queued: number
  /**
   * Number of sequentially stolen tasks.
   */
  sequentiallyStolen: number
  /**
   * Number of stolen tasks.
   */
  stolen: number
}
131
/**
 * Enumeration of worker types.
 *
 * Frozen object whose keys and values are the same string literals, so it can
 * be used both as a runtime lookup and as the source of the worker type union.
 */
export const WorkerTypes: Readonly<{ cluster: 'cluster'; thread: 'thread' }> =
  Object.freeze({
    cluster: 'cluster',
    thread: 'thread',
  } as const)
140
/**
 * Worker interface.
 *
 * Common shape of a pool worker. Members specific to one kind of worker
 * (cluster worker or worker thread) are optional.
 */
export interface IWorker extends EventEmitter {
  /**
   * Cluster worker disconnect.
   */
  readonly disconnect?: () => void
  /**
   * Cluster worker id.
   */
  readonly id?: number
  /**
   * Cluster worker kill.
   */
  readonly kill?: (signal?: string) => void
  /**
   * Registers an event handler.
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly on: (event: string, handler: EventHandler<this>) => this
  /**
   * Registers once an event handler.
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly once: (event: string, handler: EventHandler<this>) => this
  /**
   * Stop all JavaScript execution in the worker thread as soon as possible.
   * Returns a Promise for the exit code that is fulfilled when the `'exit'` event is emitted.
   */
  readonly terminate?: () => Promise<number>
  /**
   * Worker thread worker id.
   */
  readonly threadId?: number
  /**
   * Calling `unref()` on a worker allows the thread to exit if this is the only
   * active handle in the event system. If the worker is already `unref()`ed, calling `unref()` again has no effect.
   * @since v10.5.0
   */
  readonly unref?: () => void
}
185
/**
 * Worker node interface.
 *
 * Groups a worker with its tasks queue, its information, its usage statistics
 * and the operations available on them.
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @internal
 */
export interface IWorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter {
  /**
   * Clears tasks queue.
   */
  readonly clearTasksQueue: () => void
  /**
   * Deletes a task from the tasks queue.
   * @param task - The task to delete.
   * @returns `true` if the task was deleted, `false` otherwise.
   */
  readonly deleteTask: (task: Task<Data>) => boolean
  /**
   * Deletes task function worker usage statistics.
   * @param name - The task function name.
   * @returns `true` if the task function worker usage statistics were deleted, `false` otherwise.
   */
  readonly deleteTaskFunctionWorkerUsage: (name: string) => boolean
  /**
   * Dequeue last prioritized task.
   * @returns The dequeued task.
   */
  readonly dequeueLastPrioritizedTask: () => Task<Data> | undefined
  /**
   * Dequeue task.
   * @param bucket - The prioritized bucket to dequeue from. @defaultValue 0
   * @returns The dequeued task.
   */
  readonly dequeueTask: (bucket?: number) => Task<Data> | undefined
  /**
   * Enqueue task.
   * @param task - The task to queue.
   * @returns The tasks queue size.
   */
  readonly enqueueTask: (task: Task<Data>) => number
  /**
   * Gets task function worker usage statistics.
   * @param name - The task function name.
   * @returns The task function worker usage statistics if the task function worker usage statistics are initialized, `undefined` otherwise.
   */
  readonly getTaskFunctionWorkerUsage: (name: string) => undefined | WorkerUsage
  /**
   * Worker info.
   */
  readonly info: WorkerInfo
  /**
   * Message channel (worker thread only).
   */
  readonly messageChannel?: MessageChannel
  /**
   * Registers once a worker event handler.
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly registerOnceWorkerEventHandler: (
    event: string,
    handler: EventHandler<Worker>
  ) => void
  /**
   * Registers a worker event handler.
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly registerWorkerEventHandler: (
    event: string,
    handler: EventHandler<Worker>
  ) => void
  /**
   * Sets tasks queue priority.
   * @param enablePriority - Whether to enable tasks queue priority.
   */
  readonly setTasksQueuePriority: (enablePriority: boolean) => void
  /**
   * Worker choice strategy data.
   * This is used to store data that are specific to the worker choice strategy.
   */
  strategyData?: StrategyData
  /**
   * Tasks queue.
   */
  readonly tasksQueue: PriorityQueue<Task<Data>>
  /**
   * Tasks queue back pressure size.
   * This is the number of tasks that can be enqueued before the worker node has back pressure.
   */
  tasksQueueBackPressureSize: number
  /**
   * Tasks queue size.
   * @returns The tasks queue size.
   */
  readonly tasksQueueSize: () => number
  /**
   * Terminates the worker node.
   */
  readonly terminate: () => Promise<void>
  /**
   * Worker usage statistics.
   */
  readonly usage: WorkerUsage
  /**
   * Worker.
   */
  readonly worker: Worker
}
296
/**
 * Worker choice strategy data.
 * @internal
 */
export interface StrategyData {
  /**
   * Virtual task end timestamp.
   * NOTE(review): unit and producer are not visible here - confirm against
   * the worker choice strategy that sets it.
   */
  virtualTaskEndTimestamp?: number
}
304
/**
 * Worker information.
 *
 * The worker id and type are read-only; the state flags are writable.
 * @internal
 */
export interface WorkerInfo {
  /**
   * Back pressure flag.
   * This flag is set to `true` when worker node tasks queue is back pressured.
   */
  backPressure: boolean
  /**
   * Back pressure stealing flag.
   * This flag is set to `true` when worker node is stealing one task from another back pressured worker node.
   */
  backPressureStealing: boolean
  /**
   * Continuous stealing flag.
   * This flag is set to `true` when worker node is continuously stealing tasks from other worker nodes.
   */
  continuousStealing: boolean
  /**
   * Dynamic flag.
   */
  dynamic: boolean
  /**
   * Worker id.
   */
  readonly id: number | undefined
  /**
   * Queued task abortion flag.
   * This flag is set to `true` when worker node is aborting a queued task.
   */
  queuedTaskAbortion: boolean
  /**
   * Ready flag.
   */
  ready: boolean
  /**
   * Stealing flag.
   * This flag is set to `true` when worker node is stealing one task from another worker node.
   */
  stealing: boolean
  /**
   * Stolen flag.
   * This flag is set to `true` when worker node has one task stolen from another worker node.
   */
  stolen: boolean
  /**
   * Task functions properties.
   */
  taskFunctionsProperties?: TaskFunctionProperties[]
  /**
   * Worker type.
   */
  readonly type: WorkerType
}
361
/**
 * Worker node event detail.
 *
 * Every field is optional.
 * @internal
 */
export interface WorkerNodeEventDetail {
  /**
   * Task id, typed as five dash-separated string segments.
   */
  taskId?: `${string}-${string}-${string}-${string}-${string}`
  /**
   * Worker id.
   */
  workerId?: number
  /**
   * Worker node key.
   */
  workerNodeKey?: number
}
371
/**
 * Worker node options.
 *
 * The three tasks queue settings must always be supplied, possibly as
 * `undefined`; `env` and `workerOptions` may be omitted.
 * @internal
 */
export interface WorkerNodeOptions {
  /**
   * Environment key/value pairs.
   * NOTE(review): presumably handed to the worker at creation - confirm at
   * the usage site.
   */
  env?: Record<string, unknown>
  /**
   * Tasks queue back pressure size.
   */
  tasksQueueBackPressureSize: number | undefined
  /**
   * Tasks queue bucket size.
   */
  tasksQueueBucketSize: number | undefined
  /**
   * Whether tasks queue priority is enabled.
   */
  tasksQueuePriority: boolean | undefined
  /**
   * Worker thread options, as defined by `node:worker_threads`.
   */
  workerOptions?: WorkerOptions
}
383
/**
 * Worker type.
 *
 * Union of the keys of the `WorkerTypes` object.
 */
export type WorkerType = keyof typeof WorkerTypes
388
/**
 * Worker usage statistics.
 *
 * Each member is a read-only reference to its own statistics object.
 * @internal
 */
export interface WorkerUsage {
  /**
   * Tasks event loop utilization statistics.
   */
  readonly elu: EventLoopUtilizationMeasurementStatistics
  /**
   * Tasks runtime statistics.
   */
  readonly runTime: MeasurementStatistics
  /**
   * Tasks statistics.
   */
  readonly tasks: TaskStatistics
  /**
   * Tasks wait time statistics.
   */
  readonly waitTime: MeasurementStatistics
}