refactor: cleanup worker error handling code
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { EventEmitter } from 'node:events'
3 import { CircularArray } from '../circular-array'
4 import type { Task } from '../utility-types'
5 import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
6 import { Deque } from '../deque'
7 import {
8 type ErrorHandler,
9 type ExitHandler,
10 type IWorker,
11 type IWorkerNode,
12 type MessageHandler,
13 type OnlineHandler,
14 type StrategyData,
15 type WorkerInfo,
16 type WorkerNodeOptions,
17 type WorkerType,
18 WorkerTypes,
19 type WorkerUsage
20 } from './worker'
21 import { checkWorkerNodeArguments, createWorker } from './utils'
22
/**
 * Worker node.
 *
 * Groups a worker with its information, its usage statistics and its tasks queue.
 * A `backPressure` event is emitted on the worker node when a task is added to the
 * tasks queue while its size is greater than or equal to `tasksQueueBackPressureSize`.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Queue of the tasks waiting on this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Whether a `backPressure` event emission is in progress (re-entrancy guard). */
  private onBackPressureStarted: boolean
  /** Usage statistics per task function name, created on first access. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    // The usage getters read the tasks queue lazily, so the usage can be
    // initialized before the tasks queue is created below.
    this.usage = this.initWorkerUsage()
    // A message channel is only created for thread workers.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.emitBackPressure()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.emitBackPressure()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.shift()
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    return this.tasksQueue.pop()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // The 'exit' handler is registered before the termination is requested.
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Removes the listeners registered on the worker node itself,
    // not the ones registered on the worker.
    this.removeAllListeners()
    if (this.info.type === WorkerTypes.thread) {
      await this.worker.terminate?.()
    } else if (this.info.type === WorkerTypes.cluster) {
      // The cluster worker is killed once it is disconnected.
      this.registerOnceWorkerEventHandler('disconnect', () => {
        this.worker.kill?.()
      })
      this.worker.disconnect?.()
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    listener:
    | OnlineHandler<Worker>
    | MessageHandler<Worker>
    | ErrorHandler<Worker>
    | ExitHandler<Worker>
  ): void {
    this.worker.on(event, listener)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    listener:
    | OnlineHandler<Worker>
    | MessageHandler<Worker>
    | ErrorHandler<Worker>
    | ExitHandler<Worker>
  ): void {
    this.worker.once(event, listener)
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionNames = this.info.taskFunctionNames
    if (!Array.isArray(taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is resolved to the second task function name of the list.
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctionNames[1] : name
    let taskFunctionUsage = this.taskFunctionsUsage.get(taskFunctionName)
    if (taskFunctionUsage == null) {
      taskFunctionUsage = this.initTaskFunctionWorkerUsage(taskFunctionName)
      this.taskFunctionsUsage.set(taskFunctionName, taskFunctionUsage)
    }
    return taskFunctionUsage
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Emits a `backPressure` event carrying the worker id if the tasks queue has
   * back pressure and no `backPressure` event emission is already in progress.
   *
   * The in-progress flag prevents nested emissions when a listener adds tasks to
   * this worker node. It is reset in a `finally` clause: listeners are called
   * synchronously by `emit()`, so an error thrown by a listener would otherwise
   * leave the flag set and silence all subsequent `backPressure` events.
   */
  private emitBackPressure (): void {
    if (this.hasBackPressure() && !this.onBackPressureStarted) {
      this.onBackPressureStarted = true
      try {
        this.emit('backPressure', { workerId: this.info.id as number })
      } finally {
        this.onBackPressureStarted = false
      }
    }
  }

  /**
   * Unreferences and closes both ports of the message channel, then removes it
   * from the worker node. Does nothing if there is no message channel.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker.
   * @returns The worker information, flagged as not dynamic and not ready.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      type: getWorkerType(worker) as WorkerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds the initial measurement statistics shared by the worker usage and
   * the task function worker usage.
   *
   * @returns Fresh `runTime`, `waitTime` and `elu` statistics with empty histories.
   */
  private initMeasurementStatistics (): Pick<
  WorkerUsage,
  'runTime' | 'waitTime' | 'elu'
  > {
    return {
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds the initial worker usage.
   *
   * @returns The worker usage, whose `queued` and `maxQueued` tasks counters are
   * computed from the tasks queue each time they are read.
   */
  private initWorkerUsage (): WorkerUsage {
    // The tasks queue must be read lazily: the constructor calls this method
    // before the tasks queue is created.
    const getTasksQueueSize = (): number => this.tasksQueue.size
    const getTasksQueueMaxSize = (): number => this.tasksQueue.maxSize
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      ...this.initMeasurementStatistics()
    }
  }

  /**
   * Builds the initial worker usage of a task function.
   *
   * @param name - The task function name.
   * @returns The task function worker usage, whose `queued` tasks counter is
   * computed by iterating over the tasks queue each time it is read.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // A task with the default task name is counted for the second task
        // function name of the list, any other task for its own name.
        const isTaskFunctionTask =
          task.name === DEFAULT_TASK_NAME
            ? name === (this.info.taskFunctionNames as string[])[1]
            : name === task.name
        if (isTaskFunctionTask) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      ...this.initMeasurementStatistics()
    }
  }
}