build(deps-dev): apply updates
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
e1c2dba7 1import { EventEmitter } from 'node:events'
ded253e2
JB
2import { MessageChannel } from 'node:worker_threads'
3
f12182ad 4import { CircularBuffer } from '../circular-buffer.js'
e41b0271 5import { PriorityQueue } from '../priority-queue.js'
d35e5717 6import type { Task } from '../utility-types.js'
e9ed6eee 7import { DEFAULT_TASK_NAME } from '../utils.js'
ded253e2
JB
8import {
9 checkWorkerNodeArguments,
10 createWorker,
11 getWorkerId,
3a502712 12 getWorkerType,
ded253e2 13} from './utils.js'
4b628b48 14import {
3bcbd4c5 15 type EventHandler,
4b628b48
JB
16 type IWorker,
17 type IWorkerNode,
f12182ad 18 MeasurementHistorySize,
f3a91bac 19 type StrategyData,
4b628b48 20 type WorkerInfo,
c3719753 21 type WorkerNodeOptions,
4b628b48
JB
22 type WorkerType,
23 WorkerTypes,
3a502712 24 type WorkerUsage,
d35e5717 25} from './worker.js'
4b628b48 26
60664f48
JB
27/**
28 * Worker node.
60664f48
JB
29 * @typeParam Worker - Type of worker.
30 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
31 */
4b628b48 32export class WorkerNode<Worker extends IWorker, Data = unknown>
e1c2dba7 33 extends EventEmitter
9f95d5eb 34 implements IWorkerNode<Worker, Data> {
671d5154 35 /** @inheritdoc */
4b628b48 36 public readonly worker: Worker
671d5154 37 /** @inheritdoc */
4b628b48 38 public readonly info: WorkerInfo
671d5154 39 /** @inheritdoc */
4b628b48 40 public usage: WorkerUsage
20c6f652 41 /** @inheritdoc */
f3a91bac
JB
42 public strategyData?: StrategyData
43 /** @inheritdoc */
26fb3c18
JB
44 public messageChannel?: MessageChannel
45 /** @inheritdoc */
20c6f652 46 public tasksQueueBackPressureSize: number
95d1a734 47 private readonly tasksQueue: PriorityQueue<Task<Data>>
2eee7220 48 private setBackPressureFlag: boolean
26fb3c18 49 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
4b628b48 50
60664f48
JB
51 /**
52 * Constructs a new worker node.
c3719753 53 * @param type - The worker type.
9974369e 54 * @param filePath - Path to the worker file.
c3719753 55 * @param opts - The worker node options.
60664f48 56 */
c3719753 57 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
9f95d5eb 58 super()
c3719753
JB
59 checkWorkerNodeArguments(type, filePath, opts)
60 this.worker = createWorker<Worker>(type, filePath, {
61 env: opts.env,
3a502712 62 workerOptions: opts.workerOptions,
c3719753
JB
63 })
64 this.info = this.initWorkerInfo(this.worker)
26fb3c18 65 this.usage = this.initWorkerUsage()
75de9f41 66 if (this.info.type === WorkerTypes.thread) {
7884d183
JB
67 this.messageChannel = new MessageChannel()
68 }
c63a35a0
JB
69 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
70 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
fcfc3353
JB
71 this.tasksQueue = new PriorityQueue<Task<Data>>(
72 opts.tasksQueueBucketSize,
73 opts.tasksQueuePriority
74 )
2eee7220 75 this.setBackPressureFlag = false
26fb3c18 76 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
4b628b48
JB
77 }
78
fcfc3353
JB
79 /** @inheritdoc */
80 public setTasksQueuePriority (enablePriority: boolean): void {
81 this.tasksQueue.enablePriority = enablePriority
82 }
83
4b628b48
JB
84 /** @inheritdoc */
85 public tasksQueueSize (): number {
86 return this.tasksQueue.size
87 }
88
4b628b48
JB
89 /** @inheritdoc */
90 public enqueueTask (task: Task<Data>): number {
95d1a734 91 const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
2eee7220
JB
92 if (
93 !this.setBackPressureFlag &&
94 this.hasBackPressure() &&
95 !this.info.backPressure
96 ) {
97 this.setBackPressureFlag = true
98 this.info.backPressure = true
7f0e1334 99 this.emit('backPressure', { workerId: this.info.id })
37d821df 100 this.setBackPressureFlag = false
72695f86
JB
101 }
102 return tasksQueueSize
103 }
104
105 /** @inheritdoc */
95d1a734 106 public dequeueTask (bucket?: number): Task<Data> | undefined {
2eee7220
JB
107 const task = this.tasksQueue.dequeue(bucket)
108 if (
109 !this.setBackPressureFlag &&
110 !this.hasBackPressure() &&
111 this.info.backPressure
112 ) {
113 this.setBackPressureFlag = true
114 this.info.backPressure = false
37d821df 115 this.setBackPressureFlag = false
2eee7220 116 }
2eee7220 117 return task
72695f86
JB
118 }
119
0d4e88b3 120 /** @inheritdoc */
2eee7220 121 public dequeueLastPrioritizedTask (): Task<Data> | undefined {
0d4e88b3 122 // Start from the last empty or partially filled bucket
e75373e6 123 return this.dequeueTask(this.tasksQueue.buckets + 1)
0d4e88b3
JB
124 }
125
4b628b48
JB
126 /** @inheritdoc */
127 public clearTasksQueue (): void {
128 this.tasksQueue.clear()
129 }
130
671d5154
JB
131 /** @inheritdoc */
132 public hasBackPressure (): boolean {
8735b4e5 133 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
671d5154
JB
134 }
135
3f09ed9f 136 /** @inheritdoc */
07e0c9e5
JB
137 public async terminate (): Promise<void> {
138 const waitWorkerExit = new Promise<void>(resolve => {
139 this.registerOnceWorkerEventHandler('exit', () => {
140 resolve()
141 })
142 })
143 this.closeMessageChannel()
144 this.removeAllListeners()
839b98b8
JB
145 switch (this.info.type) {
146 case WorkerTypes.thread:
d20cde84 147 this.worker.unref?.()
839b98b8
JB
148 await this.worker.terminate?.()
149 break
150 case WorkerTypes.cluster:
151 this.registerOnceWorkerEventHandler('disconnect', () => {
152 this.worker.kill?.()
153 })
154 this.worker.disconnect?.()
155 break
3f09ed9f 156 }
07e0c9e5 157 await waitWorkerExit
3f09ed9f
JB
158 }
159
c3719753
JB
160 /** @inheritdoc */
161 public registerWorkerEventHandler (
162 event: string,
3bcbd4c5 163 handler: EventHandler<Worker>
c3719753 164 ): void {
88af9bf1 165 this.worker.on(event, handler)
c3719753
JB
166 }
167
168 /** @inheritdoc */
169 public registerOnceWorkerEventHandler (
170 event: string,
3bcbd4c5 171 handler: EventHandler<Worker>
c3719753 172 ): void {
88af9bf1 173 this.worker.once(event, handler)
c3719753
JB
174 }
175
ff128cc9 176 /** @inheritdoc */
db0e38ee 177 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
31847469 178 if (!Array.isArray(this.info.taskFunctionsProperties)) {
71b2b6d8 179 throw new Error(
31847469 180 `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
71b2b6d8
JB
181 )
182 }
b558f6b5 183 if (
31847469
JB
184 Array.isArray(this.info.taskFunctionsProperties) &&
185 this.info.taskFunctionsProperties.length < 3
b558f6b5 186 ) {
db0e38ee 187 throw new Error(
31847469 188 `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
db0e38ee
JB
189 )
190 }
191 if (name === DEFAULT_TASK_NAME) {
31847469 192 name = this.info.taskFunctionsProperties[1].name
b558f6b5 193 }
db0e38ee
JB
194 if (!this.taskFunctionsUsage.has(name)) {
195 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
ff128cc9 196 }
db0e38ee 197 return this.taskFunctionsUsage.get(name)
4b628b48
JB
198 }
199
adee6053
JB
200 /** @inheritdoc */
201 public deleteTaskFunctionWorkerUsage (name: string): boolean {
202 return this.taskFunctionsUsage.delete(name)
203 }
204
07e0c9e5
JB
205 private closeMessageChannel (): void {
206 if (this.messageChannel != null) {
207 this.messageChannel.port1.unref()
208 this.messageChannel.port2.unref()
209 this.messageChannel.port1.close()
210 this.messageChannel.port2.close()
211 delete this.messageChannel
212 }
213 }
214
75de9f41 215 private initWorkerInfo (worker: Worker): WorkerInfo {
4b628b48 216 return {
75de9f41 217 id: getWorkerId(worker),
67f3f2d6
JB
218 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
219 type: getWorkerType(worker)!,
4b628b48 220 dynamic: false,
5eb72b9e 221 ready: false,
85b553ba 222 stealing: false,
3a502712 223 backPressure: false,
4b628b48
JB
224 }
225 }
226
227 private initWorkerUsage (): WorkerUsage {
228 const getTasksQueueSize = (): number => {
dd951876 229 return this.tasksQueue.size
4b628b48 230 }
bf4ef2ca 231 const getTasksQueueMaxSize = (): number => {
dd951876 232 return this.tasksQueue.maxSize
4b628b48
JB
233 }
234 return {
235 tasks: {
236 executed: 0,
237 executing: 0,
238 get queued (): number {
239 return getTasksQueueSize()
240 },
241 get maxQueued (): number {
bf4ef2ca 242 return getTasksQueueMaxSize()
4b628b48 243 },
463226a4 244 sequentiallyStolen: 0,
68cbdc84 245 stolen: 0,
3a502712 246 failed: 0,
4b628b48
JB
247 },
248 runTime: {
3a502712 249 history: new CircularBuffer(MeasurementHistorySize),
4b628b48
JB
250 },
251 waitTime: {
3a502712 252 history: new CircularBuffer(MeasurementHistorySize),
4b628b48
JB
253 },
254 elu: {
255 idle: {
3a502712 256 history: new CircularBuffer(MeasurementHistorySize),
4b628b48
JB
257 },
258 active: {
3a502712
JB
259 history: new CircularBuffer(MeasurementHistorySize),
260 },
261 },
4b628b48
JB
262 }
263 }
264
db0e38ee 265 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
e5ece61d
JB
266 const getTaskFunctionQueueSize = (): number => {
267 let taskFunctionQueueSize = 0
b25a42cd 268 for (const task of this.tasksQueue) {
dd92a715 269 if (
e5ece61d 270 (task.name === DEFAULT_TASK_NAME &&
67f3f2d6 271 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
31847469 272 name === this.info.taskFunctionsProperties![1].name) ||
e5ece61d 273 (task.name !== DEFAULT_TASK_NAME && name === task.name)
dd92a715 274 ) {
e5ece61d 275 ++taskFunctionQueueSize
b25a42cd
JB
276 }
277 }
e5ece61d 278 return taskFunctionQueueSize
b25a42cd
JB
279 }
280 return {
281 tasks: {
282 executed: 0,
283 executing: 0,
284 get queued (): number {
e5ece61d 285 return getTaskFunctionQueueSize()
b25a42cd 286 },
463226a4 287 sequentiallyStolen: 0,
68cbdc84 288 stolen: 0,
3a502712 289 failed: 0,
b25a42cd
JB
290 },
291 runTime: {
3a502712 292 history: new CircularBuffer(MeasurementHistorySize),
b25a42cd
JB
293 },
294 waitTime: {
3a502712 295 history: new CircularBuffer(MeasurementHistorySize),
b25a42cd
JB
296 },
297 elu: {
298 idle: {
3a502712 299 history: new CircularBuffer(MeasurementHistorySize),
b25a42cd
JB
300 },
301 active: {
3a502712
JB
302 history: new CircularBuffer(MeasurementHistorySize),
303 },
304 },
b25a42cd
JB
305 }
306 }
4b628b48 307}