perf: enable prioritized tasks queue only when necessary
[poolifier.git] / src / pools / worker-node.ts
1 import { EventEmitter } from 'node:events'
2 import { MessageChannel } from 'node:worker_threads'
3
4 import { CircularBuffer } from '../circular-buffer.js'
5 import { PriorityQueue } from '../priority-queue.js'
6 import type { Task } from '../utility-types.js'
7 import { DEFAULT_TASK_NAME } from '../utils.js'
8 import {
9 checkWorkerNodeArguments,
10 createWorker,
11 getWorkerId,
12 getWorkerType
13 } from './utils.js'
14 import {
15 type EventHandler,
16 type IWorker,
17 type IWorkerNode,
18 MeasurementHistorySize,
19 type StrategyData,
20 type WorkerInfo,
21 type WorkerNodeOptions,
22 type WorkerType,
23 WorkerTypes,
24 type WorkerUsage
25 } from './worker.js'
26
27 /**
28 * Worker node.
29 *
30 * @typeParam Worker - Type of worker.
31 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
32 */
33 export class WorkerNode<Worker extends IWorker, Data = unknown>
34 extends EventEmitter
35 implements IWorkerNode<Worker, Data> {
36 /** @inheritdoc */
37 public readonly worker: Worker
38 /** @inheritdoc */
39 public readonly info: WorkerInfo
40 /** @inheritdoc */
41 public usage: WorkerUsage
42 /** @inheritdoc */
43 public strategyData?: StrategyData
44 /** @inheritdoc */
45 public messageChannel?: MessageChannel
46 /** @inheritdoc */
47 public tasksQueueBackPressureSize: number
48 private readonly tasksQueue: PriorityQueue<Task<Data>>
49 private setBackPressureFlag: boolean
50 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
51
52 /**
53 * Constructs a new worker node.
54 *
55 * @param type - The worker type.
56 * @param filePath - Path to the worker file.
57 * @param opts - The worker node options.
58 */
59 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
60 super()
61 checkWorkerNodeArguments(type, filePath, opts)
62 this.worker = createWorker<Worker>(type, filePath, {
63 env: opts.env,
64 workerOptions: opts.workerOptions
65 })
66 this.info = this.initWorkerInfo(this.worker)
67 this.usage = this.initWorkerUsage()
68 if (this.info.type === WorkerTypes.thread) {
69 this.messageChannel = new MessageChannel()
70 }
71 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
72 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
73 this.tasksQueue = new PriorityQueue<Task<Data>>(
74 opts.tasksQueueBucketSize,
75 opts.tasksQueuePriority
76 )
77 this.setBackPressureFlag = false
78 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
79 }
80
81 /** @inheritdoc */
82 public setTasksQueuePriority (enablePriority: boolean): void {
83 this.tasksQueue.enablePriority = enablePriority
84 }
85
86 /** @inheritdoc */
87 public tasksQueueSize (): number {
88 return this.tasksQueue.size
89 }
90
91 /** @inheritdoc */
92 public enqueueTask (task: Task<Data>): number {
93 const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
94 if (
95 !this.setBackPressureFlag &&
96 this.hasBackPressure() &&
97 !this.info.backPressure
98 ) {
99 this.setBackPressureFlag = true
100 this.info.backPressure = true
101 this.emit('backPressure', { workerId: this.info.id })
102 this.setBackPressureFlag = false
103 }
104 return tasksQueueSize
105 }
106
107 /** @inheritdoc */
108 public dequeueTask (bucket?: number): Task<Data> | undefined {
109 const task = this.tasksQueue.dequeue(bucket)
110 if (
111 !this.setBackPressureFlag &&
112 !this.hasBackPressure() &&
113 this.info.backPressure
114 ) {
115 this.setBackPressureFlag = true
116 this.info.backPressure = false
117 this.setBackPressureFlag = false
118 }
119 return task
120 }
121
122 /** @inheritdoc */
123 public dequeueLastPrioritizedTask (): Task<Data> | undefined {
124 // Start from the last empty or partially filled bucket
125 return this.dequeueTask(this.tasksQueue.buckets + 1)
126 }
127
128 /** @inheritdoc */
129 public clearTasksQueue (): void {
130 this.tasksQueue.clear()
131 }
132
133 /** @inheritdoc */
134 public hasBackPressure (): boolean {
135 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
136 }
137
138 /** @inheritdoc */
139 public async terminate (): Promise<void> {
140 const waitWorkerExit = new Promise<void>(resolve => {
141 this.registerOnceWorkerEventHandler('exit', () => {
142 resolve()
143 })
144 })
145 this.closeMessageChannel()
146 this.removeAllListeners()
147 switch (this.info.type) {
148 case WorkerTypes.thread:
149 this.worker.unref?.()
150 await this.worker.terminate?.()
151 break
152 case WorkerTypes.cluster:
153 this.registerOnceWorkerEventHandler('disconnect', () => {
154 this.worker.kill?.()
155 })
156 this.worker.disconnect?.()
157 break
158 }
159 await waitWorkerExit
160 }
161
162 /** @inheritdoc */
163 public registerWorkerEventHandler (
164 event: string,
165 handler: EventHandler<Worker>
166 ): void {
167 this.worker.on(event, handler)
168 }
169
170 /** @inheritdoc */
171 public registerOnceWorkerEventHandler (
172 event: string,
173 handler: EventHandler<Worker>
174 ): void {
175 this.worker.once(event, handler)
176 }
177
178 /** @inheritdoc */
179 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
180 if (!Array.isArray(this.info.taskFunctionsProperties)) {
181 throw new Error(
182 `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
183 )
184 }
185 if (
186 Array.isArray(this.info.taskFunctionsProperties) &&
187 this.info.taskFunctionsProperties.length < 3
188 ) {
189 throw new Error(
190 `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
191 )
192 }
193 if (name === DEFAULT_TASK_NAME) {
194 name = this.info.taskFunctionsProperties[1].name
195 }
196 if (!this.taskFunctionsUsage.has(name)) {
197 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
198 }
199 return this.taskFunctionsUsage.get(name)
200 }
201
202 /** @inheritdoc */
203 public deleteTaskFunctionWorkerUsage (name: string): boolean {
204 return this.taskFunctionsUsage.delete(name)
205 }
206
207 private closeMessageChannel (): void {
208 if (this.messageChannel != null) {
209 this.messageChannel.port1.unref()
210 this.messageChannel.port2.unref()
211 this.messageChannel.port1.close()
212 this.messageChannel.port2.close()
213 delete this.messageChannel
214 }
215 }
216
217 private initWorkerInfo (worker: Worker): WorkerInfo {
218 return {
219 id: getWorkerId(worker),
220 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
221 type: getWorkerType(worker)!,
222 dynamic: false,
223 ready: false,
224 stealing: false,
225 backPressure: false
226 }
227 }
228
229 private initWorkerUsage (): WorkerUsage {
230 const getTasksQueueSize = (): number => {
231 return this.tasksQueue.size
232 }
233 const getTasksQueueMaxSize = (): number => {
234 return this.tasksQueue.maxSize
235 }
236 return {
237 tasks: {
238 executed: 0,
239 executing: 0,
240 get queued (): number {
241 return getTasksQueueSize()
242 },
243 get maxQueued (): number {
244 return getTasksQueueMaxSize()
245 },
246 sequentiallyStolen: 0,
247 stolen: 0,
248 failed: 0
249 },
250 runTime: {
251 history: new CircularBuffer(MeasurementHistorySize)
252 },
253 waitTime: {
254 history: new CircularBuffer(MeasurementHistorySize)
255 },
256 elu: {
257 idle: {
258 history: new CircularBuffer(MeasurementHistorySize)
259 },
260 active: {
261 history: new CircularBuffer(MeasurementHistorySize)
262 }
263 }
264 }
265 }
266
267 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
268 const getTaskFunctionQueueSize = (): number => {
269 let taskFunctionQueueSize = 0
270 for (const task of this.tasksQueue) {
271 if (
272 (task.name === DEFAULT_TASK_NAME &&
273 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
274 name === this.info.taskFunctionsProperties![1].name) ||
275 (task.name !== DEFAULT_TASK_NAME && name === task.name)
276 ) {
277 ++taskFunctionQueueSize
278 }
279 }
280 return taskFunctionQueueSize
281 }
282 return {
283 tasks: {
284 executed: 0,
285 executing: 0,
286 get queued (): number {
287 return getTaskFunctionQueueSize()
288 },
289 sequentiallyStolen: 0,
290 stolen: 0,
291 failed: 0
292 },
293 runTime: {
294 history: new CircularBuffer(MeasurementHistorySize)
295 },
296 waitTime: {
297 history: new CircularBuffer(MeasurementHistorySize)
298 },
299 elu: {
300 idle: {
301 history: new CircularBuffer(MeasurementHistorySize)
302 },
303 active: {
304 history: new CircularBuffer(MeasurementHistorySize)
305 }
306 }
307 }
308 }
309 }