Fix workerSet startup
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 4 Mar 2022 16:09:32 +0000 (17:09 +0100)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 4 Mar 2022 16:09:32 +0000 (17:09 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/charging-station/ChargingStationWorker.ts
src/types/ChargingStationWorker.ts
src/types/Worker.ts
src/worker/WorkerFactory.ts
src/worker/WorkerSet.ts

index 5ad5f6646920246a248ee93596e4325d907df1c8..cc0f62726a174350095ade3fd4805a0204c753bd 100644 (file)
@@ -24,10 +24,9 @@ if (Utils.workerPoolInUse()) {
  * Listen messages send by the main thread
  */
 function addMessageListener(): void {
-  parentPort?.on('message', async (message: ChargingStationWorkerMessage) => {
+  parentPort?.on('message', (message: ChargingStationWorkerMessage) => {
     if (message.id === ChargingStationWorkerMessageEvents.START_WORKER_ELEMENT) {
       startChargingStation(message.data);
-      message.workerOptions?.elementStartDelay > 0 && await Utils.sleep(this.workerOptions.elementStartDelay);
     }
   });
 }
index 8bdc9af04047ac3df6787f8accbc4e61b2909ccb..155a353a0d61efb84bf894de867a86fb8c233a03 100644 (file)
@@ -1,8 +1,15 @@
 import { WorkerData, WorkerMessage, WorkerMessageEvents } from './Worker';
 
+import { JsonType } from './JsonType';
+
+export interface ChargingStationWorkerOptions extends JsonType {
+  elementStartDelay?: number;
+}
+
 export interface ChargingStationWorkerData extends WorkerData {
   index: number;
   templateFile: string;
+  chargingStationWorkerOptions?: ChargingStationWorkerOptions;
 }
 
 enum InternalChargingStationWorkerMessageEvents {
index 633fb7a688c920cdfc085e5cb53eb063aa984f56..aca1922e02a1df88e098068f12f295ab9f16fe1e 100644 (file)
@@ -27,7 +27,6 @@ export interface WorkerSetElement {
 
 export interface WorkerMessage<T extends WorkerData> {
   id: WorkerMessageEvents;
-  workerOptions?: WorkerOptions;
   data: T;
 }
 
index 3a2d3fa79b153c96424d97ad3820374ee2772042..debd3f6a21342797ac15a6ae23eb5f0e664edce5 100644 (file)
@@ -13,29 +13,29 @@ export default class WorkerFactory {
     // This is intentional
   }
 
-  public static getWorkerImplementation<T extends WorkerData>(workerScript: string, workerProcessType: WorkerProcessType, options?: WorkerOptions): WorkerAbstract<T> | null {
+  public static getWorkerImplementation<T extends WorkerData>(workerScript: string, workerProcessType: WorkerProcessType, workerOptions?: WorkerOptions): WorkerAbstract<T> | null {
     if (!isMainThread) {
       throw new Error('Trying to get a worker implementation outside the main thread');
     }
-    options = options ?? {} as WorkerOptions;
-    options.workerStartDelay = options?.workerStartDelay ?? Constants.WORKER_START_DELAY;
-    options.elementStartDelay = options?.elementStartDelay ?? Constants.ELEMENT_START_DELAY;
-    options.poolOptions = options?.poolOptions ?? {} as PoolOptions<Worker>;
-    options?.messageHandler && (options.poolOptions.messageHandler = options.messageHandler);
+    workerOptions = workerOptions ?? {} as WorkerOptions;
+    workerOptions.workerStartDelay = workerOptions?.workerStartDelay ?? Constants.WORKER_START_DELAY;
+    workerOptions.elementStartDelay = workerOptions?.elementStartDelay ?? Constants.ELEMENT_START_DELAY;
+    workerOptions.poolOptions = workerOptions?.poolOptions ?? {} as PoolOptions<Worker>;
+    workerOptions?.messageHandler && (workerOptions.poolOptions.messageHandler = workerOptions.messageHandler);
     let workerImplementation: WorkerAbstract<T> = null;
     switch (workerProcessType) {
       case WorkerProcessType.WORKER_SET:
-        options.elementsPerWorker = options?.elementsPerWorker ?? Constants.DEFAULT_CHARGING_STATIONS_PER_WORKER;
-        workerImplementation = new WorkerSet(workerScript, options);
+        workerOptions.elementsPerWorker = workerOptions?.elementsPerWorker ?? Constants.DEFAULT_CHARGING_STATIONS_PER_WORKER;
+        workerImplementation = new WorkerSet(workerScript, workerOptions);
         break;
       case WorkerProcessType.STATIC_POOL:
-        options.poolMaxSize = options?.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE;
-        workerImplementation = new WorkerStaticPool(workerScript, options);
+        workerOptions.poolMaxSize = workerOptions?.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE;
+        workerImplementation = new WorkerStaticPool(workerScript, workerOptions);
         break;
       case WorkerProcessType.DYNAMIC_POOL:
-        options.poolMinSize = options?.poolMinSize ?? Constants.DEFAULT_WORKER_POOL_MIN_SIZE;
-        options.poolMaxSize = options?.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE;
-        workerImplementation = new WorkerDynamicPool(workerScript, options);
+        workerOptions.poolMinSize = workerOptions?.poolMinSize ?? Constants.DEFAULT_WORKER_POOL_MIN_SIZE;
+        workerOptions.poolMaxSize = workerOptions?.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE;
+        workerImplementation = new WorkerDynamicPool(workerScript, workerOptions);
         break;
       default:
         throw new Error(`Worker implementation type '${workerProcessType}' not found`);
index b62f63df546d3709e3c00bb4836131501b0076f2..5bc399189e5b079f9bad7cd4b7ab49c3353c10d0 100644 (file)
@@ -44,8 +44,15 @@ export default class WorkerSet extends WorkerAbstract<WorkerData> {
     if (this.getLastWorkerSetElement().numberOfWorkerElements >= this.workerOptions.elementsPerWorker) {
       await this.startWorker();
     }
-    this.getLastWorker().postMessage({ id: WorkerMessageEvents.START_WORKER_ELEMENT, workerOptions: this.workerOptions, data: elementData });
+    this.getLastWorker().postMessage({
+      id: WorkerMessageEvents.START_WORKER_ELEMENT,
+      data: elementData
+    });
     this.getLastWorkerSetElement().numberOfWorkerElements++;
+    // Start element sequentially to optimize memory at startup
+    if (this.workerOptions.elementStartDelay > 0) {
+      await Utils.sleep(this.workerOptions.elementStartDelay);
+    }
   }
 
   /**