From: Jérôme Benoit Date: Mon, 25 Jan 2021 20:30:29 +0000 (+0100) Subject: Switch to poolifier worker threads pool implementation. X-Git-Tag: v1.0.1-0~112 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=a4624c96a6c159b4885f5d0baaf592ceec0bab30;p=e-mobility-charging-stations-simulator.git Switch to poolifier worker threads pool implementation. Signed-off-by: Jérôme Benoit --- diff --git a/docker/config.json b/docker/config.json index 84148c20..54a9f31d 100644 --- a/docker/config.json +++ b/docker/config.json @@ -4,8 +4,9 @@ ], "distributeStationsToTenantsEqually": true, "statisticsDisplayInterval": 60, - "useWorkerPool": false, - "workerMaxPoolSize": 16, + "workerProcess": "workerSet", + "workerPoolMinSize": 4, + "workerPoolMaxSize": 16, "chargingStationsPerWorker": 1, "stationTemplateURLs": [ { diff --git a/package-lock.json b/package-lock.json index 6d0fc1e5..db9117b0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -712,15 +712,6 @@ "integrity": "sha512-PACt1xdErJbMUOUweSrbVM7gSIYm1vTncW2hF6Os/EeWi6TXYAYMPp+8v6rzHmypE5gHrxaxZNXgMkJVIdZpHw==", "dev": true }, - "@types/worker-threads-pool": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/@types/worker-threads-pool/-/worker-threads-pool-2.0.0.tgz", - "integrity": "sha512-VnFtC73JBhQRtamCR4edJWxyZrwvwz6y/Ov4VNQLiirpRUkND7xo0iUNXEZP2mvZuNbvFeXGglTAFZwCDdWMTg==", - "dev": true, - "requires": { - "@types/node": "*" - } - }, "@types/ws": { "version": "7.4.0", "resolved": "https://registry.npmjs.org/@types/ws/-/ws-7.4.0.tgz", @@ -939,14 +930,6 @@ "integrity": "sha512-OPdCF6GsMIP+Az+aWfAAOEt2/+iVDKE7oy6lJ098aoe59oAmK76qV6Gw60SbZ8jHuG2wH058GF4pLFbYamYrVA==", "dev": true }, - "after-all": { - "version": "2.0.2", - "resolved": "https://registry.npmjs.org/after-all/-/after-all-2.0.2.tgz", - "integrity": "sha1-IDACmO1glLTIXJjnyK1NymKPn3M=", - "requires": { - "once": "^1.3.0" - } - }, "ajv": { "version": "6.12.2", "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.2.tgz", @@ -8086,6 +8069,7 @@ "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=", + "dev": true, "requires": { "wrappy": "1" } @@ -8667,6 +8651,11 @@ "semver-compare": "^1.0.0" } }, + "poolifier": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/poolifier/-/poolifier-1.2.1.tgz", + "integrity": "sha512-kUH3JlLLO7JdAnRdtbgaSME5WDxgDzAuUk9+hapVHfXeI0VjpeuLnxLL8cUF7lEgrUE4m59scr5TFx5ajbPqXQ==" + }, "posix-character-classes": { "version": "0.1.1", "resolved": "https://registry.npmjs.org/posix-character-classes/-/posix-character-classes-0.1.1.tgz", @@ -11522,14 +11511,6 @@ "integrity": "sha512-Hz/mrNwitNRh/HUAtM/VT/5VH+ygD6DV7mYKZAtHOrbs8U7lvPS6xf7EJKMF0uW1KJCl0H701g3ZGus+muE5vQ==", "dev": true }, - "worker-threads-pool": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/worker-threads-pool/-/worker-threads-pool-2.0.0.tgz", - "integrity": "sha512-5dtGbEucee6o5/kQgpyKIUoHGWf8488DP3ihZDJzDIVvH4V+NA6HdBl/I5ckI4yN1NwM68pdZDbrwac1M95mEA==", - "requires": { - "after-all": "^2.0.2" - } - }, "wrap-ansi": { "version": "5.1.0", "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-5.1.0.tgz", @@ -11572,7 +11553,8 @@ "wrappy": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", - "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=" + "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=", + "dev": true }, "write-file-atomic": { "version": "2.4.2", diff --git a/package.json b/package.json index 36cea67b..1e39e321 100644 --- a/package.json +++ b/package.json @@ -48,12 +48,12 @@ }, "dependencies": { "mongodb": "^3.6.3", + "poolifier": "^1.2.1", "source-map-support": "^0.5.19", "tslib": "^2.1.0", "uuid": "^8.3.2", "winston": "^3.3.3", "winston-daily-rotate-file": "^4.5.0", - "worker-threads-pool": "^2.0.0", "ws": "^7.4.2" }, "optionalDependencies": { @@ -63,7 +63,6 @@ "devDependencies": { "@types/node": "^14.14.22", "@types/uuid": "^8.3.0", - "@types/worker-threads-pool": "^2.0.0", "@types/ws": "^7.4.0", "@typescript-eslint/eslint-plugin": "^4.14.0", "@typescript-eslint/parser": "^4.14.0", diff --git a/src/assets/config-template.json b/src/assets/config-template.json index 9e52c6cf..a7eef490 100644 --- a/src/assets/config-template.json +++ b/src/assets/config-template.json @@ -5,7 +5,8 @@ "distributeStationsToTenantsEqually": true, "statisticsDisplayInterval": 60, "chargingStationsPerWorker": 1, - "useWorkerPool": false, + "workerProcess": "workerSet", + "workerPoolMinSize": 4, "workerPoolMaxSize": 16, "stationTemplateURLs": [ { diff --git a/src/charging-station/ChargingStation.ts b/src/charging-station/ChargingStation.ts index 2456defb..e419d81e 100644 --- a/src/charging-station/ChargingStation.ts +++ b/src/charging-station/ChargingStation.ts @@ -1347,7 +1347,7 @@ export default class ChargingStation { await this.sendStatusNotification(transactionConnectorID, ChargePointStatus.PREPARING); if (commandPayload.chargingProfile && commandPayload.chargingProfile.chargingProfilePurpose === ChargingProfilePurposeType.TX_PROFILE) { this._setChargingProfile(transactionConnectorID, commandPayload.chargingProfile); - } else { + } else if (commandPayload.chargingProfile && commandPayload.chargingProfile.chargingProfilePurpose !== ChargingProfilePurposeType.TX_PROFILE) { return Constants.OCPP_RESPONSE_REJECTED; } // Authorization successful start transaction @@ -1361,7 +1361,7 @@ export default class ChargingStation { await this.sendStatusNotification(transactionConnectorID, ChargePointStatus.PREPARING); if (commandPayload.chargingProfile && commandPayload.chargingProfile.chargingProfilePurpose === ChargingProfilePurposeType.TX_PROFILE) { this._setChargingProfile(transactionConnectorID, commandPayload.chargingProfile); - } else { + } else if (commandPayload.chargingProfile && commandPayload.chargingProfile.chargingProfilePurpose !== ChargingProfilePurposeType.TX_PROFILE) { return Constants.OCPP_RESPONSE_REJECTED; } // No local authorization check required => start transaction diff --git a/src/charging-station/StationWorker.ts b/src/charging-station/StationWorker.ts index b22d8932..8ddffc4e 100644 --- a/src/charging-station/StationWorker.ts +++ b/src/charging-station/StationWorker.ts @@ -1,8 +1,10 @@ +import { WorkerData, WorkerEvents } from '../types/Worker'; import { isMainThread, parentPort, workerData } from 'worker_threads'; import ChargingStation from './ChargingStation'; +import Constants from '../utils/Constants'; +import { ThreadWorker } from 'poolifier'; import Utils from '../utils/Utils'; -import { WorkerEvents } from '../types/Worker'; if (!isMainThread) { // Add listener to start charging station from main thread @@ -20,7 +22,9 @@ function addListener() { }); } -function startChargingStation(data: any) { - const station = new ChargingStation(data.index as number, data.templateFile as string); +function startChargingStation(data: WorkerData) { + const station = new ChargingStation(data.index , data.templateFile); station.start(); } + +export default new ThreadWorker(startChargingStation, { maxInactiveTime: Constants.WORKER_POOL_MAX_INACTIVE_TIME, async: false }); diff --git a/src/start.ts b/src/start.ts index 7f582222..c0eab941 100644 --- a/src/start.ts +++ b/src/start.ts @@ -1,14 +1,15 @@ import Configuration from './utils/Configuration'; +import Utils from './utils/Utils'; import { WorkerData } from './types/Worker'; import WorkerFactory from './worker/WorkerFactory'; import Wrk from './worker/Wrk'; class Bootstrap { - static start() { + static async start() { try { let numStationsTotal = 0; const workerImplementation: Wrk = WorkerFactory.getWorkerImpl('./dist/charging-station/StationWorker.js'); - void workerImplementation.start(); + await workerImplementation.start(); // Start ChargingStation object in worker thread if (Configuration.getStationTemplateURLs()) { for (const stationURL of Configuration.getStationTemplateURLs()) { @@ -19,7 +20,7 @@ class Bootstrap { index, templateFile: stationURL.file }; - void workerImplementation.addElement(workerData); + await workerImplementation.addElement(workerData); numStationsTotal++; } } catch (error) { @@ -33,7 +34,7 @@ class Bootstrap { if (numStationsTotal === 0) { console.log('No charging station template enabled in configuration, exiting'); } else { - console.log(`Charging station simulator started with ${numStationsTotal.toString()} charging station(s) and ${workerImplementation.size}${Configuration.useWorkerPool() ? `/${Configuration.getWorkerPoolMaxSize().toString()}` : ''} worker(s) concurrently running (${workerImplementation.maxElementsPerWorker} charging station(s) per worker)`); + console.log(`Charging station simulator started with ${numStationsTotal.toString()} charging station(s) and ${workerImplementation.size}${Utils.workerPoolInUse() ? `/${Configuration.getWorkerPoolMaxSize().toString()}` : ''} worker(s) concurrently running (${workerImplementation.maxElementsPerWorker} charging station(s) per worker)`); } } catch (error) { // eslint-disable-next-line no-console @@ -42,4 +43,8 @@ class Bootstrap { } } -Bootstrap.start(); +Bootstrap.start().catch( + (error) => { + console.error(error); + } +); diff --git a/src/types/ConfigurationData.ts b/src/types/ConfigurationData.ts index 2c15a6f5..125ecef6 100644 --- a/src/types/ConfigurationData.ts +++ b/src/types/ConfigurationData.ts @@ -1,3 +1,5 @@ +import { WorkerProcessType } from './Worker'; + export interface StationTemplateURL { file: string; numberOfStations: number; @@ -10,7 +12,8 @@ export default interface ConfigurationData { connectionTimeout?: number; autoReconnectMaxRetries?: number; distributeStationsToTenantsEqually?: boolean; - useWorkerPool?: boolean; + workerProcess?: WorkerProcessType; + workerPoolMinSize?: number; workerPoolMaxSize?: number; chargingStationsPerWorker?: number; logFormat?: string; diff --git a/src/types/Worker.ts b/src/types/Worker.ts index bc93d76e..20c44882 100644 --- a/src/types/Worker.ts +++ b/src/types/Worker.ts @@ -1,5 +1,11 @@ import { Worker } from 'worker_threads'; +export enum WorkerProcessType { + WORKER_SET = 'workerSet', + DYNAMIC_POOL = 'dynamicPool', + STATIC_POOL = 'staticPool' +} + // FIXME: make it more generic export interface WorkerData { index: number; diff --git a/src/utils/Configuration.ts b/src/utils/Configuration.ts index 8770a9b9..e27ec127 100644 --- a/src/utils/Configuration.ts +++ b/src/utils/Configuration.ts @@ -1,5 +1,6 @@ import ConfigurationData, { StationTemplateURL } from '../types/ConfigurationData'; +import { WorkerProcessType } from '../types/Worker'; import fs from 'fs'; export default class Configuration { @@ -37,13 +38,18 @@ export default class Configuration { return Configuration.getConfig().stationTemplateURLs; } - static useWorkerPool(): boolean { - return Configuration.getConfig().useWorkerPool; + static getWorkerProcess(): WorkerProcessType { + Configuration.deprecateConfigurationKey('useWorkerPool;', 'Use \'workerProcess\' to define the type of worker process to use instead'); + return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerProcess') ? Configuration.getConfig().workerProcess : WorkerProcessType.WORKER_SET; + } + + static getWorkerPoolMinSize(): number { + return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerPoolMinSize') ? Configuration.getConfig().workerPoolMinSize : 4; } static getWorkerPoolMaxSize(): number { Configuration.deprecateConfigurationKey('workerPoolSize;', 'Use \'workerPoolMaxSize\' instead'); - return Configuration.useWorkerPool() && Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerPoolMaxSize') ? Configuration.getConfig().workerPoolMaxSize : 16; + return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerPoolMaxSize') ? Configuration.getConfig().workerPoolMaxSize : 16; } static getChargingStationsPerWorker(): number { diff --git a/src/utils/Constants.ts b/src/utils/Constants.ts index c36ade65..35a77a69 100644 --- a/src/utils/Constants.ts +++ b/src/utils/Constants.ts @@ -38,4 +38,5 @@ export default class Constants { static readonly TRANSACTION_DEFAULT_TAGID = '00000000'; static readonly START_WORKER_DELAY = 500; + static readonly WORKER_POOL_MAX_INACTIVE_TIME = 60000; } diff --git a/src/utils/Utils.ts b/src/utils/Utils.ts index 16cb7fa3..8ea05765 100644 --- a/src/utils/Utils.ts +++ b/src/utils/Utils.ts @@ -1,4 +1,6 @@ +import Configuration from './Configuration'; import { WebSocketCloseEventStatusString } from '../types/WebSocket'; +import { WorkerProcessType } from '../types/Worker'; import { v4 as uuid } from 'uuid'; export default class Utils { @@ -209,4 +211,8 @@ export default class Utils { } return '(Unknown)'; } + + static workerPoolInUse(): boolean { + return Configuration.getWorkerProcess() === WorkerProcessType.DYNAMIC_POOL || Configuration.getWorkerProcess() === WorkerProcessType.STATIC_POOL; + } } diff --git a/src/worker/WorkerDynamicPool.ts b/src/worker/WorkerDynamicPool.ts new file mode 100644 index 00000000..9567768c --- /dev/null +++ b/src/worker/WorkerDynamicPool.ts @@ -0,0 +1,71 @@ +import { DynamicThreadPool, DynamicThreadPoolOptions } from 'poolifier'; + +import Constants from '../utils/Constants'; +import Utils from '../utils/Utils'; +import { WorkerData } from '../types/Worker'; +import Wrk from './Wrk'; +import { threadId } from 'worker_threads'; + +export default class WorkerDynamicPool extends Wrk { + private pool: DynamicPool; + + /** + * Create a new `WorkerDynamicPool`. + * + * @param {string} workerScript + */ + constructor(workerScript: string, min: number, max: number,) { + super(workerScript); + this.pool = DynamicPool.getInstance(min, max, this.workerScript); + } + + get size(): number { + return this.pool.workers.length; + } + + get maxElementsPerWorker(): number { + return 1; + } + + /** + * + * @return {Promise} + * @public + */ + // eslint-disable-next-line @typescript-eslint/no-empty-function + public async start(): Promise { } + + /** + * + * @return {Promise} + * @public + */ + public async addElement(elementData: WorkerData): Promise { + await this.pool.execute(elementData); + // Start worker sequentially to optimize memory at startup + await Utils.sleep(Constants.START_WORKER_DELAY); + } +} + +class DynamicPool extends DynamicThreadPool { + private static instance: DynamicPool; + + private constructor(min: number, max: number, filename: string, opts?: DynamicThreadPoolOptions) { + super(min, max, filename, opts); + } + + public static getInstance(min: number, max: number, filename: string): DynamicPool { + if (!DynamicPool.instance) { + DynamicPool.instance = new DynamicPool(min, max, filename, + { + exitHandler: (code) => { + if (code !== 0) { + console.error(`Worker ${threadId} stopped with exit code ${code}`); + } + } + } + ); + } + return DynamicPool.instance; + } +} diff --git a/src/worker/WorkerFactory.ts b/src/worker/WorkerFactory.ts index 067188d8..2d923daa 100644 --- a/src/worker/WorkerFactory.ts +++ b/src/worker/WorkerFactory.ts @@ -1,13 +1,21 @@ import Configuration from '../utils/Configuration'; -import WorkerPool from './WorkerPool'; +import WorkerDynamicPool from './WorkerDynamicPool'; +import { WorkerProcessType } from '../types/Worker'; import WorkerSet from './WorkerSet'; +import WorkerStaticPool from './WorkerStaticPool'; import Wrk from './Wrk'; export default class WorkerFactory { public static getWorkerImpl(workerScript: string): Wrk { - if (Configuration.useWorkerPool()) { - return new WorkerPool(workerScript); + switch (Configuration.getWorkerProcess()) { + case WorkerProcessType.WORKER_SET: + return new WorkerSet(workerScript, Configuration.getChargingStationsPerWorker()); + case WorkerProcessType.STATIC_POOL: + return new WorkerStaticPool(workerScript, Configuration.getWorkerPoolMaxSize()); + case WorkerProcessType.DYNAMIC_POOL: + return new WorkerDynamicPool(workerScript, Configuration.getWorkerPoolMinSize(), Configuration.getWorkerPoolMaxSize()); + default: + return null; } - return new WorkerSet(workerScript, Configuration.getChargingStationsPerWorker()); } } diff --git a/src/worker/WorkerPool.ts b/src/worker/WorkerPool.ts deleted file mode 100644 index 5b9415c8..00000000 --- a/src/worker/WorkerPool.ts +++ /dev/null @@ -1,68 +0,0 @@ -import Configuration from '../utils/Configuration'; -import Constants from '../utils/Constants'; -import Pool from 'worker-threads-pool'; -import Utils from '../utils/Utils'; -import { WorkerData } from '../types/Worker'; -import Wrk from './Wrk'; - -export default class WorkerPool extends Wrk { - private pool: Pool; - - /** - * Create a new `WorkerPool`. - * - * @param {string} workerScript - */ - constructor(workerScript: string) { - super(workerScript); - this.pool = UniquePool.getInstance(); - } - - get size(): number { - return this.pool.size; - } - - get maxElementsPerWorker(): number { - return 1; - } - - /** - * - * @return {Promise} - * @public - */ - // eslint-disable-next-line @typescript-eslint/no-empty-function - public async start(): Promise { } - - /** - * - * @return {Promise} - * @public - */ - public async addElement(elementData: WorkerData): Promise { - return new Promise((resolve, reject) => { - this.pool.acquire(this.workerScript, { workerData: elementData }, (err, worker) => { - if (err) { - return reject(err); - } - worker.once('message', resolve); - worker.once('error', reject); - }); - // Start worker sequentially to optimize memory at startup - void Utils.sleep(Constants.START_WORKER_DELAY); - }); - } -} - -class UniquePool { - private static instance: Pool; - - private constructor() { } - - public static getInstance(): Pool { - if (!UniquePool.instance) { - UniquePool.instance = new Pool({ max: Configuration.getWorkerPoolMaxSize() }); - } - return UniquePool.instance; - } -} diff --git a/src/worker/WorkerSet.ts b/src/worker/WorkerSet.ts index 50d099d8..2151eff9 100644 --- a/src/worker/WorkerSet.ts +++ b/src/worker/WorkerSet.ts @@ -1,8 +1,8 @@ +import { Worker, threadId } from 'worker_threads'; import { WorkerData, WorkerEvents, WorkerSetElement } from '../types/Worker'; import Constants from '../utils/Constants'; import Utils from '../utils/Utils'; -import { Worker } from 'worker_threads'; import Wrk from './Wrk'; export default class WorkerSet extends Wrk { @@ -32,12 +32,12 @@ export default class WorkerSet extends Wrk { */ public async addElement(elementData: WorkerData): Promise { if (!this.workers) { - throw Error('Cannot add a WorkerSet element: workers set does not exist'); + throw Error('Cannot add a WorkerSet element: workers\' set does not exist'); } if (this.getLastWorkerSetElement().numberOfWorkerElements >= this.maxElementsPerWorker) { - void this.startWorker(); + this.startWorker(); // Start worker sequentially to optimize memory at startup - void Utils.sleep(Constants.START_WORKER_DELAY); + await Utils.sleep(Constants.START_WORKER_DELAY); } this.getLastWorker().postMessage({ id: WorkerEvents.START_WORKER_ELEMENT, workerData: elementData }); this.getLastWorkerSetElement().numberOfWorkerElements++; @@ -49,7 +49,7 @@ export default class WorkerSet extends Wrk { * @public */ public async start(): Promise { - await this.startWorker(); + this.startWorker(); // Start worker sequentially to optimize memory at startup await Utils.sleep(Constants.START_WORKER_DELAY); } @@ -59,18 +59,16 @@ export default class WorkerSet extends Wrk { * @return {Promise} * @private */ - private async startWorker() { - return new Promise((resolve, reject) => { - const worker = new Worker(this.workerScript); - worker.on('message', resolve); - worker.on('error', reject); - worker.on('exit', (code) => { - if (code !== 0) { - reject(new Error(`Worker stopped with exit code ${code}`)); - } - }); - this.workers.add({ worker, numberOfWorkerElements: 0 }); + private startWorker(): void { + const worker = new Worker(this.workerScript); + worker.on('message', () => { }); + worker.on('error', () => { }); + worker.on('exit', (code) => { + if (code !== 0) { + console.error(`Worker ${threadId} stopped with exit code ${code}`); + } }); + this.workers.add({ worker, numberOfWorkerElements: 0 }); } private getLastWorkerSetElement(): WorkerSetElement { diff --git a/src/worker/WorkerStaticPool.ts b/src/worker/WorkerStaticPool.ts new file mode 100644 index 00000000..20d53b0e --- /dev/null +++ b/src/worker/WorkerStaticPool.ts @@ -0,0 +1,71 @@ +import { FixedThreadPool, FixedThreadPoolOptions } from 'poolifier'; + +import Constants from '../utils/Constants'; +import Utils from '../utils/Utils'; +import { WorkerData } from '../types/Worker'; +import Wrk from './Wrk'; +import { threadId } from 'worker_threads'; + +export default class WorkerStaticPool extends Wrk { + private pool: StaticPool; + + /** + * Create a new `WorkerStaticPool`. + * + * @param {string} workerScript + */ + constructor(workerScript: string, numThreads: number) { + super(workerScript); + this.pool = StaticPool.getInstance(numThreads, this.workerScript); + } + + get size(): number { + return this.pool.workers.length; + } + + get maxElementsPerWorker(): number { + return 1; + } + + /** + * + * @return {Promise} + * @public + */ + // eslint-disable-next-line @typescript-eslint/no-empty-function + public async start(): Promise { } + + /** + * + * @return {Promise} + * @public + */ + public async addElement(elementData: WorkerData): Promise { + await this.pool.execute(elementData); + // Start worker sequentially to optimize memory at startup + await Utils.sleep(Constants.START_WORKER_DELAY); + } +} + +class StaticPool extends FixedThreadPool { + private static instance: StaticPool; + + private constructor(numThreads: number, workerScript: string, opts?: FixedThreadPoolOptions) { + super(numThreads, workerScript, opts); + } + + public static getInstance(numThreads: number, workerScript: string): StaticPool { + if (!StaticPool.instance) { + StaticPool.instance = new StaticPool(numThreads, workerScript, + { + exitHandler: (code) => { + if (code !== 0) { + console.error(`Worker ${threadId} stopped with exit code ${code}`); + } + } + } + ); + } + return StaticPool.instance; + } +}