private configuredSupervisionUrl!: URL
private connectorsConfigurationHash!: string
private evsesConfigurationHash!: string
+ private flushingMessageBuffer: boolean
private flushMessageBufferSetInterval?: NodeJS.Timeout
- private readonly messageBuffer: Set<string>
+ private readonly messageQueue: string[]
private ocppIncomingRequestService!: OCPPIncomingRequestService
private readonly sharedLRUCache: SharedLRUCache
private stopping: boolean
this.connectors = new Map<number, ConnectorStatus>()
this.evses = new Map<number, EvseStatus>()
this.requests = new Map<string, CachedRequest>()
- this.messageBuffer = new Set<string>()
+ this.flushingMessageBuffer = false
+ this.messageQueue = new Array<string>()
this.sharedLRUCache = SharedLRUCache.getInstance()
this.idTagsCache = IdTagsCache.getInstance()
this.chargingStationWorkerBroadcastChannel = new ChargingStationWorkerBroadcastChannel(this)
}
public bufferMessage (message: string): void {
- this.messageBuffer.add(message)
+ this.messageQueue.push(message)
this.setIntervalFlushMessageBuffer()
}
}
private flushMessageBuffer (): void {
- if (this.messageBuffer.size > 0) {
- for (const message of this.messageBuffer.values()) {
- let beginId: string | undefined
- let commandName: RequestCommand | undefined
- const [messageType] = JSON.parse(message) as ErrorResponse | OutgoingRequest | Response
- const isRequest = messageType === MessageType.CALL_MESSAGE
- if (isRequest) {
- ;[, , commandName] = JSON.parse(message) as OutgoingRequest
- beginId = PerformanceStatistics.beginMeasure(commandName)
- }
- this.wsConnection?.send(message, (error?: Error) => {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- isRequest && PerformanceStatistics.endMeasure(commandName!, beginId!)
- if (error == null) {
- logger.debug(
- `${this.logPrefix()} >> Buffered ${getMessageTypeString(
- messageType
- )} OCPP message sent '${JSON.stringify(message)}'`
- )
- this.messageBuffer.delete(message)
- } else {
- logger.debug(
- `${this.logPrefix()} >> Buffered ${getMessageTypeString(
- messageType
- )} OCPP message '${JSON.stringify(message)}' send failed:`,
- error
- )
- }
- })
- }
+ if (!this.flushingMessageBuffer && this.messageQueue.length > 0) {
+ this.flushingMessageBuffer = true
+ this.sendMessageBuffer(() => {
+ this.flushingMessageBuffer = false
+ })
}
}
}
}
+ private readonly sendMessageBuffer = (
+ onCompleteCallback: () => void,
+ messageIdx?: number
+ ): void => {
+ if (this.messageQueue.length > 0) {
+ const message = this.messageQueue[0]
+ let beginId: string | undefined
+ let commandName: RequestCommand | undefined
+ let parsedMessage: ErrorResponse | OutgoingRequest | Response
+ messageIdx ??= 0
+ try {
+ parsedMessage = JSON.parse(message) as ErrorResponse | OutgoingRequest | Response
+ } catch (error) {
+ logger.error(
+ `${this.logPrefix()} Error while parsing buffered OCPP message '${message}' to JSON:`,
+ error
+ )
+ this.messageQueue.shift()
+ this.sendMessageBuffer(onCompleteCallback, messageIdx)
+ return
+ }
+ const [messageType] = parsedMessage
+ const isRequest = messageType === MessageType.CALL_MESSAGE
+ if (isRequest) {
+ ;[, , commandName] = parsedMessage as OutgoingRequest
+ beginId = PerformanceStatistics.beginMeasure(commandName)
+ }
+ this.wsConnection?.send(message, (error?: Error) => {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ isRequest && PerformanceStatistics.endMeasure(commandName!, beginId!)
+ if (error == null) {
+ logger.debug(
+ `${this.logPrefix()} >> Buffered ${getMessageTypeString(
+ messageType
+ )} OCPP message sent '${JSON.stringify(message)}'`
+ )
+ this.messageQueue.shift()
+ } else {
+ logger.debug(
+ `${this.logPrefix()} >> Buffered ${getMessageTypeString(
+ messageType
+ )} OCPP message '${JSON.stringify(message)}' send failed:`,
+ error
+ )
+ }
+ // eslint-disable-next-line promise/catch-or-return, @typescript-eslint/no-floating-promises, promise/no-promise-in-callback
+ sleep(exponentialDelay(messageIdx))
+ // eslint-disable-next-line promise/always-return
+ .then(() => {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ++messageIdx!
+ this.sendMessageBuffer(onCompleteCallback, messageIdx)
+ })
+ })
+ } else {
+ onCompleteCallback()
+ }
+ }
+
private setIntervalFlushMessageBuffer (): void {
this.flushMessageBufferSetInterval ??= setInterval(() => {
if (this.isWebSocketConnectionOpened() && this.inAcceptedState()) {
this.flushMessageBuffer()
}
- if (this.messageBuffer.size === 0) {
+ if (!this.flushingMessageBuffer && this.messageQueue.length === 0) {
this.clearIntervalFlushMessageBuffer()
}
}, Constants.DEFAULT_MESSAGE_BUFFER_FLUSH_INTERVAL)