]> Piment Noir Git Repositories - poolifier.git/blob - src/pools/worker-node.ts
057c206a96b14e8b69f385c2db03b9d8d3e00bc5
[poolifier.git] / src / pools / worker-node.ts
1 import { EventEmitter } from 'node:events'
2 import { MessageChannel } from 'node:worker_threads'
3
4 import type { Task } from '../utility-types.js'
5
6 import { CircularBuffer } from '../circular-buffer.js'
7 import { PriorityQueue } from '../queues/priority-queue.js'
8 import { DEFAULT_TASK_NAME } from '../utils.js'
9 import {
10 checkWorkerNodeArguments,
11 createWorker,
12 initWorkerInfo,
13 } from './utils.js'
14 import {
15 type EventHandler,
16 type IWorker,
17 type IWorkerNode,
18 MeasurementHistorySize,
19 type StrategyData,
20 type WorkerInfo,
21 type WorkerNodeOptions,
22 type WorkerType,
23 WorkerTypes,
24 type WorkerUsage,
25 } from './worker.js'
26
/**
 * Worker node: pairs a worker instance with its tasks queue, its usage
 * statistics and, for thread workers, a message channel.
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public readonly tasksQueue: PriorityQueue<Task<Data>>
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public readonly worker: Worker
  /** Usage statistics keyed by task function name, filled in lazily on first lookup. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Builds a worker node: creates the underlying worker, then initializes its
   * info, usage statistics, optional message channel and tasks queue.
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    // Argument checking is delegated and runs before any worker is created.
    checkWorkerNodeArguments(type, filePath, opts)
    const { env, workerOptions } = opts
    this.worker = createWorker<Worker>(type, filePath, { env, workerOptions })
    this.info = initWorkerInfo(this.worker)
    // The usage getters read the tasks queue lazily, so building the usage
    // object before the queue exists is fine.
    this.usage = this.initWorkerUsage()
    // Only thread workers are given a message channel.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
    this.tasksQueue = new PriorityQueue<Task<Data>>(
      opts.tasksQueueBucketSize,
      opts.tasksQueuePriority
    )
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    // Straight delegation to the underlying priority queue.
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public deleteTask (task: Task<Data>): boolean {
    // The queue reports whether the deletion happened.
    return this.tasksQueue.delete(task)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    // `Map.delete` yields `true` only when an entry existed for `name`.
    return this.taskFunctionsUsage.delete(name)
  }

  /** @inheritdoc */
  public dequeueLastPrioritizedTask (): Task<Data> | undefined {
    // Start from the last empty or partially filled bucket
    const startBucket = this.tasksQueue.buckets + 1
    return this.dequeueTask(startBucket)
  }

  /** @inheritdoc */
  public dequeueTask (bucket?: number): Task<Data> | undefined {
    const dequeuedTask = this.tasksQueue.dequeue(bucket)
    // Lower the back pressure flag once the queue is below the threshold again.
    const belowThreshold = !this.hasBackPressure()
    if (belowThreshold && this.info.backPressure) {
      this.info.backPressure = false
    }
    return dequeuedTask
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const queueSizeAfterEnqueue = this.tasksQueue.enqueue(task, task.priority)
    // Raise the flag and notify listeners only on the transition into back
    // pressure, not on every enqueue while the flag is already set.
    const thresholdReached = this.hasBackPressure()
    if (thresholdReached && !this.info.backPressure) {
      this.info.backPressure = true
      this.emit('backPressure', { workerId: this.info.id })
    }
    return queueSizeAfterEnqueue
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): undefined | WorkerUsage {
    const taskFunctionsProperties = this.info.taskFunctionsProperties
    if (!Array.isArray(taskFunctionsProperties)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
      )
    }
    if (taskFunctionsProperties.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
      )
    }
    // The default task name is an alias for the task function listed at index 1.
    const usageKey =
      name === DEFAULT_TASK_NAME ? taskFunctionsProperties[1].name : name
    // Create the usage entry on first access.
    let taskFunctionUsage = this.taskFunctionsUsage.get(usageKey)
    if (taskFunctionUsage == null) {
      taskFunctionUsage = this.initTaskFunctionWorkerUsage(usageKey)
      this.taskFunctionsUsage.set(usageKey, taskFunctionUsage)
    }
    return taskFunctionUsage
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    // One-shot registration on the worker itself, not on this emitter.
    this.worker.once(event, handler)
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    // Persistent registration on the worker itself, not on this emitter.
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public setTasksQueuePriority (enablePriority: boolean): void {
    this.tasksQueue.enablePriority = enablePriority
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // Subscribe to 'exit' before asking the worker to stop, so the event
    // cannot fire before the handler is in place.
    const workerExited = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Drops the listeners attached to this worker node emitter.
    this.removeAllListeners()
    const workerType = this.info.type
    if (workerType === WorkerTypes.cluster) {
      // Kill the cluster worker only once it has disconnected.
      this.registerOnceWorkerEventHandler('disconnect', () => {
        this.worker.kill?.()
      })
      this.worker.disconnect?.()
    } else if (workerType === WorkerTypes.thread) {
      this.worker.unref?.()
      await this.worker.terminate?.()
    }
    await workerExited
  }

  /**
   * Unreferences and closes both ports of the message channel, then removes
   * the channel property. Does nothing when there is no message channel.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel == null) {
      return
    }
    const { port1, port2 } = this.messageChannel
    port1.unref()
    port2.unref()
    port1.close()
    port2.close()
    delete this.messageChannel
  }

  /**
   * Tells whether the tasks queue size has reached the back pressure size.
   * @returns `true` if the worker node is back pressured, `false` otherwise.
   */
  private hasBackPressure (): boolean {
    const queuedTasks = this.tasksQueue.size
    return queuedTasks >= this.tasksQueueBackPressureSize
  }

  /**
   * Builds a zeroed usage object for one task function. Its `queued` count is
   * computed on every read by scanning the tasks queue.
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const countQueuedTasks = (): number => {
      let matchingTasks = 0
      for (const queuedTask of this.tasksQueue) {
        // A task queued under the default name counts for the task function
        // listed at index 1 of the task functions properties.
        const effectiveName =
          queuedTask.name === DEFAULT_TASK_NAME
            ? // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            this.info.taskFunctionsProperties![1].name
            : queuedTask.name
        if (name === effectiveName) {
          ++matchingTasks
        }
      }
      return matchingTasks
    }
    const newHistory = (): CircularBuffer =>
      new CircularBuffer(MeasurementHistorySize)
    return {
      elu: {
        active: {
          history: newHistory(),
        },
        idle: {
          history: newHistory(),
        },
      },
      runTime: {
        history: newHistory(),
      },
      tasks: {
        executed: 0,
        executing: 0,
        failed: 0,
        get queued (): number {
          return countQueuedTasks()
        },
        sequentiallyStolen: 0,
        stolen: 0,
      },
      waitTime: {
        history: newHistory(),
      },
    }
  }

  /**
   * Builds the zeroed usage object of the worker node. Its `queued` and
   * `maxQueued` counts are read from the tasks queue on every access.
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Both readers resolve `this.tasksQueue` at call time, not at creation time.
    const readQueueSize = (): number => this.tasksQueue.size
    const readQueueMaxSize = (): number => this.tasksQueue.maxSize
    const newHistory = (): CircularBuffer =>
      new CircularBuffer(MeasurementHistorySize)
    return {
      elu: {
        active: {
          history: newHistory(),
        },
        idle: {
          history: newHistory(),
        },
      },
      runTime: {
        history: newHistory(),
      },
      tasks: {
        executed: 0,
        executing: 0,
        failed: 0,
        get maxQueued (): number {
          return readQueueMaxSize()
        },
        get queued (): number {
          return readQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
      },
      waitTime: {
        history: newHistory(),
      },
    }
  }
}