perf: optimize tasks queue implementation
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
e1c2dba7 1import { EventEmitter } from 'node:events'
ded253e2
JB
2import { MessageChannel } from 'node:worker_threads'
3
f12182ad 4import { CircularBuffer } from '../circular-buffer.js'
e41b0271 5import { PriorityQueue } from '../priority-queue.js'
d35e5717 6import type { Task } from '../utility-types.js'
e9ed6eee 7import { DEFAULT_TASK_NAME } from '../utils.js'
ded253e2
JB
8import {
9 checkWorkerNodeArguments,
10 createWorker,
11 getWorkerId,
12 getWorkerType
13} from './utils.js'
4b628b48 14import {
3bcbd4c5 15 type EventHandler,
4b628b48
JB
16 type IWorker,
17 type IWorkerNode,
f12182ad 18 MeasurementHistorySize,
f3a91bac 19 type StrategyData,
4b628b48 20 type WorkerInfo,
c3719753 21 type WorkerNodeOptions,
4b628b48
JB
22 type WorkerType,
23 WorkerTypes,
24 type WorkerUsage
d35e5717 25} from './worker.js'
4b628b48 26
60664f48
JB
/**
 * Worker node.
 *
 * Holds a worker together with its information, its usage statistics and its
 * prioritized tasks queue. A `backPressure` event carrying the worker id is
 * emitted when enqueuing a task makes the tasks queue reach the back pressure size.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Prioritized tasks queue of the worker node. */
  private readonly tasksQueue: PriorityQueue<Task<Data>>
  /**
   * Guard raised while the `backPressure` event listeners are running, so that
   * re-entrant enqueue/dequeue/clear calls do not alter the back pressure state.
   */
  private setBackPressureFlag: boolean
  /** Usage statistics per task function name, lazily created on first access. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // A message channel is only created for thread workers.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
    this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
    this.setBackPressureFlag = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
    if (
      !this.setBackPressureFlag &&
      this.hasBackPressure() &&
      !this.info.backPressure
    ) {
      this.setBackPressureFlag = true
      try {
        this.info.backPressure = true
        this.emit('backPressure', { workerId: this.info.id })
      } finally {
        // Listeners run synchronously and may throw: always lower the guard,
        // otherwise the back pressure state could never be updated again.
        this.setBackPressureFlag = false
      }
    }
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (bucket?: number): Task<Data> | undefined {
    const task = this.tasksQueue.dequeue(bucket)
    this.releaseBackPressure()
    return task
  }

  /** @inheritdoc */
  public dequeueLastPrioritizedTask (): Task<Data> | undefined {
    // No bucket is given: the bucket selection is left to the tasks queue
    return this.dequeueTask()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
    // Keep the back pressure state in sync with the now emptied tasks queue
    this.releaseBackPressure()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // Register the 'exit' handler before asking the worker to stop
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        this.worker.unref?.()
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // Kill the cluster worker once it is disconnected
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionsProperties = this.info.taskFunctionsProperties
    if (!Array.isArray(taskFunctionsProperties)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
      )
    }
    if (taskFunctionsProperties.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
      )
    }
    // The default task name is resolved to the name of the second element of
    // the task function properties list
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctionsProperties[1].name : name
    let taskFunctionWorkerUsage = this.taskFunctionsUsage.get(taskFunctionName)
    if (taskFunctionWorkerUsage == null) {
      taskFunctionWorkerUsage =
        this.initTaskFunctionWorkerUsage(taskFunctionName)
      this.taskFunctionsUsage.set(taskFunctionName, taskFunctionWorkerUsage)
    }
    return taskFunctionWorkerUsage
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Clears the back pressure state once the tasks queue size is below the back
   * pressure size. Does nothing while the `backPressure` event listeners are running.
   */
  private releaseBackPressure (): void {
    if (
      !this.setBackPressureFlag &&
      !this.hasBackPressure() &&
      this.info.backPressure
    ) {
      this.info.backPressure = false
    }
  }

  /**
   * Unreferences and closes both ports of the message channel, if any, then
   * removes the message channel from the worker node.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker.
   * @returns The worker information with all state flags set to `false`.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      type: getWorkerType(worker)!,
      dynamic: false,
      ready: false,
      stealing: false,
      backPressure: false
    }
  }

  /**
   * Builds the initial measurement statistics shared by the worker usage and
   * the task function worker usages. Each call allocates new circular buffers.
   *
   * @returns The run time, wait time and event loop utilization statistics.
   */
  private initMeasurementsUsage (): Pick<
  WorkerUsage,
  'runTime' | 'waitTime' | 'elu'
  > {
    return {
      runTime: {
        history: new CircularBuffer(MeasurementHistorySize)
      },
      waitTime: {
        history: new CircularBuffer(MeasurementHistorySize)
      },
      elu: {
        idle: {
          history: new CircularBuffer(MeasurementHistorySize)
        },
        active: {
          history: new CircularBuffer(MeasurementHistorySize)
        }
      }
    }
  }

  /**
   * Builds the initial worker usage. The `queued` and `maxQueued` tasks
   * counters are live getters reading the tasks queue.
   *
   * @returns The worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions capture the worker node: inside the getters below,
    // `this` is the `tasks` object
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      ...this.initMeasurementsUsage()
    }
  }

  /**
   * Builds the initial worker usage of a task function. The `queued` tasks
   * counter is a live getter iterating over the tasks queue.
   *
   * @param name - The task function name.
   * @returns The task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      // Loop invariant: name the default task name stands for. Optional
      // chaining keeps the getter from throwing if the list is not defined.
      const defaultTaskFunctionName = this.info.taskFunctionsProperties?.[1]?.name
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        const taskFunctionName =
          task.name === DEFAULT_TASK_NAME ? defaultTaskFunctionName : task.name
        if (taskFunctionName === name) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      ...this.initMeasurementsUsage()
    }
  }
}