b09ea711df718bbcd508dfba52dd99112e90b285
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
1 // Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved.
2
3 import { randomUUID } from 'node:crypto'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import { SHARE_ENV, Worker } from 'node:worker_threads'
6
7 import { WorkerAbstract } from './WorkerAbstract.js'
8 import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js'
9 import {
10 type SetInfo,
11 type WorkerData,
12 type WorkerMessage,
13 WorkerMessageEvents,
14 type WorkerOptions,
15 type WorkerSetElement,
16 WorkerSetEvents
17 } from './WorkerTypes.js'
18 import { randomizeDelay, sleep } from './WorkerUtils.js'
19
/**
 * Bookkeeping stored for a request posted to a worker until the matching
 * reply (same message uuid) is received.
 */
interface ResponseWrapper<R extends WorkerData> {
  /** Worker set element the request was posted to. */
  workerSetElement: WorkerSetElement
  /** Fulfills the promise handed back to the caller of the request. */
  resolve: (value: R | PromiseLike<R>) => void
  /** Rejects the promise handed back to the caller of the request. */
  reject: (reason?: unknown) => void
}
25
/**
 * Set of worker threads, each hosting at most `elementsPerWorker` elements.
 * A new worker thread is spawned whenever every existing one is full.
 *
 * @typeParam D - Type of the data sent to a worker when adding an element.
 * @typeParam R - Type of the data sent back by a worker.
 */
export class WorkerSet<D extends WorkerData, R extends WorkerData> extends WorkerAbstract<D, R> {
  /** Event emitter, only created when `poolOptions.enableEvents` is `true`. */
  public readonly emitter: EventEmitterAsyncResource | undefined
  /** Worker set elements (worker thread + number of hosted elements). */
  private readonly workerSet: Set<WorkerSetElement>
  /** Pending requests, keyed by the uuid of the message posted to the worker. */
  private readonly promiseResponseMap: Map<
    `${string}-${string}-${string}-${string}`,
    ResponseWrapper<R>
  >

  private started: boolean
  /** `true` only while `addWorkerSetElement()` is running. */
  private workerStartup: boolean

  /**
   * Creates a new `WorkerSet`.
   *
   * @param workerScript - Script handed to the base class; it is the script
   * every worker thread of the set is created with.
   * @param workerOptions - Worker options. `elementsPerWorker` is mandatory
   * and must be a safe integer greater than zero.
   * @throws {TypeError} If `elementsPerWorker` is not defined or is not a safe integer.
   * @throws {RangeError} If `elementsPerWorker` is lower than or equal to zero.
   */
  constructor (workerScript: string, workerOptions: WorkerOptions) {
    super(workerScript, workerOptions)
    if (this.workerOptions.elementsPerWorker == null) {
      throw new TypeError('Elements per worker is not defined')
    }
    if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
      throw new TypeError('Elements per worker must be an integer')
    }
    if (this.workerOptions.elementsPerWorker <= 0) {
      throw new RangeError('Elements per worker must be greater than zero')
    }
    this.workerSet = new Set<WorkerSetElement>()
    this.promiseResponseMap = new Map<
      `${string}-${string}-${string}-${string}`,
      ResponseWrapper<R>
    >()
    if (this.workerOptions.poolOptions?.enableEvents === true) {
      this.emitter = new EventEmitterAsyncResource({ name: 'workerset' })
    }
    this.started = false
    this.workerStartup = false
  }

  /**
   * Snapshot of the worker set state. `elementsExecuting` is the sum of the
   * elements hosted by every worker of the set.
   */
  get info (): SetInfo {
    let elementsExecuting = 0
    for (const workerSetElement of this.workerSet) {
      elementsExecuting += workerSetElement.numberOfWorkerElements
    }
    return {
      version: workerSetVersion,
      type: 'set',
      worker: 'thread',
      started: this.started,
      size: this.size,
      elementsExecuting,
      // Validated as defined by the constructor
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      elementsPerWorker: this.maxElementsPerWorker!
    }
  }

  /** Number of worker threads currently in the set. */
  get size (): number {
    return this.workerSet.size
  }

  /** Maximum number of elements hosted by a single worker thread. */
  get maxElementsPerWorker (): number | undefined {
    return this.workerOptions.elementsPerWorker
  }

  /** @inheritDoc */
  public async start (): Promise<void> {
    this.addWorkerSetElement()
    // Add worker set element sequentially to optimize memory at startup.
    // No await is performed when no delay is configured, so that the set is
    // flagged as started synchronously in that case.
    const workerStartDelay = this.workerOptions.workerStartDelay
    if (workerStartDelay != null && workerStartDelay > 0) {
      await sleep(randomizeDelay(workerStartDelay))
    }
    // Flip the flag before emitting so that the emitted info reports `started: true`
    this.started = true
    this.emitter?.emit(WorkerSetEvents.started, this.info)
  }

  /** @inheritDoc */
  public async stop (): Promise<void> {
    // Terminate the workers one at a time and wait for each 'exit' event,
    // whose handler removes the worker from the set.
    for (const { worker } of this.workerSet) {
      const waitWorkerExit = new Promise<void>(resolve => {
        worker.once('exit', () => {
          resolve()
        })
      })
      worker.unref()
      await worker.terminate()
      await waitWorkerExit
    }
    // Flip the flag before emitting so that the emitted info reports `started: false`
    this.started = false
    this.emitter?.emit(WorkerSetEvents.stopped, this.info)
    this.emitter?.emitDestroy()
  }

  /**
   * @inheritDoc
   *
   * NOTE(review): the returned promise is only settled by a reply of the
   * worker carrying the same uuid; nothing in this class settles it if the
   * worker exits before replying.
   */
  public async addElement (elementData: D): Promise<R> {
    if (!this.started) {
      throw new Error('Cannot add a WorkerSet element: not started')
    }
    const workerSetElement = await this.getWorkerSetElement()
    const response = await new Promise<R>((resolve, reject) => {
      const message = {
        uuid: randomUUID(),
        event: WorkerMessageEvents.addWorkerElement,
        data: elementData
      } satisfies WorkerMessage<D>
      // Post first: if posting throws, the promise rejects and no entry is
      // left behind in the pending requests map.
      workerSetElement.worker.postMessage(message)
      this.promiseResponseMap.set(message.uuid, { resolve, reject, workerSetElement })
    })
    // Add element sequentially to optimize memory at startup
    const elementAddDelay = this.workerOptions.elementAddDelay
    if (elementAddDelay != null && elementAddDelay > 0) {
      await sleep(randomizeDelay(elementAddDelay))
    }
    return response
  }

  /**
   * Adds a new `WorkerSetElement`: spawns a worker thread, wires its event
   * handlers and registers it in the set.
   *
   * @returns The newly registered worker set element.
   */
  private addWorkerSetElement (): WorkerSetElement {
    this.workerStartup = true
    const worker = new Worker(this.workerScript, {
      env: SHARE_ENV,
      ...this.workerOptions.poolOptions?.workerOptions
    })
    worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION)
    worker.on('message', (message: WorkerMessage<R>) => {
      const { uuid, event, data } = message
      const pendingResponse = this.promiseResponseMap.get(uuid)
      if (pendingResponse == null) {
        // Not a reply to a request tracked by this worker set
        return
      }
      const { resolve, reject, workerSetElement } = pendingResponse
      switch (event) {
        case WorkerMessageEvents.addedWorkerElement:
          // Account for the element first so that the emitted info includes it
          ++workerSetElement.numberOfWorkerElements
          this.emitter?.emit(WorkerSetEvents.elementAdded, this.info)
          resolve(data)
          break
        case WorkerMessageEvents.workerElementError:
          this.emitter?.emit(WorkerSetEvents.elementError, data)
          reject(data)
          break
        default:
          reject(
            new Error(
              `Unknown worker message event: '${event}' received with data: '${JSON.stringify(
                data,
                undefined,
                2
              )}'`
            )
          )
      }
      this.promiseResponseMap.delete(uuid)
    })
    worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION)
    worker.on('error', error => {
      this.emitter?.emit(WorkerSetEvents.error, error)
      if (
        this.workerOptions.poolOptions?.restartWorkerOnError === true &&
        this.started &&
        !this.workerStartup
      ) {
        this.addWorkerSetElement()
      }
      worker.unref()
      worker.terminate().catch((error: unknown) => this.emitter?.emit(WorkerSetEvents.error, error))
    })
    worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION)
    worker.once('exit', () => {
      this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker))
    })
    const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 }
    this.workerSet.add(workerSetElement)
    this.workerStartup = false
    return workerSetElement
  }

  /**
   * Removes the given worker set element from the set.
   *
   * @param workerSetElement - Element to remove; `undefined` or `null` is a no-op.
   */
  private removeWorkerSetElement (workerSetElement: WorkerSetElement | undefined): void {
    if (workerSetElement == null) {
      return
    }
    this.workerSet.delete(workerSetElement)
  }

  /**
   * Gets a worker set element with spare capacity, spawning a new worker
   * thread when every existing one already hosts `elementsPerWorker` elements.
   *
   * @returns The worker set element the next element should be added to.
   */
  private async getWorkerSetElement (): Promise<WorkerSetElement> {
    for (const workerSetElement of this.workerSet) {
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
        return workerSetElement
      }
    }
    const newWorkerSetElement = this.addWorkerSetElement()
    // Add worker set element sequentially to optimize memory at startup
    const workerStartDelay = this.workerOptions.workerStartDelay
    if (workerStartDelay != null && workerStartDelay > 0) {
      await sleep(randomizeDelay(workerStartDelay))
    }
    return newWorkerSetElement
  }

  /**
   * Finds the worker set element wrapping the given worker thread.
   *
   * The lookup is done on the `Worker` object identity, which is exact and
   * does not depend on the value `threadId` reports once the thread has
   * exited (this method is called from the 'exit' event handler).
   *
   * @param worker - Worker thread to look for.
   * @returns The matching worker set element, `undefined` if there is none.
   */
  private getWorkerSetElementByWorker (worker: Worker): WorkerSetElement | undefined {
    for (const workerSetElement of this.workerSet) {
      if (workerSetElement.worker === worker) {
        return workerSetElement
      }
    }
    return undefined
  }
}