From: Jérôme Benoit Date: Thu, 27 Jun 2024 20:23:13 +0000 (+0200) Subject: Merge branch 'main' into fix-ocpp2-server-send-command X-Git-Tag: v1.3.7~13^2~5 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=043d750e5207af299c8277febcae40a424b0ab05;hp=7fe5b1b47f62b761505483e471e6621235f596d2;p=e-mobility-charging-stations-simulator.git Merge branch 'main' into fix-ocpp2-server-send-command --- diff --git a/tests/ocpp-server/README.md b/tests/ocpp-server/README.md index e6a4dfa1..32e13619 100644 --- a/tests/ocpp-server/README.md +++ b/tests/ocpp-server/README.md @@ -20,9 +20,9 @@ poetry run task server The server will start listening for connections on port 9000. -## Running the server with command sending +## Running the server with OCPP command sending -You can also specify a command and a period duration with the --request and --period options respectively when running the server. The server will then send your chosen command to the connected client every period seconds. +You can also specify a command and a period duration with the --command and --period options respectively when running the server. The server will then send your chosen command to the connected client(s) every period seconds. ### GetBaseReport Command @@ -40,7 +40,7 @@ To run the server and send a ClearCache command every 5 seconds, use: poetry run task server --command ClearCache --period 5 ``` -Please be mindful that these commands were examples according to the provided scenario, the available commands and their syntax might vary depending on the ocpp version and the implemented functionalities on your server. +Please be mindful that these commands were examples according to the provided scenario, the available commands and their syntax might vary depending on the ocpp version and the implemented functionalities on your client. ## Overview of the Server Scripts diff --git a/tests/ocpp-server/pyproject.toml b/tests/ocpp-server/pyproject.toml index f5c44fdf..25b3fd08 100644 --- a/tests/ocpp-server/pyproject.toml +++ b/tests/ocpp-server/pyproject.toml @@ -20,7 +20,7 @@ format = "ruff format . && ruff check --fix ." lint = "ruff format --check --diff . && ruff check --diff ." [tool.ruff.lint] -select = ["E", "W", "F", "B", "Q", "I"] +select = ["E", "W", "F", "ASYNC", "S", "B", "A", "Q", "RUF", "I"] [build-system] diff --git a/tests/ocpp-server/server.py b/tests/ocpp-server/server.py index 3001ebd8..a5ebed38 100644 --- a/tests/ocpp-server/server.py +++ b/tests/ocpp-server/server.py @@ -2,7 +2,8 @@ import argparse import asyncio import logging from datetime import datetime, timezone -from threading import Timer +from functools import partial +from typing import Optional import ocpp.v201 import websockets @@ -11,29 +12,29 @@ from ocpp.v201.enums import ( Action, AuthorizationStatusType, ClearCacheStatusType, + GenericDeviceModelStatusType, RegistrationStatusType, ReportBaseType, TransactionEventType, ) from websockets import ConnectionClosed +from timer import Timer + # Setting up the logging configuration to display debug level messages. logging.basicConfig(level=logging.DEBUG) ChargePoints = set() -class RepeatTimer(Timer): - """Class that inherits from the Timer class. It will run a - function at regular intervals.""" - - def run(self): - while not self.finished.wait(self.interval): - self.function(*self.args, **self.kwargs) - - # Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class. class ChargePoint(ocpp.v201.ChargePoint): + _command_timer: Optional[Timer] + + def __init__(self, connection): + super().__init__(connection.path.strip("/"), connection) + self._command_timer = None + # Message handlers to receive OCPP messages. @on(Action.BootNotification) async def on_boot_notification(self, charging_station, reason, **kwargs): @@ -95,17 +96,8 @@ class ChargePoint(ocpp.v201.ChargePoint): logging.info("Received %s", Action.MeterValues) return ocpp.v201.call_result.MeterValues() - @on(Action.GetBaseReport) - async def on_get_base_report( - self, request_id: int, report_base: ReportBaseType, **kwargs - ): - logging.info("Received %s", Action.GetBaseReport) - return ocpp.v201.call_result.GetBaseReport( - id_token_info={"status": ReportBaseType.accepted} - ) - # Request handlers to emit OCPP messages. - async def send_clear_cache(self): + async def _send_clear_cache(self): request = ocpp.v201.call.ClearCache() response = await self.call(request) @@ -114,47 +106,72 @@ class ChargePoint(ocpp.v201.ChargePoint): else: logging.info("%s failed", Action.ClearCache) - async def send_get_base_report(self): + async def _send_get_base_report(self): request = ocpp.v201.call.GetBaseReport( - reportBase=ReportBaseType.ConfigurationInventory + request_id=1, report_base=ReportBaseType.full_inventory ) response = await self.call(request) - if response.status == ReportBaseType.accepted: + if response.status == GenericDeviceModelStatusType.accepted: logging.info("%s successful", Action.GetBaseReport) else: logging.info("%s failed", Action.GetBaseReport) - -# Function to send OCPP command -async def send_ocpp_command(cp, command_name, delay=None, period=None): - try: + async def _send_command(self, command_name: Action): + logging.debug("Sending OCPP command %s", command_name) match command_name: case Action.ClearCache: - logging.info("%s Send:", Action.ClearCache) - await cp.send_clear_cache() + await self._send_clear_cache() case Action.GetBaseReport: - logging.info("%s Send:", Action.GetBaseReport) - await cp.send_get_base_report() - except Exception: - logging.exception( - f"Not supported or Failure while processing command {command_name}" - ) - - if delay: - await asyncio.sleep(delay) + await self._send_get_base_report() + case _: + logging.info(f"Not supported command {command_name}") - if period: - my_timer = RepeatTimer( - period, asyncio.create_task, [cp.send_ocpp_command(command_name)] - ) - my_timer.start() + async def send_command( + self, command_name: Action, delay: Optional[float], period: Optional[float] + ): + if not delay and not period: + raise ValueError("Either delay or period must be defined") + if delay and delay <= 0: + raise ValueError("Delay must be a positive number") + if period and period <= 0: + raise ValueError("Period must be a positive number") + try: + if delay and not self._command_timer: + self._command_timer = Timer( + delay, + False, + self._send_command, + [command_name], + ) + if period and not self._command_timer: + self._command_timer = Timer( + period, + True, + self._send_command, + [command_name], + ) + except ConnectionClosed: + self.handle_connection_closed() + + def handle_connection_closed(self): + logging.info("ChargePoint %s closed connection", self.id) + if self._command_timer: + self._command_timer.cancel() + ChargePoints.remove(self) + logging.debug("Connected ChargePoint(s): %d", len(ChargePoints)) # Function to handle new WebSocket connections. -async def on_connect(websocket, path): +async def on_connect( + websocket, + command_name: Optional[Action], + delay: Optional[float], + period: Optional[float], +): """For every new charge point that connects, create a ChargePoint instance and start - listening for messages.""" + listening for messages. + """ try: requested_protocols = websocket.request_headers["Sec-WebSocket-Protocol"] except KeyError: @@ -172,44 +189,38 @@ async def on_connect(websocket, path): ) return await websocket.close() - charge_point_id = path.strip("/") - cp = ChargePoint(charge_point_id, websocket) + cp = ChargePoint(websocket) + if command_name: + await cp.send_command(command_name, delay, period) ChargePoints.add(cp) + try: await cp.start() - except ConnectionClosed: - logging.info("ChargePoint %s closed connection", cp.id) - ChargePoints.remove(cp) - logging.debug("Connected ChargePoint(s): %d", len(ChargePoints)) + cp.handle_connection_closed() # Main function to start the WebSocket server. async def main(): - # Define argument parser - parser = argparse.ArgumentParser(description="OCPP2 Charge Point Simulator") - parser.add_argument("--command", type=str, help="OCPP2 Command Name") - parser.add_argument("--delay", type=int, help="Delay in seconds") - parser.add_argument("--period", type=int, help="Period in seconds") + parser = argparse.ArgumentParser(description="OCPP2 Server") + parser.add_argument("-c", "--command", type=Action, help="OCPP2 Command Name") + parser.add_argument("-d", "--delay", type=float, help="Delay in seconds") + parser.add_argument("-p", "--period", type=float, help="Period in seconds") + + args = parser.parse_args() # Create the WebSocket server and specify the handler for new connections. server = await websockets.serve( - on_connect, + partial( + on_connect, command_name=args.command, delay=args.delay, period=args.period + ), "127.0.0.1", # Listen on loopback. 9000, # Port number. subprotocols=["ocpp2.0", "ocpp2.0.1"], # Specify OCPP 2.0.1 subprotocols. ) logging.info("WebSocket Server Started") - args = parser.parse_args() - - if args.command: - for cp in ChargePoints: - asyncio.create_task( - send_ocpp_command(cp, args.command, args.delay, args.period) - ) - # Wait for the server to close (runs indefinitely). await server.wait_closed() diff --git a/tests/ocpp-server/timer.py b/tests/ocpp-server/timer.py new file mode 100644 index 00000000..aca74045 --- /dev/null +++ b/tests/ocpp-server/timer.py @@ -0,0 +1,64 @@ +"""Timer for asyncio.""" + +import asyncio + + +class Timer: + def __init__( + self, + timeout: float, + repeat: bool, + callback, + callback_args=(), + callback_kwargs=None, + ): + """An asynchronous Timer object. + + Parameters + ---------- + timeout: :class:`float`: + The duration for which the timer should last. + + repeat: :class:`bool`: + Whether the timer should repeat. + + callback: :class:`Coroutine` or `Method` or `Function`: + An `asyncio` coroutine or a regular method that will be called as soon as + the timer ends. + + callback_args: Optional[:class:`tuple`]: + The args to be passed to the callback. + + callback_kwargs: Optional[:class:`dict`]: + The kwargs to be passed to the callback. + + """ + self._timeout = timeout + self._repeat = repeat + self._callback = callback + self._task = asyncio.create_task(self._job()) + self._callback_args = callback_args + if callback_kwargs is None: + callback_kwargs = {} + self._callback_kwargs = callback_kwargs + + async def _job(self): + if self._repeat: + while self._task.cancelled() is False: + await asyncio.sleep(self._timeout) + await self._call_callback() + else: + await asyncio.sleep(self._timeout) + await self._call_callback() + + async def _call_callback(self): + if asyncio.iscoroutine(self._callback) or asyncio.iscoroutinefunction( + self._callback + ): + await self._callback(*self._callback_args, **self._callback_kwargs) + else: + self._callback(*self._callback_args, **self._callback_kwargs) + + def cancel(self): + """Cancels the timer. The callback will not be called.""" + self._task.cancel()