From 6013bc53ce820bacf728a4d85d875c3317ff2442 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 25 Jan 2021 00:44:30 +0100 Subject: [PATCH] Use object factory design pattern for code handling workers. MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/charging-station/ChargingStation.ts | 2 - src/charging-station/StationWorker.ts | 8 ++- src/start.ts | 56 ++++------------- src/worker/Worker.ts | 3 +- src/worker/WorkerFactory.ts | 13 ++++ src/worker/WorkerGroup.ts | 79 ----------------------- src/worker/WorkerPool.ts | 11 +++- src/worker/WorkerSet.ts | 84 +++++++++++++++++++++++++ 8 files changed, 125 insertions(+), 131 deletions(-) create mode 100644 src/worker/WorkerFactory.ts delete mode 100644 src/worker/WorkerGroup.ts create mode 100644 src/worker/WorkerSet.ts diff --git a/src/charging-station/ChargingStation.ts b/src/charging-station/ChargingStation.ts index 1aa3e3b6..f646a32c 100644 --- a/src/charging-station/ChargingStation.ts +++ b/src/charging-station/ChargingStation.ts @@ -74,9 +74,7 @@ export default class ChargingStation { // In case of multiple instances: add instance index to charging station id let instanceIndex = process.env.CF_INSTANCE_INDEX ? process.env.CF_INSTANCE_INDEX : 0; instanceIndex = instanceIndex > 0 ? instanceIndex : ''; - const idSuffix = stationTemplate.nameSuffix ? stationTemplate.nameSuffix : ''; - return stationTemplate.fixedName ? stationTemplate.baseName : stationTemplate.baseName + '-' + instanceIndex.toString() + ('000000000' + this.index.toString()).substr(('000000000' + this.index.toString()).length - 4) + idSuffix; } diff --git a/src/charging-station/StationWorker.ts b/src/charging-station/StationWorker.ts index 513265e2..32c75ca7 100644 --- a/src/charging-station/StationWorker.ts +++ b/src/charging-station/StationWorker.ts @@ -2,12 +2,14 @@ import { isMainThread, parentPort, workerData } from 'worker_threads'; import ChargingStation from './ChargingStation'; import Constants from '../utils/Constants'; +import Utils from '../utils/Utils'; if (!isMainThread) { - startChargingStation({ index: workerData.index as number, templateFile: workerData.templateFile as string }); - - // Listener: start new charging station from main thread + // Add listener to start charging station from main thread addListener(); + if (!Utils.isUndefined(workerData)) { + startChargingStation({ index: workerData.index as number, templateFile: workerData.templateFile as string }); + } } function addListener() { diff --git a/src/start.ts b/src/start.ts index 591ef96f..aa3cfdef 100644 --- a/src/start.ts +++ b/src/start.ts @@ -1,23 +1,15 @@ import Configuration from './utils/Configuration'; -import Constants from './utils/Constants'; -import Utils from './utils/Utils'; import WorkerData from './types/WorkerData'; -import WorkerGroup from './worker/WorkerGroup'; -import WorkerPool from './worker/WorkerPool'; +import WorkerFactory from './worker/WorkerFactory'; +import Wrk from './worker/Worker'; class Bootstrap { - static async start() { + static start() { try { let numStationsTotal = 0; - let numConcurrentWorkers = 0; - const chargingStationsPerWorker = Configuration.getChargingStationsPerWorker(); - let chargingStationsPerWorkerCounter = 0; - let workerImplementation: WorkerGroup | WorkerPool; - if (Configuration.useWorkerPool()) { - workerImplementation = new WorkerPool('./dist/charging-station/StationWorker.js'); - void workerImplementation.start(); - } - // Start each ChargingStation object in a worker thread + const workerImplementation: Wrk = WorkerFactory.getWorkerImpl('./dist/charging-station/StationWorker.js'); + void workerImplementation.start(); + // Start ChargingStation object in worker thread if (Configuration.getStationTemplateURLs()) { for (const stationURL of Configuration.getStationTemplateURLs()) { try { @@ -27,32 +19,12 @@ class Bootstrap { index, templateFile: stationURL.file }; - if (Configuration.useWorkerPool()) { - void workerImplementation.addElement(workerData); - numConcurrentWorkers = workerImplementation.size; - // Start worker sequentially to optimize memory at start time - await Utils.sleep(Constants.START_WORKER_DELAY); - } else { - // eslint-disable-next-line no-lonely-if - if (chargingStationsPerWorkerCounter === 0 || chargingStationsPerWorkerCounter >= chargingStationsPerWorker) { - // Start new WorkerGroup with one charging station - workerImplementation = new WorkerGroup('./dist/charging-station/StationWorker.js', workerData, chargingStationsPerWorker); - void workerImplementation.start(); - numConcurrentWorkers++; - chargingStationsPerWorkerCounter = 1; - // Start worker sequentially to optimize memory at start time - await Utils.sleep(Constants.START_WORKER_DELAY); - } else { - // Add charging station to existing WorkerGroup - void workerImplementation.addElement(workerData); - chargingStationsPerWorkerCounter++; - } - } + void workerImplementation.addElement(workerData); numStationsTotal++; } } catch (error) { // eslint-disable-next-line no-console - console.log('Charging station start with template file ' + stationURL.file + ' error ' + JSON.stringify(error, null, ' ')); + console.error('Charging station start with template file ' + stationURL.file + ' error ', error); } } } else { @@ -60,20 +32,14 @@ class Bootstrap { } if (numStationsTotal === 0) { console.log('No charging station template enabled in configuration, exiting'); - } else if (Configuration.useWorkerPool()) { - console.log(`Charging station simulator started with ${numStationsTotal.toString()} charging station(s) and ${numConcurrentWorkers.toString()}/${Configuration.getWorkerPoolMaxSize().toString()} worker(s) concurrently running`); } else { - console.log(`Charging station simulator started with ${numStationsTotal.toString()} charging station(s) and ${numConcurrentWorkers.toString()} worker(s) concurrently running (${chargingStationsPerWorker} charging station(s) per worker)`); + console.log(`Charging station simulator started with ${numStationsTotal.toString()} charging station(s) and ${workerImplementation.size}${Configuration.useWorkerPool() ? `/${Configuration.getWorkerPoolMaxSize().toString()}` : ''} worker(s) concurrently running (${workerImplementation.maxElementsPerWorker} charging station(s) per worker)`); } } catch (error) { // eslint-disable-next-line no-console - console.log('Bootstrap start error ' + JSON.stringify(error, null, ' ')); + console.error('Bootstrap start error ', error); } } } -Bootstrap.start().catch( - (error) => { - console.error(error); - } -); +Bootstrap.start(); diff --git a/src/worker/Worker.ts b/src/worker/Worker.ts index 54dcb156..bea6d67f 100644 --- a/src/worker/Worker.ts +++ b/src/worker/Worker.ts @@ -3,6 +3,7 @@ import WorkerData from '../types/WorkerData'; export default abstract class Wrk { protected workerScript: string; public abstract size: number; + public abstract maxElementsPerWorker: number; /** * Create a new `Wrk`. @@ -14,5 +15,5 @@ export default abstract class Wrk { } public abstract start(): Promise; - public abstract addElement(elementData: WorkerData): void; + public abstract addElement(elementData: WorkerData): Promise; } diff --git a/src/worker/WorkerFactory.ts b/src/worker/WorkerFactory.ts new file mode 100644 index 00000000..13880b2f --- /dev/null +++ b/src/worker/WorkerFactory.ts @@ -0,0 +1,13 @@ +import Configuration from '../utils/Configuration'; +import WorkerPool from './WorkerPool'; +import WorkerSet from './WorkerSet'; +import Wrk from './Worker'; + +export default class WorkerFactory { + public static getWorkerImpl(workerScript: string): Wrk { + if (Configuration.useWorkerPool()) { + return new WorkerPool(workerScript); + } + return new WorkerSet(workerScript, Configuration.getChargingStationsPerWorker()); + } +} diff --git a/src/worker/WorkerGroup.ts b/src/worker/WorkerGroup.ts deleted file mode 100644 index 4f5ad32f..00000000 --- a/src/worker/WorkerGroup.ts +++ /dev/null @@ -1,79 +0,0 @@ -import Configuration from '../utils/Configuration'; -import Constants from '../utils/Constants'; -import { Worker } from 'worker_threads'; -import WorkerData from '../types/WorkerData'; -import Wrk from './Worker'; - -export default class WorkerGroup extends Wrk { - private worker: Worker; - private lastElementData: WorkerData; - private maxWorkerElements: number; - private numWorkerElements: number; - - /** - * Create a new `WorkerGroup`. - * - * @param {string} workerScript - * @param {WorkerData} workerData - * @param {number} maxWorkerElements - */ - constructor(workerScript: string, initialElementData: WorkerData, maxWorkerElements = 1) { - super(workerScript); - this.lastElementData = initialElementData; - this.maxWorkerElements = maxWorkerElements; - this.numWorkerElements = 0; - } - - get size(): number { - return this.numWorkerElements; - } - - /** - * - * @return {void} - * @public - */ - public addElement(elementData: WorkerData): void { - if (Configuration.useWorkerPool()) { - throw Error('Cannot add a WorkerGroup element: the worker pool is enabled in configuration'); - } - if (!this.worker) { - throw Error('Cannot add a WorkerGroup element: worker does not exist'); - } - if (this.numWorkerElements >= this.maxWorkerElements) { - throw Error('Cannot add a WorkerGroup element: max number of elements per worker reached'); - } - this.lastElementData = elementData; - this.worker.postMessage({ id: Constants.START_WORKER_ELEMENT, workerData: this.lastElementData }); - this.numWorkerElements++; - } - - /** - * - * @return {Promise} - * @public - */ - public async start(): Promise { - await this.startWorker(); - } - - /** - * - * @return {Promise} - * @private - */ - private async startWorker() { - return new Promise((resolve, reject) => { - const worker = new Worker(this.workerScript, { workerData: this.lastElementData }); - worker.on('message', resolve); - worker.on('error', reject); - worker.on('exit', (code) => { - if (code !== 0) { - reject(new Error(`Worker stopped with exit code ${code}`)); - } - }); - this.numWorkerElements++; - this.worker = worker; - }); - } -} diff --git a/src/worker/WorkerPool.ts b/src/worker/WorkerPool.ts index 76910f82..d04926e7 100644 --- a/src/worker/WorkerPool.ts +++ b/src/worker/WorkerPool.ts @@ -1,5 +1,7 @@ import Configuration from '../utils/Configuration'; +import Constants from '../utils/Constants'; import Pool from 'worker-threads-pool'; +import Utils from '../utils/Utils'; import WorkerData from '../types/WorkerData'; import Wrk from './Worker'; @@ -20,16 +22,21 @@ export default class WorkerPool extends Wrk { return this.pool.size; } + get maxElementsPerWorker(): number { + return 1; + } + /** * * @return {Promise} * @public */ + // eslint-disable-next-line @typescript-eslint/no-empty-function public async start(): Promise { } /** * - * @return {Promise} + * @return {Promise} * @public */ public async addElement(elementData: WorkerData): Promise { @@ -41,6 +48,8 @@ export default class WorkerPool extends Wrk { worker.once('message', resolve); worker.once('error', reject); }); + // Start worker sequentially to optimize memory at startup + void Utils.sleep(Constants.START_WORKER_DELAY); }); } } diff --git a/src/worker/WorkerSet.ts b/src/worker/WorkerSet.ts new file mode 100644 index 00000000..a8aa8a37 --- /dev/null +++ b/src/worker/WorkerSet.ts @@ -0,0 +1,84 @@ +import Constants from '../utils/Constants'; +import Utils from '../utils/Utils'; +import { Worker } from 'worker_threads'; +import WorkerData from '../types/WorkerData'; +import Wrk from './Worker'; + +export default class WorkerSet extends Wrk { + public maxElementsPerWorker: number; + private workers: Set; + private lastWorkerNumberOfElements: number; + + /** + * Create a new `WorkerSet`. + * + * @param {string} workerScript + * @param {number} maxElementsPerWorker + */ + constructor(workerScript: string, maxElementsPerWorker = 1) { + super(workerScript); + this.workers = new Set(); + this.maxElementsPerWorker = maxElementsPerWorker; + this.lastWorkerNumberOfElements = 0; + } + + get size(): number { + return this.workers.size; + } + + /** + * + * @return {Promise} + * @public + */ + public async addElement(elementData: WorkerData): Promise { + if (!this.workers) { + throw Error('Cannot add a WorkerSet element: workers set does not exist'); + } + if (this.lastWorkerNumberOfElements >= this.maxElementsPerWorker) { + void this.startWorker(); + this.lastWorkerNumberOfElements = 0; + // Start worker sequentially to optimize memory at startup + void Utils.sleep(Constants.START_WORKER_DELAY); + } + this.getLastWorker().postMessage({ id: Constants.START_WORKER_ELEMENT, workerData: elementData }); + this.lastWorkerNumberOfElements++; + } + + /** + * + * @return {Promise} + * @public + */ + public async start(): Promise { + await this.startWorker(); + // Start worker sequentially to optimize memory at startup + await Utils.sleep(Constants.START_WORKER_DELAY); + } + + /** + * + * @return {Promise} + * @private + */ + private async startWorker() { + return new Promise((resolve, reject) => { + const worker = new Worker(this.workerScript); + worker.on('message', resolve); + worker.on('error', reject); + worker.on('exit', (code) => { + if (code !== 0) { + reject(new Error(`Worker stopped with exit code ${code}`)); + } + }); + this.workers.add(worker); + }); + } + + private getLastWorker(): Worker { + let worker: Worker; + // eslint-disable-next-line no-empty + for (worker of this.workers) { } + return worker; + } +} -- 2.34.1