chore: migrate to eslint 9
[poolifier.git] / src / pools / worker-node.ts
... / ...
CommitLineData
1import { EventEmitter } from 'node:events'
2import { MessageChannel } from 'node:worker_threads'
3
4import { CircularBuffer } from '../circular-buffer.js'
5import { PriorityQueue } from '../priority-queue.js'
6import type { Task } from '../utility-types.js'
7import { DEFAULT_TASK_NAME } from '../utils.js'
8import {
9 checkWorkerNodeArguments,
10 createWorker,
11 getWorkerId,
12 getWorkerType,
13} from './utils.js'
14import {
15 type EventHandler,
16 type IWorker,
17 type IWorkerNode,
18 MeasurementHistorySize,
19 type StrategyData,
20 type WorkerInfo,
21 type WorkerNodeOptions,
22 type WorkerType,
23 WorkerTypes,
24 type WorkerUsage,
25} from './worker.js'
26
27/**
28 * Worker node.
29 * @typeParam Worker - Type of worker.
30 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
31 */
32export class WorkerNode<Worker extends IWorker, Data = unknown>
33 extends EventEmitter
34 implements IWorkerNode<Worker, Data> {
35 /** @inheritdoc */
36 public readonly worker: Worker
37 /** @inheritdoc */
38 public readonly info: WorkerInfo
39 /** @inheritdoc */
40 public usage: WorkerUsage
41 /** @inheritdoc */
42 public strategyData?: StrategyData
43 /** @inheritdoc */
44 public messageChannel?: MessageChannel
45 /** @inheritdoc */
46 public tasksQueueBackPressureSize: number
47 private readonly tasksQueue: PriorityQueue<Task<Data>>
48 private setBackPressureFlag: boolean
49 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
50
51 /**
52 * Constructs a new worker node.
53 * @param type - The worker type.
54 * @param filePath - Path to the worker file.
55 * @param opts - The worker node options.
56 */
57 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
58 super()
59 checkWorkerNodeArguments(type, filePath, opts)
60 this.worker = createWorker<Worker>(type, filePath, {
61 env: opts.env,
62 workerOptions: opts.workerOptions,
63 })
64 this.info = this.initWorkerInfo(this.worker)
65 this.usage = this.initWorkerUsage()
66 if (this.info.type === WorkerTypes.thread) {
67 this.messageChannel = new MessageChannel()
68 }
69 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
70 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
71 this.tasksQueue = new PriorityQueue<Task<Data>>(
72 opts.tasksQueueBucketSize,
73 opts.tasksQueuePriority
74 )
75 this.setBackPressureFlag = false
76 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
77 }
78
79 /** @inheritdoc */
80 public setTasksQueuePriority (enablePriority: boolean): void {
81 this.tasksQueue.enablePriority = enablePriority
82 }
83
84 /** @inheritdoc */
85 public tasksQueueSize (): number {
86 return this.tasksQueue.size
87 }
88
89 /** @inheritdoc */
90 public enqueueTask (task: Task<Data>): number {
91 const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
92 if (
93 !this.setBackPressureFlag &&
94 this.hasBackPressure() &&
95 !this.info.backPressure
96 ) {
97 this.setBackPressureFlag = true
98 this.info.backPressure = true
99 this.emit('backPressure', { workerId: this.info.id })
100 this.setBackPressureFlag = false
101 }
102 return tasksQueueSize
103 }
104
105 /** @inheritdoc */
106 public dequeueTask (bucket?: number): Task<Data> | undefined {
107 const task = this.tasksQueue.dequeue(bucket)
108 if (
109 !this.setBackPressureFlag &&
110 !this.hasBackPressure() &&
111 this.info.backPressure
112 ) {
113 this.setBackPressureFlag = true
114 this.info.backPressure = false
115 this.setBackPressureFlag = false
116 }
117 return task
118 }
119
120 /** @inheritdoc */
121 public dequeueLastPrioritizedTask (): Task<Data> | undefined {
122 // Start from the last empty or partially filled bucket
123 return this.dequeueTask(this.tasksQueue.buckets + 1)
124 }
125
126 /** @inheritdoc */
127 public clearTasksQueue (): void {
128 this.tasksQueue.clear()
129 }
130
131 /** @inheritdoc */
132 public hasBackPressure (): boolean {
133 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
134 }
135
136 /** @inheritdoc */
137 public async terminate (): Promise<void> {
138 const waitWorkerExit = new Promise<void>(resolve => {
139 this.registerOnceWorkerEventHandler('exit', () => {
140 resolve()
141 })
142 })
143 this.closeMessageChannel()
144 this.removeAllListeners()
145 switch (this.info.type) {
146 case WorkerTypes.thread:
147 this.worker.unref?.()
148 await this.worker.terminate?.()
149 break
150 case WorkerTypes.cluster:
151 this.registerOnceWorkerEventHandler('disconnect', () => {
152 this.worker.kill?.()
153 })
154 this.worker.disconnect?.()
155 break
156 }
157 await waitWorkerExit
158 }
159
160 /** @inheritdoc */
161 public registerWorkerEventHandler (
162 event: string,
163 handler: EventHandler<Worker>
164 ): void {
165 this.worker.on(event, handler)
166 }
167
168 /** @inheritdoc */
169 public registerOnceWorkerEventHandler (
170 event: string,
171 handler: EventHandler<Worker>
172 ): void {
173 this.worker.once(event, handler)
174 }
175
176 /** @inheritdoc */
177 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
178 if (!Array.isArray(this.info.taskFunctionsProperties)) {
179 throw new Error(
180 `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
181 )
182 }
183 if (
184 Array.isArray(this.info.taskFunctionsProperties) &&
185 this.info.taskFunctionsProperties.length < 3
186 ) {
187 throw new Error(
188 `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
189 )
190 }
191 if (name === DEFAULT_TASK_NAME) {
192 name = this.info.taskFunctionsProperties[1].name
193 }
194 if (!this.taskFunctionsUsage.has(name)) {
195 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
196 }
197 return this.taskFunctionsUsage.get(name)
198 }
199
200 /** @inheritdoc */
201 public deleteTaskFunctionWorkerUsage (name: string): boolean {
202 return this.taskFunctionsUsage.delete(name)
203 }
204
205 private closeMessageChannel (): void {
206 if (this.messageChannel != null) {
207 this.messageChannel.port1.unref()
208 this.messageChannel.port2.unref()
209 this.messageChannel.port1.close()
210 this.messageChannel.port2.close()
211 delete this.messageChannel
212 }
213 }
214
215 private initWorkerInfo (worker: Worker): WorkerInfo {
216 return {
217 id: getWorkerId(worker),
218 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
219 type: getWorkerType(worker)!,
220 dynamic: false,
221 ready: false,
222 stealing: false,
223 backPressure: false,
224 }
225 }
226
227 private initWorkerUsage (): WorkerUsage {
228 const getTasksQueueSize = (): number => {
229 return this.tasksQueue.size
230 }
231 const getTasksQueueMaxSize = (): number => {
232 return this.tasksQueue.maxSize
233 }
234 return {
235 tasks: {
236 executed: 0,
237 executing: 0,
238 get queued (): number {
239 return getTasksQueueSize()
240 },
241 get maxQueued (): number {
242 return getTasksQueueMaxSize()
243 },
244 sequentiallyStolen: 0,
245 stolen: 0,
246 failed: 0,
247 },
248 runTime: {
249 history: new CircularBuffer(MeasurementHistorySize),
250 },
251 waitTime: {
252 history: new CircularBuffer(MeasurementHistorySize),
253 },
254 elu: {
255 idle: {
256 history: new CircularBuffer(MeasurementHistorySize),
257 },
258 active: {
259 history: new CircularBuffer(MeasurementHistorySize),
260 },
261 },
262 }
263 }
264
265 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
266 const getTaskFunctionQueueSize = (): number => {
267 let taskFunctionQueueSize = 0
268 for (const task of this.tasksQueue) {
269 if (
270 (task.name === DEFAULT_TASK_NAME &&
271 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
272 name === this.info.taskFunctionsProperties![1].name) ||
273 (task.name !== DEFAULT_TASK_NAME && name === task.name)
274 ) {
275 ++taskFunctionQueueSize
276 }
277 }
278 return taskFunctionQueueSize
279 }
280 return {
281 tasks: {
282 executed: 0,
283 executing: 0,
284 get queued (): number {
285 return getTaskFunctionQueueSize()
286 },
287 sequentiallyStolen: 0,
288 stolen: 0,
289 failed: 0,
290 },
291 runTime: {
292 history: new CircularBuffer(MeasurementHistorySize),
293 },
294 waitTime: {
295 history: new CircularBuffer(MeasurementHistorySize),
296 },
297 elu: {
298 idle: {
299 history: new CircularBuffer(MeasurementHistorySize),
300 },
301 active: {
302 history: new CircularBuffer(MeasurementHistorySize),
303 },
304 },
305 }
306 }
307}