Commit | Line | Data |
---|---|---|
f98fbdb9 JB |
1 | import { Worker, WorkerOptions } from 'worker_threads'; |
2 | ||
6af9012e | 3 | import Configuration from '../utils/Configuration'; |
5fdab605 | 4 | import Constants from '../utils/Constants'; |
3f40bc9c | 5 | import Pool from 'worker-threads-pool'; |
ad3de6c4 | 6 | import WorkerData from '../types/WorkerData'; |
7dde0b73 | 7 | |
3f40bc9c | 8 | export default class Wrk { |
ad3de6c4 JB |
9 | private _workerScript: string; |
10 | private _workerData: WorkerData; | |
3d2ff9e4 | 11 | private _worker: Worker; |
ff70d2ed JB |
12 | private _maxWorkerElements: number; |
13 | private _numWorkerElements: number; | |
6af9012e | 14 | |
7dde0b73 JB |
15 | /** |
16 | * Create a new `Wrk`. | |
17 | * | |
ad3de6c4 JB |
18 | * @param {string} workerScript |
19 | * @param {WorkerData} workerData | |
7dde0b73 | 20 | */ |
ff70d2ed | 21 | constructor(workerScript: string, workerData: WorkerData, maxWorkerElements = 1) { |
6798437b JB |
22 | this._workerData = workerData; |
23 | this._workerScript = workerScript; | |
ff70d2ed JB |
24 | this._maxWorkerElements = maxWorkerElements; |
25 | this._numWorkerElements = 0; | |
7dde0b73 | 26 | if (Configuration.useWorkerPool()) { |
4fa59b8a | 27 | WorkerPool.maxConcurrentWorkers = Configuration.getWorkerPoolMaxSize(); |
7dde0b73 | 28 | } |
7dde0b73 JB |
29 | } |
30 | ||
6af9012e JB |
31 | /** |
32 | * | |
33 | * @return {Promise} | |
34 | * @public | |
35 | */ | |
3d2ff9e4 | 36 | async start(): Promise<Worker> { |
6af9012e | 37 | if (Configuration.useWorkerPool()) { |
5fdab605 | 38 | await this._startWorkerWithPool(); |
3d2ff9e4 | 39 | } else { |
5fdab605 | 40 | await this._startWorker(); |
6af9012e | 41 | } |
3d2ff9e4 J |
42 | return this._worker; |
43 | } | |
44 | ||
5fdab605 | 45 | /** |
3d2ff9e4 | 46 | * |
5fdab605 | 47 | * @return {void} |
3d2ff9e4 J |
48 | * @public |
49 | */ | |
97ed5cec | 50 | addWorkerElement(workerData: WorkerData): void { |
97ed5cec | 51 | if (Configuration.useWorkerPool()) { |
ff70d2ed JB |
52 | throw Error('Cannot add Wrk element if the worker pool is enabled'); |
53 | } | |
54 | if (this._numWorkerElements >= this._maxWorkerElements) { | |
55 | throw Error('Cannot add Wrk element: max number of elements per worker reached'); | |
97ed5cec | 56 | } |
3d2ff9e4 | 57 | this._workerData = workerData; |
4fa59b8a | 58 | this._worker.postMessage({ id: Constants.START_WORKER_ELEMENT, workerData: workerData }); |
ff70d2ed | 59 | this._numWorkerElements++; |
4fa59b8a JB |
60 | } |
61 | ||
62 | /** | |
63 | * | |
64 | * @return {number} | |
65 | * @public | |
66 | */ | |
67 | public getWorkerPoolSize(): number { | |
68 | if (Configuration.useWorkerPool()) { | |
69 | return WorkerPool.getPoolSize(); | |
70 | } | |
6af9012e JB |
71 | } |
72 | ||
7dde0b73 JB |
73 | /** |
74 | * | |
75 | * @return {Promise} | |
76 | * @private | |
77 | */ | |
6af9012e | 78 | private async _startWorkerWithPool() { |
7dde0b73 | 79 | return new Promise((resolve, reject) => { |
f98fbdb9 | 80 | WorkerPool.acquire(this._workerScript, { workerData: this._workerData }, (err, worker) => { |
7dde0b73 JB |
81 | if (err) { |
82 | return reject(err); | |
83 | } | |
84 | worker.once('message', resolve); | |
85 | worker.once('error', reject); | |
3d2ff9e4 | 86 | this._worker = worker; |
7dde0b73 JB |
87 | }); |
88 | }); | |
89 | } | |
90 | ||
91 | /** | |
92 | * | |
93 | * @return {Promise} | |
94 | * @private | |
95 | */ | |
6af9012e | 96 | private async _startWorker() { |
7dde0b73 | 97 | return new Promise((resolve, reject) => { |
6af9012e | 98 | const worker = new Worker(this._workerScript, { workerData: this._workerData }); |
7dde0b73 JB |
99 | worker.on('message', resolve); |
100 | worker.on('error', reject); | |
101 | worker.on('exit', (code) => { | |
102 | if (code !== 0) { | |
4faad557 | 103 | reject(new Error(`Worker stopped with exit code ${code}`)); |
7dde0b73 JB |
104 | } |
105 | }); | |
ff70d2ed | 106 | this._numWorkerElements++; |
3d2ff9e4 | 107 | this._worker = worker; |
7dde0b73 JB |
108 | }); |
109 | } | |
7dde0b73 | 110 | } |
f98fbdb9 | 111 | |
72a965c4 | 112 | |
f98fbdb9 | 113 | class WorkerPool { |
97ed5cec | 114 | public static maxConcurrentWorkers: number; |
f98fbdb9 JB |
115 | private static _instance: Pool; |
116 | ||
117 | private constructor() { } | |
118 | ||
119 | public static getInstance(): Pool { | |
97ed5cec JB |
120 | if (!WorkerPool._instance) { |
121 | WorkerPool._instance = new Pool({ max: WorkerPool.maxConcurrentWorkers }); | |
f98fbdb9 JB |
122 | } |
123 | return WorkerPool._instance; | |
124 | } | |
125 | ||
126 | public static acquire(filename: string, options: WorkerOptions, callback: (error: Error | null, worker: Worker) => void): void { | |
127 | WorkerPool.getInstance().acquire(filename, options, callback); | |
128 | } | |
72a965c4 J |
129 | |
130 | public static getPoolSize(): number { | |
4fa59b8a | 131 | return WorkerPool.getInstance().size; |
72a965c4 | 132 | } |
f98fbdb9 | 133 | } |