feat: implement k-priority queue
[poolifier.git] / src / pools / worker-node.ts
1 import { EventEmitter } from 'node:events'
2 import { MessageChannel } from 'node:worker_threads'
3
4 import { CircularArray } from '../circular-array.js'
5 import { Deque } from '../deque.js'
6 import { PriorityQueue } from '../priority-queue.js'
7 import type { Task } from '../utility-types.js'
8 import { DEFAULT_TASK_NAME } from '../utils.js'
9 import {
10 checkWorkerNodeArguments,
11 createWorker,
12 getWorkerId,
13 getWorkerType
14 } from './utils.js'
15 import {
16 type EventHandler,
17 type IWorker,
18 type IWorkerNode,
19 type StrategyData,
20 type WorkerInfo,
21 type WorkerNodeOptions,
22 type WorkerType,
23 WorkerTypes,
24 type WorkerUsage
25 } from './worker.js'
26
27 /**
28 * Worker node.
29 *
30 * @typeParam Worker - Type of worker.
31 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
32 */
33 export class WorkerNode<Worker extends IWorker, Data = unknown>
34 extends EventEmitter
35 implements IWorkerNode<Worker, Data> {
36 /** @inheritdoc */
37 public readonly worker: Worker
38 /** @inheritdoc */
39 public readonly info: WorkerInfo
40 /** @inheritdoc */
41 public usage: WorkerUsage
42 /** @inheritdoc */
43 public strategyData?: StrategyData
44 /** @inheritdoc */
45 public messageChannel?: MessageChannel
46 /** @inheritdoc */
47 public tasksQueueBackPressureSize: number
48 private readonly tasksQueue: Deque<Task<Data>>
49 private readonly tasksQueue2 = new PriorityQueue<Task<Data>>()
50 private onBackPressureStarted: boolean
51 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
52
53 /**
54 * Constructs a new worker node.
55 *
56 * @param type - The worker type.
57 * @param filePath - Path to the worker file.
58 * @param opts - The worker node options.
59 */
60 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
61 super()
62 checkWorkerNodeArguments(type, filePath, opts)
63 this.worker = createWorker<Worker>(type, filePath, {
64 env: opts.env,
65 workerOptions: opts.workerOptions
66 })
67 this.info = this.initWorkerInfo(this.worker)
68 this.usage = this.initWorkerUsage()
69 if (this.info.type === WorkerTypes.thread) {
70 this.messageChannel = new MessageChannel()
71 }
72 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
73 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
74 this.tasksQueue = new Deque<Task<Data>>()
75 this.onBackPressureStarted = false
76 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
77 }
78
79 /** @inheritdoc */
80 public tasksQueueSize (): number {
81 return this.tasksQueue.size
82 }
83
84 /** @inheritdoc */
85 public enqueueTask (task: Task<Data>): number {
86 const tasksQueueSize = this.tasksQueue.push(task)
87 if (this.hasBackPressure() && !this.onBackPressureStarted) {
88 this.onBackPressureStarted = true
89 this.emit('backPressure', { workerId: this.info.id })
90 this.onBackPressureStarted = false
91 }
92 return tasksQueueSize
93 }
94
95 /** @inheritdoc */
96 public unshiftTask (task: Task<Data>): number {
97 const tasksQueueSize = this.tasksQueue.unshift(task)
98 if (this.hasBackPressure() && !this.onBackPressureStarted) {
99 this.onBackPressureStarted = true
100 this.emit('backPressure', { workerId: this.info.id })
101 this.onBackPressureStarted = false
102 }
103 return tasksQueueSize
104 }
105
106 /** @inheritdoc */
107 public dequeueTask (): Task<Data> | undefined {
108 return this.tasksQueue.shift()
109 }
110
111 /** @inheritdoc */
112 public popTask (): Task<Data> | undefined {
113 return this.tasksQueue.pop()
114 }
115
116 /** @inheritdoc */
117 public clearTasksQueue (): void {
118 this.tasksQueue.clear()
119 }
120
121 /** @inheritdoc */
122 public hasBackPressure (): boolean {
123 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
124 }
125
126 /** @inheritdoc */
127 public async terminate (): Promise<void> {
128 const waitWorkerExit = new Promise<void>(resolve => {
129 this.registerOnceWorkerEventHandler('exit', () => {
130 resolve()
131 })
132 })
133 this.closeMessageChannel()
134 this.removeAllListeners()
135 switch (this.info.type) {
136 case WorkerTypes.thread:
137 this.worker.unref?.()
138 await this.worker.terminate?.()
139 break
140 case WorkerTypes.cluster:
141 this.registerOnceWorkerEventHandler('disconnect', () => {
142 this.worker.kill?.()
143 })
144 this.worker.disconnect?.()
145 break
146 }
147 await waitWorkerExit
148 }
149
150 /** @inheritdoc */
151 public registerWorkerEventHandler (
152 event: string,
153 handler: EventHandler<Worker>
154 ): void {
155 this.worker.on(event, handler)
156 }
157
158 /** @inheritdoc */
159 public registerOnceWorkerEventHandler (
160 event: string,
161 handler: EventHandler<Worker>
162 ): void {
163 this.worker.once(event, handler)
164 }
165
166 /** @inheritdoc */
167 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
168 if (!Array.isArray(this.info.taskFunctionsProperties)) {
169 throw new Error(
170 `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
171 )
172 }
173 if (
174 Array.isArray(this.info.taskFunctionsProperties) &&
175 this.info.taskFunctionsProperties.length < 3
176 ) {
177 throw new Error(
178 `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
179 )
180 }
181 if (name === DEFAULT_TASK_NAME) {
182 name = this.info.taskFunctionsProperties[1].name
183 }
184 if (!this.taskFunctionsUsage.has(name)) {
185 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
186 }
187 return this.taskFunctionsUsage.get(name)
188 }
189
190 /** @inheritdoc */
191 public deleteTaskFunctionWorkerUsage (name: string): boolean {
192 return this.taskFunctionsUsage.delete(name)
193 }
194
195 private closeMessageChannel (): void {
196 if (this.messageChannel != null) {
197 this.messageChannel.port1.unref()
198 this.messageChannel.port2.unref()
199 this.messageChannel.port1.close()
200 this.messageChannel.port2.close()
201 delete this.messageChannel
202 }
203 }
204
205 private initWorkerInfo (worker: Worker): WorkerInfo {
206 return {
207 id: getWorkerId(worker),
208 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
209 type: getWorkerType(worker)!,
210 dynamic: false,
211 ready: false,
212 stealing: false
213 }
214 }
215
216 private initWorkerUsage (): WorkerUsage {
217 const getTasksQueueSize = (): number => {
218 return this.tasksQueue.size
219 }
220 const getTasksQueueMaxSize = (): number => {
221 return this.tasksQueue.maxSize
222 }
223 return {
224 tasks: {
225 executed: 0,
226 executing: 0,
227 get queued (): number {
228 return getTasksQueueSize()
229 },
230 get maxQueued (): number {
231 return getTasksQueueMaxSize()
232 },
233 sequentiallyStolen: 0,
234 stolen: 0,
235 failed: 0
236 },
237 runTime: {
238 history: new CircularArray<number>()
239 },
240 waitTime: {
241 history: new CircularArray<number>()
242 },
243 elu: {
244 idle: {
245 history: new CircularArray<number>()
246 },
247 active: {
248 history: new CircularArray<number>()
249 }
250 }
251 }
252 }
253
254 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
255 const getTaskFunctionQueueSize = (): number => {
256 let taskFunctionQueueSize = 0
257 for (const task of this.tasksQueue) {
258 if (
259 (task.name === DEFAULT_TASK_NAME &&
260 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
261 name === this.info.taskFunctionsProperties![1].name) ||
262 (task.name !== DEFAULT_TASK_NAME && name === task.name)
263 ) {
264 ++taskFunctionQueueSize
265 }
266 }
267 return taskFunctionQueueSize
268 }
269 return {
270 tasks: {
271 executed: 0,
272 executing: 0,
273 get queued (): number {
274 return getTaskFunctionQueueSize()
275 },
276 sequentiallyStolen: 0,
277 stolen: 0,
278 failed: 0
279 },
280 runTime: {
281 history: new CircularArray<number>()
282 },
283 waitTime: {
284 history: new CircularArray<number>()
285 },
286 elu: {
287 idle: {
288 history: new CircularArray<number>()
289 },
290 active: {
291 history: new CircularArray<number>()
292 }
293 }
294 }
295 }
296 }