refactor: code cleanup
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { EventEmitter } from 'node:events'
3 import { CircularArray } from '../circular-array.js'
4 import type { Task } from '../utility-types.js'
5 import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils.js'
6 import { Deque } from '../deque.js'
7 import {
8 type ErrorHandler,
9 type ExitHandler,
10 type IWorker,
11 type IWorkerNode,
12 type MessageHandler,
13 type OnlineHandler,
14 type StrategyData,
15 type WorkerInfo,
16 type WorkerNodeOptions,
17 type WorkerType,
18 WorkerTypes,
19 type WorkerUsage
20 } from './worker.js'
21 import { checkWorkerNodeArguments, createWorker } from './utils.js'
22
/**
 * Worker node.
 *
 * Wraps one worker together with its tasks queue, its usage statistics and
 * the `backPressure` event emitted when that queue reaches its threshold.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Tasks waiting to be executed by this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Guard flag set while a `backPressure` event is being emitted. */
  private onBackPressureStarted: boolean
  /** Usage statistics per task function name, created on first access. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    // The usage getters read the tasks queue lazily, so it is safe to build
    // the usage object before the queue itself is assigned below.
    this.usage = this.initWorkerUsage()
    // A message channel is only created for thread workers.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.shift()
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    return this.tasksQueue.pop()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /**
   * @inheritdoc
   *
   * The returned promise settles once the worker has emitted `exit`.
   */
  public async terminate (): Promise<void> {
    // Register the 'exit' handler before asking the worker to stop so the
    // event cannot be missed.
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Drops the listeners attached to this node (not to the worker).
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // Kill the cluster worker only once it has disconnected.
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler:
    | OnlineHandler<Worker>
    | MessageHandler<Worker>
    | ErrorHandler<Worker>
    | ExitHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler:
    | OnlineHandler<Worker>
    | MessageHandler<Worker>
    | ErrorHandler<Worker>
    | ExitHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /**
   * @inheritdoc
   *
   * @throws Error if the task function names list is not an array or has
   * less than 3 elements.
   */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionNames = this.info.taskFunctionNames
    if (!Array.isArray(taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is resolved to the entry at index 1 of the list
    // (presumably the real name of the default task function - confirm
    // against the code filling `info.taskFunctionNames`).
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctionNames[1] : name
    let taskFunctionUsage = this.taskFunctionsUsage.get(taskFunctionName)
    if (taskFunctionUsage == null) {
      // Usage entries are created lazily on first request.
      taskFunctionUsage = this.initTaskFunctionWorkerUsage(taskFunctionName)
      this.taskFunctionsUsage.set(taskFunctionName, taskFunctionUsage)
    }
    return taskFunctionUsage
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Emits a `backPressure` event carrying this node's worker id when the
   * tasks queue has reached the back pressure threshold and no emission is
   * already in progress on this node.
   */
  private emitBackPressureIfNeeded (): void {
    if (this.hasBackPressure() && !this.onBackPressureStarted) {
      // Listeners run synchronously: the flag prevents a listener that
      // queues a task on this node from triggering a nested emission.
      this.onBackPressureStarted = true
      try {
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.emit('backPressure', { workerId: this.info.id! })
      } finally {
        // Always release the guard, even if a listener throws, otherwise
        // every later back pressure event would be silently suppressed.
        this.onBackPressureStarted = false
      }
    }
  }

  /**
   * Unreferences and closes both ports of the message channel, if any, then
   * removes the `messageChannel` property.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker information: id and type derived from the
   * worker, with the `dynamic`, `ready` and `stealing` flags all `false`.
   *
   * @param worker - The worker to describe.
   * @returns The initial worker information.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      type: getWorkerType(worker)!,
      dynamic: false,
      ready: false,
      stealing: false
    }
  }

  /**
   * Builds a zeroed worker usage whose `queued` and `maxQueued` counters are
   * live views on the tasks queue.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Inside the object literal getters below, `this` is the literal itself,
    // hence the arrow functions capturing the worker node.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds a zeroed usage for one task function. Its `queued` counter is
   * computed on each read by scanning the tasks queue.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // A queued task counts for `name` when it carries that name, or when
        // it carries the default task name and `name` is the entry at index
        // 1 of the task function names list.
        if (
          (task.name === DEFAULT_TASK_NAME &&
            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            name === this.info.taskFunctionNames![1]) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}