Fix worker with pool handling
[e-mobility-charging-stations-simulator.git] / src / charging-station / Worker.ts
CommitLineData
f98fbdb9
JB
1import { Worker, WorkerOptions } from 'worker_threads';
2
6af9012e 3import Configuration from '../utils/Configuration';
5fdab605 4import Constants from '../utils/Constants';
3f40bc9c 5import Pool from 'worker-threads-pool';
ad3de6c4 6import WorkerData from '../types/WorkerData';
7dde0b73 7
3f40bc9c 8export default class Wrk {
ad3de6c4
JB
9 private _workerScript: string;
10 private _workerData: WorkerData;
f98fbdb9 11 private _index: number;
97ed5cec 12 private _maxWorkerElements: number;
3d2ff9e4 13 private _worker: Worker;
6af9012e 14
7dde0b73
JB
15 /**
16 * Create a new `Wrk`.
17 *
ad3de6c4
JB
18 * @param {string} workerScript
19 * @param {WorkerData} workerData
97ed5cec 20 * @param {number} maxWorkerElements
7dde0b73 21 */
97ed5cec 22 constructor(workerScript: string, workerData: WorkerData, maxWorkerElements = 1) {
6798437b 23 this._workerData = workerData;
f98fbdb9 24 this._index = workerData.index;
6798437b 25 this._workerScript = workerScript;
7dde0b73 26 if (Configuration.useWorkerPool()) {
97ed5cec 27 WorkerPool.maxConcurrentWorkers = Configuration.getWorkerPoolSize();
7dde0b73 28 }
97ed5cec 29 this._maxWorkerElements = maxWorkerElements;
6798437b
JB
30 }
31
32 /**
ad3de6c4
JB
33 * @return {number}
34 * @public
6798437b 35 */
97ed5cec
JB
36 public get maxWorkerElements(): number {
37 return this._maxWorkerElements;
7dde0b73
JB
38 }
39
6af9012e
JB
40 /**
41 *
42 * @return {Promise}
43 * @public
44 */
3d2ff9e4 45 async start(): Promise<Worker> {
6af9012e 46 if (Configuration.useWorkerPool()) {
5fdab605 47 await this._startWorkerWithPool();
3d2ff9e4 48 } else {
5fdab605 49 await this._startWorker();
6af9012e 50 }
3d2ff9e4
J
51 return this._worker;
52 }
53
5fdab605 54 /**
3d2ff9e4 55 *
5fdab605 56 * @return {void}
3d2ff9e4
J
57 * @public
58 */
97ed5cec
JB
59 addWorkerElement(workerData: WorkerData): void {
60 // FIXME: also forbid to add an element if the current number of elements > max number of elements
61 if (Configuration.useWorkerPool()) {
62 return;
63 }
3d2ff9e4
J
64 this._workerData = workerData;
65 this._index = workerData.index;
97ed5cec 66 this._worker.postMessage({ id : Constants.START_WORKER_ELEMENT, workerData: workerData });
6af9012e
JB
67 }
68
7dde0b73
JB
69 /**
70 *
71 * @return {Promise}
72 * @private
73 */
6af9012e 74 private async _startWorkerWithPool() {
7dde0b73 75 return new Promise((resolve, reject) => {
f98fbdb9 76 WorkerPool.acquire(this._workerScript, { workerData: this._workerData }, (err, worker) => {
7dde0b73
JB
77 if (err) {
78 return reject(err);
79 }
80 worker.once('message', resolve);
81 worker.once('error', reject);
3d2ff9e4 82 this._worker = worker;
7dde0b73
JB
83 });
84 });
85 }
86
87 /**
88 *
89 * @return {Promise}
90 * @private
91 */
6af9012e 92 private async _startWorker() {
7dde0b73 93 return new Promise((resolve, reject) => {
6af9012e 94 const worker = new Worker(this._workerScript, { workerData: this._workerData });
7dde0b73
JB
95 worker.on('message', resolve);
96 worker.on('error', reject);
97 worker.on('exit', (code) => {
98 if (code !== 0) {
f98fbdb9 99 reject(new Error(`Worker id ${this._index} stopped with exit code ${code}`));
7dde0b73
JB
100 }
101 });
3d2ff9e4 102 this._worker = worker;
7dde0b73
JB
103 });
104 }
7dde0b73 105}
f98fbdb9
JB
106
107class WorkerPool {
97ed5cec 108 public static maxConcurrentWorkers: number;
f98fbdb9
JB
109 private static _instance: Pool;
110
111 private constructor() { }
112
113 public static getInstance(): Pool {
97ed5cec
JB
114 if (!WorkerPool._instance) {
115 WorkerPool._instance = new Pool({ max: WorkerPool.maxConcurrentWorkers });
f98fbdb9
JB
116 }
117 return WorkerPool._instance;
118 }
119
120 public static acquire(filename: string, options: WorkerOptions, callback: (error: Error | null, worker: Worker) => void): void {
121 WorkerPool.getInstance().acquire(filename, options, callback);
122 }
123}