]>
Commit | Line | Data |
---|---|---|
1 | import { EventEmitter } from 'node:events' | |
2 | import { MessageChannel } from 'node:worker_threads' | |
3 | ||
4 | import type { Task } from '../utility-types.js' | |
5 | ||
6 | import { CircularBuffer } from '../circular-buffer.js' | |
7 | import { PriorityQueue } from '../queues/priority-queue.js' | |
8 | import { DEFAULT_TASK_NAME } from '../utils.js' | |
9 | import { | |
10 | checkWorkerNodeArguments, | |
11 | createWorker, | |
12 | initWorkerInfo, | |
13 | } from './utils.js' | |
14 | import { | |
15 | type EventHandler, | |
16 | type IWorker, | |
17 | type IWorkerNode, | |
18 | MeasurementHistorySize, | |
19 | type StrategyData, | |
20 | type WorkerInfo, | |
21 | type WorkerNodeOptions, | |
22 | type WorkerType, | |
23 | WorkerTypes, | |
24 | type WorkerUsage, | |
25 | } from './worker.js' | |
26 | ||
27 | /** | |
28 | * Worker node. | |
29 | * @typeParam Worker - Type of worker. | |
30 | * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data. | |
31 | */ | |
32 | export class WorkerNode<Worker extends IWorker, Data = unknown> | |
33 | extends EventEmitter | |
34 | implements IWorkerNode<Worker, Data> { | |
35 | /** @inheritdoc */ | |
36 | public readonly info: WorkerInfo | |
37 | /** @inheritdoc */ | |
38 | public messageChannel?: MessageChannel | |
39 | /** @inheritdoc */ | |
40 | public strategyData?: StrategyData | |
41 | /** @inheritdoc */ | |
42 | public readonly tasksQueue: PriorityQueue<Task<Data>> | |
43 | /** @inheritdoc */ | |
44 | public tasksQueueBackPressureSize: number | |
45 | /** @inheritdoc */ | |
46 | public usage: WorkerUsage | |
47 | /** @inheritdoc */ | |
48 | public readonly worker: Worker | |
49 | private readonly taskFunctionsUsage: Map<string, WorkerUsage> | |
50 | ||
51 | /** | |
52 | * Constructs a new worker node. | |
53 | * @param type - The worker type. | |
54 | * @param filePath - Path to the worker file. | |
55 | * @param opts - The worker node options. | |
56 | */ | |
57 | constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) { | |
58 | super() | |
59 | checkWorkerNodeArguments(type, filePath, opts) | |
60 | this.worker = createWorker<Worker>(type, filePath, { | |
61 | env: opts.env, | |
62 | workerOptions: opts.workerOptions, | |
63 | }) | |
64 | this.info = initWorkerInfo(this.worker) | |
65 | this.usage = this.initWorkerUsage() | |
66 | if (this.info.type === WorkerTypes.thread) { | |
67 | this.messageChannel = new MessageChannel() | |
68 | } | |
69 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion | |
70 | this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize! | |
71 | this.tasksQueue = new PriorityQueue<Task<Data>>( | |
72 | opts.tasksQueueBucketSize, | |
73 | opts.tasksQueuePriority | |
74 | ) | |
75 | this.taskFunctionsUsage = new Map<string, WorkerUsage>() | |
76 | } | |
77 | ||
78 | /** @inheritdoc */ | |
79 | public clearTasksQueue (): void { | |
80 | this.tasksQueue.clear() | |
81 | } | |
82 | ||
83 | /** @inheritdoc */ | |
84 | public deleteTask (task: Task<Data>): boolean { | |
85 | return this.tasksQueue.delete(task) | |
86 | } | |
87 | ||
88 | /** @inheritdoc */ | |
89 | public deleteTaskFunctionWorkerUsage (name: string): boolean { | |
90 | return this.taskFunctionsUsage.delete(name) | |
91 | } | |
92 | ||
93 | /** @inheritdoc */ | |
94 | public dequeueLastPrioritizedTask (): Task<Data> | undefined { | |
95 | // Start from the last empty or partially filled bucket | |
96 | return this.dequeueTask(this.tasksQueue.buckets + 1) | |
97 | } | |
98 | ||
99 | /** @inheritdoc */ | |
100 | public dequeueTask (bucket?: number): Task<Data> | undefined { | |
101 | const task = this.tasksQueue.dequeue(bucket) | |
102 | if (!this.hasBackPressure() && this.info.backPressure) { | |
103 | this.info.backPressure = false | |
104 | } | |
105 | return task | |
106 | } | |
107 | ||
108 | /** @inheritdoc */ | |
109 | public enqueueTask (task: Task<Data>): number { | |
110 | const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority) | |
111 | if (this.hasBackPressure() && !this.info.backPressure) { | |
112 | this.info.backPressure = true | |
113 | this.emit('backPressure', { workerId: this.info.id }) | |
114 | } | |
115 | return tasksQueueSize | |
116 | } | |
117 | ||
118 | /** @inheritdoc */ | |
119 | public getTaskFunctionWorkerUsage (name: string): undefined | WorkerUsage { | |
120 | if (!Array.isArray(this.info.taskFunctionsProperties)) { | |
121 | throw new Error( | |
122 | `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined` | |
123 | ) | |
124 | } | |
125 | if ( | |
126 | Array.isArray(this.info.taskFunctionsProperties) && | |
127 | this.info.taskFunctionsProperties.length < 3 | |
128 | ) { | |
129 | throw new Error( | |
130 | `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements` | |
131 | ) | |
132 | } | |
133 | if (name === DEFAULT_TASK_NAME) { | |
134 | name = this.info.taskFunctionsProperties[1].name | |
135 | } | |
136 | if (!this.taskFunctionsUsage.has(name)) { | |
137 | this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name)) | |
138 | } | |
139 | return this.taskFunctionsUsage.get(name) | |
140 | } | |
141 | ||
142 | /** @inheritdoc */ | |
143 | public registerOnceWorkerEventHandler ( | |
144 | event: string, | |
145 | handler: EventHandler<Worker> | |
146 | ): void { | |
147 | this.worker.once(event, handler) | |
148 | } | |
149 | ||
150 | /** @inheritdoc */ | |
151 | public registerWorkerEventHandler ( | |
152 | event: string, | |
153 | handler: EventHandler<Worker> | |
154 | ): void { | |
155 | this.worker.on(event, handler) | |
156 | } | |
157 | ||
158 | /** @inheritdoc */ | |
159 | public setTasksQueuePriority (enablePriority: boolean): void { | |
160 | this.tasksQueue.enablePriority = enablePriority | |
161 | } | |
162 | ||
163 | /** @inheritdoc */ | |
164 | public tasksQueueSize (): number { | |
165 | return this.tasksQueue.size | |
166 | } | |
167 | ||
168 | /** @inheritdoc */ | |
169 | public async terminate (): Promise<void> { | |
170 | const waitWorkerExit = new Promise<void>(resolve => { | |
171 | this.registerOnceWorkerEventHandler('exit', () => { | |
172 | resolve() | |
173 | }) | |
174 | }) | |
175 | this.closeMessageChannel() | |
176 | this.removeAllListeners() | |
177 | switch (this.info.type) { | |
178 | case WorkerTypes.cluster: | |
179 | this.registerOnceWorkerEventHandler('disconnect', () => { | |
180 | this.worker.kill?.() | |
181 | }) | |
182 | this.worker.disconnect?.() | |
183 | break | |
184 | case WorkerTypes.thread: | |
185 | this.worker.unref?.() | |
186 | await this.worker.terminate?.() | |
187 | break | |
188 | } | |
189 | await waitWorkerExit | |
190 | } | |
191 | ||
192 | private closeMessageChannel (): void { | |
193 | if (this.messageChannel != null) { | |
194 | this.messageChannel.port1.unref() | |
195 | this.messageChannel.port2.unref() | |
196 | this.messageChannel.port1.close() | |
197 | this.messageChannel.port2.close() | |
198 | delete this.messageChannel | |
199 | } | |
200 | } | |
201 | ||
202 | /** | |
203 | * Whether the worker node is back pressured or not. | |
204 | * @returns `true` if the worker node is back pressured, `false` otherwise. | |
205 | */ | |
206 | private hasBackPressure (): boolean { | |
207 | return this.tasksQueue.size >= this.tasksQueueBackPressureSize | |
208 | } | |
209 | ||
210 | private initTaskFunctionWorkerUsage (name: string): WorkerUsage { | |
211 | const getTaskFunctionQueueSize = (): number => { | |
212 | let taskFunctionQueueSize = 0 | |
213 | for (const task of this.tasksQueue) { | |
214 | if ( | |
215 | (task.name === DEFAULT_TASK_NAME && | |
216 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion | |
217 | name === this.info.taskFunctionsProperties![1].name) || | |
218 | (task.name !== DEFAULT_TASK_NAME && name === task.name) | |
219 | ) { | |
220 | ++taskFunctionQueueSize | |
221 | } | |
222 | } | |
223 | return taskFunctionQueueSize | |
224 | } | |
225 | return { | |
226 | elu: { | |
227 | active: { | |
228 | history: new CircularBuffer(MeasurementHistorySize), | |
229 | }, | |
230 | idle: { | |
231 | history: new CircularBuffer(MeasurementHistorySize), | |
232 | }, | |
233 | }, | |
234 | runTime: { | |
235 | history: new CircularBuffer(MeasurementHistorySize), | |
236 | }, | |
237 | tasks: { | |
238 | executed: 0, | |
239 | executing: 0, | |
240 | failed: 0, | |
241 | get queued (): number { | |
242 | return getTaskFunctionQueueSize() | |
243 | }, | |
244 | sequentiallyStolen: 0, | |
245 | stolen: 0, | |
246 | }, | |
247 | waitTime: { | |
248 | history: new CircularBuffer(MeasurementHistorySize), | |
249 | }, | |
250 | } | |
251 | } | |
252 | ||
253 | private initWorkerUsage (): WorkerUsage { | |
254 | const getTasksQueueSize = (): number => { | |
255 | return this.tasksQueue.size | |
256 | } | |
257 | const getTasksQueueMaxSize = (): number => { | |
258 | return this.tasksQueue.maxSize | |
259 | } | |
260 | return { | |
261 | elu: { | |
262 | active: { | |
263 | history: new CircularBuffer(MeasurementHistorySize), | |
264 | }, | |
265 | idle: { | |
266 | history: new CircularBuffer(MeasurementHistorySize), | |
267 | }, | |
268 | }, | |
269 | runTime: { | |
270 | history: new CircularBuffer(MeasurementHistorySize), | |
271 | }, | |
272 | tasks: { | |
273 | executed: 0, | |
274 | executing: 0, | |
275 | failed: 0, | |
276 | get maxQueued (): number { | |
277 | return getTasksQueueMaxSize() | |
278 | }, | |
279 | get queued (): number { | |
280 | return getTasksQueueSize() | |
281 | }, | |
282 | sequentiallyStolen: 0, | |
283 | stolen: 0, | |
284 | }, | |
285 | waitTime: { | |
286 | history: new CircularBuffer(MeasurementHistorySize), | |
287 | }, | |
288 | } | |
289 | } | |
290 | } |