refactor: untangle utils purpose
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
e1c2dba7 2import { EventEmitter } from 'node:events'
d35e5717
JB
3import { CircularArray } from '../circular-array.js'
4import type { Task } from '../utility-types.js'
e9ed6eee 5import { DEFAULT_TASK_NAME } from '../utils.js'
d35e5717 6import { Deque } from '../deque.js'
4b628b48 7import {
3bcbd4c5 8 type EventHandler,
4b628b48
JB
9 type IWorker,
10 type IWorkerNode,
f3a91bac 11 type StrategyData,
4b628b48 12 type WorkerInfo,
c3719753 13 type WorkerNodeOptions,
4b628b48
JB
14 type WorkerType,
15 WorkerTypes,
16 type WorkerUsage
d35e5717 17} from './worker.js'
e9ed6eee
JB
18import {
19 checkWorkerNodeArguments,
20 createWorker,
21 getWorkerId,
22 getWorkerType
23} from './utils.js'
4b628b48 24
60664f48
JB
25/**
26 * Worker node.
27 *
28 * @typeParam Worker - Type of worker.
29 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
30 */
4b628b48 31export class WorkerNode<Worker extends IWorker, Data = unknown>
e1c2dba7 32 extends EventEmitter
9f95d5eb 33 implements IWorkerNode<Worker, Data> {
671d5154 34 /** @inheritdoc */
4b628b48 35 public readonly worker: Worker
671d5154 36 /** @inheritdoc */
4b628b48 37 public readonly info: WorkerInfo
671d5154 38 /** @inheritdoc */
4b628b48 39 public usage: WorkerUsage
20c6f652 40 /** @inheritdoc */
f3a91bac
JB
41 public strategyData?: StrategyData
42 /** @inheritdoc */
26fb3c18
JB
43 public messageChannel?: MessageChannel
44 /** @inheritdoc */
20c6f652 45 public tasksQueueBackPressureSize: number
574b351d 46 private readonly tasksQueue: Deque<Task<Data>>
47352846 47 private onBackPressureStarted: boolean
26fb3c18 48 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
4b628b48 49
60664f48
JB
50 /**
51 * Constructs a new worker node.
52 *
c3719753 53 * @param type - The worker type.
9974369e 54 * @param filePath - Path to the worker file.
c3719753 55 * @param opts - The worker node options.
60664f48 56 */
c3719753 57 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
9f95d5eb 58 super()
c3719753
JB
59 checkWorkerNodeArguments(type, filePath, opts)
60 this.worker = createWorker<Worker>(type, filePath, {
61 env: opts.env,
62 workerOptions: opts.workerOptions
63 })
64 this.info = this.initWorkerInfo(this.worker)
26fb3c18 65 this.usage = this.initWorkerUsage()
75de9f41 66 if (this.info.type === WorkerTypes.thread) {
7884d183
JB
67 this.messageChannel = new MessageChannel()
68 }
c63a35a0
JB
69 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
70 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
26fb3c18 71 this.tasksQueue = new Deque<Task<Data>>()
47352846 72 this.onBackPressureStarted = false
26fb3c18 73 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
4b628b48
JB
74 }
75
76 /** @inheritdoc */
77 public tasksQueueSize (): number {
78 return this.tasksQueue.size
79 }
80
4b628b48
JB
81 /** @inheritdoc */
82 public enqueueTask (task: Task<Data>): number {
72695f86 83 const tasksQueueSize = this.tasksQueue.push(task)
9f95d5eb 84 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 85 this.onBackPressureStarted = true
7f0e1334 86 this.emit('backPressure', { workerId: this.info.id })
47352846 87 this.onBackPressureStarted = false
72695f86
JB
88 }
89 return tasksQueueSize
90 }
91
92 /** @inheritdoc */
93 public unshiftTask (task: Task<Data>): number {
94 const tasksQueueSize = this.tasksQueue.unshift(task)
9f95d5eb 95 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 96 this.onBackPressureStarted = true
7f0e1334 97 this.emit('backPressure', { workerId: this.info.id })
47352846 98 this.onBackPressureStarted = false
72695f86
JB
99 }
100 return tasksQueueSize
4b628b48
JB
101 }
102
103 /** @inheritdoc */
104 public dequeueTask (): Task<Data> | undefined {
463226a4 105 return this.tasksQueue.shift()
4b628b48
JB
106 }
107
72695f86
JB
108 /** @inheritdoc */
109 public popTask (): Task<Data> | undefined {
463226a4 110 return this.tasksQueue.pop()
72695f86
JB
111 }
112
4b628b48
JB
113 /** @inheritdoc */
114 public clearTasksQueue (): void {
115 this.tasksQueue.clear()
116 }
117
671d5154
JB
118 /** @inheritdoc */
119 public hasBackPressure (): boolean {
8735b4e5 120 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
671d5154
JB
121 }
122
ff128cc9 123 /** @inheritdoc */
4b628b48
JB
124 public resetUsage (): void {
125 this.usage = this.initWorkerUsage()
db0e38ee 126 this.taskFunctionsUsage.clear()
ff128cc9
JB
127 }
128
3f09ed9f 129 /** @inheritdoc */
07e0c9e5
JB
130 public async terminate (): Promise<void> {
131 const waitWorkerExit = new Promise<void>(resolve => {
132 this.registerOnceWorkerEventHandler('exit', () => {
133 resolve()
134 })
135 })
136 this.closeMessageChannel()
137 this.removeAllListeners()
839b98b8
JB
138 switch (this.info.type) {
139 case WorkerTypes.thread:
140 await this.worker.terminate?.()
141 break
142 case WorkerTypes.cluster:
143 this.registerOnceWorkerEventHandler('disconnect', () => {
144 this.worker.kill?.()
145 })
146 this.worker.disconnect?.()
147 break
3f09ed9f 148 }
07e0c9e5 149 await waitWorkerExit
3f09ed9f
JB
150 }
151
c3719753
JB
152 /** @inheritdoc */
153 public registerWorkerEventHandler (
154 event: string,
3bcbd4c5 155 handler: EventHandler<Worker>
c3719753 156 ): void {
88af9bf1 157 this.worker.on(event, handler)
c3719753
JB
158 }
159
160 /** @inheritdoc */
161 public registerOnceWorkerEventHandler (
162 event: string,
3bcbd4c5 163 handler: EventHandler<Worker>
c3719753 164 ): void {
88af9bf1 165 this.worker.once(event, handler)
c3719753
JB
166 }
167
ff128cc9 168 /** @inheritdoc */
db0e38ee 169 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
6703b9f4 170 if (!Array.isArray(this.info.taskFunctionNames)) {
71b2b6d8 171 throw new Error(
db0e38ee 172 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
71b2b6d8
JB
173 )
174 }
b558f6b5 175 if (
6703b9f4
JB
176 Array.isArray(this.info.taskFunctionNames) &&
177 this.info.taskFunctionNames.length < 3
b558f6b5 178 ) {
db0e38ee
JB
179 throw new Error(
180 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
181 )
182 }
183 if (name === DEFAULT_TASK_NAME) {
6703b9f4 184 name = this.info.taskFunctionNames[1]
b558f6b5 185 }
db0e38ee
JB
186 if (!this.taskFunctionsUsage.has(name)) {
187 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
ff128cc9 188 }
db0e38ee 189 return this.taskFunctionsUsage.get(name)
4b628b48
JB
190 }
191
adee6053
JB
192 /** @inheritdoc */
193 public deleteTaskFunctionWorkerUsage (name: string): boolean {
194 return this.taskFunctionsUsage.delete(name)
195 }
196
07e0c9e5
JB
197 private closeMessageChannel (): void {
198 if (this.messageChannel != null) {
199 this.messageChannel.port1.unref()
200 this.messageChannel.port2.unref()
201 this.messageChannel.port1.close()
202 this.messageChannel.port2.close()
203 delete this.messageChannel
204 }
205 }
206
75de9f41 207 private initWorkerInfo (worker: Worker): WorkerInfo {
4b628b48 208 return {
75de9f41 209 id: getWorkerId(worker),
67f3f2d6
JB
210 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
211 type: getWorkerType(worker)!,
4b628b48 212 dynamic: false,
5eb72b9e
JB
213 ready: false,
214 stealing: false
4b628b48
JB
215 }
216 }
217
218 private initWorkerUsage (): WorkerUsage {
219 const getTasksQueueSize = (): number => {
dd951876 220 return this.tasksQueue.size
4b628b48 221 }
bf4ef2ca 222 const getTasksQueueMaxSize = (): number => {
dd951876 223 return this.tasksQueue.maxSize
4b628b48
JB
224 }
225 return {
226 tasks: {
227 executed: 0,
228 executing: 0,
229 get queued (): number {
230 return getTasksQueueSize()
231 },
232 get maxQueued (): number {
bf4ef2ca 233 return getTasksQueueMaxSize()
4b628b48 234 },
463226a4 235 sequentiallyStolen: 0,
68cbdc84 236 stolen: 0,
4b628b48
JB
237 failed: 0
238 },
239 runTime: {
c52475b8 240 history: new CircularArray<number>()
4b628b48
JB
241 },
242 waitTime: {
c52475b8 243 history: new CircularArray<number>()
4b628b48
JB
244 },
245 elu: {
246 idle: {
c52475b8 247 history: new CircularArray<number>()
4b628b48
JB
248 },
249 active: {
c52475b8 250 history: new CircularArray<number>()
4b628b48
JB
251 }
252 }
253 }
254 }
255
db0e38ee 256 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
e5ece61d
JB
257 const getTaskFunctionQueueSize = (): number => {
258 let taskFunctionQueueSize = 0
b25a42cd 259 for (const task of this.tasksQueue) {
dd92a715 260 if (
e5ece61d 261 (task.name === DEFAULT_TASK_NAME &&
67f3f2d6
JB
262 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
263 name === this.info.taskFunctionNames![1]) ||
e5ece61d 264 (task.name !== DEFAULT_TASK_NAME && name === task.name)
dd92a715 265 ) {
e5ece61d 266 ++taskFunctionQueueSize
b25a42cd
JB
267 }
268 }
e5ece61d 269 return taskFunctionQueueSize
b25a42cd
JB
270 }
271 return {
272 tasks: {
273 executed: 0,
274 executing: 0,
275 get queued (): number {
e5ece61d 276 return getTaskFunctionQueueSize()
b25a42cd 277 },
463226a4 278 sequentiallyStolen: 0,
68cbdc84 279 stolen: 0,
b25a42cd
JB
280 failed: 0
281 },
282 runTime: {
c52475b8 283 history: new CircularArray<number>()
b25a42cd
JB
284 },
285 waitTime: {
c52475b8 286 history: new CircularArray<number>()
b25a42cd
JB
287 },
288 elu: {
289 idle: {
c52475b8 290 history: new CircularArray<number>()
b25a42cd
JB
291 },
292 active: {
c52475b8 293 history: new CircularArray<number>()
b25a42cd
JB
294 }
295 }
296 }
297 }
4b628b48 298}