refactor: cleanup eslint configuration
[poolifier.git] / src / pools / worker-node.ts
1 import { EventEmitter } from 'node:events'
2 import { MessageChannel } from 'node:worker_threads'
3
4 import { CircularArray } from '../circular-array.js'
5 import { Deque } from '../deque.js'
6 import type { Task } from '../utility-types.js'
7 import { DEFAULT_TASK_NAME } from '../utils.js'
8 import {
9 checkWorkerNodeArguments,
10 createWorker,
11 getWorkerId,
12 getWorkerType
13 } from './utils.js'
14 import {
15 type EventHandler,
16 type IWorker,
17 type IWorkerNode,
18 type StrategyData,
19 type WorkerInfo,
20 type WorkerNodeOptions,
21 type WorkerType,
22 WorkerTypes,
23 type WorkerUsage
24 } from './worker.js'
25
26 /**
27 * Worker node.
28 *
29 * @typeParam Worker - Type of worker.
30 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
31 */
32 export class WorkerNode<Worker extends IWorker, Data = unknown>
33 extends EventEmitter
34 implements IWorkerNode<Worker, Data> {
35 /** @inheritdoc */
36 public readonly worker: Worker
37 /** @inheritdoc */
38 public readonly info: WorkerInfo
39 /** @inheritdoc */
40 public usage: WorkerUsage
41 /** @inheritdoc */
42 public strategyData?: StrategyData
43 /** @inheritdoc */
44 public messageChannel?: MessageChannel
45 /** @inheritdoc */
46 public tasksQueueBackPressureSize: number
47 private readonly tasksQueue: Deque<Task<Data>>
48 private onBackPressureStarted: boolean
49 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
50
51 /**
52 * Constructs a new worker node.
53 *
54 * @param type - The worker type.
55 * @param filePath - Path to the worker file.
56 * @param opts - The worker node options.
57 */
58 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
59 super()
60 checkWorkerNodeArguments(type, filePath, opts)
61 this.worker = createWorker<Worker>(type, filePath, {
62 env: opts.env,
63 workerOptions: opts.workerOptions
64 })
65 this.info = this.initWorkerInfo(this.worker)
66 this.usage = this.initWorkerUsage()
67 if (this.info.type === WorkerTypes.thread) {
68 this.messageChannel = new MessageChannel()
69 }
70 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
71 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
72 this.tasksQueue = new Deque<Task<Data>>()
73 this.onBackPressureStarted = false
74 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
75 }
76
77 /** @inheritdoc */
78 public tasksQueueSize (): number {
79 return this.tasksQueue.size
80 }
81
82 /** @inheritdoc */
83 public enqueueTask (task: Task<Data>): number {
84 const tasksQueueSize = this.tasksQueue.push(task)
85 if (this.hasBackPressure() && !this.onBackPressureStarted) {
86 this.onBackPressureStarted = true
87 this.emit('backPressure', { workerId: this.info.id })
88 this.onBackPressureStarted = false
89 }
90 return tasksQueueSize
91 }
92
93 /** @inheritdoc */
94 public unshiftTask (task: Task<Data>): number {
95 const tasksQueueSize = this.tasksQueue.unshift(task)
96 if (this.hasBackPressure() && !this.onBackPressureStarted) {
97 this.onBackPressureStarted = true
98 this.emit('backPressure', { workerId: this.info.id })
99 this.onBackPressureStarted = false
100 }
101 return tasksQueueSize
102 }
103
104 /** @inheritdoc */
105 public dequeueTask (): Task<Data> | undefined {
106 return this.tasksQueue.shift()
107 }
108
109 /** @inheritdoc */
110 public popTask (): Task<Data> | undefined {
111 return this.tasksQueue.pop()
112 }
113
114 /** @inheritdoc */
115 public clearTasksQueue (): void {
116 this.tasksQueue.clear()
117 }
118
119 /** @inheritdoc */
120 public hasBackPressure (): boolean {
121 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
122 }
123
124 /** @inheritdoc */
125 public resetUsage (): void {
126 this.usage = this.initWorkerUsage()
127 this.taskFunctionsUsage.clear()
128 }
129
130 /** @inheritdoc */
131 public async terminate (): Promise<void> {
132 const waitWorkerExit = new Promise<void>(resolve => {
133 this.registerOnceWorkerEventHandler('exit', () => {
134 resolve()
135 })
136 })
137 this.closeMessageChannel()
138 this.removeAllListeners()
139 switch (this.info.type) {
140 case WorkerTypes.thread:
141 this.worker.unref?.()
142 await this.worker.terminate?.()
143 break
144 case WorkerTypes.cluster:
145 this.registerOnceWorkerEventHandler('disconnect', () => {
146 this.worker.kill?.()
147 })
148 this.worker.disconnect?.()
149 break
150 }
151 await waitWorkerExit
152 }
153
154 /** @inheritdoc */
155 public registerWorkerEventHandler (
156 event: string,
157 handler: EventHandler<Worker>
158 ): void {
159 this.worker.on(event, handler)
160 }
161
162 /** @inheritdoc */
163 public registerOnceWorkerEventHandler (
164 event: string,
165 handler: EventHandler<Worker>
166 ): void {
167 this.worker.once(event, handler)
168 }
169
170 /** @inheritdoc */
171 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
172 if (!Array.isArray(this.info.taskFunctionNames)) {
173 throw new Error(
174 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
175 )
176 }
177 if (
178 Array.isArray(this.info.taskFunctionNames) &&
179 this.info.taskFunctionNames.length < 3
180 ) {
181 throw new Error(
182 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
183 )
184 }
185 if (name === DEFAULT_TASK_NAME) {
186 name = this.info.taskFunctionNames[1]
187 }
188 if (!this.taskFunctionsUsage.has(name)) {
189 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
190 }
191 return this.taskFunctionsUsage.get(name)
192 }
193
194 /** @inheritdoc */
195 public deleteTaskFunctionWorkerUsage (name: string): boolean {
196 return this.taskFunctionsUsage.delete(name)
197 }
198
199 private closeMessageChannel (): void {
200 if (this.messageChannel != null) {
201 this.messageChannel.port1.unref()
202 this.messageChannel.port2.unref()
203 this.messageChannel.port1.close()
204 this.messageChannel.port2.close()
205 delete this.messageChannel
206 }
207 }
208
209 private initWorkerInfo (worker: Worker): WorkerInfo {
210 return {
211 id: getWorkerId(worker),
212 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
213 type: getWorkerType(worker)!,
214 dynamic: false,
215 ready: false,
216 stealing: false
217 }
218 }
219
220 private initWorkerUsage (): WorkerUsage {
221 const getTasksQueueSize = (): number => {
222 return this.tasksQueue.size
223 }
224 const getTasksQueueMaxSize = (): number => {
225 return this.tasksQueue.maxSize
226 }
227 return {
228 tasks: {
229 executed: 0,
230 executing: 0,
231 get queued (): number {
232 return getTasksQueueSize()
233 },
234 get maxQueued (): number {
235 return getTasksQueueMaxSize()
236 },
237 sequentiallyStolen: 0,
238 stolen: 0,
239 failed: 0
240 },
241 runTime: {
242 history: new CircularArray<number>()
243 },
244 waitTime: {
245 history: new CircularArray<number>()
246 },
247 elu: {
248 idle: {
249 history: new CircularArray<number>()
250 },
251 active: {
252 history: new CircularArray<number>()
253 }
254 }
255 }
256 }
257
258 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
259 const getTaskFunctionQueueSize = (): number => {
260 let taskFunctionQueueSize = 0
261 for (const task of this.tasksQueue) {
262 if (
263 (task.name === DEFAULT_TASK_NAME &&
264 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
265 name === this.info.taskFunctionNames![1]) ||
266 (task.name !== DEFAULT_TASK_NAME && name === task.name)
267 ) {
268 ++taskFunctionQueueSize
269 }
270 }
271 return taskFunctionQueueSize
272 }
273 return {
274 tasks: {
275 executed: 0,
276 executing: 0,
277 get queued (): number {
278 return getTaskFunctionQueueSize()
279 },
280 sequentiallyStolen: 0,
281 stolen: 0,
282 failed: 0
283 },
284 runTime: {
285 history: new CircularArray<number>()
286 },
287 waitTime: {
288 history: new CircularArray<number>()
289 },
290 elu: {
291 idle: {
292 history: new CircularArray<number>()
293 },
294 active: {
295 history: new CircularArray<number>()
296 }
297 }
298 }
299 }
300 }