import { Worker, WorkerOptions } from 'worker_threads';
import Configuration from '../utils/Configuration';
+import Constants from '../utils/Constants';
import Pool from 'worker-threads-pool';
import WorkerData from '../types/WorkerData';
export default class Wrk {
private _workerScript: string;
private _workerData: WorkerData;
- private _index: number;
- private _concurrentWorkers: number;
+ private _worker: Worker;
/**
* Create a new `Wrk`.
*
* @param {string} workerScript
* @param {WorkerData} workerData
- * @param {number} numConcurrentWorkers
*/
- constructor(workerScript: string, workerData: WorkerData, numConcurrentWorkers: number) {
+ constructor(workerScript: string, workerData: WorkerData) {
this._workerData = workerData;
- this._index = workerData.index;
this._workerScript = workerScript;
if (Configuration.useWorkerPool()) {
- this._concurrentWorkers = Configuration.getWorkerPoolSize();
- WorkerPool.concurrentWorkers = this._concurrentWorkers;
- } else {
- this._concurrentWorkers = numConcurrentWorkers;
+ WorkerPool.maxConcurrentWorkers = Configuration.getWorkerPoolSize();
}
}
/**
- * @return {number}
+ *
+ * @return {Promise}
* @public
*/
- public get concurrentWorkers(): number {
- return this._concurrentWorkers;
+ async start(): Promise<Worker> {
+ if (Configuration.useWorkerPool()) {
+ await this._startWorkerWithPool();
+ } else {
+ await this._startWorker();
+ }
+ return this._worker;
}
/**
*
- * @return {Promise}
+ * @return {void}
* @public
*/
- async start(): Promise<unknown> {
+ addWorkerElement(workerData: WorkerData): void {
+ // FIXME: also forbid to add an element if the current number of elements > max number of elements
if (Configuration.useWorkerPool()) {
- return this._startWorkerWithPool();
+ return;
}
- return this._startWorker();
+ this._workerData = workerData;
+ this._worker.postMessage({ id : Constants.START_WORKER_ELEMENT, workerData: workerData });
}
/**
}
worker.once('message', resolve);
worker.once('error', reject);
+ this._worker = worker;
});
});
}
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) {
- reject(new Error(`Worker id ${this._index} stopped with exit code ${code}`));
+ reject(new Error(`Worker stopped with exit code ${code}`));
}
});
+ this._worker = worker;
});
}
}
class WorkerPool {
- public static concurrentWorkers: number;
+ public static maxConcurrentWorkers: number;
private static _instance: Pool;
private constructor() { }
public static getInstance(): Pool {
if (!WorkerPool._instance) {
- WorkerPool._instance = new Pool({ max: WorkerPool.concurrentWorkers });
+ WorkerPool._instance = new Pool({ max: WorkerPool.maxConcurrentWorkers });
}
return WorkerPool._instance;
}