]> Piment Noir Git Repositories - e-mobility-charging-stations-simulator.git/blame - src/worker/WorkerSet.ts
build(deps): bump the regular group across 1 directory with 6 updates (#1452)
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
CommitLineData
a19b897d 1// Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved.
c8eeb62b 2
65d22502 3import { randomUUID } from 'node:crypto'
66a7748d
JB
4import { EventEmitterAsyncResource } from 'node:events'
5import { SHARE_ENV, Worker } from 'node:worker_threads'
c045d9a9 6
66a7748d
JB
7import { WorkerAbstract } from './WorkerAbstract.js'
8import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js'
0e4fa348 9import {
b779c0f8 10 type SetInfo,
0e4fa348 11 type WorkerData,
c26984f2 12 type WorkerMessage,
0e4fa348
JB
13 WorkerMessageEvents,
14 type WorkerOptions,
15 type WorkerSetElement,
d1f5bfd8 16 WorkerSetEvents,
66a7748d
JB
17} from './WorkerTypes.js'
18import { randomizeDelay, sleep } from './WorkerUtils.js'
6013bc53 19
/**
 * Bookkeeping entry for a request that was posted to a worker and is still
 * waiting for its answer.
 *
 * It keeps the settlement callbacks of the promise handed to the caller
 * together with the worker set element the request was sent to.
 * @typeParam R - Type of the data carried by the worker response.
 */
interface ResponseWrapper<R extends WorkerData> {
  /** Rejects the promise associated with the pending request. */
  reject: (reason?: unknown) => void
  /** Resolves the promise associated with the pending request. */
  resolve: (value: R | PromiseLike<R>) => void
  /** Worker set element whose worker received the request. */
  workerSetElement: WorkerSetElement
}
25
/**
 * Set of worker threads, each one hosting up to `elementsPerWorker` elements.
 *
 * Elements are added through `addElement()`: the request is posted to the
 * first worker that still has spare capacity (a new worker is spawned when
 * none has) and the returned promise settles when the worker answers.
 * @typeParam D - Type of the data sent to the worker.
 * @typeParam R - Type of the data received from the worker.
 */
export class WorkerSet<D extends WorkerData, R extends WorkerData> extends WorkerAbstract<D, R> {
  /** Event emitter, only created when `poolOptions.enableEvents` is `true`. */
  public readonly emitter: EventEmitterAsyncResource | undefined

  /** Snapshot of the worker set state, used as payload of most emitted events. */
  get info (): SetInfo {
    return {
      elementsExecuting: [...this.workerSet].reduce(
        (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
        0
      ),
      // Validated as a positive safe integer in the constructor
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      elementsPerWorker: this.maxElementsPerWorker!,
      size: this.size,
      started: this.started,
      type: 'set',
      version: workerSetVersion,
      worker: 'thread',
    }
  }

  /** Configured maximum number of elements hosted by a single worker. */
  get maxElementsPerWorker (): number | undefined {
    return this.workerOptions.elementsPerWorker
  }

  /** Number of workers currently held in the set. */
  get size (): number {
    return this.workerSet.size
  }

  /** Requests posted to a worker and not answered yet, keyed by message UUID. */
  private readonly promiseResponseMap: Map<
    `${string}-${string}-${string}-${string}`,
    ResponseWrapper<R>
  >

  private started: boolean
  private readonly workerSet: Set<WorkerSetElement>
  /** `true` only while `addWorkerSetElement()` is wiring up a new worker. */
  private workerStartup: boolean

  /**
   * Creates a new `WorkerSet`.
   * @param workerScript -
   * @param workerOptions -
   * @throws {TypeError} If `elementsPerWorker` is not defined or is not a safe integer.
   * @throws {RangeError} If `elementsPerWorker` is lower than or equal to zero.
   */
  constructor (workerScript: string, workerOptions: WorkerOptions) {
    super(workerScript, workerOptions)
    if (this.workerOptions.elementsPerWorker == null) {
      throw new TypeError('Elements per worker is not defined')
    }
    if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
      throw new TypeError('Elements per worker must be an integer')
    }
    if (this.workerOptions.elementsPerWorker <= 0) {
      throw new RangeError('Elements per worker must be greater than zero')
    }
    this.workerSet = new Set<WorkerSetElement>()
    this.promiseResponseMap = new Map<
      `${string}-${string}-${string}-${string}`,
      ResponseWrapper<R>
    >()
    if (this.workerOptions.poolOptions?.enableEvents === true) {
      this.emitter = new EventEmitterAsyncResource({ name: 'workerset' })
    }
    this.started = false
    this.workerStartup = false
  }

  /** @inheritDoc */
  public async addElement (elementData: D): Promise<R> {
    // Posts an `addWorkerElement` request to a worker with spare capacity and
    // waits for its answer. Rejects if the set is not started, if the worker
    // reports an error, or if the worker goes away before answering.
    if (!this.started) {
      throw new Error('Cannot add a WorkerSet element: not started')
    }
    const workerSetElement = await this.getWorkerSetElement()
    // The worker may have exited while the start delay was awaited: its 'exit'
    // handler already ran, so nothing would ever settle a request posted now.
    if (!this.workerSet.has(workerSetElement)) {
      throw new Error('Cannot add a WorkerSet element: worker exited before the request was sent')
    }
    const response = await new Promise<R>((resolve, reject) => {
      const message = {
        data: elementData,
        event: WorkerMessageEvents.addWorkerElement,
        uuid: randomUUID(),
      } satisfies WorkerMessage<D>
      workerSetElement.worker.postMessage(message)
      this.promiseResponseMap.set(message.uuid, {
        reject,
        resolve,
        workerSetElement,
      })
    })
    // Add element sequentially to optimize memory at startup
    const elementAddDelay = this.workerOptions.elementAddDelay ?? 0
    if (elementAddDelay > 0) {
      await sleep(randomizeDelay(elementAddDelay))
    }
    return response
  }

  /** @inheritDoc */
  public async start (): Promise<void> {
    this.addWorkerSetElement()
    // Add worker set element sequentially to optimize memory at startup
    const workerStartDelay = this.workerOptions.workerStartDelay ?? 0
    if (workerStartDelay > 0) {
      await sleep(randomizeDelay(workerStartDelay))
    }
    // Flip the flag first so that the event payload reports `started: true`
    this.started = true
    this.emitter?.emit(WorkerSetEvents.started, this.info)
  }

  /** @inheritDoc */
  public async stop (): Promise<void> {
    // Workers are terminated one at a time; each worker's own 'exit' handler
    // removes it from the set and rejects its still pending requests.
    for (const { worker } of this.workerSet) {
      const waitWorkerExit = new Promise<void>(resolve => {
        worker.once('exit', () => {
          resolve()
        })
      })
      worker.unref()
      await worker.terminate()
      await waitWorkerExit
    }
    // Flip the flag first so that the event payload reports `started: false`
    this.started = false
    this.emitter?.emit(WorkerSetEvents.stopped, this.info)
    this.emitter?.emitDestroy()
  }

  /**
   * Adds a new `WorkerSetElement`.
   *
   * Spawns the worker thread, registers the user supplied handlers followed
   * by the internal ones, and stores the element in the set.
   * @returns The new `WorkerSetElement`.
   */
  private addWorkerSetElement (): WorkerSetElement {
    this.workerStartup = true
    const worker = new Worker(this.workerScript, {
      env: SHARE_ENV,
      ...this.workerOptions.poolOptions?.workerOptions,
    })
    worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION)
    worker.on('message', (message: WorkerMessage<R>) => {
      const { data, event, uuid } = message
      const responseWrapper = this.promiseResponseMap.get(uuid)
      if (responseWrapper == null) {
        // Not the answer to a pending request: nothing to settle
        return
      }
      const { reject, resolve, workerSetElement } = responseWrapper
      switch (event) {
        case WorkerMessageEvents.addedWorkerElement:
          // Count the element first so that the event payload includes it
          ++workerSetElement.numberOfWorkerElements
          this.emitter?.emit(WorkerSetEvents.elementAdded, this.info)
          resolve(data)
          break
        case WorkerMessageEvents.workerElementError:
          this.emitter?.emit(WorkerSetEvents.elementError, data)
          reject(data)
          break
        default:
          reject(
            new Error(
              `Unknown worker message event: '${event}' received with data: '${JSON.stringify(
                data,
                undefined,
                2
              )}'`
            )
          )
      }
      this.promiseResponseMap.delete(uuid)
    })
    worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION)
    worker.once('error', error => {
      this.emitter?.emit(WorkerSetEvents.error, error)
      // The worker is about to be terminated: fail its pending requests with
      // the actual error instead of leaving their promises unsettled.
      this.rejectPendingResponses(worker, error)
      if (
        this.workerOptions.poolOptions?.restartWorkerOnError === true &&
        this.started &&
        !this.workerStartup
      ) {
        this.addWorkerSetElement()
      }
      worker.unref()
      // eslint-disable-next-line promise/no-promise-in-callback
      worker.terminate().catch((error: unknown) => this.emitter?.emit(WorkerSetEvents.error, error))
    })
    worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION)
    worker.once('exit', () => {
      // Requests still pending at this point can never be answered
      this.rejectPendingResponses(
        worker,
        new Error('Worker exited before responding to a pending request')
      )
      this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker))
    })
    const workerSetElement: WorkerSetElement = {
      numberOfWorkerElements: 0,
      worker,
    }
    this.workerSet.add(workerSetElement)
    this.workerStartup = false
    return workerSetElement
  }

  /**
   * Picks the first worker set element with spare capacity, spawning a new
   * one (and waiting for the optional start delay) when all are full.
   *
   * NOTE(review): `numberOfWorkerElements` is only incremented once the worker
   * acknowledges the element, so `addElement()` calls issued concurrently may
   * select the same element and exceed `elementsPerWorker` - confirm whether
   * callers always add elements sequentially.
   * @returns The chosen `WorkerSetElement`.
   */
  private async getWorkerSetElement (): Promise<WorkerSetElement> {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const elementsPerWorker = this.workerOptions.elementsPerWorker!
    for (const workerSetElement of this.workerSet) {
      if (workerSetElement.numberOfWorkerElements < elementsPerWorker) {
        return workerSetElement
      }
    }
    const newWorkerSetElement = this.addWorkerSetElement()
    // Add worker set element sequentially to optimize memory at startup
    const workerStartDelay = this.workerOptions.workerStartDelay ?? 0
    if (workerStartDelay > 0) {
      await sleep(randomizeDelay(workerStartDelay))
    }
    return newWorkerSetElement
  }

  /**
   * Finds the worker set element wrapping the given worker.
   *
   * Workers are matched by object identity, so the lookup does not depend on
   * the thread id of a worker that may already have exited.
   * @param worker - Worker to look for.
   * @returns The matching `WorkerSetElement`, `undefined` if there is none.
   */
  private getWorkerSetElementByWorker (worker: Worker): undefined | WorkerSetElement {
    for (const workerSetElement of this.workerSet) {
      if (workerSetElement.worker === worker) {
        return workerSetElement
      }
    }
    return undefined
  }

  /**
   * Rejects and forgets every pending request that was sent to the given worker.
   * @param worker - Worker whose pending requests must be failed.
   * @param reason - Rejection reason handed to the waiting callers.
   */
  private rejectPendingResponses (worker: Worker, reason: unknown): void {
    for (const [uuid, { reject, workerSetElement }] of this.promiseResponseMap) {
      if (workerSetElement.worker === worker) {
        reject(reason)
        this.promiseResponseMap.delete(uuid)
      }
    }
  }

  /**
   * Removes a worker set element from the set.
   * @param workerSetElement - Element to remove; ignored when `undefined`.
   */
  private removeWorkerSetElement (workerSetElement: undefined | WorkerSetElement): void {
    if (workerSetElement == null) {
      return
    }
    this.workerSet.delete(workerSetElement)
  }
}