import { Worker, WorkerOptions } from 'worker_threads';

import Configuration from '../utils/Configuration';
import Constants from '../utils/Constants';
import Pool from 'worker-threads-pool';
import WorkerData from '../types/WorkerData';

/**
 * Thin wrapper around a Node.js worker thread.
 *
 * Depending on `Configuration.useWorkerPool()`, the worker is either acquired
 * from the shared `WorkerPool` or created as a dedicated `Worker`. A dedicated
 * worker can be handed additional elements via `addWorkerElement`, up to
 * `maxWorkerElements`.
 */
export default class Wrk {
  private readonly _workerScript: string;
  private _workerData: WorkerData;
  private _worker: Worker;
  private readonly _maxWorkerElements: number;
  private _numWorkerElements: number;

  /**
   * Create a new `Wrk`.
   *
   * @param {string} workerScript script file the worker thread executes
   * @param {WorkerData} workerData data passed to the worker as `workerData`
   * @param {number} maxWorkerElements maximum number of elements a dedicated
   *   (non-pooled) worker accepts; defaults to 1
   */
  constructor(workerScript: string, workerData: WorkerData, maxWorkerElements = 1) {
    this._workerData = workerData;
    this._workerScript = workerScript;
    this._maxWorkerElements = maxWorkerElements;
    this._numWorkerElements = 0;
    if (Configuration.useWorkerPool()) {
      // The pool reads this value when its instance is first created.
      WorkerPool.maxConcurrentWorkers = Configuration.getWorkerPoolMaxSize();
    }
  }

  /**
   * Start the worker, from the pool when the pool is enabled, as a dedicated
   * thread otherwise.
   *
   * @return {Promise<Worker>} the started worker, once it has posted its
   *   first message
   * @public
   */
  async start(): Promise<Worker> {
    if (Configuration.useWorkerPool()) {
      await this._startWorkerWithPool();
    } else {
      await this._startWorker();
    }
    return this._worker;
  }

  /**
   * Hand an additional element to the already started dedicated worker by
   * posting a `Constants.START_WORKER_ELEMENT` message to it.
   *
   * @param {WorkerData} workerData data describing the element to start
   * @throws {Error} if the worker pool is enabled, if the per-worker element
   *   limit is reached, or if the worker has not been started yet
   * @public
   */
  addWorkerElement(workerData: WorkerData): void {
    if (Configuration.useWorkerPool()) {
      throw Error('Cannot add Wrk element if the worker pool is enabled');
    }
    if (this._numWorkerElements >= this._maxWorkerElements) {
      throw Error('Cannot add Wrk element: max number of elements per worker reached');
    }
    if (!this._worker) {
      // Fail with an explicit message instead of a TypeError on `postMessage`.
      throw Error('Cannot add Wrk element: worker is not started');
    }
    this._workerData = workerData;
    this._worker.postMessage({ id: Constants.START_WORKER_ELEMENT, workerData: workerData });
    this._numWorkerElements++;
  }

  /**
   * Size of the shared worker pool.
   *
   * @return {number} the pool size when the pool is enabled, `undefined`
   *   otherwise (the declared return type is kept as is for callers)
   * @public
   */
  public getWorkerPoolSize(): number {
    if (Configuration.useWorkerPool()) {
      return WorkerPool.getPoolSize();
    }
    // Made explicit: nothing to report when the pool is disabled.
    return undefined;
  }

  /**
   * Acquire a worker from the shared pool and remember it.
   *
   * @return {Promise} resolves with the first message posted by the worker;
   *   rejects if the acquisition fails or the worker emits an error
   * @private
   */
  private async _startWorkerWithPool(): Promise<unknown> {
    return new Promise<unknown>((resolve, reject) => {
      WorkerPool.acquire(this._workerScript, { workerData: this._workerData }, (err, worker) => {
        if (err) {
          // No usable worker was handed over: surface the failure to start().
          return reject(err);
        }
        worker.once('message', resolve);
        worker.once('error', reject);
        this._worker = worker;
      });
    });
  }

  /**
   * Spawn a dedicated worker thread and remember it.
   *
   * @return {Promise} resolves with the first message posted by the worker;
   *   rejects on a worker error or on a non-zero exit code
   * @private
   */
  private async _startWorker(): Promise<unknown> {
    return new Promise<unknown>((resolve, reject) => {
      const worker = new Worker(this._workerScript, { workerData: this._workerData });
      worker.on('message', resolve);
      worker.on('error', reject);
      worker.on('exit', (code) => {
        // A zero exit code is a normal termination, not a start failure.
        if (code !== 0) {
          reject(new Error(`Worker stopped with exit code ${code}`));
        }
      });
      // The element described by the constructor's workerData counts as the first one.
      this._numWorkerElements++;
      this._worker = worker;
    });
  }
}

/**
 * Lazily created, process-wide pool of worker threads backed by
 * `worker-threads-pool`.
 */
class WorkerPool {
  /**
   * Maximum number of concurrent workers, passed as `max` to the pool when
   * the instance is created. Only read at that point, so it has to be set
   * before the first `getInstance()` call to take effect.
   */
  public static maxConcurrentWorkers: number;
  private static _instance: Pool;

  // Static-only class: instantiation is disallowed.
  private constructor() { }

  /**
   * Get the shared pool, creating it on first use.
   *
   * @return {Pool} the shared pool instance
   */
  public static getInstance(): Pool {
    if (!WorkerPool._instance) {
      WorkerPool._instance = new Pool({ max: WorkerPool.maxConcurrentWorkers });
    }
    return WorkerPool._instance;
  }

  /**
   * Acquire a worker running `filename` from the shared pool.
   *
   * @param {string} filename script file the worker executes
   * @param {WorkerOptions} options options forwarded to the pool
   * @param {Function} callback called with an error, or with `null` and the
   *   acquired worker
   */
  public static acquire(
    filename: string,
    options: WorkerOptions,
    callback: (error: Error | null, worker: Worker) => void,
  ): void {
    WorkerPool.getInstance().acquire(filename, options, callback);
  }

  /**
   * Size reported by the shared pool (its `size` property).
   *
   * @return {number} the pool size
   */
  public static getPoolSize(): number {
    return WorkerPool.getInstance().size;
  }
}