fix: fix continuous tasks stealing on idle start at worker node idling
[poolifier.git] / src / pools / worker.ts
... / ...
CommitLineData
1import type { MessageChannel } from 'node:worker_threads'
2import type { CircularArray } from '../circular-array'
3import type { Task } from '../utility-types'
4
/**
 * Callback invoked when the worker has started successfully.
 *
 * The `this` parameter is a compile-time only declaration: it types the
 * `this` value seen inside the handler as the worker itself and is not an
 * actual argument. The handler takes no arguments and its return value is
 * ignored.
 *
 * Handlers of this type are registered through `IWorker.on('online', handler)`.
 *
 * @typeParam Worker - Type of worker.
 */
export type OnlineHandler<Worker extends IWorker> = (this: Worker) => void
11
/**
 * Callback invoked if the worker has received a message.
 *
 * The message is typed `unknown`, so it must be narrowed or validated by the
 * handler before being used. `this` is typed as the worker itself.
 *
 * Handlers of this type are registered through `IWorker.on('message', handler)`.
 *
 * @typeParam Worker - Type of worker.
 * @param message - The received message.
 */
export type MessageHandler<Worker extends IWorker> = (
  this: Worker,
  message: unknown
) => void
21
/**
 * Callback invoked if the worker raised an error.
 *
 * `this` is typed as the worker itself.
 *
 * Handlers of this type are registered through `IWorker.on('error', handler)`.
 *
 * @typeParam Worker - Type of worker.
 * @param error - The error raised by the worker.
 */
export type ErrorHandler<Worker extends IWorker> = (
  this: Worker,
  error: Error
) => void
31
/**
 * Callback invoked when the worker exits.
 *
 * `this` is typed as the worker itself.
 *
 * Handlers of this type are registered through `IWorker.on('exit', handler)`
 * or `IWorker.once('exit', handler)`.
 *
 * NOTE(review): the handler receives the exit code, so it is presumably
 * invoked on any exit and not only on successful ones - confirm against the
 * worker implementations.
 *
 * @typeParam Worker - Type of worker.
 * @param exitCode - The worker exit code.
 */
export type ExitHandler<Worker extends IWorker> = (
  this: Worker,
  exitCode: number
) => void
41
/**
 * Measurement statistics.
 *
 * Every computed value (aggregate, minimum, maximum, average, median) is
 * optional and may therefore be absent; only the history is always present.
 *
 * @internal
 */
export interface MeasurementStatistics {
  /**
   * Measurement aggregate.
   */
  aggregate?: number
  /**
   * Measurement minimum.
   */
  minimum?: number
  /**
   * Measurement maximum.
   */
  maximum?: number
  /**
   * Measurement average.
   */
  average?: number
  /**
   * Measurement median.
   */
  median?: number
  /**
   * Measurement history.
   * The reference is read-only: the history container cannot be replaced.
   * NOTE(review): presumably holds the most recent measurements used to
   * compute the average and median - confirm against `CircularArray` usage.
   */
  readonly history: CircularArray<number>
}
73
/**
 * Event loop utilization measurement statistics.
 *
 * NOTE(review): the member names presumably mirror the `idle`, `active` and
 * `utilization` fields returned by Node.js
 * `performance.eventLoopUtilization()` - confirm.
 *
 * @internal
 */
export interface EventLoopUtilizationMeasurementStatistics {
  /**
   * Event loop idle measurement statistics.
   */
  readonly idle: MeasurementStatistics
  /**
   * Event loop active measurement statistics.
   */
  readonly active: MeasurementStatistics
  /**
   * Event loop utilization (optional, may be absent).
   * NOTE(review): presumably a ratio between 0 and 1 - confirm.
   */
  utilization?: number
}
84
/**
 * Task statistics.
 *
 * The `queued` and `maxQueued` members are read-only through this interface;
 * all the other counters are writable.
 *
 * @internal
 */
export interface TaskStatistics {
  /**
   * Number of executed tasks.
   */
  executed: number
  /**
   * Number of executing tasks.
   */
  executing: number
  /**
   * Number of queued tasks.
   */
  readonly queued: number
  /**
   * Maximum number of queued tasks (optional, may be absent).
   */
  readonly maxQueued?: number
  /**
   * Number of sequentially stolen tasks.
   */
  sequentiallyStolen: number
  /**
   * Number of stolen tasks.
   */
  stolen: number
  /**
   * Number of failed tasks.
   */
  failed: number
}
120
// Literal table of the supported worker kinds; each key maps to the same string literal.
const workerTypeNames = {
  thread: 'thread',
  cluster: 'cluster'
} as const

/**
 * Enumeration of worker types.
 *
 * Frozen, enum-like object: it cannot be modified at runtime and its
 * properties keep their string literal types.
 */
export const WorkerTypes = Object.freeze(workerTypeNames)
128
/**
 * Worker type.
 *
 * Union of the keys of `WorkerTypes`, i.e. `'thread' | 'cluster'`.
 */
export type WorkerType = keyof typeof WorkerTypes
133
/**
 * Worker information.
 *
 * The `id` and `type` members are read-only; the `dynamic` and `ready` flags
 * and the task function names are writable.
 *
 * @internal
 */
export interface WorkerInfo {
  /**
   * Worker id.
   * The property is always present but its value may be `undefined`.
   */
  readonly id: number | undefined
  /**
   * Worker type.
   */
  readonly type: WorkerType
  /**
   * Dynamic flag.
   */
  dynamic: boolean
  /**
   * Ready flag.
   */
  ready: boolean
  /**
   * Task function names (optional, may be absent).
   */
  taskFunctionNames?: string[]
}
161
/**
 * Worker usage statistics.
 *
 * Groups the task counters with the run time, wait time and event loop
 * utilization measurement statistics. Every member reference is read-only.
 *
 * @internal
 */
export interface WorkerUsage {
  /**
   * Tasks statistics.
   */
  readonly tasks: TaskStatistics
  /**
   * Tasks runtime statistics.
   */
  readonly runTime: MeasurementStatistics
  /**
   * Tasks wait time statistics.
   */
  readonly waitTime: MeasurementStatistics
  /**
   * Tasks event loop utilization statistics.
   */
  readonly elu: EventLoopUtilizationMeasurementStatistics
}
185
/**
 * Worker choice strategy data.
 *
 * @internal
 */
export interface StrategyData {
  /**
   * Virtual task end timestamp (optional, may be absent).
   * NOTE(review): presumably written and read by a worker choice strategy
   * implementation; the time unit is not visible here - confirm.
   */
  virtualTaskEndTimestamp?: number
}
194
/**
 * Worker interface.
 *
 * Minimal contract a worker must fulfil: optional identifiers plus event
 * listener registration.
 */
export interface IWorker {
  /**
   * Worker id.
   */
  readonly id?: number
  /**
   * Worker thread id.
   * NOTE(review): presumably set for `worker_threads` workers while `id` is
   * set for cluster workers - confirm.
   */
  readonly threadId?: number
  /**
   * Registers an event listener.
   *
   * Typed as an intersection of one function type per supported event
   * (`'online'`, `'message'`, `'error'`, `'exit'`), so that each event name is
   * paired with its matching handler type.
   *
   * @param event - The event.
   * @param handler - The event handler.
   */
  readonly on: ((event: 'online', handler: OnlineHandler<this>) => void) &
  ((event: 'message', handler: MessageHandler<this>) => void) &
  ((event: 'error', handler: ErrorHandler<this>) => void) &
  ((event: 'exit', handler: ExitHandler<this>) => void)
  /**
   * Registers a listener to the exit event that will only be performed once.
   *
   * Only the `'exit'` event is accepted by this signature.
   *
   * @param event - The `'exit'` event.
   * @param handler - The exit handler.
   */
  readonly once: (event: 'exit', handler: ExitHandler<this>) => void
}
222
/**
 * Worker node event detail.
 *
 * @internal
 */
export interface WorkerNodeEventDetail {
  /**
   * Worker id.
   */
  workerId: number
  /**
   * Worker node key (optional, may be absent).
   * NOTE(review): presumably the index of the worker node within the pool -
   * confirm.
   */
  workerNodeKey?: number
}
232
/**
 * Worker node interface.
 *
 * A worker node wraps a worker together with its information, its usage
 * statistics and its tasks queue. It extends `EventTarget`, so it also exposes
 * the standard `addEventListener`/`removeEventListener`/`dispatchEvent` API.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @internal
 */
export interface IWorkerNode<Worker extends IWorker, Data = unknown>
  extends EventTarget {
  /**
   * Worker.
   */
  readonly worker: Worker
  /**
   * Worker info.
   */
  readonly info: WorkerInfo
  /**
   * Worker usage statistics.
   */
  readonly usage: WorkerUsage
  /**
   * Worker choice strategy data.
   * This is used to store data that are specific to the worker choice strategy.
   * Optional and writable.
   */
  strategyData?: StrategyData
  /**
   * Message channel (worker_threads only).
   * Optional: may be absent.
   */
  readonly messageChannel?: MessageChannel
  /**
   * Tasks queue back pressure size.
   * This is the number of tasks that can be enqueued before the worker node has back pressure.
   * Writable.
   */
  tasksQueueBackPressureSize: number
  /**
   * Tasks queue size.
   *
   * @returns The tasks queue size.
   */
  readonly tasksQueueSize: () => number
  /**
   * Enqueue task.
   *
   * @param task - The task to queue.
   * @returns The tasks queue size.
   */
  readonly enqueueTask: (task: Task<Data>) => number
  /**
   * Prepends a task to the tasks queue.
   *
   * @param task - The task to prepend.
   * @returns The tasks queue size.
   */
  readonly unshiftTask: (task: Task<Data>) => number
  /**
   * Dequeue task.
   *
   * @returns The dequeued task, or `undefined` (presumably when the tasks queue is empty - confirm).
   */
  readonly dequeueTask: () => Task<Data> | undefined
  /**
   * Pops a task from the tasks queue.
   *
   * @returns The popped task, or `undefined` (presumably when the tasks queue is empty - confirm).
   */
  readonly popTask: () => Task<Data> | undefined
  /**
   * Clears tasks queue.
   */
  readonly clearTasksQueue: () => void
  /**
   * Whether the worker node has back pressure (i.e. its tasks queue is full).
   *
   * @returns `true` if the worker node has back pressure, `false` otherwise.
   */
  readonly hasBackPressure: () => boolean
  /**
   * Resets usage statistics.
   */
  readonly resetUsage: () => void
  /**
   * Closes communication channel.
   */
  readonly closeChannel: () => void
  /**
   * Gets task function worker usage statistics.
   *
   * @param name - The task function name.
   * @returns The task function worker usage statistics if the task function worker usage statistics are initialized, `undefined` otherwise.
   */
  readonly getTaskFunctionWorkerUsage: (name: string) => WorkerUsage | undefined
  /**
   * Deletes task function worker usage statistics.
   *
   * @param name - The task function name.
   * @returns `true` if the task function worker usage statistics were deleted, `false` otherwise.
   */
  readonly deleteTaskFunctionWorkerUsage: (name: string) => boolean
}
332}