-import { Worker, WorkerOptions } from 'worker_threads';
-
import Configuration from '../utils/Configuration';
import Constants from '../utils/Constants';
-import Pool from 'worker-threads-pool';
+import { Worker } from 'worker_threads';
import WorkerData from '../types/WorkerData';
+import WorkerPool from './WorkerPool';
export default class Wrk {
- private _workerScript: string;
- private _workerData: WorkerData;
- private _worker: Worker;
+ private workerScript: string;
+ private workerData: WorkerData;
+ private worker: Worker;
+ private maxWorkerElements: number;
+ private numWorkerElements: number;
/**
* Create a new `Wrk`.
*
* @param {string} workerScript
* @param {WorkerData} workerData
+ * @param {number} maxWorkerElements
*/
- constructor(workerScript: string, workerData: WorkerData) {
- this._workerData = workerData;
- this._workerScript = workerScript;
+ constructor(workerScript: string, workerData: WorkerData, maxWorkerElements = 1) {
+ this.workerData = workerData;
+ this.workerScript = workerScript;
if (Configuration.useWorkerPool()) {
- WorkerPool.maxConcurrentWorkers = Configuration.getWorkerPoolSize();
+ WorkerPool.maxConcurrentWorkers = Configuration.getWorkerPoolMaxSize();
+ } else {
+ this.maxWorkerElements = maxWorkerElements;
+ this.numWorkerElements = 0;
}
}
*/
async start(): Promise<Worker> {
if (Configuration.useWorkerPool()) {
- await this._startWorkerWithPool();
+ await this.startWorkerPool();
} else {
- await this._startWorker();
+ await this.startWorker();
}
- return this._worker;
+ return this.worker;
}
/**
* @public
*/
addWorkerElement(workerData: WorkerData): void {
- // FIXME: also forbid to add an element if the current number of elements > max number of elements
if (Configuration.useWorkerPool()) {
- return;
+ throw Error('Cannot add Wrk element if the worker pool is enabled');
+ }
+ if (this.numWorkerElements >= this.maxWorkerElements) {
+ throw Error('Cannot add Wrk element: max number of elements per worker reached');
+ }
+ this.workerData = workerData;
+ this.worker.postMessage({ id: Constants.START_WORKER_ELEMENT, workerData: workerData });
+ this.numWorkerElements++;
+ }
+
+ /**
+ *
+ * @return {number}
+ * @public
+ */
+ public getWorkerPoolSize(): number {
+ if (Configuration.useWorkerPool()) {
+ return WorkerPool.getPoolSize();
}
- this._workerData = workerData;
- this._worker.postMessage({ id : Constants.START_WORKER_ELEMENT, workerData: workerData });
}
/**
* @return {Promise}
* @private
*/
- private async _startWorkerWithPool() {
+ private async startWorkerPool() {
return new Promise((resolve, reject) => {
- WorkerPool.acquire(this._workerScript, { workerData: this._workerData }, (err, worker) => {
+ WorkerPool.acquire(this.workerScript, { workerData: this.workerData }, (err, worker) => {
if (err) {
return reject(err);
}
worker.once('message', resolve);
worker.once('error', reject);
- this._worker = worker;
+ this.worker = worker;
});
});
}
* @return {Promise}
* @private
*/
- private async _startWorker() {
+ private async startWorker() {
return new Promise((resolve, reject) => {
- const worker = new Worker(this._workerScript, { workerData: this._workerData });
+ const worker = new Worker(this.workerScript, { workerData: this.workerData });
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
- this._worker = worker;
+ this.numWorkerElements++;
+ this.worker = worker;
});
}
}
-
-class WorkerPool {
- public static maxConcurrentWorkers: number;
- private static _instance: Pool;
-
- private constructor() { }
-
- public static getInstance(): Pool {
- if (!WorkerPool._instance) {
- WorkerPool._instance = new Pool({ max: WorkerPool.maxConcurrentWorkers });
- }
- return WorkerPool._instance;
- }
-
- public static acquire(filename: string, options: WorkerOptions, callback: (error: Error | null, worker: Worker) => void): void {
- WorkerPool.getInstance().acquire(filename, options, callback);
- }
-}