docs: generate documentation
[poolifier.git] / src / pools / worker-node.ts
1 import { EventEmitter } from 'node:events'
2 import { MessageChannel } from 'node:worker_threads'
3
4 import { CircularArray } from '../circular-array.js'
5 import { PriorityQueue } from '../priority-queue.js'
6 import type { Task } from '../utility-types.js'
7 import { DEFAULT_TASK_NAME } from '../utils.js'
8 import {
9 checkWorkerNodeArguments,
10 createWorker,
11 getWorkerId,
12 getWorkerType
13 } from './utils.js'
14 import {
15 type EventHandler,
16 type IWorker,
17 type IWorkerNode,
18 type StrategyData,
19 type WorkerInfo,
20 type WorkerNodeOptions,
21 type WorkerType,
22 WorkerTypes,
23 type WorkerUsage
24 } from './worker.js'
25
/**
 * Worker node.
 *
 * Bundles one worker with its information, its usage statistics (global and
 * per task function) and its priority tasks queue.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Queue holding the tasks waiting to be run by this worker node. */
  private readonly tasksQueue: PriorityQueue<Task<Data>>
  /** Set while the 'backPressure' event is being emitted. */
  private onBackPressureStarted: boolean
  /** Usage statistics keyed by task function name, created on first request. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    const { env, workerOptions } = opts
    this.worker = createWorker<Worker>(type, filePath, { env, workerOptions })
    this.info = this.initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // Only thread workers get a dedicated message channel.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
    this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
    this.onBackPressureStarted = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    const { size } = this.tasksQueue
    return size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const sizeAfterEnqueue = this.tasksQueue.enqueue(task, task.priority)
    // Nothing to signal when below the threshold, or when the event is
    // already being emitted (a listener enqueued a task during the emit).
    if (!this.hasBackPressure() || this.onBackPressureStarted) {
      return sizeAfterEnqueue
    }
    this.onBackPressureStarted = true
    this.emit('backPressure', { workerId: this.info.id })
    this.onBackPressureStarted = false
    return sizeAfterEnqueue
  }

  /** @inheritdoc */
  public dequeueTask (bucket?: number): Task<Data> | undefined {
    return this.tasksQueue.dequeue(bucket)
  }

  /** @inheritdoc */
  public dequeueLastBucketTask (): Task<Data> | undefined {
    // Start from the last empty or partially filled bucket
    const lastBucket = this.tasksQueue.buckets + 1
    return this.tasksQueue.dequeue(lastBucket)
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    const { size: queuedTasks } = this.tasksQueue
    return queuedTasks >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // Subscribe to 'exit' before asking the worker to stop.
    const workerExited = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    this.removeAllListeners()
    const workerType = this.info.type
    if (workerType === WorkerTypes.thread) {
      this.worker.unref?.()
      await this.worker.terminate?.()
    } else if (workerType === WorkerTypes.cluster) {
      // Kill the cluster worker once it has disconnected.
      this.registerOnceWorkerEventHandler('disconnect', () => {
        this.worker.kill?.()
      })
      this.worker.disconnect?.()
    }
    await workerExited
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionsProperties = this.info.taskFunctionsProperties
    if (!Array.isArray(taskFunctionsProperties)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
      )
    }
    if (taskFunctionsProperties.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
      )
    }
    // The default task name maps to the name of the second listed task function.
    const usageKey =
      name === DEFAULT_TASK_NAME ? taskFunctionsProperties[1].name : name
    if (this.taskFunctionsUsage.has(usageKey)) {
      return this.taskFunctionsUsage.get(usageKey)
    }
    const createdUsage = this.initTaskFunctionWorkerUsage(usageKey)
    this.taskFunctionsUsage.set(usageKey, createdUsage)
    return createdUsage
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Unreferences and closes both ports of the message channel, then removes
   * the channel from the worker node. Does nothing when no channel is set.
   */
  private closeMessageChannel (): void {
    const channel = this.messageChannel
    if (channel == null) {
      return
    }
    const { port1, port2 } = channel
    port1.unref()
    port2.unref()
    port1.close()
    port2.close()
    delete this.messageChannel
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker to describe.
   * @returns The worker information, with `dynamic`, `ready` and `stealing` all `false`.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    const id = getWorkerId(worker)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const type = getWorkerType(worker)!
    return {
      id,
      type,
      dynamic: false,
      ready: false,
      stealing: false
    }
  }

  /**
   * Builds the initial global worker usage statistics.
   *
   * @returns Zeroed counters, empty histories, and `queued` / `maxQueued`
   * getters backed by the tasks queue.
   */
  private initWorkerUsage (): WorkerUsage {
    // Resolved lazily on each read: this method runs in the constructor
    // before the tasks queue is assigned.
    const currentQueue = (): PriorityQueue<Task<Data>> => this.tasksQueue
    const emptyHistory = (): CircularArray<number> =>
      new CircularArray<number>()
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return currentQueue().size
        },
        get maxQueued (): number {
          return currentQueue().maxSize
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: emptyHistory()
      },
      waitTime: {
        history: emptyHistory()
      },
      elu: {
        idle: {
          history: emptyHistory()
        },
        active: {
          history: emptyHistory()
        }
      }
    }
  }

  /**
   * Builds the initial usage statistics of one task function.
   *
   * @param name - The task function name.
   * @returns Zeroed counters, empty histories, and a `queued` getter counting
   * the queued tasks that target the given task function.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const countQueuedTasks = (): number => {
      let matches = 0
      for (const queuedTask of this.tasksQueue) {
        // A task queued under the default name counts for the second listed
        // task function.
        const targetName =
          queuedTask.name === DEFAULT_TASK_NAME
            ? // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            this.info.taskFunctionsProperties![1].name
            : queuedTask.name
        if (name === targetName) {
          ++matches
        }
      }
      return matches
    }
    const emptyHistory = (): CircularArray<number> =>
      new CircularArray<number>()
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return countQueuedTasks()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: emptyHistory()
      },
      waitTime: {
        history: emptyHistory()
      },
      elu: {
        idle: {
          history: emptyHistory()
        },
        active: {
          history: emptyHistory()
        }
      }
    }
  }
}