renaming (pool size != max pool size)
[e-mobility-charging-stations-simulator.git] / src / charging-station / Worker.ts
1 import { Worker, WorkerOptions } from 'worker_threads';
2
3 import Configuration from '../utils/Configuration';
4 import Constants from '../utils/Constants';
5 import Pool from 'worker-threads-pool';
6 import WorkerData from '../types/WorkerData';
7
8 export default class Wrk {
9 private _workerScript: string;
10 private _workerData: WorkerData;
11 private _worker: Worker;
12
13 /**
14 * Create a new `Wrk`.
15 *
16 * @param {string} workerScript
17 * @param {WorkerData} workerData
18 */
19 constructor(workerScript: string, workerData: WorkerData) {
20 this._workerData = workerData;
21 this._workerScript = workerScript;
22 if (Configuration.useWorkerPool()) {
23 WorkerPool.maxConcurrentWorkers = Configuration.getWorkerMaxPoolSize();
24 }
25 }
26
27 /**
28 *
29 * @return {Promise}
30 * @public
31 */
32 async start(): Promise<Worker> {
33 if (Configuration.useWorkerPool()) {
34 await this._startWorkerWithPool();
35 } else {
36 await this._startWorker();
37 }
38 return this._worker;
39 }
40
41 /**
42 *
43 * @return {void}
44 * @public
45 */
46 addWorkerElement(workerData: WorkerData): void {
47 // FIXME: also forbid to add an element if the current number of elements > max number of elements
48 if (Configuration.useWorkerPool()) {
49 return;
50 }
51 this._workerData = workerData;
52 this._worker.postMessage({ id : Constants.START_WORKER_ELEMENT, workerData: workerData });
53 }
54
55 /**
56 *
57 * @return {Promise}
58 * @private
59 */
60 private async _startWorkerWithPool() {
61 return new Promise((resolve, reject) => {
62 WorkerPool.acquire(this._workerScript, { workerData: this._workerData }, (err, worker) => {
63 if (err) {
64 return reject(err);
65 }
66 worker.once('message', resolve);
67 worker.once('error', reject);
68 this._worker = worker;
69 });
70 });
71 }
72
73 /**
74 *
75 * @return {Promise}
76 * @private
77 */
78 private async _startWorker() {
79 return new Promise((resolve, reject) => {
80 const worker = new Worker(this._workerScript, { workerData: this._workerData });
81 worker.on('message', resolve);
82 worker.on('error', reject);
83 worker.on('exit', (code) => {
84 if (code !== 0) {
85 reject(new Error(`Worker stopped with exit code ${code}`));
86 }
87 });
88 this._worker = worker;
89 });
90 }
91
92 /**
93 *
94 * @return {number}
95 * @public
96 */
97 public getPoolSize(): number {
98 return WorkerPool.getPoolSize();
99 }
100 }
101
102
103 class WorkerPool {
104 public static maxConcurrentWorkers: number;
105 private static _instance: Pool;
106
107 private constructor() { }
108
109 public static getInstance(): Pool {
110 if (!WorkerPool._instance) {
111 WorkerPool._instance = new Pool({ max: WorkerPool.maxConcurrentWorkers });
112 }
113 return WorkerPool._instance;
114 }
115
116 public static acquire(filename: string, options: WorkerOptions, callback: (error: Error | null, worker: Worker) => void): void {
117 WorkerPool.getInstance().acquire(filename, options, callback);
118 }
119
120 public static getPoolSize(): number {
121 return WorkerPool._instance.size;
122 }
123 }