fix: validate worker node event to wait
[poolifier.git] / src / pools / worker-node.ts
1 import { EventEmitter } from 'node:events'
2 import { MessageChannel } from 'node:worker_threads'
3
4 import { CircularArray } from '../circular-array.js'
5 import { PriorityQueue } from '../priority-queue.js'
6 import type { Task } from '../utility-types.js'
7 import { DEFAULT_TASK_NAME } from '../utils.js'
8 import {
9 checkWorkerNodeArguments,
10 createWorker,
11 getWorkerId,
12 getWorkerType
13 } from './utils.js'
14 import {
15 type EventHandler,
16 type IWorker,
17 type IWorkerNode,
18 type StrategyData,
19 type WorkerInfo,
20 type WorkerNodeOptions,
21 type WorkerType,
22 WorkerTypes,
23 type WorkerUsage
24 } from './worker.js'
25
/**
 * Worker node.
 *
 * Wraps a worker together with its information, its usage statistics and the
 * priority queue of tasks waiting to be run on it. It is also an event emitter:
 * a `'backPressure'` event is emitted when the tasks queue reaches
 * `tasksQueueBackPressureSize`.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Priority queue holding the tasks waiting on this worker node. */
  private readonly tasksQueue: PriorityQueue<Task<Data>>
  /**
   * Guard raised while the back pressure state is being updated, so that
   * enqueue/dequeue calls made meanwhile (e.g. from a `'backPressure'`
   * listener) skip the back pressure handling.
   */
  private setBackPressureFlag: boolean
  /** Usage statistics per task function name, created lazily on first access. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // A message channel is only created for thread workers
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
    this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
    this.setBackPressureFlag = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
    if (
      !this.setBackPressureFlag &&
      this.hasBackPressure() &&
      !this.info.backPressure
    ) {
      this.setBackPressureFlag = true
      try {
        this.info.backPressure = true
        this.emit('backPressure', { workerId: this.info.id })
      } finally {
        // Listeners run synchronously and may throw: always lower the guard,
        // otherwise back pressure handling would stay disabled for good.
        this.setBackPressureFlag = false
      }
    }
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (bucket?: number): Task<Data> | undefined {
    const task = this.tasksQueue.dequeue(bucket)
    // Clear the back pressure state once the queue is back under the threshold
    if (
      !this.setBackPressureFlag &&
      !this.hasBackPressure() &&
      this.info.backPressure
    ) {
      this.setBackPressureFlag = true
      this.info.backPressure = false
      this.setBackPressureFlag = false
    }
    return task
  }

  /** @inheritdoc */
  public dequeueLastPrioritizedTask (): Task<Data> | undefined {
    // Start from the last empty or partially filled bucket
    return this.dequeueTask(this.tasksQueue.buckets + 1)
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // Listen for the worker 'exit' event before initiating the shutdown
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Removes the listeners registered on this worker node emitter,
    // not the handlers registered on the worker itself
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        this.worker.unref?.()
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // Kill the worker once it has disconnected
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    if (!Array.isArray(this.info.taskFunctionsProperties)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
      )
    }
    // The list is known to be an array from here on
    if (this.info.taskFunctionsProperties.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
      )
    }
    // The default task name is resolved to the name stored at index 1 of the list
    if (name === DEFAULT_TASK_NAME) {
      name = this.info.taskFunctionsProperties[1].name
    }
    // Lazily create the usage entry on first access
    if (!this.taskFunctionsUsage.has(name)) {
      this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
    }
    return this.taskFunctionsUsage.get(name)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Unreferences and closes both ports of the message channel, if any, then
   * removes the `messageChannel` property.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker information: id and type are derived from the
   * worker, every state flag starts at `false`.
   *
   * @param worker - The worker.
   * @returns The initial worker information.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      type: getWorkerType(worker)!,
      dynamic: false,
      ready: false,
      stealing: false,
      backPressure: false
    }
  }

  /**
   * Builds the initial worker usage. Counters start at zero; `queued` and
   * `maxQueued` are getters reading the tasks queue at access time.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions keep `this` bound to the worker node inside the getters
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds the initial usage for one task function. Counters start at zero;
   * `queued` is a getter counting, at access time, the queued tasks that
   * target the given task function.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // A task queued under the default name counts for the task function
        // whose name is stored at index 1 of the task function properties list
        if (
          (task.name === DEFAULT_TASK_NAME &&
            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            name === this.info.taskFunctionsProperties![1].name) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}