Storage: use worker threads message passing to store performance records from
authorJérôme Benoit <jerome.benoit@sap.com>
Thu, 26 Aug 2021 21:58:19 +0000 (23:58 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Thu, 26 Aug 2021 21:58:19 +0000 (23:58 +0200)
the main thread

Pool usage does not yet support it.

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/charging-station/Bootstrap.ts
src/charging-station/StationWorker.ts
src/charging-station/ocpp/OCPPRequestService.ts
src/types/Worker.ts
src/utils/PerformanceStatistics.ts
src/utils/Utils.ts
src/worker/WorkerAbstract.ts
src/worker/WorkerDynamicPool.ts
src/worker/WorkerFactory.ts
src/worker/WorkerSet.ts
src/worker/WorkerStaticPool.ts

index 3cd83d1db43d9004042891d5c447ce6881b1d7fa..7fd000a20a5746d968f0538a536f617009946a16 100644 (file)
@@ -1,5 +1,8 @@
+import { StationWorkerData, WorkerEvents, WorkerMessage } from '../types/Worker';
+
 import Configuration from '../utils/Configuration';
-import { StationWorkerData } from '../types/Worker';
+import { Storage } from '../utils/performance-storage/Storage';
+import { StorageFactory } from '../utils/performance-storage/StorageFactory';
 import Utils from '../utils/Utils';
 import WorkerAbstract from '../worker/WorkerAbstract';
 import WorkerFactory from '../worker/WorkerFactory';
@@ -10,6 +13,7 @@ import { version } from '../../package.json';
 export default class Bootstrap {
   private static instance: Bootstrap | null = null;
   private static workerImplementation: WorkerAbstract | null = null;
+  private static storage: Storage;
   private version: string = version;
   private started: boolean;
   private workerScript: string;
@@ -18,6 +22,7 @@ export default class Bootstrap {
     this.started = false;
     this.workerScript = path.join(path.resolve(__dirname, '../'), 'charging-station', 'StationWorker.js');
     this.initWorkerImplementation();
+    Bootstrap.storage = StorageFactory.getStorage(Configuration.getPerformanceStorage().type, Configuration.getPerformanceStorage().URI, this.logPrefix());
     Configuration.setConfigurationChangeCallback(async () => Bootstrap.getInstance().restart());
   }
 
@@ -88,9 +93,17 @@ export default class Bootstrap {
         poolOptions: {
           workerChoiceStrategy: Configuration.getWorkerPoolStrategy()
         }
+      }, (msg: WorkerMessage) => {
+        if (msg.id === WorkerEvents.PERFORMANCE_STATISTICS) {
+          Bootstrap.storage.storePerformanceStatistics(msg.data);
+        }
       });
     if (!Bootstrap.workerImplementation) {
       throw new Error('Worker implementation not found');
     }
   }
+
+  private logPrefix(): string {
+    return Utils.logPrefix(' Bootstrap');
+  }
 }
index c3c5cb51346ce2f36f843f816c29eb27abbdb524..dd0f377ceff295c7df8980ece4931d5545dfa9da 100644 (file)
@@ -1,6 +1,6 @@
 // Partial Copyright Jerome Benoit. 2021. All Rights Reserved.
 
-import { StationWorkerData, WorkerEvents } from '../types/Worker';
+import { StationWorkerData, WorkerEvents, WorkerMessage } from '../types/Worker';
 import { parentPort, workerData } from 'worker_threads';
 
 import ChargingStation from './ChargingStation';
@@ -24,9 +24,9 @@ if (Utils.workerPoolInUse()) {
  * Listen messages send by the main thread
  */
 function addMessageListener(): void {
-  parentPort?.on('message', (message) => {
+  parentPort?.on('message', (message: WorkerMessage) => {
     if (message.id === WorkerEvents.START_WORKER_ELEMENT) {
-      startChargingStation(message.workerData);
+      startChargingStation(message.data);
     }
   });
 }
@@ -34,7 +34,7 @@ function addMessageListener(): void {
 /**
  * Create and start a charging station instance
  *
- * @param {StationWorkerData} data workerData
+ * @param data workerData
  */
 function startChargingStation(data: StationWorkerData): void {
   const station = new ChargingStation(data.index, data.templateFile);
index 5b1f981cf50a332d4fb3ecb75d3824f52408ac48..4c1d1b74f7eeef54fa7aef1e227cc196d7ddd520 100644 (file)
@@ -75,8 +75,8 @@ export default abstract class OCPPRequestService {
       /**
        * Function that will receive the request's response
        *
-       * @param {Record<string, unknown> | string} payload
-       * @param {Record<string, unknown>} requestPayload
+       * @param payload
+       * @param requestPayload
        */
       async function responseCallback(payload: Record<string, unknown> | string, requestPayload: Record<string, unknown>): Promise<void> {
         if (self.chargingStation.getEnableStatistics()) {
@@ -90,7 +90,7 @@ export default abstract class OCPPRequestService {
       /**
        * Function that will receive the request's rejection
        *
-       * @param {OCPPError} error
+       * @param error
        */
       function rejectCallback(error: OCPPError): void {
         if (self.chargingStation.getEnableStatistics()) {
index c1f6f0c89ae3c197d26eb1a37c6fa3edb2c1c328..eedf5611b773a6f723ce662b189e1572c99432b3 100644 (file)
@@ -24,11 +24,18 @@ export interface StationWorkerData extends WorkerData {
 }
 
 export interface WorkerSetElement {
-  worker: Worker,
-  numberOfWorkerElements: number
+  worker: Worker;
+  numberOfWorkerElements: number;
+}
+
+export interface WorkerMessage {
+  id: WorkerEvents;
+  data: any;
 }
 
 export enum WorkerEvents {
   START_WORKER_ELEMENT = 'startWorkerElement',
+  STOP_WORKER_ELEMENT = 'stopWorkerElement',
+  PERFORMANCE_STATISTICS = 'performanceStatistics'
 }
 
index 02b2854e185d7bc794cb4ba5184908366e19141e..4049197a0693b3e4eb74575b06f057f4add57ae2 100644 (file)
@@ -7,13 +7,12 @@ import Statistics, { StatisticsData } from '../types/Statistics';
 
 import Configuration from './Configuration';
 import { MessageType } from '../types/ocpp/MessageType';
-import { Storage } from './performance-storage/Storage';
-import { StorageFactory } from './performance-storage/StorageFactory';
 import Utils from './Utils';
+import { WorkerEvents } from '../types/Worker';
 import logger from './Logger';
+import { parentPort } from 'worker_threads';
 
 export default class PerformanceStatistics {
-  private static storage: Storage;
   private objId: string;
   private performanceObserver: PerformanceObserver;
   private statistics: Statistics;
@@ -191,18 +190,11 @@ export default class PerformanceStatistics {
     this.statistics.statisticsData[entryName].ninetyFiveThPercentileTimeMeasurement = this.percentile(this.statistics.statisticsData[entryName].timeMeasurementSeries, 95);
     this.statistics.statisticsData[entryName].stdDevTimeMeasurement = this.stdDeviation(this.statistics.statisticsData[entryName].timeMeasurementSeries);
     if (Configuration.getPerformanceStorage().enabled) {
-      this.getStorage().storePerformanceStatistics(this.statistics);
+      parentPort.postMessage({ id: WorkerEvents.PERFORMANCE_STATISTICS, data: this.statistics });
     }
   }
 
   private logPrefix(): string {
     return Utils.logPrefix(` ${this.objId} | Performance statistics`);
   }
-
-  private getStorage(): Storage {
-    if (!PerformanceStatistics.storage) {
-      PerformanceStatistics.storage = StorageFactory.getStorage(Configuration.getPerformanceStorage().type ,Configuration.getPerformanceStorage().URI, this.logPrefix());
-    }
-    return PerformanceStatistics.storage;
-  }
 }
index 5da54a2e150fb64940c563ca6494297f7f0f7306..74abb7822b4c73ab5efd947fa5a397b0a23075cb 100644 (file)
@@ -190,8 +190,8 @@ export default class Utils {
   static insertAt = (str: string, subStr: string, pos: number): string => `${str.slice(0, pos)}${subStr}${str.slice(pos)}`;
 
   /**
-   * @param {number} [retryNumber=0]
-   * @returns {number} delay in milliseconds
+   * @param [retryNumber=0]
+   * @returns delay in milliseconds
    */
   static exponentialDelay(retryNumber = 0): number {
     const delay = Math.pow(2, retryNumber) * 100;
@@ -202,8 +202,8 @@ export default class Utils {
   /**
    * Convert websocket error code to human readable string message
    *
-   * @param {number} code websocket error code
-   * @returns {string} human readable string message
+   * @param code websocket error code
+   * @returns human readable string message
    */
   static getWebSocketCloseEventStatusString(code: number): string {
     if (code >= 0 && code <= 999) {
index 715778161af13b59bed90aa2805b0120451c64b8..d5f9a58e4a00eeed20e24d3e9605776f943f2114 100644 (file)
@@ -4,18 +4,22 @@ import { WorkerData } from '../types/Worker';
 export default abstract class WorkerAbstract {
   protected readonly workerScript: string;
   protected readonly workerStartDelay: number;
+  protected readonly messageListener: (message: any) => void;
   public abstract size: number;
   public abstract maxElementsPerWorker: number | null;
 
   /**
    * `WorkerAbstract` constructor.
    *
-   * @param {string} workerScript
-   * @param {number} workerStartDelay
+   * @param workerScript
+   * @param workerStartDelay
+   * @param messageListenerCallback
    */
-  constructor(workerScript: string, workerStartDelay: number = Constants.WORKER_START_DELAY) {
+  constructor(workerScript: string, workerStartDelay: number = Constants.WORKER_START_DELAY,
+      messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) {
     this.workerScript = workerScript;
     this.workerStartDelay = workerStartDelay;
+    this.messageListener = messageListenerCallback;
   }
 
   public abstract start(): Promise<void>;
index 47d1e458b0c0593d5dd12226a9afc9a16036c029..0e1cd4cde877caf86cf4896fcf31271efeb4b53a 100644 (file)
@@ -12,14 +12,16 @@ export default class WorkerDynamicPool<T> extends WorkerAbstract {
   /**
    * Create a new `WorkerDynamicPool`.
    *
-   * @param {string} workerScript
-   * @param {number} min
-   * @param {number} max
-   * @param {number} workerStartDelay
-   * @param {PoolOptions} opts
+   * @param workerScript
+   * @param min
+   * @param max
+   * @param workerStartDelay
+   * @param opts
+   * @param messageListenerCallback
    */
-  constructor(workerScript: string, min: number, max: number, workerStartDelay?: number, opts?: PoolOptions<Worker>) {
-    super(workerScript, workerStartDelay);
+  constructor(workerScript: string, min: number, max: number, workerStartDelay?: number, opts?: PoolOptions<Worker>,
+      messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) {
+    super(workerScript, workerStartDelay, messageListenerCallback);
     opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler;
     this.pool = new DynamicThreadPool<WorkerData>(min, max, this.workerScript, opts);
   }
@@ -34,7 +36,7 @@ export default class WorkerDynamicPool<T> extends WorkerAbstract {
 
   /**
    *
-   * @returns {Promise<void>}
+   * @returns
    * @public
    */
   // eslint-disable-next-line @typescript-eslint/no-empty-function
@@ -44,7 +46,7 @@ export default class WorkerDynamicPool<T> extends WorkerAbstract {
 
   /**
    *
-   * @returns {Promise<void>}
+   * @returns
    * @public
    */
   // eslint-disable-next-line @typescript-eslint/require-await
@@ -54,8 +56,8 @@ export default class WorkerDynamicPool<T> extends WorkerAbstract {
 
   /**
    *
-   * @param {T} elementData
-   * @returns {Promise<void>}
+   * @param elementData
+   * @returns
    * @public
    */
   public async addElement(elementData: T): Promise<void> {
index 8ccd6698bc3bffd2e33e71e3d4270541f6c46694..af2c53f9b3d5c86817a4204bb806fe53a3917f50 100644 (file)
@@ -13,7 +13,8 @@ export default class WorkerFactory {
     // This is intentional
   }
 
-  public static getWorkerImplementation<T>(workerScript: string, workerProcessType: WorkerProcessType, options?: WorkerOptions): WorkerAbstract | null {
+  public static getWorkerImplementation<T>(workerScript: string, workerProcessType: WorkerProcessType, options?: WorkerOptions,
+      messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }): WorkerAbstract | null {
     if (!isMainThread) {
       throw new Error('Trying to get a worker implementation outside the main thread');
     }
@@ -23,16 +24,16 @@ export default class WorkerFactory {
     switch (workerProcessType) {
       case WorkerProcessType.WORKER_SET:
         options.elementsPerWorker = options.elementsPerWorker ?? Constants.DEFAULT_CHARGING_STATIONS_PER_WORKER;
-        workerImplementation = new WorkerSet<T>(workerScript, options.elementsPerWorker, options.startDelay);
+        workerImplementation = new WorkerSet<T>(workerScript, options.elementsPerWorker, options.startDelay, messageListenerCallback);
         break;
       case WorkerProcessType.STATIC_POOL:
         options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE;
-        workerImplementation = new WorkerStaticPool<T>(workerScript, options.poolMaxSize, options.startDelay, options.poolOptions);
+        workerImplementation = new WorkerStaticPool<T>(workerScript, options.poolMaxSize, options.startDelay, options.poolOptions, messageListenerCallback);
         break;
       case WorkerProcessType.DYNAMIC_POOL:
         options.poolMinSize = options.poolMinSize ?? Constants.DEFAULT_WORKER_POOL_MIN_SIZE;
         options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE;
-        workerImplementation = new WorkerDynamicPool<T>(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay, options.poolOptions);
+        workerImplementation = new WorkerDynamicPool<T>(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay, options.poolOptions, messageListenerCallback);
         break;
     }
     return workerImplementation;
index 32956104d288808de4150d776a113ad065c8ed80..1b82f1d751a1854d1ed4af9782e4b4b345e71c44 100644 (file)
@@ -14,12 +14,13 @@ export default class WorkerSet<T> extends WorkerAbstract {
   /**
    * Create a new `WorkerSet`.
    *
-   * @param {string} workerScript
-   * @param {number} maxElementsPerWorker
-   * @param {number} workerStartDelay
+   * @param workerScript
+   * @param maxElementsPerWorker
+   * @param workerStartDelay
+   * @param messageListenerCallback
    */
-  constructor(workerScript: string, maxElementsPerWorker = 1, workerStartDelay?: number) {
-    super(workerScript, workerStartDelay);
+  constructor(workerScript: string, maxElementsPerWorker = 1, workerStartDelay?: number, messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) {
+    super(workerScript, workerStartDelay, messageListenerCallback);
     this.workerSet = new Set<WorkerSetElement>();
     this.maxElementsPerWorker = maxElementsPerWorker;
   }
@@ -30,8 +31,8 @@ export default class WorkerSet<T> extends WorkerAbstract {
 
   /**
    *
-   * @param {T} elementData
-   * @returns {Promise<void>}
+   * @param elementData
+   * @returns
    * @public
    */
   public async addElement(elementData: T): Promise<void> {
@@ -43,13 +44,13 @@ export default class WorkerSet<T> extends WorkerAbstract {
       // Start worker sequentially to optimize memory at startup
       await Utils.sleep(this.workerStartDelay);
     }
-    this.getLastWorker().postMessage({ id: WorkerEvents.START_WORKER_ELEMENT, workerData: elementData });
+    this.getLastWorker().postMessage({ id: WorkerEvents.START_WORKER_ELEMENT, data: elementData });
     this.getLastWorkerSetElement().numberOfWorkerElements++;
   }
 
   /**
    *
-   * @returns {Promise<void>}
+   * @returns
    * @public
    */
   public async start(): Promise<void> {
@@ -60,7 +61,7 @@ export default class WorkerSet<T> extends WorkerAbstract {
 
   /**
    *
-   * @returns {Promise<void>}
+   * @returns
    * @public
    */
   public async stop(): Promise<void> {
@@ -76,7 +77,7 @@ export default class WorkerSet<T> extends WorkerAbstract {
    */
   private startWorker(): void {
     const worker = new Worker(this.workerScript);
-    worker.on('message', () => { /* This is intentional */ });
+    worker.on('message', this.messageListener);
     worker.on('error', () => { /* This is intentional */ });
     worker.on('exit', (code) => {
       WorkerUtils.defaultExitHandler(code);
index a7bb193270e88346347f42fa1d2c98efcc31f806..d5e9222618fb1ae37cdb1cfcf563ca4936a99594 100644 (file)
@@ -12,13 +12,15 @@ export default class WorkerStaticPool<T> extends WorkerAbstract {
   /**
    * Create a new `WorkerStaticPool`.
    *
-   * @param {string} workerScript
-   * @param {number} numberOfThreads
-   * @param {number} startWorkerDelay
-   * @param {PoolOptions} opts
+   * @param workerScript
+   * @param numberOfThreads
+   * @param startWorkerDelay
+   * @param opts
+   * @param messageListenerCallback
    */
-  constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number, opts?: PoolOptions<Worker>) {
-    super(workerScript, startWorkerDelay);
+  constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number, opts?: PoolOptions<Worker>,
+      messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) {
+    super(workerScript, startWorkerDelay, messageListenerCallback);
     opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler;
     this.pool = new FixedThreadPool(numberOfThreads, this.workerScript, opts);
   }
@@ -33,15 +35,17 @@ export default class WorkerStaticPool<T> extends WorkerAbstract {
 
   /**
    *
-   * @returns {Promise<void>}
+   * @returns
    * @public
    */
   // eslint-disable-next-line @typescript-eslint/no-empty-function
-  public async start(): Promise<void> {}
+  public async start(): Promise<void> {
+    // This is intentional
+  }
 
   /**
    *
-   * @returns {Promise<void>}
+   * @returns
    * @public
    */
   public async stop(): Promise<void> {
@@ -50,8 +54,8 @@ export default class WorkerStaticPool<T> extends WorkerAbstract {
 
   /**
    *
-   * @param {T} elementData
-   * @returns {Promise<void>}
+   * @param elementData
+   * @returns
    * @public
    */
   public async addElement(elementData: T): Promise<void> {