From 87dd8a85cf076059df34f0fc7e56a4dff9a56004 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 13 Mar 2026 19:53:46 +0100 Subject: [PATCH] feat(ocpp-server): add error handling, configurable params, async tests, and Python 3.14+ compat - Add centralized error handling in _send_command dispatch for TimeoutError, OCPPError, and ConnectionClosed - Make host/port configurable via --host and --port CLI args - Make boot notification status configurable via --boot-status - Make TransactionEvent total cost configurable via --total-cost - Extract server defaults as module-level constants - Fix asyncio.iscoroutine() deprecation in timer.py: use inspect.iscoroutinefunction() + inspect.isawaitable() for Python 3.14+ - Add proper type annotations and docstrings to timer.py - Add pytest-asyncio for async handler testing (82 tests total) - Add async tests for all 15 incoming OCPP handlers - Add error handling tests for command dispatch layer - Add tests for configurable boot_status and total_cost - Update README with new server and charging behavior CLI options --- tests/ocpp-server/README.md | 31 ++- tests/ocpp-server/poetry.lock | 24 ++- tests/ocpp-server/pyproject.toml | 4 + tests/ocpp-server/server.py | 220 +++++++++++++-------- tests/ocpp-server/test_server.py | 315 ++++++++++++++++++++++++++++++- tests/ocpp-server/timer.py | 64 +++---- 6 files changed, 537 insertions(+), 121 deletions(-) diff --git a/tests/ocpp-server/README.md b/tests/ocpp-server/README.md index 86262e09..d703a2a4 100644 --- a/tests/ocpp-server/README.md +++ b/tests/ocpp-server/README.md @@ -34,7 +34,18 @@ Or poetry run python server.py ``` -The server will start listening for connections on port 9000. +The server will start listening for connections on `127.0.0.1:9000` by default. + +### Server Configuration + +```shell +poetry run python server.py --host --port +``` + +**Options:** + +- `--host `: Server bind address (default: `127.0.0.1`) +- `--port `: Server port (default: `9000`) ## Running the server with OCPP command sending @@ -96,6 +107,24 @@ poetry run task server --command GetBaseReport --period 5 - `StatusNotification` - Handle status notifications - `TransactionEvent` - Handle transaction events (Started/Updated/Ended) +## Charging Behavior Configuration + +```shell +poetry run python server.py --boot-status --total-cost +``` + +**Options:** + +- `--boot-status `: BootNotification response status (`accepted`, `pending`, `rejected`; default: `accepted`) +- `--total-cost `: Total cost returned in TransactionEvent.Updated responses (default: `10.0`) + +**Examples:** + +```shell +poetry run python server.py --boot-status rejected +poetry run python server.py --total-cost 25.50 +``` + ## Authorization Testing Modes The server supports configurable authorization behavior for testing OCPP 2.0 authentication scenarios: diff --git a/tests/ocpp-server/poetry.lock b/tests/ocpp-server/poetry.lock index a53725c6..7187666b 100644 --- a/tests/ocpp-server/poetry.lock +++ b/tests/ocpp-server/poetry.lock @@ -197,6 +197,26 @@ pygments = ">=2.7.2" [package.extras] dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "requests", "setuptools", "xmlschema"] +[[package]] +name = "pytest-asyncio" +version = "1.3.0" +description = "Pytest support for asyncio" +optional = false +python-versions = ">=3.10" +groups = ["dev"] +files = [ + {file = "pytest_asyncio-1.3.0-py3-none-any.whl", hash = "sha256:611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5"}, + {file = "pytest_asyncio-1.3.0.tar.gz", hash = "sha256:d7f52f36d231b80ee124cd216ffb19369aa168fc10095013c6b014a34d3ee9e5"}, +] + +[package.dependencies] +pytest = ">=8.2,<10" +typing-extensions = {version = ">=4.12", markers = "python_version < \"3.13\""} + +[package.extras] +docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1)"] +testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"] + [[package]] name = "referencing" version = "0.37.0" @@ -448,7 +468,7 @@ version = "4.15.0" description = "Backported and Experimental Type Hints for Python 3.9+" optional = false python-versions = ">=3.9" -groups = ["main"] +groups = ["main", "dev"] markers = "python_version == \"3.12\"" files = [ {file = "typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548"}, @@ -537,4 +557,4 @@ files = [ [metadata] lock-version = "2.1" python-versions = "^3.12" -content-hash = "4f89833c0ea50e967a4cdc8da890763e911e8ca4f2dd29a5e6e884d014cc2386" +content-hash = "64161175b159d2fda4d43410892ac9fee77a1137720a8e3f788b70d7c44256d7" diff --git a/tests/ocpp-server/pyproject.toml b/tests/ocpp-server/pyproject.toml index f5004283..ccbb047f 100644 --- a/tests/ocpp-server/pyproject.toml +++ b/tests/ocpp-server/pyproject.toml @@ -14,6 +14,7 @@ ocpp = "^2.1.0" taskipy = "^1" ruff = "^0.15" pytest = "^8" +pytest-asyncio = ">=1.3.0" [tool.taskipy.tasks] server = "poetry run python server.py" @@ -27,6 +28,9 @@ select = ["E", "W", "F", "ASYNC", "S", "B", "A", "Q", "RUF", "I"] [tool.ruff.lint.per-file-ignores] "test_*.py" = ["S101"] +[tool.pytest.ini_options] +asyncio_mode = "auto" + [build-system] requires = ["poetry-core"] diff --git a/tests/ocpp-server/server.py b/tests/ocpp-server/server.py index 02507893..a0d7810a 100644 --- a/tests/ocpp-server/server.py +++ b/tests/ocpp-server/server.py @@ -1,3 +1,5 @@ +"""OCPP 2.0.1 mock server for e-mobility charging station simulator testing.""" + import argparse import asyncio import logging @@ -7,7 +9,7 @@ from random import randint import ocpp.v201 import websockets -from ocpp.exceptions import InternalError +from ocpp.exceptions import InternalError, OCPPError from ocpp.routing import on from ocpp.v201.enums import ( Action, @@ -45,25 +47,42 @@ from websockets import ConnectionClosed from timer import Timer -# Setting up the logging configuration to display debug level messages. logging.basicConfig(level=logging.DEBUG) -ChargePoints = set() +# Server defaults +DEFAULT_HOST = "127.0.0.1" +DEFAULT_PORT = 9000 +DEFAULT_HEARTBEAT_INTERVAL = 60 +DEFAULT_TOTAL_COST = 10.0 +SUBPROTOCOLS = ["ocpp2.0", "ocpp2.0.1"] + +ChargePoints: set["ChargePoint"] = set() class ChargePoint(ocpp.v201.ChargePoint): + """OCPP 2.0.1 charge point handler with configurable behavior for testing.""" + _command_timer: Timer | None _auth_config: dict + _boot_status: RegistrationStatusEnumType + _total_cost: float - def __init__(self, connection, auth_config: dict | None = None): + def __init__( + self, + connection, + auth_config: dict | None = None, + boot_status: RegistrationStatusEnumType = RegistrationStatusEnumType.accepted, + total_cost: float = DEFAULT_TOTAL_COST, + ): super().__init__(connection.path.strip("/"), connection) self._command_timer = None - # Auth configuration for testing different scenarios + self._boot_status = boot_status + self._total_cost = total_cost self._auth_config = auth_config or { - "mode": "normal", # normal, offline, whitelist, blacklist, rate_limit + "mode": "normal", "whitelist": ["valid_token", "test_token", "authorized_user"], "blacklist": ["blocked_token", "invalid_user"], - "offline": False, # Simulate network failure + "offline": False, "default_status": AuthorizationStatusEnumType.accepted, } @@ -84,21 +103,19 @@ class ChargePoint(ocpp.v201.ChargePoint): ) if mode == "rate_limit": return AuthorizationStatusEnumType.not_at_this_time - # normal mode return self._auth_config.get( "default_status", AuthorizationStatusEnumType.accepted ) - # Message handlers to receive OCPP messages. + # --- Incoming message handlers (CS → CSMS) --- + @on(Action.boot_notification) async def on_boot_notification(self, charging_station, reason, **kwargs): logging.info("Received %s", Action.boot_notification) - # Create and return a BootNotification response with the current time, - # an interval of 60 seconds, and an accepted status. return ocpp.v201.call_result.BootNotification( current_time=datetime.now(timezone.utc).isoformat(), - interval=60, - status=RegistrationStatusEnumType.accepted, + interval=DEFAULT_HEARTBEAT_INTERVAL, + status=self._boot_status, ) @on(Action.heartbeat) @@ -121,7 +138,6 @@ class ChargePoint(ocpp.v201.ChargePoint): "Received %s for token: %s", Action.authorize, id_token.get("id_token") ) - # Simulate offline mode (network failure) if self._auth_config.get("offline", False): logging.warning("Offline mode - simulating network failure") raise InternalError(description="Simulated network failure") @@ -158,7 +174,9 @@ class ChargePoint(ocpp.v201.ChargePoint): ) case TransactionEventEnumType.updated: logging.info("Received %s Updated", Action.transaction_event) - return ocpp.v201.call_result.TransactionEvent(total_cost=10) + return ocpp.v201.call_result.TransactionEvent( + total_cost=self._total_cost + ) case TransactionEventEnumType.ended: logging.info("Received %s Ended", Action.transaction_event) return ocpp.v201.call_result.TransactionEvent() @@ -231,7 +249,8 @@ class ChargePoint(ocpp.v201.ChargePoint): logging.info("Received %s", Action.notify_customer_information) return ocpp.v201.call_result.NotifyCustomerInformation() - # Request handlers to emit OCPP messages. + # --- Outgoing commands (CSMS → CS) --- + async def _send_clear_cache(self): request = ocpp.v201.call.ClearCache() response = await self.call(request) @@ -475,51 +494,68 @@ class ChargePoint(ocpp.v201.ChargePoint): else: logging.info("%s failed", Action.update_firmware) + # --- Command dispatch --- + async def _send_command(self, command_name: Action): logging.debug("Sending OCPP command %s", command_name) - match command_name: - case Action.clear_cache: - await self._send_clear_cache() - case Action.get_base_report: - await self._send_get_base_report() - case Action.get_variables: - await self._send_get_variables() - case Action.set_variables: - await self._send_set_variables() - case Action.request_start_transaction: - await self._send_request_start_transaction() - case Action.request_stop_transaction: - await self._send_request_stop_transaction() - case Action.reset: - await self._send_reset() - case Action.unlock_connector: - await self._send_unlock_connector() - case Action.change_availability: - await self._send_change_availability() - case Action.trigger_message: - await self._send_trigger_message() - case Action.data_transfer: - await self._send_data_transfer() - case Action.certificate_signed: - await self._send_certificate_signed() - case Action.customer_information: - await self._send_customer_information() - case Action.delete_certificate: - await self._send_delete_certificate() - case Action.get_installed_certificate_ids: - await self._send_get_installed_certificate_ids() - case Action.get_log: - await self._send_get_log() - case Action.get_transaction_status: - await self._send_get_transaction_status() - case Action.install_certificate: - await self._send_install_certificate() - case Action.set_network_profile: - await self._send_set_network_profile() - case Action.update_firmware: - await self._send_update_firmware() - case _: - logging.warning("Not supported command %s", command_name) + try: + match command_name: + case Action.clear_cache: + await self._send_clear_cache() + case Action.get_base_report: + await self._send_get_base_report() + case Action.get_variables: + await self._send_get_variables() + case Action.set_variables: + await self._send_set_variables() + case Action.request_start_transaction: + await self._send_request_start_transaction() + case Action.request_stop_transaction: + await self._send_request_stop_transaction() + case Action.reset: + await self._send_reset() + case Action.unlock_connector: + await self._send_unlock_connector() + case Action.change_availability: + await self._send_change_availability() + case Action.trigger_message: + await self._send_trigger_message() + case Action.data_transfer: + await self._send_data_transfer() + case Action.certificate_signed: + await self._send_certificate_signed() + case Action.customer_information: + await self._send_customer_information() + case Action.delete_certificate: + await self._send_delete_certificate() + case Action.get_installed_certificate_ids: + await self._send_get_installed_certificate_ids() + case Action.get_log: + await self._send_get_log() + case Action.get_transaction_status: + await self._send_get_transaction_status() + case Action.install_certificate: + await self._send_install_certificate() + case Action.set_network_profile: + await self._send_set_network_profile() + case Action.update_firmware: + await self._send_update_firmware() + case _: + logging.warning("Not supported command %s", command_name) + except TimeoutError: + logging.error("Timeout waiting for %s response", command_name) + except OCPPError as e: + logging.error( + "OCPP error sending %s: [%s] %s", + command_name, + type(e).__name__, + e.description, + ) + except ConnectionClosed: + logging.warning("Connection closed while sending %s", command_name) + self.handle_connection_closed() + except Exception: + logging.exception("Unexpected error sending %s", command_name) async def send_command( self, command_name: Action, delay: float | None, period: float | None @@ -530,14 +566,14 @@ class ChargePoint(ocpp.v201.ChargePoint): delay, False, self._send_command, - [command_name], + (command_name,), ) if period and not self._command_timer: self._command_timer = Timer( period, True, self._send_command, - [command_name], + (command_name,), ) except ConnectionClosed: self.handle_connection_closed() @@ -550,17 +586,16 @@ class ChargePoint(ocpp.v201.ChargePoint): logging.debug("Connected ChargePoint(s): %d", len(ChargePoints)) -# Function to handle new WebSocket connections. async def on_connect( websocket, command_name: Action | None, delay: float | None, period: float | None, auth_config: dict | None, + boot_status: RegistrationStatusEnumType, + total_cost: float, ): - """For every new charge point that connects, create a ChargePoint instance and start - listening for messages. - """ + """Handle new WebSocket connections from charge points.""" try: requested_protocols = websocket.request_headers["Sec-WebSocket-Protocol"] except KeyError: @@ -578,7 +613,12 @@ async def on_connect( ) return await websocket.close() - cp = ChargePoint(websocket, auth_config) + cp = ChargePoint( + websocket, + auth_config=auth_config, + boot_status=boot_status, + total_cost=total_cost, + ) if command_name: await cp.send_command(command_name, delay, period) @@ -600,7 +640,6 @@ def check_positive_number(value): return value -# Main function to start the WebSocket server. async def main(): parser = argparse.ArgumentParser(description="OCPP2 Server") parser.add_argument("-c", "--command", type=Action, help="command name") @@ -618,7 +657,35 @@ async def main(): help="period in seconds", ) - # Auth configuration arguments + # Server configuration + parser.add_argument( + "--host", + type=str, + default=DEFAULT_HOST, + help=f"server host (default: {DEFAULT_HOST})", + ) + parser.add_argument( + "--port", + type=int, + default=DEFAULT_PORT, + help=f"server port (default: {DEFAULT_PORT})", + ) + + # Charging configuration + parser.add_argument( + "--boot-status", + type=RegistrationStatusEnumType, + default=RegistrationStatusEnumType.accepted, + help="boot notification response status (default: accepted)", + ) + parser.add_argument( + "--total-cost", + type=float, + default=DEFAULT_TOTAL_COST, + help=f"TransactionEvent.Updated total cost (default: {DEFAULT_TOTAL_COST})", + ) + + # Auth configuration parser.add_argument( "--auth-mode", type=str, @@ -646,14 +713,11 @@ async def main(): help="Simulate offline/network failure mode", ) - # Parse args to check if group.required should be set args, _ = parser.parse_known_args() group.required = args.command is not None - # Re-parse with full validation args = parser.parse_args() - # Build auth configuration from CLI args auth_config = { "mode": args.auth_mode, "whitelist": args.whitelist, @@ -666,7 +730,6 @@ async def main(): "Auth configuration: mode=%s, offline=%s", args.auth_mode, args.offline ) - # Create the WebSocket server and specify the handler for new connections. server = await websockets.serve( partial( on_connect, @@ -674,18 +737,17 @@ async def main(): delay=args.delay, period=args.period, auth_config=auth_config, + boot_status=args.boot_status, + total_cost=args.total_cost, ), - "127.0.0.1", # Listen on loopback. - 9000, # Port number. - subprotocols=["ocpp2.0", "ocpp2.0.1"], # Specify OCPP 2.0.1 subprotocols. + args.host, + args.port, + subprotocols=SUBPROTOCOLS, ) - logging.info("WebSocket Server Started") + logging.info("WebSocket Server Started on %s:%d", args.host, args.port) - # Wait for the server to close (runs indefinitely). await server.wait_closed() -# Entry point of the script. if __name__ == "__main__": - # Run the main function to start the server. asyncio.run(main()) diff --git a/tests/ocpp-server/test_server.py b/tests/ocpp-server/test_server.py index 3e2f90ad..33459d5a 100644 --- a/tests/ocpp-server/test_server.py +++ b/tests/ocpp-server/test_server.py @@ -2,12 +2,25 @@ import argparse from typing import ClassVar -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest -from ocpp.v201.enums import AuthorizationStatusEnumType - -from server import ChargePoint, check_positive_number +from ocpp.v201.enums import ( + AuthorizationStatusEnumType, + DataTransferStatusEnumType, + GenericStatusEnumType, + GetCertificateStatusEnumType, + Iso15118EVCertificateStatusEnumType, + RegistrationStatusEnumType, + TransactionEventEnumType, +) + +from server import ( + DEFAULT_HEARTBEAT_INTERVAL, + DEFAULT_TOTAL_COST, + ChargePoint, + check_positive_number, +) @pytest.fixture @@ -54,6 +67,21 @@ def blacklist_charge_point(mock_connection): ) +@pytest.fixture +def offline_charge_point(mock_connection): + """Create a ChargePoint with offline mode enabled.""" + return ChargePoint( + mock_connection, + auth_config={ + "mode": "normal", + "whitelist": [], + "blacklist": [], + "offline": True, + "default_status": AuthorizationStatusEnumType.accepted, + }, + ) + + class TestCheckPositiveNumber: """Tests for the check_positive_number argument validator.""" @@ -177,7 +205,6 @@ class TestChargePointHandlerCoverage: assert callable(getattr(ChargePoint, method_name)) def test_incoming_handler_count(self): - """Verify no handlers were accidentally removed.""" handler_count = sum( 1 for name in dir(ChargePoint) @@ -186,7 +213,6 @@ class TestChargePointHandlerCoverage: assert handler_count >= len(self.EXPECTED_INCOMING_HANDLERS) def test_outgoing_command_count(self): - """Verify no outgoing commands were accidentally removed.""" command_count = sum( 1 for name in dir(ChargePoint) @@ -212,3 +238,280 @@ class TestChargePointDefaultConfig: def test_command_timer_initially_none(self, charge_point): assert charge_point._command_timer is None + + def test_default_boot_status(self, charge_point): + assert charge_point._boot_status == RegistrationStatusEnumType.accepted + + def test_custom_boot_status(self, mock_connection): + cp = ChargePoint( + mock_connection, boot_status=RegistrationStatusEnumType.rejected + ) + assert cp._boot_status == RegistrationStatusEnumType.rejected + + def test_default_total_cost(self, charge_point): + assert charge_point._total_cost == DEFAULT_TOTAL_COST + + def test_custom_total_cost(self, mock_connection): + cp = ChargePoint(mock_connection, total_cost=25.50) + assert cp._total_cost == 25.50 + + +# --- Async handler tests --- + + +class TestBootNotificationHandler: + """Tests for the BootNotification incoming handler.""" + + async def test_returns_accepted_by_default(self, charge_point): + response = await charge_point.on_boot_notification( + charging_station={"model": "Test", "vendor_name": "TestVendor"}, + reason="PowerUp", + ) + assert response.status == RegistrationStatusEnumType.accepted + assert response.interval == DEFAULT_HEARTBEAT_INTERVAL + assert response.current_time is not None + + async def test_configurable_boot_status(self, mock_connection): + cp = ChargePoint( + mock_connection, boot_status=RegistrationStatusEnumType.rejected + ) + response = await cp.on_boot_notification( + charging_station={"model": "Test", "vendor_name": "TestVendor"}, + reason="PowerUp", + ) + assert response.status == RegistrationStatusEnumType.rejected + + async def test_pending_boot_status(self, mock_connection): + cp = ChargePoint( + mock_connection, boot_status=RegistrationStatusEnumType.pending + ) + response = await cp.on_boot_notification( + charging_station={"model": "Test", "vendor_name": "TestVendor"}, + reason="PowerUp", + ) + assert response.status == RegistrationStatusEnumType.pending + + +class TestHeartbeatHandler: + """Tests for the Heartbeat incoming handler.""" + + async def test_returns_current_time(self, charge_point): + response = await charge_point.on_heartbeat() + assert response.current_time is not None + assert len(response.current_time) > 0 + + +class TestStatusNotificationHandler: + """Tests for the StatusNotification incoming handler.""" + + async def test_returns_empty_response(self, charge_point): + response = await charge_point.on_status_notification( + timestamp="2026-01-01T00:00:00Z", + evse_id=1, + connector_id=1, + connector_status="Available", + ) + assert response is not None + + +class TestAuthorizeHandler: + """Tests for the Authorize incoming handler.""" + + async def test_normal_mode_accepts(self, charge_point): + response = await charge_point.on_authorize( + id_token={"id_token": "any_token", "type": "ISO14443"} + ) + assert response.id_token_info["status"] == AuthorizationStatusEnumType.accepted + + async def test_whitelist_accepts_valid(self, whitelist_charge_point): + response = await whitelist_charge_point.on_authorize( + id_token={"id_token": "valid_token", "type": "ISO14443"} + ) + assert response.id_token_info["status"] == AuthorizationStatusEnumType.accepted + + async def test_whitelist_blocks_unknown(self, whitelist_charge_point): + response = await whitelist_charge_point.on_authorize( + id_token={"id_token": "stranger", "type": "ISO14443"} + ) + assert response.id_token_info["status"] == AuthorizationStatusEnumType.blocked + + async def test_offline_raises_internal_error(self, offline_charge_point): + from ocpp.exceptions import InternalError + + with pytest.raises(InternalError): + await offline_charge_point.on_authorize( + id_token={"id_token": "any", "type": "ISO14443"} + ) + + +class TestTransactionEventHandler: + """Tests for the TransactionEvent incoming handler.""" + + async def test_started_returns_auth_status(self, charge_point): + response = await charge_point.on_transaction_event( + event_type=TransactionEventEnumType.started, + timestamp="2026-01-01T00:00:00Z", + trigger_reason="Authorized", + seq_no=0, + transaction_info={"transaction_id": "txn-001"}, + id_token={"id_token": "test_token", "type": "ISO14443"}, + ) + assert response.id_token_info["status"] == AuthorizationStatusEnumType.accepted + + async def test_updated_returns_total_cost(self, charge_point): + response = await charge_point.on_transaction_event( + event_type=TransactionEventEnumType.updated, + timestamp="2026-01-01T00:00:00Z", + trigger_reason="MeterValuePeriodic", + seq_no=1, + transaction_info={"transaction_id": "txn-001"}, + ) + assert response.total_cost == DEFAULT_TOTAL_COST + + async def test_updated_uses_custom_total_cost(self, mock_connection): + cp = ChargePoint(mock_connection, total_cost=42.50) + response = await cp.on_transaction_event( + event_type=TransactionEventEnumType.updated, + timestamp="2026-01-01T00:00:00Z", + trigger_reason="MeterValuePeriodic", + seq_no=1, + transaction_info={"transaction_id": "txn-001"}, + ) + assert response.total_cost == 42.50 + + async def test_ended_returns_empty(self, charge_point): + response = await charge_point.on_transaction_event( + event_type=TransactionEventEnumType.ended, + timestamp="2026-01-01T00:00:00Z", + trigger_reason="StopAuthorized", + seq_no=2, + transaction_info={"transaction_id": "txn-001"}, + ) + assert response is not None + + +class TestDataTransferHandler: + """Tests for the DataTransfer incoming handler.""" + + async def test_returns_accepted(self, charge_point): + response = await charge_point.on_data_transfer(vendor_id="TestVendor") + assert response.status == DataTransferStatusEnumType.accepted + + +class TestCertificateHandlers: + """Tests for certificate-related incoming handlers.""" + + async def test_get_15118_ev_certificate(self, charge_point): + response = await charge_point.on_get_15118_ev_certificate( + iso15118_schema_version="urn:iso:15118:2:2013:MsgDef", + action="Install", + exi_request="mock_exi_data", + ) + assert response.status == Iso15118EVCertificateStatusEnumType.accepted + assert response.exi_response == "mock_exi_response_data" + + async def test_get_certificate_status(self, charge_point): + response = await charge_point.on_get_certificate_status( + ocsp_request_data={ + "hash_algorithm": "SHA256", + "issuer_name_hash": "mock", + "issuer_key_hash": "mock", + "serial_number": "mock", + "responder_url": "https://ocsp.example.com", + } + ) + assert response.status == GetCertificateStatusEnumType.accepted + + async def test_sign_certificate(self, charge_point): + response = await charge_point.on_sign_certificate(csr="mock_csr_data") + assert response.status == GenericStatusEnumType.accepted + + +class TestNotificationHandlers: + """Tests for notification incoming handlers with empty responses.""" + + async def test_meter_values(self, charge_point): + response = await charge_point.on_meter_values( + evse_id=1, meter_value=[{"timestamp": "2026-01-01T00:00:00Z"}] + ) + assert response is not None + + async def test_notify_report(self, charge_point): + response = await charge_point.on_notify_report( + request_id=1, + generated_at="2026-01-01T00:00:00Z", + seq_no=0, + ) + assert response is not None + + async def test_firmware_status_notification(self, charge_point): + response = await charge_point.on_firmware_status_notification( + status="Installed" + ) + assert response is not None + + async def test_log_status_notification(self, charge_point): + response = await charge_point.on_log_status_notification( + status="Uploaded", request_id=1 + ) + assert response is not None + + async def test_security_event_notification(self, charge_point): + response = await charge_point.on_security_event_notification( + event_type="FirmwareUpdated", timestamp="2026-01-01T00:00:00Z" + ) + assert response is not None + + async def test_notify_customer_information(self, charge_point): + response = await charge_point.on_notify_customer_information( + data="customer_data", + seq_no=0, + generated_at="2026-01-01T00:00:00Z", + request_id=1, + ) + assert response is not None + + +class TestSendCommandErrorHandling: + """Tests for error handling in the command dispatch layer.""" + + async def test_timeout_is_caught(self, charge_point): + from ocpp.v201.enums import Action + + with patch.object( + charge_point, "_send_clear_cache", side_effect=TimeoutError("timed out") + ): + await charge_point._send_command(command_name=Action.clear_cache) + + async def test_ocpp_error_is_caught(self, charge_point): + from ocpp.exceptions import InternalError as OCPPInternalError + from ocpp.v201.enums import Action + + with patch.object( + charge_point, + "_send_clear_cache", + side_effect=OCPPInternalError(description="test error"), + ): + await charge_point._send_command(command_name=Action.clear_cache) + + async def test_connection_closed_is_caught(self, charge_point): + from ocpp.v201.enums import Action + from websockets.exceptions import ConnectionClosedOK + from websockets.frames import Close + + with ( + patch.object( + charge_point, + "_send_clear_cache", + side_effect=ConnectionClosedOK( + Close(1000, ""), Close(1000, ""), rcvd_then_sent=True + ), + ), + patch.object(charge_point, "handle_connection_closed"), + ): + await charge_point._send_command(command_name=Action.clear_cache) + charge_point.handle_connection_closed.assert_called_once() + + async def test_unsupported_command_logs_warning(self, charge_point): + unsupported = MagicMock(value="Unsupported") + await charge_point._send_command(command_name=unsupported) diff --git a/tests/ocpp-server/timer.py b/tests/ocpp-server/timer.py index 53e6373e..d3eef454 100644 --- a/tests/ocpp-server/timer.py +++ b/tests/ocpp-server/timer.py @@ -1,64 +1,62 @@ -"""Timer for asyncio.""" +"""Asynchronous timer with support for one-shot and repeating callbacks.""" import asyncio +import inspect +from collections.abc import Callable +from typing import Any class Timer: + """Asynchronous timer with one-shot and repeating callback support.""" + def __init__( self, timeout: float, repeat: bool, - callback, - callback_args=(), - callback_kwargs=None, + callback: Callable[..., Any], + callback_args: tuple = (), + callback_kwargs: dict[str, Any] | None = None, ): - """ - An asynchronous Timer object. + """Create an asynchronous timer. Parameters ---------- - timeout: :class:`float`: - The duration for which the timer should last. - - repeat: :class:`bool`: - Whether the timer should repeat. - - callback: :class:`Coroutine` or `Method` or `Function`: - An `asyncio` coroutine or a regular method that will be called as soon as - the timer ends. + timeout: + Duration in seconds before the callback is invoked. + repeat: + Whether the timer should repeat after each invocation. + callback: + A sync or async callable to invoke when the timer fires. + callback_args: + Positional arguments passed to the callback. + callback_kwargs: + Keyword arguments passed to the callback. - callback_args: Optional[:class:`tuple`]: - The args to be passed to the callback. - - callback_kwargs: Optional[:class:`dict`]: - The kwargs to be passed to the callback. """ self._timeout = timeout self._repeat = repeat self._callback = callback - self._task = asyncio.create_task(self._job()) self._callback_args = callback_args - if callback_kwargs is None: - callback_kwargs = {} - self._callback_kwargs = callback_kwargs + self._callback_kwargs = callback_kwargs or {} + self._task = asyncio.create_task(self._job()) - async def _job(self): + async def _job(self) -> None: if self._repeat: - while self._task.cancelled() is False: + while not self._task.cancelled(): await asyncio.sleep(self._timeout) await self._call_callback() else: await asyncio.sleep(self._timeout) await self._call_callback() - async def _call_callback(self): - if asyncio.iscoroutine(self._callback) or asyncio.iscoroutinefunction( - self._callback - ): + async def _call_callback(self) -> None: + if inspect.iscoroutinefunction(self._callback): await self._callback(*self._callback_args, **self._callback_kwargs) else: - self._callback(*self._callback_args, **self._callback_kwargs) + result = self._callback(*self._callback_args, **self._callback_kwargs) + if inspect.isawaitable(result): + await result - def cancel(self): - """Cancels the timer. The callback will not be called.""" + def cancel(self) -> None: + """Cancel the timer. The callback will not be called.""" self._task.cancel() -- 2.43.0