fix: untangle worker pool/set init from start
[e-mobility-charging-stations-simulator.git] / src / charging-station / Bootstrap.ts
1 // Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved.
2
3 import { EventEmitter } from 'node:events'
4 import { dirname, extname, join } from 'node:path'
5 import process, { exit } from 'node:process'
6 import { fileURLToPath } from 'node:url'
7 import { isMainThread } from 'node:worker_threads'
8
9 import chalk from 'chalk'
10 import { availableParallelism, type MessageHandler } from 'poolifier'
11 import type { Worker } from 'worker_threads'
12
13 import { version } from '../../package.json'
14 import { BaseError } from '../exception/index.js'
15 import { type Storage, StorageFactory } from '../performance/index.js'
16 import {
17 type ChargingStationData,
18 type ChargingStationOptions,
19 type ChargingStationWorkerData,
20 type ChargingStationWorkerEventError,
21 type ChargingStationWorkerMessage,
22 type ChargingStationWorkerMessageData,
23 ChargingStationWorkerMessageEvents,
24 ConfigurationSection,
25 ProcedureName,
26 type SimulatorState,
27 type Statistics,
28 type StorageConfiguration,
29 type TemplateStatistics,
30 type UIServerConfiguration,
31 type WorkerConfiguration
32 } from '../types/index.js'
33 import {
34 Configuration,
35 Constants,
36 formatDurationMilliSeconds,
37 generateUUID,
38 handleUncaughtException,
39 handleUnhandledRejection,
40 isAsyncFunction,
41 isNotEmptyArray,
42 logger,
43 logPrefix
44 } from '../utils/index.js'
45 import { type WorkerAbstract, WorkerFactory } from '../worker/index.js'
46 import { buildTemplateName, waitChargingStationEvents } from './Helpers.js'
47 import type { AbstractUIServer } from './ui-server/AbstractUIServer.js'
48 import { UIServerFactory } from './ui-server/UIServerFactory.js'
49
50 const moduleName = 'Bootstrap'
51
52 enum exitCodes {
53 succeeded = 0,
54 missingChargingStationsConfiguration = 1,
55 duplicateChargingStationTemplateUrls = 2,
56 noChargingStationTemplates = 3,
57 gracefulShutdownError = 4
58 }
59
60 export class Bootstrap extends EventEmitter {
61 private static instance: Bootstrap | null = null
62 private workerImplementation?: WorkerAbstract<ChargingStationWorkerData>
63 private readonly uiServer: AbstractUIServer
64 private storage?: Storage
65 private readonly templateStatistics: Map<string, TemplateStatistics>
66 private readonly version: string = version
67 private initializedCounters: boolean
68 private started: boolean
69 private starting: boolean
70 private stopping: boolean
71 private uiServerStarted: boolean
72
73 private constructor () {
74 super()
75 for (const signal of ['SIGINT', 'SIGQUIT', 'SIGTERM']) {
76 process.on(signal, this.gracefulShutdown.bind(this))
77 }
78 // Enable unconditionally for now
79 handleUnhandledRejection()
80 handleUncaughtException()
81 this.started = false
82 this.starting = false
83 this.stopping = false
84 this.initializedCounters = false
85 this.uiServerStarted = false
86 this.templateStatistics = new Map<string, TemplateStatistics>()
87 this.initializeWorkerImplementation(
88 Configuration.getConfigurationSection<WorkerConfiguration>(ConfigurationSection.worker)
89 )
90 this.uiServer = UIServerFactory.getUIServerImplementation(
91 Configuration.getConfigurationSection<UIServerConfiguration>(ConfigurationSection.uiServer)
92 )
93 this.initializeCounters()
94 Configuration.configurationChangeCallback = async () => {
95 if (isMainThread) {
96 await Bootstrap.getInstance().restart()
97 }
98 }
99 }
100
101 public static getInstance (): Bootstrap {
102 if (Bootstrap.instance === null) {
103 Bootstrap.instance = new Bootstrap()
104 }
105 return Bootstrap.instance
106 }
107
108 public get numberOfChargingStationTemplates (): number {
109 return this.templateStatistics.size
110 }
111
112 public get numberOfConfiguredChargingStations (): number {
113 return [...this.templateStatistics.values()].reduce(
114 (accumulator, value) => accumulator + value.configured,
115 0
116 )
117 }
118
119 public getState (): SimulatorState {
120 return {
121 version: this.version,
122 started: this.started,
123 templateStatistics: this.templateStatistics
124 }
125 }
126
127 public getLastIndex (templateName: string): number {
128 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
129 const indexes = [...this.templateStatistics.get(templateName)!.indexes]
130 .concat(0)
131 .sort((a, b) => a - b)
132 for (let i = 0; i < indexes.length - 1; i++) {
133 if (indexes[i + 1] - indexes[i] !== 1) {
134 return indexes[i]
135 }
136 }
137 return indexes[indexes.length - 1]
138 }
139
140 public getPerformanceStatistics (): IterableIterator<Statistics> | undefined {
141 return this.storage?.getPerformanceStatistics()
142 }
143
144 private get numberOfAddedChargingStations (): number {
145 return [...this.templateStatistics.values()].reduce(
146 (accumulator, value) => accumulator + value.added,
147 0
148 )
149 }
150
151 private get numberOfStartedChargingStations (): number {
152 return [...this.templateStatistics.values()].reduce(
153 (accumulator, value) => accumulator + value.started,
154 0
155 )
156 }
157
158 public async start (): Promise<void> {
159 if (!this.started) {
160 if (!this.starting) {
161 this.starting = true
162 this.on(ChargingStationWorkerMessageEvents.added, this.workerEventAdded)
163 this.on(ChargingStationWorkerMessageEvents.deleted, this.workerEventDeleted)
164 this.on(ChargingStationWorkerMessageEvents.started, this.workerEventStarted)
165 this.on(ChargingStationWorkerMessageEvents.stopped, this.workerEventStopped)
166 this.on(ChargingStationWorkerMessageEvents.updated, this.workerEventUpdated)
167 this.on(
168 ChargingStationWorkerMessageEvents.performanceStatistics,
169 this.workerEventPerformanceStatistics
170 )
171 this.on(
172 ChargingStationWorkerMessageEvents.workerElementError,
173 (eventError: ChargingStationWorkerEventError) => {
174 logger.error(
175 `${this.logPrefix()} ${moduleName}.start: Error occurred while handling '${eventError.event}' event on worker:`,
176 eventError
177 )
178 }
179 )
180 this.initializeCounters()
181 // eslint-disable-next-line @typescript-eslint/unbound-method
182 if (isAsyncFunction(this.workerImplementation?.start)) {
183 await this.workerImplementation.start()
184 } else {
185 (this.workerImplementation?.start as () => void)()
186 }
187 const performanceStorageConfiguration =
188 Configuration.getConfigurationSection<StorageConfiguration>(
189 ConfigurationSection.performanceStorage
190 )
191 if (performanceStorageConfiguration.enabled === true) {
192 this.storage = StorageFactory.getStorage(
193 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
194 performanceStorageConfiguration.type!,
195 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
196 performanceStorageConfiguration.uri!,
197 this.logPrefix()
198 )
199 await this.storage?.open()
200 }
201 if (
202 !this.uiServerStarted &&
203 Configuration.getConfigurationSection<UIServerConfiguration>(
204 ConfigurationSection.uiServer
205 ).enabled === true
206 ) {
207 this.uiServer.start()
208 this.uiServerStarted = true
209 }
210 // Start ChargingStation object instance in worker thread
211 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
212 for (const stationTemplateUrl of Configuration.getStationTemplateUrls()!) {
213 try {
214 const nbStations = stationTemplateUrl.numberOfStations
215 for (let index = 1; index <= nbStations; index++) {
216 await this.addChargingStation(index, stationTemplateUrl.file)
217 }
218 } catch (error) {
219 console.error(
220 chalk.red(
221 `Error at starting charging station with template file ${stationTemplateUrl.file}: `
222 ),
223 error
224 )
225 }
226 }
227 const workerConfiguration = Configuration.getConfigurationSection<WorkerConfiguration>(
228 ConfigurationSection.worker
229 )
230 console.info(
231 chalk.green(
232 `Charging stations simulator ${
233 this.version
234 } started with ${this.numberOfConfiguredChargingStations} configured charging station(s) from ${this.numberOfChargingStationTemplates} charging station template(s) and ${
235 Configuration.workerDynamicPoolInUse() ? `${workerConfiguration.poolMinSize}/` : ''
236 }${this.workerImplementation?.size}${
237 Configuration.workerPoolInUse() ? `/${workerConfiguration.poolMaxSize}` : ''
238 } worker(s) concurrently running in '${workerConfiguration.processType}' mode${
239 this.workerImplementation?.maxElementsPerWorker != null
240 ? ` (${this.workerImplementation.maxElementsPerWorker} charging station(s) per worker)`
241 : ''
242 }`
243 )
244 )
245 Configuration.workerDynamicPoolInUse() &&
246 console.warn(
247 chalk.yellow(
248 'Charging stations simulator is using dynamic pool mode. This is an experimental feature with known issues.\nPlease consider using fixed pool or worker set mode instead'
249 )
250 )
251 console.info(chalk.green('Worker set/pool information:'), this.workerImplementation?.info)
252 this.started = true
253 this.starting = false
254 } else {
255 console.error(chalk.red('Cannot start an already starting charging stations simulator'))
256 }
257 } else {
258 console.error(chalk.red('Cannot start an already started charging stations simulator'))
259 }
260 }
261
262 public async stop (): Promise<void> {
263 if (this.started) {
264 if (!this.stopping) {
265 this.stopping = true
266 await this.uiServer.sendInternalRequest(
267 this.uiServer.buildProtocolRequest(
268 generateUUID(),
269 ProcedureName.STOP_CHARGING_STATION,
270 Constants.EMPTY_FROZEN_OBJECT
271 )
272 )
273 try {
274 await this.waitChargingStationsStopped()
275 } catch (error) {
276 console.error(chalk.red('Error while waiting for charging stations to stop: '), error)
277 }
278 await this.workerImplementation?.stop()
279 this.removeAllListeners()
280 this.uiServer.clearCaches()
281 this.initializedCounters = false
282 await this.storage?.close()
283 delete this.storage
284 this.started = false
285 this.stopping = false
286 } else {
287 console.error(chalk.red('Cannot stop an already stopping charging stations simulator'))
288 }
289 } else {
290 console.error(chalk.red('Cannot stop an already stopped charging stations simulator'))
291 }
292 }
293
294 private async restart (): Promise<void> {
295 await this.stop()
296 // FIXME: initialize worker implementation only if the worker section has changed
297 this.initializeWorkerImplementation(
298 Configuration.getConfigurationSection<WorkerConfiguration>(ConfigurationSection.worker)
299 )
300 if (
301 this.uiServerStarted &&
302 Configuration.getConfigurationSection<UIServerConfiguration>(ConfigurationSection.uiServer)
303 .enabled !== true
304 ) {
305 this.uiServer.stop()
306 this.uiServerStarted = false
307 }
308 await this.start()
309 }
310
311 private async waitChargingStationsStopped (): Promise<string> {
312 return await new Promise<string>((resolve, reject: (reason?: unknown) => void) => {
313 const waitTimeout = setTimeout(() => {
314 const timeoutMessage = `Timeout ${formatDurationMilliSeconds(
315 Constants.STOP_CHARGING_STATIONS_TIMEOUT
316 )} reached at stopping charging stations`
317 console.warn(chalk.yellow(timeoutMessage))
318 reject(new Error(timeoutMessage))
319 }, Constants.STOP_CHARGING_STATIONS_TIMEOUT)
320 waitChargingStationEvents(
321 this,
322 ChargingStationWorkerMessageEvents.stopped,
323 this.numberOfStartedChargingStations
324 )
325 .then(() => {
326 resolve('Charging stations stopped')
327 })
328 .catch(reject)
329 .finally(() => {
330 clearTimeout(waitTimeout)
331 })
332 })
333 }
334
335 private initializeWorkerImplementation (workerConfiguration: WorkerConfiguration): void {
336 if (!isMainThread) {
337 return
338 }
339 let elementsPerWorker: number
340 switch (workerConfiguration.elementsPerWorker) {
341 case 'all':
342 elementsPerWorker = this.numberOfConfiguredChargingStations
343 break
344 case 'auto':
345 default:
346 elementsPerWorker =
347 this.numberOfConfiguredChargingStations > availableParallelism()
348 ? Math.round(this.numberOfConfiguredChargingStations / (availableParallelism() * 1.5))
349 : 1
350 break
351 }
352 this.workerImplementation = WorkerFactory.getWorkerImplementation<ChargingStationWorkerData>(
353 join(
354 dirname(fileURLToPath(import.meta.url)),
355 `ChargingStationWorker${extname(fileURLToPath(import.meta.url))}`
356 ),
357 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
358 workerConfiguration.processType!,
359 {
360 workerStartDelay: workerConfiguration.startDelay,
361 elementAddDelay: workerConfiguration.elementAddDelay,
362 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
363 poolMaxSize: workerConfiguration.poolMaxSize!,
364 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
365 poolMinSize: workerConfiguration.poolMinSize!,
366 elementsPerWorker,
367 poolOptions: {
368 messageHandler: this.messageHandler.bind(this) as MessageHandler<Worker>,
369 workerOptions: { resourceLimits: workerConfiguration.resourceLimits }
370 }
371 }
372 )
373 }
374
375 private messageHandler (
376 msg: ChargingStationWorkerMessage<ChargingStationWorkerMessageData>
377 ): void {
378 // logger.debug(
379 // `${this.logPrefix()} ${moduleName}.messageHandler: Worker channel message received: ${JSON.stringify(
380 // msg,
381 // undefined,
382 // 2
383 // )}`
384 // )
385 try {
386 switch (msg.event) {
387 case ChargingStationWorkerMessageEvents.added:
388 this.emit(ChargingStationWorkerMessageEvents.added, msg.data)
389 break
390 case ChargingStationWorkerMessageEvents.deleted:
391 this.emit(ChargingStationWorkerMessageEvents.deleted, msg.data)
392 break
393 case ChargingStationWorkerMessageEvents.started:
394 this.emit(ChargingStationWorkerMessageEvents.started, msg.data)
395 break
396 case ChargingStationWorkerMessageEvents.stopped:
397 this.emit(ChargingStationWorkerMessageEvents.stopped, msg.data)
398 break
399 case ChargingStationWorkerMessageEvents.updated:
400 this.emit(ChargingStationWorkerMessageEvents.updated, msg.data)
401 break
402 case ChargingStationWorkerMessageEvents.performanceStatistics:
403 this.emit(ChargingStationWorkerMessageEvents.performanceStatistics, msg.data)
404 break
405 case ChargingStationWorkerMessageEvents.addedWorkerElement:
406 this.emit(ChargingStationWorkerMessageEvents.addWorkerElement, msg.data)
407 break
408 case ChargingStationWorkerMessageEvents.workerElementError:
409 this.emit(ChargingStationWorkerMessageEvents.workerElementError, msg.data)
410 break
411 default:
412 throw new BaseError(
413 `Unknown charging station worker event: '${
414 msg.event
415 }' received with data: ${JSON.stringify(msg.data, undefined, 2)}`
416 )
417 }
418 } catch (error) {
419 logger.error(
420 `${this.logPrefix()} ${moduleName}.messageHandler: Error occurred while handling '${
421 msg.event
422 }' event:`,
423 error
424 )
425 }
426 }
427
428 private readonly workerEventAdded = (data: ChargingStationData): void => {
429 this.uiServer.chargingStations.set(data.stationInfo.hashId, data)
430 logger.info(
431 `${this.logPrefix()} ${moduleName}.workerEventAdded: Charging station ${
432 data.stationInfo.chargingStationId
433 } (hashId: ${data.stationInfo.hashId}) added (${
434 this.numberOfAddedChargingStations
435 } added from ${this.numberOfConfiguredChargingStations} configured charging station(s))`
436 )
437 }
438
439 private readonly workerEventDeleted = (data: ChargingStationData): void => {
440 this.uiServer.chargingStations.delete(data.stationInfo.hashId)
441 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
442 const templateStatistics = this.templateStatistics.get(data.stationInfo.templateName)!
443 --templateStatistics.added
444 templateStatistics.indexes.delete(data.stationInfo.templateIndex)
445 logger.info(
446 `${this.logPrefix()} ${moduleName}.workerEventDeleted: Charging station ${
447 data.stationInfo.chargingStationId
448 } (hashId: ${data.stationInfo.hashId}) deleted (${
449 this.numberOfAddedChargingStations
450 } added from ${this.numberOfConfiguredChargingStations} configured charging station(s))`
451 )
452 }
453
454 private readonly workerEventStarted = (data: ChargingStationData): void => {
455 this.uiServer.chargingStations.set(data.stationInfo.hashId, data)
456 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
457 ++this.templateStatistics.get(data.stationInfo.templateName)!.started
458 logger.info(
459 `${this.logPrefix()} ${moduleName}.workerEventStarted: Charging station ${
460 data.stationInfo.chargingStationId
461 } (hashId: ${data.stationInfo.hashId}) started (${
462 this.numberOfStartedChargingStations
463 } started from ${this.numberOfAddedChargingStations} added charging station(s))`
464 )
465 }
466
467 private readonly workerEventStopped = (data: ChargingStationData): void => {
468 this.uiServer.chargingStations.set(data.stationInfo.hashId, data)
469 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
470 --this.templateStatistics.get(data.stationInfo.templateName)!.started
471 logger.info(
472 `${this.logPrefix()} ${moduleName}.workerEventStopped: Charging station ${
473 data.stationInfo.chargingStationId
474 } (hashId: ${data.stationInfo.hashId}) stopped (${
475 this.numberOfStartedChargingStations
476 } started from ${this.numberOfAddedChargingStations} added charging station(s))`
477 )
478 }
479
480 private readonly workerEventUpdated = (data: ChargingStationData): void => {
481 this.uiServer.chargingStations.set(data.stationInfo.hashId, data)
482 }
483
484 private readonly workerEventPerformanceStatistics = (data: Statistics): void => {
485 // eslint-disable-next-line @typescript-eslint/unbound-method
486 if (isAsyncFunction(this.storage?.storePerformanceStatistics)) {
487 (
488 this.storage.storePerformanceStatistics as (
489 performanceStatistics: Statistics
490 ) => Promise<void>
491 )(data).catch(Constants.EMPTY_FUNCTION)
492 } else {
493 (this.storage?.storePerformanceStatistics as (performanceStatistics: Statistics) => void)(
494 data
495 )
496 }
497 }
498
499 private initializeCounters (): void {
500 if (!this.initializedCounters) {
501 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
502 const stationTemplateUrls = Configuration.getStationTemplateUrls()!
503 if (isNotEmptyArray(stationTemplateUrls)) {
504 for (const stationTemplateUrl of stationTemplateUrls) {
505 const templateName = buildTemplateName(stationTemplateUrl.file)
506 this.templateStatistics.set(templateName, {
507 configured: stationTemplateUrl.numberOfStations,
508 added: 0,
509 started: 0,
510 indexes: new Set<number>()
511 })
512 this.uiServer.chargingStationTemplates.add(templateName)
513 }
514 if (this.templateStatistics.size !== stationTemplateUrls.length) {
515 console.error(
516 chalk.red(
517 "'stationTemplateUrls' contains duplicate entries, please check your configuration"
518 )
519 )
520 exit(exitCodes.duplicateChargingStationTemplateUrls)
521 }
522 } else {
523 console.error(
524 chalk.red("'stationTemplateUrls' not defined or empty, please check your configuration")
525 )
526 exit(exitCodes.missingChargingStationsConfiguration)
527 }
528 if (
529 this.numberOfConfiguredChargingStations === 0 &&
530 Configuration.getConfigurationSection<UIServerConfiguration>(ConfigurationSection.uiServer)
531 .enabled !== true
532 ) {
533 console.error(
534 chalk.red(
535 "'stationTemplateUrls' has no charging station enabled and UI server is disabled, please check your configuration"
536 )
537 )
538 exit(exitCodes.noChargingStationTemplates)
539 }
540 this.initializedCounters = true
541 }
542 }
543
544 public async addChargingStation (
545 index: number,
546 templateFile: string,
547 options?: ChargingStationOptions
548 ): Promise<void> {
549 if (!this.started && !this.starting) {
550 throw new BaseError(
551 'Cannot add charging station while the charging stations simulator is not started'
552 )
553 }
554 await this.workerImplementation?.addElement({
555 index,
556 templateFile: join(
557 dirname(fileURLToPath(import.meta.url)),
558 'assets',
559 'station-templates',
560 templateFile
561 ),
562 options
563 })
564 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
565 const templateStatistics = this.templateStatistics.get(buildTemplateName(templateFile))!
566 ++templateStatistics.added
567 templateStatistics.indexes.add(index)
568 }
569
570 private gracefulShutdown (): void {
571 this.stop()
572 .then(() => {
573 console.info(chalk.green('Graceful shutdown'))
574 this.uiServer.stop()
575 this.uiServerStarted = false
576 this.waitChargingStationsStopped()
577 .then(() => {
578 exit(exitCodes.succeeded)
579 })
580 .catch(() => {
581 exit(exitCodes.gracefulShutdownError)
582 })
583 })
584 .catch((error: unknown) => {
585 console.error(chalk.red('Error while shutdowning charging stations simulator: '), error)
586 exit(exitCodes.gracefulShutdownError)
587 })
588 }
589
590 private readonly logPrefix = (): string => {
591 return logPrefix(' Bootstrap |')
592 }
593 }