refactor: renable standard JS linter rules
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
e1c2dba7 2import { EventEmitter } from 'node:events'
d35e5717
JB
3import { CircularArray } from '../circular-array.js'
4import type { Task } from '../utility-types.js'
5import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils.js'
6import { Deque } from '../deque.js'
4b628b48 7import {
c3719753
JB
8 type ErrorHandler,
9 type ExitHandler,
4b628b48
JB
10 type IWorker,
11 type IWorkerNode,
c3719753
JB
12 type MessageHandler,
13 type OnlineHandler,
f3a91bac 14 type StrategyData,
4b628b48 15 type WorkerInfo,
c3719753 16 type WorkerNodeOptions,
4b628b48
JB
17 type WorkerType,
18 WorkerTypes,
19 type WorkerUsage
d35e5717
JB
20} from './worker.js'
21import { checkWorkerNodeArguments, createWorker } from './utils.js'
4b628b48 22
60664f48
JB
/**
 * Worker node.
 *
 * Wraps one worker together with its information, its usage statistics and
 * its own tasks queue. A `backPressure` event carrying the worker id is
 * emitted when a task is added while the tasks queue size is greater than or
 * equal to `tasksQueueBackPressureSize`.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Tasks waiting on this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Guard set while the `backPressure` event is being emitted, so that listeners cannot trigger a nested emission. */
  private onBackPressureStarted: boolean
  /** Usage statistics per task function name, created on first access. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // Only thread workers get a dedicated message channel.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.shift()
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    return this.tasksQueue.pop()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // The exit listener is registered before termination is requested so the
    // 'exit' event cannot be missed.
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Removes the listeners registered on this worker node event emitter,
    // not the ones registered on the worker itself.
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // Kill the worker only once it has disconnected.
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler:
    | OnlineHandler<Worker>
    | MessageHandler<Worker>
    | ErrorHandler<Worker>
    | ExitHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler:
    | OnlineHandler<Worker>
    | MessageHandler<Worker>
    | ErrorHandler<Worker>
    | ExitHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionNames = this.info.taskFunctionNames
    if (!Array.isArray(taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is resolved to the name stored at index 1 of the
    // task function names list.
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctionNames[1] : name
    if (!this.taskFunctionsUsage.has(taskFunctionName)) {
      this.taskFunctionsUsage.set(
        taskFunctionName,
        this.initTaskFunctionWorkerUsage(taskFunctionName)
      )
    }
    return this.taskFunctionsUsage.get(taskFunctionName)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Emits the `backPressure` event with this worker id when the tasks queue
   * has back pressure and no emission is already in progress.
   *
   * The guard flag is always released, even when a listener throws.
   * Otherwise one failing listener would suppress every later `backPressure`
   * event of this worker node. The listener error still propagates to the
   * caller.
   */
  private emitBackPressureIfNeeded (): void {
    if (!this.hasBackPressure() || this.onBackPressureStarted) {
      return
    }
    this.onBackPressureStarted = true
    try {
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.emit('backPressure', { workerId: this.info.id! })
    } finally {
      this.onBackPressureStarted = false
    }
  }

  /**
   * Unreferences and closes both ports of the message channel, then removes
   * the message channel from this worker node. Does nothing when there is no
   * message channel.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker information: id and type are derived from the
   * worker, every flag starts at `false`.
   *
   * @param worker - The worker.
   * @returns The initial worker information.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      type: getWorkerType(worker)!,
      dynamic: false,
      ready: false,
      stealing: false
    }
  }

  /**
   * Builds a fresh worker usage with zeroed counters and empty histories.
   * `queued` and `maxQueued` are live views on the tasks queue.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // The tasks queue must be read lazily: this method is called from the
    // constructor before the tasks queue is assigned. Arrow functions are
    // used because `this` inside the getters below is the `tasks` object.
    const getTasksQueueSize = (): number => this.tasksQueue.size
    const getTasksQueueMaxSize = (): number => this.tasksQueue.maxSize
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds a fresh usage for one task function with zeroed counters and empty
   * histories. `queued` is computed on each access by counting the queued
   * tasks belonging to the task function.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // A task queued under the default task name counts for the task
        // function stored at index 1 of the task function names list.
        const isDefaultTask = task.name === DEFAULT_TASK_NAME
        if (
          (isDefaultTask &&
            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            name === this.info.taskFunctionNames![1]) ||
          (!isDefaultTask && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}