fix: fix simulator initialization ordering
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
1 // Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved.
2
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { SHARE_ENV, Worker } from 'node:worker_threads'
5
6 import { WorkerAbstract } from './WorkerAbstract.js'
7 import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js'
8 import {
9 type SetInfo,
10 type WorkerData,
11 type WorkerMessage,
12 WorkerMessageEvents,
13 type WorkerOptions,
14 type WorkerSetElement,
15 WorkerSetEvents
16 } from './WorkerTypes.js'
17 import { randomizeDelay, sleep } from './WorkerUtils.js'
18
19 export class WorkerSet extends WorkerAbstract<WorkerData> {
20 public readonly emitter: EventEmitterAsyncResource | undefined
21 private readonly workerSet: Set<WorkerSetElement>
22 private started: boolean
23 private workerStartup: boolean
24
25 /**
26 * Creates a new `WorkerSet`.
27 *
28 * @param workerScript -
29 * @param workerOptions -
30 */
31 constructor (workerScript: string, workerOptions: WorkerOptions) {
32 super(workerScript, workerOptions)
33 if (this.workerOptions.elementsPerWorker == null) {
34 throw new TypeError('Elements per worker is not defined')
35 }
36 if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
37 throw new TypeError('Elements per worker must be an integer')
38 }
39 if (this.workerOptions.elementsPerWorker <= 0) {
40 throw new RangeError('Elements per worker must be greater than zero')
41 }
42 this.workerSet = new Set<WorkerSetElement>()
43 if (this.workerOptions.poolOptions?.enableEvents === true) {
44 this.emitter = new EventEmitterAsyncResource({ name: 'workerset' })
45 }
46 this.started = false
47 this.workerStartup = false
48 }
49
50 get info (): SetInfo {
51 return {
52 version: workerSetVersion,
53 type: 'set',
54 worker: 'thread',
55 started: this.started,
56 size: this.size,
57 elementsExecuting: [...this.workerSet].reduce(
58 (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
59 0
60 ),
61 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
62 elementsPerWorker: this.maxElementsPerWorker!
63 }
64 }
65
66 get size (): number {
67 return this.workerSet.size
68 }
69
70 get maxElementsPerWorker (): number | undefined {
71 return this.workerOptions.elementsPerWorker
72 }
73
74 /** @inheritDoc */
75 public async start (): Promise<void> {
76 this.addWorkerSetElement()
77 // Add worker set element sequentially to optimize memory at startup
78 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
79 this.workerOptions.workerStartDelay! > 0 &&
80 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
81 (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)))
82 this.emitter?.emit(WorkerSetEvents.started, this.info)
83 this.started = true
84 }
85
86 /** @inheritDoc */
87 public async stop (): Promise<void> {
88 for (const workerSetElement of this.workerSet) {
89 const worker = workerSetElement.worker
90 const waitWorkerExit = new Promise<void>(resolve => {
91 worker.once('exit', () => {
92 resolve()
93 })
94 })
95 worker.unref()
96 await worker.terminate()
97 await waitWorkerExit
98 }
99 this.emitter?.emit(WorkerSetEvents.stopped, this.info)
100 this.started = false
101 this.emitter?.emitDestroy()
102 }
103
104 /** @inheritDoc */
105 public async addElement (elementData: WorkerData): Promise<void> {
106 if (!this.started) {
107 throw new Error('Cannot add a WorkerSet element: not started')
108 }
109 const workerSetElement = await this.getWorkerSetElement()
110 workerSetElement.worker.postMessage({
111 event: WorkerMessageEvents.addWorkerElement,
112 data: elementData
113 })
114 ++workerSetElement.numberOfWorkerElements
115 // Add element sequentially to optimize memory at startup
116 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
117 if (this.workerOptions.elementAddDelay! > 0) {
118 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
119 await sleep(randomizeDelay(this.workerOptions.elementAddDelay!))
120 }
121 }
122
123 /**
124 * Adds a new `WorkerSetElement`.
125 */
126 private addWorkerSetElement (): WorkerSetElement {
127 this.workerStartup = true
128 const worker = new Worker(this.workerScript, {
129 env: SHARE_ENV,
130 ...this.workerOptions.poolOptions?.workerOptions
131 })
132 worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION)
133 worker.on('message', (message: WorkerMessage<WorkerData>) => {
134 if (message.event === WorkerMessageEvents.addedWorkerElement) {
135 this.emitter?.emit(WorkerSetEvents.elementAdded, this.info)
136 } else if (message.event === WorkerMessageEvents.workerElementError) {
137 this.emitter?.emit(WorkerSetEvents.elementError, message.data)
138 }
139 })
140 worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION)
141 worker.on('error', error => {
142 this.emitter?.emit(WorkerSetEvents.error, error)
143 if (
144 this.workerOptions.poolOptions?.restartWorkerOnError === true &&
145 this.started &&
146 !this.workerStartup
147 ) {
148 this.addWorkerSetElement()
149 }
150 worker.unref()
151 worker.terminate().catch((error: unknown) => this.emitter?.emit(WorkerSetEvents.error, error))
152 })
153 worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION)
154 worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION)
155 worker.once('exit', () => {
156 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker))
157 })
158 const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 }
159 this.workerSet.add(workerSetElement)
160 this.workerStartup = false
161 return workerSetElement
162 }
163
164 private removeWorkerSetElement (workerSetElement: WorkerSetElement | undefined): void {
165 if (workerSetElement == null) {
166 return
167 }
168 this.workerSet.delete(workerSetElement)
169 }
170
171 private async getWorkerSetElement (): Promise<WorkerSetElement> {
172 let chosenWorkerSetElement: WorkerSetElement | undefined
173 for (const workerSetElement of this.workerSet) {
174 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
175 if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
176 chosenWorkerSetElement = workerSetElement
177 break
178 }
179 }
180 if (chosenWorkerSetElement == null) {
181 chosenWorkerSetElement = this.addWorkerSetElement()
182 // Add worker set element sequentially to optimize memory at startup
183 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
184 this.workerOptions.workerStartDelay! > 0 &&
185 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
186 (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)))
187 }
188 return chosenWorkerSetElement
189 }
190
191 private getWorkerSetElementByWorker (worker: Worker): WorkerSetElement | undefined {
192 let workerSetElt: WorkerSetElement | undefined
193 for (const workerSetElement of this.workerSet) {
194 if (workerSetElement.worker.threadId === worker.threadId) {
195 workerSetElt = workerSetElement
196 break
197 }
198 }
199 return workerSetElt
200 }
201 }