+import { Worker, WorkerOptions } from 'worker_threads';
+
import Configuration from '../utils/Configuration';
import Pool from 'worker-threads-pool';
-import { Worker } from 'worker_threads';
+import WorkerData from '../types/WorkerData';
+import Constants from '../utils/Constants';
export default class Wrk {
- private _workerData;
- private _workerScript;
- private _pool;
+ private _workerScript: string;
+ private _workerData: WorkerData;
+ private _index: number;
private _concurrentWorkers: number;
+ private _worker: Worker;
/**
* Create a new `Wrk`.
*
- * @param {String} workerScript
- * @param {Object} workerData
- * @param {Number} numConcurrentWorkers
+ * @param {string} workerScript
+ * @param {WorkerData} workerData
+ * @param {number} numConcurrentWorkers
*/
- constructor(workerScript, workerData, numConcurrentWorkers) {
+ constructor(workerScript: string, workerData: WorkerData, numConcurrentWorkers: number) {
this._workerData = workerData;
+ this._index = workerData.index;
this._workerScript = workerScript;
- this._numConcurrentWorkers = numConcurrentWorkers;
if (Configuration.useWorkerPool()) {
- this._pool = new Pool({ max: Configuration.getWorkerPoolSize() });
+ this._concurrentWorkers = Configuration.getWorkerPoolSize();
+ WorkerPool.concurrentWorkers = this._concurrentWorkers;
+ } else {
+ this._concurrentWorkers = numConcurrentWorkers;
}
}
/**
- * @param {Number} numConcurrentWorkers
- * @private
+ * @return {number}
+ * @public
*/
- set _numConcurrentWorkers(numConcurrentWorkers: number) {
- this._concurrentWorkers = numConcurrentWorkers;
- }
-
- get _numConcurrentWorkers(): number {
+ public get concurrentWorkers(): number {
return this._concurrentWorkers;
}
* @return {Promise}
* @public
*/
- async start(): Promise<unknown> {
+ async start(): Promise<Worker> {
if (Configuration.useWorkerPool()) {
- return this._startWorkerWithPool();
+ this._startWorkerWithPool();
+ } else {
+ this._startWorker();
}
- return this._startWorker();
+ return this._worker;
+ }
+
+ /**
+ *
+ * @return {Promise}
+ * @public
+ */
+ async startNewChargingStation(workerData: WorkerData, numConcurrentWorkers: number): Promise<void> {
+ this._workerData = workerData;
+ this._index = workerData.index;
+ this._concurrentWorkers = numConcurrentWorkers;
+ this._worker.postMessage({ id : Constants.START_NEW_CHARGING_STATION, workerData: workerData });
}
/**
*/
private async _startWorkerWithPool() {
return new Promise((resolve, reject) => {
- this._pool.acquire(this._workerScript, { workerData: this._workerData }, (err, worker) => {
+ WorkerPool.acquire(this._workerScript, { workerData: this._workerData }, (err, worker) => {
if (err) {
return reject(err);
}
worker.once('message', resolve);
worker.once('error', reject);
+ this._worker = worker;
});
});
}
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) {
- reject(new Error(`Worker stopped with exit code ${code}`));
+ reject(new Error(`Worker id ${this._index} stopped with exit code ${code}`));
}
});
+ this._worker = worker;
});
}
}
+
+class WorkerPool {
+ public static concurrentWorkers: number;
+ private static _instance: Pool;
+
+ private constructor() { }
+
+ public static getInstance(): Pool {
+ if (!WorkerPool._instance || (WorkerPool._instance?.size === WorkerPool.concurrentWorkers)) {
+ WorkerPool._instance = new Pool({ max: WorkerPool.concurrentWorkers });
+ }
+ return WorkerPool._instance;
+ }
+
+ public static acquire(filename: string, options: WorkerOptions, callback: (error: Error | null, worker: Worker) => void): void {
+ WorkerPool.getInstance().acquire(filename, options, callback);
+ }
+}