refactor: add type alias for worker event handler
[poolifier.git] / src / pools / worker-node.ts
Commit | Line | Data
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
e1c2dba7 2import { EventEmitter } from 'node:events'
d35e5717
JB
3import { CircularArray } from '../circular-array.js'
4import type { Task } from '../utility-types.js'
5import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils.js'
6import { Deque } from '../deque.js'
4b628b48 7import {
3bcbd4c5 8 type EventHandler,
4b628b48
JB
9 type IWorker,
10 type IWorkerNode,
f3a91bac 11 type StrategyData,
4b628b48 12 type WorkerInfo,
c3719753 13 type WorkerNodeOptions,
4b628b48
JB
14 type WorkerType,
15 WorkerTypes,
16 type WorkerUsage
d35e5717
JB
17} from './worker.js'
18import { checkWorkerNodeArguments, createWorker } from './utils.js'
4b628b48 19
/**
 * Worker node.
 *
 * Bundles a worker with its information, its usage statistics and its tasks
 * queue. Emits a `backPressure` event (payload: `{ workerId }`) when a task is
 * added while the tasks queue size is greater than or equal to
 * `tasksQueueBackPressureSize`.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Queue of the tasks waiting on this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Guard set while the `backPressure` event is being emitted, to avoid re-entrant emission. */
  private onBackPressureStarted: boolean
  /** Per task function usage statistics, keyed by task function name and created lazily. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    // The usage getters read the tasks queue lazily, so it is safe to build
    // the usage object before the queue itself is assigned below.
    this.usage = this.initWorkerUsage()
    // A message channel is only created for thread workers.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.shift()
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    return this.tasksQueue.pop()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /**
   * @inheritdoc
   *
   * Closes the message channel (if any), removes every listener registered on
   * this node, asks the worker to stop and resolves once the worker has
   * emitted its `exit` event.
   */
  public async terminate (): Promise<void> {
    // Register the exit handler first so the event cannot be missed.
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // Kill the cluster worker only once it has disconnected.
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /**
   * @inheritdoc
   *
   * The default task name is resolved to the second entry of the task function
   * names list. The usage entry is created on first access.
   *
   * @throws Error if the task function names list is not yet defined or has
   * less than 3 elements.
   */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionNames = this.info.taskFunctionNames
    if (!Array.isArray(taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctionNames[1] : name
    if (!this.taskFunctionsUsage.has(taskFunctionName)) {
      this.taskFunctionsUsage.set(
        taskFunctionName,
        this.initTaskFunctionWorkerUsage(taskFunctionName)
      )
    }
    return this.taskFunctionsUsage.get(taskFunctionName)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Emits the `backPressure` event if the tasks queue has back pressure and no
   * emission is already in progress.
   *
   * `emit()` calls listeners synchronously, so the guard flag is reset in a
   * `finally` clause: a throwing listener must not leave the flag set, which
   * would silence every later `backPressure` event.
   */
  private emitBackPressureIfNeeded (): void {
    if (this.hasBackPressure() && !this.onBackPressureStarted) {
      this.onBackPressureStarted = true
      try {
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.emit('backPressure', { workerId: this.info.id! })
      } finally {
        this.onBackPressureStarted = false
      }
    }
  }

  /**
   * Unreferences and closes both ports of the message channel, then removes
   * the channel from this node. Does nothing if there is no message channel.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker to describe.
   * @returns The worker information, with all status flags set to `false`.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      type: getWorkerType(worker)!,
      dynamic: false,
      ready: false,
      stealing: false
    }
  }

  /**
   * Builds a zeroed worker usage whose `queued` and `maxQueued` counters are
   * getters reading the tasks queue at access time.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions keep `this` bound to the worker node inside the getters.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds a zeroed usage for one task function. Its `queued` counter is a
   * getter counting, at access time, the queued tasks belonging to that task
   * function.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        if (
          // A task with the default name counts for the task function listed
          // second in the task function names list.
          (task.name === DEFAULT_TASK_NAME &&
            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            name === this.info.taskFunctionNames![1]) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}