Merge branch 'main' into fix-ocpp2-server-send-command
authorJérôme Benoit <jerome.benoit@sap.com>
Thu, 27 Jun 2024 20:23:13 +0000 (22:23 +0200)
committerGitHub <noreply@github.com>
Thu, 27 Jun 2024 20:23:13 +0000 (22:23 +0200)
tests/ocpp-server/README.md
tests/ocpp-server/pyproject.toml
tests/ocpp-server/server.py
tests/ocpp-server/timer.py [new file with mode: 0644]

index e6a4dfa180791a9c62293c160f1127b06b3b4332..32e136196b9fb2eba197b78001ed235897d95656 100644 (file)
@@ -20,9 +20,9 @@ poetry run task server
 
 The server will start listening for connections on port 9000.
 
-## Running the server with command sending
+## Running the server with OCPP command sending
 
-You can also specify a command and a period duration with the --request and --period options respectively when running the server. The server will then send your chosen command to the connected client every period seconds.
+You can also specify a command and a period duration with the --command and --period options respectively when running the server. The server will then send your chosen command to the connected client(s) every period seconds.
 
 ### GetBaseReport Command
 
@@ -40,7 +40,7 @@ To run the server and send a ClearCache command every 5 seconds, use:
 poetry run task server --command ClearCache --period 5
 ```
 
-Please be mindful that these commands were examples according to the provided scenario, the available commands and their syntax might vary depending on the ocpp version and the implemented functionalities on your server.
+Please be mindful that these commands were examples according to the provided scenario, the available commands and their syntax might vary depending on the ocpp version and the implemented functionalities on your client.
 
 ## Overview of the Server Scripts
 
index f5c44fdffcdd9d02e9ce5d3d06a06c103c046b71..25b3fd086ad3e0c4d3046435a987e4a69af7f257 100644 (file)
@@ -20,7 +20,7 @@ format = "ruff format . && ruff check --fix ."
 lint = "ruff format --check --diff . && ruff check --diff ."
 
 [tool.ruff.lint]
-select = ["E", "W", "F", "B", "Q", "I"]
+select = ["E", "W", "F", "ASYNC", "S", "B", "A", "Q", "RUF", "I"]
 
 
 [build-system]
index 3001ebd843cb7b787bef94b34da24752f0c28ff4..a5ebed38ef5f04b58ccf389459c79b1dff6c83b5 100644 (file)
@@ -2,7 +2,8 @@ import argparse
 import asyncio
 import logging
 from datetime import datetime, timezone
-from threading import Timer
+from functools import partial
+from typing import Optional
 
 import ocpp.v201
 import websockets
@@ -11,29 +12,29 @@ from ocpp.v201.enums import (
     Action,
     AuthorizationStatusType,
     ClearCacheStatusType,
+    GenericDeviceModelStatusType,
     RegistrationStatusType,
     ReportBaseType,
     TransactionEventType,
 )
 from websockets import ConnectionClosed
 
+from timer import Timer
+
 # Setting up the logging configuration to display debug level messages.
 logging.basicConfig(level=logging.DEBUG)
 
 ChargePoints = set()
 
 
-class RepeatTimer(Timer):
-    """Class that inherits from the Timer class. It will run a
-    function at regular intervals."""
-
-    def run(self):
-        while not self.finished.wait(self.interval):
-            self.function(*self.args, **self.kwargs)
-
-
 # Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class.
 class ChargePoint(ocpp.v201.ChargePoint):
+    _command_timer: Optional[Timer]
+
+    def __init__(self, connection):
+        super().__init__(connection.path.strip("/"), connection)
+        self._command_timer = None
+
     # Message handlers to receive OCPP messages.
     @on(Action.BootNotification)
     async def on_boot_notification(self, charging_station, reason, **kwargs):
@@ -95,17 +96,8 @@ class ChargePoint(ocpp.v201.ChargePoint):
         logging.info("Received %s", Action.MeterValues)
         return ocpp.v201.call_result.MeterValues()
 
-    @on(Action.GetBaseReport)
-    async def on_get_base_report(
-        self, request_id: int, report_base: ReportBaseType, **kwargs
-    ):
-        logging.info("Received %s", Action.GetBaseReport)
-        return ocpp.v201.call_result.GetBaseReport(
-            id_token_info={"status": ReportBaseType.accepted}
-        )
-
     # Request handlers to emit OCPP messages.
-    async def send_clear_cache(self):
+    async def _send_clear_cache(self):
         request = ocpp.v201.call.ClearCache()
         response = await self.call(request)
 
@@ -114,47 +106,72 @@ class ChargePoint(ocpp.v201.ChargePoint):
         else:
             logging.info("%s failed", Action.ClearCache)
 
-    async def send_get_base_report(self):
+    async def _send_get_base_report(self):
         request = ocpp.v201.call.GetBaseReport(
-            reportBase=ReportBaseType.ConfigurationInventory
+            request_id=1, report_base=ReportBaseType.full_inventory
         )
         response = await self.call(request)
 
-        if response.status == ReportBaseType.accepted:
+        if response.status == GenericDeviceModelStatusType.accepted:
             logging.info("%s successful", Action.GetBaseReport)
         else:
             logging.info("%s failed", Action.GetBaseReport)
 
-
-# Function to send OCPP command
-async def send_ocpp_command(cp, command_name, delay=None, period=None):
-    try:
+    async def _send_command(self, command_name: Action):
+        logging.debug("Sending OCPP command %s", command_name)
         match command_name:
             case Action.ClearCache:
-                logging.info("%s Send:", Action.ClearCache)
-                await cp.send_clear_cache()
+                await self._send_clear_cache()
             case Action.GetBaseReport:
-                logging.info("%s Send:", Action.GetBaseReport)
-                await cp.send_get_base_report()
-    except Exception:
-        logging.exception(
-            f"Not supported or Failure while processing command {command_name}"
-        )
-
-    if delay:
-        await asyncio.sleep(delay)
+                await self._send_get_base_report()
+            case _:
+                logging.info(f"Not supported command {command_name}")
 
-    if period:
-        my_timer = RepeatTimer(
-            period, asyncio.create_task, [cp.send_ocpp_command(command_name)]
-        )
-        my_timer.start()
+    async def send_command(
+        self, command_name: Action, delay: Optional[float], period: Optional[float]
+    ):
+        if not delay and not period:
+            raise ValueError("Either delay or period must be defined")
+        if delay and delay <= 0:
+            raise ValueError("Delay must be a positive number")
+        if period and period <= 0:
+            raise ValueError("Period must be a positive number")
+        try:
+            if delay and not self._command_timer:
+                self._command_timer = Timer(
+                    delay,
+                    False,
+                    self._send_command,
+                    [command_name],
+                )
+            if period and not self._command_timer:
+                self._command_timer = Timer(
+                    period,
+                    True,
+                    self._send_command,
+                    [command_name],
+                )
+        except ConnectionClosed:
+            self.handle_connection_closed()
+
+    def handle_connection_closed(self):
+        logging.info("ChargePoint %s closed connection", self.id)
+        if self._command_timer:
+            self._command_timer.cancel()
+        ChargePoints.remove(self)
+        logging.debug("Connected ChargePoint(s): %d", len(ChargePoints))
 
 
 # Function to handle new WebSocket connections.
-async def on_connect(websocket, path):
+async def on_connect(
+    websocket,
+    command_name: Optional[Action],
+    delay: Optional[float],
+    period: Optional[float],
+):
     """For every new charge point that connects, create a ChargePoint instance and start
-    listening for messages."""
+    listening for messages.
+    """
     try:
         requested_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
     except KeyError:
@@ -172,44 +189,38 @@ async def on_connect(websocket, path):
         )
         return await websocket.close()
 
-    charge_point_id = path.strip("/")
-    cp = ChargePoint(charge_point_id, websocket)
+    cp = ChargePoint(websocket)
+    if command_name:
+        await cp.send_command(command_name, delay, period)
 
     ChargePoints.add(cp)
+
     try:
         await cp.start()
-
     except ConnectionClosed:
-        logging.info("ChargePoint %s closed connection", cp.id)
-        ChargePoints.remove(cp)
-        logging.debug("Connected ChargePoint(s): %d", len(ChargePoints))
+        cp.handle_connection_closed()
 
 
 # Main function to start the WebSocket server.
 async def main():
-    # Define argument parser
-    parser = argparse.ArgumentParser(description="OCPP2 Charge Point Simulator")
-    parser.add_argument("--command", type=str, help="OCPP2 Command Name")
-    parser.add_argument("--delay", type=int, help="Delay in seconds")
-    parser.add_argument("--period", type=int, help="Period in seconds")
+    parser = argparse.ArgumentParser(description="OCPP2 Server")
+    parser.add_argument("-c", "--command", type=Action, help="OCPP2 Command Name")
+    parser.add_argument("-d", "--delay", type=float, help="Delay in seconds")
+    parser.add_argument("-p", "--period", type=float, help="Period in seconds")
+
+    args = parser.parse_args()
 
     # Create the WebSocket server and specify the handler for new connections.
     server = await websockets.serve(
-        on_connect,
+        partial(
+            on_connect, command_name=args.command, delay=args.delay, period=args.period
+        ),
         "127.0.0.1",  # Listen on loopback.
         9000,  # Port number.
         subprotocols=["ocpp2.0", "ocpp2.0.1"],  # Specify OCPP 2.0.1 subprotocols.
     )
     logging.info("WebSocket Server Started")
 
-    args = parser.parse_args()
-
-    if args.command:
-        for cp in ChargePoints:
-            asyncio.create_task(
-                send_ocpp_command(cp, args.command, args.delay, args.period)
-            )
-
     # Wait for the server to close (runs indefinitely).
     await server.wait_closed()
 
diff --git a/tests/ocpp-server/timer.py b/tests/ocpp-server/timer.py
new file mode 100644 (file)
index 0000000..aca7404
--- /dev/null
@@ -0,0 +1,64 @@
+"""Timer for asyncio."""
+
+import asyncio
+
+
+class Timer:
+    def __init__(
+        self,
+        timeout: float,
+        repeat: bool,
+        callback,
+        callback_args=(),
+        callback_kwargs=None,
+    ):
+        """An asynchronous Timer object.
+
+        Parameters
+        ----------
+        timeout: :class:`float`:
+        The duration for which the timer should last.
+
+        repeat: :class:`bool`:
+        Whether the timer should repeat.
+
+        callback: :class:`Coroutine` or `Method` or `Function`:
+        An `asyncio` coroutine or a regular method that will be called as soon as
+        the timer ends.
+
+        callback_args: Optional[:class:`tuple`]:
+        The args to be passed to the callback.
+
+        callback_kwargs: Optional[:class:`dict`]:
+        The kwargs to be passed to the callback.
+
+        """
+        self._timeout = timeout
+        self._repeat = repeat
+        self._callback = callback
+        self._task = asyncio.create_task(self._job())
+        self._callback_args = callback_args
+        if callback_kwargs is None:
+            callback_kwargs = {}
+        self._callback_kwargs = callback_kwargs
+
+    async def _job(self):
+        if self._repeat:
+            while self._task.cancelled() is False:
+                await asyncio.sleep(self._timeout)
+                await self._call_callback()
+        else:
+            await asyncio.sleep(self._timeout)
+            await self._call_callback()
+
+    async def _call_callback(self):
+        if asyncio.iscoroutine(self._callback) or asyncio.iscoroutinefunction(
+            self._callback
+        ):
+            await self._callback(*self._callback_args, **self._callback_kwargs)
+        else:
+            self._callback(*self._callback_args, **self._callback_kwargs)
+
+    def cancel(self):
+        """Cancels the timer. The callback will not be called."""
+        self._task.cancel()