From 81797102d5214fea2fc58eff2666fe8b8d9a5a11 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 26 Aug 2021 23:58:19 +0200 Subject: [PATCH] Storage: use worker threads message passing to store performance records from the main thread MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Pool usage does not yet support it. Signed-off-by: Jérôme Benoit --- src/charging-station/Bootstrap.ts | 15 ++++++++++- src/charging-station/StationWorker.ts | 8 +++--- .../ocpp/OCPPRequestService.ts | 6 ++--- src/types/Worker.ts | 11 ++++++-- src/utils/PerformanceStatistics.ts | 14 +++------- src/utils/Utils.ts | 8 +++--- src/worker/WorkerAbstract.ts | 10 ++++--- src/worker/WorkerDynamicPool.ts | 24 +++++++++-------- src/worker/WorkerFactory.ts | 9 ++++--- src/worker/WorkerSet.ts | 23 ++++++++-------- src/worker/WorkerStaticPool.ts | 26 +++++++++++-------- 11 files changed, 89 insertions(+), 65 deletions(-) diff --git a/src/charging-station/Bootstrap.ts b/src/charging-station/Bootstrap.ts index 3cd83d1d..7fd000a2 100644 --- a/src/charging-station/Bootstrap.ts +++ b/src/charging-station/Bootstrap.ts @@ -1,5 +1,8 @@ +import { StationWorkerData, WorkerEvents, WorkerMessage } from '../types/Worker'; + import Configuration from '../utils/Configuration'; -import { StationWorkerData } from '../types/Worker'; +import { Storage } from '../utils/performance-storage/Storage'; +import { StorageFactory } from '../utils/performance-storage/StorageFactory'; import Utils from '../utils/Utils'; import WorkerAbstract from '../worker/WorkerAbstract'; import WorkerFactory from '../worker/WorkerFactory'; @@ -10,6 +13,7 @@ import { version } from '../../package.json'; export default class Bootstrap { private static instance: Bootstrap | null = null; private static workerImplementation: WorkerAbstract | null = null; + private static storage: Storage; private version: string = version; private started: boolean; private workerScript: string; @@ -18,6 +22,7 @@ export default class Bootstrap { this.started = false; this.workerScript = path.join(path.resolve(__dirname, '../'), 'charging-station', 'StationWorker.js'); this.initWorkerImplementation(); + Bootstrap.storage = StorageFactory.getStorage(Configuration.getPerformanceStorage().type, Configuration.getPerformanceStorage().URI, this.logPrefix()); Configuration.setConfigurationChangeCallback(async () => Bootstrap.getInstance().restart()); } @@ -88,9 +93,17 @@ export default class Bootstrap { poolOptions: { workerChoiceStrategy: Configuration.getWorkerPoolStrategy() } + }, (msg: WorkerMessage) => { + if (msg.id === WorkerEvents.PERFORMANCE_STATISTICS) { + Bootstrap.storage.storePerformanceStatistics(msg.data); + } }); if (!Bootstrap.workerImplementation) { throw new Error('Worker implementation not found'); } } + + private logPrefix(): string { + return Utils.logPrefix(' Bootstrap'); + } } diff --git a/src/charging-station/StationWorker.ts b/src/charging-station/StationWorker.ts index c3c5cb51..dd0f377c 100644 --- a/src/charging-station/StationWorker.ts +++ b/src/charging-station/StationWorker.ts @@ -1,6 +1,6 @@ // Partial Copyright Jerome Benoit. 2021. All Rights Reserved. -import { StationWorkerData, WorkerEvents } from '../types/Worker'; +import { StationWorkerData, WorkerEvents, WorkerMessage } from '../types/Worker'; import { parentPort, workerData } from 'worker_threads'; import ChargingStation from './ChargingStation'; @@ -24,9 +24,9 @@ if (Utils.workerPoolInUse()) { * Listen messages send by the main thread */ function addMessageListener(): void { - parentPort?.on('message', (message) => { + parentPort?.on('message', (message: WorkerMessage) => { if (message.id === WorkerEvents.START_WORKER_ELEMENT) { - startChargingStation(message.workerData); + startChargingStation(message.data); } }); } @@ -34,7 +34,7 @@ function addMessageListener(): void { /** * Create and start a charging station instance * - * @param {StationWorkerData} data workerData + * @param data workerData */ function startChargingStation(data: StationWorkerData): void { const station = new ChargingStation(data.index, data.templateFile); diff --git a/src/charging-station/ocpp/OCPPRequestService.ts b/src/charging-station/ocpp/OCPPRequestService.ts index 5b1f981c..4c1d1b74 100644 --- a/src/charging-station/ocpp/OCPPRequestService.ts +++ b/src/charging-station/ocpp/OCPPRequestService.ts @@ -75,8 +75,8 @@ export default abstract class OCPPRequestService { /** * Function that will receive the request's response * - * @param {Record | string} payload - * @param {Record} requestPayload + * @param payload + * @param requestPayload */ async function responseCallback(payload: Record | string, requestPayload: Record): Promise { if (self.chargingStation.getEnableStatistics()) { @@ -90,7 +90,7 @@ export default abstract class OCPPRequestService { /** * Function that will receive the request's rejection * - * @param {OCPPError} error + * @param error */ function rejectCallback(error: OCPPError): void { if (self.chargingStation.getEnableStatistics()) { diff --git a/src/types/Worker.ts b/src/types/Worker.ts index c1f6f0c8..eedf5611 100644 --- a/src/types/Worker.ts +++ b/src/types/Worker.ts @@ -24,11 +24,18 @@ export interface StationWorkerData extends WorkerData { } export interface WorkerSetElement { - worker: Worker, - numberOfWorkerElements: number + worker: Worker; + numberOfWorkerElements: number; +} + +export interface WorkerMessage { + id: WorkerEvents; + data: any; } export enum WorkerEvents { START_WORKER_ELEMENT = 'startWorkerElement', + STOP_WORKER_ELEMENT = 'stopWorkerElement', + PERFORMANCE_STATISTICS = 'performanceStatistics' } diff --git a/src/utils/PerformanceStatistics.ts b/src/utils/PerformanceStatistics.ts index 02b2854e..4049197a 100644 --- a/src/utils/PerformanceStatistics.ts +++ b/src/utils/PerformanceStatistics.ts @@ -7,13 +7,12 @@ import Statistics, { StatisticsData } from '../types/Statistics'; import Configuration from './Configuration'; import { MessageType } from '../types/ocpp/MessageType'; -import { Storage } from './performance-storage/Storage'; -import { StorageFactory } from './performance-storage/StorageFactory'; import Utils from './Utils'; +import { WorkerEvents } from '../types/Worker'; import logger from './Logger'; +import { parentPort } from 'worker_threads'; export default class PerformanceStatistics { - private static storage: Storage; private objId: string; private performanceObserver: PerformanceObserver; private statistics: Statistics; @@ -191,18 +190,11 @@ export default class PerformanceStatistics { this.statistics.statisticsData[entryName].ninetyFiveThPercentileTimeMeasurement = this.percentile(this.statistics.statisticsData[entryName].timeMeasurementSeries, 95); this.statistics.statisticsData[entryName].stdDevTimeMeasurement = this.stdDeviation(this.statistics.statisticsData[entryName].timeMeasurementSeries); if (Configuration.getPerformanceStorage().enabled) { - this.getStorage().storePerformanceStatistics(this.statistics); + parentPort.postMessage({ id: WorkerEvents.PERFORMANCE_STATISTICS, data: this.statistics }); } } private logPrefix(): string { return Utils.logPrefix(` ${this.objId} | Performance statistics`); } - - private getStorage(): Storage { - if (!PerformanceStatistics.storage) { - PerformanceStatistics.storage = StorageFactory.getStorage(Configuration.getPerformanceStorage().type ,Configuration.getPerformanceStorage().URI, this.logPrefix()); - } - return PerformanceStatistics.storage; - } } diff --git a/src/utils/Utils.ts b/src/utils/Utils.ts index 5da54a2e..74abb782 100644 --- a/src/utils/Utils.ts +++ b/src/utils/Utils.ts @@ -190,8 +190,8 @@ export default class Utils { static insertAt = (str: string, subStr: string, pos: number): string => `${str.slice(0, pos)}${subStr}${str.slice(pos)}`; /** - * @param {number} [retryNumber=0] - * @returns {number} delay in milliseconds + * @param [retryNumber=0] + * @returns delay in milliseconds */ static exponentialDelay(retryNumber = 0): number { const delay = Math.pow(2, retryNumber) * 100; @@ -202,8 +202,8 @@ export default class Utils { /** * Convert websocket error code to human readable string message * - * @param {number} code websocket error code - * @returns {string} human readable string message + * @param code websocket error code + * @returns human readable string message */ static getWebSocketCloseEventStatusString(code: number): string { if (code >= 0 && code <= 999) { diff --git a/src/worker/WorkerAbstract.ts b/src/worker/WorkerAbstract.ts index 71577816..d5f9a58e 100644 --- a/src/worker/WorkerAbstract.ts +++ b/src/worker/WorkerAbstract.ts @@ -4,18 +4,22 @@ import { WorkerData } from '../types/Worker'; export default abstract class WorkerAbstract { protected readonly workerScript: string; protected readonly workerStartDelay: number; + protected readonly messageListener: (message: any) => void; public abstract size: number; public abstract maxElementsPerWorker: number | null; /** * `WorkerAbstract` constructor. * - * @param {string} workerScript - * @param {number} workerStartDelay + * @param workerScript + * @param workerStartDelay + * @param messageListenerCallback */ - constructor(workerScript: string, workerStartDelay: number = Constants.WORKER_START_DELAY) { + constructor(workerScript: string, workerStartDelay: number = Constants.WORKER_START_DELAY, + messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) { this.workerScript = workerScript; this.workerStartDelay = workerStartDelay; + this.messageListener = messageListenerCallback; } public abstract start(): Promise; diff --git a/src/worker/WorkerDynamicPool.ts b/src/worker/WorkerDynamicPool.ts index 47d1e458..0e1cd4cd 100644 --- a/src/worker/WorkerDynamicPool.ts +++ b/src/worker/WorkerDynamicPool.ts @@ -12,14 +12,16 @@ export default class WorkerDynamicPool extends WorkerAbstract { /** * Create a new `WorkerDynamicPool`. * - * @param {string} workerScript - * @param {number} min - * @param {number} max - * @param {number} workerStartDelay - * @param {PoolOptions} opts + * @param workerScript + * @param min + * @param max + * @param workerStartDelay + * @param opts + * @param messageListenerCallback */ - constructor(workerScript: string, min: number, max: number, workerStartDelay?: number, opts?: PoolOptions) { - super(workerScript, workerStartDelay); + constructor(workerScript: string, min: number, max: number, workerStartDelay?: number, opts?: PoolOptions, + messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) { + super(workerScript, workerStartDelay, messageListenerCallback); opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler; this.pool = new DynamicThreadPool(min, max, this.workerScript, opts); } @@ -34,7 +36,7 @@ export default class WorkerDynamicPool extends WorkerAbstract { /** * - * @returns {Promise} + * @returns * @public */ // eslint-disable-next-line @typescript-eslint/no-empty-function @@ -44,7 +46,7 @@ export default class WorkerDynamicPool extends WorkerAbstract { /** * - * @returns {Promise} + * @returns * @public */ // eslint-disable-next-line @typescript-eslint/require-await @@ -54,8 +56,8 @@ export default class WorkerDynamicPool extends WorkerAbstract { /** * - * @param {T} elementData - * @returns {Promise} + * @param elementData + * @returns * @public */ public async addElement(elementData: T): Promise { diff --git a/src/worker/WorkerFactory.ts b/src/worker/WorkerFactory.ts index 8ccd6698..af2c53f9 100644 --- a/src/worker/WorkerFactory.ts +++ b/src/worker/WorkerFactory.ts @@ -13,7 +13,8 @@ export default class WorkerFactory { // This is intentional } - public static getWorkerImplementation(workerScript: string, workerProcessType: WorkerProcessType, options?: WorkerOptions): WorkerAbstract | null { + public static getWorkerImplementation(workerScript: string, workerProcessType: WorkerProcessType, options?: WorkerOptions, + messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }): WorkerAbstract | null { if (!isMainThread) { throw new Error('Trying to get a worker implementation outside the main thread'); } @@ -23,16 +24,16 @@ export default class WorkerFactory { switch (workerProcessType) { case WorkerProcessType.WORKER_SET: options.elementsPerWorker = options.elementsPerWorker ?? Constants.DEFAULT_CHARGING_STATIONS_PER_WORKER; - workerImplementation = new WorkerSet(workerScript, options.elementsPerWorker, options.startDelay); + workerImplementation = new WorkerSet(workerScript, options.elementsPerWorker, options.startDelay, messageListenerCallback); break; case WorkerProcessType.STATIC_POOL: options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE; - workerImplementation = new WorkerStaticPool(workerScript, options.poolMaxSize, options.startDelay, options.poolOptions); + workerImplementation = new WorkerStaticPool(workerScript, options.poolMaxSize, options.startDelay, options.poolOptions, messageListenerCallback); break; case WorkerProcessType.DYNAMIC_POOL: options.poolMinSize = options.poolMinSize ?? Constants.DEFAULT_WORKER_POOL_MIN_SIZE; options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE; - workerImplementation = new WorkerDynamicPool(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay, options.poolOptions); + workerImplementation = new WorkerDynamicPool(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay, options.poolOptions, messageListenerCallback); break; } return workerImplementation; diff --git a/src/worker/WorkerSet.ts b/src/worker/WorkerSet.ts index 32956104..1b82f1d7 100644 --- a/src/worker/WorkerSet.ts +++ b/src/worker/WorkerSet.ts @@ -14,12 +14,13 @@ export default class WorkerSet extends WorkerAbstract { /** * Create a new `WorkerSet`. * - * @param {string} workerScript - * @param {number} maxElementsPerWorker - * @param {number} workerStartDelay + * @param workerScript + * @param maxElementsPerWorker + * @param workerStartDelay + * @param messageListenerCallback */ - constructor(workerScript: string, maxElementsPerWorker = 1, workerStartDelay?: number) { - super(workerScript, workerStartDelay); + constructor(workerScript: string, maxElementsPerWorker = 1, workerStartDelay?: number, messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) { + super(workerScript, workerStartDelay, messageListenerCallback); this.workerSet = new Set(); this.maxElementsPerWorker = maxElementsPerWorker; } @@ -30,8 +31,8 @@ export default class WorkerSet extends WorkerAbstract { /** * - * @param {T} elementData - * @returns {Promise} + * @param elementData + * @returns * @public */ public async addElement(elementData: T): Promise { @@ -43,13 +44,13 @@ export default class WorkerSet extends WorkerAbstract { // Start worker sequentially to optimize memory at startup await Utils.sleep(this.workerStartDelay); } - this.getLastWorker().postMessage({ id: WorkerEvents.START_WORKER_ELEMENT, workerData: elementData }); + this.getLastWorker().postMessage({ id: WorkerEvents.START_WORKER_ELEMENT, data: elementData }); this.getLastWorkerSetElement().numberOfWorkerElements++; } /** * - * @returns {Promise} + * @returns * @public */ public async start(): Promise { @@ -60,7 +61,7 @@ export default class WorkerSet extends WorkerAbstract { /** * - * @returns {Promise} + * @returns * @public */ public async stop(): Promise { @@ -76,7 +77,7 @@ export default class WorkerSet extends WorkerAbstract { */ private startWorker(): void { const worker = new Worker(this.workerScript); - worker.on('message', () => { /* This is intentional */ }); + worker.on('message', this.messageListener); worker.on('error', () => { /* This is intentional */ }); worker.on('exit', (code) => { WorkerUtils.defaultExitHandler(code); diff --git a/src/worker/WorkerStaticPool.ts b/src/worker/WorkerStaticPool.ts index a7bb1932..d5e92226 100644 --- a/src/worker/WorkerStaticPool.ts +++ b/src/worker/WorkerStaticPool.ts @@ -12,13 +12,15 @@ export default class WorkerStaticPool extends WorkerAbstract { /** * Create a new `WorkerStaticPool`. * - * @param {string} workerScript - * @param {number} numberOfThreads - * @param {number} startWorkerDelay - * @param {PoolOptions} opts + * @param workerScript + * @param numberOfThreads + * @param startWorkerDelay + * @param opts + * @param messageListenerCallback */ - constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number, opts?: PoolOptions) { - super(workerScript, startWorkerDelay); + constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number, opts?: PoolOptions, + messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) { + super(workerScript, startWorkerDelay, messageListenerCallback); opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler; this.pool = new FixedThreadPool(numberOfThreads, this.workerScript, opts); } @@ -33,15 +35,17 @@ export default class WorkerStaticPool extends WorkerAbstract { /** * - * @returns {Promise} + * @returns * @public */ // eslint-disable-next-line @typescript-eslint/no-empty-function - public async start(): Promise {} + public async start(): Promise { + // This is intentional + } /** * - * @returns {Promise} + * @returns * @public */ public async stop(): Promise { @@ -50,8 +54,8 @@ export default class WorkerStaticPool extends WorkerAbstract { /** * - * @param {T} elementData - * @returns {Promise} + * @param elementData + * @returns * @public */ public async addElement(elementData: T): Promise { -- 2.34.1