feat: use priority queue for task queueing
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
e1c2dba7 1import { EventEmitter } from 'node:events'
ded253e2
JB
2import { MessageChannel } from 'node:worker_threads'
3
d35e5717 4import { CircularArray } from '../circular-array.js'
e41b0271 5import { PriorityQueue } from '../priority-queue.js'
d35e5717 6import type { Task } from '../utility-types.js'
e9ed6eee 7import { DEFAULT_TASK_NAME } from '../utils.js'
ded253e2
JB
8import {
9 checkWorkerNodeArguments,
10 createWorker,
11 getWorkerId,
12 getWorkerType
13} from './utils.js'
4b628b48 14import {
3bcbd4c5 15 type EventHandler,
4b628b48
JB
16 type IWorker,
17 type IWorkerNode,
f3a91bac 18 type StrategyData,
4b628b48 19 type WorkerInfo,
c3719753 20 type WorkerNodeOptions,
4b628b48
JB
21 type WorkerType,
22 WorkerTypes,
23 type WorkerUsage
d35e5717 24} from './worker.js'
4b628b48 25
60664f48
JB
/**
 * Worker node.
 *
 * Wraps a worker together with its information, its usage statistics and its
 * own queue of pending tasks.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Priority queue holding the tasks enqueued on this worker node. */
  private readonly tasksQueue: PriorityQueue<Task<Data>>
  /** Guard set while the `backPressure` event is being emitted. */
  private onBackPressureStarted: boolean
  /** Usage statistics per task function name, created on first request. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // A message channel is only created for thread workers.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
    this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
    this.onBackPressureStarted = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
    if (this.hasBackPressure() && !this.onBackPressureStarted) {
      // Listeners run synchronously inside emit() and may enqueue tasks
      // themselves: the guard prevents nested 'backPressure' emissions.
      this.onBackPressureStarted = true
      try {
        this.emit('backPressure', { workerId: this.info.id })
      } finally {
        // Always release the guard: a throwing listener must not disable
        // every later 'backPressure' notification for this worker node.
        this.onBackPressureStarted = false
      }
    }
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (bucket?: number): Task<Data> | undefined {
    return this.tasksQueue.dequeue(bucket)
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // The 'exit' handler is registered before the worker is asked to stop.
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Drops the listeners registered on this worker node (e.g. 'backPressure'),
    // not the ones registered on the underlying worker.
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        this.worker.unref?.()
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // The worker is killed only once it has emitted 'disconnect'.
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionsProperties = this.info.taskFunctionsProperties
    if (!Array.isArray(taskFunctionsProperties)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
      )
    }
    if (taskFunctionsProperties.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
      )
    }
    // The default task name is resolved to the name stored at index 1 of the
    // task function properties list.
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctionsProperties[1].name : name
    let taskFunctionWorkerUsage = this.taskFunctionsUsage.get(taskFunctionName)
    if (taskFunctionWorkerUsage == null) {
      taskFunctionWorkerUsage =
        this.initTaskFunctionWorkerUsage(taskFunctionName)
      this.taskFunctionsUsage.set(taskFunctionName, taskFunctionWorkerUsage)
    }
    return taskFunctionWorkerUsage
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Unreferences and closes both ports of the message channel, if there is
   * one, then removes the `messageChannel` property.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker to describe.
   * @returns The worker id and type, with the `dynamic`, `ready` and `stealing` flags set to `false`.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      type: getWorkerType(worker)!,
      dynamic: false,
      ready: false,
      stealing: false
    }
  }

  /**
   * Builds the initial worker usage statistics.
   *
   * @returns Zeroed counters and empty histories; `queued` and `maxQueued` are getters reading the tasks queue each time they are accessed.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions capture `this`: inside the object literal getters
    // below, `this` refers to the `tasks` object, not to the worker node.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds the initial usage statistics of one task function.
   *
   * @param name - The task function name.
   * @returns Zeroed counters and empty histories; `queued` is a getter counting the matching tasks in the queue each time it is accessed.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // A queued task counts for `name` when it carries that name, or when
        // it carries the default task name and `name` is the one stored at
        // index 1 of the task function properties list.
        if (
          (task.name === DEFAULT_TASK_NAME &&
            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            name === this.info.taskFunctionsProperties![1].name) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}