X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fcharging-station%2FWorker.ts;h=8a9e9f91198b1d70f3b9fd7246097bdd9678ca7a;hb=c96c65446db22c81ae48714691dbf974a8141b1d;hp=fbfdf3d1f047fa1886febf126750265ae73c5d79;hpb=3d2ff9e4875d166265bb925e00a4301e82f5c248;p=e-mobility-charging-stations-simulator.git diff --git a/src/charging-station/Worker.ts b/src/charging-station/Worker.ts index fbfdf3d1..8a9e9f91 100644 --- a/src/charging-station/Worker.ts +++ b/src/charging-station/Worker.ts @@ -1,68 +1,74 @@ -import { Worker, WorkerOptions } from 'worker_threads'; - import Configuration from '../utils/Configuration'; -import Pool from 'worker-threads-pool'; -import WorkerData from '../types/WorkerData'; import Constants from '../utils/Constants'; +import { Worker } from 'worker_threads'; +import WorkerData from '../types/WorkerData'; +import WorkerPool from './WorkerPool'; export default class Wrk { - private _workerScript: string; - private _workerData: WorkerData; - private _index: number; - private _concurrentWorkers: number; - private _worker: Worker; + private workerScript: string; + private workerData: WorkerData; + private worker: Worker; + private maxWorkerElements: number; + private numWorkerElements: number; /** * Create a new `Wrk`. * * @param {string} workerScript * @param {WorkerData} workerData - * @param {number} numConcurrentWorkers + * @param {number} maxWorkerElements */ - constructor(workerScript: string, workerData: WorkerData, numConcurrentWorkers: number) { - this._workerData = workerData; - this._index = workerData.index; - this._workerScript = workerScript; + constructor(workerScript: string, workerData: WorkerData, maxWorkerElements = 1) { + this.workerData = workerData; + this.workerScript = workerScript; if (Configuration.useWorkerPool()) { - this._concurrentWorkers = Configuration.getWorkerPoolSize(); - WorkerPool.concurrentWorkers = this._concurrentWorkers; + WorkerPool.maxConcurrentWorkers = Configuration.getWorkerPoolMaxSize(); } else { - this._concurrentWorkers = numConcurrentWorkers; + this.maxWorkerElements = maxWorkerElements; + this.numWorkerElements = 0; } } /** - * @return {number} + * + * @return {Promise} * @public */ - public get concurrentWorkers(): number { - return this._concurrentWorkers; + async start(): Promise { + if (Configuration.useWorkerPool()) { + await this.startWorkerPool(); + } else { + await this.startWorker(); + } + return this.worker; } /** * - * @return {Promise} + * @return {void} * @public */ - async start(): Promise { + addWorkerElement(workerData: WorkerData): void { if (Configuration.useWorkerPool()) { - this._startWorkerWithPool(); - } else { - this._startWorker(); + throw Error('Cannot add Wrk element if the worker pool is enabled'); + } + if (this.numWorkerElements >= this.maxWorkerElements) { + throw Error('Cannot add Wrk element: max number of elements per worker reached'); } - return this._worker; + this.workerData = workerData; + this.worker.postMessage({ id: Constants.START_WORKER_ELEMENT, workerData: workerData }); + this.numWorkerElements++; } - /** + /** * - * @return {Promise} + * @return {number} * @public */ - async startNewChargingStation(workerData: WorkerData, numConcurrentWorkers: number): Promise { - this._workerData = workerData; - this._index = workerData.index; - this._concurrentWorkers = numConcurrentWorkers; - this._worker.postMessage({ id : Constants.START_NEW_CHARGING_STATION, workerData: workerData }); + public getWorkerPoolSize(): number { + if (Configuration.useWorkerPool()) { + return WorkerPool.getPoolSize(); + } } /** @@ -70,15 +76,15 @@ export default class Wrk { * @return {Promise} * @private */ - private async _startWorkerWithPool() { + private async startWorkerPool() { return new Promise((resolve, reject) => { - WorkerPool.acquire(this._workerScript, { workerData: this._workerData }, (err, worker) => { + WorkerPool.acquire(this.workerScript, { workerData: this.workerData }, (err, worker) => { if (err) { return reject(err); } worker.once('message', resolve); worker.once('error', reject); - this._worker = worker; + this.worker = worker; }); }); } @@ -88,35 +94,18 @@ export default class Wrk { * @return {Promise} * @private */ - private async _startWorker() { + private async startWorker() { return new Promise((resolve, reject) => { - const worker = new Worker(this._workerScript, { workerData: this._workerData }); + const worker = new Worker(this.workerScript, { workerData: this.workerData }); worker.on('message', resolve); worker.on('error', reject); worker.on('exit', (code) => { if (code !== 0) { - reject(new Error(`Worker id ${this._index} stopped with exit code ${code}`)); + reject(new Error(`Worker stopped with exit code ${code}`)); } }); - this._worker = worker; + this.numWorkerElements++; + this.worker = worker; }); } } - -class WorkerPool { - public static concurrentWorkers: number; - private static _instance: Pool; - - private constructor() { } - - public static getInstance(): Pool { - if (!WorkerPool._instance || (WorkerPool._instance?.size === WorkerPool.concurrentWorkers)) { - WorkerPool._instance = new Pool({ max: WorkerPool.concurrentWorkers }); - } - return WorkerPool._instance; - } - - public static acquire(filename: string, options: WorkerOptions, callback: (error: Error | null, worker: Worker) => void): void { - WorkerPool.getInstance().acquire(filename, options, callback); - } -}