]> Piment Noir Git Repositories - e-mobility-charging-stations-simulator.git/commitdiff
refactor(ocpp-server): audit-driven test improvements
authorJérôme Benoit <jerome.benoit@sap.com>
Tue, 24 Mar 2026 11:20:08 +0000 (12:20 +0100)
committerJérôme Benoit <jerome.benoit@sap.com>
Tue, 24 Mar 2026 11:20:08 +0000 (12:20 +0100)
- Extract mock_valid_ws and rate_limit_charge_point fixtures (DRY)
- Replace inline blacklist AuthConfig with existing fixture
- Strengthen log assertions using caplog.records with level checks
- Add edge case tests: empty token in whitelist/blacklist modes
- Add test for unknown transaction event type (case _ branch)
Coverage: 90.18% → 90.98%, 159 → 162 tests.

tests/ocpp-server/test_server.py

index 175e6f0d0791eb57d1aa53004ea71a75c41ee3cb..907de6f1d9a82dbcd3ef26f6b4e1365c2b336e8e 100644 (file)
@@ -139,6 +139,33 @@ def command_charge_point(mock_connection):
     return cp
 
 
+@pytest.fixture
+def rate_limit_charge_point(mock_connection):
+    """Create a ChargePoint with rate_limit auth mode."""
+    return ChargePoint(
+        mock_connection,
+        auth_config=AuthConfig(
+            mode=AuthMode.rate_limit,
+            whitelist=(),
+            blacklist=(),
+            offline=False,
+            default_status=AuthorizationStatusEnumType.accepted,
+        ),
+    )
+
+
+@pytest.fixture
+def mock_valid_ws():
+    """Create a mock WebSocket with valid OCPP 2.0.1 subprotocol."""
+    ws = MagicMock()
+    ws.request = MagicMock()
+    ws.request.headers = {"Sec-WebSocket-Protocol": "ocpp2.0.1"}
+    ws.subprotocol = "ocpp2.0.1"
+    ws.request.path = "/TestCP"
+    ws.close = AsyncMock()
+    return ws
+
+
 @pytest.fixture
 def main_mocks():
     """Provide mock loop, server, shutdown event, and signal capture."""
@@ -269,22 +296,18 @@ class TestResolveAuthStatus:
         status = blacklist_charge_point._resolve_auth_status("good_token")
         assert status == AuthorizationStatusEnumType.accepted
 
-    def test_rate_limit_mode(self, mock_connection):
-        cp = ChargePoint(
-            mock_connection,
-            auth_config=AuthConfig(
-                mode=AuthMode.rate_limit,
-                whitelist=(),
-                blacklist=(),
-                offline=False,
-                default_status=AuthorizationStatusEnumType.accepted,
-            ),
-        )
-        status = cp._resolve_auth_status("any_token")
+    def test_rate_limit_mode(self, rate_limit_charge_point):
+        status = rate_limit_charge_point._resolve_auth_status("any_token")
         assert status == AuthorizationStatusEnumType.not_at_this_time
 
+    def test_whitelist_blocks_empty_token(self, whitelist_charge_point):
+        status = whitelist_charge_point._resolve_auth_status("")
+        assert status == AuthorizationStatusEnumType.blocked
+
+    def test_blacklist_accepts_empty_token(self, blacklist_charge_point):
+        status = blacklist_charge_point._resolve_auth_status("")
+        assert status == AuthorizationStatusEnumType.accepted
 
-class TestChargePointHandlerCoverage:
     """Tests verifying all expected OCPP 2.0.1 handlers and commands are implemented."""
 
     EXPECTED_INCOMING_HANDLERS: ClassVar[list[str]] = [
@@ -472,44 +495,20 @@ class TestAuthorizeHandler:
                 id_token={"id_token": "any", "type": "ISO14443"}
             )
 
-    async def test_blacklist_blocks_blacklisted(self, mock_connection):
-        auth_config = AuthConfig(
-            mode=AuthMode.blacklist,
-            whitelist=(),
-            blacklist=(TEST_BLOCKED_TOKEN,),
-            offline=False,
-            default_status=AuthorizationStatusEnumType.accepted,
-        )
-        cp = ChargePoint(mock_connection, auth_config=auth_config)
-        response = await cp.on_authorize(
+    async def test_blacklist_blocks_blacklisted(self, blacklist_charge_point):
+        response = await blacklist_charge_point.on_authorize(
             id_token={"id_token": TEST_BLOCKED_TOKEN, "type": "ISO14443"}
         )
         assert response.id_token_info["status"] == AuthorizationStatusEnumType.blocked
 
-    async def test_blacklist_accepts_unlisted(self, mock_connection):
-        auth_config = AuthConfig(
-            mode=AuthMode.blacklist,
-            whitelist=(),
-            blacklist=(TEST_BLOCKED_TOKEN,),
-            offline=False,
-            default_status=AuthorizationStatusEnumType.accepted,
-        )
-        cp = ChargePoint(mock_connection, auth_config=auth_config)
-        response = await cp.on_authorize(
+    async def test_blacklist_accepts_unlisted(self, blacklist_charge_point):
+        response = await blacklist_charge_point.on_authorize(
             id_token={"id_token": "unlisted_token", "type": "ISO14443"}
         )
         assert response.id_token_info["status"] == AuthorizationStatusEnumType.accepted
 
-    async def test_rate_limit_rejects(self, mock_connection):
-        auth_config = AuthConfig(
-            mode=AuthMode.rate_limit,
-            whitelist=(),
-            blacklist=(),
-            offline=False,
-            default_status=AuthorizationStatusEnumType.accepted,
-        )
-        cp = ChargePoint(mock_connection, auth_config=auth_config)
-        response = await cp.on_authorize(
+    async def test_rate_limit_rejects(self, rate_limit_charge_point):
+        response = await rate_limit_charge_point.on_authorize(
             id_token={"id_token": "any_token", "type": "ISO14443"}
         )
         assert (
@@ -564,8 +563,17 @@ class TestTransactionEventHandler:
         assert response.total_cost is None
         assert response.id_token_info is None
 
+    async def test_unknown_event_type_returns_empty(self, charge_point):
+        response = await charge_point.on_transaction_event(
+            event_type="UnknownType",
+            timestamp=TEST_TIMESTAMP,
+            trigger_reason="Unknown",
+            seq_no=0,
+            transaction_info={"transaction_id": TEST_TRANSACTION_ID},
+        )
+        assert response.total_cost is None
+        assert response.id_token_info is None
 
-class TestDataTransferHandler:
     """Tests for the DataTransfer incoming handler."""
 
     async def test_returns_accepted(self, charge_point):
@@ -1021,7 +1029,10 @@ class TestOutgoingCommands:
         caplog.set_level(logging.INFO)
         command_charge_point.call.return_value = result_cls(status=failed_status)
         await getattr(command_charge_point, method_name)()
-        assert "failed" in caplog.text.lower()
+        assert any(
+            r.levelno == logging.INFO and "failed" in r.message.lower()
+            for r in caplog.records
+        )
 
 
 class TestOnConnect:
@@ -1069,31 +1080,19 @@ class TestOnConnect:
         await on_connect(mock_ws, config=config)
         mock_ws.close.assert_called_once()
 
-    async def test_successful_connection_creates_charge_point(self):
-        mock_ws = MagicMock()
-        mock_ws.request = MagicMock()
-        mock_ws.request.headers = {"Sec-WebSocket-Protocol": "ocpp2.0.1"}
-        mock_ws.subprotocol = "ocpp2.0.1"
-        mock_ws.request.path = "/TestCP"
-        mock_ws.close = AsyncMock()
+    async def test_successful_connection_creates_charge_point(self, mock_valid_ws):
         config = self._make_config()
 
         with patch("server.ChargePoint") as MockCP:
             mock_cp = AsyncMock()
             MockCP.return_value = mock_cp
-            await on_connect(mock_ws, config=config)
+            await on_connect(mock_valid_ws, config=config)
             mock_cp.start.assert_called_once()
 
-    async def test_connection_closed_during_start_triggers_cleanup(self):
+    async def test_connection_closed_during_start_triggers_cleanup(self, mock_valid_ws):
         from websockets.exceptions import ConnectionClosedOK
         from websockets.frames import Close
 
-        mock_ws = MagicMock()
-        mock_ws.request = MagicMock()
-        mock_ws.request.headers = {"Sec-WebSocket-Protocol": "ocpp2.0.1"}
-        mock_ws.subprotocol = "ocpp2.0.1"
-        mock_ws.request.path = "/TestCP"
-        mock_ws.close = AsyncMock()
         config = self._make_config()
 
         exc = ConnectionClosedOK(Close(1000, ""), Close(1000, ""), rcvd_then_sent=True)
@@ -1103,16 +1102,10 @@ class TestOnConnect:
             mock_cp.start = AsyncMock(side_effect=exc)
             mock_cp.handle_connection_closed = MagicMock()
             MockCP.return_value = mock_cp
-            await on_connect(mock_ws, config=config)
+            await on_connect(mock_valid_ws, config=config)
             mock_cp.handle_connection_closed.assert_called_once()
 
-    async def test_command_sent_on_connect_when_specified(self):
-        mock_ws = MagicMock()
-        mock_ws.request = MagicMock()
-        mock_ws.request.headers = {"Sec-WebSocket-Protocol": "ocpp2.0.1"}
-        mock_ws.subprotocol = "ocpp2.0.1"
-        mock_ws.request.path = "/TestCP"
-        mock_ws.close = AsyncMock()
+    async def test_command_sent_on_connect_when_specified(self, mock_valid_ws):
         config = self._make_config(
             command_name=Action.clear_cache, delay=1.0, period=None
         )
@@ -1120,7 +1113,7 @@ class TestOnConnect:
         with patch("server.ChargePoint") as MockCP:
             mock_cp = AsyncMock()
             MockCP.return_value = mock_cp
-            await on_connect(mock_ws, config=config)
+            await on_connect(mock_valid_ws, config=config)
             mock_cp.send_command.assert_called_once_with(Action.clear_cache, 1.0, None)
 
 
@@ -1231,7 +1224,10 @@ class TestMainGracefulShutdown:
         with _patch_main(mock_loop, mock_server, mock_event):
             await main()
 
-        assert "timed out" in caplog.text.lower()
+        assert any(
+            r.levelno == logging.WARNING and "timed out" in r.message.lower()
+            for r in caplog.records
+        )
 
     async def test_windows_fallback_registers_signal_handlers(self, main_mocks):
         mock_loop, mock_server, mock_event, _ = main_mocks