feat: add error handling to worker set worker
[e-mobility-charging-stations-simulator.git] / src / charging-station / Bootstrap.ts
1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
2
3 import { EventEmitter } from 'node:events';
4 import { dirname, extname, join } from 'node:path';
5 import { fileURLToPath } from 'node:url';
6 import { isMainThread } from 'node:worker_threads';
7
8 import chalk from 'chalk';
9 import { availableParallelism } from 'poolifier';
10
11 import { waitChargingStationEvents } from './ChargingStationUtils';
12 import type { AbstractUIServer } from './ui-server/AbstractUIServer';
13 import { UIServerFactory } from './ui-server/UIServerFactory';
14 import { version } from '../../package.json' assert { type: 'json' };
15 import { BaseError } from '../exception';
16 import { type Storage, StorageFactory } from '../performance';
17 import {
18 type ChargingStationData,
19 type ChargingStationWorkerData,
20 type ChargingStationWorkerMessage,
21 type ChargingStationWorkerMessageData,
22 ChargingStationWorkerMessageEvents,
23 ConfigurationSection,
24 ProcedureName,
25 type StationTemplateUrl,
26 type Statistics,
27 type StorageConfiguration,
28 type UIServerConfiguration,
29 type WorkerConfiguration,
30 } from '../types';
31 import {
32 Configuration,
33 Constants,
34 formatDurationMilliSeconds,
35 generateUUID,
36 handleUncaughtException,
37 handleUnhandledRejection,
38 isNotEmptyArray,
39 isNullOrUndefined,
40 logPrefix,
41 logger,
42 } from '../utils';
43 import { type WorkerAbstract, WorkerFactory } from '../worker';
44
/** Name of this module, interpolated into the log messages emitted by the Bootstrap class. */
const moduleName = 'Bootstrap';
46
/**
 * Process exit codes used when the simulator cannot run with the current
 * configuration.
 */
enum exitCodes {
  /** 'stationTemplateUrls' is not defined or is empty in the configuration. */
  missingChargingStationsConfiguration = 1,
  /** The configured templates add up to zero charging stations. */
  noChargingStationTemplates = 2,
}
51
/**
 * Singleton entry point of the charging stations simulator.
 *
 * It owns the worker set/pool implementation, the optional UI server and the
 * optional performance storage, starts one worker element per configured
 * charging station and re-emits the events received from the workers.
 * Obtain the instance with {@link Bootstrap.getInstance}.
 */
export class Bootstrap extends EventEmitter {
  private static instance: Bootstrap | null = null;
  public numberOfChargingStations!: number;
  public numberOfChargingStationTemplates!: number;
  private workerImplementation: WorkerAbstract<ChargingStationWorkerData> | null;
  // Only assigned when the UI server is enabled in the configuration.
  private readonly uiServer!: AbstractUIServer | null;
  // Only assigned when the performance storage is enabled in the configuration.
  private readonly storage?: Storage;
  private numberOfStartedChargingStations!: number;
  private readonly version: string = version;
  private initializedCounters: boolean;
  private started: boolean;
  private starting: boolean;
  private stopping: boolean;
  private readonly workerScript: string;

  /**
   * Registers the shutdown signal handlers and the global error handlers,
   * initializes the counters from the configuration, resolves the worker
   * script path next to this module and creates the UI server and the
   * performance storage when they are enabled. A configuration change
   * triggers {@link Bootstrap.restart}.
   */
  private constructor() {
    super();
    for (const signal of ['SIGINT', 'SIGQUIT', 'SIGTERM']) {
      process.on(signal, this.gracefulShutdown);
    }
    // Enable unconditionally for now
    handleUnhandledRejection();
    handleUncaughtException();
    this.started = false;
    this.starting = false;
    this.stopping = false;
    this.initializedCounters = false;
    this.initializeCounters();
    this.workerImplementation = null;
    this.workerScript = join(
      dirname(fileURLToPath(import.meta.url)),
      `ChargingStationWorker${extname(fileURLToPath(import.meta.url))}`,
    );
    const uiServerConfiguration = Configuration.getConfigurationSection<UIServerConfiguration>(
      ConfigurationSection.uiServer,
    );
    uiServerConfiguration.enabled === true &&
      (this.uiServer = UIServerFactory.getUIServerImplementation(uiServerConfiguration));
    const performanceStorageConfiguration =
      Configuration.getConfigurationSection<StorageConfiguration>(
        ConfigurationSection.performanceStorage,
      );
    performanceStorageConfiguration.enabled === true &&
      (this.storage = StorageFactory.getStorage(
        performanceStorageConfiguration.type!,
        performanceStorageConfiguration.uri!,
        this.logPrefix(),
      ));
    Configuration.setConfigurationChangeCallback(async () => Bootstrap.getInstance().restart());
  }

  /**
   * Returns the unique Bootstrap instance, creating it on first use.
   *
   * @returns The singleton instance.
   */
  public static getInstance(): Bootstrap {
    if (Bootstrap.instance === null) {
      Bootstrap.instance = new Bootstrap();
    }
    return Bootstrap.instance;
  }

  /**
   * Starts the worker implementation, the performance storage, the UI server
   * and one worker element per configured charging station.
   *
   * Calling it while the simulator is already started or starting only logs
   * an error. The `starting` flag is always released, even when a startup
   * step rejects, so that a later start attempt is not refused forever.
   *
   * @throws Error when called from a worker thread.
   */
  public async start(): Promise<void> {
    if (!isMainThread) {
      throw new Error('Cannot start charging stations simulator from worker thread');
    }
    if (this.started === true) {
      console.error(chalk.red('Cannot start an already started charging stations simulator'));
      return;
    }
    if (this.starting === true) {
      console.error(chalk.red('Cannot start an already starting charging stations simulator'));
      return;
    }
    this.starting = true;
    try {
      this.initializeCounters();
      const workerConfiguration = Configuration.getConfigurationSection<WorkerConfiguration>(
        ConfigurationSection.worker,
      );
      this.initializeWorkerImplementation(workerConfiguration);
      await this.workerImplementation?.start();
      await this.storage?.open();
      this.uiServer?.start();
      // Start ChargingStation object instance in worker thread
      for (const stationTemplateUrl of Configuration.getStationTemplateUrls()!) {
        // A failure aborts the remaining stations of this template only;
        // the next templates are still processed.
        try {
          const nbStations = stationTemplateUrl.numberOfStations ?? 0;
          for (let index = 1; index <= nbStations; index++) {
            await this.startChargingStation(index, stationTemplateUrl);
          }
        } catch (error) {
          console.error(
            chalk.red(
              `Error at starting charging station with template file ${stationTemplateUrl.file}: `,
            ),
            error,
          );
        }
      }
      console.info(
        chalk.green(
          `Charging stations simulator ${
            this.version
          } started with ${this.numberOfChargingStations.toString()} charging station(s) from ${this.numberOfChargingStationTemplates.toString()} configured charging station template(s) and ${
            Configuration.workerDynamicPoolInUse()
              ? `${workerConfiguration.poolMinSize?.toString()}/`
              : ''
          }${this.workerImplementation?.size}${
            Configuration.workerPoolInUse()
              ? `/${workerConfiguration.poolMaxSize?.toString()}`
              : ''
          } worker(s) concurrently running in '${workerConfiguration.processType}' mode${
            !isNullOrUndefined(this.workerImplementation?.maxElementsPerWorker)
              ? ` (${this.workerImplementation?.maxElementsPerWorker} charging station(s) per worker)`
              : ''
          }`,
        ),
      );
      Configuration.workerDynamicPoolInUse() &&
        console.warn(
          chalk.yellow(
            'Charging stations simulator is using dynamic pool mode. This is an experimental feature with known issues.\nPlease consider using static pool or worker set mode instead',
          ),
        );
      console.info(chalk.green('Worker set/pool information:'), this.workerImplementation?.info);
      this.started = true;
    } finally {
      this.starting = false;
    }
  }

  /**
   * Asks the charging stations to stop through the UI server, waits for them
   * to report it (bounded by `Constants.STOP_SIMULATOR_TIMEOUT`), then stops
   * the worker implementation, the UI server and the performance storage and
   * resets the counters.
   *
   * Calling it while the simulator is already stopped or stopping only logs
   * an error. The `stopping` flag is always released, even when a shutdown
   * step rejects.
   *
   * @throws Error when called from a worker thread.
   */
  public async stop(): Promise<void> {
    if (!isMainThread) {
      throw new Error('Cannot stop charging stations simulator from worker thread');
    }
    if (this.started !== true) {
      console.error(chalk.red('Cannot stop an already stopped charging stations simulator'));
      return;
    }
    if (this.stopping === true) {
      console.error(chalk.red('Cannot stop an already stopping charging stations simulator'));
      return;
    }
    this.stopping = true;
    try {
      await this.uiServer?.sendInternalRequest(
        this.uiServer.buildProtocolRequest(
          generateUUID(),
          ProcedureName.STOP_CHARGING_STATION,
          Constants.EMPTY_FREEZED_OBJECT,
        ),
      );
      await this.waitChargingStationsStopped();
      await this.workerImplementation?.stop();
      this.workerImplementation = null;
      this.uiServer?.stop();
      await this.storage?.close();
      this.resetCounters();
      this.initializedCounters = false;
      this.started = false;
    } finally {
      this.stopping = false;
    }
  }

  /**
   * Stops then starts the simulator again.
   */
  public async restart(): Promise<void> {
    await this.stop();
    await this.start();
  }

  /**
   * Waits until `numberOfChargingStations` 'stopped' events have been
   * observed or until `Constants.STOP_SIMULATOR_TIMEOUT` elapses, whichever
   * comes first. Reaching the timeout only logs a warning, it does not reject.
   *
   * The timer is always cancelled afterwards; otherwise it would still fire
   * after a clean stop, print a misleading timeout warning and keep the
   * event loop busy.
   */
  private async waitChargingStationsStopped(): Promise<void> {
    let stopTimeout: ReturnType<typeof setTimeout> | undefined;
    try {
      await Promise.race([
        waitChargingStationEvents(
          this,
          ChargingStationWorkerMessageEvents.stopped,
          this.numberOfChargingStations,
        ),
        new Promise<string>((resolve) => {
          // The executor runs synchronously, so the handle is set before the race is awaited.
          stopTimeout = setTimeout(() => {
            const message = `Timeout reached ${formatDurationMilliSeconds(
              Constants.STOP_SIMULATOR_TIMEOUT,
            )} at stopping charging stations simulator`;
            console.warn(chalk.yellow(message));
            resolve(message);
          }, Constants.STOP_SIMULATOR_TIMEOUT);
        }),
      ]);
    } finally {
      clearTimeout(stopTimeout);
    }
  }

  /**
   * Creates the worker implementation if there is none yet.
   *
   * With `elementsPerWorker` set to 'auto', the stations are spread over the
   * available parallelism: one station per worker while there are no more
   * stations than available parallelism, the rounded ratio otherwise.
   *
   * @param workerConfiguration - Worker section of the configuration.
   */
  private initializeWorkerImplementation(workerConfiguration: WorkerConfiguration): void {
    let elementsPerWorker: number | undefined;
    if (workerConfiguration.elementsPerWorker === 'auto') {
      elementsPerWorker =
        this.numberOfChargingStations > availableParallelism()
          ? Math.round(this.numberOfChargingStations / availableParallelism())
          : 1;
    }
    this.workerImplementation === null &&
      (this.workerImplementation = WorkerFactory.getWorkerImplementation<ChargingStationWorkerData>(
        this.workerScript,
        workerConfiguration.processType!,
        {
          workerStartDelay: workerConfiguration.startDelay,
          elementStartDelay: workerConfiguration.elementStartDelay,
          poolMaxSize: workerConfiguration.poolMaxSize!,
          poolMinSize: workerConfiguration.poolMinSize!,
          elementsPerWorker: elementsPerWorker ?? (workerConfiguration.elementsPerWorker as number),
          poolOptions: {
            messageHandler: this.messageHandler.bind(this) as (message: unknown) => void,
          },
        },
      ));
  }

  /**
   * Dispatches a message received from a worker to the matching handler and
   * re-emits it as an event on this instance. Any error raised while handling
   * the message, including an unknown event type, is logged and not rethrown.
   *
   * @param msg - Message received on the worker channel.
   */
  private messageHandler(
    msg: ChargingStationWorkerMessage<ChargingStationWorkerMessageData>,
  ): void {
    // logger.debug(
    //   `${this.logPrefix()} ${moduleName}.messageHandler: Worker channel message received: ${JSON.stringify(
    //     msg,
    //     null,
    //     2,
    //   )}`,
    // );
    try {
      switch (msg.event) {
        case ChargingStationWorkerMessageEvents.started:
          this.workerEventStarted(msg.data as ChargingStationData);
          this.emit(ChargingStationWorkerMessageEvents.started, msg.data as ChargingStationData);
          break;
        case ChargingStationWorkerMessageEvents.stopped:
          this.workerEventStopped(msg.data as ChargingStationData);
          this.emit(ChargingStationWorkerMessageEvents.stopped, msg.data as ChargingStationData);
          break;
        case ChargingStationWorkerMessageEvents.updated:
          this.workerEventUpdated(msg.data as ChargingStationData);
          this.emit(ChargingStationWorkerMessageEvents.updated, msg.data as ChargingStationData);
          break;
        case ChargingStationWorkerMessageEvents.performanceStatistics:
          this.workerEventPerformanceStatistics(msg.data as Statistics);
          this.emit(
            ChargingStationWorkerMessageEvents.performanceStatistics,
            msg.data as Statistics,
          );
          break;
        case ChargingStationWorkerMessageEvents.startWorkerElementError:
          logger.error(
            `${this.logPrefix()} ${moduleName}.messageHandler: Error occured while starting worker element:`,
            msg.data,
          );
          this.emit(ChargingStationWorkerMessageEvents.startWorkerElementError, msg.data);
          break;
        case ChargingStationWorkerMessageEvents.startedWorkerElement:
          // Nothing to do for this event.
          break;
        default:
          throw new BaseError(
            `Unknown event type: '${msg.event}' for data: ${JSON.stringify(msg.data, null, 2)}`,
          );
      }
    } catch (error) {
      logger.error(
        `${this.logPrefix()} ${moduleName}.messageHandler: Error occurred while handling '${
          msg.event
        }' event:`,
        error,
      );
    }
  }

  /**
   * Records the started charging station in the UI server cache (when a UI
   * server exists), increments the started counter and logs the progress.
   */
  private workerEventStarted = (data: ChargingStationData) => {
    this.uiServer?.chargingStations.set(data.stationInfo.hashId, data);
    ++this.numberOfStartedChargingStations;
    logger.info(
      `${this.logPrefix()} ${moduleName}.workerEventStarted: Charging station ${
        data.stationInfo.chargingStationId
      } (hashId: ${data.stationInfo.hashId}) started (${
        this.numberOfStartedChargingStations
      } started from ${this.numberOfChargingStations})`,
    );
  };

  /**
   * Records the stopped charging station in the UI server cache (when a UI
   * server exists), decrements the started counter and logs the progress.
   */
  private workerEventStopped = (data: ChargingStationData) => {
    this.uiServer?.chargingStations.set(data.stationInfo.hashId, data);
    --this.numberOfStartedChargingStations;
    logger.info(
      `${this.logPrefix()} ${moduleName}.workerEventStopped: Charging station ${
        data.stationInfo.chargingStationId
      } (hashId: ${data.stationInfo.hashId}) stopped (${
        this.numberOfStartedChargingStations
      } started from ${this.numberOfChargingStations})`,
    );
  };

  /** Refreshes the charging station data held in the UI server cache, when a UI server exists. */
  private workerEventUpdated = (data: ChargingStationData) => {
    this.uiServer?.chargingStations.set(data.stationInfo.hashId, data);
  };

  /**
   * Forwards performance statistics to the performance storage.
   * The storage only exists when it is enabled in the configuration, hence
   * the optional call.
   * NOTE(review): if storePerformanceStatistics can return a promise, its
   * rejection is not handled here - confirm against the Storage contract.
   */
  private workerEventPerformanceStatistics = (data: Statistics) => {
    this.storage?.storePerformanceStatistics(data) as void;
  };

  /**
   * Computes the number of templates and of charging stations from the
   * configuration, once until the counters are invalidated by a stop.
   * Exits the process when no template is configured or when the templates
   * add up to zero charging stations.
   */
  private initializeCounters(): void {
    if (this.initializedCounters === false) {
      this.resetCounters();
      const stationTemplateUrls = Configuration.getStationTemplateUrls()!;
      if (isNotEmptyArray(stationTemplateUrls)) {
        this.numberOfChargingStationTemplates = stationTemplateUrls.length;
        for (const stationTemplateUrl of stationTemplateUrls) {
          this.numberOfChargingStations += stationTemplateUrl.numberOfStations ?? 0;
        }
      } else {
        console.warn(
          chalk.yellow("'stationTemplateUrls' not defined or empty in configuration, exiting"),
        );
        process.exit(exitCodes.missingChargingStationsConfiguration);
      }
      if (this.numberOfChargingStations === 0) {
        console.warn(
          chalk.yellow('No charging station template enabled in configuration, exiting'),
        );
        process.exit(exitCodes.noChargingStationTemplates);
      }
      this.initializedCounters = true;
    }
  }

  /** Sets the template, station and started station counters back to zero. */
  private resetCounters(): void {
    this.numberOfChargingStationTemplates = 0;
    this.numberOfChargingStations = 0;
    this.numberOfStartedChargingStations = 0;
  }

  /**
   * Adds one charging station element to the worker implementation.
   *
   * @param index - 1-based index of the station within its template.
   * @param stationTemplateUrl - Template entry the station is created from;
   *   its file is resolved under 'assets/station-templates' next to this module.
   */
  private async startChargingStation(
    index: number,
    stationTemplateUrl: StationTemplateUrl,
  ): Promise<void> {
    await this.workerImplementation?.addElement({
      index,
      templateFile: join(
        dirname(fileURLToPath(import.meta.url)),
        'assets',
        'station-templates',
        stationTemplateUrl.file,
      ),
    });
  }

  /**
   * Signal handler: stops the simulator then exits the process with code 0,
   * or with code 1 when stopping failed.
   */
  private gracefulShutdown = (): void => {
    console.info(`${chalk.green('Graceful shutdown')}`);
    this.stop()
      .then(() => {
        process.exit(0);
      })
      .catch((error) => {
        console.error(chalk.red('Error while shutdowning charging stations simulator: '), error);
        process.exit(1);
      });
  };

  /** Builds the prefix prepended to the log messages of this class. */
  private logPrefix = (): string => {
    return logPrefix(' Bootstrap |');
  };
}