From 418106c832022ba6f100f4bf81315300994bee87 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 24 Jan 2021 19:56:26 +0100 Subject: [PATCH] Cleanup workers handling classes. MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit And various assorted fixes. Signed-off-by: Jérôme Benoit --- src/assets/config-template.json | 2 +- .../abb-atg.station-template.json | 2 +- src/charging-station/ChargingStation.ts | 13 ++- src/charging-station/Worker.ts | 105 +----------------- src/charging-station/WorkerGroup.ts | 79 +++++++++++++ src/charging-station/WorkerPool.ts | 64 ++++++++--- src/start.ts | 49 ++++---- src/types/CommandStatistics.ts | 3 +- src/utils/Statistics.ts | 75 ++++++------- 9 files changed, 211 insertions(+), 181 deletions(-) create mode 100644 src/charging-station/WorkerGroup.ts diff --git a/src/assets/config-template.json b/src/assets/config-template.json index 0104ade3..9e52c6cf 100644 --- a/src/assets/config-template.json +++ b/src/assets/config-template.json @@ -4,9 +4,9 @@ ], "distributeStationsToTenantsEqually": true, "statisticsDisplayInterval": 60, + "chargingStationsPerWorker": 1, "useWorkerPool": false, "workerPoolMaxSize": 16, - "chargingStationsPerWorker": 1, "stationTemplateURLs": [ { "file": "./src/assets/station-templates/siemens.station-template.json", diff --git a/src/assets/station-templates/abb-atg.station-template.json b/src/assets/station-templates/abb-atg.station-template.json index 935509af..c8297a1c 100644 --- a/src/assets/station-templates/abb-atg.station-template.json +++ b/src/assets/station-templates/abb-atg.station-template.json @@ -1,7 +1,7 @@ { "authorizationFile": "./src/assets/authorization-tags.json", "baseName": "CS-ABB", - "nameSuffix": "Roaming", + "nameSuffix": "-Roaming", "chargePointModel": "MD_TERRA_53", "chargePointVendor": "ABB", "firmwareVersion": "4.0.4.22", diff --git a/src/charging-station/ChargingStation.ts b/src/charging-station/ChargingStation.ts index b3a2e622..1aa3e3b6 100644 --- a/src/charging-station/ChargingStation.ts +++ b/src/charging-station/ChargingStation.ts @@ -141,6 +141,9 @@ export default class ChargingStation { if (Utils.convertToInt(lastConnector) === 0 && this._getUseConnectorId0() && this.stationInfo.Connectors[lastConnector]) { this.connectors[lastConnector] = Utils.cloneObject(this.stationInfo.Connectors[lastConnector]); this.connectors[lastConnector].availability = AvailabilityType.OPERATIVE; + if (Utils.isUndefined(this.connectors[lastConnector]?.chargingProfiles)) { + this.connectors[lastConnector].chargingProfiles = []; + } } } // Generate all connectors @@ -149,6 +152,9 @@ export default class ChargingStation { const randConnectorID = this.stationInfo.randomConnectors ? Utils.getRandomInt(Utils.convertToInt(lastConnector), 1) : index; this.connectors[index] = Utils.cloneObject(this.stationInfo.Connectors[randConnectorID]); this.connectors[index].availability = AvailabilityType.OPERATIVE; + if (Utils.isUndefined(this.connectors[lastConnector]?.chargingProfiles)) { + this.connectors[index].chargingProfiles = []; + } } } } @@ -167,8 +173,7 @@ export default class ChargingStation { } this.stationInfo.powerDivider = this._getPowerDivider(); if (this.getEnableStatistics()) { - this.statistics = Statistics.getInstance(); - this.statistics.objName = this.stationInfo.chargingStationId; + this.statistics = new Statistics(this.stationInfo.chargingStationId); this.performanceObserver = new PerformanceObserver((list) => { const entry = list.getEntries()[0]; this.statistics.logPerformance(entry, Constants.ENTITY_CHARGING_STATION); @@ -1057,6 +1062,10 @@ export default class ChargingStation { logger.debug(this._logPrefix() + ' Heartbeat response received: %j to Heartbeat request: %j', payload, requestPayload); } + handleResponseAuthorize(payload: AuthorizeResponse, requestPayload: AuthorizeRequest): void { + logger.debug(this._logPrefix() + ' Authorize response received: %j to Authorize request: %j', payload, requestPayload); + } + async handleRequest(messageId: string, commandName: IncomingRequestCommand, commandPayload: Record): Promise { let response; // Call diff --git a/src/charging-station/Worker.ts b/src/charging-station/Worker.ts index 8a9e9f91..54dcb156 100644 --- a/src/charging-station/Worker.ts +++ b/src/charging-station/Worker.ts @@ -1,111 +1,18 @@ -import Configuration from '../utils/Configuration'; -import Constants from '../utils/Constants'; -import { Worker } from 'worker_threads'; import WorkerData from '../types/WorkerData'; -import WorkerPool from './WorkerPool'; -export default class Wrk { - private workerScript: string; - private workerData: WorkerData; - private worker: Worker; - private maxWorkerElements: number; - private numWorkerElements: number; +export default abstract class Wrk { + protected workerScript: string; + public abstract size: number; /** * Create a new `Wrk`. * * @param {string} workerScript - * @param {WorkerData} workerData - * @param {number} maxWorkerElements */ - constructor(workerScript: string, workerData: WorkerData, maxWorkerElements = 1) { - this.workerData = workerData; + constructor(workerScript: string) { this.workerScript = workerScript; - if (Configuration.useWorkerPool()) { - WorkerPool.maxConcurrentWorkers = Configuration.getWorkerPoolMaxSize(); - } else { - this.maxWorkerElements = maxWorkerElements; - this.numWorkerElements = 0; - } } - /** - * - * @return {Promise} - * @public - */ - async start(): Promise { - if (Configuration.useWorkerPool()) { - await this.startWorkerPool(); - } else { - await this.startWorker(); - } - return this.worker; - } - - /** - * - * @return {void} - * @public - */ - addWorkerElement(workerData: WorkerData): void { - if (Configuration.useWorkerPool()) { - throw Error('Cannot add Wrk element if the worker pool is enabled'); - } - if (this.numWorkerElements >= this.maxWorkerElements) { - throw Error('Cannot add Wrk element: max number of elements per worker reached'); - } - this.workerData = workerData; - this.worker.postMessage({ id: Constants.START_WORKER_ELEMENT, workerData: workerData }); - this.numWorkerElements++; - } - - /** - * - * @return {number} - * @public - */ - public getWorkerPoolSize(): number { - if (Configuration.useWorkerPool()) { - return WorkerPool.getPoolSize(); - } - } - - /** - * - * @return {Promise} - * @private - */ - private async startWorkerPool() { - return new Promise((resolve, reject) => { - WorkerPool.acquire(this.workerScript, { workerData: this.workerData }, (err, worker) => { - if (err) { - return reject(err); - } - worker.once('message', resolve); - worker.once('error', reject); - this.worker = worker; - }); - }); - } - - /** - * - * @return {Promise} - * @private - */ - private async startWorker() { - return new Promise((resolve, reject) => { - const worker = new Worker(this.workerScript, { workerData: this.workerData }); - worker.on('message', resolve); - worker.on('error', reject); - worker.on('exit', (code) => { - if (code !== 0) { - reject(new Error(`Worker stopped with exit code ${code}`)); - } - }); - this.numWorkerElements++; - this.worker = worker; - }); - } + public abstract start(): Promise; + public abstract addElement(elementData: WorkerData): void; } diff --git a/src/charging-station/WorkerGroup.ts b/src/charging-station/WorkerGroup.ts new file mode 100644 index 00000000..4f5ad32f --- /dev/null +++ b/src/charging-station/WorkerGroup.ts @@ -0,0 +1,79 @@ +import Configuration from '../utils/Configuration'; +import Constants from '../utils/Constants'; +import { Worker } from 'worker_threads'; +import WorkerData from '../types/WorkerData'; +import Wrk from './Worker'; + +export default class WorkerGroup extends Wrk { + private worker: Worker; + private lastElementData: WorkerData; + private maxWorkerElements: number; + private numWorkerElements: number; + + /** + * Create a new `WorkerGroup`. + * + * @param {string} workerScript + * @param {WorkerData} workerData + * @param {number} maxWorkerElements + */ + constructor(workerScript: string, initialElementData: WorkerData, maxWorkerElements = 1) { + super(workerScript); + this.lastElementData = initialElementData; + this.maxWorkerElements = maxWorkerElements; + this.numWorkerElements = 0; + } + + get size(): number { + return this.numWorkerElements; + } + + /** + * + * @return {void} + * @public + */ + public addElement(elementData: WorkerData): void { + if (Configuration.useWorkerPool()) { + throw Error('Cannot add a WorkerGroup element: the worker pool is enabled in configuration'); + } + if (!this.worker) { + throw Error('Cannot add a WorkerGroup element: worker does not exist'); + } + if (this.numWorkerElements >= this.maxWorkerElements) { + throw Error('Cannot add a WorkerGroup element: max number of elements per worker reached'); + } + this.lastElementData = elementData; + this.worker.postMessage({ id: Constants.START_WORKER_ELEMENT, workerData: this.lastElementData }); + this.numWorkerElements++; + } + + /** + * + * @return {Promise} + * @public + */ + public async start(): Promise { + await this.startWorker(); + } + + /** + * + * @return {Promise} + * @private + */ + private async startWorker() { + return new Promise((resolve, reject) => { + const worker = new Worker(this.workerScript, { workerData: this.lastElementData }); + worker.on('message', resolve); + worker.on('error', reject); + worker.on('exit', (code) => { + if (code !== 0) { + reject(new Error(`Worker stopped with exit code ${code}`)); + } + }); + this.numWorkerElements++; + this.worker = worker; + }); + } +} diff --git a/src/charging-station/WorkerPool.ts b/src/charging-station/WorkerPool.ts index 40c284e0..76910f82 100644 --- a/src/charging-station/WorkerPool.ts +++ b/src/charging-station/WorkerPool.ts @@ -1,25 +1,59 @@ -import { Worker, WorkerOptions } from 'worker_threads'; - +import Configuration from '../utils/Configuration'; import Pool from 'worker-threads-pool'; +import WorkerData from '../types/WorkerData'; +import Wrk from './Worker'; -export default class WorkerPool { - public static maxConcurrentWorkers: number; - private static instance: Pool; +export default class WorkerPool extends Wrk { + private pool: Pool; - private constructor() { } + /** + * Create a new `WorkerPool`. + * + * @param {string} workerScript + */ + constructor(workerScript: string) { + super(workerScript); + this.pool = UniquePool.getInstance(); + } - public static getInstance(): Pool { - if (!WorkerPool.instance) { - WorkerPool.instance = new Pool({ max: WorkerPool.maxConcurrentWorkers }); - } - return WorkerPool.instance; + get size(): number { + return this.pool.size; } - public static acquire(filename: string, options: WorkerOptions, callback: (error: Error | null, worker: Worker) => void): void { - WorkerPool.getInstance().acquire(filename, options, callback); + /** + * + * @return {Promise} + * @public + */ + public async start(): Promise { } + + /** + * + * @return {Promise} + * @public + */ + public async addElement(elementData: WorkerData): Promise { + return new Promise((resolve, reject) => { + this.pool.acquire(this.workerScript, { workerData: elementData }, (err, worker) => { + if (err) { + return reject(err); + } + worker.once('message', resolve); + worker.once('error', reject); + }); + }); } +} + +class UniquePool { + private static instance: Pool; - public static getPoolSize(): number { - return WorkerPool.getInstance().size; + private constructor() { } + + public static getInstance(): Pool { + if (!UniquePool.instance) { + UniquePool.instance = new Pool({ max: Configuration.getWorkerPoolMaxSize() }); + } + return UniquePool.instance; } } diff --git a/src/start.ts b/src/start.ts index 41ae5d1e..947a1abf 100644 --- a/src/start.ts +++ b/src/start.ts @@ -2,7 +2,8 @@ import Configuration from './utils/Configuration'; import Constants from './utils/Constants'; import Utils from './utils/Utils'; import WorkerData from './types/WorkerData'; -import Wrk from './charging-station/Worker'; +import WorkerGroup from './charging-station/WorkerGroup'; +import WorkerPool from './charging-station/WorkerPool'; class Bootstrap { static async start() { @@ -11,7 +12,11 @@ class Bootstrap { let numConcurrentWorkers = 0; const chargingStationsPerWorker = Configuration.getChargingStationsPerWorker(); let chargingStationsPerWorkerCounter = 0; - let worker: Wrk; + let workerImplementation: WorkerGroup | WorkerPool; + if (Configuration.useWorkerPool()) { + workerImplementation = new WorkerPool('./dist/charging-station/StationWorker.js'); + void workerImplementation.start(); + } // Start each ChargingStation object in a worker thread if (Configuration.getStationTemplateURLs()) { for (const stationURL of Configuration.getStationTemplateURLs()) { @@ -23,26 +28,28 @@ class Bootstrap { templateFile: stationURL.file }; if (Configuration.useWorkerPool()) { - worker = new Wrk('./dist/charging-station/StationWorker.js', workerData); - worker.start().catch(() => { }); - numConcurrentWorkers = worker.getWorkerPoolSize(); - numStationsTotal = numConcurrentWorkers; - // Start Wrk sequentially to optimize memory at start time - await Utils.sleep(Constants.START_WORKER_DELAY); - } else if (!Configuration.useWorkerPool() && (chargingStationsPerWorkerCounter === 0 || chargingStationsPerWorkerCounter >= chargingStationsPerWorker)) { - // Start new Wrk with one charging station - worker = new Wrk('./dist/charging-station/StationWorker.js', workerData, chargingStationsPerWorker); - worker.start().catch(() => { }); - numConcurrentWorkers++; - chargingStationsPerWorkerCounter = 1; - numStationsTotal++; - // Start Wrk sequentially to optimize memory at start time + void workerImplementation.addElement(workerData); + numConcurrentWorkers = workerImplementation.size; + numStationsTotal = workerImplementation.size; + // Start worker sequentially to optimize memory at start time await Utils.sleep(Constants.START_WORKER_DELAY); - } else if (!Configuration.useWorkerPool()) { - // Add charging station to existing Wrk - worker.addWorkerElement(workerData); - chargingStationsPerWorkerCounter++; - numStationsTotal++; + } else { + // eslint-disable-next-line no-lonely-if + if (chargingStationsPerWorkerCounter === 0 || chargingStationsPerWorkerCounter >= chargingStationsPerWorker) { + // Start new WorkerGroup with one charging station + workerImplementation = new WorkerGroup('./dist/charging-station/StationWorker.js', workerData, chargingStationsPerWorker); + void workerImplementation.start(); + numConcurrentWorkers++; + chargingStationsPerWorkerCounter = 1; + numStationsTotal++; + // Start worker sequentially to optimize memory at start time + await Utils.sleep(Constants.START_WORKER_DELAY); + } else { + // Add charging station to existing WorkerGroup + void workerImplementation.addElement(workerData); + chargingStationsPerWorkerCounter++; + numStationsTotal++; + } } } } catch (error) { diff --git a/src/types/CommandStatistics.ts b/src/types/CommandStatistics.ts index b8740039..84f2bdca 100644 --- a/src/types/CommandStatistics.ts +++ b/src/types/CommandStatistics.ts @@ -23,5 +23,6 @@ export interface CommandStatisticsData { } export default interface CommandStatistics { - [command: string]: CommandStatisticsData; + id: string; + commandsStatisticsData: Record; } diff --git a/src/utils/Statistics.ts b/src/utils/Statistics.ts index de8951d0..966f1b57 100644 --- a/src/utils/Statistics.ts +++ b/src/utils/Statistics.ts @@ -9,53 +9,46 @@ import Utils from './Utils'; import logger from './Logger'; export default class Statistics { - private static instance: Statistics; - public objName: string; + private objId: string; private commandsStatistics: CommandStatistics; - private constructor() { - this.commandsStatistics = {} as CommandStatistics; + public constructor(objName: string) { + this.objId = objName; + this.commandsStatistics = { id: this.objId ? this.objId : ' Object id not specified', commandsStatisticsData: {} } as CommandStatistics; } - static getInstance(): Statistics { - if (!Statistics.instance) { - Statistics.instance = new Statistics(); - } - return Statistics.instance; - } - - addMessage(command: RequestCommand | IncomingRequestCommand, messageType: MessageType): void { + public addMessage(command: RequestCommand | IncomingRequestCommand, messageType: MessageType): void { switch (messageType) { case MessageType.CALL_MESSAGE: - if (this.commandsStatistics[command] && this.commandsStatistics[command].countRequest) { - this.commandsStatistics[command].countRequest++; + if (this.commandsStatistics.commandsStatisticsData[command] && this.commandsStatistics.commandsStatisticsData[command].countRequest) { + this.commandsStatistics.commandsStatisticsData[command].countRequest++; } else { - this.commandsStatistics[command] = {} as CommandStatisticsData; - this.commandsStatistics[command].countRequest = 1; + this.commandsStatistics.commandsStatisticsData[command] = {} as CommandStatisticsData; + this.commandsStatistics.commandsStatisticsData[command].countRequest = 1; } break; case MessageType.CALL_RESULT_MESSAGE: - if (this.commandsStatistics[command]) { - if (this.commandsStatistics[command].countResponse) { - this.commandsStatistics[command].countResponse++; + if (this.commandsStatistics.commandsStatisticsData[command]) { + if (this.commandsStatistics.commandsStatisticsData[command].countResponse) { + this.commandsStatistics.commandsStatisticsData[command].countResponse++; } else { - this.commandsStatistics[command].countResponse = 1; + this.commandsStatistics.commandsStatisticsData[command].countResponse = 1; } } else { - this.commandsStatistics[command] = {} as CommandStatisticsData; - this.commandsStatistics[command].countResponse = 1; + this.commandsStatistics.commandsStatisticsData[command] = {} as CommandStatisticsData; + this.commandsStatistics.commandsStatisticsData[command].countResponse = 1; } break; case MessageType.CALL_ERROR_MESSAGE: - if (this.commandsStatistics[command]) { - if (this.commandsStatistics[command].countError) { - this.commandsStatistics[command].countError++; + if (this.commandsStatistics.commandsStatisticsData[command]) { + if (this.commandsStatistics.commandsStatisticsData[command].countError) { + this.commandsStatistics.commandsStatisticsData[command].countError++; } else { - this.commandsStatistics[command].countError = 1; + this.commandsStatistics.commandsStatisticsData[command].countError = 1; } } else { - this.commandsStatistics[command] = {} as CommandStatisticsData; - this.commandsStatistics[command].countError = 1; + this.commandsStatistics.commandsStatisticsData[command] = {} as CommandStatisticsData; + this.commandsStatistics.commandsStatisticsData[command].countError = 1; } break; default: @@ -64,7 +57,7 @@ export default class Statistics { } } - logPerformance(entry: PerformanceEntry, className: string): void { + public logPerformance(entry: PerformanceEntry, className: string): void { this.addPerformanceTimer(entry.name as RequestCommand | IncomingRequestCommand, entry.duration); const perfEntry: PerfEntry = {} as PerfEntry; perfEntry.name = entry.name; @@ -74,7 +67,7 @@ export default class Statistics { logger.info(`${this._logPrefix()} object ${className} method(s) performance entry: %j`, perfEntry); } - start(): void { + public start(): void { this._displayInterval(); } @@ -114,21 +107,21 @@ export default class Statistics { command = MAPCOMMAND[command] as RequestCommand | IncomingRequestCommand; } // Initialize command statistics - if (!this.commandsStatistics[command]) { - this.commandsStatistics[command] = {} as CommandStatisticsData; + if (!this.commandsStatistics.commandsStatisticsData[command]) { + this.commandsStatistics.commandsStatisticsData[command] = {} as CommandStatisticsData; } // Update current statistics timers - this.commandsStatistics[command].countTimeMeasurement = this.commandsStatistics[command].countTimeMeasurement ? this.commandsStatistics[command].countTimeMeasurement + 1 : 1; - this.commandsStatistics[command].currentTimeMeasurement = duration; - this.commandsStatistics[command].minTimeMeasurement = this.commandsStatistics[command].minTimeMeasurement ? (this.commandsStatistics[command].minTimeMeasurement > duration ? duration : this.commandsStatistics[command].minTimeMeasurement) : duration; - this.commandsStatistics[command].maxTimeMeasurement = this.commandsStatistics[command].maxTimeMeasurement ? (this.commandsStatistics[command].maxTimeMeasurement < duration ? duration : this.commandsStatistics[command].maxTimeMeasurement) : duration; - this.commandsStatistics[command].totalTimeMeasurement = this.commandsStatistics[command].totalTimeMeasurement ? this.commandsStatistics[command].totalTimeMeasurement + duration : duration; - this.commandsStatistics[command].avgTimeMeasurement = this.commandsStatistics[command].totalTimeMeasurement / this.commandsStatistics[command].countTimeMeasurement; - Array.isArray(this.commandsStatistics[command].timeMeasurementSeries) ? this.commandsStatistics[command].timeMeasurementSeries.push(duration) : this.commandsStatistics[command].timeMeasurementSeries = [duration] as CircularArray; - this.commandsStatistics[command].medTimeMeasurement = this.median(this.commandsStatistics[command].timeMeasurementSeries); + this.commandsStatistics.commandsStatisticsData[command].countTimeMeasurement = this.commandsStatistics.commandsStatisticsData[command].countTimeMeasurement ? this.commandsStatistics.commandsStatisticsData[command].countTimeMeasurement + 1 : 1; + this.commandsStatistics.commandsStatisticsData[command].currentTimeMeasurement = duration; + this.commandsStatistics.commandsStatisticsData[command].minTimeMeasurement = this.commandsStatistics.commandsStatisticsData[command].minTimeMeasurement ? (this.commandsStatistics.commandsStatisticsData[command].minTimeMeasurement > duration ? duration : this.commandsStatistics.commandsStatisticsData[command].minTimeMeasurement) : duration; + this.commandsStatistics.commandsStatisticsData[command].maxTimeMeasurement = this.commandsStatistics.commandsStatisticsData[command].maxTimeMeasurement ? (this.commandsStatistics.commandsStatisticsData[command].maxTimeMeasurement < duration ? duration : this.commandsStatistics.commandsStatisticsData[command].maxTimeMeasurement) : duration; + this.commandsStatistics.commandsStatisticsData[command].totalTimeMeasurement = this.commandsStatistics.commandsStatisticsData[command].totalTimeMeasurement ? this.commandsStatistics.commandsStatisticsData[command].totalTimeMeasurement + duration : duration; + this.commandsStatistics.commandsStatisticsData[command].avgTimeMeasurement = this.commandsStatistics.commandsStatisticsData[command].totalTimeMeasurement / this.commandsStatistics.commandsStatisticsData[command].countTimeMeasurement; + Array.isArray(this.commandsStatistics.commandsStatisticsData[command].timeMeasurementSeries) ? this.commandsStatistics.commandsStatisticsData[command].timeMeasurementSeries.push(duration) : this.commandsStatistics.commandsStatisticsData[command].timeMeasurementSeries = [duration] as CircularArray; + this.commandsStatistics.commandsStatisticsData[command].medTimeMeasurement = this.median(this.commandsStatistics.commandsStatisticsData[command].timeMeasurementSeries); } private _logPrefix(): string { - return Utils.logPrefix(` ${this.objName} Statistics:`); + return Utils.logPrefix(` ${this.objId} Statistics:`); } } -- 2.34.1