From: Jérôme Benoit Date: Fri, 22 Mar 2024 23:47:13 +0000 (+0100) Subject: fix: untangle worker set message from application message X-Git-Tag: v1.3.1~11 X-Git-Url: https://git.piment-noir.org/?p=e-mobility-charging-stations-simulator.git;a=commitdiff_plain;h=65d2250205186eaab80febce407d1f7fdf830473 fix: untangle worker set message from application message Signed-off-by: Jérôme Benoit --- diff --git a/src/charging-station/Bootstrap.ts b/src/charging-station/Bootstrap.ts index 4f72d6df..c13eb1b1 100644 --- a/src/charging-station/Bootstrap.ts +++ b/src/charging-station/Bootstrap.ts @@ -18,7 +18,6 @@ import { type ChargingStationInfo, type ChargingStationOptions, type ChargingStationWorkerData, - type ChargingStationWorkerEventError, type ChargingStationWorkerMessage, type ChargingStationWorkerMessageData, ChargingStationWorkerMessageEvents, @@ -43,7 +42,7 @@ import { logger, logPrefix } from '../utils/index.js' -import { type WorkerAbstract, WorkerFactory } from '../worker/index.js' +import { DEFAULT_ELEMENTS_PER_WORKER, type WorkerAbstract, WorkerFactory } from '../worker/index.js' import { buildTemplateName, waitChargingStationEvents } from './Helpers.js' import type { AbstractUIServer } from './ui-server/AbstractUIServer.js' import { UIServerFactory } from './ui-server/UIServerFactory.js' @@ -167,15 +166,6 @@ export class Bootstrap extends EventEmitter { ChargingStationWorkerMessageEvents.performanceStatistics, this.workerEventPerformanceStatistics ) - this.on( - ChargingStationWorkerMessageEvents.workerElementError, - (eventError: ChargingStationWorkerEventError) => { - logger.error( - `${this.logPrefix()} ${moduleName}.start: Error occurred while handling '${eventError.event}' event on worker:`, - eventError - ) - } - ) // eslint-disable-next-line @typescript-eslint/unbound-method if (isAsyncFunction(this.workerImplementation?.start)) { await this.workerImplementation.start() @@ -340,12 +330,13 @@ export class Bootstrap extends EventEmitter { elementsPerWorker = this.numberOfConfiguredChargingStations break case 'auto': - default: elementsPerWorker = this.numberOfConfiguredChargingStations > availableParallelism() ? Math.round(this.numberOfConfiguredChargingStations / (availableParallelism() * 1.5)) : 1 break + default: + elementsPerWorker = workerConfiguration.elementsPerWorker ?? DEFAULT_ELEMENTS_PER_WORKER } this.workerImplementation = WorkerFactory.getWorkerImplementation< ChargingStationWorkerData, @@ -405,12 +396,6 @@ export class Bootstrap extends EventEmitter { case ChargingStationWorkerMessageEvents.performanceStatistics: this.emit(ChargingStationWorkerMessageEvents.performanceStatistics, msg.data) break - case ChargingStationWorkerMessageEvents.addedWorkerElement: - this.emit(ChargingStationWorkerMessageEvents.addWorkerElement, msg.data) - break - case ChargingStationWorkerMessageEvents.workerElementError: - this.emit(ChargingStationWorkerMessageEvents.workerElementError, msg.data) - break default: throw new BaseError( `Unknown charging station worker event: '${ diff --git a/src/charging-station/ChargingStationWorker.ts b/src/charging-station/ChargingStationWorker.ts index 042d3d5b..779c638a 100644 --- a/src/charging-station/ChargingStationWorker.ts +++ b/src/charging-station/ChargingStationWorker.ts @@ -5,14 +5,9 @@ import { parentPort } from 'node:worker_threads' import { ThreadWorker } from 'poolifier' import { BaseError } from '../exception/index.js' -import type { - ChargingStationInfo, - ChargingStationWorkerData, - ChargingStationWorkerEventError, - ChargingStationWorkerMessage -} from '../types/index.js' +import type { ChargingStationInfo, ChargingStationWorkerData } from '../types/index.js' import { Configuration } from '../utils/index.js' -import { type WorkerMessage, WorkerMessageEvents } from '../worker/index.js' +import { type WorkerDataError, type WorkerMessage, WorkerMessageEvents } from '../worker/index.js' import { ChargingStation } from './ChargingStation.js' export let chargingStationWorker: object @@ -21,43 +16,47 @@ if (Configuration.workerPoolInUse()) { ChargingStationWorkerData, ChargingStationInfo | undefined >((data?: ChargingStationWorkerData): ChargingStationInfo | undefined => { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion, no-new - return new ChargingStation(data!.index, data!.templateFile, data!.options).stationInfo + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const { index, templateFile, options } = data! + return new ChargingStation(index, templateFile, options).stationInfo }) } else { // eslint-disable-next-line @typescript-eslint/no-extraneous-class class ChargingStationWorker { constructor () { parentPort?.on('message', (message: WorkerMessage) => { - switch (message.event) { + const { uuid, event, data } = message + switch (event) { case WorkerMessageEvents.addWorkerElement: try { const chargingStation = new ChargingStation( - message.data.index, - message.data.templateFile, - message.data.options + data.index, + data.templateFile, + data.options ) parentPort?.postMessage({ + uuid, event: WorkerMessageEvents.addedWorkerElement, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion data: chargingStation.stationInfo! - } satisfies ChargingStationWorkerMessage) + } satisfies WorkerMessage) } catch (error) { parentPort?.postMessage({ + uuid, event: WorkerMessageEvents.workerElementError, data: { - event: message.event, + event, name: (error as Error).name, message: (error as Error).message, stack: (error as Error).stack } - } satisfies ChargingStationWorkerMessage) + } satisfies WorkerMessage) } break default: throw new BaseError( - `Unknown worker event: '${message.event}' received with data: '${JSON.stringify( - message.data, + `Unknown worker event: '${event}' received with data: '${JSON.stringify( + data, undefined, 2 )}'` diff --git a/src/performance/storage/None.ts b/src/performance/storage/None.ts index af3e273e..aae7547f 100644 --- a/src/performance/storage/None.ts +++ b/src/performance/storage/None.ts @@ -13,7 +13,7 @@ export class None extends Storage { } public open (): void { - /** Intentionally empty */ + /** Intentionally empty */ } public close (): void { diff --git a/src/types/ChargingStationWorker.ts b/src/types/ChargingStationWorker.ts index c6fd0af2..336b50e8 100644 --- a/src/types/ChargingStationWorker.ts +++ b/src/types/ChargingStationWorker.ts @@ -1,6 +1,6 @@ import type { WebSocket } from 'ws' -import { type WorkerData, type WorkerMessage, WorkerMessageEvents } from '../worker/index.js' +import { type WorkerData, type WorkerMessage } from '../worker/index.js' import type { ChargingStationAutomaticTransactionGeneratorConfiguration } from './AutomaticTransactionGenerator.js' import { ChargingStationEvents } from './ChargingStationEvents.js' import type { ChargingStationInfo } from './ChargingStationInfo.js' @@ -52,32 +52,19 @@ enum ChargingStationMessageEvents { } export const ChargingStationWorkerMessageEvents = { - ...WorkerMessageEvents, ...ChargingStationEvents, ...ChargingStationMessageEvents } as const // eslint-disable-next-line @typescript-eslint/no-redeclare export type ChargingStationWorkerMessageEvents = - | WorkerMessageEvents | ChargingStationEvents | ChargingStationMessageEvents -export interface ChargingStationWorkerEventError extends WorkerData { - event: WorkerMessageEvents - name: string - message: string - stack?: string -} - -export type ChargingStationWorkerMessageData = - | ChargingStationInfo - | ChargingStationData - | Statistics - | ChargingStationWorkerEventError +export type ChargingStationWorkerMessageData = ChargingStationData | Statistics export type ChargingStationWorkerMessage = Omit< WorkerMessage, -'event' +'uuid' | 'event' > & { event: ChargingStationWorkerMessageEvents } diff --git a/src/types/index.ts b/src/types/index.ts index e24d7f54..868dd73d 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -27,7 +27,6 @@ export { type ChargingStationData, type ChargingStationOptions, type ChargingStationWorkerData, - type ChargingStationWorkerEventError, type ChargingStationWorkerMessage, type ChargingStationWorkerMessageData, ChargingStationWorkerMessageEvents, diff --git a/src/worker/WorkerSet.ts b/src/worker/WorkerSet.ts index 97414af7..7636a167 100644 --- a/src/worker/WorkerSet.ts +++ b/src/worker/WorkerSet.ts @@ -1,5 +1,6 @@ // Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved. +import { randomUUID } from 'node:crypto' import { EventEmitterAsyncResource } from 'node:events' import { SHARE_ENV, Worker } from 'node:worker_threads' @@ -16,9 +17,20 @@ import { } from './WorkerTypes.js' import { randomizeDelay, sleep } from './WorkerUtils.js' +interface ResponseWrapper { + resolve: (value: R | PromiseLike) => void + reject: (reason?: unknown) => void + workerSetElement: WorkerSetElement +} + export class WorkerSet extends WorkerAbstract { public readonly emitter: EventEmitterAsyncResource | undefined private readonly workerSet: Set + private readonly promiseResponseMap: Map< + `${string}-${string}-${string}-${string}`, + ResponseWrapper + > + private started: boolean private workerStartup: boolean @@ -40,6 +52,10 @@ export class WorkerSet extends Worke throw new RangeError('Elements per worker must be greater than zero') } this.workerSet = new Set() + this.promiseResponseMap = new Map< + `${string}-${string}-${string}-${string}`, + ResponseWrapper + >() if (this.workerOptions.poolOptions?.enableEvents === true) { this.emitter = new EventEmitterAsyncResource({ name: 'workerset' }) } @@ -107,25 +123,16 @@ export class WorkerSet extends Worke throw new Error('Cannot add a WorkerSet element: not started') } const workerSetElement = await this.getWorkerSetElement() - const waitAddedWorkerElement = new Promise((resolve, reject) => { - const messageHandler = (message: WorkerMessage): void => { - if (message.event === WorkerMessageEvents.addedWorkerElement) { - ++workerSetElement.numberOfWorkerElements - resolve(message.data) - workerSetElement.worker.off('message', messageHandler) - } else if (message.event === WorkerMessageEvents.workerElementError) { - // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors - reject(message.data) - workerSetElement.worker.off('message', messageHandler) - } - } - workerSetElement.worker.on('message', messageHandler) - }) - workerSetElement.worker.postMessage({ - event: WorkerMessageEvents.addWorkerElement, - data: elementData + const sendMessageToWorker = new Promise((resolve, reject) => { + const message = { + uuid: randomUUID(), + event: WorkerMessageEvents.addWorkerElement, + data: elementData + } satisfies WorkerMessage + workerSetElement.worker.postMessage(message) + this.promiseResponseMap.set(message.uuid, { resolve, reject, workerSetElement }) }) - const response = await waitAddedWorkerElement + const response = await sendMessageToWorker // Add element sequentially to optimize memory at startup // eslint-disable-next-line @typescript-eslint/no-non-null-assertion if (this.workerOptions.elementAddDelay! > 0) { @@ -146,10 +153,18 @@ export class WorkerSet extends Worke }) worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION) worker.on('message', (message: WorkerMessage) => { - if (message.event === WorkerMessageEvents.addedWorkerElement) { - this.emitter?.emit(WorkerSetEvents.elementAdded, this.info) - } else if (message.event === WorkerMessageEvents.workerElementError) { - this.emitter?.emit(WorkerSetEvents.elementError, message.data) + if (this.promiseResponseMap.has(message.uuid)) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const { resolve, reject, workerSetElement } = this.promiseResponseMap.get(message.uuid)! + if (message.event === WorkerMessageEvents.addedWorkerElement) { + this.emitter?.emit(WorkerSetEvents.elementAdded, this.info) + workerSetElement.numberOfWorkerElements++ + resolve(message.data) + } else if (message.event === WorkerMessageEvents.workerElementError) { + this.emitter?.emit(WorkerSetEvents.elementError, message.data) + reject(message.data) + } + this.promiseResponseMap.delete(message.uuid) } }) worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION) diff --git a/src/worker/WorkerTypes.ts b/src/worker/WorkerTypes.ts index ee69c191..e1e89a7c 100644 --- a/src/worker/WorkerTypes.ts +++ b/src/worker/WorkerTypes.ts @@ -51,6 +51,7 @@ export interface WorkerSetElement { } export interface WorkerMessage { + uuid: `${string}-${string}-${string}-${string}` event: WorkerMessageEvents data: T } @@ -60,3 +61,10 @@ export enum WorkerMessageEvents { addedWorkerElement = 'addedWorkerElement', workerElementError = 'workerElementError' } + +export interface WorkerDataError extends WorkerData { + event: WorkerMessageEvents + name: string + message: string + stack?: string +} diff --git a/src/worker/index.ts b/src/worker/index.ts index 649e0c62..190a6a41 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -1,6 +1,7 @@ export type { WorkerAbstract } from './WorkerAbstract.js' export { DEFAULT_ELEMENT_ADD_DELAY, + DEFAULT_ELEMENTS_PER_WORKER, DEFAULT_POOL_MAX_SIZE, DEFAULT_POOL_MIN_SIZE, DEFAULT_WORKER_START_DELAY @@ -8,6 +9,7 @@ export { export { WorkerFactory } from './WorkerFactory.js' export { type WorkerData, + type WorkerDataError, WorkerEvents, type WorkerMessage, WorkerMessageEvents,