test(ocpp-server): add RepeatTimer class to allow to emit OCPP messages on a regular
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 11 Jun 2024 12:58:01 +0000 (14:58 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 11 Jun 2024 12:58:01 +0000 (14:58 +0200)
basis

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
tests/ocpp-server/server.py

index 051e3108257dbafcbb2ae69dbac063e0a87a225d..8bdba4bfc3960c8f860178ad3e6e62f7e68dd74f 100644 (file)
@@ -1,6 +1,7 @@
 import asyncio
 import logging
 from datetime import datetime, timezone
+from threading import Timer
 from typing import Sequence
 
 import ocpp.v201
@@ -13,9 +14,18 @@ from ocpp.v201.enums import RegistrationStatusType, ClearCacheStatusType
 logging.basicConfig(level=logging.DEBUG)
 
 
+class RepeatTimer(Timer):
+    """ Class that inherits from the Timer class. It will run a
+    function at regular intervals."""
+
+    def run(self):
+        while not self.finished.wait(self.interval):
+            self.function(*self.args, **self.kwargs)
+
+
 # Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class.
 class ChargePoint(ocpp.v201.ChargePoint):
-    # Message handlers to receive OCPP message.
+    # Message handlers to receive OCPP messages.
     @on('BootNotification')
     async def on_boot_notification(self, charging_station, reason, **kwargs):
         logging.info("Received BootNotification")