d87a79c099c5dab34126e284ca2f14f4be72b3a1
[poolifier.git] / src / pools / worker-node.ts
1 import { EventEmitter } from 'node:events'
2 import { MessageChannel } from 'node:worker_threads'
3
4 import { CircularBuffer } from '../circular-buffer.js'
5 import { PriorityQueue } from '../priority-queue.js'
6 import type { Task } from '../utility-types.js'
7 import { DEFAULT_TASK_NAME } from '../utils.js'
8 import {
9 checkWorkerNodeArguments,
10 createWorker,
11 getWorkerId,
12 getWorkerType
13 } from './utils.js'
14 import {
15 type EventHandler,
16 type IWorker,
17 type IWorkerNode,
18 MeasurementHistorySize,
19 type StrategyData,
20 type WorkerInfo,
21 type WorkerNodeOptions,
22 type WorkerType,
23 WorkerTypes,
24 type WorkerUsage
25 } from './worker.js'
26
/**
 * Worker node.
 *
 * Wraps a single pool worker together with its bookkeeping: worker information,
 * usage statistics (global and per task function), its prioritized tasks queue
 * and, for thread workers, a dedicated message channel.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Prioritized queue of the tasks waiting on this worker node. */
  private readonly tasksQueue: PriorityQueue<Task<Data>>
  /**
   * Reentrancy guard: `true` only while the 'backPressure' event is being
   * emitted, so that queue operations performed by listeners do not update
   * the back pressure state again.
   */
  private setBackPressureFlag: boolean
  /** Per task function usage, keyed by task function name and created lazily. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // Only thread workers get a message channel
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
    this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
    this.setBackPressureFlag = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
    if (
      !this.setBackPressureFlag &&
      this.hasBackPressure() &&
      !this.info.backPressure
    ) {
      this.setBackPressureFlag = true
      try {
        this.info.backPressure = true
        this.emit('backPressure', { workerId: this.info.id })
      } finally {
        // Listeners are invoked synchronously and may throw: always release
        // the guard, otherwise back pressure handling would stay disabled
        this.setBackPressureFlag = false
      }
    }
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (bucket?: number): Task<Data> | undefined {
    const task = this.tasksQueue.dequeue(bucket)
    // Skipped while a 'backPressure' event emission is in progress
    if (
      !this.setBackPressureFlag &&
      !this.hasBackPressure() &&
      this.info.backPressure
    ) {
      this.info.backPressure = false
    }
    return task
  }

  /** @inheritdoc */
  public dequeueLastPrioritizedTask (): Task<Data> | undefined {
    // Start from the last empty or partially filled bucket
    return this.dequeueTask(this.tasksQueue.buckets + 1)
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // The 'exit' listener is registered before the shutdown is initiated
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Removes the listeners registered on this worker node emitter, the
    // worker own listeners are left untouched
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        this.worker.unref?.()
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // Kill the cluster worker once it has disconnected
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /**
   * @inheritdoc
   * @throws {Error} If the task function properties list is not yet defined or has less than 3 elements.
   */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    if (!Array.isArray(this.info.taskFunctionsProperties)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
      )
    }
    if (this.info.taskFunctionsProperties.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
      )
    }
    // The default task name is resolved to the name of the second entry of
    // the task function properties list
    const taskFunctionName =
      name === DEFAULT_TASK_NAME
        ? this.info.taskFunctionsProperties[1].name
        : name
    let taskFunctionWorkerUsage = this.taskFunctionsUsage.get(taskFunctionName)
    if (taskFunctionWorkerUsage == null) {
      taskFunctionWorkerUsage =
        this.initTaskFunctionWorkerUsage(taskFunctionName)
      this.taskFunctionsUsage.set(taskFunctionName, taskFunctionWorkerUsage)
    }
    return taskFunctionWorkerUsage
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Unreferences and closes both ports of the message channel, if any, then
   * removes the message channel from this worker node.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker information: identifier and type are read from
   * the worker, every state flag starts at `false`.
   *
   * @param worker - The worker.
   * @returns The initial worker information.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      type: getWorkerType(worker)!,
      dynamic: false,
      ready: false,
      stealing: false,
      backPressure: false
    }
  }

  /**
   * Builds the initial worker usage. Counters start at zero, `queued` and
   * `maxQueued` are computed from the tasks queue on each read.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Closures are needed since `this` inside the object literal getters
    // below refers to the `tasks` object, not to the worker node
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularBuffer<number>(MeasurementHistorySize)
      },
      waitTime: {
        history: new CircularBuffer<number>(MeasurementHistorySize)
      },
      elu: {
        idle: {
          history: new CircularBuffer<number>(MeasurementHistorySize)
        },
        active: {
          history: new CircularBuffer<number>(MeasurementHistorySize)
        }
      }
    }
  }

  /**
   * Builds the initial worker usage of a task function. Counters start at
   * zero, `queued` is computed on each read by counting the matching tasks in
   * the tasks queue.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    // Closure is needed since `this` inside the object literal getter below
    // refers to the `tasks` object, not to the worker node
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // A task queued under the default task name counts for the task
        // function named by the second entry of the task function properties
        // list, any other task counts for the task function bearing its name
        if (
          (task.name === DEFAULT_TASK_NAME &&
            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            name === this.info.taskFunctionsProperties![1].name) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularBuffer<number>(MeasurementHistorySize)
      },
      waitTime: {
        history: new CircularBuffer<number>(MeasurementHistorySize)
      },
      elu: {
        idle: {
          history: new CircularBuffer<number>(MeasurementHistorySize)
        },
        active: {
          history: new CircularBuffer<number>(MeasurementHistorySize)
        }
      }
    }
  }
}