refactor: convert if...then...else to switch...case
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { EventEmitter } from 'node:events'
3 import { CircularArray } from '../circular-array'
4 import type { Task } from '../utility-types'
5 import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
6 import { Deque } from '../deque'
7 import {
8 type ErrorHandler,
9 type ExitHandler,
10 type IWorker,
11 type IWorkerNode,
12 type MessageHandler,
13 type OnlineHandler,
14 type StrategyData,
15 type WorkerInfo,
16 type WorkerNodeOptions,
17 type WorkerType,
18 WorkerTypes,
19 type WorkerUsage
20 } from './worker'
21 import { checkWorkerNodeArguments, createWorker } from './utils'
22
/**
 * Worker node.
 *
 * Bundles a worker instance with its information, its usage statistics and
 * its own tasks queue. A `backPressure` event carrying `{ workerId }` is
 * emitted on the node whenever a task is added while the queue size is at or
 * above `tasksQueueBackPressureSize`.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Tasks waiting to be handed over to the worker. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Re-entrancy guard: `true` only while a `backPressure` event is being dispatched. */
  private onBackPressureStarted: boolean
  /** Usage statistics per task function name, created lazily on first lookup. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    // The usage object only reads the tasks queue through lazy getters, so it
    // is safe to build it before the queue itself is created below.
    this.usage = this.initWorkerUsage()
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.shift()
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    return this.tasksQueue.pop()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // Register the 'exit' handler on the worker before asking it to stop, so
    // the exit notification cannot be missed.
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Drops the listeners attached to this node (the event emitter), not the
    // handlers registered on the underlying worker.
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // Disconnect first and only kill once the worker reports 'disconnect'.
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler:
    | OnlineHandler<Worker>
    | MessageHandler<Worker>
    | ErrorHandler<Worker>
    | ExitHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler:
    | OnlineHandler<Worker>
    | MessageHandler<Worker>
    | ErrorHandler<Worker>
    | ExitHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionNames = this.info.taskFunctionNames
    if (!Array.isArray(taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is an alias for the name stored at index 1 of the
    // list (presumably index 0 holds the default name itself - to be confirmed
    // against the code filling `info.taskFunctionNames`).
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctionNames[1] : name
    if (!this.taskFunctionsUsage.has(taskFunctionName)) {
      this.taskFunctionsUsage.set(
        taskFunctionName,
        this.initTaskFunctionWorkerUsage(taskFunctionName)
      )
    }
    return this.taskFunctionsUsage.get(taskFunctionName)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Emits the `backPressure` event when the tasks queue has back pressure and
   * no `backPressure` dispatch is already in progress on this node.
   *
   * Listeners are invoked synchronously by `emit()`, so the guard flag keeps a
   * listener that adds tasks to this node from triggering a nested event. The
   * flag is released in a `finally` clause: a throwing listener still
   * propagates its error to the caller, but cannot leave the guard stuck and
   * silence every later `backPressure` event.
   */
  private emitBackPressureIfNeeded (): void {
    if (!this.hasBackPressure() || this.onBackPressureStarted) {
      return
    }
    this.onBackPressureStarted = true
    try {
      this.emit('backPressure', { workerId: this.info.id as number })
    } finally {
      this.onBackPressureStarted = false
    }
  }

  /**
   * Unreferences and closes both ports of the message channel, if any, then
   * removes the `messageChannel` property from the node.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker information: identifier and type derived from
   * the worker, flagged as neither dynamic nor ready.
   *
   * @param worker - The worker to describe.
   * @returns The initial worker information.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      type: getWorkerType(worker) as WorkerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds a zeroed worker usage whose `queued` and `maxQueued` counters are
   * live views over the tasks queue.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions capture the node as `this`; inside the object literal
    // getters below, `this` would be the `tasks` object instead.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds a zeroed usage for one task function. Its `queued` counter walks
   * the tasks queue on every read and counts the tasks targeting `name`.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        if (
          // A task queued under the default name counts for the task function
          // stored at index 1 of the task function names list.
          (task.name === DEFAULT_TASK_NAME &&
            name === (this.info.taskFunctionNames as string[])[1]) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}