type ChargingStationInfo,
type ChargingStationOptions,
type ChargingStationWorkerData,
- type ChargingStationWorkerEventError,
type ChargingStationWorkerMessage,
type ChargingStationWorkerMessageData,
ChargingStationWorkerMessageEvents,
logger,
logPrefix
} from '../utils/index.js'
-import { type WorkerAbstract, WorkerFactory } from '../worker/index.js'
+import { DEFAULT_ELEMENTS_PER_WORKER, type WorkerAbstract, WorkerFactory } from '../worker/index.js'
import { buildTemplateName, waitChargingStationEvents } from './Helpers.js'
import type { AbstractUIServer } from './ui-server/AbstractUIServer.js'
import { UIServerFactory } from './ui-server/UIServerFactory.js'
ChargingStationWorkerMessageEvents.performanceStatistics,
this.workerEventPerformanceStatistics
)
- this.on(
- ChargingStationWorkerMessageEvents.workerElementError,
- (eventError: ChargingStationWorkerEventError) => {
- logger.error(
- `${this.logPrefix()} ${moduleName}.start: Error occurred while handling '${eventError.event}' event on worker:`,
- eventError
- )
- }
- )
// eslint-disable-next-line @typescript-eslint/unbound-method
if (isAsyncFunction(this.workerImplementation?.start)) {
await this.workerImplementation.start()
elementsPerWorker = this.numberOfConfiguredChargingStations
break
case 'auto':
- default:
elementsPerWorker =
this.numberOfConfiguredChargingStations > availableParallelism()
? Math.round(this.numberOfConfiguredChargingStations / (availableParallelism() * 1.5))
: 1
break
+ default:
+ elementsPerWorker = workerConfiguration.elementsPerWorker ?? DEFAULT_ELEMENTS_PER_WORKER
}
this.workerImplementation = WorkerFactory.getWorkerImplementation<
ChargingStationWorkerData,
case ChargingStationWorkerMessageEvents.performanceStatistics:
this.emit(ChargingStationWorkerMessageEvents.performanceStatistics, msg.data)
break
- case ChargingStationWorkerMessageEvents.addedWorkerElement:
- this.emit(ChargingStationWorkerMessageEvents.addWorkerElement, msg.data)
- break
- case ChargingStationWorkerMessageEvents.workerElementError:
- this.emit(ChargingStationWorkerMessageEvents.workerElementError, msg.data)
- break
default:
throw new BaseError(
`Unknown charging station worker event: '${
import { ThreadWorker } from 'poolifier'
import { BaseError } from '../exception/index.js'
-import type {
- ChargingStationInfo,
- ChargingStationWorkerData,
- ChargingStationWorkerEventError,
- ChargingStationWorkerMessage
-} from '../types/index.js'
+import type { ChargingStationInfo, ChargingStationWorkerData } from '../types/index.js'
import { Configuration } from '../utils/index.js'
-import { type WorkerMessage, WorkerMessageEvents } from '../worker/index.js'
+import { type WorkerDataError, type WorkerMessage, WorkerMessageEvents } from '../worker/index.js'
import { ChargingStation } from './ChargingStation.js'
export let chargingStationWorker: object
ChargingStationWorkerData,
ChargingStationInfo | undefined
>((data?: ChargingStationWorkerData): ChargingStationInfo | undefined => {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion, no-new
- return new ChargingStation(data!.index, data!.templateFile, data!.options).stationInfo
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const { index, templateFile, options } = data!
+ return new ChargingStation(index, templateFile, options).stationInfo
})
} else {
// eslint-disable-next-line @typescript-eslint/no-extraneous-class
class ChargingStationWorker<Data extends ChargingStationWorkerData> {
constructor () {
parentPort?.on('message', (message: WorkerMessage<Data>) => {
- switch (message.event) {
+ const { uuid, event, data } = message
+ switch (event) {
case WorkerMessageEvents.addWorkerElement:
try {
const chargingStation = new ChargingStation(
- message.data.index,
- message.data.templateFile,
- message.data.options
+ data.index,
+ data.templateFile,
+ data.options
)
parentPort?.postMessage({
+ uuid,
event: WorkerMessageEvents.addedWorkerElement,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
data: chargingStation.stationInfo!
- } satisfies ChargingStationWorkerMessage<ChargingStationInfo>)
+ } satisfies WorkerMessage<ChargingStationInfo>)
} catch (error) {
parentPort?.postMessage({
+ uuid,
event: WorkerMessageEvents.workerElementError,
data: {
- event: message.event,
+ event,
name: (error as Error).name,
message: (error as Error).message,
stack: (error as Error).stack
}
- } satisfies ChargingStationWorkerMessage<ChargingStationWorkerEventError>)
+ } satisfies WorkerMessage<WorkerDataError>)
}
break
default:
throw new BaseError(
- `Unknown worker event: '${message.event}' received with data: '${JSON.stringify(
- message.data,
+ `Unknown worker event: '${event}' received with data: '${JSON.stringify(
+ data,
undefined,
2
)}'`
}
public open (): void {
- /** Intentionally empty */
+ /** Intentionally empty */
}
public close (): void {
import type { WebSocket } from 'ws'
-import { type WorkerData, type WorkerMessage, WorkerMessageEvents } from '../worker/index.js'
+import { type WorkerData, type WorkerMessage } from '../worker/index.js'
import type { ChargingStationAutomaticTransactionGeneratorConfiguration } from './AutomaticTransactionGenerator.js'
import { ChargingStationEvents } from './ChargingStationEvents.js'
import type { ChargingStationInfo } from './ChargingStationInfo.js'
}
export const ChargingStationWorkerMessageEvents = {
- ...WorkerMessageEvents,
...ChargingStationEvents,
...ChargingStationMessageEvents
} as const
// eslint-disable-next-line @typescript-eslint/no-redeclare
export type ChargingStationWorkerMessageEvents =
- | WorkerMessageEvents
| ChargingStationEvents
| ChargingStationMessageEvents
-export interface ChargingStationWorkerEventError extends WorkerData {
- event: WorkerMessageEvents
- name: string
- message: string
- stack?: string
-}
-
-export type ChargingStationWorkerMessageData =
- | ChargingStationInfo
- | ChargingStationData
- | Statistics
- | ChargingStationWorkerEventError
+export type ChargingStationWorkerMessageData = ChargingStationData | Statistics
export type ChargingStationWorkerMessage<T extends ChargingStationWorkerMessageData> = Omit<
WorkerMessage<T>,
-'event'
+'uuid' | 'event'
> & {
event: ChargingStationWorkerMessageEvents
}
type ChargingStationData,
type ChargingStationOptions,
type ChargingStationWorkerData,
- type ChargingStationWorkerEventError,
type ChargingStationWorkerMessage,
type ChargingStationWorkerMessageData,
ChargingStationWorkerMessageEvents,
// Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved.
+import { randomUUID } from 'node:crypto'
import { EventEmitterAsyncResource } from 'node:events'
import { SHARE_ENV, Worker } from 'node:worker_threads'
} from './WorkerTypes.js'
import { randomizeDelay, sleep } from './WorkerUtils.js'
+interface ResponseWrapper<R extends WorkerData> {
+ resolve: (value: R | PromiseLike<R>) => void
+ reject: (reason?: unknown) => void
+ workerSetElement: WorkerSetElement
+}
+
export class WorkerSet<D extends WorkerData, R extends WorkerData> extends WorkerAbstract<D, R> {
public readonly emitter: EventEmitterAsyncResource | undefined
private readonly workerSet: Set<WorkerSetElement>
+ private readonly promiseResponseMap: Map<
+ `${string}-${string}-${string}-${string}`,
+ ResponseWrapper<R>
+ >
+
private started: boolean
private workerStartup: boolean
throw new RangeError('Elements per worker must be greater than zero')
}
this.workerSet = new Set<WorkerSetElement>()
+ this.promiseResponseMap = new Map<
+ `${string}-${string}-${string}-${string}`,
+ ResponseWrapper<R>
+ >()
if (this.workerOptions.poolOptions?.enableEvents === true) {
this.emitter = new EventEmitterAsyncResource({ name: 'workerset' })
}
throw new Error('Cannot add a WorkerSet element: not started')
}
const workerSetElement = await this.getWorkerSetElement()
- const waitAddedWorkerElement = new Promise<R>((resolve, reject) => {
- const messageHandler = (message: WorkerMessage<R>): void => {
- if (message.event === WorkerMessageEvents.addedWorkerElement) {
- ++workerSetElement.numberOfWorkerElements
- resolve(message.data)
- workerSetElement.worker.off('message', messageHandler)
- } else if (message.event === WorkerMessageEvents.workerElementError) {
- // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
- reject(message.data)
- workerSetElement.worker.off('message', messageHandler)
- }
- }
- workerSetElement.worker.on('message', messageHandler)
- })
- workerSetElement.worker.postMessage({
- event: WorkerMessageEvents.addWorkerElement,
- data: elementData
+ const sendMessageToWorker = new Promise<R>((resolve, reject) => {
+ const message = {
+ uuid: randomUUID(),
+ event: WorkerMessageEvents.addWorkerElement,
+ data: elementData
+ } satisfies WorkerMessage<D>
+ workerSetElement.worker.postMessage(message)
+ this.promiseResponseMap.set(message.uuid, { resolve, reject, workerSetElement })
})
- const response = await waitAddedWorkerElement
+ const response = await sendMessageToWorker
// Add element sequentially to optimize memory at startup
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
if (this.workerOptions.elementAddDelay! > 0) {
})
worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION)
worker.on('message', (message: WorkerMessage<R>) => {
- if (message.event === WorkerMessageEvents.addedWorkerElement) {
- this.emitter?.emit(WorkerSetEvents.elementAdded, this.info)
- } else if (message.event === WorkerMessageEvents.workerElementError) {
- this.emitter?.emit(WorkerSetEvents.elementError, message.data)
+ if (this.promiseResponseMap.has(message.uuid)) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const { resolve, reject, workerSetElement } = this.promiseResponseMap.get(message.uuid)!
+ if (message.event === WorkerMessageEvents.addedWorkerElement) {
+ this.emitter?.emit(WorkerSetEvents.elementAdded, this.info)
+ workerSetElement.numberOfWorkerElements++
+ resolve(message.data)
+ } else if (message.event === WorkerMessageEvents.workerElementError) {
+ this.emitter?.emit(WorkerSetEvents.elementError, message.data)
+ reject(message.data)
+ }
+ this.promiseResponseMap.delete(message.uuid)
}
})
worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION)
}
export interface WorkerMessage<T extends WorkerData> {
+ uuid: `${string}-${string}-${string}-${string}`
event: WorkerMessageEvents
data: T
}
addedWorkerElement = 'addedWorkerElement',
workerElementError = 'workerElementError'
}
+
+export interface WorkerDataError extends WorkerData {
+ event: WorkerMessageEvents
+ name: string
+ message: string
+ stack?: string
+}
export type { WorkerAbstract } from './WorkerAbstract.js'
export {
DEFAULT_ELEMENT_ADD_DELAY,
+ DEFAULT_ELEMENTS_PER_WORKER,
DEFAULT_POOL_MAX_SIZE,
DEFAULT_POOL_MIN_SIZE,
DEFAULT_WORKER_START_DELAY
export { WorkerFactory } from './WorkerFactory.js'
export {
type WorkerData,
+ type WorkerDataError,
WorkerEvents,
type WorkerMessage,
WorkerMessageEvents,