Memory optimization + worker configuration
[e-mobility-charging-stations-simulator.git] / src / charging-station / Worker.ts
index 93aade5046b44284ee4604e1b17df0e3a9f5d46d..fbfdf3d1f047fa1886febf126750265ae73c5d79 100644 (file)
@@ -1,38 +1,41 @@
+import { Worker, WorkerOptions } from 'worker_threads';
+
 import Configuration from '../utils/Configuration';
 import Pool from 'worker-threads-pool';
-import { Worker } from 'worker_threads';
+import WorkerData from '../types/WorkerData';
+import Constants from '../utils/Constants';
 
 export default class Wrk {
-  private _workerData;
-  private _workerScript;
-  private _pool;
+  private _workerScript: string;
+  private _workerData: WorkerData;
+  private _index: number;
   private _concurrentWorkers: number;
+  private _worker: Worker;
 
   /**
    * Create a new `Wrk`.
    *
-   * @param {String} workerScript
-   * @param {Object} workerData
-   * @param {Number} numConcurrentWorkers
+   * @param {string} workerScript
+   * @param {WorkerData} workerData
+   * @param {number} numConcurrentWorkers
    */
-  constructor(workerScript, workerData, numConcurrentWorkers) {
+  constructor(workerScript: string, workerData: WorkerData, numConcurrentWorkers: number) {
     this._workerData = workerData;
+    this._index = workerData.index;
     this._workerScript = workerScript;
-    this._numConcurrentWorkers = numConcurrentWorkers;
     if (Configuration.useWorkerPool()) {
-      this._pool = new Pool({ max: Configuration.getWorkerPoolSize() });
+      this._concurrentWorkers = Configuration.getWorkerPoolSize();
+      WorkerPool.concurrentWorkers = this._concurrentWorkers;
+    } else {
+      this._concurrentWorkers = numConcurrentWorkers;
     }
   }
 
   /**
-   * @param {Number} numConcurrentWorkers
-   * @private
+   * @return {number}
+   * @public
    */
-  set _numConcurrentWorkers(numConcurrentWorkers: number) {
-    this._concurrentWorkers = numConcurrentWorkers;
-  }
-
-  get _numConcurrentWorkers(): number {
+  public get concurrentWorkers(): number {
     return this._concurrentWorkers;
   }
 
@@ -41,11 +44,25 @@ export default class Wrk {
    * @return {Promise}
    * @public
    */
-  async start(): Promise<unknown> {
+  async start(): Promise<Worker> {
     if (Configuration.useWorkerPool()) {
-      return this._startWorkerWithPool();
+      this._startWorkerWithPool();
+    } else {
+      this._startWorker();
     }
-    return this._startWorker();
+    return this._worker;
+  }
+
+    /**
+   *
+   * @return {Promise}
+   * @public
+   */
+  async startNewChargingStation(workerData: WorkerData, numConcurrentWorkers: number): Promise<void> {
+    this._workerData = workerData;
+    this._index = workerData.index;
+    this._concurrentWorkers = numConcurrentWorkers;
+    this._worker.postMessage({ id : Constants.START_NEW_CHARGING_STATION, workerData: workerData });
   }
 
   /**
@@ -55,12 +72,13 @@ export default class Wrk {
    */
   private async _startWorkerWithPool() {
     return new Promise((resolve, reject) => {
-      this._pool.acquire(this._workerScript, { workerData: this._workerData }, (err, worker) => {
+      WorkerPool.acquire(this._workerScript, { workerData: this._workerData }, (err, worker) => {
         if (err) {
           return reject(err);
         }
         worker.once('message', resolve);
         worker.once('error', reject);
+        this._worker = worker;
       });
     });
   }
@@ -77,9 +95,28 @@ export default class Wrk {
       worker.on('error', reject);
       worker.on('exit', (code) => {
         if (code !== 0) {
-          reject(new Error(`Worker stopped with exit code ${code}`));
+          reject(new Error(`Worker id ${this._index} stopped with exit code ${code}`));
         }
       });
+      this._worker = worker;
     });
   }
 }
+
+class WorkerPool {
+  public static concurrentWorkers: number;
+  private static _instance: Pool;
+
+  private constructor() { }
+
+  public static getInstance(): Pool {
+    if (!WorkerPool._instance || (WorkerPool._instance?.size === WorkerPool.concurrentWorkers)) {
+      WorkerPool._instance = new Pool({ max: WorkerPool.concurrentWorkers });
+    }
+    return WorkerPool._instance;
+  }
+
+  public static acquire(filename: string, options: WorkerOptions, callback: (error: Error | null, worker: Worker) => void): void {
+    WorkerPool.getInstance().acquire(filename, options, callback);
+  }
+}