refactor: cleanup worker events handling in main thread
[e-mobility-charging-stations-simulator.git] / src / charging-station / Bootstrap.ts
1 // Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved.
2
3 import { EventEmitter } from 'node:events'
4 import { dirname, extname, join, parse } from 'node:path'
5 import process, { exit } from 'node:process'
6 import { fileURLToPath } from 'node:url'
7 import { isMainThread } from 'node:worker_threads'
8 import type { Worker } from 'worker_threads'
9
10 import chalk from 'chalk'
11 import { type MessageHandler, availableParallelism } from 'poolifier'
12
13 import { waitChargingStationEvents } from './Helpers.js'
14 import type { AbstractUIServer } from './ui-server/AbstractUIServer.js'
15 import { UIServerFactory } from './ui-server/UIServerFactory.js'
16 import { version } from '../../package.json'
17 import { BaseError } from '../exception/index.js'
18 import { type Storage, StorageFactory } from '../performance/index.js'
19 import {
20 type ChargingStationData,
21 type ChargingStationWorkerData,
22 type ChargingStationWorkerEventError,
23 type ChargingStationWorkerMessage,
24 type ChargingStationWorkerMessageData,
25 ChargingStationWorkerMessageEvents,
26 ConfigurationSection,
27 ProcedureName,
28 type Statistics,
29 type StorageConfiguration,
30 type UIServerConfiguration,
31 type WorkerConfiguration
32 } from '../types/index.js'
33 import {
34 Configuration,
35 Constants,
36 formatDurationMilliSeconds,
37 generateUUID,
38 handleUncaughtException,
39 handleUnhandledRejection,
40 isAsyncFunction,
41 isNotEmptyArray,
42 logPrefix,
43 logger,
44 max
45 } from '../utils/index.js'
46 import { type WorkerAbstract, WorkerFactory } from '../worker/index.js'
47
/** Module name used as a label in this file's log messages. */
const moduleName = 'Bootstrap'
49
/**
 * Process exit codes used by the simulator bootstrap.
 */
enum exitCodes {
  /** Graceful shutdown completed. */
  succeeded = 0,
  /** 'stationTemplateUrls' is not defined or empty in the configuration. */
  missingChargingStationsConfiguration = 1,
  /** 'stationTemplateUrls' contains duplicate entries. */
  duplicateChargingStationTemplateUrls = 2,
  /** No charging station is configured and the UI server is disabled. */
  noChargingStationTemplates = 3,
  /** An error occurred during the graceful shutdown. */
  gracefulShutdownError = 4
}
57
/**
 * Per charging station template counters maintained by the bootstrap.
 */
interface TemplateChargingStations {
  /** Number of stations configured for the template ('numberOfStations'). */
  configured: number
  /** Number of stations reported as added by the workers. */
  added: number
  /** Number of stations currently reported as started by the workers. */
  started: number
  /** Highest station index passed to `addChargingStation` for the template. */
  lastIndex: number
}
64
/**
 * Main thread entry point of the charging stations simulator.
 *
 * Singleton (private constructor + `getInstance()`) that creates the worker
 * implementation, the optional performance storage and the UI server, adds the
 * configured charging stations to the workers and keeps per template counters
 * up to date from the events the workers send back.
 */
export class Bootstrap extends EventEmitter {
  private static instance: Bootstrap | null = null
  private workerImplementation?: WorkerAbstract<ChargingStationWorkerData>
  private readonly uiServer?: AbstractUIServer
  private storage?: Storage
  /** Counters keyed by template name (template file name without extension). */
  private readonly chargingStationsByTemplate: Map<string, TemplateChargingStations>
  private readonly version: string = version
  private initializedCounters: boolean
  private started: boolean
  private starting: boolean
  private stopping: boolean

  private constructor () {
    super()
    for (const signal of ['SIGINT', 'SIGQUIT', 'SIGTERM']) {
      process.on(signal, this.gracefulShutdown.bind(this))
    }
    // Enable unconditionally for now
    handleUnhandledRejection()
    handleUncaughtException()
    this.started = false
    this.starting = false
    this.stopping = false
    this.chargingStationsByTemplate = new Map<string, TemplateChargingStations>()
    // The UI server must exist before the counters are initialized since
    // initializeCounters() registers the template names on it
    this.uiServer = UIServerFactory.getUIServerImplementation(
      Configuration.getConfigurationSection<UIServerConfiguration>(ConfigurationSection.uiServer)
    )
    this.initializedCounters = false
    this.initializeCounters()
    Configuration.configurationChangeCallback = async () => {
      if (isMainThread) {
        await Bootstrap.getInstance().restart()
      }
    }
  }

  /**
   * Gets the singleton instance, creating it on first call.
   *
   * @returns The unique `Bootstrap` instance.
   */
  public static getInstance (): Bootstrap {
    if (Bootstrap.instance === null) {
      Bootstrap.instance = new Bootstrap()
    }
    return Bootstrap.instance
  }

  /** Number of distinct charging station templates known to the simulator. */
  public get numberOfChargingStationTemplates (): number {
    return this.chargingStationsByTemplate.size
  }

  /** Sum of the configured charging stations over all templates. */
  public get numberOfConfiguredChargingStations (): number {
    return [...this.chargingStationsByTemplate.values()].reduce(
      (accumulator, value) => accumulator + value.configured,
      0
    )
  }

  /**
   * Gets the highest charging station index added so far for a template.
   *
   * @param templateName - Template name (template file name without extension).
   * @returns The last index, or 0 if the template is unknown.
   */
  public getLastIndex (templateName: string): number {
    return this.chargingStationsByTemplate.get(templateName)?.lastIndex ?? 0
  }

  /**
   * Gets the performance statistics held by the performance storage.
   *
   * @returns The storage statistics iterator, or `undefined` if no storage is set.
   */
  public getPerformanceStatistics (): IterableIterator<Statistics> | undefined {
    return this.storage?.getPerformanceStatistics()
  }

  /** Sum of the added charging stations over all templates. */
  private get numberOfAddedChargingStations (): number {
    return [...this.chargingStationsByTemplate.values()].reduce(
      (accumulator, value) => accumulator + value.added,
      0
    )
  }

  /** Sum of the started charging stations over all templates. */
  private get numberOfStartedChargingStations (): number {
    return [...this.chargingStationsByTemplate.values()].reduce(
      (accumulator, value) => accumulator + value.started,
      0
    )
  }

  /**
   * Starts the simulator: registers the worker event listeners, starts the
   * worker implementation, opens the performance storage and starts the UI
   * server when enabled, then adds the configured charging stations.
   *
   * Only logs an error and returns if the simulator is already started or
   * is currently starting.
   */
  public async start (): Promise<void> {
    if (this.started) {
      console.error(chalk.red('Cannot start an already started charging stations simulator'))
      return
    }
    if (this.starting) {
      console.error(chalk.red('Cannot start an already starting charging stations simulator'))
      return
    }
    this.starting = true
    this.on(ChargingStationWorkerMessageEvents.added, this.workerEventAdded)
    this.on(ChargingStationWorkerMessageEvents.started, this.workerEventStarted)
    this.on(ChargingStationWorkerMessageEvents.stopped, this.workerEventStopped)
    this.on(ChargingStationWorkerMessageEvents.updated, this.workerEventUpdated)
    this.on(
      ChargingStationWorkerMessageEvents.performanceStatistics,
      this.workerEventPerformanceStatistics
    )
    this.on(
      ChargingStationWorkerMessageEvents.workerElementError,
      // messageHandler() emits the message data only, not the whole message:
      // the listener argument is the event error payload itself
      (eventError: ChargingStationWorkerEventError) => {
        logger.error(
          `${this.logPrefix()} ${moduleName}.messageHandler: Error occurred while handling '${eventError.event}' event on worker:`,
          eventError
        )
      }
    )
    this.initializeCounters()
    const workerConfiguration = Configuration.getConfigurationSection<WorkerConfiguration>(
      ConfigurationSection.worker
    )
    this.initializeWorkerImplementation(workerConfiguration)
    await this.workerImplementation?.start()
    const performanceStorageConfiguration =
      Configuration.getConfigurationSection<StorageConfiguration>(
        ConfigurationSection.performanceStorage
      )
    if (performanceStorageConfiguration.enabled === true) {
      this.storage = StorageFactory.getStorage(
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        performanceStorageConfiguration.type!,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        performanceStorageConfiguration.uri!,
        this.logPrefix()
      )
      await this.storage?.open()
    }
    if (
      Configuration.getConfigurationSection<UIServerConfiguration>(ConfigurationSection.uiServer)
        .enabled === true
    ) {
      this.uiServer?.start()
    }
    // Start ChargingStation object instance in worker thread
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    for (const stationTemplateUrl of Configuration.getStationTemplateUrls()!) {
      try {
        const nbStations =
          this.chargingStationsByTemplate.get(parse(stationTemplateUrl.file).name)
            ?.configured ?? stationTemplateUrl.numberOfStations
        for (let index = 1; index <= nbStations; index++) {
          await this.addChargingStation(index, stationTemplateUrl.file)
        }
      } catch (error) {
        // A failing template must not prevent the other templates from starting
        console.error(
          chalk.red(
            `Error at starting charging station with template file ${stationTemplateUrl.file}: `
          ),
          error
        )
      }
    }
    console.info(
      chalk.green(
        `Charging stations simulator ${
          this.version
        } started with ${this.numberOfConfiguredChargingStations} configured charging station(s) from ${this.numberOfChargingStationTemplates} charging station template(s) and ${
          Configuration.workerDynamicPoolInUse() ? `${workerConfiguration.poolMinSize}/` : ''
        }${this.workerImplementation?.size}${
          Configuration.workerPoolInUse() ? `/${workerConfiguration.poolMaxSize}` : ''
        } worker(s) concurrently running in '${workerConfiguration.processType}' mode${
          this.workerImplementation?.maxElementsPerWorker != null
            ? ` (${this.workerImplementation.maxElementsPerWorker} charging station(s) per worker)`
            : ''
        }`
      )
    )
    if (Configuration.workerDynamicPoolInUse()) {
      console.warn(
        chalk.yellow(
          'Charging stations simulator is using dynamic pool mode. This is an experimental feature with known issues.\nPlease consider using fixed pool or worker set mode instead'
        )
      )
    }
    console.info(chalk.green('Worker set/pool information:'), this.workerImplementation?.info)
    this.started = true
    this.starting = false
  }

  /**
   * Stops the simulator: requests all charging stations to stop through the
   * UI server, waits for them to stop, then stops the worker implementation,
   * removes the event listeners and closes the performance storage.
   *
   * Only logs an error and returns if the simulator is already stopped or
   * is currently stopping.
   */
  public async stop (): Promise<void> {
    if (!this.started) {
      console.error(chalk.red('Cannot stop an already stopped charging stations simulator'))
      return
    }
    if (this.stopping) {
      console.error(chalk.red('Cannot stop an already stopping charging stations simulator'))
      return
    }
    this.stopping = true
    try {
      await this.uiServer?.sendInternalRequest(
        this.uiServer.buildProtocolRequest(
          generateUUID(),
          ProcedureName.STOP_CHARGING_STATION,
          Constants.EMPTY_FROZEN_OBJECT
        )
      )
      try {
        await this.waitChargingStationsStopped()
      } catch (error) {
        // Best effort: keep stopping even if some stations did not report in time
        console.error(chalk.red('Error while waiting for charging stations to stop: '), error)
      }
      await this.workerImplementation?.stop()
      delete this.workerImplementation
      this.removeAllListeners()
      await this.storage?.close()
      delete this.storage
      this.started = false
    } finally {
      // Always release the flag so that a failed stop can be attempted again
      this.stopping = false
    }
  }

  /**
   * Restarts the simulator. Stops the UI server if it is no longer enabled in
   * the configuration and forces the counters to be recomputed on start.
   */
  private async restart (): Promise<void> {
    await this.stop()
    if (
      Configuration.getConfigurationSection<UIServerConfiguration>(ConfigurationSection.uiServer)
        .enabled !== true
    ) {
      this.uiServer?.stop()
    }
    this.initializedCounters = false
    await this.start()
  }

  /**
   * Waits for the currently started charging stations to emit their 'stopped'
   * event, rejecting once `Constants.STOP_CHARGING_STATIONS_TIMEOUT` elapses.
   *
   * @returns A promise resolving to a confirmation message.
   */
  private async waitChargingStationsStopped (): Promise<string> {
    return await new Promise<string>((resolve, reject) => {
      const waitTimeout = setTimeout(() => {
        const timeoutMessage = `Timeout ${formatDurationMilliSeconds(
          Constants.STOP_CHARGING_STATIONS_TIMEOUT
        )} reached at stopping charging stations`
        console.warn(chalk.yellow(timeoutMessage))
        reject(new Error(timeoutMessage))
      }, Constants.STOP_CHARGING_STATIONS_TIMEOUT)
      waitChargingStationEvents(
        this,
        ChargingStationWorkerMessageEvents.stopped,
        this.numberOfStartedChargingStations
      )
        .then(() => {
          resolve('Charging stations stopped')
        })
        .catch(reject)
        .finally(() => {
          clearTimeout(waitTimeout)
        })
    })
  }

  /**
   * Creates the worker implementation from the worker configuration.
   * Does nothing outside of the main thread.
   *
   * @param workerConfiguration - Worker configuration section.
   */
  private initializeWorkerImplementation (workerConfiguration: WorkerConfiguration): void {
    if (!isMainThread) {
      return
    }
    let elementsPerWorker: number | undefined
    switch (workerConfiguration.elementsPerWorker) {
      case 'auto': {
        const parallelism = availableParallelism()
        elementsPerWorker =
          this.numberOfConfiguredChargingStations > parallelism
            ? Math.round(this.numberOfConfiguredChargingStations / (parallelism * 1.5))
            : 1
        break
      }
      case 'all':
        elementsPerWorker = this.numberOfConfiguredChargingStations
        break
    }
    this.workerImplementation = WorkerFactory.getWorkerImplementation<ChargingStationWorkerData>(
      join(
        dirname(fileURLToPath(import.meta.url)),
        // The worker script shares the extension of the current module file
        `ChargingStationWorker${extname(fileURLToPath(import.meta.url))}`
      ),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      workerConfiguration.processType!,
      {
        workerStartDelay: workerConfiguration.startDelay,
        elementStartDelay: workerConfiguration.elementStartDelay,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        poolMaxSize: workerConfiguration.poolMaxSize!,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        poolMinSize: workerConfiguration.poolMinSize!,
        elementsPerWorker: elementsPerWorker ?? (workerConfiguration.elementsPerWorker as number),
        poolOptions: {
          messageHandler: this.messageHandler.bind(this) as MessageHandler<Worker>,
          workerOptions: { resourceLimits: workerConfiguration.resourceLimits }
        }
      }
    )
  }

  /**
   * Handles a message received from a worker by re-emitting its data on this
   * emitter under the message event name. Errors raised while handling the
   * message, including by the listeners, are logged and not rethrown.
   *
   * @param msg - Message received from a worker.
   */
  private messageHandler (
    msg: ChargingStationWorkerMessage<ChargingStationWorkerMessageData>
  ): void {
    // logger.debug(
    //   `${this.logPrefix()} ${moduleName}.messageHandler: Worker channel message received: ${JSON.stringify(
    //     msg,
    //     undefined,
    //     2
    //   )}`
    // )
    try {
      switch (msg.event) {
        case ChargingStationWorkerMessageEvents.added:
        case ChargingStationWorkerMessageEvents.started:
        case ChargingStationWorkerMessageEvents.stopped:
        case ChargingStationWorkerMessageEvents.updated:
        case ChargingStationWorkerMessageEvents.performanceStatistics:
        case ChargingStationWorkerMessageEvents.workerElementError:
          // Listeners receive the message data only, not the whole message
          this.emit(msg.event, msg.data)
          break
        case ChargingStationWorkerMessageEvents.addedWorkerElement:
          // Known event with nothing to do in the main thread
          break
        default:
          throw new BaseError(
            `Unknown charging station worker event: '${
              msg.event
            }' received with data: ${JSON.stringify(msg.data, undefined, 2)}`
          )
      }
    } catch (error) {
      logger.error(
        `${this.logPrefix()} ${moduleName}.messageHandler: Error occurred while handling '${
          msg.event
        }' event:`,
        error
      )
    }
  }

  /**
   * 'added' event listener: caches the station data on the UI server and
   * increments the added counter of the station template.
   *
   * @param data - Charging station data sent by the worker.
   */
  private readonly workerEventAdded = (data: ChargingStationData): void => {
    this.uiServer?.chargingStations.set(data.stationInfo.hashId, data)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    ++this.chargingStationsByTemplate.get(data.stationInfo.templateName)!.added
    logger.info(
      `${this.logPrefix()} ${moduleName}.workerEventAdded: Charging station ${
        data.stationInfo.chargingStationId
      } (hashId: ${data.stationInfo.hashId}) added (${
        this.numberOfAddedChargingStations
      } added from ${this.numberOfConfiguredChargingStations} configured charging station(s))`
    )
  }

  /**
   * 'started' event listener: caches the station data on the UI server and
   * increments the started counter of the station template.
   *
   * @param data - Charging station data sent by the worker.
   */
  private readonly workerEventStarted = (data: ChargingStationData): void => {
    this.uiServer?.chargingStations.set(data.stationInfo.hashId, data)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    ++this.chargingStationsByTemplate.get(data.stationInfo.templateName)!.started
    logger.info(
      `${this.logPrefix()} ${moduleName}.workerEventStarted: Charging station ${
        data.stationInfo.chargingStationId
      } (hashId: ${data.stationInfo.hashId}) started (${
        this.numberOfStartedChargingStations
      } started from ${this.numberOfAddedChargingStations} added charging station(s))`
    )
  }

  /**
   * 'stopped' event listener: caches the station data on the UI server and
   * decrements the started counter of the station template.
   *
   * @param data - Charging station data sent by the worker.
   */
  private readonly workerEventStopped = (data: ChargingStationData): void => {
    this.uiServer?.chargingStations.set(data.stationInfo.hashId, data)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    --this.chargingStationsByTemplate.get(data.stationInfo.templateName)!.started
    logger.info(
      `${this.logPrefix()} ${moduleName}.workerEventStopped: Charging station ${
        data.stationInfo.chargingStationId
      } (hashId: ${data.stationInfo.hashId}) stopped (${
        this.numberOfStartedChargingStations
      } started from ${this.numberOfAddedChargingStations} added charging station(s))`
    )
  }

  /**
   * 'updated' event listener: refreshes the station data cached on the UI server.
   *
   * @param data - Charging station data sent by the worker.
   */
  private readonly workerEventUpdated = (data: ChargingStationData): void => {
    this.uiServer?.chargingStations.set(data.stationInfo.hashId, data)
  }

  /**
   * 'performanceStatistics' event listener: hands the statistics over to the
   * performance storage. Statistics are dropped when no storage is set.
   * Rejections of an asynchronous store are ignored.
   *
   * @param data - Performance statistics sent by the worker.
   */
  private readonly workerEventPerformanceStatistics = (data: Statistics): void => {
    // Without a storage there is nothing to call: return instead of invoking undefined
    if (this.storage == null) {
      return
    }
    // eslint-disable-next-line @typescript-eslint/unbound-method
    if (isAsyncFunction(this.storage.storePerformanceStatistics)) {
      (
        this.storage.storePerformanceStatistics as (
          performanceStatistics: Statistics
        ) => Promise<void>
      )(data).catch(Constants.EMPTY_FUNCTION)
    } else {
      (this.storage.storePerformanceStatistics as (performanceStatistics: Statistics) => void)(
        data
      )
    }
  }

  /**
   * Builds the per template counters from the configured station template
   * URLs and registers the template names on the UI server. Runs once until
   * `initializedCounters` is reset.
   *
   * Exits the process if the station templates configuration is missing,
   * contains duplicates, or configures no station while the UI server is
   * disabled.
   */
  private initializeCounters (): void {
    if (this.initializedCounters) {
      return
    }
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const stationTemplateUrls = Configuration.getStationTemplateUrls()!
    if (isNotEmptyArray(stationTemplateUrls)) {
      for (const stationTemplateUrl of stationTemplateUrls) {
        const templateName = parse(stationTemplateUrl.file).name
        this.chargingStationsByTemplate.set(templateName, {
          configured: stationTemplateUrl.numberOfStations,
          added: 0,
          started: 0,
          lastIndex: 0
        })
        this.uiServer?.chargingStationTemplates.add(templateName)
      }
      // Duplicate template names collapse into a single map entry
      if (this.chargingStationsByTemplate.size !== stationTemplateUrls.length) {
        console.error(
          chalk.red(
            "'stationTemplateUrls' contains duplicate entries, please check your configuration"
          )
        )
        exit(exitCodes.duplicateChargingStationTemplateUrls)
      }
    } else {
      console.error(
        chalk.red("'stationTemplateUrls' not defined or empty, please check your configuration")
      )
      exit(exitCodes.missingChargingStationsConfiguration)
    }
    if (
      this.numberOfConfiguredChargingStations === 0 &&
      Configuration.getConfigurationSection<UIServerConfiguration>(ConfigurationSection.uiServer)
        .enabled !== true
    ) {
      console.error(
        chalk.red(
          "'stationTemplateUrls' has no charging station enabled and UI server is disabled, please check your configuration"
        )
      )
      exit(exitCodes.noChargingStationTemplates)
    }
    this.initializedCounters = true
  }

  /**
   * Adds a charging station to the worker implementation and records its
   * index as the template last index when it is the highest seen so far.
   *
   * @param index - Charging station index within its template.
   * @param stationTemplateFile - Template file, relative to the
   *   'assets/station-templates' directory next to this module.
   */
  public async addChargingStation (index: number, stationTemplateFile: string): Promise<void> {
    await this.workerImplementation?.addElement({
      index,
      templateFile: join(
        dirname(fileURLToPath(import.meta.url)),
        'assets',
        'station-templates',
        stationTemplateFile
      )
    })
    const templateName = parse(stationTemplateFile).name
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.chargingStationsByTemplate.get(templateName)!.lastIndex = max(
      index,
      this.chargingStationsByTemplate.get(templateName)?.lastIndex ?? -Infinity
    )
  }

  /**
   * Signal handler: stops the simulator and the UI server, waits for the
   * charging stations to be stopped, then exits the process with a code
   * reflecting the outcome.
   */
  private gracefulShutdown (): void {
    this.stop()
      .then(() => {
        console.info(chalk.green('Graceful shutdown'))
        this.uiServer?.stop()
        this.waitChargingStationsStopped()
          .then(() => {
            exit(exitCodes.succeeded)
          })
          .catch(() => {
            exit(exitCodes.gracefulShutdownError)
          })
      })
      .catch(error => {
        console.error(chalk.red('Error while shutdowning charging stations simulator: '), error)
        exit(exitCodes.gracefulShutdownError)
      })
  }

  /**
   * Builds the log prefix of this module.
   *
   * @returns The log prefix string.
   */
  private readonly logPrefix = (): string => {
    return logPrefix(' Bootstrap |')
  }
}