From 97ed5cec983f91ed1cdb903c13505994a6e0d23b Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 21 Jan 2021 13:10:16 +0100 Subject: [PATCH] Fix worker with pool handling MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/charging-station/StationWorker.ts | 5 ++--- src/charging-station/Worker.ts | 31 ++++++++++++++------------- src/start.ts | 30 +++++++++++++++----------- src/types/WorkerData.ts | 1 + src/utils/Constants.ts | 2 +- 5 files changed, 38 insertions(+), 31 deletions(-) diff --git a/src/charging-station/StationWorker.ts b/src/charging-station/StationWorker.ts index 8eac9e46..a948a1b4 100644 --- a/src/charging-station/StationWorker.ts +++ b/src/charging-station/StationWorker.ts @@ -4,8 +4,7 @@ import ChargingStation from './ChargingStation'; import Constants from '../utils/Constants'; if (!isMainThread) { - const station = new ChargingStation(workerData.index as number, workerData.templateFile as string); - station.start(); + startChargingStation({ index: workerData.index as number, templateFile: workerData.templateFile as string }); // Listener: start new charging station from main thread addListener(); @@ -14,7 +13,7 @@ if (!isMainThread) { function addListener() { parentPort.setMaxListeners(Constants.MAX_LISTENERS); parentPort.on('message', (e) => { - if (e.id === Constants.START_CHARGING_STATION) { + if (e.id === Constants.START_WORKER_ELEMENT) { startChargingStation(e.workerData); } }); diff --git a/src/charging-station/Worker.ts b/src/charging-station/Worker.ts index 84ec9edf..41374297 100644 --- a/src/charging-station/Worker.ts +++ b/src/charging-station/Worker.ts @@ -9,7 +9,7 @@ export default class Wrk { private _workerScript: string; private _workerData: WorkerData; private _index: number; - private _concurrentWorkers: number; + private _maxWorkerElements: number; private _worker: Worker; /** @@ -17,26 +17,24 @@ export default class Wrk { * * @param {string} workerScript * @param {WorkerData} workerData - * @param {number} numConcurrentWorkers + * @param {number} maxWorkerElements */ - constructor(workerScript: string, workerData: WorkerData, numConcurrentWorkers: number) { + constructor(workerScript: string, workerData: WorkerData, maxWorkerElements = 1) { this._workerData = workerData; this._index = workerData.index; this._workerScript = workerScript; if (Configuration.useWorkerPool()) { - this._concurrentWorkers = Configuration.getWorkerPoolSize(); - WorkerPool.concurrentWorkers = this._concurrentWorkers; - } else { - this._concurrentWorkers = numConcurrentWorkers; + WorkerPool.maxConcurrentWorkers = Configuration.getWorkerPoolSize(); } + this._maxWorkerElements = maxWorkerElements; } /** * @return {number} * @public */ - public get concurrentWorkers(): number { - return this._concurrentWorkers; + public get maxWorkerElements(): number { + return this._maxWorkerElements; } /** @@ -58,11 +56,14 @@ export default class Wrk { * @return {void} * @public */ - addChargingStation(workerData: WorkerData, numConcurrentWorkers: number): void { + addWorkerElement(workerData: WorkerData): void { + // FIXME: also forbid to add an element if the current number of elements > max number of elements + if (Configuration.useWorkerPool()) { + return; + } this._workerData = workerData; this._index = workerData.index; - this._concurrentWorkers = numConcurrentWorkers; - this._worker.postMessage({ id : Constants.START_CHARGING_STATION, workerData: workerData }); + this._worker.postMessage({ id : Constants.START_WORKER_ELEMENT, workerData: workerData }); } /** @@ -104,14 +105,14 @@ export default class Wrk { } class WorkerPool { - public static concurrentWorkers: number; + public static maxConcurrentWorkers: number; private static _instance: Pool; private constructor() { } public static getInstance(): Pool { - if (!WorkerPool._instance || (WorkerPool._instance?.size === WorkerPool.concurrentWorkers)) { - WorkerPool._instance = new Pool({ max: WorkerPool.concurrentWorkers }); + if (!WorkerPool._instance) { + WorkerPool._instance = new Pool({ max: WorkerPool.maxConcurrentWorkers }); } return WorkerPool._instance; } diff --git a/src/start.ts b/src/start.ts index 67df5a3f..3b9fc75b 100644 --- a/src/start.ts +++ b/src/start.ts @@ -10,8 +10,6 @@ class Bootstrap { let numStationsTotal = 0; let numConcurrentWorkers = 0; let worker: Wrk; - const chargingStationsPerWorker = Configuration.getChargingStationsPerWorker(); - let counter = 0; // Start each ChargingStation object in a worker thread if (Configuration.getStationTemplateURLs()) { for await (const stationURL of Configuration.getStationTemplateURLs()) { @@ -23,19 +21,27 @@ class Bootstrap { index, templateFile: stationURL.file } as WorkerData; - if (counter === 0 || counter === chargingStationsPerWorker) { - // Start new worker with one charging station - worker = new Wrk('./dist/charging-station/StationWorker.js', workerData, numStationsTotal); + if (Configuration.useWorkerPool()) { + worker = new Wrk('./dist/charging-station/StationWorker.js', workerData); worker.start().catch(() => { }); - counter = 0; - // Start workers sequentially to optimize memory at start time - await Utils.sleep(Constants.START_WORKER_DELAY); + numConcurrentWorkers = Configuration.getWorkerPoolSize(); } else { - // Add charging station to existing Worker - worker.addChargingStation(workerData, numStationsTotal); + const chargingStationsPerWorker = Configuration.getChargingStationsPerWorker(); + let chargingStationsPerWorkerCounter = 0; + if (chargingStationsPerWorkerCounter === 0 || chargingStationsPerWorkerCounter === chargingStationsPerWorker) { + // Start new Wrk with one charging station + worker = new Wrk('./dist/charging-station/StationWorker.js', workerData, chargingStationsPerWorker); + worker.start().catch(() => { }); + numConcurrentWorkers++; + chargingStationsPerWorkerCounter = 1; + // Start Wrk sequentially to optimize memory at start time + await Utils.sleep(Constants.START_WORKER_DELAY); + } else { + // Add charging station to existing Wrk + worker.addWorkerElement(workerData); + chargingStationsPerWorkerCounter++; + } } - counter++; - numConcurrentWorkers = worker.concurrentWorkers; } } catch (error) { // eslint-disable-next-line no-console diff --git a/src/types/WorkerData.ts b/src/types/WorkerData.ts index 73ff1f8c..a7bc2c27 100644 --- a/src/types/WorkerData.ts +++ b/src/types/WorkerData.ts @@ -1,3 +1,4 @@ +// FIXME: make it more generic export default interface WorkerData { index: number; templateFile: string; diff --git a/src/utils/Constants.ts b/src/utils/Constants.ts index bad0858d..e6d10386 100644 --- a/src/utils/Constants.ts +++ b/src/utils/Constants.ts @@ -38,5 +38,5 @@ export default class Constants { static readonly MAX_LISTENERS = 1000; static readonly START_WORKER_DELAY = 500; - static readonly START_CHARGING_STATION = 'startChargingStation'; + static readonly START_WORKER_ELEMENT = 'startWorkerElement'; } -- 2.34.1