build: make eslint configuration use strict type checking
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
e1c2dba7 2import { EventEmitter } from 'node:events'
d35e5717
JB
3import { CircularArray } from '../circular-array.js'
4import type { Task } from '../utility-types.js'
5import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils.js'
6import { Deque } from '../deque.js'
4b628b48 7import {
3bcbd4c5 8 type EventHandler,
4b628b48
JB
9 type IWorker,
10 type IWorkerNode,
f3a91bac 11 type StrategyData,
4b628b48 12 type WorkerInfo,
c3719753 13 type WorkerNodeOptions,
4b628b48
JB
14 type WorkerType,
15 WorkerTypes,
16 type WorkerUsage
d35e5717
JB
17} from './worker.js'
18import { checkWorkerNodeArguments, createWorker } from './utils.js'
4b628b48 19
60664f48
JB
20/**
21 * Worker node.
22 *
23 * @typeParam Worker - Type of worker.
24 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
25 */
4b628b48 26export class WorkerNode<Worker extends IWorker, Data = unknown>
e1c2dba7 27 extends EventEmitter
9f95d5eb 28 implements IWorkerNode<Worker, Data> {
671d5154 29 /** @inheritdoc */
4b628b48 30 public readonly worker: Worker
671d5154 31 /** @inheritdoc */
4b628b48 32 public readonly info: WorkerInfo
671d5154 33 /** @inheritdoc */
4b628b48 34 public usage: WorkerUsage
20c6f652 35 /** @inheritdoc */
f3a91bac
JB
36 public strategyData?: StrategyData
37 /** @inheritdoc */
26fb3c18
JB
38 public messageChannel?: MessageChannel
39 /** @inheritdoc */
20c6f652 40 public tasksQueueBackPressureSize: number
574b351d 41 private readonly tasksQueue: Deque<Task<Data>>
47352846 42 private onBackPressureStarted: boolean
26fb3c18 43 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
4b628b48 44
60664f48
JB
45 /**
46 * Constructs a new worker node.
47 *
c3719753 48 * @param type - The worker type.
9974369e 49 * @param filePath - Path to the worker file.
c3719753 50 * @param opts - The worker node options.
60664f48 51 */
c3719753 52 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
9f95d5eb 53 super()
c3719753
JB
54 checkWorkerNodeArguments(type, filePath, opts)
55 this.worker = createWorker<Worker>(type, filePath, {
56 env: opts.env,
57 workerOptions: opts.workerOptions
58 })
59 this.info = this.initWorkerInfo(this.worker)
26fb3c18 60 this.usage = this.initWorkerUsage()
75de9f41 61 if (this.info.type === WorkerTypes.thread) {
7884d183
JB
62 this.messageChannel = new MessageChannel()
63 }
c63a35a0
JB
64 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
65 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
26fb3c18 66 this.tasksQueue = new Deque<Task<Data>>()
47352846 67 this.onBackPressureStarted = false
26fb3c18 68 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
4b628b48
JB
69 }
70
71 /** @inheritdoc */
72 public tasksQueueSize (): number {
73 return this.tasksQueue.size
74 }
75
4b628b48
JB
76 /** @inheritdoc */
77 public enqueueTask (task: Task<Data>): number {
72695f86 78 const tasksQueueSize = this.tasksQueue.push(task)
9f95d5eb 79 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 80 this.onBackPressureStarted = true
67f3f2d6
JB
81 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
82 this.emit('backPressure', { workerId: this.info.id! })
47352846 83 this.onBackPressureStarted = false
72695f86
JB
84 }
85 return tasksQueueSize
86 }
87
88 /** @inheritdoc */
89 public unshiftTask (task: Task<Data>): number {
90 const tasksQueueSize = this.tasksQueue.unshift(task)
9f95d5eb 91 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 92 this.onBackPressureStarted = true
67f3f2d6
JB
93 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
94 this.emit('backPressure', { workerId: this.info.id! })
47352846 95 this.onBackPressureStarted = false
72695f86
JB
96 }
97 return tasksQueueSize
4b628b48
JB
98 }
99
100 /** @inheritdoc */
101 public dequeueTask (): Task<Data> | undefined {
463226a4 102 return this.tasksQueue.shift()
4b628b48
JB
103 }
104
72695f86
JB
105 /** @inheritdoc */
106 public popTask (): Task<Data> | undefined {
463226a4 107 return this.tasksQueue.pop()
72695f86
JB
108 }
109
4b628b48
JB
110 /** @inheritdoc */
111 public clearTasksQueue (): void {
112 this.tasksQueue.clear()
113 }
114
671d5154
JB
115 /** @inheritdoc */
116 public hasBackPressure (): boolean {
8735b4e5 117 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
671d5154
JB
118 }
119
ff128cc9 120 /** @inheritdoc */
4b628b48
JB
121 public resetUsage (): void {
122 this.usage = this.initWorkerUsage()
db0e38ee 123 this.taskFunctionsUsage.clear()
ff128cc9
JB
124 }
125
3f09ed9f 126 /** @inheritdoc */
07e0c9e5
JB
127 public async terminate (): Promise<void> {
128 const waitWorkerExit = new Promise<void>(resolve => {
129 this.registerOnceWorkerEventHandler('exit', () => {
130 resolve()
131 })
132 })
133 this.closeMessageChannel()
134 this.removeAllListeners()
839b98b8
JB
135 switch (this.info.type) {
136 case WorkerTypes.thread:
137 await this.worker.terminate?.()
138 break
139 case WorkerTypes.cluster:
140 this.registerOnceWorkerEventHandler('disconnect', () => {
141 this.worker.kill?.()
142 })
143 this.worker.disconnect?.()
144 break
3f09ed9f 145 }
07e0c9e5 146 await waitWorkerExit
3f09ed9f
JB
147 }
148
c3719753
JB
149 /** @inheritdoc */
150 public registerWorkerEventHandler (
151 event: string,
3bcbd4c5 152 handler: EventHandler<Worker>
c3719753 153 ): void {
88af9bf1 154 this.worker.on(event, handler)
c3719753
JB
155 }
156
157 /** @inheritdoc */
158 public registerOnceWorkerEventHandler (
159 event: string,
3bcbd4c5 160 handler: EventHandler<Worker>
c3719753 161 ): void {
88af9bf1 162 this.worker.once(event, handler)
c3719753
JB
163 }
164
ff128cc9 165 /** @inheritdoc */
db0e38ee 166 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
6703b9f4 167 if (!Array.isArray(this.info.taskFunctionNames)) {
71b2b6d8 168 throw new Error(
db0e38ee 169 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
71b2b6d8
JB
170 )
171 }
b558f6b5 172 if (
6703b9f4
JB
173 Array.isArray(this.info.taskFunctionNames) &&
174 this.info.taskFunctionNames.length < 3
b558f6b5 175 ) {
db0e38ee
JB
176 throw new Error(
177 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
178 )
179 }
180 if (name === DEFAULT_TASK_NAME) {
6703b9f4 181 name = this.info.taskFunctionNames[1]
b558f6b5 182 }
db0e38ee
JB
183 if (!this.taskFunctionsUsage.has(name)) {
184 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
ff128cc9 185 }
db0e38ee 186 return this.taskFunctionsUsage.get(name)
4b628b48
JB
187 }
188
adee6053
JB
189 /** @inheritdoc */
190 public deleteTaskFunctionWorkerUsage (name: string): boolean {
191 return this.taskFunctionsUsage.delete(name)
192 }
193
07e0c9e5
JB
194 private closeMessageChannel (): void {
195 if (this.messageChannel != null) {
196 this.messageChannel.port1.unref()
197 this.messageChannel.port2.unref()
198 this.messageChannel.port1.close()
199 this.messageChannel.port2.close()
200 delete this.messageChannel
201 }
202 }
203
75de9f41 204 private initWorkerInfo (worker: Worker): WorkerInfo {
4b628b48 205 return {
75de9f41 206 id: getWorkerId(worker),
67f3f2d6
JB
207 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
208 type: getWorkerType(worker)!,
4b628b48 209 dynamic: false,
5eb72b9e
JB
210 ready: false,
211 stealing: false
4b628b48
JB
212 }
213 }
214
215 private initWorkerUsage (): WorkerUsage {
216 const getTasksQueueSize = (): number => {
dd951876 217 return this.tasksQueue.size
4b628b48 218 }
bf4ef2ca 219 const getTasksQueueMaxSize = (): number => {
dd951876 220 return this.tasksQueue.maxSize
4b628b48
JB
221 }
222 return {
223 tasks: {
224 executed: 0,
225 executing: 0,
226 get queued (): number {
227 return getTasksQueueSize()
228 },
229 get maxQueued (): number {
bf4ef2ca 230 return getTasksQueueMaxSize()
4b628b48 231 },
463226a4 232 sequentiallyStolen: 0,
68cbdc84 233 stolen: 0,
4b628b48
JB
234 failed: 0
235 },
236 runTime: {
c52475b8 237 history: new CircularArray<number>()
4b628b48
JB
238 },
239 waitTime: {
c52475b8 240 history: new CircularArray<number>()
4b628b48
JB
241 },
242 elu: {
243 idle: {
c52475b8 244 history: new CircularArray<number>()
4b628b48
JB
245 },
246 active: {
c52475b8 247 history: new CircularArray<number>()
4b628b48
JB
248 }
249 }
250 }
251 }
252
db0e38ee 253 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
e5ece61d
JB
254 const getTaskFunctionQueueSize = (): number => {
255 let taskFunctionQueueSize = 0
b25a42cd 256 for (const task of this.tasksQueue) {
dd92a715 257 if (
e5ece61d 258 (task.name === DEFAULT_TASK_NAME &&
67f3f2d6
JB
259 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
260 name === this.info.taskFunctionNames![1]) ||
e5ece61d 261 (task.name !== DEFAULT_TASK_NAME && name === task.name)
dd92a715 262 ) {
e5ece61d 263 ++taskFunctionQueueSize
b25a42cd
JB
264 }
265 }
e5ece61d 266 return taskFunctionQueueSize
b25a42cd
JB
267 }
268 return {
269 tasks: {
270 executed: 0,
271 executing: 0,
272 get queued (): number {
e5ece61d 273 return getTaskFunctionQueueSize()
b25a42cd 274 },
463226a4 275 sequentiallyStolen: 0,
68cbdc84 276 stolen: 0,
b25a42cd
JB
277 failed: 0
278 },
279 runTime: {
c52475b8 280 history: new CircularArray<number>()
b25a42cd
JB
281 },
282 waitTime: {
c52475b8 283 history: new CircularArray<number>()
b25a42cd
JB
284 },
285 elu: {
286 idle: {
c52475b8 287 history: new CircularArray<number>()
b25a42cd
JB
288 },
289 active: {
c52475b8 290 history: new CircularArray<number>()
b25a42cd
JB
291 }
292 }
293 }
294 }
4b628b48 295}