feat: use priority queue for task queueing
[poolifier.git] / src / pools / worker-node.ts
1 import { EventEmitter } from 'node:events'
2 import { MessageChannel } from 'node:worker_threads'
3
4 import { CircularArray } from '../circular-array.js'
5 import { PriorityQueue } from '../priority-queue.js'
6 import type { Task } from '../utility-types.js'
7 import { DEFAULT_TASK_NAME } from '../utils.js'
8 import {
9 checkWorkerNodeArguments,
10 createWorker,
11 getWorkerId,
12 getWorkerType
13 } from './utils.js'
14 import {
15 type EventHandler,
16 type IWorker,
17 type IWorkerNode,
18 type StrategyData,
19 type WorkerInfo,
20 type WorkerNodeOptions,
21 type WorkerType,
22 WorkerTypes,
23 type WorkerUsage
24 } from './worker.js'
25
26 /**
27 * Worker node.
28 *
29 * @typeParam Worker - Type of worker.
30 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
31 */
32 export class WorkerNode<Worker extends IWorker, Data = unknown>
33 extends EventEmitter
34 implements IWorkerNode<Worker, Data> {
35 /** @inheritdoc */
36 public readonly worker: Worker
37 /** @inheritdoc */
38 public readonly info: WorkerInfo
39 /** @inheritdoc */
40 public usage: WorkerUsage
41 /** @inheritdoc */
42 public strategyData?: StrategyData
43 /** @inheritdoc */
44 public messageChannel?: MessageChannel
45 /** @inheritdoc */
46 public tasksQueueBackPressureSize: number
47 private readonly tasksQueue: PriorityQueue<Task<Data>>
48 private onBackPressureStarted: boolean
49 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
50
51 /**
52 * Constructs a new worker node.
53 *
54 * @param type - The worker type.
55 * @param filePath - Path to the worker file.
56 * @param opts - The worker node options.
57 */
58 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
59 super()
60 checkWorkerNodeArguments(type, filePath, opts)
61 this.worker = createWorker<Worker>(type, filePath, {
62 env: opts.env,
63 workerOptions: opts.workerOptions
64 })
65 this.info = this.initWorkerInfo(this.worker)
66 this.usage = this.initWorkerUsage()
67 if (this.info.type === WorkerTypes.thread) {
68 this.messageChannel = new MessageChannel()
69 }
70 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
71 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
72 this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
73 this.onBackPressureStarted = false
74 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
75 }
76
77 /** @inheritdoc */
78 public tasksQueueSize (): number {
79 return this.tasksQueue.size
80 }
81
82 /** @inheritdoc */
83 public enqueueTask (task: Task<Data>): number {
84 const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
85 if (this.hasBackPressure() && !this.onBackPressureStarted) {
86 this.onBackPressureStarted = true
87 this.emit('backPressure', { workerId: this.info.id })
88 this.onBackPressureStarted = false
89 }
90 return tasksQueueSize
91 }
92
93 /** @inheritdoc */
94 public dequeueTask (bucket?: number): Task<Data> | undefined {
95 return this.tasksQueue.dequeue(bucket)
96 }
97
98 /** @inheritdoc */
99 public clearTasksQueue (): void {
100 this.tasksQueue.clear()
101 }
102
103 /** @inheritdoc */
104 public hasBackPressure (): boolean {
105 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
106 }
107
108 /** @inheritdoc */
109 public async terminate (): Promise<void> {
110 const waitWorkerExit = new Promise<void>(resolve => {
111 this.registerOnceWorkerEventHandler('exit', () => {
112 resolve()
113 })
114 })
115 this.closeMessageChannel()
116 this.removeAllListeners()
117 switch (this.info.type) {
118 case WorkerTypes.thread:
119 this.worker.unref?.()
120 await this.worker.terminate?.()
121 break
122 case WorkerTypes.cluster:
123 this.registerOnceWorkerEventHandler('disconnect', () => {
124 this.worker.kill?.()
125 })
126 this.worker.disconnect?.()
127 break
128 }
129 await waitWorkerExit
130 }
131
132 /** @inheritdoc */
133 public registerWorkerEventHandler (
134 event: string,
135 handler: EventHandler<Worker>
136 ): void {
137 this.worker.on(event, handler)
138 }
139
140 /** @inheritdoc */
141 public registerOnceWorkerEventHandler (
142 event: string,
143 handler: EventHandler<Worker>
144 ): void {
145 this.worker.once(event, handler)
146 }
147
148 /** @inheritdoc */
149 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
150 if (!Array.isArray(this.info.taskFunctionsProperties)) {
151 throw new Error(
152 `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
153 )
154 }
155 if (
156 Array.isArray(this.info.taskFunctionsProperties) &&
157 this.info.taskFunctionsProperties.length < 3
158 ) {
159 throw new Error(
160 `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
161 )
162 }
163 if (name === DEFAULT_TASK_NAME) {
164 name = this.info.taskFunctionsProperties[1].name
165 }
166 if (!this.taskFunctionsUsage.has(name)) {
167 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
168 }
169 return this.taskFunctionsUsage.get(name)
170 }
171
172 /** @inheritdoc */
173 public deleteTaskFunctionWorkerUsage (name: string): boolean {
174 return this.taskFunctionsUsage.delete(name)
175 }
176
177 private closeMessageChannel (): void {
178 if (this.messageChannel != null) {
179 this.messageChannel.port1.unref()
180 this.messageChannel.port2.unref()
181 this.messageChannel.port1.close()
182 this.messageChannel.port2.close()
183 delete this.messageChannel
184 }
185 }
186
187 private initWorkerInfo (worker: Worker): WorkerInfo {
188 return {
189 id: getWorkerId(worker),
190 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
191 type: getWorkerType(worker)!,
192 dynamic: false,
193 ready: false,
194 stealing: false
195 }
196 }
197
198 private initWorkerUsage (): WorkerUsage {
199 const getTasksQueueSize = (): number => {
200 return this.tasksQueue.size
201 }
202 const getTasksQueueMaxSize = (): number => {
203 return this.tasksQueue.maxSize
204 }
205 return {
206 tasks: {
207 executed: 0,
208 executing: 0,
209 get queued (): number {
210 return getTasksQueueSize()
211 },
212 get maxQueued (): number {
213 return getTasksQueueMaxSize()
214 },
215 sequentiallyStolen: 0,
216 stolen: 0,
217 failed: 0
218 },
219 runTime: {
220 history: new CircularArray<number>()
221 },
222 waitTime: {
223 history: new CircularArray<number>()
224 },
225 elu: {
226 idle: {
227 history: new CircularArray<number>()
228 },
229 active: {
230 history: new CircularArray<number>()
231 }
232 }
233 }
234 }
235
236 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
237 const getTaskFunctionQueueSize = (): number => {
238 let taskFunctionQueueSize = 0
239 for (const task of this.tasksQueue) {
240 if (
241 (task.name === DEFAULT_TASK_NAME &&
242 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
243 name === this.info.taskFunctionsProperties![1].name) ||
244 (task.name !== DEFAULT_TASK_NAME && name === task.name)
245 ) {
246 ++taskFunctionQueueSize
247 }
248 }
249 return taskFunctionQueueSize
250 }
251 return {
252 tasks: {
253 executed: 0,
254 executing: 0,
255 get queued (): number {
256 return getTaskFunctionQueueSize()
257 },
258 sequentiallyStolen: 0,
259 stolen: 0,
260 failed: 0
261 },
262 runTime: {
263 history: new CircularArray<number>()
264 },
265 waitTime: {
266 history: new CircularArray<number>()
267 },
268 elu: {
269 idle: {
270 history: new CircularArray<number>()
271 },
272 active: {
273 history: new CircularArray<number>()
274 }
275 }
276 }
277 }
278 }