supervisionUrls | | [] | string \| string[] | string or array of global connection URIs to OCPP-J servers
supervisionUrlDistribution | round-robin/random/sequential | round-robin | boolean | supervision urls distribution policy to simulated charging stations
workerProcess | workerSet/staticPool/dynamicPool | workerSet | string | worker threads process type
-workerStartDelay | | 500 | integer | milliseconds to wait at charging station worker threads startup
+workerStartDelay | | 500 | integer | milliseconds to wait at worker threads startup (only for workerSet threads process type)
+elementStartDelay | | 0 | integer | milliseconds to wait at charging station startup
workerPoolMinSize | | 4 | integer | worker threads pool minimum number of threads
workerPoolMaxSize | | 16 | integer | worker threads pool maximum number of threads
workerPoolStrategy | ROUND_ROBIN/LESS_RECENTLY_USED/... | [poolifier](https://github.com/poolifier/poolifier) default: ROUND_ROBBIN | string | worker threads pool [poolifier](https://github.com/poolifier/poolifier) worker choice strategy
private initWorkerImplementation(): void {
this.workerImplementation = WorkerFactory.getWorkerImplementation<ChargingStationWorkerData>(this.workerScript, Configuration.getWorkerProcess(),
{
- startDelay: Configuration.getWorkerStartDelay(),
+ workerStartDelay: Configuration.getWorkerStartDelay(),
+ elementStartDelay: Configuration.getElementStartDelay(),
poolMaxSize: Configuration.getWorkerPoolMaxSize(),
poolMinSize: Configuration.getWorkerPoolMinSize(),
elementsPerWorker: Configuration.getChargingStationsPerWorker(),
autoReconnectMaxRetries?: number;
workerProcess?: WorkerProcessType;
workerStartDelay?: number;
+ elementStartDelay?: number;
workerPoolMinSize?: number;
workerPoolMaxSize?: number;
workerPoolStrategy?: WorkerChoiceStrategy;
}
export interface WorkerOptions {
- startDelay?: number;
+ workerStartDelay?: number;
+ elementStartDelay?: number;
poolMaxSize?: number;
poolMinSize?: number;
elementsPerWorker?: number;
messageHandler?: (message: unknown) => void | Promise<void>;
}
+export interface WorkerStartOptions {
+ workerStartDelay: number;
+ elementStartDelay: number;
+}
+
export type WorkerData = JsonType;
export interface WorkerSetElement {
return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerStartDelay') ? Configuration.getConfig().workerStartDelay : Constants.WORKER_START_DELAY;
}
+ static getElementStartDelay(): number {
+ return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'elementStartDelay') ? Configuration.getConfig().elementStartDelay : Constants.ELEMENT_START_DELAY;
+ }
+
static getWorkerPoolMinSize(): number {
return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerPoolMinSize') ? Configuration.getConfig().workerPoolMinSize : Constants.DEFAULT_WORKER_POOL_MIN_SIZE;
}
static readonly TRANSACTION_DEFAULT_IDTAG = '00000000';
+ static readonly ELEMENT_START_DELAY = 0;
static readonly WORKER_START_DELAY = 500;
static readonly WORKER_POOL_MAX_INACTIVE_TIME = 60000;
static readonly DEFAULT_WORKER_POOL_MIN_SIZE = 4;
+import { WorkerData, WorkerStartOptions } from '../types/Worker';
+
import Constants from '../utils/Constants';
-import { WorkerData } from '../types/Worker';
export default abstract class WorkerAbstract<T extends WorkerData> {
protected readonly workerScript: string;
protected readonly workerStartDelay: number;
+ protected readonly elementStartDelay: number;
public abstract readonly size: number;
public abstract readonly maxElementsPerWorker: number | null;
* `WorkerAbstract` constructor.
*
* @param workerScript
- * @param workerStartDelay
+ * @param workerStartOptions
*/
- constructor(workerScript: string, workerStartDelay: number = Constants.WORKER_START_DELAY) {
+ constructor(workerScript: string, workerStartOptions: WorkerStartOptions = {
+ workerStartDelay: Constants.WORKER_START_DELAY,
+ elementStartDelay: Constants.ELEMENT_START_DELAY
+ }) {
this.workerScript = workerScript;
- this.workerStartDelay = workerStartDelay;
+ this.workerStartDelay = workerStartOptions.workerStartDelay;
+ this.elementStartDelay = workerStartOptions.elementStartDelay;
}
public abstract start(): Promise<void>;
import { DynamicThreadPool, PoolOptions } from 'poolifier';
+import { WorkerData, WorkerStartOptions } from '../types/Worker';
import Utils from '../utils/Utils';
import { Worker } from 'worker_threads';
import WorkerAbstract from './WorkerAbstract';
-import { WorkerData } from '../types/Worker';
import { WorkerUtils } from './WorkerUtils';
export default class WorkerDynamicPool extends WorkerAbstract<WorkerData> {
* @param workerScript
* @param min
* @param max
- * @param workerStartDelay
+ * @param workerStartOptions
* @param opts
*/
- constructor(workerScript: string, min: number, max: number, workerStartDelay?: number, opts?: PoolOptions<Worker>) {
- super(workerScript, workerStartDelay);
+ constructor(workerScript: string, min: number, max: number, workerStartOptions?: WorkerStartOptions, opts?: PoolOptions<Worker>) {
+ super(workerScript, workerStartOptions);
opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler;
this.pool = new DynamicThreadPool<WorkerData>(min, max, this.workerScript, opts);
}
*/
public async addElement(elementData: WorkerData): Promise<void> {
await this.pool.execute(elementData);
- // Start worker sequentially to optimize memory at startup
- await Utils.sleep(this.workerStartDelay);
+ // Start element sequentially to optimize memory at startup
+ this.elementStartDelay > 0 && await Utils.sleep(this.elementStartDelay);
}
}
throw new Error('Trying to get a worker implementation outside the main thread');
}
options = options ?? {} as WorkerOptions;
- options.startDelay = options?.startDelay ?? Constants.WORKER_START_DELAY;
+ options.workerStartDelay = options?.workerStartDelay ?? Constants.WORKER_START_DELAY;
+ options.elementStartDelay = options?.elementStartDelay ?? Constants.ELEMENT_START_DELAY;
options.poolOptions = options?.poolOptions ?? {} as PoolOptions<Worker>;
options?.messageHandler && (options.poolOptions.messageHandler = options.messageHandler);
let workerImplementation: WorkerAbstract<T> = null;
switch (workerProcessType) {
case WorkerProcessType.WORKER_SET:
options.elementsPerWorker = options.elementsPerWorker ?? Constants.DEFAULT_CHARGING_STATIONS_PER_WORKER;
- workerImplementation = new WorkerSet(workerScript, options.elementsPerWorker, options.startDelay, options);
+ workerImplementation = new WorkerSet(workerScript, options.elementsPerWorker, { workerStartDelay: options.workerStartDelay, elementStartDelay: options.elementStartDelay }, options);
break;
case WorkerProcessType.STATIC_POOL:
options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE;
- workerImplementation = new WorkerStaticPool(workerScript, options.poolMaxSize, options.startDelay, options.poolOptions);
+ workerImplementation = new WorkerStaticPool(workerScript, options.poolMaxSize, { workerStartDelay: options.workerStartDelay, elementStartDelay: options.elementStartDelay }, options.poolOptions);
break;
case WorkerProcessType.DYNAMIC_POOL:
options.poolMinSize = options.poolMinSize ?? Constants.DEFAULT_WORKER_POOL_MIN_SIZE;
options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE;
- workerImplementation = new WorkerDynamicPool(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay, options.poolOptions);
+ workerImplementation = new WorkerDynamicPool(workerScript, options.poolMinSize, options.poolMaxSize, { workerStartDelay: options.workerStartDelay, elementStartDelay: options.elementStartDelay }, options.poolOptions);
break;
default:
throw new Error(`Worker implementation type '${workerProcessType}' not found`);
// Partial Copyright Jerome Benoit. 2021. All Rights Reserved.
-import { WorkerData, WorkerMessageEvents, WorkerOptions, WorkerSetElement } from '../types/Worker';
+import { WorkerData, WorkerMessageEvents, WorkerOptions, WorkerSetElement, WorkerStartOptions } from '../types/Worker';
import Utils from '../utils/Utils';
import { Worker } from 'worker_threads';
*
* @param workerScript
* @param maxElementsPerWorker
- * @param workerStartDelay
+ * @param workerStartOptions
* @param opts
*/
- constructor(workerScript: string, maxElementsPerWorker = 1, workerStartDelay?: number, opts?: WorkerOptions) {
- super(workerScript, workerStartDelay);
+ constructor(workerScript: string, maxElementsPerWorker = 1, workerStartOptions?: WorkerStartOptions, opts?: WorkerOptions) {
+ super(workerScript, workerStartOptions);
this.maxElementsPerWorker = maxElementsPerWorker;
this.messageHandler = opts?.messageHandler ?? (() => { /* This is intentional */ });
this.workerSet = new Set<WorkerSetElement>();
if (this.getLastWorkerSetElement().numberOfWorkerElements >= this.maxElementsPerWorker) {
this.startWorker();
// Start worker sequentially to optimize memory at startup
- await Utils.sleep(this.workerStartDelay);
+ this.workerStartDelay > 0 && await Utils.sleep(this.workerStartDelay);
}
this.getLastWorker().postMessage({ id: WorkerMessageEvents.START_WORKER_ELEMENT, data: elementData });
this.getLastWorkerSetElement().numberOfWorkerElements++;
+ this.elementStartDelay > 0 && await Utils.sleep(this.elementStartDelay);
}
/**
public async start(): Promise<void> {
this.startWorker();
// Start worker sequentially to optimize memory at startup
- await Utils.sleep(this.workerStartDelay);
+ this.workerStartDelay > 0 && await Utils.sleep(this.workerStartDelay);
}
/**
import { FixedThreadPool, PoolOptions } from 'poolifier';
+import { WorkerData, WorkerStartOptions } from '../types/Worker';
import Utils from '../utils/Utils';
import { Worker } from 'worker_threads';
import WorkerAbstract from './WorkerAbstract';
-import { WorkerData } from '../types/Worker';
import { WorkerUtils } from './WorkerUtils';
export default class WorkerStaticPool extends WorkerAbstract<WorkerData> {
*
* @param workerScript
* @param numberOfThreads
- * @param startWorkerDelay
+ * @param workerStartOptions
* @param opts
*/
- constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number, opts?: PoolOptions<Worker>) {
- super(workerScript, startWorkerDelay);
+ constructor(workerScript: string, numberOfThreads: number, workerStartOptions?: WorkerStartOptions, opts?: PoolOptions<Worker>) {
+ super(workerScript, workerStartOptions);
opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler;
this.pool = new FixedThreadPool(numberOfThreads, this.workerScript, opts);
}
*/
public async addElement(elementData: WorkerData): Promise<void> {
await this.pool.execute(elementData);
- // Start worker sequentially to optimize memory at startup
- await Utils.sleep(this.workerStartDelay);
+ // Start element sequentially to optimize memory at startup
+ this.elementStartDelay > 0 && await Utils.sleep(this.elementStartDelay);
}
}