Ensure performance statistics is started before connection to the OCPP
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
1 import { WorkerEvents, WorkerSetElement } from '../types/Worker';
2
3 import Utils from '../utils/Utils';
4 import { Worker } from 'worker_threads';
5 import WorkerAbstract from './WorkerAbstract';
6 import { WorkerUtils } from './WorkerUtils';
7
8 export default class WorkerSet<T> extends WorkerAbstract {
9 public maxElementsPerWorker: number;
10 private workerSet: Set<WorkerSetElement>;
11
12 /**
13 * Create a new `WorkerSet`.
14 *
15 * @param {string} workerScript
16 * @param {number} maxElementsPerWorker
17 * @param {number} workerStartDelay
18 */
19 constructor(workerScript: string, maxElementsPerWorker = 1, workerStartDelay?: number) {
20 super(workerScript, workerStartDelay);
21 this.workerSet = new Set<WorkerSetElement>();
22 this.maxElementsPerWorker = maxElementsPerWorker;
23 }
24
25 get size(): number {
26 return this.workerSet.size;
27 }
28
29 /**
30 *
31 * @param {T} elementData
32 * @returns {Promise<void>}
33 * @public
34 */
35 public async addElement(elementData: T): Promise<void> {
36 if (!this.workerSet) {
37 throw new Error('Cannot add a WorkerSet element: workers\' set does not exist');
38 }
39 if (this.getLastWorkerSetElement().numberOfWorkerElements >= this.maxElementsPerWorker) {
40 this.startWorker();
41 // Start worker sequentially to optimize memory at startup
42 await Utils.sleep(this.workerStartDelay);
43 }
44 this.getLastWorker().postMessage({ id: WorkerEvents.START_WORKER_ELEMENT, workerData: elementData });
45 this.getLastWorkerSetElement().numberOfWorkerElements++;
46 }
47
48 /**
49 *
50 * @returns {Promise<void>}
51 * @public
52 */
53 public async start(): Promise<void> {
54 this.startWorker();
55 // Start worker sequentially to optimize memory at startup
56 await Utils.sleep(this.workerStartDelay);
57 }
58
59 /**
60 *
61 * @returns {Promise<void>}
62 * @public
63 */
64 public async stop(): Promise<void> {
65 for (const workerSetElement of this.workerSet) {
66 await workerSetElement.worker.terminate();
67 }
68 this.workerSet.clear();
69 }
70
71 /**
72 *
73 * @private
74 */
75 private startWorker(): void {
76 const worker = new Worker(this.workerScript);
77 worker.on('message', () => { });
78 worker.on('error', () => { });
79 worker.on('exit', (code) => {
80 WorkerUtils.defaultExitHandler(code);
81 this.workerSet.delete(this.getWorkerSetElementByWorker(worker));
82 });
83 this.workerSet.add({ worker, numberOfWorkerElements: 0 });
84 }
85
86 private getLastWorkerSetElement(): WorkerSetElement {
87 let workerSetElement: WorkerSetElement;
88 // eslint-disable-next-line no-empty
89 for (workerSetElement of this.workerSet) { }
90 return workerSetElement;
91 }
92
93 private getLastWorker(): Worker {
94 return this.getLastWorkerSetElement().worker;
95 }
96
97 private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement {
98 let workerSetElt: WorkerSetElement;
99 this.workerSet.forEach((workerSetElement) => {
100 if (workerSetElement.worker.threadId === worker.threadId) {
101 workerSetElt = workerSetElement;
102 }
103 });
104 return workerSetElt;
105 }
106 }