Merge branch 'master' into combined-prs-branch
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
e1c2dba7 1import { EventEmitter } from 'node:events'
ded253e2
JB
2import { MessageChannel } from 'node:worker_threads'
3
f12182ad 4import { CircularBuffer } from '../circular-buffer.js'
e41b0271 5import { PriorityQueue } from '../priority-queue.js'
d35e5717 6import type { Task } from '../utility-types.js'
e9ed6eee 7import { DEFAULT_TASK_NAME } from '../utils.js'
ded253e2
JB
8import {
9 checkWorkerNodeArguments,
10 createWorker,
11 getWorkerId,
12 getWorkerType
13} from './utils.js'
4b628b48 14import {
3bcbd4c5 15 type EventHandler,
4b628b48
JB
16 type IWorker,
17 type IWorkerNode,
f12182ad 18 MeasurementHistorySize,
f3a91bac 19 type StrategyData,
4b628b48 20 type WorkerInfo,
c3719753 21 type WorkerNodeOptions,
4b628b48
JB
22 type WorkerType,
23 WorkerTypes,
24 type WorkerUsage
d35e5717 25} from './worker.js'
4b628b48 26
60664f48
JB
/**
 * Worker node.
 *
 * Couples a worker with its information, its usage statistics and its tasks
 * queue. A `backPressure` event is emitted on the node when the tasks queue
 * size reaches `tasksQueueBackPressureSize`.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Queue of the tasks waiting to be executed by the worker. */
  private readonly tasksQueue: PriorityQueue<Task<Data>>
  /**
   * Reentrancy guard: `true` while the `backPressure` event listeners are
   * running, so that queue operations they trigger on this node do not touch
   * the back pressure state.
   */
  private setBackPressureFlag: boolean
  /** Usage statistics per task function name, created lazily. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // A message channel is only created for thread workers.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
    this.tasksQueue = new PriorityQueue<Task<Data>>(
      opts.tasksQueueBucketSize,
      opts.tasksQueuePriority
    )
    this.setBackPressureFlag = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public setTasksQueuePriority (enablePriority: boolean): void {
    this.tasksQueue.enablePriority = enablePriority
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
    if (
      !this.setBackPressureFlag &&
      this.hasBackPressure() &&
      !this.info.backPressure
    ) {
      this.setBackPressureFlag = true
      try {
        this.info.backPressure = true
        this.emit('backPressure', { workerId: this.info.id })
      } finally {
        // Listeners run synchronously and may throw: always release the guard,
        // otherwise back pressure would never be flagged nor released again.
        this.setBackPressureFlag = false
      }
    }
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (bucket?: number): Task<Data> | undefined {
    const task = this.tasksQueue.dequeue(bucket)
    this.releaseBackPressure()
    return task
  }

  /** @inheritdoc */
  public dequeueLastPrioritizedTask (): Task<Data> | undefined {
    // Start from the last empty or partially filled bucket
    return this.dequeueTask(this.tasksQueue.buckets + 1)
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
    // Clearing does not go through dequeueTask(): refresh the back pressure
    // state here so that it does not stay set on an emptied queue.
    this.releaseBackPressure()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // Register the 'exit' handler before asking the worker to stop.
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Removes the listeners registered on this node, not the worker ones.
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        this.worker.unref?.()
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // Kill the worker once it is disconnected.
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionsProperties = this.info.taskFunctionsProperties
    if (!Array.isArray(taskFunctionsProperties)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
      )
    }
    if (taskFunctionsProperties.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
      )
    }
    // The default task name is resolved to the name of the task function
    // listed at index 1.
    if (name === DEFAULT_TASK_NAME) {
      name = taskFunctionsProperties[1].name
    }
    if (!this.taskFunctionsUsage.has(name)) {
      this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
    }
    return this.taskFunctionsUsage.get(name)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Unsets the back pressure state once the tasks queue size is back under
   * the back pressure size. Does nothing while the `backPressure` event
   * listeners are running.
   */
  private releaseBackPressure (): void {
    if (
      !this.setBackPressureFlag &&
      !this.hasBackPressure() &&
      this.info.backPressure
    ) {
      this.info.backPressure = false
    }
  }

  /**
   * Unreferences and closes both ports of the message channel, if any, then
   * removes the channel from the worker node.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker.
   * @returns The worker information with all state flags unset.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      type: getWorkerType(worker)!,
      dynamic: false,
      ready: false,
      stealing: false,
      backPressure: false
    }
  }

  /**
   * Builds the initial worker usage statistics.
   *
   * @returns The worker usage whose `queued` and `maxQueued` tasks counters
   * are read from the tasks queue on each access.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions keep `this` bound to the worker node inside the getters.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularBuffer(MeasurementHistorySize)
      },
      waitTime: {
        history: new CircularBuffer(MeasurementHistorySize)
      },
      elu: {
        idle: {
          history: new CircularBuffer(MeasurementHistorySize)
        },
        active: {
          history: new CircularBuffer(MeasurementHistorySize)
        }
      }
    }
  }

  /**
   * Builds the initial usage statistics of a task function.
   *
   * @param name - The task function name.
   * @returns The task function worker usage whose `queued` tasks counter is
   * computed from the tasks queue on each access.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      // Tasks queued under the default task name are accounted to the task
      // function listed at index 1. Resolved once per call and without
      // throwing if the task function properties list is not defined.
      const defaultTaskFunctionName =
        this.info.taskFunctionsProperties?.[1]?.name
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        const taskFunctionName =
          task.name === DEFAULT_TASK_NAME ? defaultTaskFunctionName : task.name
        if (taskFunctionName === name) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularBuffer(MeasurementHistorySize)
      },
      waitTime: {
        history: new CircularBuffer(MeasurementHistorySize)
      },
      elu: {
        idle: {
          history: new CircularBuffer(MeasurementHistorySize)
        },
        active: {
          history: new CircularBuffer(MeasurementHistorySize)
        }
      }
    }
  }
}