+import { StationWorkerData, WorkerEvents, WorkerMessage } from '../types/Worker';
+
import Configuration from '../utils/Configuration';
-import { StationWorkerData } from '../types/Worker';
+import { Storage } from '../utils/performance-storage/Storage';
+import { StorageFactory } from '../utils/performance-storage/StorageFactory';
import Utils from '../utils/Utils';
import WorkerAbstract from '../worker/WorkerAbstract';
import WorkerFactory from '../worker/WorkerFactory';
export default class Bootstrap {
private static instance: Bootstrap | null = null;
private static workerImplementation: WorkerAbstract | null = null;
+ private static storage: Storage;
private version: string = version;
private started: boolean;
private workerScript: string;
this.started = false;
this.workerScript = path.join(path.resolve(__dirname, '../'), 'charging-station', 'StationWorker.js');
this.initWorkerImplementation();
+ Bootstrap.storage = StorageFactory.getStorage(Configuration.getPerformanceStorage().type, Configuration.getPerformanceStorage().URI, this.logPrefix());
Configuration.setConfigurationChangeCallback(async () => Bootstrap.getInstance().restart());
}
poolOptions: {
workerChoiceStrategy: Configuration.getWorkerPoolStrategy()
}
+ }, (msg: WorkerMessage) => {
+ if (msg.id === WorkerEvents.PERFORMANCE_STATISTICS) {
+ Bootstrap.storage.storePerformanceStatistics(msg.data);
+ }
});
if (!Bootstrap.workerImplementation) {
throw new Error('Worker implementation not found');
}
}
+
+ private logPrefix(): string {
+ return Utils.logPrefix(' Bootstrap');
+ }
}
// Partial Copyright Jerome Benoit. 2021. All Rights Reserved.
-import { StationWorkerData, WorkerEvents } from '../types/Worker';
+import { StationWorkerData, WorkerEvents, WorkerMessage } from '../types/Worker';
import { parentPort, workerData } from 'worker_threads';
import ChargingStation from './ChargingStation';
* Listen messages send by the main thread
*/
function addMessageListener(): void {
- parentPort?.on('message', (message) => {
+ parentPort?.on('message', (message: WorkerMessage) => {
if (message.id === WorkerEvents.START_WORKER_ELEMENT) {
- startChargingStation(message.workerData);
+ startChargingStation(message.data);
}
});
}
/**
* Create and start a charging station instance
*
- * @param {StationWorkerData} data workerData
+ * @param data workerData
*/
function startChargingStation(data: StationWorkerData): void {
const station = new ChargingStation(data.index, data.templateFile);
/**
* Function that will receive the request's response
*
- * @param {Record<string, unknown> | string} payload
- * @param {Record<string, unknown>} requestPayload
+ * @param payload
+ * @param requestPayload
*/
async function responseCallback(payload: Record<string, unknown> | string, requestPayload: Record<string, unknown>): Promise<void> {
if (self.chargingStation.getEnableStatistics()) {
/**
* Function that will receive the request's rejection
*
- * @param {OCPPError} error
+ * @param error
*/
function rejectCallback(error: OCPPError): void {
if (self.chargingStation.getEnableStatistics()) {
}
export interface WorkerSetElement {
- worker: Worker,
- numberOfWorkerElements: number
+ worker: Worker;
+ numberOfWorkerElements: number;
+}
+
+export interface WorkerMessage {
+ id: WorkerEvents;
+ data: any;
}
export enum WorkerEvents {
START_WORKER_ELEMENT = 'startWorkerElement',
+ STOP_WORKER_ELEMENT = 'stopWorkerElement',
+ PERFORMANCE_STATISTICS = 'performanceStatistics'
}
import Configuration from './Configuration';
import { MessageType } from '../types/ocpp/MessageType';
-import { Storage } from './performance-storage/Storage';
-import { StorageFactory } from './performance-storage/StorageFactory';
import Utils from './Utils';
+import { WorkerEvents } from '../types/Worker';
import logger from './Logger';
+import { parentPort } from 'worker_threads';
export default class PerformanceStatistics {
- private static storage: Storage;
private objId: string;
private performanceObserver: PerformanceObserver;
private statistics: Statistics;
this.statistics.statisticsData[entryName].ninetyFiveThPercentileTimeMeasurement = this.percentile(this.statistics.statisticsData[entryName].timeMeasurementSeries, 95);
this.statistics.statisticsData[entryName].stdDevTimeMeasurement = this.stdDeviation(this.statistics.statisticsData[entryName].timeMeasurementSeries);
if (Configuration.getPerformanceStorage().enabled) {
- this.getStorage().storePerformanceStatistics(this.statistics);
+ parentPort.postMessage({ id: WorkerEvents.PERFORMANCE_STATISTICS, data: this.statistics });
}
}
private logPrefix(): string {
return Utils.logPrefix(` ${this.objId} | Performance statistics`);
}
-
- private getStorage(): Storage {
- if (!PerformanceStatistics.storage) {
- PerformanceStatistics.storage = StorageFactory.getStorage(Configuration.getPerformanceStorage().type ,Configuration.getPerformanceStorage().URI, this.logPrefix());
- }
- return PerformanceStatistics.storage;
- }
}
static insertAt = (str: string, subStr: string, pos: number): string => `${str.slice(0, pos)}${subStr}${str.slice(pos)}`;
/**
- * @param {number} [retryNumber=0]
- * @returns {number} delay in milliseconds
+ * @param [retryNumber=0]
+ * @returns delay in milliseconds
*/
static exponentialDelay(retryNumber = 0): number {
const delay = Math.pow(2, retryNumber) * 100;
/**
* Convert websocket error code to human readable string message
*
- * @param {number} code websocket error code
- * @returns {string} human readable string message
+ * @param code websocket error code
+ * @returns human readable string message
*/
static getWebSocketCloseEventStatusString(code: number): string {
if (code >= 0 && code <= 999) {
export default abstract class WorkerAbstract {
protected readonly workerScript: string;
protected readonly workerStartDelay: number;
+ protected readonly messageListener: (message: any) => void;
public abstract size: number;
public abstract maxElementsPerWorker: number | null;
/**
* `WorkerAbstract` constructor.
*
- * @param {string} workerScript
- * @param {number} workerStartDelay
+ * @param workerScript
+ * @param workerStartDelay
+ * @param messageListenerCallback
*/
- constructor(workerScript: string, workerStartDelay: number = Constants.WORKER_START_DELAY) {
+ constructor(workerScript: string, workerStartDelay: number = Constants.WORKER_START_DELAY,
+ messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) {
this.workerScript = workerScript;
this.workerStartDelay = workerStartDelay;
+ this.messageListener = messageListenerCallback;
}
public abstract start(): Promise<void>;
/**
* Create a new `WorkerDynamicPool`.
*
- * @param {string} workerScript
- * @param {number} min
- * @param {number} max
- * @param {number} workerStartDelay
- * @param {PoolOptions} opts
+ * @param workerScript
+ * @param min
+ * @param max
+ * @param workerStartDelay
+ * @param opts
+ * @param messageListenerCallback
*/
- constructor(workerScript: string, min: number, max: number, workerStartDelay?: number, opts?: PoolOptions<Worker>) {
- super(workerScript, workerStartDelay);
+ constructor(workerScript: string, min: number, max: number, workerStartDelay?: number, opts?: PoolOptions<Worker>,
+ messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) {
+ super(workerScript, workerStartDelay, messageListenerCallback);
opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler;
this.pool = new DynamicThreadPool<WorkerData>(min, max, this.workerScript, opts);
}
/**
*
- * @returns {Promise<void>}
+ * @returns
* @public
*/
// eslint-disable-next-line @typescript-eslint/no-empty-function
/**
*
- * @returns {Promise<void>}
+ * @returns
* @public
*/
// eslint-disable-next-line @typescript-eslint/require-await
/**
*
- * @param {T} elementData
- * @returns {Promise<void>}
+ * @param elementData
+ * @returns
* @public
*/
public async addElement(elementData: T): Promise<void> {
// This is intentional
}
- public static getWorkerImplementation<T>(workerScript: string, workerProcessType: WorkerProcessType, options?: WorkerOptions): WorkerAbstract | null {
+ public static getWorkerImplementation<T>(workerScript: string, workerProcessType: WorkerProcessType, options?: WorkerOptions,
+ messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }): WorkerAbstract | null {
if (!isMainThread) {
throw new Error('Trying to get a worker implementation outside the main thread');
}
switch (workerProcessType) {
case WorkerProcessType.WORKER_SET:
options.elementsPerWorker = options.elementsPerWorker ?? Constants.DEFAULT_CHARGING_STATIONS_PER_WORKER;
- workerImplementation = new WorkerSet<T>(workerScript, options.elementsPerWorker, options.startDelay);
+ workerImplementation = new WorkerSet<T>(workerScript, options.elementsPerWorker, options.startDelay, messageListenerCallback);
break;
case WorkerProcessType.STATIC_POOL:
options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE;
- workerImplementation = new WorkerStaticPool<T>(workerScript, options.poolMaxSize, options.startDelay, options.poolOptions);
+ workerImplementation = new WorkerStaticPool<T>(workerScript, options.poolMaxSize, options.startDelay, options.poolOptions, messageListenerCallback);
break;
case WorkerProcessType.DYNAMIC_POOL:
options.poolMinSize = options.poolMinSize ?? Constants.DEFAULT_WORKER_POOL_MIN_SIZE;
options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE;
- workerImplementation = new WorkerDynamicPool<T>(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay, options.poolOptions);
+ workerImplementation = new WorkerDynamicPool<T>(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay, options.poolOptions, messageListenerCallback);
break;
}
return workerImplementation;
/**
* Create a new `WorkerSet`.
*
- * @param {string} workerScript
- * @param {number} maxElementsPerWorker
- * @param {number} workerStartDelay
+ * @param workerScript
+ * @param maxElementsPerWorker
+ * @param workerStartDelay
+ * @param messageListenerCallback
*/
- constructor(workerScript: string, maxElementsPerWorker = 1, workerStartDelay?: number) {
- super(workerScript, workerStartDelay);
+ constructor(workerScript: string, maxElementsPerWorker = 1, workerStartDelay?: number, messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) {
+ super(workerScript, workerStartDelay, messageListenerCallback);
this.workerSet = new Set<WorkerSetElement>();
this.maxElementsPerWorker = maxElementsPerWorker;
}
/**
*
- * @param {T} elementData
- * @returns {Promise<void>}
+ * @param elementData
+ * @returns
* @public
*/
public async addElement(elementData: T): Promise<void> {
// Start worker sequentially to optimize memory at startup
await Utils.sleep(this.workerStartDelay);
}
- this.getLastWorker().postMessage({ id: WorkerEvents.START_WORKER_ELEMENT, workerData: elementData });
+ this.getLastWorker().postMessage({ id: WorkerEvents.START_WORKER_ELEMENT, data: elementData });
this.getLastWorkerSetElement().numberOfWorkerElements++;
}
/**
*
- * @returns {Promise<void>}
+ * @returns
* @public
*/
public async start(): Promise<void> {
/**
*
- * @returns {Promise<void>}
+ * @returns
* @public
*/
public async stop(): Promise<void> {
*/
private startWorker(): void {
const worker = new Worker(this.workerScript);
- worker.on('message', () => { /* This is intentional */ });
+ worker.on('message', this.messageListener);
worker.on('error', () => { /* This is intentional */ });
worker.on('exit', (code) => {
WorkerUtils.defaultExitHandler(code);
/**
* Create a new `WorkerStaticPool`.
*
- * @param {string} workerScript
- * @param {number} numberOfThreads
- * @param {number} startWorkerDelay
- * @param {PoolOptions} opts
+ * @param workerScript
+ * @param numberOfThreads
+ * @param startWorkerDelay
+ * @param opts
+ * @param messageListenerCallback
*/
- constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number, opts?: PoolOptions<Worker>) {
- super(workerScript, startWorkerDelay);
+ constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number, opts?: PoolOptions<Worker>,
+ messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) {
+ super(workerScript, startWorkerDelay, messageListenerCallback);
opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler;
this.pool = new FixedThreadPool(numberOfThreads, this.workerScript, opts);
}
/**
*
- * @returns {Promise<void>}
+ * @returns
* @public
*/
// eslint-disable-next-line @typescript-eslint/no-empty-function
- public async start(): Promise<void> {}
+ public async start(): Promise<void> {
+ // This is intentional
+ }
/**
*
- * @returns {Promise<void>}
+ * @returns
* @public
*/
public async stop(): Promise<void> {
/**
*
- * @param {T} elementData
- * @returns {Promise<void>}
+ * @param elementData
+ * @returns
* @public
*/
public async addElement(elementData: T): Promise<void> {