Memory optimization + worker configuration
authorJakob <jakob-ingenfeld@gmx.de>
Wed, 20 Jan 2021 16:00:56 +0000 (17:00 +0100)
committerJakob <jakob-ingenfeld@gmx.de>
Wed, 20 Jan 2021 16:00:56 +0000 (17:00 +0100)
src/assets/config-template.json
src/charging-station/StationWorker.ts
src/charging-station/Worker.ts
src/start.ts
src/types/ConfigurationData.ts
src/utils/Configuration.ts
src/utils/Constants.ts

index adfe80ef5510346c4941f4415ae13961d9f175f7..61067bcf9839bc40656b254eda4137d46227cb2a 100644 (file)
@@ -6,6 +6,8 @@
   "statisticsDisplayInterval": 60,
   "useWorkerPool": false,
   "workerPoolSize": 16,
+  "chargingStationsPerWorker": 1,
+  "chargingStationIdSuffix": "",
   "stationTemplateURLs": [
     {
       "file": "./src/assets/station-templates/siemens.station-template.json",
index 68c3ea24c0f96c4def2f5c187050bd5e816dc04b..c7bc2337b0e772c4d73a3712b0606b9981500096 100644 (file)
@@ -1,8 +1,26 @@
-import { isMainThread, workerData } from 'worker_threads';
+import { isMainThread, parentPort, workerData } from 'worker_threads';
+import Constants from '../utils/Constants';
 
 import ChargingStation from './ChargingStation';
 
 if (!isMainThread) {
   const station = new ChargingStation(workerData.index as number, workerData.templateFile as string);
   station.start();
+
+  // Listener: start new charging station from main thread
+  addListener();
+}
+
+function addListener() {
+  parentPort.setMaxListeners(1000);
+  parentPort.on("message", e => {
+    if (e.id === Constants.START_NEW_CHARGING_STATION) {
+        startChargingStation(e.workerData);
+    }
+  });
+}
+
+function startChargingStation(data: any) {
+  const station = new ChargingStation(data.index as number, data.templateFile as string);
+  station.start();
 }
index 57748831ed5b4d00e6b6bd94663e984916d437c8..fbfdf3d1f047fa1886febf126750265ae73c5d79 100644 (file)
@@ -3,12 +3,14 @@ import { Worker, WorkerOptions } from 'worker_threads';
 import Configuration from '../utils/Configuration';
 import Pool from 'worker-threads-pool';
 import WorkerData from '../types/WorkerData';
+import Constants from '../utils/Constants';
 
 export default class Wrk {
   private _workerScript: string;
   private _workerData: WorkerData;
   private _index: number;
   private _concurrentWorkers: number;
+  private _worker: Worker;
 
   /**
    * Create a new `Wrk`.
@@ -42,11 +44,25 @@ export default class Wrk {
    * @return {Promise}
    * @public
    */
-  async start(): Promise<unknown> {
+  async start(): Promise<Worker> {
     if (Configuration.useWorkerPool()) {
-      return this._startWorkerWithPool();
+      this._startWorkerWithPool();
+    } else {
+      this._startWorker();
     }
-    return this._startWorker();
+    return this._worker;
+  }
+
+    /**
+   *
+   * @return {Promise}
+   * @public
+   */
+  async startNewChargingStation(workerData: WorkerData, numConcurrentWorkers: number): Promise<void> {
+    this._workerData = workerData;
+    this._index = workerData.index;
+    this._concurrentWorkers = numConcurrentWorkers;
+    this._worker.postMessage({ id : Constants.START_NEW_CHARGING_STATION, workerData: workerData });
   }
 
   /**
@@ -62,6 +78,7 @@ export default class Wrk {
         }
         worker.once('message', resolve);
         worker.once('error', reject);
+        this._worker = worker;
       });
     });
   }
@@ -81,6 +98,7 @@ export default class Wrk {
           reject(new Error(`Worker id ${this._index} stopped with exit code ${code}`));
         }
       });
+      this._worker = worker;
     });
   }
 }
@@ -92,7 +110,7 @@ class WorkerPool {
   private constructor() { }
 
   public static getInstance(): Pool {
-    if (!WorkerPool._instance) {
+    if (!WorkerPool._instance || (WorkerPool._instance?.size === WorkerPool.concurrentWorkers)) {
       WorkerPool._instance = new Pool({ max: WorkerPool.concurrentWorkers });
     }
     return WorkerPool._instance;
index e841b57ec63692d173ea6f8e9d69703be546e668..fb58c2d28ca2187dbcf0bb48b8e5b4905f23c512 100644 (file)
@@ -1,31 +1,49 @@
 import Configuration from './utils/Configuration';
 import { StationTemplateURL } from './types/ConfigurationData';
+import Utils from './utils/Utils';
 import Wrk from './charging-station/Worker';
+import WorkerData from './types/WorkerData';
+import fs from 'fs';
 
 class Bootstrap {
-  static start() {
+  static async start() {
     try {
       let numStationsTotal = 0;
       let numConcurrentWorkers = 0;
+      let worker: Wrk;
+      let chargingStationsPerWorker = Configuration.getChargingStationsPerWorker();
+      let counter = 0;
       // Start each ChargingStation object in a worker thread
       if (Configuration.getStationTemplateURLs()) {
-        Configuration.getStationTemplateURLs().forEach((stationURL: StationTemplateURL) => {
+        for await (const stationURL of Configuration.getStationTemplateURLs()) {
           try {
             const nbStations = stationURL.numberOfStations ? stationURL.numberOfStations : 0;
             numStationsTotal += nbStations;
             for (let index = 1; index <= nbStations; index++) {
-              const worker = new Wrk('./dist/charging-station/StationWorker.js', {
+              const workerData = {
                 index,
-                templateFile: stationURL.file,
-              }, numStationsTotal);
-              worker.start().catch(() => {});
+                templateFile: stationURL.file
+              } as WorkerData;
+              if(counter === 0 || counter === chargingStationsPerWorker) {
+                // Start new worker with one charging station
+                worker = await new Wrk('./dist/charging-station/StationWorker.js', workerData, numStationsTotal);
+                worker.start().catch(() => {});
+                counter = 0;
+                // Start workers sequentially to optimize memory at start time
+                await Utils.sleep(500);
+              } else {
+                // Add new charging station to existing Worker
+                worker.startNewChargingStation(workerData, numStationsTotal)
+              }
+              counter++;
+              // Start charging station sequentially to optimize memory at start time
               numConcurrentWorkers = worker.concurrentWorkers;
             }
           } catch (error) {
             // eslint-disable-next-line no-console
             console.log('Charging station start with template file ' + stationURL.file + ' error ' + JSON.stringify(error, null, ' '));
           }
-        });
+        }
       } else {
         console.log('No stationTemplateURLs defined in configuration, exiting');
       }
index 371e9c5d558587d137a031655f2dd4cc3c4757f9..e056471cfe5ed160513d727e5ca124627d07fb7a 100644 (file)
@@ -12,6 +12,8 @@ export default interface ConfigurationData {
   distributeStationsToTenantsEqually?: boolean;
   useWorkerPool?: boolean;
   workerPoolSize?: number;
+  chargingStationsPerWorker: number;
+  chargingStationIdSuffix: string;
   logFormat?: string;
   logLevel?: string;
   logRotate?: boolean;
index ca98427eb87cee39c22c6a720e5998599e2fbabb..090e2e2c066f5e6a2e5fec886762e372fd12f327 100644 (file)
@@ -45,6 +45,14 @@ export default class Configuration {
     return Configuration.getConfig().workerPoolSize;
   }
 
+  static getChargingStationsPerWorker(): number {
+    return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'chargingStationsPerWorker') ? Configuration.getConfig().chargingStationsPerWorker : 1;
+  }
+
+  static getChargingStationIdSuffix(): string {
+    return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'chargingStationIdSuffix') ? Configuration.getConfig().chargingStationIdSuffix : '';
+  }
+
   static getLogConsole(): boolean {
     Configuration.deprecateConfigurationKey('consoleLog', 'Use \'logConsole\' instead');
     return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'logConsole') ? Configuration.getConfig().logConsole : false;
index 7afe4cac1b5ba0ea973b0c2858be8dd208b6ea7e..6e5eb6972441ecb4109906591d1010c50ed5f2c9 100644 (file)
@@ -34,4 +34,6 @@ export default class Constants {
   static readonly CHARGING_STATION_ATG_WAIT_TIME = 2000; // Ms
 
   static readonly TRANSACTION_DEFAULT_IDTAG = '00000000';
+
+  static readonly START_NEW_CHARGING_STATION = 'startNewChargingStation';
 }