Add git submodules init script.
[e-mobility-charging-stations-simulator.git] / src / charging-station / Worker.ts
CommitLineData
f98fbdb9
JB
1import { Worker, WorkerOptions } from 'worker_threads';
2
6af9012e 3import Configuration from '../utils/Configuration';
3f40bc9c 4import Pool from 'worker-threads-pool';
ad3de6c4 5import WorkerData from '../types/WorkerData';
7dde0b73 6
3f40bc9c 7export default class Wrk {
ad3de6c4
JB
8 private _workerScript: string;
9 private _workerData: WorkerData;
f98fbdb9 10 private _index: number;
6af9012e
JB
11 private _concurrentWorkers: number;
12
7dde0b73
JB
13 /**
14 * Create a new `Wrk`.
15 *
ad3de6c4
JB
16 * @param {string} workerScript
17 * @param {WorkerData} workerData
18 * @param {number} numConcurrentWorkers
7dde0b73 19 */
ad3de6c4 20 constructor(workerScript: string, workerData: WorkerData, numConcurrentWorkers: number) {
6798437b 21 this._workerData = workerData;
f98fbdb9 22 this._index = workerData.index;
6798437b 23 this._workerScript = workerScript;
7dde0b73 24 if (Configuration.useWorkerPool()) {
ad3de6c4 25 this._concurrentWorkers = Configuration.getWorkerPoolSize();
f98fbdb9 26 WorkerPool.concurrentWorkers = this._concurrentWorkers;
ad3de6c4
JB
27 } else {
28 this._concurrentWorkers = numConcurrentWorkers;
7dde0b73 29 }
6798437b
JB
30 }
31
32 /**
ad3de6c4
JB
33 * @return {number}
34 * @public
6798437b 35 */
ad3de6c4 36 public get concurrentWorkers(): number {
6798437b 37 return this._concurrentWorkers;
7dde0b73
JB
38 }
39
6af9012e
JB
40 /**
41 *
42 * @return {Promise}
43 * @public
44 */
45 async start(): Promise<unknown> {
46 if (Configuration.useWorkerPool()) {
47 return this._startWorkerWithPool();
48 }
49 return this._startWorker();
50 }
51
7dde0b73
JB
52 /**
53 *
54 * @return {Promise}
55 * @private
56 */
6af9012e 57 private async _startWorkerWithPool() {
7dde0b73 58 return new Promise((resolve, reject) => {
f98fbdb9 59 WorkerPool.acquire(this._workerScript, { workerData: this._workerData }, (err, worker) => {
7dde0b73
JB
60 if (err) {
61 return reject(err);
62 }
63 worker.once('message', resolve);
64 worker.once('error', reject);
65 });
66 });
67 }
68
69 /**
70 *
71 * @return {Promise}
72 * @private
73 */
6af9012e 74 private async _startWorker() {
7dde0b73 75 return new Promise((resolve, reject) => {
6af9012e 76 const worker = new Worker(this._workerScript, { workerData: this._workerData });
7dde0b73
JB
77 worker.on('message', resolve);
78 worker.on('error', reject);
79 worker.on('exit', (code) => {
80 if (code !== 0) {
f98fbdb9 81 reject(new Error(`Worker id ${this._index} stopped with exit code ${code}`));
7dde0b73
JB
82 }
83 });
84 });
85 }
7dde0b73 86}
f98fbdb9
JB
87
88class WorkerPool {
89 public static concurrentWorkers: number;
90 private static _instance: Pool;
91
92 private constructor() { }
93
94 public static getInstance(): Pool {
95 if (!WorkerPool._instance) {
96 WorkerPool._instance = new Pool({ max: WorkerPool.concurrentWorkers });
97 }
98 return WorkerPool._instance;
99 }
100
101 public static acquire(filename: string, options: WorkerOptions, callback: (error: Error | null, worker: Worker) => void): void {
102 WorkerPool.getInstance().acquire(filename, options, callback);
103 }
104}