523036a5eac6f57db2966131652b29da6423fea1
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { EventEmitter } from 'node:events'
3 import { CircularArray } from '../circular-array.js'
4 import type { Task } from '../utility-types.js'
5 import { DEFAULT_TASK_NAME } from '../utils.js'
6 import { Deque } from '../deque.js'
7 import {
8 type EventHandler,
9 type IWorker,
10 type IWorkerNode,
11 type StrategyData,
12 type WorkerInfo,
13 type WorkerNodeOptions,
14 type WorkerType,
15 WorkerTypes,
16 type WorkerUsage
17 } from './worker.js'
18 import {
19 checkWorkerNodeArguments,
20 createWorker,
21 getWorkerId,
22 getWorkerType
23 } from './utils.js'
24
/**
 * Worker node.
 *
 * Couples one pool worker with its information, its usage statistics and its
 * own tasks queue. The node is itself an event emitter: it emits a
 * `backPressure` event, carrying `{ workerId }`, when a task is added while the
 * tasks queue size is greater than or equal to `tasksQueueBackPressureSize`.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Tasks waiting to be executed by this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Re-entrancy guard: `true` only while the `backPressure` event is being emitted. */
  private onBackPressureStarted: boolean
  /** Usage statistics per task function name, created lazily on first access. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // A message channel is only created for thread workers.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    // NOTE(review): presumably guaranteed to be defined by checkWorkerNodeArguments() - confirm.
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    // Add at the back of the queue, then signal back pressure if needed.
    const tasksQueueSize = this.tasksQueue.push(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    // Add at the front of the queue, then signal back pressure if needed.
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.shift()
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    return this.tasksQueue.pop()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // Register the 'exit' handler on the worker before starting the shutdown.
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Removes the listeners registered on this node; worker event handlers are
    // registered on the worker itself and are not affected by this call.
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        this.worker.unref?.()
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // Kill the worker once it has disconnected.
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionNames = this.info.taskFunctionNames
    if (!Array.isArray(taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is tracked under the name stored at index 1 of the
    // task function names list.
    const usageKey =
      name === DEFAULT_TASK_NAME ? taskFunctionNames[1] : name
    if (!this.taskFunctionsUsage.has(usageKey)) {
      this.taskFunctionsUsage.set(
        usageKey,
        this.initTaskFunctionWorkerUsage(usageKey)
      )
    }
    return this.taskFunctionsUsage.get(usageKey)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Emits the `backPressure` event when the tasks queue has back pressure and
   * no emission is already in progress.
   *
   * Listeners are invoked synchronously by `emit()`; the guard flag is reset in
   * a `finally` clause so that a throwing listener cannot leave it set and
   * silence every later `backPressure` event. The listener error still
   * propagates to the caller.
   */
  private emitBackPressureIfNeeded (): void {
    if (this.hasBackPressure() && !this.onBackPressureStarted) {
      this.onBackPressureStarted = true
      try {
        this.emit('backPressure', { workerId: this.info.id })
      } finally {
        this.onBackPressureStarted = false
      }
    }
  }

  /**
   * Unreferences and closes both ports of the message channel, if any, then
   * removes the `messageChannel` property from the node.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker to describe.
   * @returns The worker information, flagged as not dynamic, not ready and not stealing.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      type: getWorkerType(worker)!,
      dynamic: false,
      ready: false,
      stealing: false
    }
  }

  /**
   * Builds zeroed usage statistics for the worker node.
   *
   * `tasks.queued` and `tasks.maxQueued` are getters reading the tasks queue at
   * access time.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions capture the node: inside the getters below, `this` is the
    // `tasks` object literal.
    const getTasksQueueSize = (): number => this.tasksQueue.size
    const getTasksQueueMaxSize = (): number => this.tasksQueue.maxSize
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      ...this.initMeasurementStatistics()
    }
  }

  /**
   * Builds zeroed usage statistics for one task function.
   *
   * `tasks.queued` is a getter counting, at access time, the queued tasks that
   * belong to the task function.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // A task queued under the default task name counts for the task
        // function stored at index 1 of the task function names list.
        if (
          (task.name === DEFAULT_TASK_NAME &&
            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            name === this.info.taskFunctionNames![1]) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      ...this.initMeasurementStatistics()
    }
  }

  /**
   * Builds the measurement part shared by every usage object: run time, wait
   * time and event loop utilization, each starting with an empty history.
   *
   * @returns Fresh `runTime`, `waitTime` and `elu` statistics.
   */
  private initMeasurementStatistics (): Pick<
  WorkerUsage,
  'runTime' | 'waitTime' | 'elu'
  > {
    return {
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}