Commit | Line | Data |
---|---|---|
f98fbdb9 JB |
1 | import { Worker, WorkerOptions } from 'worker_threads'; |
2 | ||
6af9012e | 3 | import Configuration from '../utils/Configuration'; |
5fdab605 | 4 | import Constants from '../utils/Constants'; |
3f40bc9c | 5 | import Pool from 'worker-threads-pool'; |
ad3de6c4 | 6 | import WorkerData from '../types/WorkerData'; |
7dde0b73 | 7 | |
3f40bc9c | 8 | export default class Wrk { |
ad3de6c4 JB |
9 | private _workerScript: string; |
10 | private _workerData: WorkerData; | |
f98fbdb9 | 11 | private _index: number; |
6af9012e | 12 | private _concurrentWorkers: number; |
3d2ff9e4 | 13 | private _worker: Worker; |
6af9012e | 14 | |
7dde0b73 JB |
15 | /** |
16 | * Create a new `Wrk`. | |
17 | * | |
ad3de6c4 JB |
18 | * @param {string} workerScript |
19 | * @param {WorkerData} workerData | |
20 | * @param {number} numConcurrentWorkers | |
7dde0b73 | 21 | */ |
ad3de6c4 | 22 | constructor(workerScript: string, workerData: WorkerData, numConcurrentWorkers: number) { |
6798437b | 23 | this._workerData = workerData; |
f98fbdb9 | 24 | this._index = workerData.index; |
6798437b | 25 | this._workerScript = workerScript; |
7dde0b73 | 26 | if (Configuration.useWorkerPool()) { |
ad3de6c4 | 27 | this._concurrentWorkers = Configuration.getWorkerPoolSize(); |
f98fbdb9 | 28 | WorkerPool.concurrentWorkers = this._concurrentWorkers; |
ad3de6c4 JB |
29 | } else { |
30 | this._concurrentWorkers = numConcurrentWorkers; | |
7dde0b73 | 31 | } |
6798437b JB |
32 | } |
33 | ||
34 | /** | |
ad3de6c4 JB |
35 | * @return {number} |
36 | * @public | |
6798437b | 37 | */ |
ad3de6c4 | 38 | public get concurrentWorkers(): number { |
6798437b | 39 | return this._concurrentWorkers; |
7dde0b73 JB |
40 | } |
41 | ||
6af9012e JB |
42 | /** |
43 | * | |
44 | * @return {Promise} | |
45 | * @public | |
46 | */ | |
3d2ff9e4 | 47 | async start(): Promise<Worker> { |
6af9012e | 48 | if (Configuration.useWorkerPool()) { |
5fdab605 | 49 | await this._startWorkerWithPool(); |
3d2ff9e4 | 50 | } else { |
5fdab605 | 51 | await this._startWorker(); |
6af9012e | 52 | } |
3d2ff9e4 J |
53 | return this._worker; |
54 | } | |
55 | ||
5fdab605 | 56 | /** |
3d2ff9e4 | 57 | * |
5fdab605 | 58 | * @return {void} |
3d2ff9e4 J |
59 | * @public |
60 | */ | |
5fdab605 | 61 | addChargingStation(workerData: WorkerData, numConcurrentWorkers: number): void { |
3d2ff9e4 J |
62 | this._workerData = workerData; |
63 | this._index = workerData.index; | |
64 | this._concurrentWorkers = numConcurrentWorkers; | |
5fdab605 | 65 | this._worker.postMessage({ id : Constants.START_CHARGING_STATION, workerData: workerData }); |
6af9012e JB |
66 | } |
67 | ||
7dde0b73 JB |
68 | /** |
69 | * | |
70 | * @return {Promise} | |
71 | * @private | |
72 | */ | |
6af9012e | 73 | private async _startWorkerWithPool() { |
7dde0b73 | 74 | return new Promise((resolve, reject) => { |
f98fbdb9 | 75 | WorkerPool.acquire(this._workerScript, { workerData: this._workerData }, (err, worker) => { |
7dde0b73 JB |
76 | if (err) { |
77 | return reject(err); | |
78 | } | |
79 | worker.once('message', resolve); | |
80 | worker.once('error', reject); | |
3d2ff9e4 | 81 | this._worker = worker; |
7dde0b73 JB |
82 | }); |
83 | }); | |
84 | } | |
85 | ||
86 | /** | |
87 | * | |
88 | * @return {Promise} | |
89 | * @private | |
90 | */ | |
6af9012e | 91 | private async _startWorker() { |
7dde0b73 | 92 | return new Promise((resolve, reject) => { |
6af9012e | 93 | const worker = new Worker(this._workerScript, { workerData: this._workerData }); |
7dde0b73 JB |
94 | worker.on('message', resolve); |
95 | worker.on('error', reject); | |
96 | worker.on('exit', (code) => { | |
97 | if (code !== 0) { | |
f98fbdb9 | 98 | reject(new Error(`Worker id ${this._index} stopped with exit code ${code}`)); |
7dde0b73 JB |
99 | } |
100 | }); | |
3d2ff9e4 | 101 | this._worker = worker; |
7dde0b73 JB |
102 | }); |
103 | } | |
7dde0b73 | 104 | } |
f98fbdb9 JB |
105 | |
106 | class WorkerPool { | |
107 | public static concurrentWorkers: number; | |
108 | private static _instance: Pool; | |
109 | ||
110 | private constructor() { } | |
111 | ||
112 | public static getInstance(): Pool { | |
3d2ff9e4 | 113 | if (!WorkerPool._instance || (WorkerPool._instance?.size === WorkerPool.concurrentWorkers)) { |
f98fbdb9 JB |
114 | WorkerPool._instance = new Pool({ max: WorkerPool.concurrentWorkers }); |
115 | } | |
116 | return WorkerPool._instance; | |
117 | } | |
118 | ||
119 | public static acquire(filename: string, options: WorkerOptions, callback: (error: Error | null, worker: Worker) => void): void { | |
120 | WorkerPool.getInstance().acquire(filename, options, callback); | |
121 | } | |
122 | } |