"statisticsDisplayInterval": 60,
"useWorkerPool": false,
"workerPoolSize": 16,
+ "chargingStationsPerWorker": 1,
+ "chargingStationIdSuffix": "",
"stationTemplateURLs": [
{
"file": "./src/assets/station-templates/siemens.station-template.json",
-import { isMainThread, workerData } from 'worker_threads';
+import { isMainThread, parentPort, workerData } from 'worker_threads';
+import Constants from '../utils/Constants';
import ChargingStation from './ChargingStation';
if (!isMainThread) {
const station = new ChargingStation(workerData.index as number, workerData.templateFile as string);
station.start();
+
+ // Listener: start new charging station from main thread
+ addListener();
+}
+
+function addListener() {
+ parentPort.setMaxListeners(1000);
+ parentPort.on("message", e => {
+ if (e.id === Constants.START_NEW_CHARGING_STATION) {
+ startChargingStation(e.workerData);
+ }
+ });
+}
+
+function startChargingStation(data: any) {
+ const station = new ChargingStation(data.index as number, data.templateFile as string);
+ station.start();
}
import Configuration from '../utils/Configuration';
import Pool from 'worker-threads-pool';
import WorkerData from '../types/WorkerData';
+import Constants from '../utils/Constants';
export default class Wrk {
private _workerScript: string;
private _workerData: WorkerData;
private _index: number;
private _concurrentWorkers: number;
+ private _worker: Worker;
/**
* Create a new `Wrk`.
* @return {Promise}
* @public
*/
- async start(): Promise<unknown> {
+ async start(): Promise<Worker> {
if (Configuration.useWorkerPool()) {
- return this._startWorkerWithPool();
+ this._startWorkerWithPool();
+ } else {
+ this._startWorker();
}
- return this._startWorker();
+ return this._worker;
+ }
+
+ /**
+ *
+ * @return {Promise}
+ * @public
+ */
+ async startNewChargingStation(workerData: WorkerData, numConcurrentWorkers: number): Promise<void> {
+ this._workerData = workerData;
+ this._index = workerData.index;
+ this._concurrentWorkers = numConcurrentWorkers;
+ this._worker.postMessage({ id : Constants.START_NEW_CHARGING_STATION, workerData: workerData });
}
/**
}
worker.once('message', resolve);
worker.once('error', reject);
+ this._worker = worker;
});
});
}
reject(new Error(`Worker id ${this._index} stopped with exit code ${code}`));
}
});
+ this._worker = worker;
});
}
}
private constructor() { }
public static getInstance(): Pool {
- if (!WorkerPool._instance) {
+ if (!WorkerPool._instance || (WorkerPool._instance?.size === WorkerPool.concurrentWorkers)) {
WorkerPool._instance = new Pool({ max: WorkerPool.concurrentWorkers });
}
return WorkerPool._instance;
import Configuration from './utils/Configuration';
import { StationTemplateURL } from './types/ConfigurationData';
+import Utils from './utils/Utils';
import Wrk from './charging-station/Worker';
+import WorkerData from './types/WorkerData';
+import fs from 'fs';
class Bootstrap {
- static start() {
+ static async start() {
try {
let numStationsTotal = 0;
let numConcurrentWorkers = 0;
+ let worker: Wrk;
+ let chargingStationsPerWorker = Configuration.getChargingStationsPerWorker();
+ let counter = 0;
// Start each ChargingStation object in a worker thread
if (Configuration.getStationTemplateURLs()) {
- Configuration.getStationTemplateURLs().forEach((stationURL: StationTemplateURL) => {
+ for await (const stationURL of Configuration.getStationTemplateURLs()) {
try {
const nbStations = stationURL.numberOfStations ? stationURL.numberOfStations : 0;
numStationsTotal += nbStations;
for (let index = 1; index <= nbStations; index++) {
- const worker = new Wrk('./dist/charging-station/StationWorker.js', {
+ const workerData = {
index,
- templateFile: stationURL.file,
- }, numStationsTotal);
- worker.start().catch(() => {});
+ templateFile: stationURL.file
+ } as WorkerData;
+ if(counter === 0 || counter === chargingStationsPerWorker) {
+ // Start new worker with one charging station
+ worker = await new Wrk('./dist/charging-station/StationWorker.js', workerData, numStationsTotal);
+ worker.start().catch(() => {});
+ counter = 0;
+ // Start workers sequentially to optimize memory at start time
+ await Utils.sleep(500);
+ } else {
+ // Add new charging station to existing Worker
+ worker.startNewChargingStation(workerData, numStationsTotal)
+ }
+ counter++;
+ // Start charging station sequentially to optimize memory at start time
numConcurrentWorkers = worker.concurrentWorkers;
}
} catch (error) {
// eslint-disable-next-line no-console
console.log('Charging station start with template file ' + stationURL.file + ' error ' + JSON.stringify(error, null, ' '));
}
- });
+ }
} else {
console.log('No stationTemplateURLs defined in configuration, exiting');
}
distributeStationsToTenantsEqually?: boolean;
useWorkerPool?: boolean;
workerPoolSize?: number;
+ chargingStationsPerWorker: number;
+ chargingStationIdSuffix: string;
logFormat?: string;
logLevel?: string;
logRotate?: boolean;
return Configuration.getConfig().workerPoolSize;
}
+ static getChargingStationsPerWorker(): number {
+ return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'chargingStationsPerWorker') ? Configuration.getConfig().chargingStationsPerWorker : 1;
+ }
+
+ static getChargingStationIdSuffix(): string {
+ return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'chargingStationIdSuffix') ? Configuration.getConfig().chargingStationIdSuffix : '';
+ }
+
static getLogConsole(): boolean {
Configuration.deprecateConfigurationKey('consoleLog', 'Use \'logConsole\' instead');
return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'logConsole') ? Configuration.getConfig().logConsole : false;
static readonly CHARGING_STATION_ATG_WAIT_TIME = 2000; // Ms
static readonly TRANSACTION_DEFAULT_IDTAG = '00000000';
+
+ static readonly START_NEW_CHARGING_STATION = 'startNewChargingStation';
}