feat: ensure charging station add op return its station info
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
CommitLineData
a19b897d 1// Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved.
c8eeb62b 2
66a7748d
JB
3import { EventEmitterAsyncResource } from 'node:events'
4import { SHARE_ENV, Worker } from 'node:worker_threads'
c045d9a9 5
66a7748d
JB
6import { WorkerAbstract } from './WorkerAbstract.js'
7import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js'
0e4fa348 8import {
b779c0f8 9 type SetInfo,
0e4fa348 10 type WorkerData,
c26984f2 11 type WorkerMessage,
0e4fa348
JB
12 WorkerMessageEvents,
13 type WorkerOptions,
14 type WorkerSetElement,
66a7748d
JB
15 WorkerSetEvents
16} from './WorkerTypes.js'
17import { randomizeDelay, sleep } from './WorkerUtils.js'
6013bc53 18
/**
 * Worker implementation that spreads elements over a set of worker threads,
 * each worker thread handling at most `elementsPerWorker` elements.
 *
 * @typeParam D - Type of the data posted to a worker to add an element.
 * @typeParam R - Type of the data a worker replies with once the element is added.
 */
export class WorkerSet<D extends WorkerData, R extends WorkerData> extends WorkerAbstract<D, R> {
  /** Event emitter, only instantiated when `poolOptions.enableEvents` is `true`. */
  public readonly emitter: EventEmitterAsyncResource | undefined
  /** Worker threads currently tracked, with their number of added elements. */
  private readonly workerSet: Set<WorkerSetElement>
  /** Set by `start()`, cleared by `stop()`. */
  private started: boolean
  /** `true` only while `addWorkerSetElement()` is instantiating a worker. */
  private workerStartup: boolean

  /**
   * Creates a new `WorkerSet`.
   *
   * @param workerScript - Worker script file path, handed to the `Worker` constructor.
   * @param workerOptions - Worker options; `elementsPerWorker` is mandatory.
   * @throws {TypeError} If `elementsPerWorker` is not defined or is not a safe integer.
   * @throws {RangeError} If `elementsPerWorker` is lower than or equal to zero.
   */
  constructor (workerScript: string, workerOptions: WorkerOptions) {
    super(workerScript, workerOptions)
    if (this.workerOptions.elementsPerWorker == null) {
      throw new TypeError('Elements per worker is not defined')
    }
    if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
      throw new TypeError('Elements per worker must be an integer')
    }
    if (this.workerOptions.elementsPerWorker <= 0) {
      throw new RangeError('Elements per worker must be greater than zero')
    }
    this.workerSet = new Set<WorkerSetElement>()
    if (this.workerOptions.poolOptions?.enableEvents === true) {
      this.emitter = new EventEmitterAsyncResource({ name: 'workerset' })
    }
    this.started = false
    this.workerStartup = false
  }

  /** Snapshot of the worker set state, used as payload of the emitted events. */
  get info (): SetInfo {
    return {
      version: workerSetVersion,
      type: 'set',
      worker: 'thread',
      started: this.started,
      size: this.size,
      elementsExecuting: [...this.workerSet].reduce(
        (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
        0
      ),
      // Validated as defined in the constructor
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      elementsPerWorker: this.maxElementsPerWorker!
    }
  }

  /** Number of worker threads currently tracked. */
  get size (): number {
    return this.workerSet.size
  }

  /** Maximum number of elements a single worker thread handles. */
  get maxElementsPerWorker (): number | undefined {
    return this.workerOptions.elementsPerWorker
  }

  /** @inheritDoc */
  public async start (): Promise<void> {
    this.addWorkerSetElement()
    // Add worker set element sequentially to optimize memory at startup
    const workerStartDelay = this.workerOptions.workerStartDelay
    if (workerStartDelay != null && workerStartDelay > 0) {
      await sleep(randomizeDelay(workerStartDelay))
    }
    // Flip the flag before emitting so that the event payload reports `started: true`
    this.started = true
    this.emitter?.emit(WorkerSetEvents.started, this.info)
  }

  /** @inheritDoc */
  public async stop (): Promise<void> {
    for (const workerSetElement of this.workerSet) {
      const worker = workerSetElement.worker
      const waitWorkerExit = new Promise<void>(resolve => {
        worker.once('exit', () => {
          resolve()
        })
      })
      worker.unref()
      await worker.terminate()
      await waitWorkerExit
    }
    // Flip the flag before emitting so that the event payload reports `started: false`
    this.started = false
    this.emitter?.emit(WorkerSetEvents.stopped, this.info)
    this.emitter?.emitDestroy()
  }

  /** @inheritDoc */
  public async addElement (elementData: D): Promise<R> {
    if (!this.started) {
      throw new Error('Cannot add a WorkerSet element: not started')
    }
    const workerSetElement = await this.getWorkerSetElement()
    const { worker } = workerSetElement
    // NOTE: replies are matched by event type only; concurrent calls targeting
    // the same worker all settle on the first reply received from it.
    const waitForAddedWorkerElement = new Promise<R>((resolve, reject) => {
      const removeListeners = (): void => {
        worker.off('message', messageHandler)
        worker.off('exit', exitHandler)
      }
      const messageHandler = (message: WorkerMessage<R>): void => {
        if (message.event === WorkerMessageEvents.addedWorkerElement) {
          removeListeners()
          ++workerSetElement.numberOfWorkerElements
          resolve(message.data)
        } else if (message.event === WorkerMessageEvents.workerElementError) {
          removeListeners()
          // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
          reject(message.data)
        }
      }
      // Without this, a worker exiting before replying would leave the promise
      // pending forever and the message listener attached.
      const exitHandler = (exitCode: number): void => {
        removeListeners()
        reject(
          new Error(`Cannot add a WorkerSet element: worker exited with code ${exitCode}`)
        )
      }
      worker.on('message', messageHandler)
      worker.on('exit', exitHandler)
    })
    worker.postMessage({
      event: WorkerMessageEvents.addWorkerElement,
      data: elementData
    })
    const response = await waitForAddedWorkerElement
    // Add element sequentially to optimize memory at startup
    const elementAddDelay = this.workerOptions.elementAddDelay
    if (elementAddDelay != null && elementAddDelay > 0) {
      await sleep(randomizeDelay(elementAddDelay))
    }
    return response
  }

  /**
   * Adds a new `WorkerSetElement`: instantiates a worker thread, wires the
   * user supplied and internal event handlers, and registers it in the set.
   *
   * @returns The newly registered worker set element.
   */
  private addWorkerSetElement (): WorkerSetElement {
    this.workerStartup = true
    const worker = new Worker(this.workerScript, {
      env: SHARE_ENV,
      ...this.workerOptions.poolOptions?.workerOptions
    })
    worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION)
    worker.on('message', (message: WorkerMessage<R>) => {
      if (message.event === WorkerMessageEvents.addedWorkerElement) {
        this.emitter?.emit(WorkerSetEvents.elementAdded, this.info)
      } else if (message.event === WorkerMessageEvents.workerElementError) {
        this.emitter?.emit(WorkerSetEvents.elementError, message.data)
      }
    })
    worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION)
    worker.on('error', error => {
      this.emitter?.emit(WorkerSetEvents.error, error)
      if (
        this.workerOptions.poolOptions?.restartWorkerOnError === true &&
        this.started &&
        !this.workerStartup
      ) {
        this.addWorkerSetElement()
      }
      worker.unref()
      worker.terminate().catch((error: unknown) => this.emitter?.emit(WorkerSetEvents.error, error))
    })
    worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION)
    // Stop tracking the worker once its thread has exited
    worker.once('exit', () => {
      this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker))
    })
    const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 }
    this.workerSet.add(workerSetElement)
    this.workerStartup = false
    return workerSetElement
  }

  /**
   * Removes a `WorkerSetElement` from the set. No-op when given `undefined`.
   *
   * @param workerSetElement - The element to remove, if any.
   */
  private removeWorkerSetElement (workerSetElement: WorkerSetElement | undefined): void {
    if (workerSetElement == null) {
      return
    }
    this.workerSet.delete(workerSetElement)
  }

  /**
   * Gets the first `WorkerSetElement` with spare capacity, adding a new one
   * (and waiting for the worker start delay, if configured) when none has.
   *
   * @returns The chosen worker set element.
   */
  private async getWorkerSetElement (): Promise<WorkerSetElement> {
    let chosenWorkerSetElement: WorkerSetElement | undefined
    for (const workerSetElement of this.workerSet) {
      // Validated as defined in the constructor
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
        chosenWorkerSetElement = workerSetElement
        break
      }
    }
    if (chosenWorkerSetElement == null) {
      chosenWorkerSetElement = this.addWorkerSetElement()
      // Add worker set element sequentially to optimize memory at startup
      const workerStartDelay = this.workerOptions.workerStartDelay
      if (workerStartDelay != null && workerStartDelay > 0) {
        await sleep(randomizeDelay(workerStartDelay))
      }
    }
    return chosenWorkerSetElement
  }

  /**
   * Finds the `WorkerSetElement` wrapping the given worker.
   *
   * @param worker - The worker to look up.
   * @returns The matching element, or `undefined` if the worker is not tracked.
   */
  private getWorkerSetElementByWorker (worker: Worker): WorkerSetElement | undefined {
    for (const workerSetElement of this.workerSet) {
      // Compare by object identity: this lookup runs from the worker 'exit'
      // handler and must not depend on the thread id still identifying it.
      if (workerSetElement.worker === worker) {
        return workerSetElement
      }
    }
    return undefined
  }
}