build: make eslint configuration use strict type checking
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { EventEmitter } from 'node:events'
3 import { CircularArray } from '../circular-array.js'
4 import type { Task } from '../utility-types.js'
5 import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils.js'
6 import { Deque } from '../deque.js'
7 import {
8 type EventHandler,
9 type IWorker,
10 type IWorkerNode,
11 type StrategyData,
12 type WorkerInfo,
13 type WorkerNodeOptions,
14 type WorkerType,
15 WorkerTypes,
16 type WorkerUsage
17 } from './worker.js'
18 import { checkWorkerNodeArguments, createWorker } from './utils.js'
19
/**
 * Worker node.
 *
 * Wraps a worker (thread or cluster) together with its information, its usage
 * statistics and its own tasks queue.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Tasks queued on this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Whether a `backPressure` event is currently being emitted (guards against nested emission). */
  private onBackPressureStarted: boolean
  /** Usage statistics per task function name, created lazily on first request. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // A message channel is only created for thread workers.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.shift()
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    return this.tasksQueue.pop()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /**
   * @inheritdoc
   *
   * Closes the message channel (if any), removes the listeners registered on
   * this worker node, asks the worker to stop and resolves once the worker has
   * emitted its `exit` event.
   */
  public async terminate (): Promise<void> {
    // Register the exit handler before asking the worker to stop so that the
    // event cannot be missed.
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // Kill the cluster worker only once it has been disconnected.
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /**
   * @inheritdoc
   *
   * The usage entry is created on first access. The default task name is
   * resolved to the second element of the task function names list.
   *
   * @throws Error if the task function names list is not yet defined or has
   * less than 3 elements.
   */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionNames = this.info.taskFunctionNames
    if (!Array.isArray(taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctionNames[1] : name
    if (!this.taskFunctionsUsage.has(taskFunctionName)) {
      this.taskFunctionsUsage.set(
        taskFunctionName,
        this.initTaskFunctionWorkerUsage(taskFunctionName)
      )
    }
    return this.taskFunctionsUsage.get(taskFunctionName)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Emits a `backPressure` event carrying this worker node id when the tasks
   * queue has back pressure and no emission is already in progress.
   */
  private emitBackPressureIfNeeded (): void {
    if (this.hasBackPressure() && !this.onBackPressureStarted) {
      this.onBackPressureStarted = true
      try {
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.emit('backPressure', { workerId: this.info.id! })
      } finally {
        // Listeners are invoked synchronously and their exceptions propagate:
        // always reset the flag, otherwise a throwing listener would disable
        // any further 'backPressure' emission on this worker node.
        this.onBackPressureStarted = false
      }
    }
  }

  /**
   * Unreferences and closes both ports of the message channel, if any, then
   * removes the channel from this worker node.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker to describe.
   * @returns The worker information with `dynamic`, `ready` and `stealing` set to `false`.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      type: getWorkerType(worker)!,
      dynamic: false,
      ready: false,
      stealing: false
    }
  }

  /**
   * Builds a zeroed worker usage whose `queued` and `maxQueued` counters are
   * live views on the tasks queue.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions capture the worker node: inside the object getters
    // below, `this` is the `tasks` object, not the worker node.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds a zeroed worker usage for one task function. Its `queued` counter
   * is computed on each read by scanning the tasks queue.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // A task queued under the default task name counts for the task
        // function stored at index 1 of the task function names list.
        if (
          (task.name === DEFAULT_TASK_NAME &&
            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            name === this.info.taskFunctionNames![1]) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}