Add tunable for charging station start delay for linear ramp up
authorJérôme Benoit <jerome.benoit@sap.com>
Thu, 3 Mar 2022 16:53:19 +0000 (17:53 +0100)
committerJérôme Benoit <jerome.benoit@sap.com>
Thu, 3 Mar 2022 16:53:19 +0000 (17:53 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
README.md
src/charging-station/Bootstrap.ts
src/types/ConfigurationData.ts
src/types/Worker.ts
src/utils/Configuration.ts
src/utils/Constants.ts
src/worker/WorkerAbstract.ts
src/worker/WorkerDynamicPool.ts
src/worker/WorkerFactory.ts
src/worker/WorkerSet.ts
src/worker/WorkerStaticPool.ts

index 29bf46dd9c96c11ab77fced223a91f76ddad70b9..16bad1120a31acbf8087a4b48bf010dd2fc64fa2 100644 (file)
--- a/README.md
+++ b/README.md
@@ -52,7 +52,8 @@ Key | Value(s) | Default Value | Value type | Description
 supervisionUrls | | [] | string \| string[] | string or array of global connection URIs to OCPP-J servers
 supervisionUrlDistribution | round-robin/random/sequential | round-robin | boolean | supervision urls distribution policy to simulated charging stations
 workerProcess | workerSet/staticPool/dynamicPool | workerSet | string | worker threads process type
-workerStartDelay | | 500 | integer | milliseconds to wait at charging station worker threads startup
+workerStartDelay | | 500 | integer | milliseconds to wait at worker threads startup (only for workerSet threads process type)
+elementStartDelay | | 0 | integer | milliseconds to wait at charging station startup
 workerPoolMinSize | | 4 | integer | worker threads pool minimum number of threads
 workerPoolMaxSize | | 16 | integer | worker threads pool maximum number of threads
 workerPoolStrategy | ROUND_ROBIN/LESS_RECENTLY_USED/... | [poolifier](https://github.com/poolifier/poolifier) default: ROUND_ROBBIN | string | worker threads pool [poolifier](https://github.com/poolifier/poolifier) worker choice strategy
index 143bb5c4298ee229fe3c0d8d66ad69eebda37bf6..3aab9416ee4555e62f7e268d55d192f9607e95d2 100644 (file)
@@ -110,7 +110,8 @@ export default class Bootstrap {
   private initWorkerImplementation(): void {
     this.workerImplementation = WorkerFactory.getWorkerImplementation<ChargingStationWorkerData>(this.workerScript, Configuration.getWorkerProcess(),
       {
-        startDelay: Configuration.getWorkerStartDelay(),
+        workerStartDelay: Configuration.getWorkerStartDelay(),
+        elementStartDelay: Configuration.getElementStartDelay(),
         poolMaxSize: Configuration.getWorkerPoolMaxSize(),
         poolMinSize: Configuration.getWorkerPoolMinSize(),
         elementsPerWorker: Configuration.getChargingStationsPerWorker(),
index 54ce659cc81d369b632fc09b793c77776e36277e..94e65ed55d93f90c37fe361376bf96e5162e5b0b 100644 (file)
@@ -34,6 +34,7 @@ export default interface ConfigurationData {
   autoReconnectMaxRetries?: number;
   workerProcess?: WorkerProcessType;
   workerStartDelay?: number;
+  elementStartDelay?: number;
   workerPoolMinSize?: number;
   workerPoolMaxSize?: number;
   workerPoolStrategy?: WorkerChoiceStrategy;
index 7a96335535b5dbb4f98d847edbf9e37b4c7a7f26..4654fdcf6c57b7bf652773d60952ac112a5b6a8b 100644 (file)
@@ -9,7 +9,8 @@ export enum WorkerProcessType {
 }
 
 export interface WorkerOptions {
-  startDelay?: number;
+  workerStartDelay?: number;
+  elementStartDelay?: number;
   poolMaxSize?: number;
   poolMinSize?: number;
   elementsPerWorker?: number;
@@ -17,6 +18,11 @@ export interface WorkerOptions {
   messageHandler?: (message: unknown) => void | Promise<void>;
 }
 
+export interface WorkerStartOptions {
+  workerStartDelay: number;
+  elementStartDelay: number;
+}
+
 export type WorkerData = JsonType;
 
 export interface WorkerSetElement {
index d88526a23648da293f0c6556c731c80b050b99b4..86c9335802bfc1a19eb4c11e10ab074a4ab22dbc 100644 (file)
@@ -103,6 +103,10 @@ export default class Configuration {
     return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerStartDelay') ? Configuration.getConfig().workerStartDelay : Constants.WORKER_START_DELAY;
   }
 
+  static getElementStartDelay(): number {
+    return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'elementStartDelay') ? Configuration.getConfig().elementStartDelay : Constants.ELEMENT_START_DELAY;
+  }
+
   static getWorkerPoolMinSize(): number {
     return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerPoolMinSize') ? Configuration.getConfig().workerPoolMinSize : Constants.DEFAULT_WORKER_POOL_MIN_SIZE;
   }
index 2e43d37697d2e47c603d3b5f312f9d54bef12631..91f5231c073510e939d1b5bc20694d8ed8c1549f 100644 (file)
@@ -35,6 +35,7 @@ export default class Constants {
 
   static readonly TRANSACTION_DEFAULT_IDTAG = '00000000';
 
+  static readonly ELEMENT_START_DELAY = 0;
   static readonly WORKER_START_DELAY = 500;
   static readonly WORKER_POOL_MAX_INACTIVE_TIME = 60000;
   static readonly DEFAULT_WORKER_POOL_MIN_SIZE = 4;
index 7c662d9dead4aa46e519b2519d1e560c774615b8..351a61d5284b54b037e6c181882526d5ddc7fe0b 100644 (file)
@@ -1,9 +1,11 @@
+import { WorkerData, WorkerStartOptions } from '../types/Worker';
+
 import Constants from '../utils/Constants';
-import { WorkerData } from '../types/Worker';
 
 export default abstract class WorkerAbstract<T extends WorkerData> {
   protected readonly workerScript: string;
   protected readonly workerStartDelay: number;
+  protected readonly elementStartDelay: number;
   public abstract readonly size: number;
   public abstract readonly maxElementsPerWorker: number | null;
 
@@ -11,11 +13,15 @@ export default abstract class WorkerAbstract<T extends WorkerData> {
    * `WorkerAbstract` constructor.
    *
    * @param workerScript
-   * @param workerStartDelay
+   * @param workerStartOptions
    */
-  constructor(workerScript: string, workerStartDelay: number = Constants.WORKER_START_DELAY) {
+  constructor(workerScript: string, workerStartOptions: WorkerStartOptions = {
+    workerStartDelay: Constants.WORKER_START_DELAY,
+    elementStartDelay: Constants.ELEMENT_START_DELAY
+  }) {
     this.workerScript = workerScript;
-    this.workerStartDelay = workerStartDelay;
+    this.workerStartDelay = workerStartOptions.workerStartDelay;
+    this.elementStartDelay = workerStartOptions.elementStartDelay;
   }
 
   public abstract start(): Promise<void>;
index 37b6ddd15c32a71f43a2de51a91af308ce31ac3e..ec5ffc372b9ee0f24ea97f4e8f611677ff17af9b 100644 (file)
@@ -1,9 +1,9 @@
 import { DynamicThreadPool, PoolOptions } from 'poolifier';
+import { WorkerData, WorkerStartOptions } from '../types/Worker';
 
 import Utils from '../utils/Utils';
 import { Worker } from 'worker_threads';
 import WorkerAbstract from './WorkerAbstract';
-import { WorkerData } from '../types/Worker';
 import { WorkerUtils } from './WorkerUtils';
 
 export default class WorkerDynamicPool extends WorkerAbstract<WorkerData> {
@@ -15,11 +15,11 @@ export default class WorkerDynamicPool extends WorkerAbstract<WorkerData> {
    * @param workerScript
    * @param min
    * @param max
-   * @param workerStartDelay
+   * @param workerStartOptions
    * @param opts
    */
-  constructor(workerScript: string, min: number, max: number, workerStartDelay?: number, opts?: PoolOptions<Worker>) {
-    super(workerScript, workerStartDelay);
+  constructor(workerScript: string, min: number, max: number, workerStartOptions?: WorkerStartOptions, opts?: PoolOptions<Worker>) {
+    super(workerScript, workerStartOptions);
     opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler;
     this.pool = new DynamicThreadPool<WorkerData>(min, max, this.workerScript, opts);
   }
@@ -58,7 +58,7 @@ export default class WorkerDynamicPool extends WorkerAbstract<WorkerData> {
    */
   public async addElement(elementData: WorkerData): Promise<void> {
     await this.pool.execute(elementData);
-    // Start worker sequentially to optimize memory at startup
-    await Utils.sleep(this.workerStartDelay);
+    // Start element sequentially to optimize memory at startup
+    this.elementStartDelay > 0 && await Utils.sleep(this.elementStartDelay);
   }
 }
index 929bf6199b94c91fdbc28014daf74fe80d9ddd52..ccea2a38f918c99881d90af39812ac4484db2ed3 100644 (file)
@@ -18,23 +18,24 @@ export default class WorkerFactory {
       throw new Error('Trying to get a worker implementation outside the main thread');
     }
     options = options ?? {} as WorkerOptions;
-    options.startDelay = options?.startDelay ?? Constants.WORKER_START_DELAY;
+    options.workerStartDelay = options?.workerStartDelay ?? Constants.WORKER_START_DELAY;
+    options.elementStartDelay = options?.elementStartDelay ?? Constants.ELEMENT_START_DELAY;
     options.poolOptions = options?.poolOptions ?? {} as PoolOptions<Worker>;
     options?.messageHandler && (options.poolOptions.messageHandler = options.messageHandler);
     let workerImplementation: WorkerAbstract<T> = null;
     switch (workerProcessType) {
       case WorkerProcessType.WORKER_SET:
         options.elementsPerWorker = options.elementsPerWorker ?? Constants.DEFAULT_CHARGING_STATIONS_PER_WORKER;
-        workerImplementation = new WorkerSet(workerScript, options.elementsPerWorker, options.startDelay, options);
+        workerImplementation = new WorkerSet(workerScript, options.elementsPerWorker, { workerStartDelay: options.workerStartDelay, elementStartDelay: options.elementStartDelay }, options);
         break;
       case WorkerProcessType.STATIC_POOL:
         options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE;
-        workerImplementation = new WorkerStaticPool(workerScript, options.poolMaxSize, options.startDelay, options.poolOptions);
+        workerImplementation = new WorkerStaticPool(workerScript, options.poolMaxSize, { workerStartDelay: options.workerStartDelay, elementStartDelay: options.elementStartDelay }, options.poolOptions);
         break;
       case WorkerProcessType.DYNAMIC_POOL:
         options.poolMinSize = options.poolMinSize ?? Constants.DEFAULT_WORKER_POOL_MIN_SIZE;
         options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE;
-        workerImplementation = new WorkerDynamicPool(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay, options.poolOptions);
+        workerImplementation = new WorkerDynamicPool(workerScript, options.poolMinSize, options.poolMaxSize, { workerStartDelay: options.workerStartDelay, elementStartDelay: options.elementStartDelay }, options.poolOptions);
         break;
       default:
         throw new Error(`Worker implementation type '${workerProcessType}' not found`);
index 64399c87616b11b488d63a60e45620eca7cb2d3b..dff07f2391756d882aeece772d225be182002577 100644 (file)
@@ -1,6 +1,6 @@
 // Partial Copyright Jerome Benoit. 2021. All Rights Reserved.
 
-import { WorkerData, WorkerMessageEvents, WorkerOptions, WorkerSetElement } from '../types/Worker';
+import { WorkerData, WorkerMessageEvents, WorkerOptions, WorkerSetElement, WorkerStartOptions } from '../types/Worker';
 
 import Utils from '../utils/Utils';
 import { Worker } from 'worker_threads';
@@ -17,11 +17,11 @@ export default class WorkerSet extends WorkerAbstract<WorkerData> {
    *
    * @param workerScript
    * @param maxElementsPerWorker
-   * @param workerStartDelay
+   * @param workerStartOptions
    * @param opts
    */
-  constructor(workerScript: string, maxElementsPerWorker = 1, workerStartDelay?: number, opts?: WorkerOptions) {
-    super(workerScript, workerStartDelay);
+  constructor(workerScript: string, maxElementsPerWorker = 1, workerStartOptions?: WorkerStartOptions, opts?: WorkerOptions) {
+    super(workerScript, workerStartOptions);
     this.maxElementsPerWorker = maxElementsPerWorker;
     this.messageHandler = opts?.messageHandler ?? (() => { /* This is intentional */ });
     this.workerSet = new Set<WorkerSetElement>();
@@ -44,10 +44,11 @@ export default class WorkerSet extends WorkerAbstract<WorkerData> {
     if (this.getLastWorkerSetElement().numberOfWorkerElements >= this.maxElementsPerWorker) {
       this.startWorker();
       // Start worker sequentially to optimize memory at startup
-      await Utils.sleep(this.workerStartDelay);
+      this.workerStartDelay > 0 && await Utils.sleep(this.workerStartDelay);
     }
     this.getLastWorker().postMessage({ id: WorkerMessageEvents.START_WORKER_ELEMENT, data: elementData });
     this.getLastWorkerSetElement().numberOfWorkerElements++;
+    this.elementStartDelay > 0 && await Utils.sleep(this.elementStartDelay);
   }
 
   /**
@@ -58,7 +59,7 @@ export default class WorkerSet extends WorkerAbstract<WorkerData> {
   public async start(): Promise<void> {
     this.startWorker();
     // Start worker sequentially to optimize memory at startup
-    await Utils.sleep(this.workerStartDelay);
+    this.workerStartDelay > 0 && await Utils.sleep(this.workerStartDelay);
   }
 
   /**
index 1cbd18e8b9b1cf89094350c71ac2b1488d2d0aac..cbf46298d67022aeef3a8e35969a4643ed01ce73 100644 (file)
@@ -1,9 +1,9 @@
 import { FixedThreadPool, PoolOptions } from 'poolifier';
+import { WorkerData, WorkerStartOptions } from '../types/Worker';
 
 import Utils from '../utils/Utils';
 import { Worker } from 'worker_threads';
 import WorkerAbstract from './WorkerAbstract';
-import { WorkerData } from '../types/Worker';
 import { WorkerUtils } from './WorkerUtils';
 
 export default class WorkerStaticPool extends WorkerAbstract<WorkerData> {
@@ -14,11 +14,11 @@ export default class WorkerStaticPool extends WorkerAbstract<WorkerData> {
    *
    * @param workerScript
    * @param numberOfThreads
-   * @param startWorkerDelay
+   * @param workerStartOptions
    * @param opts
    */
-  constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number, opts?: PoolOptions<Worker>) {
-    super(workerScript, startWorkerDelay);
+  constructor(workerScript: string, numberOfThreads: number, workerStartOptions?: WorkerStartOptions, opts?: PoolOptions<Worker>) {
+    super(workerScript, workerStartOptions);
     opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler;
     this.pool = new FixedThreadPool(numberOfThreads, this.workerScript, opts);
   }
@@ -57,7 +57,7 @@ export default class WorkerStaticPool extends WorkerAbstract<WorkerData> {
    */
   public async addElement(elementData: WorkerData): Promise<void> {
     await this.pool.execute(elementData);
-    // Start worker sequentially to optimize memory at startup
-    await Utils.sleep(this.workerStartDelay);
+    // Start element sequentially to optimize memory at startup
+    this.elementStartDelay > 0 && await Utils.sleep(this.elementStartDelay);
   }
 }