perf: allow to fine tune worker_threads ressource configuration
[e-mobility-charging-stations-simulator.git] / src / charging-station / Bootstrap.ts
1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
2
3 import { EventEmitter } from 'node:events';
4 import { dirname, extname, join } from 'node:path';
5 import process, { exit } from 'node:process';
6 import { fileURLToPath } from 'node:url';
7
8 import chalk from 'chalk';
9 import { availableParallelism } from 'poolifier';
10
11 import { waitChargingStationEvents } from './Helpers';
12 import type { AbstractUIServer } from './ui-server/AbstractUIServer';
13 import { UIServerFactory } from './ui-server/UIServerFactory';
14 import { version } from '../../package.json';
15 import { BaseError } from '../exception';
16 import { type Storage, StorageFactory } from '../performance';
17 import {
18 type ChargingStationData,
19 type ChargingStationWorkerData,
20 type ChargingStationWorkerMessage,
21 type ChargingStationWorkerMessageData,
22 ChargingStationWorkerMessageEvents,
23 ConfigurationSection,
24 ProcedureName,
25 type StationTemplateUrl,
26 type Statistics,
27 type StorageConfiguration,
28 type UIServerConfiguration,
29 type WorkerConfiguration,
30 } from '../types';
31 import {
32 Configuration,
33 Constants,
34 formatDurationMilliSeconds,
35 generateUUID,
36 handleUncaughtException,
37 handleUnhandledRejection,
38 isNotEmptyArray,
39 isNullOrUndefined,
40 logPrefix,
41 logger,
42 } from '../utils';
43 import { type WorkerAbstract, WorkerFactory } from '../worker';
44
45 const moduleName = 'Bootstrap';
46
47 enum exitCodes {
48 succeeded = 0,
49 missingChargingStationsConfiguration = 1,
50 noChargingStationTemplates = 2,
51 gracefulShutdownError = 3,
52 }
53
54 export class Bootstrap extends EventEmitter {
55 private static instance: Bootstrap | null = null;
56 public numberOfChargingStations!: number;
57 public numberOfChargingStationTemplates!: number;
58 private workerImplementation?: WorkerAbstract<ChargingStationWorkerData>;
59 private readonly uiServer?: AbstractUIServer;
60 private storage?: Storage;
61 private numberOfStartedChargingStations!: number;
62 private readonly version: string = version;
63 private initializedCounters: boolean;
64 private started: boolean;
65 private starting: boolean;
66 private stopping: boolean;
67
68 private constructor() {
69 super();
70 for (const signal of ['SIGINT', 'SIGQUIT', 'SIGTERM']) {
71 process.on(signal, this.gracefulShutdown.bind(this));
72 }
73 // Enable unconditionally for now
74 handleUnhandledRejection();
75 handleUncaughtException();
76 this.started = false;
77 this.starting = false;
78 this.stopping = false;
79 this.initializedCounters = false;
80 this.initializeCounters();
81 this.uiServer = UIServerFactory.getUIServerImplementation(
82 Configuration.getConfigurationSection<UIServerConfiguration>(ConfigurationSection.uiServer),
83 );
84 this.on(ChargingStationWorkerMessageEvents.started, this.workerEventStarted);
85 this.on(ChargingStationWorkerMessageEvents.stopped, this.workerEventStopped);
86 this.on(ChargingStationWorkerMessageEvents.updated, this.workerEventUpdated);
87 this.on(
88 ChargingStationWorkerMessageEvents.performanceStatistics,
89 this.workerEventPerformanceStatistics,
90 );
91 Configuration.configurationChangeCallback = async () => Bootstrap.getInstance().restart(false);
92 }
93
94 public static getInstance(): Bootstrap {
95 if (Bootstrap.instance === null) {
96 Bootstrap.instance = new Bootstrap();
97 }
98 return Bootstrap.instance;
99 }
100
101 public async start(): Promise<void> {
102 if (this.started === false) {
103 if (this.starting === false) {
104 this.starting = true;
105 this.initializeCounters();
106 const workerConfiguration = Configuration.getConfigurationSection<WorkerConfiguration>(
107 ConfigurationSection.worker,
108 );
109 this.initializeWorkerImplementation(workerConfiguration);
110 await this.workerImplementation?.start();
111 const performanceStorageConfiguration =
112 Configuration.getConfigurationSection<StorageConfiguration>(
113 ConfigurationSection.performanceStorage,
114 );
115 if (performanceStorageConfiguration.enabled === true) {
116 this.storage = StorageFactory.getStorage(
117 performanceStorageConfiguration.type!,
118 performanceStorageConfiguration.uri!,
119 this.logPrefix(),
120 );
121 await this.storage?.open();
122 }
123 Configuration.getConfigurationSection<UIServerConfiguration>(ConfigurationSection.uiServer)
124 .enabled === true && this.uiServer?.start();
125 // Start ChargingStation object instance in worker thread
126 for (const stationTemplateUrl of Configuration.getStationTemplateUrls()!) {
127 try {
128 const nbStations = stationTemplateUrl.numberOfStations ?? 0;
129 for (let index = 1; index <= nbStations; index++) {
130 await this.startChargingStation(index, stationTemplateUrl);
131 }
132 } catch (error) {
133 console.error(
134 chalk.red(
135 `Error at starting charging station with template file ${stationTemplateUrl.file}: `,
136 ),
137 error,
138 );
139 }
140 }
141 console.info(
142 chalk.green(
143 `Charging stations simulator ${
144 this.version
145 } started with ${this.numberOfChargingStations.toString()} charging station(s) from ${this.numberOfChargingStationTemplates.toString()} configured charging station template(s) and ${
146 Configuration.workerDynamicPoolInUse()
147 ? `${workerConfiguration.poolMinSize?.toString()}/`
148 : ''
149 }${this.workerImplementation?.size}${
150 Configuration.workerPoolInUse()
151 ? `/${workerConfiguration.poolMaxSize?.toString()}`
152 : ''
153 } worker(s) concurrently running in '${workerConfiguration.processType}' mode${
154 !isNullOrUndefined(this.workerImplementation?.maxElementsPerWorker)
155 ? ` (${this.workerImplementation?.maxElementsPerWorker} charging station(s) per worker)`
156 : ''
157 }`,
158 ),
159 );
160 Configuration.workerDynamicPoolInUse() &&
161 console.warn(
162 chalk.yellow(
163 'Charging stations simulator is using dynamic pool mode. This is an experimental feature with known issues.\nPlease consider using fixed pool or worker set mode instead',
164 ),
165 );
166 console.info(chalk.green('Worker set/pool information:'), this.workerImplementation?.info);
167 this.started = true;
168 this.starting = false;
169 } else {
170 console.error(chalk.red('Cannot start an already starting charging stations simulator'));
171 }
172 } else {
173 console.error(chalk.red('Cannot start an already started charging stations simulator'));
174 }
175 }
176
177 public async stop(stopChargingStations = true): Promise<void> {
178 if (this.started === true) {
179 if (this.stopping === false) {
180 this.stopping = true;
181 if (stopChargingStations === true) {
182 await this.uiServer?.sendInternalRequest(
183 this.uiServer.buildProtocolRequest(
184 generateUUID(),
185 ProcedureName.STOP_CHARGING_STATION,
186 Constants.EMPTY_FROZEN_OBJECT,
187 ),
188 );
189 try {
190 await this.waitChargingStationsStopped();
191 } catch (error) {
192 console.error(chalk.red('Error while waiting for charging stations to stop: '), error);
193 }
194 }
195 await this.workerImplementation?.stop();
196 delete this.workerImplementation;
197 this.uiServer?.stop();
198 await this.storage?.close();
199 delete this.storage;
200 this.resetCounters();
201 this.initializedCounters = false;
202 this.started = false;
203 this.stopping = false;
204 } else {
205 console.error(chalk.red('Cannot stop an already stopping charging stations simulator'));
206 }
207 } else {
208 console.error(chalk.red('Cannot stop an already stopped charging stations simulator'));
209 }
210 }
211
212 public async restart(stopChargingStations?: boolean): Promise<void> {
213 await this.stop(stopChargingStations);
214 await this.start();
215 }
216
217 private async waitChargingStationsStopped(): Promise<string> {
218 return new Promise<string>((resolve, reject) => {
219 const waitTimeout = setTimeout(() => {
220 const message = `Timeout ${formatDurationMilliSeconds(
221 Constants.STOP_CHARGING_STATIONS_TIMEOUT,
222 )} reached at stopping charging stations`;
223 console.warn(chalk.yellow(message));
224 reject(new Error(message));
225 }, Constants.STOP_CHARGING_STATIONS_TIMEOUT);
226 waitChargingStationEvents(
227 this,
228 ChargingStationWorkerMessageEvents.stopped,
229 this.numberOfChargingStations,
230 )
231 .then(() => {
232 resolve('Charging stations stopped');
233 })
234 .catch(reject)
235 .finally(() => {
236 clearTimeout(waitTimeout);
237 });
238 });
239 }
240
241 private initializeWorkerImplementation(workerConfiguration: WorkerConfiguration): void {
242 let elementsPerWorker: number | undefined;
243 switch (workerConfiguration?.elementsPerWorker) {
244 case 'auto':
245 elementsPerWorker =
246 this.numberOfChargingStations > availableParallelism()
247 ? Math.round(this.numberOfChargingStations / (availableParallelism() * 1.5))
248 : 1;
249 break;
250 case 'single':
251 elementsPerWorker = this.numberOfChargingStations;
252 break;
253 }
254 this.workerImplementation = WorkerFactory.getWorkerImplementation<ChargingStationWorkerData>(
255 join(
256 dirname(fileURLToPath(import.meta.url)),
257 `ChargingStationWorker${extname(fileURLToPath(import.meta.url))}`,
258 ),
259 workerConfiguration.processType!,
260 {
261 workerStartDelay: workerConfiguration.startDelay,
262 elementStartDelay: workerConfiguration.elementStartDelay,
263 poolMaxSize: workerConfiguration.poolMaxSize!,
264 poolMinSize: workerConfiguration.poolMinSize!,
265 elementsPerWorker: elementsPerWorker ?? (workerConfiguration.elementsPerWorker as number),
266 poolOptions: {
267 messageHandler: this.messageHandler.bind(this) as (message: unknown) => void,
268 workerOptions: { resourceLimits: workerConfiguration.resourceLimits },
269 },
270 },
271 );
272 }
273
274 private messageHandler(
275 msg: ChargingStationWorkerMessage<ChargingStationWorkerMessageData>,
276 ): void {
277 // logger.debug(
278 // `${this.logPrefix()} ${moduleName}.messageHandler: Worker channel message received: ${JSON.stringify(
279 // msg,
280 // undefined,
281 // 2,
282 // )}`,
283 // );
284 try {
285 switch (msg.event) {
286 case ChargingStationWorkerMessageEvents.started:
287 this.emit(ChargingStationWorkerMessageEvents.started, msg.data as ChargingStationData);
288 break;
289 case ChargingStationWorkerMessageEvents.stopped:
290 this.emit(ChargingStationWorkerMessageEvents.stopped, msg.data as ChargingStationData);
291 break;
292 case ChargingStationWorkerMessageEvents.updated:
293 this.emit(ChargingStationWorkerMessageEvents.updated, msg.data as ChargingStationData);
294 break;
295 case ChargingStationWorkerMessageEvents.performanceStatistics:
296 this.emit(
297 ChargingStationWorkerMessageEvents.performanceStatistics,
298 msg.data as Statistics,
299 );
300 break;
301 case ChargingStationWorkerMessageEvents.startWorkerElementError:
302 logger.error(
303 `${this.logPrefix()} ${moduleName}.messageHandler: Error occured while starting worker element:`,
304 msg.data,
305 );
306 this.emit(ChargingStationWorkerMessageEvents.startWorkerElementError, msg.data);
307 break;
308 case ChargingStationWorkerMessageEvents.startedWorkerElement:
309 break;
310 default:
311 throw new BaseError(
312 `Unknown charging station worker event: '${
313 msg.event
314 }' received with data: ${JSON.stringify(msg.data, undefined, 2)}`,
315 );
316 }
317 } catch (error) {
318 logger.error(
319 `${this.logPrefix()} ${moduleName}.messageHandler: Error occurred while handling '${
320 msg.event
321 }' event:`,
322 error,
323 );
324 }
325 }
326
327 private workerEventStarted = (data: ChargingStationData) => {
328 this.uiServer?.chargingStations.set(data.stationInfo.hashId, data);
329 ++this.numberOfStartedChargingStations;
330 logger.info(
331 `${this.logPrefix()} ${moduleName}.workerEventStarted: Charging station ${
332 data.stationInfo.chargingStationId
333 } (hashId: ${data.stationInfo.hashId}) started (${
334 this.numberOfStartedChargingStations
335 } started from ${this.numberOfChargingStations})`,
336 );
337 };
338
339 private workerEventStopped = (data: ChargingStationData) => {
340 this.uiServer?.chargingStations.set(data.stationInfo.hashId, data);
341 --this.numberOfStartedChargingStations;
342 logger.info(
343 `${this.logPrefix()} ${moduleName}.workerEventStopped: Charging station ${
344 data.stationInfo.chargingStationId
345 } (hashId: ${data.stationInfo.hashId}) stopped (${
346 this.numberOfStartedChargingStations
347 } started from ${this.numberOfChargingStations})`,
348 );
349 };
350
351 private workerEventUpdated = (data: ChargingStationData) => {
352 this.uiServer?.chargingStations.set(data.stationInfo.hashId, data);
353 };
354
355 private workerEventPerformanceStatistics = (data: Statistics) => {
356 this.storage?.storePerformanceStatistics(data) as void;
357 };
358
359 private initializeCounters() {
360 if (this.initializedCounters === false) {
361 this.resetCounters();
362 const stationTemplateUrls = Configuration.getStationTemplateUrls()!;
363 if (isNotEmptyArray(stationTemplateUrls)) {
364 this.numberOfChargingStationTemplates = stationTemplateUrls.length;
365 for (const stationTemplateUrl of stationTemplateUrls) {
366 this.numberOfChargingStations += stationTemplateUrl.numberOfStations ?? 0;
367 }
368 } else {
369 console.warn(
370 chalk.yellow("'stationTemplateUrls' not defined or empty in configuration, exiting"),
371 );
372 exit(exitCodes.missingChargingStationsConfiguration);
373 }
374 if (this.numberOfChargingStations === 0) {
375 console.warn(
376 chalk.yellow('No charging station template enabled in configuration, exiting'),
377 );
378 exit(exitCodes.noChargingStationTemplates);
379 }
380 this.initializedCounters = true;
381 }
382 }
383
384 private resetCounters(): void {
385 this.numberOfChargingStationTemplates = 0;
386 this.numberOfChargingStations = 0;
387 this.numberOfStartedChargingStations = 0;
388 }
389
390 private async startChargingStation(
391 index: number,
392 stationTemplateUrl: StationTemplateUrl,
393 ): Promise<void> {
394 await this.workerImplementation?.addElement({
395 index,
396 templateFile: join(
397 dirname(fileURLToPath(import.meta.url)),
398 'assets',
399 'station-templates',
400 stationTemplateUrl.file,
401 ),
402 });
403 }
404
405 private gracefulShutdown(): void {
406 this.stop()
407 .then(() => {
408 console.info(`${chalk.green('Graceful shutdown')}`);
409 // stop() asks for charging stations to stop by default
410 this.waitChargingStationsStopped()
411 .then(() => {
412 exit(exitCodes.succeeded);
413 })
414 .catch(() => {
415 exit(exitCodes.gracefulShutdownError);
416 });
417 })
418 .catch((error) => {
419 console.error(chalk.red('Error while shutdowning charging stations simulator: '), error);
420 exit(exitCodes.gracefulShutdownError);
421 });
422 }
423
424 private logPrefix = (): string => {
425 return logPrefix(' Bootstrap |');
426 };
427 }