308215176edb515851c2349ba550d3cc50e2611b
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { EventEmitter } from 'node:events'
3 import { CircularArray } from '../circular-array.js'
4 import type { Task } from '../utility-types.js'
5 import { DEFAULT_TASK_NAME } from '../utils.js'
6 import { Deque } from '../deque.js'
7 import {
8 type EventHandler,
9 type IWorker,
10 type IWorkerNode,
11 type StrategyData,
12 type WorkerInfo,
13 type WorkerNodeOptions,
14 type WorkerType,
15 WorkerTypes,
16 type WorkerUsage
17 } from './worker.js'
18 import {
19 checkWorkerNodeArguments,
20 createWorker,
21 getWorkerId,
22 getWorkerType
23 } from './utils.js'
24
25 /**
26 * Worker node.
27 *
28 * @typeParam Worker - Type of worker.
29 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
30 */
31 export class WorkerNode<Worker extends IWorker, Data = unknown>
32 extends EventEmitter
33 implements IWorkerNode<Worker, Data> {
34 /** @inheritdoc */
35 public readonly worker: Worker
36 /** @inheritdoc */
37 public readonly info: WorkerInfo
38 /** @inheritdoc */
39 public usage: WorkerUsage
40 /** @inheritdoc */
41 public strategyData?: StrategyData
42 /** @inheritdoc */
43 public messageChannel?: MessageChannel
44 /** @inheritdoc */
45 public tasksQueueBackPressureSize: number
46 private readonly tasksQueue: Deque<Task<Data>>
47 private onBackPressureStarted: boolean
48 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
49
50 /**
51 * Constructs a new worker node.
52 *
53 * @param type - The worker type.
54 * @param filePath - Path to the worker file.
55 * @param opts - The worker node options.
56 */
57 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
58 super()
59 checkWorkerNodeArguments(type, filePath, opts)
60 this.worker = createWorker<Worker>(type, filePath, {
61 env: opts.env,
62 workerOptions: opts.workerOptions
63 })
64 this.info = this.initWorkerInfo(this.worker)
65 this.usage = this.initWorkerUsage()
66 if (this.info.type === WorkerTypes.thread) {
67 this.messageChannel = new MessageChannel()
68 }
69 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
70 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
71 this.tasksQueue = new Deque<Task<Data>>()
72 this.onBackPressureStarted = false
73 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
74 }
75
76 /** @inheritdoc */
77 public tasksQueueSize (): number {
78 return this.tasksQueue.size
79 }
80
81 /** @inheritdoc */
82 public enqueueTask (task: Task<Data>): number {
83 const tasksQueueSize = this.tasksQueue.push(task)
84 if (this.hasBackPressure() && !this.onBackPressureStarted) {
85 this.onBackPressureStarted = true
86 this.emit('backPressure', { workerId: this.info.id })
87 this.onBackPressureStarted = false
88 }
89 return tasksQueueSize
90 }
91
92 /** @inheritdoc */
93 public unshiftTask (task: Task<Data>): number {
94 const tasksQueueSize = this.tasksQueue.unshift(task)
95 if (this.hasBackPressure() && !this.onBackPressureStarted) {
96 this.onBackPressureStarted = true
97 this.emit('backPressure', { workerId: this.info.id })
98 this.onBackPressureStarted = false
99 }
100 return tasksQueueSize
101 }
102
103 /** @inheritdoc */
104 public dequeueTask (): Task<Data> | undefined {
105 return this.tasksQueue.shift()
106 }
107
108 /** @inheritdoc */
109 public popTask (): Task<Data> | undefined {
110 return this.tasksQueue.pop()
111 }
112
113 /** @inheritdoc */
114 public clearTasksQueue (): void {
115 this.tasksQueue.clear()
116 }
117
118 /** @inheritdoc */
119 public hasBackPressure (): boolean {
120 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
121 }
122
123 /** @inheritdoc */
124 public resetUsage (): void {
125 this.usage = this.initWorkerUsage()
126 this.taskFunctionsUsage.clear()
127 }
128
129 /** @inheritdoc */
130 public async terminate (): Promise<void> {
131 const waitWorkerExit = new Promise<void>(resolve => {
132 this.registerOnceWorkerEventHandler('exit', () => {
133 resolve()
134 })
135 })
136 this.closeMessageChannel()
137 this.removeAllListeners()
138 switch (this.info.type) {
139 case WorkerTypes.thread:
140 await this.worker.terminate?.()
141 break
142 case WorkerTypes.cluster:
143 this.registerOnceWorkerEventHandler('disconnect', () => {
144 this.worker.kill?.()
145 })
146 this.worker.disconnect?.()
147 break
148 }
149 await waitWorkerExit
150 }
151
152 /** @inheritdoc */
153 public registerWorkerEventHandler (
154 event: string,
155 handler: EventHandler<Worker>
156 ): void {
157 this.worker.on(event, handler)
158 }
159
160 /** @inheritdoc */
161 public registerOnceWorkerEventHandler (
162 event: string,
163 handler: EventHandler<Worker>
164 ): void {
165 this.worker.once(event, handler)
166 }
167
168 /** @inheritdoc */
169 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
170 if (!Array.isArray(this.info.taskFunctionNames)) {
171 throw new Error(
172 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
173 )
174 }
175 if (
176 Array.isArray(this.info.taskFunctionNames) &&
177 this.info.taskFunctionNames.length < 3
178 ) {
179 throw new Error(
180 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
181 )
182 }
183 if (name === DEFAULT_TASK_NAME) {
184 name = this.info.taskFunctionNames[1]
185 }
186 if (!this.taskFunctionsUsage.has(name)) {
187 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
188 }
189 return this.taskFunctionsUsage.get(name)
190 }
191
192 /** @inheritdoc */
193 public deleteTaskFunctionWorkerUsage (name: string): boolean {
194 return this.taskFunctionsUsage.delete(name)
195 }
196
197 private closeMessageChannel (): void {
198 if (this.messageChannel != null) {
199 this.messageChannel.port1.unref()
200 this.messageChannel.port2.unref()
201 this.messageChannel.port1.close()
202 this.messageChannel.port2.close()
203 delete this.messageChannel
204 }
205 }
206
207 private initWorkerInfo (worker: Worker): WorkerInfo {
208 return {
209 id: getWorkerId(worker),
210 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
211 type: getWorkerType(worker)!,
212 dynamic: false,
213 ready: false,
214 stealing: false
215 }
216 }
217
218 private initWorkerUsage (): WorkerUsage {
219 const getTasksQueueSize = (): number => {
220 return this.tasksQueue.size
221 }
222 const getTasksQueueMaxSize = (): number => {
223 return this.tasksQueue.maxSize
224 }
225 return {
226 tasks: {
227 executed: 0,
228 executing: 0,
229 get queued (): number {
230 return getTasksQueueSize()
231 },
232 get maxQueued (): number {
233 return getTasksQueueMaxSize()
234 },
235 sequentiallyStolen: 0,
236 stolen: 0,
237 failed: 0
238 },
239 runTime: {
240 history: new CircularArray<number>()
241 },
242 waitTime: {
243 history: new CircularArray<number>()
244 },
245 elu: {
246 idle: {
247 history: new CircularArray<number>()
248 },
249 active: {
250 history: new CircularArray<number>()
251 }
252 }
253 }
254 }
255
256 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
257 const getTaskFunctionQueueSize = (): number => {
258 let taskFunctionQueueSize = 0
259 for (const task of this.tasksQueue) {
260 if (
261 (task.name === DEFAULT_TASK_NAME &&
262 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
263 name === this.info.taskFunctionNames![1]) ||
264 (task.name !== DEFAULT_TASK_NAME && name === task.name)
265 ) {
266 ++taskFunctionQueueSize
267 }
268 }
269 return taskFunctionQueueSize
270 }
271 return {
272 tasks: {
273 executed: 0,
274 executing: 0,
275 get queued (): number {
276 return getTaskFunctionQueueSize()
277 },
278 sequentiallyStolen: 0,
279 stolen: 0,
280 failed: 0
281 },
282 runTime: {
283 history: new CircularArray<number>()
284 },
285 waitTime: {
286 history: new CircularArray<number>()
287 },
288 elu: {
289 idle: {
290 history: new CircularArray<number>()
291 },
292 active: {
293 history: new CircularArray<number>()
294 }
295 }
296 }
297 }
298 }