Use object factory design pattern for code handling workers.
authorJérôme Benoit <jerome.benoit@sap.com>
Sun, 24 Jan 2021 23:44:30 +0000 (00:44 +0100)
committerJérôme Benoit <jerome.benoit@sap.com>
Sun, 24 Jan 2021 23:44:30 +0000 (00:44 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/charging-station/ChargingStation.ts
src/charging-station/StationWorker.ts
src/start.ts
src/worker/Worker.ts
src/worker/WorkerFactory.ts [new file with mode: 0644]
src/worker/WorkerGroup.ts [deleted file]
src/worker/WorkerPool.ts
src/worker/WorkerSet.ts [new file with mode: 0644]

index 1aa3e3b6793e8f0c96aa96e95574f06720875c1a..f646a32c092240f3b3d84df64b6bce709819795b 100644 (file)
@@ -74,9 +74,7 @@ export default class ChargingStation {
     // In case of multiple instances: add instance index to charging station id
     let instanceIndex = process.env.CF_INSTANCE_INDEX ? process.env.CF_INSTANCE_INDEX : 0;
     instanceIndex = instanceIndex > 0 ? instanceIndex : '';
-
     const idSuffix = stationTemplate.nameSuffix ? stationTemplate.nameSuffix : '';
-
     return stationTemplate.fixedName ? stationTemplate.baseName : stationTemplate.baseName + '-' + instanceIndex.toString() + ('000000000' + this.index.toString()).substr(('000000000' + this.index.toString()).length - 4) + idSuffix;
   }
 
index 513265e2bb0679323adbbca9c2959c20d94b04be..32c75ca79724e114daad68c9cfb08e8459a92d70 100644 (file)
@@ -2,12 +2,14 @@ import { isMainThread, parentPort, workerData } from 'worker_threads';
 
 import ChargingStation from './ChargingStation';
 import Constants from '../utils/Constants';
+import Utils from '../utils/Utils';
 
 if (!isMainThread) {
-  startChargingStation({ index: workerData.index as number, templateFile: workerData.templateFile as string });
-
-  // Listener: start new charging station from main thread
+  // Add listener to start charging station from main thread
   addListener();
+  if (!Utils.isUndefined(workerData)) {
+    startChargingStation({ index: workerData.index as number, templateFile: workerData.templateFile as string });
+  }
 }
 
 function addListener() {
index 591ef96f0b6d186ede5a40053b7d1a466f3611a7..aa3cfdeff96af62b59895cc9cefab0dbed60fa4f 100644 (file)
@@ -1,23 +1,15 @@
 import Configuration from './utils/Configuration';
-import Constants from './utils/Constants';
-import Utils from './utils/Utils';
 import WorkerData from './types/WorkerData';
-import WorkerGroup from './worker/WorkerGroup';
-import WorkerPool from './worker/WorkerPool';
+import WorkerFactory from './worker/WorkerFactory';
+import Wrk from './worker/Worker';
 
 class Bootstrap {
-  static async start() {
+  static start() {
     try {
       let numStationsTotal = 0;
-      let numConcurrentWorkers = 0;
-      const chargingStationsPerWorker = Configuration.getChargingStationsPerWorker();
-      let chargingStationsPerWorkerCounter = 0;
-      let workerImplementation: WorkerGroup | WorkerPool;
-      if (Configuration.useWorkerPool()) {
-        workerImplementation = new WorkerPool('./dist/charging-station/StationWorker.js');
-        void workerImplementation.start();
-      }
-      // Start each ChargingStation object in a worker thread
+      const workerImplementation: Wrk = WorkerFactory.getWorkerImpl('./dist/charging-station/StationWorker.js');
+      void workerImplementation.start();
+      // Start ChargingStation object in worker thread
       if (Configuration.getStationTemplateURLs()) {
         for (const stationURL of Configuration.getStationTemplateURLs()) {
           try {
@@ -27,32 +19,12 @@ class Bootstrap {
                 index,
                 templateFile: stationURL.file
               };
-              if (Configuration.useWorkerPool()) {
-                void workerImplementation.addElement(workerData);
-                numConcurrentWorkers = workerImplementation.size;
-                // Start worker sequentially to optimize memory at start time
-                await Utils.sleep(Constants.START_WORKER_DELAY);
-              } else {
-                // eslint-disable-next-line no-lonely-if
-                if (chargingStationsPerWorkerCounter === 0 || chargingStationsPerWorkerCounter >= chargingStationsPerWorker) {
-                  // Start new WorkerGroup with one charging station
-                  workerImplementation = new WorkerGroup('./dist/charging-station/StationWorker.js', workerData, chargingStationsPerWorker);
-                  void workerImplementation.start();
-                  numConcurrentWorkers++;
-                  chargingStationsPerWorkerCounter = 1;
-                  // Start worker sequentially to optimize memory at start time
-                  await Utils.sleep(Constants.START_WORKER_DELAY);
-                } else {
-                  // Add charging station to existing WorkerGroup
-                  void workerImplementation.addElement(workerData);
-                  chargingStationsPerWorkerCounter++;
-                }
-              }
+              void workerImplementation.addElement(workerData);
               numStationsTotal++;
             }
           } catch (error) {
             // eslint-disable-next-line no-console
-            console.log('Charging station start with template file ' + stationURL.file + ' error ' + JSON.stringify(error, null, ' '));
+            console.error('Charging station start with template file ' + stationURL.file + ' error ', error);
           }
         }
       } else {
@@ -60,20 +32,14 @@ class Bootstrap {
       }
       if (numStationsTotal === 0) {
         console.log('No charging station template enabled in configuration, exiting');
-      } else if (Configuration.useWorkerPool()) {
-        console.log(`Charging station simulator started with ${numStationsTotal.toString()} charging station(s) and ${numConcurrentWorkers.toString()}/${Configuration.getWorkerPoolMaxSize().toString()} worker(s) concurrently running`);
       } else {
-        console.log(`Charging station simulator started with ${numStationsTotal.toString()} charging station(s) and ${numConcurrentWorkers.toString()} worker(s) concurrently running (${chargingStationsPerWorker} charging station(s) per worker)`);
+        console.log(`Charging station simulator started with ${numStationsTotal.toString()} charging station(s) and ${workerImplementation.size}${Configuration.useWorkerPool() ? `/${Configuration.getWorkerPoolMaxSize().toString()}` : ''} worker(s) concurrently running (${workerImplementation.maxElementsPerWorker} charging station(s) per worker)`);
       }
     } catch (error) {
       // eslint-disable-next-line no-console
-      console.log('Bootstrap start error ' + JSON.stringify(error, null, ' '));
+      console.error('Bootstrap start error ', error);
     }
   }
 }
 
-Bootstrap.start().catch(
-  (error) => {
-    console.error(error);
-  }
-);
+Bootstrap.start();
index 54dcb1562df2a62846284d02c9e444e034cbc736..bea6d67f9b6bfdf86756cb060ad42d812ecf44e4 100644 (file)
@@ -3,6 +3,7 @@ import WorkerData from '../types/WorkerData';
 export default abstract class Wrk {
   protected workerScript: string;
   public abstract size: number;
+  public abstract maxElementsPerWorker: number;
 
   /**
    * Create a new `Wrk`.
@@ -14,5 +15,5 @@ export default abstract class Wrk {
   }
 
   public abstract start(): Promise<void>;
-  public abstract addElement(elementData: WorkerData): void;
+  public abstract addElement(elementData: WorkerData): Promise<void>;
 }
diff --git a/src/worker/WorkerFactory.ts b/src/worker/WorkerFactory.ts
new file mode 100644 (file)
index 0000000..13880b2
--- /dev/null
@@ -0,0 +1,13 @@
+import Configuration from '../utils/Configuration';
+import WorkerPool from './WorkerPool';
+import WorkerSet from './WorkerSet';
+import Wrk from './Worker';
+
+export default class WorkerFactory {
+  public static getWorkerImpl(workerScript: string): Wrk {
+    if (Configuration.useWorkerPool()) {
+      return new WorkerPool(workerScript);
+    }
+    return new WorkerSet(workerScript, Configuration.getChargingStationsPerWorker());
+  }
+}
diff --git a/src/worker/WorkerGroup.ts b/src/worker/WorkerGroup.ts
deleted file mode 100644 (file)
index 4f5ad32..0000000
+++ /dev/null
@@ -1,79 +0,0 @@
-import Configuration from '../utils/Configuration';
-import Constants from '../utils/Constants';
-import { Worker } from 'worker_threads';
-import WorkerData from '../types/WorkerData';
-import Wrk from './Worker';
-
-export default class WorkerGroup extends Wrk {
-  private worker: Worker;
-  private lastElementData: WorkerData;
-  private maxWorkerElements: number;
-  private numWorkerElements: number;
-
-  /**
-   * Create a new `WorkerGroup`.
-   *
-   * @param {string} workerScript
-   * @param {WorkerData} workerData
-   * @param {number} maxWorkerElements
-   */
-  constructor(workerScript: string, initialElementData: WorkerData, maxWorkerElements = 1) {
-    super(workerScript);
-    this.lastElementData = initialElementData;
-    this.maxWorkerElements = maxWorkerElements;
-    this.numWorkerElements = 0;
-  }
-
-  get size(): number {
-    return this.numWorkerElements;
-  }
-
-  /**
-   *
-   * @return {void}
-   * @public
-   */
-  public addElement(elementData: WorkerData): void {
-    if (Configuration.useWorkerPool()) {
-      throw Error('Cannot add a WorkerGroup element: the worker pool is enabled in configuration');
-    }
-    if (!this.worker) {
-      throw Error('Cannot add a WorkerGroup element: worker does not exist');
-    }
-    if (this.numWorkerElements >= this.maxWorkerElements) {
-      throw Error('Cannot add a WorkerGroup element: max number of elements per worker reached');
-    }
-    this.lastElementData = elementData;
-    this.worker.postMessage({ id: Constants.START_WORKER_ELEMENT, workerData: this.lastElementData });
-    this.numWorkerElements++;
-  }
-
-  /**
-   *
-   * @return {Promise<Worker>}
-   * @public
-   */
-  public async start(): Promise<void> {
-    await this.startWorker();
-  }
-
-  /**
-   *
-   * @return {Promise}
-   * @private
-   */
-  private async startWorker() {
-    return new Promise((resolve, reject) => {
-      const worker = new Worker(this.workerScript, { workerData: this.lastElementData });
-      worker.on('message', resolve);
-      worker.on('error', reject);
-      worker.on('exit', (code) => {
-        if (code !== 0) {
-          reject(new Error(`Worker stopped with exit code ${code}`));
-        }
-      });
-      this.numWorkerElements++;
-      this.worker = worker;
-    });
-  }
-}
index 76910f82ea69907bdadc8f84d0b8bcbf6593ecac..d04926e7d78a30a3e338c9fd9446eb8fbb148022 100644 (file)
@@ -1,5 +1,7 @@
 import Configuration from '../utils/Configuration';
+import Constants from '../utils/Constants';
 import Pool from 'worker-threads-pool';
+import Utils from '../utils/Utils';
 import WorkerData from '../types/WorkerData';
 import Wrk from './Worker';
 
@@ -20,16 +22,21 @@ export default class WorkerPool extends Wrk {
     return this.pool.size;
   }
 
+  get maxElementsPerWorker(): number {
+    return 1;
+  }
+
   /**
    *
    * @return {Promise<void>}
    * @public
    */
+  // eslint-disable-next-line @typescript-eslint/no-empty-function
   public async start(): Promise<void> { }
 
   /**
    *
-   * @return {Promise}
+   * @return {Promise<void>}
    * @public
    */
   public async addElement(elementData: WorkerData): Promise<void> {
@@ -41,6 +48,8 @@ export default class WorkerPool extends Wrk {
         worker.once('message', resolve);
         worker.once('error', reject);
       });
+      // Start worker sequentially to optimize memory at startup
+      void Utils.sleep(Constants.START_WORKER_DELAY);
     });
   }
 }
diff --git a/src/worker/WorkerSet.ts b/src/worker/WorkerSet.ts
new file mode 100644 (file)
index 0000000..a8aa8a3
--- /dev/null
@@ -0,0 +1,84 @@
+import Constants from '../utils/Constants';
+import Utils from '../utils/Utils';
+import { Worker } from 'worker_threads';
+import WorkerData from '../types/WorkerData';
+import Wrk from './Worker';
+
+export default class WorkerSet extends Wrk {
+  public maxElementsPerWorker: number;
+  private workers: Set<Worker>;
+  private lastWorkerNumberOfElements: number;
+
+  /**
+   * Create a new `WorkerSet`.
+   *
+   * @param {string} workerScript
+   * @param {number} maxElementsPerWorker
+   */
+  constructor(workerScript: string, maxElementsPerWorker = 1) {
+    super(workerScript);
+    this.workers = new Set<Worker>();
+    this.maxElementsPerWorker = maxElementsPerWorker;
+    this.lastWorkerNumberOfElements = 0;
+  }
+
+  get size(): number {
+    return this.workers.size;
+  }
+
+  /**
+   *
+   * @return {Promise<void>}
+   * @public
+   */
+  public async addElement(elementData: WorkerData): Promise<void> {
+    if (!this.workers) {
+      throw Error('Cannot add a WorkerSet element: workers set does not exist');
+    }
+    if (this.lastWorkerNumberOfElements >= this.maxElementsPerWorker) {
+      void this.startWorker();
+      this.lastWorkerNumberOfElements = 0;
+      // Start worker sequentially to optimize memory at startup
+      void Utils.sleep(Constants.START_WORKER_DELAY);
+    }
+    this.getLastWorker().postMessage({ id: Constants.START_WORKER_ELEMENT, workerData: elementData });
+    this.lastWorkerNumberOfElements++;
+  }
+
+  /**
+   *
+   * @return {Promise<void>}
+   * @public
+   */
+  public async start(): Promise<void> {
+    await this.startWorker();
+    // Start worker sequentially to optimize memory at startup
+    await Utils.sleep(Constants.START_WORKER_DELAY);
+  }
+
+  /**
+   *
+   * @return {Promise}
+   * @private
+   */
+  private async startWorker() {
+    return new Promise((resolve, reject) => {
+      const worker = new Worker(this.workerScript);
+      worker.on('message', resolve);
+      worker.on('error', reject);
+      worker.on('exit', (code) => {
+        if (code !== 0) {
+          reject(new Error(`Worker stopped with exit code ${code}`));
+        }
+      });
+      this.workers.add(worker);
+    });
+  }
+
+  private getLastWorker(): Worker {
+    let worker: Worker;
+    // eslint-disable-next-line no-empty
+    for (worker of this.workers) { }
+    return worker;
+  }
+}