refactor: convert if...then...else to switch...case
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
e1c2dba7 2import { EventEmitter } from 'node:events'
4b628b48 3import { CircularArray } from '../circular-array'
5c4d16da 4import type { Task } from '../utility-types'
463226a4 5import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
574b351d 6import { Deque } from '../deque'
4b628b48 7import {
c3719753
JB
8 type ErrorHandler,
9 type ExitHandler,
4b628b48
JB
10 type IWorker,
11 type IWorkerNode,
c3719753
JB
12 type MessageHandler,
13 type OnlineHandler,
f3a91bac 14 type StrategyData,
4b628b48 15 type WorkerInfo,
c3719753 16 type WorkerNodeOptions,
4b628b48
JB
17 type WorkerType,
18 WorkerTypes,
19 type WorkerUsage
20} from './worker'
c3719753 21import { checkWorkerNodeArguments, createWorker } from './utils'
4b628b48 22
/**
 * Worker node.
 *
 * Bundles a worker with its information, its usage statistics (global and per
 * task function) and its tasks queue. The node is an event emitter: it emits a
 * `backPressure` event, carrying `{ workerId }`, whenever a task is queued while
 * the tasks queue size is at or above `tasksQueueBackPressureSize`.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Queue of the tasks waiting on this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Whether `backPressure` listeners are currently running (re-entrancy guard). */
  private onBackPressureStarted: boolean
  /** Usage statistics per task function name, created on first access. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // A message channel is only created for thread workers
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.shift()
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    return this.tasksQueue.pop()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // Register the 'exit' handler before triggering the termination so that
    // the event cannot be missed
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Drop the listeners registered on this worker node emitter
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // Kill the worker only once it has been disconnected
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler:
    | OnlineHandler<Worker>
    | MessageHandler<Worker>
    | ErrorHandler<Worker>
    | ExitHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler:
    | OnlineHandler<Worker>
    | MessageHandler<Worker>
    | ErrorHandler<Worker>
    | ExitHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionNames = this.info.taskFunctionNames
    if (!Array.isArray(taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is resolved to the second entry of the names list
    if (name === DEFAULT_TASK_NAME) {
      name = taskFunctionNames[1]
    }
    // Usage statistics are lazily created on first access
    if (!this.taskFunctionsUsage.has(name)) {
      this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
    }
    return this.taskFunctionsUsage.get(name)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Emits the `backPressure` event if the tasks queue has back pressure and the
   * event listeners are not already running.
   *
   * The guard flag prevents nested emissions when a listener queues a task on
   * this worker node. It is reset in a `finally` clause so that a throwing
   * listener does not disable all further `backPressure` events; the listener
   * error still propagates to the caller.
   */
  private emitBackPressureIfNeeded (): void {
    if (this.hasBackPressure() && !this.onBackPressureStarted) {
      this.onBackPressureStarted = true
      try {
        this.emit('backPressure', { workerId: this.info.id as number })
      } finally {
        this.onBackPressureStarted = false
      }
    }
  }

  /**
   * Unreferences and closes both ports of the message channel, if any, then
   * removes the message channel from this worker node.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker information: id and type are derived from the
   * worker, the worker is flagged neither dynamic nor ready.
   *
   * @param worker - The worker.
   * @returns The initial worker information.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      type: getWorkerType(worker) as WorkerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds zeroed worker usage statistics. The `queued` and `maxQueued` tasks
   * counters are getters reading the live tasks queue size and maximum size.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions capture `this`, which the object literal getters below cannot
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds zeroed usage statistics for a task function. The `queued` tasks
   * counter is a getter counting the queued tasks targeting that task function.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // A task queued under the default task name counts for the task
        // function listed second in the task function names
        if (
          (task.name === DEFAULT_TASK_NAME &&
            name === (this.info.taskFunctionNames as string[])[1]) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}