From: Jakob Date: Wed, 20 Jan 2021 16:00:56 +0000 (+0100) Subject: Memory optimization + worker configuration X-Git-Tag: v1.0.1-0~137^2 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=3d2ff9e4875d166265bb925e00a4301e82f5c248;p=e-mobility-charging-stations-simulator.git Memory optimization + worker configuration --- diff --git a/src/assets/config-template.json b/src/assets/config-template.json index adfe80ef..61067bcf 100644 --- a/src/assets/config-template.json +++ b/src/assets/config-template.json @@ -6,6 +6,8 @@ "statisticsDisplayInterval": 60, "useWorkerPool": false, "workerPoolSize": 16, + "chargingStationsPerWorker": 1, + "chargingStationIdSuffix": "", "stationTemplateURLs": [ { "file": "./src/assets/station-templates/siemens.station-template.json", diff --git a/src/charging-station/StationWorker.ts b/src/charging-station/StationWorker.ts index 68c3ea24..c7bc2337 100644 --- a/src/charging-station/StationWorker.ts +++ b/src/charging-station/StationWorker.ts @@ -1,8 +1,26 @@ -import { isMainThread, workerData } from 'worker_threads'; +import { isMainThread, parentPort, workerData } from 'worker_threads'; +import Constants from '../utils/Constants'; import ChargingStation from './ChargingStation'; if (!isMainThread) { const station = new ChargingStation(workerData.index as number, workerData.templateFile as string); station.start(); + + // Listener: start new charging station from main thread + addListener(); +} + +function addListener() { + parentPort.setMaxListeners(1000); + parentPort.on("message", e => { + if (e.id === Constants.START_NEW_CHARGING_STATION) { + startChargingStation(e.workerData); + } + }); +} + +function startChargingStation(data: any) { + const station = new ChargingStation(data.index as number, data.templateFile as string); + station.start(); } diff --git a/src/charging-station/Worker.ts b/src/charging-station/Worker.ts index 57748831..fbfdf3d1 100644 --- a/src/charging-station/Worker.ts +++ b/src/charging-station/Worker.ts @@ -3,12 +3,14 @@ import { Worker, WorkerOptions } from 'worker_threads'; import Configuration from '../utils/Configuration'; import Pool from 'worker-threads-pool'; import WorkerData from '../types/WorkerData'; +import Constants from '../utils/Constants'; export default class Wrk { private _workerScript: string; private _workerData: WorkerData; private _index: number; private _concurrentWorkers: number; + private _worker: Worker; /** * Create a new `Wrk`. @@ -42,11 +44,25 @@ export default class Wrk { * @return {Promise} * @public */ - async start(): Promise { + async start(): Promise { if (Configuration.useWorkerPool()) { - return this._startWorkerWithPool(); + this._startWorkerWithPool(); + } else { + this._startWorker(); } - return this._startWorker(); + return this._worker; + } + + /** + * + * @return {Promise} + * @public + */ + async startNewChargingStation(workerData: WorkerData, numConcurrentWorkers: number): Promise { + this._workerData = workerData; + this._index = workerData.index; + this._concurrentWorkers = numConcurrentWorkers; + this._worker.postMessage({ id : Constants.START_NEW_CHARGING_STATION, workerData: workerData }); } /** @@ -62,6 +78,7 @@ export default class Wrk { } worker.once('message', resolve); worker.once('error', reject); + this._worker = worker; }); }); } @@ -81,6 +98,7 @@ export default class Wrk { reject(new Error(`Worker id ${this._index} stopped with exit code ${code}`)); } }); + this._worker = worker; }); } } @@ -92,7 +110,7 @@ class WorkerPool { private constructor() { } public static getInstance(): Pool { - if (!WorkerPool._instance) { + if (!WorkerPool._instance || (WorkerPool._instance?.size === WorkerPool.concurrentWorkers)) { WorkerPool._instance = new Pool({ max: WorkerPool.concurrentWorkers }); } return WorkerPool._instance; diff --git a/src/start.ts b/src/start.ts index e841b57e..fb58c2d2 100644 --- a/src/start.ts +++ b/src/start.ts @@ -1,31 +1,49 @@ import Configuration from './utils/Configuration'; import { StationTemplateURL } from './types/ConfigurationData'; +import Utils from './utils/Utils'; import Wrk from './charging-station/Worker'; +import WorkerData from './types/WorkerData'; +import fs from 'fs'; class Bootstrap { - static start() { + static async start() { try { let numStationsTotal = 0; let numConcurrentWorkers = 0; + let worker: Wrk; + let chargingStationsPerWorker = Configuration.getChargingStationsPerWorker(); + let counter = 0; // Start each ChargingStation object in a worker thread if (Configuration.getStationTemplateURLs()) { - Configuration.getStationTemplateURLs().forEach((stationURL: StationTemplateURL) => { + for await (const stationURL of Configuration.getStationTemplateURLs()) { try { const nbStations = stationURL.numberOfStations ? stationURL.numberOfStations : 0; numStationsTotal += nbStations; for (let index = 1; index <= nbStations; index++) { - const worker = new Wrk('./dist/charging-station/StationWorker.js', { + const workerData = { index, - templateFile: stationURL.file, - }, numStationsTotal); - worker.start().catch(() => {}); + templateFile: stationURL.file + } as WorkerData; + if(counter === 0 || counter === chargingStationsPerWorker) { + // Start new worker with one charging station + worker = await new Wrk('./dist/charging-station/StationWorker.js', workerData, numStationsTotal); + worker.start().catch(() => {}); + counter = 0; + // Start workers sequentially to optimize memory at start time + await Utils.sleep(500); + } else { + // Add new charging station to existing Worker + worker.startNewChargingStation(workerData, numStationsTotal) + } + counter++; + // Start charging station sequentially to optimize memory at start time numConcurrentWorkers = worker.concurrentWorkers; } } catch (error) { // eslint-disable-next-line no-console console.log('Charging station start with template file ' + stationURL.file + ' error ' + JSON.stringify(error, null, ' ')); } - }); + } } else { console.log('No stationTemplateURLs defined in configuration, exiting'); } diff --git a/src/types/ConfigurationData.ts b/src/types/ConfigurationData.ts index 371e9c5d..e056471c 100644 --- a/src/types/ConfigurationData.ts +++ b/src/types/ConfigurationData.ts @@ -12,6 +12,8 @@ export default interface ConfigurationData { distributeStationsToTenantsEqually?: boolean; useWorkerPool?: boolean; workerPoolSize?: number; + chargingStationsPerWorker: number; + chargingStationIdSuffix: string; logFormat?: string; logLevel?: string; logRotate?: boolean; diff --git a/src/utils/Configuration.ts b/src/utils/Configuration.ts index ca98427e..090e2e2c 100644 --- a/src/utils/Configuration.ts +++ b/src/utils/Configuration.ts @@ -45,6 +45,14 @@ export default class Configuration { return Configuration.getConfig().workerPoolSize; } + static getChargingStationsPerWorker(): number { + return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'chargingStationsPerWorker') ? Configuration.getConfig().chargingStationsPerWorker : 1; + } + + static getChargingStationIdSuffix(): string { + return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'chargingStationIdSuffix') ? Configuration.getConfig().chargingStationIdSuffix : ''; + } + static getLogConsole(): boolean { Configuration.deprecateConfigurationKey('consoleLog', 'Use \'logConsole\' instead'); return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'logConsole') ? Configuration.getConfig().logConsole : false; diff --git a/src/utils/Constants.ts b/src/utils/Constants.ts index 7afe4cac..6e5eb697 100644 --- a/src/utils/Constants.ts +++ b/src/utils/Constants.ts @@ -34,4 +34,6 @@ export default class Constants { static readonly CHARGING_STATION_ATG_WAIT_TIME = 2000; // Ms static readonly TRANSACTION_DEFAULT_IDTAG = '00000000'; + + static readonly START_NEW_CHARGING_STATION = 'startNewChargingStation'; }