Fix worker with pool handling
authorJérôme Benoit <jerome.benoit@sap.com>
Thu, 21 Jan 2021 12:10:16 +0000 (13:10 +0100)
committerJérôme Benoit <jerome.benoit@sap.com>
Thu, 21 Jan 2021 12:10:16 +0000 (13:10 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/charging-station/StationWorker.ts
src/charging-station/Worker.ts
src/start.ts
src/types/WorkerData.ts
src/utils/Constants.ts

index 8eac9e46ab2bc1c0aa7b15cd687bf1f53d971e53..a948a1b4bbb1bfc1572ffe3dbac8a47d6e6149bc 100644 (file)
@@ -4,8 +4,7 @@ import ChargingStation from './ChargingStation';
 import Constants from '../utils/Constants';
 
 if (!isMainThread) {
-  const station = new ChargingStation(workerData.index as number, workerData.templateFile as string);
-  station.start();
+  startChargingStation({ index: workerData.index as number, templateFile: workerData.templateFile as string });
 
   // Listener: start new charging station from main thread
   addListener();
@@ -14,7 +13,7 @@ if (!isMainThread) {
 function addListener() {
   parentPort.setMaxListeners(Constants.MAX_LISTENERS);
   parentPort.on('message', (e) => {
-    if (e.id === Constants.START_CHARGING_STATION) {
+    if (e.id === Constants.START_WORKER_ELEMENT) {
       startChargingStation(e.workerData);
     }
   });
index 84ec9edff55ba258813e498cfbdb45a4daa4ded8..4137429780aa3fda1f12de4fdd8ab18be9098d48 100644 (file)
@@ -9,7 +9,7 @@ export default class Wrk {
   private _workerScript: string;
   private _workerData: WorkerData;
   private _index: number;
-  private _concurrentWorkers: number;
+  private _maxWorkerElements: number;
   private _worker: Worker;
 
   /**
@@ -17,26 +17,24 @@ export default class Wrk {
    *
    * @param {string} workerScript
    * @param {WorkerData} workerData
-   * @param {number} numConcurrentWorkers
+   * @param {number} maxWorkerElements
    */
-  constructor(workerScript: string, workerData: WorkerData, numConcurrentWorkers: number) {
+  constructor(workerScript: string, workerData: WorkerData, maxWorkerElements = 1) {
     this._workerData = workerData;
     this._index = workerData.index;
     this._workerScript = workerScript;
     if (Configuration.useWorkerPool()) {
-      this._concurrentWorkers = Configuration.getWorkerPoolSize();
-      WorkerPool.concurrentWorkers = this._concurrentWorkers;
-    } else {
-      this._concurrentWorkers = numConcurrentWorkers;
+      WorkerPool.maxConcurrentWorkers = Configuration.getWorkerPoolSize();
     }
+    this._maxWorkerElements = maxWorkerElements;
   }
 
   /**
    * @return {number}
    * @public
    */
-  public get concurrentWorkers(): number {
-    return this._concurrentWorkers;
+  public get maxWorkerElements(): number {
+    return this._maxWorkerElements;
   }
 
   /**
@@ -58,11 +56,14 @@ export default class Wrk {
    * @return {void}
    * @public
    */
-  addChargingStation(workerData: WorkerData, numConcurrentWorkers: number): void {
+  addWorkerElement(workerData: WorkerData): void {
+    // FIXME: also forbid to add an element if the current number of elements > max number of elements
+    if (Configuration.useWorkerPool()) {
+      return;
+    }
     this._workerData = workerData;
     this._index = workerData.index;
-    this._concurrentWorkers = numConcurrentWorkers;
-    this._worker.postMessage({ id : Constants.START_CHARGING_STATION, workerData: workerData });
+    this._worker.postMessage({ id : Constants.START_WORKER_ELEMENT, workerData: workerData });
   }
 
   /**
@@ -104,14 +105,14 @@ export default class Wrk {
 }
 
 class WorkerPool {
-  public static concurrentWorkers: number;
+  public static maxConcurrentWorkers: number;
   private static _instance: Pool;
 
   private constructor() { }
 
   public static getInstance(): Pool {
-    if (!WorkerPool._instance || (WorkerPool._instance?.size === WorkerPool.concurrentWorkers)) {
-      WorkerPool._instance = new Pool({ max: WorkerPool.concurrentWorkers });
+    if (!WorkerPool._instance) {
+      WorkerPool._instance = new Pool({ max: WorkerPool.maxConcurrentWorkers });
     }
     return WorkerPool._instance;
   }
index 67df5a3f44b1538ebabc3eeb4f79d5c87afaf6a7..3b9fc75b287c7c8d9eff1296dd0eb720ddb09fe0 100644 (file)
@@ -10,8 +10,6 @@ class Bootstrap {
       let numStationsTotal = 0;
       let numConcurrentWorkers = 0;
       let worker: Wrk;
-      const chargingStationsPerWorker = Configuration.getChargingStationsPerWorker();
-      let counter = 0;
       // Start each ChargingStation object in a worker thread
       if (Configuration.getStationTemplateURLs()) {
         for await (const stationURL of Configuration.getStationTemplateURLs()) {
@@ -23,19 +21,27 @@ class Bootstrap {
                 index,
                 templateFile: stationURL.file
               } as WorkerData;
-              if (counter === 0 || counter === chargingStationsPerWorker) {
-                // Start new worker with one charging station
-                worker = new Wrk('./dist/charging-station/StationWorker.js', workerData, numStationsTotal);
+              if (Configuration.useWorkerPool()) {
+                worker = new Wrk('./dist/charging-station/StationWorker.js', workerData);
                 worker.start().catch(() => { });
-                counter = 0;
-                // Start workers sequentially to optimize memory at start time
-                await Utils.sleep(Constants.START_WORKER_DELAY);
+                numConcurrentWorkers = Configuration.getWorkerPoolSize();
               } else {
-                // Add charging station to existing Worker
-                worker.addChargingStation(workerData, numStationsTotal);
+                const chargingStationsPerWorker = Configuration.getChargingStationsPerWorker();
+                let chargingStationsPerWorkerCounter = 0;
+                if (chargingStationsPerWorkerCounter === 0 || chargingStationsPerWorkerCounter === chargingStationsPerWorker) {
+                  // Start new Wrk with one charging station
+                  worker = new Wrk('./dist/charging-station/StationWorker.js', workerData, chargingStationsPerWorker);
+                  worker.start().catch(() => { });
+                  numConcurrentWorkers++;
+                  chargingStationsPerWorkerCounter = 1;
+                  // Start Wrk sequentially to optimize memory at start time
+                  await Utils.sleep(Constants.START_WORKER_DELAY);
+                } else {
+                  // Add charging station to existing Wrk
+                  worker.addWorkerElement(workerData);
+                  chargingStationsPerWorkerCounter++;
+                }
               }
-              counter++;
-              numConcurrentWorkers = worker.concurrentWorkers;
             }
           } catch (error) {
             // eslint-disable-next-line no-console
index 73ff1f8cedc92443270c2d974451493e865900a1..a7bc2c272d528586c2ba531c6c281bcf701f3ddd 100644 (file)
@@ -1,3 +1,4 @@
+// FIXME: make it more generic
 export default interface WorkerData {
   index: number;
   templateFile: string;
index bad0858d8b55db289b201e8e74a1cd2d75a6c478..e6d103862c3f8182fc925b9deb931893043e501b 100644 (file)
@@ -38,5 +38,5 @@ export default class Constants {
   static readonly MAX_LISTENERS = 1000;
 
   static readonly START_WORKER_DELAY = 500;
-  static readonly START_CHARGING_STATION = 'startChargingStation';
+  static readonly START_WORKER_ELEMENT = 'startWorkerElement';
 }