From d070d967782492d71c5716d2560177531f826f53 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 4 Mar 2022 17:09:32 +0100 Subject: [PATCH] Fix workerSet startup MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/charging-station/ChargingStationWorker.ts | 3 +-- src/types/ChargingStationWorker.ts | 7 +++++ src/types/Worker.ts | 1 - src/worker/WorkerFactory.ts | 26 +++++++++---------- src/worker/WorkerSet.ts | 9 ++++++- 5 files changed, 29 insertions(+), 17 deletions(-) diff --git a/src/charging-station/ChargingStationWorker.ts b/src/charging-station/ChargingStationWorker.ts index 5ad5f664..cc0f6272 100644 --- a/src/charging-station/ChargingStationWorker.ts +++ b/src/charging-station/ChargingStationWorker.ts @@ -24,10 +24,9 @@ if (Utils.workerPoolInUse()) { * Listen messages send by the main thread */ function addMessageListener(): void { - parentPort?.on('message', async (message: ChargingStationWorkerMessage) => { + parentPort?.on('message', (message: ChargingStationWorkerMessage) => { if (message.id === ChargingStationWorkerMessageEvents.START_WORKER_ELEMENT) { startChargingStation(message.data); - message.workerOptions?.elementStartDelay > 0 && await Utils.sleep(this.workerOptions.elementStartDelay); } }); } diff --git a/src/types/ChargingStationWorker.ts b/src/types/ChargingStationWorker.ts index 8bdc9af0..155a353a 100644 --- a/src/types/ChargingStationWorker.ts +++ b/src/types/ChargingStationWorker.ts @@ -1,8 +1,15 @@ import { WorkerData, WorkerMessage, WorkerMessageEvents } from './Worker'; +import { JsonType } from './JsonType'; + +export interface ChargingStationWorkerOptions extends JsonType { + elementStartDelay?: number; +} + export interface ChargingStationWorkerData extends WorkerData { index: number; templateFile: string; + chargingStationWorkerOptions?: ChargingStationWorkerOptions; } enum InternalChargingStationWorkerMessageEvents { diff --git a/src/types/Worker.ts b/src/types/Worker.ts index 633fb7a6..aca1922e 100644 --- a/src/types/Worker.ts +++ b/src/types/Worker.ts @@ -27,7 +27,6 @@ export interface WorkerSetElement { export interface WorkerMessage { id: WorkerMessageEvents; - workerOptions?: WorkerOptions; data: T; } diff --git a/src/worker/WorkerFactory.ts b/src/worker/WorkerFactory.ts index 3a2d3fa7..debd3f6a 100644 --- a/src/worker/WorkerFactory.ts +++ b/src/worker/WorkerFactory.ts @@ -13,29 +13,29 @@ export default class WorkerFactory { // This is intentional } - public static getWorkerImplementation(workerScript: string, workerProcessType: WorkerProcessType, options?: WorkerOptions): WorkerAbstract | null { + public static getWorkerImplementation(workerScript: string, workerProcessType: WorkerProcessType, workerOptions?: WorkerOptions): WorkerAbstract | null { if (!isMainThread) { throw new Error('Trying to get a worker implementation outside the main thread'); } - options = options ?? {} as WorkerOptions; - options.workerStartDelay = options?.workerStartDelay ?? Constants.WORKER_START_DELAY; - options.elementStartDelay = options?.elementStartDelay ?? Constants.ELEMENT_START_DELAY; - options.poolOptions = options?.poolOptions ?? {} as PoolOptions; - options?.messageHandler && (options.poolOptions.messageHandler = options.messageHandler); + workerOptions = workerOptions ?? {} as WorkerOptions; + workerOptions.workerStartDelay = workerOptions?.workerStartDelay ?? Constants.WORKER_START_DELAY; + workerOptions.elementStartDelay = workerOptions?.elementStartDelay ?? Constants.ELEMENT_START_DELAY; + workerOptions.poolOptions = workerOptions?.poolOptions ?? {} as PoolOptions; + workerOptions?.messageHandler && (workerOptions.poolOptions.messageHandler = workerOptions.messageHandler); let workerImplementation: WorkerAbstract = null; switch (workerProcessType) { case WorkerProcessType.WORKER_SET: - options.elementsPerWorker = options?.elementsPerWorker ?? Constants.DEFAULT_CHARGING_STATIONS_PER_WORKER; - workerImplementation = new WorkerSet(workerScript, options); + workerOptions.elementsPerWorker = workerOptions?.elementsPerWorker ?? Constants.DEFAULT_CHARGING_STATIONS_PER_WORKER; + workerImplementation = new WorkerSet(workerScript, workerOptions); break; case WorkerProcessType.STATIC_POOL: - options.poolMaxSize = options?.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE; - workerImplementation = new WorkerStaticPool(workerScript, options); + workerOptions.poolMaxSize = workerOptions?.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE; + workerImplementation = new WorkerStaticPool(workerScript, workerOptions); break; case WorkerProcessType.DYNAMIC_POOL: - options.poolMinSize = options?.poolMinSize ?? Constants.DEFAULT_WORKER_POOL_MIN_SIZE; - options.poolMaxSize = options?.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE; - workerImplementation = new WorkerDynamicPool(workerScript, options); + workerOptions.poolMinSize = workerOptions?.poolMinSize ?? Constants.DEFAULT_WORKER_POOL_MIN_SIZE; + workerOptions.poolMaxSize = workerOptions?.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE; + workerImplementation = new WorkerDynamicPool(workerScript, workerOptions); break; default: throw new Error(`Worker implementation type '${workerProcessType}' not found`); diff --git a/src/worker/WorkerSet.ts b/src/worker/WorkerSet.ts index b62f63df..5bc39918 100644 --- a/src/worker/WorkerSet.ts +++ b/src/worker/WorkerSet.ts @@ -44,8 +44,15 @@ export default class WorkerSet extends WorkerAbstract { if (this.getLastWorkerSetElement().numberOfWorkerElements >= this.workerOptions.elementsPerWorker) { await this.startWorker(); } - this.getLastWorker().postMessage({ id: WorkerMessageEvents.START_WORKER_ELEMENT, workerOptions: this.workerOptions, data: elementData }); + this.getLastWorker().postMessage({ + id: WorkerMessageEvents.START_WORKER_ELEMENT, + data: elementData + }); this.getLastWorkerSetElement().numberOfWorkerElements++; + // Start element sequentially to optimize memory at startup + if (this.workerOptions.elementStartDelay > 0) { + await Utils.sleep(this.workerOptions.elementStartDelay); + } } /** -- 2.34.1