From f43dbe14bc4075c3fa6ec7611d8cab7885db69d6 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Tue, 29 Apr 2025 19:29:03 +0200 Subject: [PATCH] fix: throttle failed buffered messages sending with exponential backoff (#1399) MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * fix: throttle failed buffered messages sending with exponential backoff closes #1204 Signed-off-by: Jérôme Benoit * docs: refine error log message at buffer message flushing Signed-off-by: Jérôme Benoit --------- Signed-off-by: Jérôme Benoit Co-authored-by: Jérôme Benoit --- src/charging-station/ChargingStation.ts | 104 ++++++++++++++++-------- 1 file changed, 70 insertions(+), 34 deletions(-) diff --git a/src/charging-station/ChargingStation.ts b/src/charging-station/ChargingStation.ts index 8a66d31a..3c4d1749 100644 --- a/src/charging-station/ChargingStation.ts +++ b/src/charging-station/ChargingStation.ts @@ -203,8 +203,9 @@ export class ChargingStation extends EventEmitter { private configuredSupervisionUrl!: URL private connectorsConfigurationHash!: string private evsesConfigurationHash!: string + private flushingMessageBuffer: boolean private flushMessageBufferSetInterval?: NodeJS.Timeout - private readonly messageBuffer: Set + private readonly messageQueue: string[] private ocppIncomingRequestService!: OCPPIncomingRequestService private readonly sharedLRUCache: SharedLRUCache private stopping: boolean @@ -225,7 +226,8 @@ export class ChargingStation extends EventEmitter { this.connectors = new Map() this.evses = new Map() this.requests = new Map() - this.messageBuffer = new Set() + this.flushingMessageBuffer = false + this.messageQueue = new Array() this.sharedLRUCache = SharedLRUCache.getInstance() this.idTagsCache = IdTagsCache.getInstance() this.chargingStationWorkerBroadcastChannel = new ChargingStationWorkerBroadcastChannel(this) @@ -300,7 +302,7 @@ export class ChargingStation extends EventEmitter { } public bufferMessage (message: string): void { - this.messageBuffer.add(message) + this.messageQueue.push(message) this.setIntervalFlushMessageBuffer() } @@ -1073,36 +1075,11 @@ export class ChargingStation extends EventEmitter { } private flushMessageBuffer (): void { - if (this.messageBuffer.size > 0) { - for (const message of this.messageBuffer.values()) { - let beginId: string | undefined - let commandName: RequestCommand | undefined - const [messageType] = JSON.parse(message) as ErrorResponse | OutgoingRequest | Response - const isRequest = messageType === MessageType.CALL_MESSAGE - if (isRequest) { - ;[, , commandName] = JSON.parse(message) as OutgoingRequest - beginId = PerformanceStatistics.beginMeasure(commandName) - } - this.wsConnection?.send(message, (error?: Error) => { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - isRequest && PerformanceStatistics.endMeasure(commandName!, beginId!) - if (error == null) { - logger.debug( - `${this.logPrefix()} >> Buffered ${getMessageTypeString( - messageType - )} OCPP message sent '${JSON.stringify(message)}'` - ) - this.messageBuffer.delete(message) - } else { - logger.debug( - `${this.logPrefix()} >> Buffered ${getMessageTypeString( - messageType - )} OCPP message '${JSON.stringify(message)}' send failed:`, - error - ) - } - }) - } + if (!this.flushingMessageBuffer && this.messageQueue.length > 0) { + this.flushingMessageBuffer = true + this.sendMessageBuffer(() => { + this.flushingMessageBuffer = false + }) } } @@ -2295,12 +2272,71 @@ export class ChargingStation extends EventEmitter { } } + private readonly sendMessageBuffer = ( + onCompleteCallback: () => void, + messageIdx?: number + ): void => { + if (this.messageQueue.length > 0) { + const message = this.messageQueue[0] + let beginId: string | undefined + let commandName: RequestCommand | undefined + let parsedMessage: ErrorResponse | OutgoingRequest | Response + messageIdx ??= 0 + try { + parsedMessage = JSON.parse(message) as ErrorResponse | OutgoingRequest | Response + } catch (error) { + logger.error( + `${this.logPrefix()} Error while parsing buffered OCPP message '${message}' to JSON:`, + error + ) + this.messageQueue.shift() + this.sendMessageBuffer(onCompleteCallback, messageIdx) + return + } + const [messageType] = parsedMessage + const isRequest = messageType === MessageType.CALL_MESSAGE + if (isRequest) { + ;[, , commandName] = parsedMessage as OutgoingRequest + beginId = PerformanceStatistics.beginMeasure(commandName) + } + this.wsConnection?.send(message, (error?: Error) => { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + isRequest && PerformanceStatistics.endMeasure(commandName!, beginId!) + if (error == null) { + logger.debug( + `${this.logPrefix()} >> Buffered ${getMessageTypeString( + messageType + )} OCPP message sent '${JSON.stringify(message)}'` + ) + this.messageQueue.shift() + } else { + logger.debug( + `${this.logPrefix()} >> Buffered ${getMessageTypeString( + messageType + )} OCPP message '${JSON.stringify(message)}' send failed:`, + error + ) + } + // eslint-disable-next-line promise/catch-or-return, @typescript-eslint/no-floating-promises, promise/no-promise-in-callback + sleep(exponentialDelay(messageIdx)) + // eslint-disable-next-line promise/always-return + .then(() => { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ++messageIdx! + this.sendMessageBuffer(onCompleteCallback, messageIdx) + }) + }) + } else { + onCompleteCallback() + } + } + private setIntervalFlushMessageBuffer (): void { this.flushMessageBufferSetInterval ??= setInterval(() => { if (this.isWebSocketConnectionOpened() && this.inAcceptedState()) { this.flushMessageBuffer() } - if (this.messageBuffer.size === 0) { + if (!this.flushingMessageBuffer && this.messageQueue.length === 0) { this.clearIntervalFlushMessageBuffer() } }, Constants.DEFAULT_MESSAGE_BUFFER_FLUSH_INTERVAL) -- 2.43.0