Refine comment.
[e-mobility-charging-stations-simulator.git] / src / charging-station / Worker.ts
1 import { Worker, WorkerOptions } from 'worker_threads';
2
3 import Configuration from '../utils/Configuration';
4 import Constants from '../utils/Constants';
5 import Pool from 'worker-threads-pool';
6 import WorkerData from '../types/WorkerData';
7
8 export default class Wrk {
9 private _workerScript: string;
10 private _workerData: WorkerData;
11 private _worker: Worker;
12 private _maxWorkerElements: number;
13 private _numWorkerElements: number;
14
15 /**
16 * Create a new `Wrk`.
17 *
18 * @param {string} workerScript
19 * @param {WorkerData} workerData
20 * @param {number} maxWorkerElements
21 */
22 constructor(workerScript: string, workerData: WorkerData, maxWorkerElements = 1) {
23 this._workerData = workerData;
24 this._workerScript = workerScript;
25 this._maxWorkerElements = maxWorkerElements;
26 this._numWorkerElements = 0;
27 if (Configuration.useWorkerPool()) {
28 WorkerPool.maxConcurrentWorkers = Configuration.getWorkerPoolMaxSize();
29 }
30 }
31
32 /**
33 *
34 * @return {Promise}
35 * @public
36 */
37 async start(): Promise<Worker> {
38 if (Configuration.useWorkerPool()) {
39 await this._startWorkerWithPool();
40 } else {
41 await this._startWorker();
42 }
43 return this._worker;
44 }
45
46 /**
47 *
48 * @return {void}
49 * @public
50 */
51 addWorkerElement(workerData: WorkerData): void {
52 if (Configuration.useWorkerPool()) {
53 throw Error('Cannot add Wrk element if the worker pool is enabled');
54 }
55 if (this._numWorkerElements >= this._maxWorkerElements) {
56 throw Error('Cannot add Wrk element: max number of elements per worker reached');
57 }
58 this._workerData = workerData;
59 this._worker.postMessage({ id: Constants.START_WORKER_ELEMENT, workerData: workerData });
60 this._numWorkerElements++;
61 }
62
63 /**
64 *
65 * @return {number}
66 * @public
67 */
68 public getWorkerPoolSize(): number {
69 if (Configuration.useWorkerPool()) {
70 return WorkerPool.getPoolSize();
71 }
72 }
73
74 /**
75 *
76 * @return {Promise}
77 * @private
78 */
79 private async _startWorkerWithPool() {
80 return new Promise((resolve, reject) => {
81 WorkerPool.acquire(this._workerScript, { workerData: this._workerData }, (err, worker) => {
82 if (err) {
83 return reject(err);
84 }
85 worker.once('message', resolve);
86 worker.once('error', reject);
87 this._worker = worker;
88 });
89 });
90 }
91
92 /**
93 *
94 * @return {Promise}
95 * @private
96 */
97 private async _startWorker() {
98 return new Promise((resolve, reject) => {
99 const worker = new Worker(this._workerScript, { workerData: this._workerData });
100 worker.on('message', resolve);
101 worker.on('error', reject);
102 worker.on('exit', (code) => {
103 if (code !== 0) {
104 reject(new Error(`Worker stopped with exit code ${code}`));
105 }
106 });
107 this._numWorkerElements++;
108 this._worker = worker;
109 });
110 }
111 }
112
113
114 class WorkerPool {
115 public static maxConcurrentWorkers: number;
116 private static _instance: Pool;
117
118 private constructor() { }
119
120 public static getInstance(): Pool {
121 if (!WorkerPool._instance) {
122 WorkerPool._instance = new Pool({ max: WorkerPool.maxConcurrentWorkers });
123 }
124 return WorkerPool._instance;
125 }
126
127 public static acquire(filename: string, options: WorkerOptions, callback: (error: Error | null, worker: Worker) => void): void {
128 WorkerPool.getInstance().acquire(filename, options, callback);
129 }
130
131 public static getPoolSize(): number {
132 return WorkerPool.getInstance().size;
133 }
134 }