]> Piment Noir Git Repositories - e-mobility-charging-stations-simulator.git/commitdiff
fix: throttle failed buffered messages sending with exponential backoff (#1399)
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 29 Apr 2025 17:29:03 +0000 (19:29 +0200)
committerGitHub <noreply@github.com>
Tue, 29 Apr 2025 17:29:03 +0000 (19:29 +0200)
* fix: throttle failed buffered messages sending with exponential backoff

closes #1204

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
* docs: refine error log message at buffer message flushing

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
---------

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
Co-authored-by: Jérôme Benoit <jerome.benoit@sap.com>
src/charging-station/ChargingStation.ts

index 8a66d31a6bad605f4c33b74b42ffc80524c1e062..3c4d174935a34f02e9415f7368899403bcffc922 100644 (file)
@@ -203,8 +203,9 @@ export class ChargingStation extends EventEmitter {
   private configuredSupervisionUrl!: URL
   private connectorsConfigurationHash!: string
   private evsesConfigurationHash!: string
+  private flushingMessageBuffer: boolean
   private flushMessageBufferSetInterval?: NodeJS.Timeout
-  private readonly messageBuffer: Set<string>
+  private readonly messageQueue: string[]
   private ocppIncomingRequestService!: OCPPIncomingRequestService
   private readonly sharedLRUCache: SharedLRUCache
   private stopping: boolean
@@ -225,7 +226,8 @@ export class ChargingStation extends EventEmitter {
     this.connectors = new Map<number, ConnectorStatus>()
     this.evses = new Map<number, EvseStatus>()
     this.requests = new Map<string, CachedRequest>()
-    this.messageBuffer = new Set<string>()
+    this.flushingMessageBuffer = false
+    this.messageQueue = new Array<string>()
     this.sharedLRUCache = SharedLRUCache.getInstance()
     this.idTagsCache = IdTagsCache.getInstance()
     this.chargingStationWorkerBroadcastChannel = new ChargingStationWorkerBroadcastChannel(this)
@@ -300,7 +302,7 @@ export class ChargingStation extends EventEmitter {
   }
 
   public bufferMessage (message: string): void {
-    this.messageBuffer.add(message)
+    this.messageQueue.push(message)
     this.setIntervalFlushMessageBuffer()
   }
 
@@ -1073,36 +1075,11 @@ export class ChargingStation extends EventEmitter {
   }
 
   private flushMessageBuffer (): void {
-    if (this.messageBuffer.size > 0) {
-      for (const message of this.messageBuffer.values()) {
-        let beginId: string | undefined
-        let commandName: RequestCommand | undefined
-        const [messageType] = JSON.parse(message) as ErrorResponse | OutgoingRequest | Response
-        const isRequest = messageType === MessageType.CALL_MESSAGE
-        if (isRequest) {
-          ;[, , commandName] = JSON.parse(message) as OutgoingRequest
-          beginId = PerformanceStatistics.beginMeasure(commandName)
-        }
-        this.wsConnection?.send(message, (error?: Error) => {
-          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-          isRequest && PerformanceStatistics.endMeasure(commandName!, beginId!)
-          if (error == null) {
-            logger.debug(
-              `${this.logPrefix()} >> Buffered ${getMessageTypeString(
-                messageType
-              )} OCPP message sent '${JSON.stringify(message)}'`
-            )
-            this.messageBuffer.delete(message)
-          } else {
-            logger.debug(
-              `${this.logPrefix()} >> Buffered ${getMessageTypeString(
-                messageType
-              )} OCPP message '${JSON.stringify(message)}' send failed:`,
-              error
-            )
-          }
-        })
-      }
+    if (!this.flushingMessageBuffer && this.messageQueue.length > 0) {
+      this.flushingMessageBuffer = true
+      this.sendMessageBuffer(() => {
+        this.flushingMessageBuffer = false
+      })
     }
   }
 
@@ -2295,12 +2272,71 @@ export class ChargingStation extends EventEmitter {
     }
   }
 
+  private readonly sendMessageBuffer = (
+    onCompleteCallback: () => void,
+    messageIdx?: number
+  ): void => {
+    if (this.messageQueue.length > 0) {
+      const message = this.messageQueue[0]
+      let beginId: string | undefined
+      let commandName: RequestCommand | undefined
+      let parsedMessage: ErrorResponse | OutgoingRequest | Response
+      messageIdx ??= 0
+      try {
+        parsedMessage = JSON.parse(message) as ErrorResponse | OutgoingRequest | Response
+      } catch (error) {
+        logger.error(
+          `${this.logPrefix()} Error while parsing buffered OCPP message '${message}' to JSON:`,
+          error
+        )
+        this.messageQueue.shift()
+        this.sendMessageBuffer(onCompleteCallback, messageIdx)
+        return
+      }
+      const [messageType] = parsedMessage
+      const isRequest = messageType === MessageType.CALL_MESSAGE
+      if (isRequest) {
+        ;[, , commandName] = parsedMessage as OutgoingRequest
+        beginId = PerformanceStatistics.beginMeasure(commandName)
+      }
+      this.wsConnection?.send(message, (error?: Error) => {
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        isRequest && PerformanceStatistics.endMeasure(commandName!, beginId!)
+        if (error == null) {
+          logger.debug(
+            `${this.logPrefix()} >> Buffered ${getMessageTypeString(
+              messageType
+            )} OCPP message sent '${JSON.stringify(message)}'`
+          )
+          this.messageQueue.shift()
+        } else {
+          logger.debug(
+            `${this.logPrefix()} >> Buffered ${getMessageTypeString(
+              messageType
+            )} OCPP message '${JSON.stringify(message)}' send failed:`,
+            error
+          )
+        }
+        // eslint-disable-next-line promise/catch-or-return, @typescript-eslint/no-floating-promises, promise/no-promise-in-callback
+        sleep(exponentialDelay(messageIdx))
+          // eslint-disable-next-line promise/always-return
+          .then(() => {
+            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+            ++messageIdx!
+            this.sendMessageBuffer(onCompleteCallback, messageIdx)
+          })
+      })
+    } else {
+      onCompleteCallback()
+    }
+  }
+
   private setIntervalFlushMessageBuffer (): void {
     this.flushMessageBufferSetInterval ??= setInterval(() => {
       if (this.isWebSocketConnectionOpened() && this.inAcceptedState()) {
         this.flushMessageBuffer()
       }
-      if (this.messageBuffer.size === 0) {
+      if (!this.flushingMessageBuffer && this.messageQueue.length === 0) {
         this.clearIntervalFlushMessageBuffer()
       }
     }, Constants.DEFAULT_MESSAGE_BUFFER_FLUSH_INTERVAL)