refactor: cleanup some eslint rules disablement
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { EventEmitter } from 'node:events'
3 import { CircularArray } from '../circular-array.js'
4 import type { Task } from '../utility-types.js'
5 import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils.js'
6 import { Deque } from '../deque.js'
7 import {
8 type EventHandler,
9 type IWorker,
10 type IWorkerNode,
11 type StrategyData,
12 type WorkerInfo,
13 type WorkerNodeOptions,
14 type WorkerType,
15 WorkerTypes,
16 type WorkerUsage
17 } from './worker.js'
18 import { checkWorkerNodeArguments, createWorker } from './utils.js'
19
/**
 * Worker node.
 *
 * Wraps one pool worker together with its bookkeeping: static information,
 * usage statistics (global and per task function) and the queue of tasks
 * waiting on that worker. Emits a `backPressure` event when the tasks queue
 * size reaches `tasksQueueBackPressureSize`.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Tasks waiting on this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** `true` while the `backPressure` event listeners are being run. */
  private onBackPressureStarted: boolean
  /** Usage statistics per task function name, created on first request. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // Only thread workers get a dedicated message channel.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.shift()
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    return this.tasksQueue.pop()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // Register the 'exit' handler on the worker before asking it to stop.
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Removes the listeners registered on this node (the event emitter),
    // not the handlers registered on the underlying worker.
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // Kill the cluster worker once it has disconnected.
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionNames = this.info.taskFunctionNames
    if (!Array.isArray(taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is tracked under the name stored at index 1 of
    // the task function names list.
    const usageName =
      name === DEFAULT_TASK_NAME ? taskFunctionNames[1] : name
    let taskFunctionWorkerUsage = this.taskFunctionsUsage.get(usageName)
    if (taskFunctionWorkerUsage == null) {
      taskFunctionWorkerUsage = this.initTaskFunctionWorkerUsage(usageName)
      this.taskFunctionsUsage.set(usageName, taskFunctionWorkerUsage)
    }
    return taskFunctionWorkerUsage
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Emits the `backPressure` event with this worker node id when the tasks
   * queue has back pressure, unless an emission is already in progress.
   */
  private emitBackPressureIfNeeded (): void {
    if (this.hasBackPressure() && !this.onBackPressureStarted) {
      // The flag is raised for the duration of the emit() call so that a
      // listener queuing tasks on this node does not emit the event again.
      this.onBackPressureStarted = true
      this.emit('backPressure', { workerId: this.info.id })
      this.onBackPressureStarted = false
    }
  }

  /**
   * Unreferences and closes both ports of the message channel, if any, then
   * removes the `messageChannel` property.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker to describe.
   * @returns The worker information, flagged as not dynamic, not ready and not stealing.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      type: getWorkerType(worker)!,
      dynamic: false,
      ready: false,
      stealing: false
    }
  }

  /**
   * Builds a zeroed worker usage whose `queued` and `maxQueued` task counters
   * are read live from the tasks queue.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions capture the worker node: inside the object literal
    // getters below, `this` is the `tasks` object, not the worker node.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds a zeroed usage for one task function. Its `queued` task counter is
   * computed on each read by counting the matching tasks in the tasks queue.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      // Name under which tasks queued with the default task name are counted.
      // Optional chaining keeps the getter from throwing if the task function
      // names list is undefined: such tasks are then simply not counted.
      const defaultTaskFunctionName = this.info.taskFunctionNames?.[1]
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        const taskFunctionName =
          task.name === DEFAULT_TASK_NAME ? defaultTaskFunctionName : task.name
        if (taskFunctionName === name) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}