],
"distributeStationsToTenantsEqually": true,
"statisticsDisplayInterval": 60,
- "useWorkerPool": false,
- "workerMaxPoolSize": 16,
+ "workerProcess": "workerSet",
+ "workerPoolMinSize": 4,
+ "workerPoolMaxSize": 16,
"chargingStationsPerWorker": 1,
"stationTemplateURLs": [
{
"integrity": "sha512-PACt1xdErJbMUOUweSrbVM7gSIYm1vTncW2hF6Os/EeWi6TXYAYMPp+8v6rzHmypE5gHrxaxZNXgMkJVIdZpHw==",
"dev": true
},
- "@types/worker-threads-pool": {
- "version": "2.0.0",
- "resolved": "https://registry.npmjs.org/@types/worker-threads-pool/-/worker-threads-pool-2.0.0.tgz",
- "integrity": "sha512-VnFtC73JBhQRtamCR4edJWxyZrwvwz6y/Ov4VNQLiirpRUkND7xo0iUNXEZP2mvZuNbvFeXGglTAFZwCDdWMTg==",
- "dev": true,
- "requires": {
- "@types/node": "*"
- }
- },
"@types/ws": {
"version": "7.4.0",
"resolved": "https://registry.npmjs.org/@types/ws/-/ws-7.4.0.tgz",
"integrity": "sha512-OPdCF6GsMIP+Az+aWfAAOEt2/+iVDKE7oy6lJ098aoe59oAmK76qV6Gw60SbZ8jHuG2wH058GF4pLFbYamYrVA==",
"dev": true
},
- "after-all": {
- "version": "2.0.2",
- "resolved": "https://registry.npmjs.org/after-all/-/after-all-2.0.2.tgz",
- "integrity": "sha1-IDACmO1glLTIXJjnyK1NymKPn3M=",
- "requires": {
- "once": "^1.3.0"
- }
- },
"ajv": {
"version": "6.12.2",
"resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.2.tgz",
"version": "1.4.0",
"resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz",
"integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=",
+ "dev": true,
"requires": {
"wrappy": "1"
}
"semver-compare": "^1.0.0"
}
},
+ "poolifier": {
+ "version": "1.2.1",
+ "resolved": "https://registry.npmjs.org/poolifier/-/poolifier-1.2.1.tgz",
+ "integrity": "sha512-kUH3JlLLO7JdAnRdtbgaSME5WDxgDzAuUk9+hapVHfXeI0VjpeuLnxLL8cUF7lEgrUE4m59scr5TFx5ajbPqXQ=="
+ },
"posix-character-classes": {
"version": "0.1.1",
"resolved": "https://registry.npmjs.org/posix-character-classes/-/posix-character-classes-0.1.1.tgz",
"integrity": "sha512-Hz/mrNwitNRh/HUAtM/VT/5VH+ygD6DV7mYKZAtHOrbs8U7lvPS6xf7EJKMF0uW1KJCl0H701g3ZGus+muE5vQ==",
"dev": true
},
- "worker-threads-pool": {
- "version": "2.0.0",
- "resolved": "https://registry.npmjs.org/worker-threads-pool/-/worker-threads-pool-2.0.0.tgz",
- "integrity": "sha512-5dtGbEucee6o5/kQgpyKIUoHGWf8488DP3ihZDJzDIVvH4V+NA6HdBl/I5ckI4yN1NwM68pdZDbrwac1M95mEA==",
- "requires": {
- "after-all": "^2.0.2"
- }
- },
"wrap-ansi": {
"version": "5.1.0",
"resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-5.1.0.tgz",
"wrappy": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz",
- "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8="
+ "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=",
+ "dev": true
},
"write-file-atomic": {
"version": "2.4.2",
},
"dependencies": {
"mongodb": "^3.6.3",
+ "poolifier": "^1.2.1",
"source-map-support": "^0.5.19",
"tslib": "^2.1.0",
"uuid": "^8.3.2",
"winston": "^3.3.3",
"winston-daily-rotate-file": "^4.5.0",
- "worker-threads-pool": "^2.0.0",
"ws": "^7.4.2"
},
"optionalDependencies": {
"devDependencies": {
"@types/node": "^14.14.22",
"@types/uuid": "^8.3.0",
- "@types/worker-threads-pool": "^2.0.0",
"@types/ws": "^7.4.0",
"@typescript-eslint/eslint-plugin": "^4.14.0",
"@typescript-eslint/parser": "^4.14.0",
"distributeStationsToTenantsEqually": true,
"statisticsDisplayInterval": 60,
"chargingStationsPerWorker": 1,
- "useWorkerPool": false,
+ "workerProcess": "workerSet",
+ "workerPoolMinSize": 4,
"workerPoolMaxSize": 16,
"stationTemplateURLs": [
{
await this.sendStatusNotification(transactionConnectorID, ChargePointStatus.PREPARING);
if (commandPayload.chargingProfile && commandPayload.chargingProfile.chargingProfilePurpose === ChargingProfilePurposeType.TX_PROFILE) {
this._setChargingProfile(transactionConnectorID, commandPayload.chargingProfile);
- } else {
+ } else if (commandPayload.chargingProfile && commandPayload.chargingProfile.chargingProfilePurpose !== ChargingProfilePurposeType.TX_PROFILE) {
return Constants.OCPP_RESPONSE_REJECTED;
}
// Authorization successful start transaction
await this.sendStatusNotification(transactionConnectorID, ChargePointStatus.PREPARING);
if (commandPayload.chargingProfile && commandPayload.chargingProfile.chargingProfilePurpose === ChargingProfilePurposeType.TX_PROFILE) {
this._setChargingProfile(transactionConnectorID, commandPayload.chargingProfile);
- } else {
+ } else if (commandPayload.chargingProfile && commandPayload.chargingProfile.chargingProfilePurpose !== ChargingProfilePurposeType.TX_PROFILE) {
return Constants.OCPP_RESPONSE_REJECTED;
}
// No local authorization check required => start transaction
+import { WorkerData, WorkerEvents } from '../types/Worker';
import { isMainThread, parentPort, workerData } from 'worker_threads';
import ChargingStation from './ChargingStation';
+import Constants from '../utils/Constants';
+import { ThreadWorker } from 'poolifier';
import Utils from '../utils/Utils';
-import { WorkerEvents } from '../types/Worker';
if (!isMainThread) {
// Add listener to start charging station from main thread
});
}
-function startChargingStation(data: any) {
- const station = new ChargingStation(data.index as number, data.templateFile as string);
+function startChargingStation(data: WorkerData) {
+ const station = new ChargingStation(data.index , data.templateFile);
station.start();
}
+
+export default new ThreadWorker(startChargingStation, { maxInactiveTime: Constants.WORKER_POOL_MAX_INACTIVE_TIME, async: false });
import Configuration from './utils/Configuration';
+import Utils from './utils/Utils';
import { WorkerData } from './types/Worker';
import WorkerFactory from './worker/WorkerFactory';
import Wrk from './worker/Wrk';
class Bootstrap {
- static start() {
+ static async start() {
try {
let numStationsTotal = 0;
const workerImplementation: Wrk = WorkerFactory.getWorkerImpl('./dist/charging-station/StationWorker.js');
- void workerImplementation.start();
+ await workerImplementation.start();
// Start ChargingStation object in worker thread
if (Configuration.getStationTemplateURLs()) {
for (const stationURL of Configuration.getStationTemplateURLs()) {
index,
templateFile: stationURL.file
};
- void workerImplementation.addElement(workerData);
+ await workerImplementation.addElement(workerData);
numStationsTotal++;
}
} catch (error) {
if (numStationsTotal === 0) {
console.log('No charging station template enabled in configuration, exiting');
} else {
- console.log(`Charging station simulator started with ${numStationsTotal.toString()} charging station(s) and ${workerImplementation.size}${Configuration.useWorkerPool() ? `/${Configuration.getWorkerPoolMaxSize().toString()}` : ''} worker(s) concurrently running (${workerImplementation.maxElementsPerWorker} charging station(s) per worker)`);
+ console.log(`Charging station simulator started with ${numStationsTotal.toString()} charging station(s) and ${workerImplementation.size}${Utils.workerPoolInUse() ? `/${Configuration.getWorkerPoolMaxSize().toString()}` : ''} worker(s) concurrently running (${workerImplementation.maxElementsPerWorker} charging station(s) per worker)`);
}
} catch (error) {
// eslint-disable-next-line no-console
}
}
-Bootstrap.start();
+Bootstrap.start().catch(
+ (error) => {
+ console.error(error);
+ }
+);
+import { WorkerProcessType } from './Worker';
+
export interface StationTemplateURL {
file: string;
numberOfStations: number;
connectionTimeout?: number;
autoReconnectMaxRetries?: number;
distributeStationsToTenantsEqually?: boolean;
- useWorkerPool?: boolean;
+ workerProcess?: WorkerProcessType;
+ workerPoolMinSize?: number;
workerPoolMaxSize?: number;
chargingStationsPerWorker?: number;
logFormat?: string;
import { Worker } from 'worker_threads';
+export enum WorkerProcessType {
+ WORKER_SET = 'workerSet',
+ DYNAMIC_POOL = 'dynamicPool',
+ STATIC_POOL = 'staticPool'
+}
+
// FIXME: make it more generic
export interface WorkerData {
index: number;
import ConfigurationData, { StationTemplateURL } from '../types/ConfigurationData';
+import { WorkerProcessType } from '../types/Worker';
import fs from 'fs';
export default class Configuration {
return Configuration.getConfig().stationTemplateURLs;
}
- static useWorkerPool(): boolean {
- return Configuration.getConfig().useWorkerPool;
+ static getWorkerProcess(): WorkerProcessType {
+ Configuration.deprecateConfigurationKey('useWorkerPool;', 'Use \'workerProcess\' to define the type of worker process to use instead');
+ return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerProcess') ? Configuration.getConfig().workerProcess : WorkerProcessType.WORKER_SET;
+ }
+
+ static getWorkerPoolMinSize(): number {
+ return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerPoolMinSize') ? Configuration.getConfig().workerPoolMinSize : 4;
}
static getWorkerPoolMaxSize(): number {
Configuration.deprecateConfigurationKey('workerPoolSize;', 'Use \'workerPoolMaxSize\' instead');
- return Configuration.useWorkerPool() && Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerPoolMaxSize') ? Configuration.getConfig().workerPoolMaxSize : 16;
+ return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerPoolMaxSize') ? Configuration.getConfig().workerPoolMaxSize : 16;
}
static getChargingStationsPerWorker(): number {
static readonly TRANSACTION_DEFAULT_TAGID = '00000000';
static readonly START_WORKER_DELAY = 500;
+ static readonly WORKER_POOL_MAX_INACTIVE_TIME = 60000;
}
+import Configuration from './Configuration';
import { WebSocketCloseEventStatusString } from '../types/WebSocket';
+import { WorkerProcessType } from '../types/Worker';
import { v4 as uuid } from 'uuid';
export default class Utils {
}
return '(Unknown)';
}
+
+ static workerPoolInUse(): boolean {
+ return Configuration.getWorkerProcess() === WorkerProcessType.DYNAMIC_POOL || Configuration.getWorkerProcess() === WorkerProcessType.STATIC_POOL;
+ }
}
--- /dev/null
+import { DynamicThreadPool, DynamicThreadPoolOptions } from 'poolifier';
+
+import Constants from '../utils/Constants';
+import Utils from '../utils/Utils';
+import { WorkerData } from '../types/Worker';
+import Wrk from './Wrk';
+import { threadId } from 'worker_threads';
+
+export default class WorkerDynamicPool extends Wrk {
+ private pool: DynamicPool;
+
+ /**
+ * Create a new `WorkerDynamicPool`.
+ *
+ * @param {string} workerScript
+ */
+ constructor(workerScript: string, min: number, max: number,) {
+ super(workerScript);
+ this.pool = DynamicPool.getInstance(min, max, this.workerScript);
+ }
+
+ get size(): number {
+ return this.pool.workers.length;
+ }
+
+ get maxElementsPerWorker(): number {
+ return 1;
+ }
+
+ /**
+ *
+ * @return {Promise<void>}
+ * @public
+ */
+ // eslint-disable-next-line @typescript-eslint/no-empty-function
+ public async start(): Promise<void> { }
+
+ /**
+ *
+ * @return {Promise<void>}
+ * @public
+ */
+ public async addElement(elementData: WorkerData): Promise<void> {
+ await this.pool.execute(elementData);
+ // Start worker sequentially to optimize memory at startup
+ await Utils.sleep(Constants.START_WORKER_DELAY);
+ }
+}
+
+class DynamicPool extends DynamicThreadPool<WorkerData> {
+ private static instance: DynamicPool;
+
+ private constructor(min: number, max: number, filename: string, opts?: DynamicThreadPoolOptions) {
+ super(min, max, filename, opts);
+ }
+
+ public static getInstance(min: number, max: number, filename: string): DynamicPool {
+ if (!DynamicPool.instance) {
+ DynamicPool.instance = new DynamicPool(min, max, filename,
+ {
+ exitHandler: (code) => {
+ if (code !== 0) {
+ console.error(`Worker ${threadId} stopped with exit code ${code}`);
+ }
+ }
+ }
+ );
+ }
+ return DynamicPool.instance;
+ }
+}
import Configuration from '../utils/Configuration';
-import WorkerPool from './WorkerPool';
+import WorkerDynamicPool from './WorkerDynamicPool';
+import { WorkerProcessType } from '../types/Worker';
import WorkerSet from './WorkerSet';
+import WorkerStaticPool from './WorkerStaticPool';
import Wrk from './Wrk';
export default class WorkerFactory {
public static getWorkerImpl(workerScript: string): Wrk {
- if (Configuration.useWorkerPool()) {
- return new WorkerPool(workerScript);
+ switch (Configuration.getWorkerProcess()) {
+ case WorkerProcessType.WORKER_SET:
+ return new WorkerSet(workerScript, Configuration.getChargingStationsPerWorker());
+ case WorkerProcessType.STATIC_POOL:
+ return new WorkerStaticPool(workerScript, Configuration.getWorkerPoolMaxSize());
+ case WorkerProcessType.DYNAMIC_POOL:
+ return new WorkerDynamicPool(workerScript, Configuration.getWorkerPoolMinSize(), Configuration.getWorkerPoolMaxSize());
+ default:
+ return null;
}
- return new WorkerSet(workerScript, Configuration.getChargingStationsPerWorker());
}
}
+++ /dev/null
-import Configuration from '../utils/Configuration';
-import Constants from '../utils/Constants';
-import Pool from 'worker-threads-pool';
-import Utils from '../utils/Utils';
-import { WorkerData } from '../types/Worker';
-import Wrk from './Wrk';
-
-export default class WorkerPool extends Wrk {
- private pool: Pool;
-
- /**
- * Create a new `WorkerPool`.
- *
- * @param {string} workerScript
- */
- constructor(workerScript: string) {
- super(workerScript);
- this.pool = UniquePool.getInstance();
- }
-
- get size(): number {
- return this.pool.size;
- }
-
- get maxElementsPerWorker(): number {
- return 1;
- }
-
- /**
- *
- * @return {Promise<void>}
- * @public
- */
- // eslint-disable-next-line @typescript-eslint/no-empty-function
- public async start(): Promise<void> { }
-
- /**
- *
- * @return {Promise<void>}
- * @public
- */
- public async addElement(elementData: WorkerData): Promise<void> {
- return new Promise((resolve, reject) => {
- this.pool.acquire(this.workerScript, { workerData: elementData }, (err, worker) => {
- if (err) {
- return reject(err);
- }
- worker.once('message', resolve);
- worker.once('error', reject);
- });
- // Start worker sequentially to optimize memory at startup
- void Utils.sleep(Constants.START_WORKER_DELAY);
- });
- }
-}
-
-class UniquePool {
- private static instance: Pool;
-
- private constructor() { }
-
- public static getInstance(): Pool {
- if (!UniquePool.instance) {
- UniquePool.instance = new Pool({ max: Configuration.getWorkerPoolMaxSize() });
- }
- return UniquePool.instance;
- }
-}
+import { Worker, threadId } from 'worker_threads';
import { WorkerData, WorkerEvents, WorkerSetElement } from '../types/Worker';
import Constants from '../utils/Constants';
import Utils from '../utils/Utils';
-import { Worker } from 'worker_threads';
import Wrk from './Wrk';
export default class WorkerSet extends Wrk {
*/
public async addElement(elementData: WorkerData): Promise<void> {
if (!this.workers) {
- throw Error('Cannot add a WorkerSet element: workers set does not exist');
+ throw Error('Cannot add a WorkerSet element: workers\' set does not exist');
}
if (this.getLastWorkerSetElement().numberOfWorkerElements >= this.maxElementsPerWorker) {
- void this.startWorker();
+ this.startWorker();
// Start worker sequentially to optimize memory at startup
- void Utils.sleep(Constants.START_WORKER_DELAY);
+ await Utils.sleep(Constants.START_WORKER_DELAY);
}
this.getLastWorker().postMessage({ id: WorkerEvents.START_WORKER_ELEMENT, workerData: elementData });
this.getLastWorkerSetElement().numberOfWorkerElements++;
* @public
*/
public async start(): Promise<void> {
- await this.startWorker();
+ this.startWorker();
// Start worker sequentially to optimize memory at startup
await Utils.sleep(Constants.START_WORKER_DELAY);
}
* @return {Promise}
* @private
*/
- private async startWorker() {
- return new Promise((resolve, reject) => {
- const worker = new Worker(this.workerScript);
- worker.on('message', resolve);
- worker.on('error', reject);
- worker.on('exit', (code) => {
- if (code !== 0) {
- reject(new Error(`Worker stopped with exit code ${code}`));
- }
- });
- this.workers.add({ worker, numberOfWorkerElements: 0 });
+ private startWorker(): void {
+ const worker = new Worker(this.workerScript);
+ worker.on('message', () => { });
+ worker.on('error', () => { });
+ worker.on('exit', (code) => {
+ if (code !== 0) {
+ console.error(`Worker ${threadId} stopped with exit code ${code}`);
+ }
});
+ this.workers.add({ worker, numberOfWorkerElements: 0 });
}
private getLastWorkerSetElement(): WorkerSetElement {
--- /dev/null
+import { FixedThreadPool, FixedThreadPoolOptions } from 'poolifier';
+
+import Constants from '../utils/Constants';
+import Utils from '../utils/Utils';
+import { WorkerData } from '../types/Worker';
+import Wrk from './Wrk';
+import { threadId } from 'worker_threads';
+
+export default class WorkerStaticPool extends Wrk {
+ private pool: StaticPool;
+
+ /**
+ * Create a new `WorkerStaticPool`.
+ *
+ * @param {string} workerScript
+ */
+ constructor(workerScript: string, numThreads: number) {
+ super(workerScript);
+ this.pool = StaticPool.getInstance(numThreads, this.workerScript);
+ }
+
+ get size(): number {
+ return this.pool.workers.length;
+ }
+
+ get maxElementsPerWorker(): number {
+ return 1;
+ }
+
+ /**
+ *
+ * @return {Promise<void>}
+ * @public
+ */
+ // eslint-disable-next-line @typescript-eslint/no-empty-function
+ public async start(): Promise<void> { }
+
+ /**
+ *
+ * @return {Promise<void>}
+ * @public
+ */
+ public async addElement(elementData: WorkerData): Promise<void> {
+ await this.pool.execute(elementData);
+ // Start worker sequentially to optimize memory at startup
+ await Utils.sleep(Constants.START_WORKER_DELAY);
+ }
+}
+
+class StaticPool extends FixedThreadPool<WorkerData> {
+ private static instance: StaticPool;
+
+ private constructor(numThreads: number, workerScript: string, opts?: FixedThreadPoolOptions) {
+ super(numThreads, workerScript, opts);
+ }
+
+ public static getInstance(numThreads: number, workerScript: string): StaticPool {
+ if (!StaticPool.instance) {
+ StaticPool.instance = new StaticPool(numThreads, workerScript,
+ {
+ exitHandler: (code) => {
+ if (code !== 0) {
+ console.error(`Worker ${threadId} stopped with exit code ${code}`);
+ }
+ }
+ }
+ );
+ }
+ return StaticPool.instance;
+ }
+}