]>
Commit | Line | Data |
---|---|---|
e1c2dba7 | 1 | import { EventEmitter } from 'node:events' |
ded253e2 JB |
2 | import { MessageChannel } from 'node:worker_threads' |
3 | ||
97231086 JB |
4 | import type { Task } from '../utility-types.js' |
5 | ||
f12182ad | 6 | import { CircularBuffer } from '../circular-buffer.js' |
c6dd1aeb | 7 | import { PriorityQueue } from '../queues/priority-queue.js' |
e9ed6eee | 8 | import { DEFAULT_TASK_NAME } from '../utils.js' |
ded253e2 JB |
9 | import { |
10 | checkWorkerNodeArguments, | |
11 | createWorker, | |
559c196a | 12 | initWorkerInfo, |
ded253e2 | 13 | } from './utils.js' |
4b628b48 | 14 | import { |
3bcbd4c5 | 15 | type EventHandler, |
4b628b48 JB |
16 | type IWorker, |
17 | type IWorkerNode, | |
f12182ad | 18 | MeasurementHistorySize, |
f3a91bac | 19 | type StrategyData, |
4b628b48 | 20 | type WorkerInfo, |
c3719753 | 21 | type WorkerNodeOptions, |
4b628b48 JB |
22 | type WorkerType, |
23 | WorkerTypes, | |
3a502712 | 24 | type WorkerUsage, |
d35e5717 | 25 | } from './worker.js' |
4b628b48 | 26 | |
/**
 * Worker node.
 *
 * Groups a worker with its information, its usage statistics (global and per
 * task function), its tasks queue and, for thread workers, a message channel.
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public readonly tasksQueue: PriorityQueue<Task<Data>>
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public readonly worker: Worker
  /** Usage statistics per task function name, created on first lookup. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    const {
      env,
      tasksQueueBackPressureSize,
      tasksQueueBucketSize,
      tasksQueuePriority,
      workerOptions,
    } = opts
    this.worker = createWorker<Worker>(type, filePath, { env, workerOptions })
    this.info = initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // Only thread workers get a dedicated message channel.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.tasksQueueBackPressureSize = tasksQueueBackPressureSize!
    this.tasksQueue = new PriorityQueue<Task<Data>>(
      tasksQueueBucketSize,
      tasksQueuePriority
    )
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public deleteTask (task: Task<Data>): boolean {
    const deleted = this.tasksQueue.delete(task)
    return deleted
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    const deleted = this.taskFunctionsUsage.delete(name)
    return deleted
  }

  /** @inheritdoc */
  public dequeueLastPrioritizedTask (): Task<Data> | undefined {
    // Start from the last empty or partially filled bucket
    const lastBucket = this.tasksQueue.buckets + 1
    return this.dequeueTask(lastBucket)
  }

  /** @inheritdoc */
  public dequeueTask (bucket?: number): Task<Data> | undefined {
    const dequeuedTask = this.tasksQueue.dequeue(bucket)
    // Lower the back pressure flag once the queue is below the threshold again.
    if (this.info.backPressure && !this.hasBackPressure()) {
      this.info.backPressure = false
    }
    return dequeuedTask
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const queueSize = this.tasksQueue.enqueue(task, task.priority)
    // Raise the flag and notify listeners only on the transition to back pressure.
    if (!this.info.backPressure && this.hasBackPressure()) {
      this.info.backPressure = true
      this.emit('backPressure', { workerId: this.info.id })
    }
    return queueSize
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): undefined | WorkerUsage {
    const { taskFunctionsProperties } = this.info
    if (!Array.isArray(taskFunctionsProperties)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
      )
    }
    if (taskFunctionsProperties.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
      )
    }
    // The default task name is mapped to the name held by the second list entry.
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctionsProperties[1].name : name
    if (!this.taskFunctionsUsage.has(taskFunctionName)) {
      this.taskFunctionsUsage.set(
        taskFunctionName,
        this.initTaskFunctionWorkerUsage(taskFunctionName)
      )
    }
    return this.taskFunctionsUsage.get(taskFunctionName)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    // One-shot subscription on the underlying worker.
    this.worker.once(event, handler)
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    // Persistent subscription on the underlying worker.
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public setTasksQueuePriority (enablePriority: boolean): void {
    this.tasksQueue.enablePriority = enablePriority
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    const { size } = this.tasksQueue
    return size
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // Subscribe to the worker 'exit' event before triggering the shutdown.
    const workerExited = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    this.removeAllListeners()
    const workerType = this.info.type
    if (workerType === WorkerTypes.cluster) {
      // Kill the cluster worker only after it has disconnected.
      this.registerOnceWorkerEventHandler('disconnect', () => {
        this.worker.kill?.()
      })
      this.worker.disconnect?.()
    } else if (workerType === WorkerTypes.thread) {
      this.worker.unref?.()
      await this.worker.terminate?.()
    }
    await workerExited
  }

  /**
   * Unreferences and closes both ports of the message channel, then removes
   * the channel from the worker node. Does nothing when there is no channel.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel == null) {
      return
    }
    const { port1, port2 } = this.messageChannel
    port1.unref()
    port2.unref()
    port1.close()
    port2.close()
    delete this.messageChannel
  }

  /**
   * Whether the worker node is back pressured or not.
   * @returns `true` if the tasks queue size has reached the back pressure size, `false` otherwise.
   */
  private hasBackPressure (): boolean {
    const { size } = this.tasksQueue
    return size >= this.tasksQueueBackPressureSize
  }

  /**
   * Builds the initial usage statistics of a task function.
   * @param name - The task function name.
   * @returns Zeroed usage statistics whose `queued` counter is computed on each read from the tasks queue.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const newMeasurement = (): WorkerUsage['runTime'] => ({
      history: new CircularBuffer(MeasurementHistorySize),
    })
    const countQueuedTasks = (): number => {
      let count = 0
      for (const task of this.tasksQueue) {
        // A task queued under the default name counts for the task function
        // named by the second entry of the task function properties list.
        const queuedTaskFunctionName =
          task.name === DEFAULT_TASK_NAME
            ? // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            this.info.taskFunctionsProperties![1].name
            : task.name
        if (name === queuedTaskFunctionName) {
          ++count
        }
      }
      return count
    }
    return {
      elu: {
        active: newMeasurement(),
        idle: newMeasurement(),
      },
      runTime: newMeasurement(),
      tasks: {
        executed: 0,
        executing: 0,
        failed: 0,
        get queued (): number {
          return countQueuedTasks()
        },
        sequentiallyStolen: 0,
        stolen: 0,
      },
      waitTime: newMeasurement(),
    }
  }

  /**
   * Builds the initial usage statistics of the worker node.
   *
   * The `queued` and `maxQueued` counters read the tasks queue on each access:
   * the constructor calls this method before the tasks queue is assigned, so
   * the queue must not be captured eagerly here.
   * @returns Zeroed usage statistics bound to the tasks queue of this worker node.
   */
  private initWorkerUsage (): WorkerUsage {
    const newMeasurement = (): WorkerUsage['runTime'] => ({
      history: new CircularBuffer(MeasurementHistorySize),
    })
    const currentQueueSize = (): number => this.tasksQueue.size
    const maximumQueueSize = (): number => this.tasksQueue.maxSize
    return {
      elu: {
        active: newMeasurement(),
        idle: newMeasurement(),
      },
      runTime: newMeasurement(),
      tasks: {
        executed: 0,
        executing: 0,
        failed: 0,
        get maxQueued (): number {
          return maximumQueueSize()
        },
        get queued (): number {
          return currentQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
      },
      waitTime: newMeasurement(),
    }
  }
}