build(deps-dev): apply updates
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
CommitLineData
a19b897d 1// Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved.
c8eeb62b 2
65d22502 3import { randomUUID } from 'node:crypto'
66a7748d
JB
4import { EventEmitterAsyncResource } from 'node:events'
5import { SHARE_ENV, Worker } from 'node:worker_threads'
c045d9a9 6
66a7748d
JB
7import { WorkerAbstract } from './WorkerAbstract.js'
8import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js'
0e4fa348 9import {
b779c0f8 10 type SetInfo,
0e4fa348 11 type WorkerData,
c26984f2 12 type WorkerMessage,
0e4fa348
JB
13 WorkerMessageEvents,
14 type WorkerOptions,
15 type WorkerSetElement,
66a7748d
JB
16 WorkerSetEvents
17} from './WorkerTypes.js'
18import { randomizeDelay, sleep } from './WorkerUtils.js'
6013bc53 19
65d22502
JB
/**
 * Bookkeeping entry for a request posted to a worker and still awaiting its response.
 *
 * Entries are stored in the worker set response map, keyed by the request UUID.
 */
interface ResponseWrapper<R extends WorkerData> {
  /** Worker set element whose worker received the request. */
  workerSetElement: WorkerSetElement
  /** Settles the pending request promise with the worker response data. */
  resolve: (value: R | PromiseLike<R>) => void
  /** Rejects the pending request promise. */
  reject: (reason?: unknown) => void
}
25
/**
 * Worker implementation backed by a set of worker threads that grows on demand.
 *
 * Each worker thread hosts at most `workerOptions.elementsPerWorker` elements. When every
 * worker of the set is full, a new worker thread is spawned to host the next element.
 */
export class WorkerSet<D extends WorkerData, R extends WorkerData> extends WorkerAbstract<D, R> {
  /** Event emitter, only created when `poolOptions.enableEvents` is `true`. */
  public readonly emitter: EventEmitterAsyncResource | undefined
  /** Worker set elements (worker thread + number of hosted elements). */
  private readonly workerSet: Set<WorkerSetElement>
  /** Requests posted to a worker and still awaiting a response, keyed by request UUID. */
  private readonly promiseResponseMap: Map<
    `${string}-${string}-${string}-${string}`,
    ResponseWrapper<R>
  >

  private started: boolean
  private workerStartup: boolean

  /**
   * Creates a new `WorkerSet`.
   *
   * @param workerScript - Script executed by each worker thread of the set.
   * @param workerOptions - Worker options; `elementsPerWorker` is mandatory.
   * @throws {TypeError} If `elementsPerWorker` is not defined or is not a safe integer.
   * @throws {RangeError} If `elementsPerWorker` is lower than or equal to zero.
   */
  constructor (workerScript: string, workerOptions: WorkerOptions) {
    super(workerScript, workerOptions)
    if (this.workerOptions.elementsPerWorker == null) {
      throw new TypeError('Elements per worker is not defined')
    }
    if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
      throw new TypeError('Elements per worker must be an integer')
    }
    if (this.workerOptions.elementsPerWorker <= 0) {
      throw new RangeError('Elements per worker must be greater than zero')
    }
    this.workerSet = new Set<WorkerSetElement>()
    this.promiseResponseMap = new Map<
      `${string}-${string}-${string}-${string}`,
      ResponseWrapper<R>
    >()
    if (this.workerOptions.poolOptions?.enableEvents === true) {
      this.emitter = new EventEmitterAsyncResource({ name: 'workerset' })
    }
    this.started = false
    this.workerStartup = false
  }

  /** Snapshot of the worker set state, also used as payload of the emitted events. */
  get info (): SetInfo {
    return {
      version: workerSetVersion,
      type: 'set',
      worker: 'thread',
      started: this.started,
      size: this.size,
      elementsExecuting: [...this.workerSet].reduce(
        (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
        0
      ),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      elementsPerWorker: this.maxElementsPerWorker!
    }
  }

  /** Number of worker set elements, i.e. of worker threads currently in the set. */
  get size (): number {
    return this.workerSet.size
  }

  /** Maximum number of elements hosted by a single worker. */
  get maxElementsPerWorker (): number | undefined {
    return this.workerOptions.elementsPerWorker
  }

  /** @inheritDoc */
  public async start (): Promise<void> {
    this.addWorkerSetElement()
    // Add worker set element sequentially to optimize memory at startup
    const workerStartDelay = this.workerOptions.workerStartDelay ?? 0
    if (workerStartDelay > 0) {
      await sleep(randomizeDelay(workerStartDelay))
    }
    // Flag the set as started before emitting so that the event payload reports
    // `started: true` and listeners can add elements right away
    this.started = true
    this.emitter?.emit(WorkerSetEvents.started, this.info)
  }

  /** @inheritDoc */
  public async stop (): Promise<void> {
    for (const workerSetElement of this.workerSet) {
      const worker = workerSetElement.worker
      const waitWorkerExit = new Promise<void>(resolve => {
        worker.once('exit', () => {
          resolve()
        })
      })
      worker.unref()
      await worker.terminate()
      await waitWorkerExit
    }
    // Flag the set as stopped before emitting so that the event payload reports
    // `started: false`
    this.started = false
    this.emitter?.emit(WorkerSetEvents.stopped, this.info)
    this.emitter?.emitDestroy()
  }

  /**
   * Adds an element to a worker of the set and waits for the worker response.
   *
   * @param elementData - Data of the element, posted as is to the chosen worker.
   * @returns The data of the worker response.
   * @throws {Error} If the worker set is not started. The returned promise also rejects
   * if the worker reports an error, answers with an unknown event or exits before
   * responding.
   */
  public async addElement (elementData: D): Promise<R> {
    if (!this.started) {
      throw new Error('Cannot add a WorkerSet element: not started')
    }
    const workerSetElement = await this.getWorkerSetElement()
    const sendMessageToWorker = new Promise<R>((resolve, reject) => {
      const message = {
        uuid: randomUUID(),
        event: WorkerMessageEvents.addWorkerElement,
        data: elementData
      } satisfies WorkerMessage<D>
      workerSetElement.worker.postMessage(message)
      // Registered after posting: a failing postMessage() leaves no stale entry behind
      this.promiseResponseMap.set(message.uuid, {
        resolve,
        reject,
        workerSetElement
      })
    })
    const response = await sendMessageToWorker
    // Add element sequentially to optimize memory at startup
    const elementAddDelay = this.workerOptions.elementAddDelay ?? 0
    if (elementAddDelay > 0) {
      await sleep(randomizeDelay(elementAddDelay))
    }
    return response
  }

  /**
   * Adds a new `WorkerSetElement`: spawns a worker thread, wires its event handlers
   * and registers it in the set.
   *
   * @returns The newly added worker set element.
   */
  private addWorkerSetElement (): WorkerSetElement {
    this.workerStartup = true
    const worker = new Worker(this.workerScript, {
      env: SHARE_ENV,
      ...this.workerOptions.poolOptions?.workerOptions
    })
    worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION)
    worker.on('message', (message: WorkerMessage<R>) => {
      const { uuid, event, data } = message
      const responseWrapper = this.promiseResponseMap.get(uuid)
      if (responseWrapper == null) {
        // Not the response to a pending request of this set: nothing to settle
        return
      }
      const { resolve, reject, workerSetElement: targetWorkerSetElement } = responseWrapper
      switch (event) {
        case WorkerMessageEvents.addedWorkerElement:
          // Account for the element before emitting so that the event payload is up to date
          ++targetWorkerSetElement.numberOfWorkerElements
          this.emitter?.emit(WorkerSetEvents.elementAdded, this.info)
          resolve(data)
          break
        case WorkerMessageEvents.workerElementError:
          this.emitter?.emit(WorkerSetEvents.elementError, data)
          reject(data)
          break
        default:
          reject(
            new Error(
              `Unknown worker message event: '${event}' received with data: '${JSON.stringify(
                data,
                undefined,
                2
              )}'`
            )
          )
      }
      this.promiseResponseMap.delete(uuid)
    })
    worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION)
    worker.once('error', error => {
      this.emitter?.emit(WorkerSetEvents.error, error)
      if (
        this.workerOptions.poolOptions?.restartWorkerOnError === true &&
        this.started &&
        !this.workerStartup
      ) {
        this.addWorkerSetElement()
      }
      worker.unref()
      worker.terminate().catch((error: unknown) => this.emitter?.emit(WorkerSetEvents.error, error))
    })
    worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION)
    worker.once('exit', () => {
      const exitedWorkerSetElement = this.getWorkerSetElementByWorker(worker)
      if (exitedWorkerSetElement != null) {
        // An exited worker cannot respond anymore: settle its pending requests
        this.rejectPendingResponses(
          exitedWorkerSetElement,
          new Error('Worker exited before responding to the request')
        )
      }
      this.removeWorkerSetElement(exitedWorkerSetElement)
    })
    const workerSetElement: WorkerSetElement = {
      worker,
      numberOfWorkerElements: 0
    }
    this.workerSet.add(workerSetElement)
    this.workerStartup = false
    return workerSetElement
  }

  /**
   * Rejects and forgets every pending request addressed to the given worker set element.
   *
   * @param workerSetElement - Worker set element whose pending requests are rejected.
   * @param reason - Rejection reason handed to the pending promises.
   */
  private rejectPendingResponses (workerSetElement: WorkerSetElement, reason: Error): void {
    for (const [uuid, responseWrapper] of this.promiseResponseMap) {
      if (responseWrapper.workerSetElement === workerSetElement) {
        this.promiseResponseMap.delete(uuid)
        responseWrapper.reject(reason)
      }
    }
  }

  /**
   * Removes a worker set element from the set; no-op when none is given.
   *
   * @param workerSetElement - Worker set element to remove.
   */
  private removeWorkerSetElement (workerSetElement: WorkerSetElement | undefined): void {
    if (workerSetElement == null) {
      return
    }
    this.workerSet.delete(workerSetElement)
  }

  /**
   * Gets the first worker set element with spare capacity, adding a new one to the set
   * when every existing worker already hosts the maximum number of elements.
   *
   * @returns The worker set element that will host the next element.
   */
  private async getWorkerSetElement (): Promise<WorkerSetElement> {
    for (const workerSetElement of this.workerSet) {
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
        return workerSetElement
      }
    }
    const addedWorkerSetElement = this.addWorkerSetElement()
    // Add worker set element sequentially to optimize memory at startup
    const workerStartDelay = this.workerOptions.workerStartDelay ?? 0
    if (workerStartDelay > 0) {
      await sleep(randomizeDelay(workerStartDelay))
    }
    return addedWorkerSetElement
  }

  /**
   * Gets the worker set element wrapping the given worker.
   *
   * The worker is matched by object identity, so the lookup does not depend on the
   * thread id still being meaningful once the worker thread has exited.
   *
   * @param worker - Worker thread to look for.
   * @returns The matching worker set element, `undefined` if the worker is not in the set.
   */
  private getWorkerSetElementByWorker (worker: Worker): WorkerSetElement | undefined {
    for (const workerSetElement of this.workerSet) {
      if (workerSetElement.worker === worker) {
        return workerSetElement
      }
    }
    return undefined
  }
}