fix: guard worker node back pressure flag change
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
e1c2dba7 1import { EventEmitter } from 'node:events'
ded253e2
JB
2import { MessageChannel } from 'node:worker_threads'
3
d35e5717 4import { CircularArray } from '../circular-array.js'
e41b0271 5import { PriorityQueue } from '../priority-queue.js'
d35e5717 6import type { Task } from '../utility-types.js'
e9ed6eee 7import { DEFAULT_TASK_NAME } from '../utils.js'
ded253e2
JB
8import {
9 checkWorkerNodeArguments,
10 createWorker,
11 getWorkerId,
12 getWorkerType
13} from './utils.js'
4b628b48 14import {
3bcbd4c5 15 type EventHandler,
4b628b48
JB
16 type IWorker,
17 type IWorkerNode,
f3a91bac 18 type StrategyData,
4b628b48 19 type WorkerInfo,
c3719753 20 type WorkerNodeOptions,
4b628b48
JB
21 type WorkerType,
22 WorkerTypes,
23 type WorkerUsage
d35e5717 24} from './worker.js'
4b628b48 25
60664f48
JB
/**
 * Worker node.
 *
 * Bundles a worker with its information, its usage statistics and its tasks
 * queue. Emits a `backPressure` event, with the worker id as payload, when the
 * tasks queue size reaches `tasksQueueBackPressureSize`.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Queue of the tasks waiting to be executed by the worker. */
  private readonly tasksQueue: PriorityQueue<Task<Data>>
  /**
   * Guard set while `info.backPressure` is being changed, so that a nested
   * enqueue or dequeue does not change it at the same time.
   */
  private setBackPressureFlag: boolean
  /** Usage statistics per task function name, created on first access. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // A message channel is only created for thread workers
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
    this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
    this.setBackPressureFlag = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
    if (
      !this.setBackPressureFlag &&
      this.hasBackPressure() &&
      !this.info.backPressure
    ) {
      this.setBackPressureFlag = true
      try {
        this.info.backPressure = true
        this.emit('backPressure', { workerId: this.info.id })
      } finally {
        // Listeners run synchronously and may throw: always release the guard,
        // otherwise the back pressure flag could never be changed again
        this.setBackPressureFlag = false
      }
    }
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (bucket?: number): Task<Data> | undefined {
    const task = this.tasksQueue.dequeue(bucket)
    if (
      !this.setBackPressureFlag &&
      !this.hasBackPressure() &&
      this.info.backPressure
    ) {
      // The queue went back under the back pressure size: clear the flag
      this.setBackPressureFlag = true
      this.info.backPressure = false
      this.setBackPressureFlag = false
    }
    return task
  }

  /** @inheritdoc */
  public dequeueLastPrioritizedTask (): Task<Data> | undefined {
    // Start from the last empty or partially filled bucket
    return this.dequeueTask(this.tasksQueue.buckets + 1)
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // Register the 'exit' handler before asking the worker to stop
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Removes the listeners registered on this worker node, not on the worker
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        this.worker.unref?.()
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // Kill the worker once it is disconnected
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    if (!Array.isArray(this.info.taskFunctionsProperties)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
      )
    }
    if (this.info.taskFunctionsProperties.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
      )
    }
    if (name === DEFAULT_TASK_NAME) {
      // The default task name is resolved to the name of the second entry
      name = this.info.taskFunctionsProperties[1].name
    }
    // Lazily create the usage statistics of the task function
    if (!this.taskFunctionsUsage.has(name)) {
      this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
    }
    return this.taskFunctionsUsage.get(name)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Unreferences and closes both ports of the message channel, if any, then
   * removes the message channel from the worker node.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker information: id and type are read from the
   * worker, every status flag starts at `false`.
   *
   * @param worker - The worker.
   * @returns The initial worker information.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      type: getWorkerType(worker)!,
      dynamic: false,
      ready: false,
      stealing: false,
      backPressure: false
    }
  }

  /**
   * Builds the initial worker usage statistics. The `queued` and `maxQueued`
   * counters are getters reading the tasks queue size and maximum size.
   *
   * @returns The initial worker usage statistics.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions: `this` inside the object getters below is the object
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds the initial usage statistics of a task function. The `queued`
   * counter is a getter counting the queued tasks that target the task
   * function.
   *
   * @param name - The task function name.
   * @returns The initial task function usage statistics.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // A task with the default name counts for the task function named by
        // the second entry of the task functions properties list
        if (
          (task.name === DEFAULT_TASK_NAME &&
            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            name === this.info.taskFunctionsProperties![1].name) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}