From 4bfd80fa794b4b2f89427ef9f5944664ce3bdade Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 3 Mar 2022 17:53:19 +0100 Subject: [PATCH] Add tunable for charging station start delay for linear ramp up MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- README.md | 3 ++- src/charging-station/Bootstrap.ts | 3 ++- src/types/ConfigurationData.ts | 1 + src/types/Worker.ts | 8 +++++++- src/utils/Configuration.ts | 4 ++++ src/utils/Constants.ts | 1 + src/worker/WorkerAbstract.ts | 14 ++++++++++---- src/worker/WorkerDynamicPool.ts | 12 ++++++------ src/worker/WorkerFactory.ts | 9 +++++---- src/worker/WorkerSet.ts | 13 +++++++------ src/worker/WorkerStaticPool.ts | 12 ++++++------ 11 files changed, 51 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 29bf46dd..16bad112 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,8 @@ Key | Value(s) | Default Value | Value type | Description supervisionUrls | | [] | string \| string[] | string or array of global connection URIs to OCPP-J servers supervisionUrlDistribution | round-robin/random/sequential | round-robin | boolean | supervision urls distribution policy to simulated charging stations workerProcess | workerSet/staticPool/dynamicPool | workerSet | string | worker threads process type -workerStartDelay | | 500 | integer | milliseconds to wait at charging station worker threads startup +workerStartDelay | | 500 | integer | milliseconds to wait at worker threads startup (only for workerSet threads process type) +elementStartDelay | | 0 | integer | milliseconds to wait at charging station startup workerPoolMinSize | | 4 | integer | worker threads pool minimum number of threads workerPoolMaxSize | | 16 | integer | worker threads pool maximum number of threads workerPoolStrategy | ROUND_ROBIN/LESS_RECENTLY_USED/... | [poolifier](https://github.com/poolifier/poolifier) default: ROUND_ROBBIN | string | worker threads pool [poolifier](https://github.com/poolifier/poolifier) worker choice strategy diff --git a/src/charging-station/Bootstrap.ts b/src/charging-station/Bootstrap.ts index 143bb5c4..3aab9416 100644 --- a/src/charging-station/Bootstrap.ts +++ b/src/charging-station/Bootstrap.ts @@ -110,7 +110,8 @@ export default class Bootstrap { private initWorkerImplementation(): void { this.workerImplementation = WorkerFactory.getWorkerImplementation(this.workerScript, Configuration.getWorkerProcess(), { - startDelay: Configuration.getWorkerStartDelay(), + workerStartDelay: Configuration.getWorkerStartDelay(), + elementStartDelay: Configuration.getElementStartDelay(), poolMaxSize: Configuration.getWorkerPoolMaxSize(), poolMinSize: Configuration.getWorkerPoolMinSize(), elementsPerWorker: Configuration.getChargingStationsPerWorker(), diff --git a/src/types/ConfigurationData.ts b/src/types/ConfigurationData.ts index 54ce659c..94e65ed5 100644 --- a/src/types/ConfigurationData.ts +++ b/src/types/ConfigurationData.ts @@ -34,6 +34,7 @@ export default interface ConfigurationData { autoReconnectMaxRetries?: number; workerProcess?: WorkerProcessType; workerStartDelay?: number; + elementStartDelay?: number; workerPoolMinSize?: number; workerPoolMaxSize?: number; workerPoolStrategy?: WorkerChoiceStrategy; diff --git a/src/types/Worker.ts b/src/types/Worker.ts index 7a963355..4654fdcf 100644 --- a/src/types/Worker.ts +++ b/src/types/Worker.ts @@ -9,7 +9,8 @@ export enum WorkerProcessType { } export interface WorkerOptions { - startDelay?: number; + workerStartDelay?: number; + elementStartDelay?: number; poolMaxSize?: number; poolMinSize?: number; elementsPerWorker?: number; @@ -17,6 +18,11 @@ export interface WorkerOptions { messageHandler?: (message: unknown) => void | Promise; } +export interface WorkerStartOptions { + workerStartDelay: number; + elementStartDelay: number; +} + export type WorkerData = JsonType; export interface WorkerSetElement { diff --git a/src/utils/Configuration.ts b/src/utils/Configuration.ts index d88526a2..86c93358 100644 --- a/src/utils/Configuration.ts +++ b/src/utils/Configuration.ts @@ -103,6 +103,10 @@ export default class Configuration { return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerStartDelay') ? Configuration.getConfig().workerStartDelay : Constants.WORKER_START_DELAY; } + static getElementStartDelay(): number { + return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'elementStartDelay') ? Configuration.getConfig().elementStartDelay : Constants.ELEMENT_START_DELAY; + } + static getWorkerPoolMinSize(): number { return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerPoolMinSize') ? Configuration.getConfig().workerPoolMinSize : Constants.DEFAULT_WORKER_POOL_MIN_SIZE; } diff --git a/src/utils/Constants.ts b/src/utils/Constants.ts index 2e43d376..91f5231c 100644 --- a/src/utils/Constants.ts +++ b/src/utils/Constants.ts @@ -35,6 +35,7 @@ export default class Constants { static readonly TRANSACTION_DEFAULT_IDTAG = '00000000'; + static readonly ELEMENT_START_DELAY = 0; static readonly WORKER_START_DELAY = 500; static readonly WORKER_POOL_MAX_INACTIVE_TIME = 60000; static readonly DEFAULT_WORKER_POOL_MIN_SIZE = 4; diff --git a/src/worker/WorkerAbstract.ts b/src/worker/WorkerAbstract.ts index 7c662d9d..351a61d5 100644 --- a/src/worker/WorkerAbstract.ts +++ b/src/worker/WorkerAbstract.ts @@ -1,9 +1,11 @@ +import { WorkerData, WorkerStartOptions } from '../types/Worker'; + import Constants from '../utils/Constants'; -import { WorkerData } from '../types/Worker'; export default abstract class WorkerAbstract { protected readonly workerScript: string; protected readonly workerStartDelay: number; + protected readonly elementStartDelay: number; public abstract readonly size: number; public abstract readonly maxElementsPerWorker: number | null; @@ -11,11 +13,15 @@ export default abstract class WorkerAbstract { * `WorkerAbstract` constructor. * * @param workerScript - * @param workerStartDelay + * @param workerStartOptions */ - constructor(workerScript: string, workerStartDelay: number = Constants.WORKER_START_DELAY) { + constructor(workerScript: string, workerStartOptions: WorkerStartOptions = { + workerStartDelay: Constants.WORKER_START_DELAY, + elementStartDelay: Constants.ELEMENT_START_DELAY + }) { this.workerScript = workerScript; - this.workerStartDelay = workerStartDelay; + this.workerStartDelay = workerStartOptions.workerStartDelay; + this.elementStartDelay = workerStartOptions.elementStartDelay; } public abstract start(): Promise; diff --git a/src/worker/WorkerDynamicPool.ts b/src/worker/WorkerDynamicPool.ts index 37b6ddd1..ec5ffc37 100644 --- a/src/worker/WorkerDynamicPool.ts +++ b/src/worker/WorkerDynamicPool.ts @@ -1,9 +1,9 @@ import { DynamicThreadPool, PoolOptions } from 'poolifier'; +import { WorkerData, WorkerStartOptions } from '../types/Worker'; import Utils from '../utils/Utils'; import { Worker } from 'worker_threads'; import WorkerAbstract from './WorkerAbstract'; -import { WorkerData } from '../types/Worker'; import { WorkerUtils } from './WorkerUtils'; export default class WorkerDynamicPool extends WorkerAbstract { @@ -15,11 +15,11 @@ export default class WorkerDynamicPool extends WorkerAbstract { * @param workerScript * @param min * @param max - * @param workerStartDelay + * @param workerStartOptions * @param opts */ - constructor(workerScript: string, min: number, max: number, workerStartDelay?: number, opts?: PoolOptions) { - super(workerScript, workerStartDelay); + constructor(workerScript: string, min: number, max: number, workerStartOptions?: WorkerStartOptions, opts?: PoolOptions) { + super(workerScript, workerStartOptions); opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler; this.pool = new DynamicThreadPool(min, max, this.workerScript, opts); } @@ -58,7 +58,7 @@ export default class WorkerDynamicPool extends WorkerAbstract { */ public async addElement(elementData: WorkerData): Promise { await this.pool.execute(elementData); - // Start worker sequentially to optimize memory at startup - await Utils.sleep(this.workerStartDelay); + // Start element sequentially to optimize memory at startup + this.elementStartDelay > 0 && await Utils.sleep(this.elementStartDelay); } } diff --git a/src/worker/WorkerFactory.ts b/src/worker/WorkerFactory.ts index 929bf619..ccea2a38 100644 --- a/src/worker/WorkerFactory.ts +++ b/src/worker/WorkerFactory.ts @@ -18,23 +18,24 @@ export default class WorkerFactory { throw new Error('Trying to get a worker implementation outside the main thread'); } options = options ?? {} as WorkerOptions; - options.startDelay = options?.startDelay ?? Constants.WORKER_START_DELAY; + options.workerStartDelay = options?.workerStartDelay ?? Constants.WORKER_START_DELAY; + options.elementStartDelay = options?.elementStartDelay ?? Constants.ELEMENT_START_DELAY; options.poolOptions = options?.poolOptions ?? {} as PoolOptions; options?.messageHandler && (options.poolOptions.messageHandler = options.messageHandler); let workerImplementation: WorkerAbstract = null; switch (workerProcessType) { case WorkerProcessType.WORKER_SET: options.elementsPerWorker = options.elementsPerWorker ?? Constants.DEFAULT_CHARGING_STATIONS_PER_WORKER; - workerImplementation = new WorkerSet(workerScript, options.elementsPerWorker, options.startDelay, options); + workerImplementation = new WorkerSet(workerScript, options.elementsPerWorker, { workerStartDelay: options.workerStartDelay, elementStartDelay: options.elementStartDelay }, options); break; case WorkerProcessType.STATIC_POOL: options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE; - workerImplementation = new WorkerStaticPool(workerScript, options.poolMaxSize, options.startDelay, options.poolOptions); + workerImplementation = new WorkerStaticPool(workerScript, options.poolMaxSize, { workerStartDelay: options.workerStartDelay, elementStartDelay: options.elementStartDelay }, options.poolOptions); break; case WorkerProcessType.DYNAMIC_POOL: options.poolMinSize = options.poolMinSize ?? Constants.DEFAULT_WORKER_POOL_MIN_SIZE; options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE; - workerImplementation = new WorkerDynamicPool(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay, options.poolOptions); + workerImplementation = new WorkerDynamicPool(workerScript, options.poolMinSize, options.poolMaxSize, { workerStartDelay: options.workerStartDelay, elementStartDelay: options.elementStartDelay }, options.poolOptions); break; default: throw new Error(`Worker implementation type '${workerProcessType}' not found`); diff --git a/src/worker/WorkerSet.ts b/src/worker/WorkerSet.ts index 64399c87..dff07f23 100644 --- a/src/worker/WorkerSet.ts +++ b/src/worker/WorkerSet.ts @@ -1,6 +1,6 @@ // Partial Copyright Jerome Benoit. 2021. All Rights Reserved. -import { WorkerData, WorkerMessageEvents, WorkerOptions, WorkerSetElement } from '../types/Worker'; +import { WorkerData, WorkerMessageEvents, WorkerOptions, WorkerSetElement, WorkerStartOptions } from '../types/Worker'; import Utils from '../utils/Utils'; import { Worker } from 'worker_threads'; @@ -17,11 +17,11 @@ export default class WorkerSet extends WorkerAbstract { * * @param workerScript * @param maxElementsPerWorker - * @param workerStartDelay + * @param workerStartOptions * @param opts */ - constructor(workerScript: string, maxElementsPerWorker = 1, workerStartDelay?: number, opts?: WorkerOptions) { - super(workerScript, workerStartDelay); + constructor(workerScript: string, maxElementsPerWorker = 1, workerStartOptions?: WorkerStartOptions, opts?: WorkerOptions) { + super(workerScript, workerStartOptions); this.maxElementsPerWorker = maxElementsPerWorker; this.messageHandler = opts?.messageHandler ?? (() => { /* This is intentional */ }); this.workerSet = new Set(); @@ -44,10 +44,11 @@ export default class WorkerSet extends WorkerAbstract { if (this.getLastWorkerSetElement().numberOfWorkerElements >= this.maxElementsPerWorker) { this.startWorker(); // Start worker sequentially to optimize memory at startup - await Utils.sleep(this.workerStartDelay); + this.workerStartDelay > 0 && await Utils.sleep(this.workerStartDelay); } this.getLastWorker().postMessage({ id: WorkerMessageEvents.START_WORKER_ELEMENT, data: elementData }); this.getLastWorkerSetElement().numberOfWorkerElements++; + this.elementStartDelay > 0 && await Utils.sleep(this.elementStartDelay); } /** @@ -58,7 +59,7 @@ export default class WorkerSet extends WorkerAbstract { public async start(): Promise { this.startWorker(); // Start worker sequentially to optimize memory at startup - await Utils.sleep(this.workerStartDelay); + this.workerStartDelay > 0 && await Utils.sleep(this.workerStartDelay); } /** diff --git a/src/worker/WorkerStaticPool.ts b/src/worker/WorkerStaticPool.ts index 1cbd18e8..cbf46298 100644 --- a/src/worker/WorkerStaticPool.ts +++ b/src/worker/WorkerStaticPool.ts @@ -1,9 +1,9 @@ import { FixedThreadPool, PoolOptions } from 'poolifier'; +import { WorkerData, WorkerStartOptions } from '../types/Worker'; import Utils from '../utils/Utils'; import { Worker } from 'worker_threads'; import WorkerAbstract from './WorkerAbstract'; -import { WorkerData } from '../types/Worker'; import { WorkerUtils } from './WorkerUtils'; export default class WorkerStaticPool extends WorkerAbstract { @@ -14,11 +14,11 @@ export default class WorkerStaticPool extends WorkerAbstract { * * @param workerScript * @param numberOfThreads - * @param startWorkerDelay + * @param workerStartOptions * @param opts */ - constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number, opts?: PoolOptions) { - super(workerScript, startWorkerDelay); + constructor(workerScript: string, numberOfThreads: number, workerStartOptions?: WorkerStartOptions, opts?: PoolOptions) { + super(workerScript, workerStartOptions); opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler; this.pool = new FixedThreadPool(numberOfThreads, this.workerScript, opts); } @@ -57,7 +57,7 @@ export default class WorkerStaticPool extends WorkerAbstract { */ public async addElement(elementData: WorkerData): Promise { await this.pool.execute(elementData); - // Start worker sequentially to optimize memory at startup - await Utils.sleep(this.workerStartDelay); + // Start element sequentially to optimize memory at startup + this.elementStartDelay > 0 && await Utils.sleep(this.elementStartDelay); } } -- 2.34.1