remove setMaxListener (no longer needed)
[e-mobility-charging-stations-simulator.git] / src / charging-station / Worker.ts
CommitLineData
f98fbdb9
JB
1import { Worker, WorkerOptions } from 'worker_threads';
2
6af9012e 3import Configuration from '../utils/Configuration';
5fdab605 4import Constants from '../utils/Constants';
3f40bc9c 5import Pool from 'worker-threads-pool';
ad3de6c4 6import WorkerData from '../types/WorkerData';
7dde0b73 7
/**
 * Wrapper around a Node.js worker thread.
 *
 * Depending on `Configuration.useWorkerPool()`, the worker is either acquired
 * through `WorkerPool` or spawned directly with `new Worker(...)`.
 */
export default class Wrk {
  private _workerScript: string;
  private _workerData: WorkerData;
  private _worker: Worker;

  /**
   * Create a new `Wrk`.
   *
   * When the worker pool is enabled, this also sets
   * `WorkerPool.maxConcurrentWorkers` from the configured pool size.
   *
   * @param {string} workerScript script file handed to the worker
   * @param {WorkerData} workerData data passed to the worker as `workerData`
   */
  constructor(workerScript: string, workerData: WorkerData) {
    this._workerData = workerData;
    this._workerScript = workerScript;
    if (Configuration.useWorkerPool()) {
      WorkerPool.maxConcurrentWorkers = Configuration.getWorkerPoolSize();
    }
  }

  /**
   * Start the worker, through the pool or standalone depending on the
   * configuration, and wait for its first message.
   *
   * @return {Promise<Worker>} the started worker; the promise rejects if the
   *   worker cannot be acquired, emits an error, or exits with a non-zero code
   * @public
   */
  async start(): Promise<Worker> {
    if (Configuration.useWorkerPool()) {
      await this._startWorkerWithPool();
    } else {
      await this._startWorker();
    }
    return this._worker;
  }

  /**
   * Post a `Constants.START_WORKER_ELEMENT` message carrying `workerData` to
   * the running worker. Does nothing when the worker pool is enabled.
   *
   * @param {WorkerData} workerData data of the element to start
   * @return {void}
   * @throws {Error} if the worker has not been started yet
   * @public
   */
  addWorkerElement(workerData: WorkerData): void {
    // FIXME: also forbid to add an element if the current number of elements > max number of elements
    if (Configuration.useWorkerPool()) {
      return;
    }
    // Fail with an explicit message instead of an opaque TypeError on undefined.
    if (!this._worker) {
      throw new Error('Cannot add a worker element: the worker is not started');
    }
    this._workerData = workerData;
    this._worker.postMessage({ id: Constants.START_WORKER_ELEMENT, workerData: workerData });
  }

  /**
   * Acquire a worker through `WorkerPool` and wait for its first message.
   *
   * @return {Promise} resolved with the first message posted by the worker;
   *   rejected on acquisition failure, on a worker error, or when the worker
   *   exits with a non-zero code
   * @private
   */
  private async _startWorkerWithPool(): Promise<unknown> {
    return new Promise((resolve, reject) => {
      WorkerPool.acquire(this._workerScript, { workerData: this._workerData }, (err, worker) => {
        if (err) {
          return reject(err);
        }
        worker.once('message', resolve);
        worker.once('error', reject);
        // Same handling as _startWorker(): without it a worker that exits
        // abnormally before posting a message would leave start() pending forever.
        worker.once('exit', (code) => {
          if (code !== 0) {
            reject(new Error(`Worker stopped with exit code ${code}`));
          }
        });
        this._worker = worker;
      });
    });
  }

  /**
   * Spawn a standalone worker and wait for its first message.
   *
   * @return {Promise} resolved with the first message posted by the worker;
   *   rejected on a worker error or when the worker exits with a non-zero code
   * @private
   */
  private async _startWorker(): Promise<unknown> {
    return new Promise((resolve, reject) => {
      const worker = new Worker(this._workerScript, { workerData: this._workerData });
      worker.on('message', resolve);
      worker.on('error', reject);
      worker.on('exit', (code) => {
        if (code !== 0) {
          reject(new Error(`Worker stopped with exit code ${code}`));
        }
      });
      this._worker = worker;
    });
  }
}
f98fbdb9
JB
92
/**
 * Static holder of a single, lazily created `Pool` instance.
 *
 * `maxConcurrentWorkers` is read only when the pool is first created, so a
 * value assigned after that point does not affect the existing pool.
 */
class WorkerPool {
  public static maxConcurrentWorkers: number;
  private static _instance: Pool;

  private constructor() {
    // Not instantiable: only the static members are meant to be used.
  }

  /**
   * Return the shared pool, creating it on first use with
   * `maxConcurrentWorkers` as its `max` option.
   *
   * @return {Pool} the shared pool instance
   */
  public static getInstance(): Pool {
    if (WorkerPool._instance) {
      return WorkerPool._instance;
    }
    const maxWorkers = WorkerPool.maxConcurrentWorkers;
    WorkerPool._instance = new Pool({ max: maxWorkers });
    return WorkerPool._instance;
  }

  /**
   * Forward an acquire request to the shared pool.
   *
   * @param {string} filename worker script file
   * @param {WorkerOptions} options options passed along for the worker
   * @param {Function} callback receives the error, if any, and the worker
   * @return {void}
   */
  public static acquire(filename: string, options: WorkerOptions, callback: (error: Error | null, worker: Worker) => void): void {
    const sharedPool = WorkerPool.getInstance();
    sharedPool.acquire(filename, options, callback);
  }
}