feat: implement k-priority queue
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
e1c2dba7 1import { EventEmitter } from 'node:events'
ded253e2
JB
2import { MessageChannel } from 'node:worker_threads'
3
d35e5717 4import { CircularArray } from '../circular-array.js'
ded253e2 5import { Deque } from '../deque.js'
e41b0271 6import { PriorityQueue } from '../priority-queue.js'
d35e5717 7import type { Task } from '../utility-types.js'
e9ed6eee 8import { DEFAULT_TASK_NAME } from '../utils.js'
ded253e2
JB
9import {
10 checkWorkerNodeArguments,
11 createWorker,
12 getWorkerId,
13 getWorkerType
14} from './utils.js'
4b628b48 15import {
3bcbd4c5 16 type EventHandler,
4b628b48
JB
17 type IWorker,
18 type IWorkerNode,
f3a91bac 19 type StrategyData,
4b628b48 20 type WorkerInfo,
c3719753 21 type WorkerNodeOptions,
4b628b48
JB
22 type WorkerType,
23 WorkerTypes,
24 type WorkerUsage
d35e5717 25} from './worker.js'
4b628b48 26
60664f48
JB
27/**
28 * Worker node.
29 *
30 * @typeParam Worker - Type of worker.
31 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
32 */
4b628b48 33export class WorkerNode<Worker extends IWorker, Data = unknown>
e1c2dba7 34 extends EventEmitter
9f95d5eb 35 implements IWorkerNode<Worker, Data> {
671d5154 36 /** @inheritdoc */
4b628b48 37 public readonly worker: Worker
671d5154 38 /** @inheritdoc */
4b628b48 39 public readonly info: WorkerInfo
671d5154 40 /** @inheritdoc */
4b628b48 41 public usage: WorkerUsage
20c6f652 42 /** @inheritdoc */
f3a91bac
JB
43 public strategyData?: StrategyData
44 /** @inheritdoc */
26fb3c18
JB
45 public messageChannel?: MessageChannel
46 /** @inheritdoc */
20c6f652 47 public tasksQueueBackPressureSize: number
574b351d 48 private readonly tasksQueue: Deque<Task<Data>>
e41b0271 49 private readonly tasksQueue2 = new PriorityQueue<Task<Data>>()
47352846 50 private onBackPressureStarted: boolean
26fb3c18 51 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
4b628b48 52
60664f48
JB
53 /**
54 * Constructs a new worker node.
55 *
c3719753 56 * @param type - The worker type.
9974369e 57 * @param filePath - Path to the worker file.
c3719753 58 * @param opts - The worker node options.
60664f48 59 */
c3719753 60 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
9f95d5eb 61 super()
c3719753
JB
62 checkWorkerNodeArguments(type, filePath, opts)
63 this.worker = createWorker<Worker>(type, filePath, {
64 env: opts.env,
65 workerOptions: opts.workerOptions
66 })
67 this.info = this.initWorkerInfo(this.worker)
26fb3c18 68 this.usage = this.initWorkerUsage()
75de9f41 69 if (this.info.type === WorkerTypes.thread) {
7884d183
JB
70 this.messageChannel = new MessageChannel()
71 }
c63a35a0
JB
72 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
73 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
26fb3c18 74 this.tasksQueue = new Deque<Task<Data>>()
47352846 75 this.onBackPressureStarted = false
26fb3c18 76 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
4b628b48
JB
77 }
78
79 /** @inheritdoc */
80 public tasksQueueSize (): number {
81 return this.tasksQueue.size
82 }
83
4b628b48
JB
84 /** @inheritdoc */
85 public enqueueTask (task: Task<Data>): number {
72695f86 86 const tasksQueueSize = this.tasksQueue.push(task)
9f95d5eb 87 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 88 this.onBackPressureStarted = true
7f0e1334 89 this.emit('backPressure', { workerId: this.info.id })
47352846 90 this.onBackPressureStarted = false
72695f86
JB
91 }
92 return tasksQueueSize
93 }
94
95 /** @inheritdoc */
96 public unshiftTask (task: Task<Data>): number {
97 const tasksQueueSize = this.tasksQueue.unshift(task)
9f95d5eb 98 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 99 this.onBackPressureStarted = true
7f0e1334 100 this.emit('backPressure', { workerId: this.info.id })
47352846 101 this.onBackPressureStarted = false
72695f86
JB
102 }
103 return tasksQueueSize
4b628b48
JB
104 }
105
106 /** @inheritdoc */
107 public dequeueTask (): Task<Data> | undefined {
463226a4 108 return this.tasksQueue.shift()
4b628b48
JB
109 }
110
72695f86
JB
111 /** @inheritdoc */
112 public popTask (): Task<Data> | undefined {
463226a4 113 return this.tasksQueue.pop()
72695f86
JB
114 }
115
4b628b48
JB
116 /** @inheritdoc */
117 public clearTasksQueue (): void {
118 this.tasksQueue.clear()
119 }
120
671d5154
JB
121 /** @inheritdoc */
122 public hasBackPressure (): boolean {
8735b4e5 123 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
671d5154
JB
124 }
125
3f09ed9f 126 /** @inheritdoc */
07e0c9e5
JB
127 public async terminate (): Promise<void> {
128 const waitWorkerExit = new Promise<void>(resolve => {
129 this.registerOnceWorkerEventHandler('exit', () => {
130 resolve()
131 })
132 })
133 this.closeMessageChannel()
134 this.removeAllListeners()
839b98b8
JB
135 switch (this.info.type) {
136 case WorkerTypes.thread:
d20cde84 137 this.worker.unref?.()
839b98b8
JB
138 await this.worker.terminate?.()
139 break
140 case WorkerTypes.cluster:
141 this.registerOnceWorkerEventHandler('disconnect', () => {
142 this.worker.kill?.()
143 })
144 this.worker.disconnect?.()
145 break
3f09ed9f 146 }
07e0c9e5 147 await waitWorkerExit
3f09ed9f
JB
148 }
149
c3719753
JB
150 /** @inheritdoc */
151 public registerWorkerEventHandler (
152 event: string,
3bcbd4c5 153 handler: EventHandler<Worker>
c3719753 154 ): void {
88af9bf1 155 this.worker.on(event, handler)
c3719753
JB
156 }
157
158 /** @inheritdoc */
159 public registerOnceWorkerEventHandler (
160 event: string,
3bcbd4c5 161 handler: EventHandler<Worker>
c3719753 162 ): void {
88af9bf1 163 this.worker.once(event, handler)
c3719753
JB
164 }
165
ff128cc9 166 /** @inheritdoc */
db0e38ee 167 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
31847469 168 if (!Array.isArray(this.info.taskFunctionsProperties)) {
71b2b6d8 169 throw new Error(
31847469 170 `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
71b2b6d8
JB
171 )
172 }
b558f6b5 173 if (
31847469
JB
174 Array.isArray(this.info.taskFunctionsProperties) &&
175 this.info.taskFunctionsProperties.length < 3
b558f6b5 176 ) {
db0e38ee 177 throw new Error(
31847469 178 `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
db0e38ee
JB
179 )
180 }
181 if (name === DEFAULT_TASK_NAME) {
31847469 182 name = this.info.taskFunctionsProperties[1].name
b558f6b5 183 }
db0e38ee
JB
184 if (!this.taskFunctionsUsage.has(name)) {
185 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
ff128cc9 186 }
db0e38ee 187 return this.taskFunctionsUsage.get(name)
4b628b48
JB
188 }
189
adee6053
JB
190 /** @inheritdoc */
191 public deleteTaskFunctionWorkerUsage (name: string): boolean {
192 return this.taskFunctionsUsage.delete(name)
193 }
194
07e0c9e5
JB
195 private closeMessageChannel (): void {
196 if (this.messageChannel != null) {
197 this.messageChannel.port1.unref()
198 this.messageChannel.port2.unref()
199 this.messageChannel.port1.close()
200 this.messageChannel.port2.close()
201 delete this.messageChannel
202 }
203 }
204
75de9f41 205 private initWorkerInfo (worker: Worker): WorkerInfo {
4b628b48 206 return {
75de9f41 207 id: getWorkerId(worker),
67f3f2d6
JB
208 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
209 type: getWorkerType(worker)!,
4b628b48 210 dynamic: false,
5eb72b9e
JB
211 ready: false,
212 stealing: false
4b628b48
JB
213 }
214 }
215
216 private initWorkerUsage (): WorkerUsage {
217 const getTasksQueueSize = (): number => {
dd951876 218 return this.tasksQueue.size
4b628b48 219 }
bf4ef2ca 220 const getTasksQueueMaxSize = (): number => {
dd951876 221 return this.tasksQueue.maxSize
4b628b48
JB
222 }
223 return {
224 tasks: {
225 executed: 0,
226 executing: 0,
227 get queued (): number {
228 return getTasksQueueSize()
229 },
230 get maxQueued (): number {
bf4ef2ca 231 return getTasksQueueMaxSize()
4b628b48 232 },
463226a4 233 sequentiallyStolen: 0,
68cbdc84 234 stolen: 0,
4b628b48
JB
235 failed: 0
236 },
237 runTime: {
c52475b8 238 history: new CircularArray<number>()
4b628b48
JB
239 },
240 waitTime: {
c52475b8 241 history: new CircularArray<number>()
4b628b48
JB
242 },
243 elu: {
244 idle: {
c52475b8 245 history: new CircularArray<number>()
4b628b48
JB
246 },
247 active: {
c52475b8 248 history: new CircularArray<number>()
4b628b48
JB
249 }
250 }
251 }
252 }
253
db0e38ee 254 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
e5ece61d
JB
255 const getTaskFunctionQueueSize = (): number => {
256 let taskFunctionQueueSize = 0
b25a42cd 257 for (const task of this.tasksQueue) {
dd92a715 258 if (
e5ece61d 259 (task.name === DEFAULT_TASK_NAME &&
67f3f2d6 260 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
31847469 261 name === this.info.taskFunctionsProperties![1].name) ||
e5ece61d 262 (task.name !== DEFAULT_TASK_NAME && name === task.name)
dd92a715 263 ) {
e5ece61d 264 ++taskFunctionQueueSize
b25a42cd
JB
265 }
266 }
e5ece61d 267 return taskFunctionQueueSize
b25a42cd
JB
268 }
269 return {
270 tasks: {
271 executed: 0,
272 executing: 0,
273 get queued (): number {
e5ece61d 274 return getTaskFunctionQueueSize()
b25a42cd 275 },
463226a4 276 sequentiallyStolen: 0,
68cbdc84 277 stolen: 0,
b25a42cd
JB
278 failed: 0
279 },
280 runTime: {
c52475b8 281 history: new CircularArray<number>()
b25a42cd
JB
282 },
283 waitTime: {
c52475b8 284 history: new CircularArray<number>()
b25a42cd
JB
285 },
286 elu: {
287 idle: {
c52475b8 288 history: new CircularArray<number>()
b25a42cd
JB
289 },
290 active: {
c52475b8 291 history: new CircularArray<number>()
b25a42cd
JB
292 }
293 }
294 }
295 }
4b628b48 296}