fix: ensure event listeners are always removed at simulator stop
[e-mobility-charging-stations-simulator.git] / src / charging-station / Bootstrap.ts
index 66b2d5f835146782bdb9a703b5002c9ab540146c..6620764559974d12e6ca8a89cf7ccae497937e22 100644 (file)
@@ -2,8 +2,8 @@
 
 import { EventEmitter } from 'node:events';
 import { dirname, extname, join } from 'node:path';
+import process, { exit } from 'node:process';
 import { fileURLToPath } from 'node:url';
-import { isMainThread } from 'node:worker_threads';
 
 import chalk from 'chalk';
 import { availableParallelism } from 'poolifier';
@@ -11,7 +11,7 @@ import { availableParallelism } from 'poolifier';
 import { waitChargingStationEvents } from './Helpers';
 import type { AbstractUIServer } from './ui-server/AbstractUIServer';
 import { UIServerFactory } from './ui-server/UIServerFactory';
-import { version } from '../../package.json' assert { type: 'json' };
+import { version } from '../../package.json';
 import { BaseError } from '../exception';
 import { type Storage, StorageFactory } from '../performance';
 import {
@@ -55,21 +55,20 @@ export class Bootstrap extends EventEmitter {
   private static instance: Bootstrap | null = null;
   public numberOfChargingStations!: number;
   public numberOfChargingStationTemplates!: number;
-  private workerImplementation: WorkerAbstract<ChargingStationWorkerData> | null;
-  private readonly uiServer!: AbstractUIServer | null;
-  private readonly storage!: Storage;
+  private workerImplementation?: WorkerAbstract<ChargingStationWorkerData>;
+  private readonly uiServer?: AbstractUIServer;
+  private storage?: Storage;
   private numberOfStartedChargingStations!: number;
   private readonly version: string = version;
   private initializedCounters: boolean;
   private started: boolean;
   private starting: boolean;
   private stopping: boolean;
-  private readonly workerScript: string;
 
   private constructor() {
     super();
     for (const signal of ['SIGINT', 'SIGQUIT', 'SIGTERM']) {
-      process.on(signal, this.gracefulShutdown);
+      process.on(signal, this.gracefulShutdown.bind(this));
     }
     // Enable unconditionally for now
     handleUnhandledRejection();
@@ -79,27 +78,10 @@ export class Bootstrap extends EventEmitter {
     this.stopping = false;
     this.initializedCounters = false;
     this.initializeCounters();
-    this.workerImplementation = null;
-    this.workerScript = join(
-      dirname(fileURLToPath(import.meta.url)),
-      `ChargingStationWorker${extname(fileURLToPath(import.meta.url))}`,
+    this.uiServer = UIServerFactory.getUIServerImplementation(
+      Configuration.getConfigurationSection<UIServerConfiguration>(ConfigurationSection.uiServer),
     );
-    const uiServerConfiguration = Configuration.getConfigurationSection<UIServerConfiguration>(
-      ConfigurationSection.uiServer,
-    );
-    uiServerConfiguration.enabled === true &&
-      (this.uiServer = UIServerFactory.getUIServerImplementation(uiServerConfiguration));
-    const performanceStorageConfiguration =
-      Configuration.getConfigurationSection<StorageConfiguration>(
-        ConfigurationSection.performanceStorage,
-      );
-    performanceStorageConfiguration.enabled === true &&
-      (this.storage = StorageFactory.getStorage(
-        performanceStorageConfiguration.type!,
-        performanceStorageConfiguration.uri!,
-        this.logPrefix(),
-      ));
-    Configuration.configurationChangeCallback = async () => Bootstrap.getInstance().restart();
+    Configuration.configurationChangeCallback = async () => Bootstrap.getInstance().restart(false);
   }
 
   public static getInstance(): Bootstrap {
@@ -110,20 +92,36 @@ export class Bootstrap extends EventEmitter {
   }
 
   public async start(): Promise<void> {
-    if (!isMainThread) {
-      throw new BaseError('Cannot start charging stations simulator from worker thread');
-    }
     if (this.started === false) {
       if (this.starting === false) {
         this.starting = true;
+        this.on(ChargingStationWorkerMessageEvents.started, this.workerEventStarted);
+        this.on(ChargingStationWorkerMessageEvents.stopped, this.workerEventStopped);
+        this.on(ChargingStationWorkerMessageEvents.updated, this.workerEventUpdated);
+        this.on(
+          ChargingStationWorkerMessageEvents.performanceStatistics,
+          this.workerEventPerformanceStatistics,
+        );
         this.initializeCounters();
         const workerConfiguration = Configuration.getConfigurationSection<WorkerConfiguration>(
           ConfigurationSection.worker,
         );
         this.initializeWorkerImplementation(workerConfiguration);
         await this.workerImplementation?.start();
-        await this.storage?.open();
-        this.uiServer?.start();
+        const performanceStorageConfiguration =
+          Configuration.getConfigurationSection<StorageConfiguration>(
+            ConfigurationSection.performanceStorage,
+          );
+        if (performanceStorageConfiguration.enabled === true) {
+          this.storage = StorageFactory.getStorage(
+            performanceStorageConfiguration.type!,
+            performanceStorageConfiguration.uri!,
+            this.logPrefix(),
+          );
+          await this.storage?.open();
+        }
+        Configuration.getConfigurationSection<UIServerConfiguration>(ConfigurationSection.uiServer)
+          .enabled === true && this.uiServer?.start();
         // Start ChargingStation object instance in worker thread
         for (const stationTemplateUrl of Configuration.getStationTemplateUrls()!) {
           try {
@@ -162,7 +160,7 @@ export class Bootstrap extends EventEmitter {
         Configuration.workerDynamicPoolInUse() &&
           console.warn(
             chalk.yellow(
-              'Charging stations simulator is using dynamic pool mode. This is an experimental feature with known issues.\nPlease consider using static pool or worker set mode instead',
+              'Charging stations simulator is using dynamic pool mode. This is an experimental feature with known issues.\nPlease consider using fixed pool or worker set mode instead',
             ),
           );
         console.info(chalk.green('Worker set/pool information:'), this.workerImplementation?.info);
@@ -176,40 +174,30 @@ export class Bootstrap extends EventEmitter {
     }
   }
 
-  public async stop(): Promise<void> {
-    if (!isMainThread) {
-      throw new BaseError('Cannot stop charging stations simulator from worker thread');
-    }
+  public async stop(stopChargingStations = true): Promise<void> {
     if (this.started === true) {
       if (this.stopping === false) {
         this.stopping = true;
-        await this.uiServer?.sendInternalRequest(
-          this.uiServer.buildProtocolRequest(
-            generateUUID(),
-            ProcedureName.STOP_CHARGING_STATION,
-            Constants.EMPTY_FROZEN_OBJECT,
-          ),
-        );
-        await Promise.race([
-          waitChargingStationEvents(
-            this,
-            ChargingStationWorkerMessageEvents.stopped,
-            this.numberOfChargingStations,
-          ),
-          new Promise<string>((resolve) => {
-            setTimeout(() => {
-              const message = `Timeout reached ${formatDurationMilliSeconds(
-                Constants.STOP_SIMULATOR_TIMEOUT,
-              )} at stopping charging stations simulator`;
-              console.warn(chalk.yellow(message));
-              resolve(message);
-            }, Constants.STOP_SIMULATOR_TIMEOUT);
-          }),
-        ]);
+        if (stopChargingStations === true) {
+          await this.uiServer?.sendInternalRequest(
+            this.uiServer.buildProtocolRequest(
+              generateUUID(),
+              ProcedureName.STOP_CHARGING_STATION,
+              Constants.EMPTY_FROZEN_OBJECT,
+            ),
+          );
+          try {
+            await this.waitChargingStationsStopped();
+          } catch (error) {
+            console.error(chalk.red('Error while waiting for charging stations to stop: '), error);
+          }
+        }
         await this.workerImplementation?.stop();
-        this.workerImplementation = null;
+        delete this.workerImplementation;
+        this.removeAllListeners();
         this.uiServer?.stop();
         await this.storage?.close();
+        delete this.storage;
         this.resetCounters();
         this.initializedCounters = false;
         this.started = false;
@@ -222,34 +210,66 @@ export class Bootstrap extends EventEmitter {
     }
   }
 
-  public async restart(): Promise<void> {
-    await this.stop();
+  public async restart(stopChargingStations?: boolean): Promise<void> {
+    await this.stop(stopChargingStations);
     await this.start();
   }
 
+  private async waitChargingStationsStopped(): Promise<string> {
+    return new Promise<string>((resolve, reject) => {
+      const waitTimeout = setTimeout(() => {
+        const message = `Timeout ${formatDurationMilliSeconds(
+          Constants.STOP_CHARGING_STATIONS_TIMEOUT,
+        )} reached at stopping charging stations`;
+        console.warn(chalk.yellow(message));
+        reject(new Error(message));
+      }, Constants.STOP_CHARGING_STATIONS_TIMEOUT);
+      waitChargingStationEvents(
+        this,
+        ChargingStationWorkerMessageEvents.stopped,
+        this.numberOfChargingStations,
+      )
+        .then(() => {
+          resolve('Charging stations stopped');
+        })
+        .catch(reject)
+        .finally(() => {
+          clearTimeout(waitTimeout);
+        });
+    });
+  }
+
   private initializeWorkerImplementation(workerConfiguration: WorkerConfiguration): void {
     let elementsPerWorker: number | undefined;
-    if (workerConfiguration?.elementsPerWorker === 'auto') {
-      elementsPerWorker =
-        this.numberOfChargingStations > availableParallelism()
-          ? Math.round(this.numberOfChargingStations / (availableParallelism() * 1.5))
-          : 1;
+    switch (workerConfiguration?.elementsPerWorker) {
+      case 'auto':
+        elementsPerWorker =
+          this.numberOfChargingStations > availableParallelism()
+            ? Math.round(this.numberOfChargingStations / (availableParallelism() * 1.5))
+            : 1;
+        break;
+      case 'all':
+        elementsPerWorker = this.numberOfChargingStations;
+        break;
     }
-    this.workerImplementation === null &&
-      (this.workerImplementation = WorkerFactory.getWorkerImplementation<ChargingStationWorkerData>(
-        this.workerScript,
-        workerConfiguration.processType!,
-        {
-          workerStartDelay: workerConfiguration.startDelay,
-          elementStartDelay: workerConfiguration.elementStartDelay,
-          poolMaxSize: workerConfiguration.poolMaxSize!,
-          poolMinSize: workerConfiguration.poolMinSize!,
-          elementsPerWorker: elementsPerWorker ?? (workerConfiguration.elementsPerWorker as number),
-          poolOptions: {
-            messageHandler: this.messageHandler.bind(this) as (message: unknown) => void,
-          },
+    this.workerImplementation = WorkerFactory.getWorkerImplementation<ChargingStationWorkerData>(
+      join(
+        dirname(fileURLToPath(import.meta.url)),
+        `ChargingStationWorker${extname(fileURLToPath(import.meta.url))}`,
+      ),
+      workerConfiguration.processType!,
+      {
+        workerStartDelay: workerConfiguration.startDelay,
+        elementStartDelay: workerConfiguration.elementStartDelay,
+        poolMaxSize: workerConfiguration.poolMaxSize!,
+        poolMinSize: workerConfiguration.poolMinSize!,
+        elementsPerWorker: elementsPerWorker ?? (workerConfiguration.elementsPerWorker as number),
+        poolOptions: {
+          messageHandler: this.messageHandler.bind(this) as (message: unknown) => void,
+          workerOptions: { resourceLimits: workerConfiguration.resourceLimits },
         },
-      ));
+      },
+    );
   }
 
   private messageHandler(
@@ -258,26 +278,22 @@ export class Bootstrap extends EventEmitter {
     // logger.debug(
     //   `${this.logPrefix()} ${moduleName}.messageHandler: Worker channel message received: ${JSON.stringify(
     //     msg,
-    //     null,
+    //     undefined,
     //     2,
     //   )}`,
     // );
     try {
       switch (msg.event) {
         case ChargingStationWorkerMessageEvents.started:
-          this.workerEventStarted(msg.data as ChargingStationData);
           this.emit(ChargingStationWorkerMessageEvents.started, msg.data as ChargingStationData);
           break;
         case ChargingStationWorkerMessageEvents.stopped:
-          this.workerEventStopped(msg.data as ChargingStationData);
           this.emit(ChargingStationWorkerMessageEvents.stopped, msg.data as ChargingStationData);
           break;
         case ChargingStationWorkerMessageEvents.updated:
-          this.workerEventUpdated(msg.data as ChargingStationData);
           this.emit(ChargingStationWorkerMessageEvents.updated, msg.data as ChargingStationData);
           break;
         case ChargingStationWorkerMessageEvents.performanceStatistics:
-          this.workerEventPerformanceStatistics(msg.data as Statistics);
           this.emit(
             ChargingStationWorkerMessageEvents.performanceStatistics,
             msg.data as Statistics,
@@ -296,7 +312,7 @@ export class Bootstrap extends EventEmitter {
           throw new BaseError(
             `Unknown charging station worker event: '${
               msg.event
-            }' received with data: ${JSON.stringify(msg.data, null, 2)}`,
+            }' received with data: ${JSON.stringify(msg.data, undefined, 2)}`,
           );
       }
     } catch (error) {
@@ -338,7 +354,7 @@ export class Bootstrap extends EventEmitter {
   };
 
   private workerEventPerformanceStatistics = (data: Statistics) => {
-    this.storage.storePerformanceStatistics(data) as void;
+    this.storage?.storePerformanceStatistics(data) as void;
   };
 
   private initializeCounters() {
@@ -354,13 +370,13 @@ export class Bootstrap extends EventEmitter {
         console.warn(
           chalk.yellow("'stationTemplateUrls' not defined or empty in configuration, exiting"),
         );
-        process.exit(exitCodes.missingChargingStationsConfiguration);
+        exit(exitCodes.missingChargingStationsConfiguration);
       }
       if (this.numberOfChargingStations === 0) {
         console.warn(
           chalk.yellow('No charging station template enabled in configuration, exiting'),
         );
-        process.exit(exitCodes.noChargingStationTemplates);
+        exit(exitCodes.noChargingStationTemplates);
       }
       this.initializedCounters = true;
     }
@@ -387,17 +403,24 @@ export class Bootstrap extends EventEmitter {
     });
   }
 
-  private gracefulShutdown = (): void => {
-    console.info(`${chalk.green('Graceful shutdown')}`);
+  private gracefulShutdown(): void {
     this.stop()
       .then(() => {
-        process.exit(exitCodes.succeeded);
+        console.info(`${chalk.green('Graceful shutdown')}`);
+        // stop() asks for charging stations to stop by default
+        this.waitChargingStationsStopped()
+          .then(() => {
+            exit(exitCodes.succeeded);
+          })
+          .catch(() => {
+            exit(exitCodes.gracefulShutdownError);
+          });
       })
       .catch((error) => {
         console.error(chalk.red('Error while shutdowning charging stations simulator: '), error);
-        process.exit(exitCodes.gracefulShutdownError);
+        exit(exitCodes.gracefulShutdownError);
       });
-  };
+  }
 
   private logPrefix = (): string => {
     return logPrefix(' Bootstrap |');