import argparse
import asyncio
import logging
+import math
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import StrEnum
from timer import Timer
-logging.basicConfig(level=logging.DEBUG)
+logger = logging.getLogger(__name__)
# Server defaults
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 9000
DEFAULT_HEARTBEAT_INTERVAL = 60
DEFAULT_TOTAL_COST = 10.0
+MAX_REQUEST_ID = 2**31 - 1
SUBPROTOCOLS: list[websockets.Subprotocol] = [
websockets.Subprotocol("ocpp2.0"),
websockets.Subprotocol("ocpp2.0.1"),
]
+def _random_request_id() -> int:
+ """Generate a random OCPP request ID within the valid range."""
+ return randint(1, MAX_REQUEST_ID) # noqa: S311
+
+
class AuthMode(StrEnum):
"""Authorization modes for testing different authentication scenarios."""
auth_config: AuthConfig
boot_status: RegistrationStatusEnumType
total_cost: float
+ # Intentionally mutable despite frozen dataclass
charge_points: set["ChargePoint"]
# Extract CP ID from last URL segment (OCPP 2.0.1 Part 4)
cp_id = connection.path.strip("/").split("/")[-1]
if cp_id == "":
- logging.warning("Empty CP ID extracted from path: %s", connection.path)
+ logger.warning("Empty CP ID extracted from path: %s", connection.path)
super().__init__(cp_id, connection)
self._charge_points = charge_points if charge_points is not None else set()
self._command_timer = None
@on(Action.boot_notification)
async def on_boot_notification(self, charging_station, reason, **kwargs):
- logging.info("Received %s", Action.boot_notification)
+ logger.info("Received %s", Action.boot_notification)
return ocpp.v201.call_result.BootNotification(
current_time=datetime.now(timezone.utc).isoformat(),
interval=DEFAULT_HEARTBEAT_INTERVAL,
@on(Action.heartbeat)
async def on_heartbeat(self, **kwargs):
- logging.info("Received %s", Action.heartbeat)
+ logger.info("Received %s", Action.heartbeat)
return ocpp.v201.call_result.Heartbeat(
current_time=datetime.now(timezone.utc).isoformat()
)
async def on_status_notification(
self, timestamp, evse_id: int, connector_id: int, connector_status, **kwargs
):
- logging.info("Received %s", Action.status_notification)
+ logger.info("Received %s", Action.status_notification)
return ocpp.v201.call_result.StatusNotification()
@on(Action.authorize)
async def on_authorize(self, id_token, **kwargs):
- logging.info(
+ logger.info(
"Received %s for token: %s", Action.authorize, id_token.get("id_token")
)
if self._auth_config.offline:
- logging.warning("Offline mode - simulating network failure")
+ logger.warning("Offline mode - simulating network failure")
raise InternalError(description="Simulated network failure")
token_id = id_token.get("id_token", "")
status = self._resolve_auth_status(token_id)
- logging.info("Authorization status for %s: %s", token_id, status)
+ logger.info("Authorization status for %s: %s", token_id, status)
return ocpp.v201.call_result.Authorize(id_token_info={"status": status})
@on(Action.transaction_event)
):
match event_type:
case TransactionEventEnumType.started:
- logging.info("Received %s Started", Action.transaction_event)
+ logger.info("Received %s Started", Action.transaction_event)
id_token = kwargs.get("id_token", {})
token_id = id_token.get("id_token", "")
status = self._resolve_auth_status(token_id)
- logging.info(
+ logger.info(
"Transaction start auth status for %s: %s", token_id, status
)
return ocpp.v201.call_result.TransactionEvent(
id_token_info={"status": status}
)
case TransactionEventEnumType.updated:
- logging.info("Received %s Updated", Action.transaction_event)
+ logger.info("Received %s Updated", Action.transaction_event)
return ocpp.v201.call_result.TransactionEvent(
total_cost=self._total_cost
)
case TransactionEventEnumType.ended:
- logging.info("Received %s Ended", Action.transaction_event)
+ logger.info("Received %s Ended", Action.transaction_event)
return ocpp.v201.call_result.TransactionEvent()
case _:
- logging.warning("Unknown transaction event type: %s", event_type)
+ logger.warning("Unknown transaction event type: %s", event_type)
return ocpp.v201.call_result.TransactionEvent()
@on(Action.meter_values)
async def on_meter_values(self, evse_id: int, meter_value, **kwargs):
- logging.info("Received %s", Action.meter_values)
+ logger.info("Received %s", Action.meter_values)
return ocpp.v201.call_result.MeterValues()
@on(Action.notify_report)
async def on_notify_report(
self, request_id: int, generated_at, seq_no: int, **kwargs
):
- logging.info("Received %s", Action.notify_report)
+ logger.info("Received %s", Action.notify_report)
return ocpp.v201.call_result.NotifyReport()
@on(Action.data_transfer)
async def on_data_transfer(self, vendor_id: str, **kwargs):
- logging.info("Received %s", Action.data_transfer)
+ logger.info("Received %s", Action.data_transfer)
return ocpp.v201.call_result.DataTransfer(
status=DataTransferStatusEnumType.accepted
)
@on(Action.firmware_status_notification)
async def on_firmware_status_notification(self, status, **kwargs):
- logging.info("Received %s", Action.firmware_status_notification)
+ logger.info("Received %s", Action.firmware_status_notification)
return ocpp.v201.call_result.FirmwareStatusNotification()
@on(Action.log_status_notification)
async def on_log_status_notification(self, status, request_id: int, **kwargs):
- logging.info("Received %s", Action.log_status_notification)
+ logger.info("Received %s", Action.log_status_notification)
return ocpp.v201.call_result.LogStatusNotification()
@on(Action.security_event_notification)
async def on_security_event_notification(self, event_type, timestamp, **kwargs):
- logging.info("Received %s", Action.security_event_notification)
+ logger.info("Received %s", Action.security_event_notification)
return ocpp.v201.call_result.SecurityEventNotification()
@on(Action.get_15118_ev_certificate)
async def on_get_15118_ev_certificate(
self, iso15118_schema_version, action, exi_request, **kwargs
):
- logging.info("Received %s", Action.get_15118_ev_certificate)
+ logger.info("Received %s", Action.get_15118_ev_certificate)
return ocpp.v201.call_result.Get15118EVCertificate(
status=Iso15118EVCertificateStatusEnumType.accepted,
exi_response="mock_exi_response_data",
@on(Action.get_certificate_status)
async def on_get_certificate_status(self, ocsp_request_data, **kwargs):
- logging.info("Received %s", Action.get_certificate_status)
+ logger.info("Received %s", Action.get_certificate_status)
return ocpp.v201.call_result.GetCertificateStatus(
status=GetCertificateStatusEnumType.accepted,
)
@on(Action.sign_certificate)
async def on_sign_certificate(self, csr, **kwargs):
- logging.info("Received %s", Action.sign_certificate)
+ logger.info("Received %s", Action.sign_certificate)
return ocpp.v201.call_result.SignCertificate(
status=GenericStatusEnumType.accepted,
)
async def on_notify_customer_information(
self, data, seq_no: int, generated_at, request_id: int, **kwargs
):
- logging.info("Received %s", Action.notify_customer_information)
+ logger.info("Received %s", Action.notify_customer_information)
return ocpp.v201.call_result.NotifyCustomerInformation()
# --- Outgoing commands (CSMS → CS) ---
- async def _send_clear_cache(self):
- request = ocpp.v201.call.ClearCache()
+ async def _call_and_log(self, request, action: Action, success_status) -> None:
+ """Send an OCPP request and log success or failure."""
response = await self.call(request, suppress=False)
-
- if response.status == ClearCacheStatusEnumType.accepted:
- logging.info("%s successful", Action.clear_cache)
+ if response.status == success_status:
+ logger.info("%s successful", action)
else:
- logging.info("%s failed", Action.clear_cache)
+ logger.info("%s failed", action)
+
+ async def _send_clear_cache(self):
+ request = ocpp.v201.call.ClearCache()
+ await self._call_and_log(
+ request, Action.clear_cache, ClearCacheStatusEnumType.accepted
+ )
async def _send_get_base_report(self):
request = ocpp.v201.call.GetBaseReport(
- request_id=randint(1, 2**31 - 1), # noqa: S311
+ request_id=_random_request_id(),
report_base=ReportBaseEnumType.full_inventory,
)
- response = await self.call(request, suppress=False)
-
- if response.status == GenericDeviceModelStatusEnumType.accepted:
- logging.info("%s successful", Action.get_base_report)
- else:
- logging.info("%s failed", Action.get_base_report)
+ await self._call_and_log(
+ request, Action.get_base_report, GenericDeviceModelStatusEnumType.accepted
+ )
async def _send_get_variables(self):
request = ocpp.v201.call.GetVariables(
]
)
await self.call(request, suppress=False)
- logging.info("%s response received", Action.get_variables)
+ logger.info("%s response received", Action.get_variables)
async def _send_set_variables(self):
request = ocpp.v201.call.SetVariables(
]
)
await self.call(request, suppress=False)
- logging.info("%s response received", Action.set_variables)
+ logger.info("%s response received", Action.set_variables)
async def _send_request_start_transaction(self):
request = ocpp.v201.call.RequestStartTransaction(
id_token={"id_token": "test_token", "type": "ISO14443"},
evse_id=1,
- remote_start_id=randint(1, 2**31 - 1), # noqa: S311
+ remote_start_id=_random_request_id(),
)
await self.call(request, suppress=False)
- logging.info("%s response received", Action.request_start_transaction)
+ logger.info("%s response received", Action.request_start_transaction)
async def _send_request_stop_transaction(self):
request = ocpp.v201.call.RequestStopTransaction(
transaction_id="test_transaction_123"
)
await self.call(request, suppress=False)
- logging.info("%s response received", Action.request_stop_transaction)
+ logger.info("%s response received", Action.request_stop_transaction)
async def _send_reset(self):
request = ocpp.v201.call.Reset(type=ResetEnumType.immediate)
- response = await self.call(request, suppress=False)
-
- if response.status == ResetStatusEnumType.accepted:
- logging.info("%s successful", Action.reset)
- else:
- logging.info("%s failed", Action.reset)
+ await self._call_and_log(request, Action.reset, ResetStatusEnumType.accepted)
async def _send_unlock_connector(self):
request = ocpp.v201.call.UnlockConnector(evse_id=1, connector_id=1)
- response = await self.call(request, suppress=False)
-
- if response.status == UnlockStatusEnumType.unlocked:
- logging.info("%s successful", Action.unlock_connector)
- else:
- logging.info("%s failed", Action.unlock_connector)
+ await self._call_and_log(
+ request, Action.unlock_connector, UnlockStatusEnumType.unlocked
+ )
async def _send_change_availability(self):
request = ocpp.v201.call.ChangeAvailability(
operational_status=OperationalStatusEnumType.operative
)
- response = await self.call(request, suppress=False)
-
- if response.status == ChangeAvailabilityStatusEnumType.accepted:
- logging.info("%s successful", Action.change_availability)
- else:
- logging.info("%s failed", Action.change_availability)
+ await self._call_and_log(
+ request,
+ Action.change_availability,
+ ChangeAvailabilityStatusEnumType.accepted,
+ )
async def _send_trigger_message(self):
request = ocpp.v201.call.TriggerMessage(
requested_message=MessageTriggerEnumType.status_notification
)
- response = await self.call(request, suppress=False)
-
- if response.status == TriggerMessageStatusEnumType.accepted:
- logging.info("%s successful", Action.trigger_message)
- else:
- logging.info("%s failed", Action.trigger_message)
+ await self._call_and_log(
+ request, Action.trigger_message, TriggerMessageStatusEnumType.accepted
+ )
async def _send_data_transfer(self):
request = ocpp.v201.call.DataTransfer(
vendor_id="TestVendor", message_id="TestMessage", data="test_data"
)
- response = await self.call(request, suppress=False)
-
- if response.status == DataTransferStatusEnumType.accepted:
- logging.info("%s successful", Action.data_transfer)
- else:
- logging.info("%s failed", Action.data_transfer)
+ await self._call_and_log(
+ request, Action.data_transfer, DataTransferStatusEnumType.accepted
+ )
async def _send_certificate_signed(self):
request = ocpp.v201.call.CertificateSigned(
),
certificate_type=CertificateSigningUseEnumType.charging_station_certificate,
)
- response = await self.call(request, suppress=False)
-
- if response.status == CertificateSignedStatusEnumType.accepted:
- logging.info("%s successful", Action.certificate_signed)
- else:
- logging.info("%s failed", Action.certificate_signed)
+ await self._call_and_log(
+ request,
+ Action.certificate_signed,
+ CertificateSignedStatusEnumType.accepted,
+ )
async def _send_customer_information(self):
request = ocpp.v201.call.CustomerInformation(
- request_id=randint(1, 2**31 - 1), # noqa: S311
+ request_id=_random_request_id(),
report=True,
clear=False,
)
- response = await self.call(request, suppress=False)
-
- if response.status == CustomerInformationStatusEnumType.accepted:
- logging.info("%s successful", Action.customer_information)
- else:
- logging.info("%s failed", Action.customer_information)
+ await self._call_and_log(
+ request,
+ Action.customer_information,
+ CustomerInformationStatusEnumType.accepted,
+ )
async def _send_delete_certificate(self):
request = ocpp.v201.call.DeleteCertificate(
"serial_number": "mock_serial_number",
}
)
- response = await self.call(request, suppress=False)
-
- if response.status == DeleteCertificateStatusEnumType.accepted:
- logging.info("%s successful", Action.delete_certificate)
- else:
- logging.info("%s failed", Action.delete_certificate)
+ await self._call_and_log(
+ request,
+ Action.delete_certificate,
+ DeleteCertificateStatusEnumType.accepted,
+ )
async def _send_get_installed_certificate_ids(self):
request = ocpp.v201.call.GetInstalledCertificateIds(
certificate_type=[GetCertificateIdUseEnumType.csms_root_certificate],
)
- response = await self.call(request, suppress=False)
-
- if response.status == GetInstalledCertificateStatusEnumType.accepted:
- logging.info("%s successful", Action.get_installed_certificate_ids)
- else:
- logging.info("%s failed", Action.get_installed_certificate_ids)
+ await self._call_and_log(
+ request,
+ Action.get_installed_certificate_ids,
+ GetInstalledCertificateStatusEnumType.accepted,
+ )
async def _send_get_log(self):
request = ocpp.v201.call.GetLog(
log={"remote_location": "https://example.com/logs"},
log_type=LogEnumType.diagnostics_log,
- request_id=randint(1, 2**31 - 1), # noqa: S311
+ request_id=_random_request_id(),
)
- response = await self.call(request, suppress=False)
-
- if response.status == LogStatusEnumType.accepted:
- logging.info("%s successful", Action.get_log)
- else:
- logging.info("%s failed", Action.get_log)
+ await self._call_and_log(request, Action.get_log, LogStatusEnumType.accepted)
async def _send_get_transaction_status(self):
request = ocpp.v201.call.GetTransactionStatus(
transaction_id="test_transaction_123",
)
response = await self.call(request, suppress=False)
- logging.info(
+ logger.info(
"%s response received: messages_in_queue=%s",
Action.get_transaction_status,
response.messages_in_queue,
"-----END CERTIFICATE-----"
),
)
- response = await self.call(request, suppress=False)
-
- if response.status == InstallCertificateStatusEnumType.accepted:
- logging.info("%s successful", Action.install_certificate)
- else:
- logging.info("%s failed", Action.install_certificate)
+ await self._call_and_log(
+ request,
+ Action.install_certificate,
+ InstallCertificateStatusEnumType.accepted,
+ )
async def _send_set_network_profile(self):
request = ocpp.v201.call.SetNetworkProfile(
"ocpp_interface": "Wired0",
},
)
- response = await self.call(request, suppress=False)
-
- if response.status == SetNetworkProfileStatusEnumType.accepted:
- logging.info("%s successful", Action.set_network_profile)
- else:
- logging.info("%s failed", Action.set_network_profile)
+ await self._call_and_log(
+ request,
+ Action.set_network_profile,
+ SetNetworkProfileStatusEnumType.accepted,
+ )
async def _send_update_firmware(self):
request = ocpp.v201.call.UpdateFirmware(
- request_id=randint(1, 2**31 - 1), # noqa: S311
+ request_id=_random_request_id(),
firmware={
"location": "https://example.com/firmware/v2.0.bin",
"retrieve_date_time": datetime.now(timezone.utc).isoformat(),
},
)
- response = await self.call(request, suppress=False)
-
- if response.status == UpdateFirmwareStatusEnumType.accepted:
- logging.info("%s successful", Action.update_firmware)
- else:
- logging.info("%s failed", Action.update_firmware)
+ await self._call_and_log(
+ request, Action.update_firmware, UpdateFirmwareStatusEnumType.accepted
+ )
# --- Command dispatch ---
async def _send_command(self, command_name: Action):
- logging.debug("Sending OCPP command %s", command_name)
+ logger.debug("Sending OCPP command %s", command_name)
try:
match command_name:
case Action.clear_cache:
case Action.update_firmware:
await self._send_update_firmware()
case _:
- logging.warning("Not supported command %s", command_name)
+ logger.warning("Not supported command %s", command_name)
except TimeoutError:
- logging.error("Timeout waiting for %s response", command_name)
+ logger.error("Timeout waiting for %s response", command_name)
except OCPPError as e:
- logging.error(
+ logger.error(
"OCPP error sending %s: [%s] %s",
command_name,
type(e).__name__,
e.description,
)
except ConnectionClosed:
- logging.warning("Connection closed while sending %s", command_name)
+ logger.warning("Connection closed while sending %s", command_name)
self.handle_connection_closed()
except Exception:
- logging.exception("Unexpected error sending %s", command_name)
+ logger.exception("Unexpected error sending %s", command_name)
async def send_command(
self, command_name: Action, delay: float | None, period: float | None
self.handle_connection_closed()
def handle_connection_closed(self):
- logging.info("ChargePoint %s closed connection", self.id)
+ logger.info("ChargePoint %s closed connection", self.id)
if self._command_timer:
self._command_timer.cancel()
self._charge_points.discard(self)
- logging.debug("Connected ChargePoint(s): %d", len(self._charge_points))
+ logger.debug("Connected ChargePoint(s): %d", len(self._charge_points))
async def on_connect(
try:
requested_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
except KeyError:
- logging.info("Client hasn't requested any Subprotocol. Closing Connection")
+ logger.info("Client hasn't requested any Subprotocol. Closing Connection")
return await websocket.close()
if websocket.subprotocol:
- logging.info("Protocols Matched: %s", websocket.subprotocol)
+ logger.info("Protocols Matched: %s", websocket.subprotocol)
else:
- logging.warning(
+ logger.warning(
"Protocols Mismatched | Expected Subprotocols: %s,"
" but client supports %s | Closing connection",
websocket.available_subprotocols,
value = float(value)
except ValueError:
raise argparse.ArgumentTypeError("must be a number") from None
+ if not math.isfinite(value):
+ raise argparse.ArgumentTypeError("must be a finite number")
if value <= 0:
raise argparse.ArgumentTypeError("must be a positive number")
return value
charge_points=set(),
)
- logging.info(
+ logger.info(
"Auth configuration: mode=%s, offline=%s",
auth_config.mode,
auth_config.offline,
args.port,
subprotocols=SUBPROTOCOLS,
)
- logging.info("WebSocket Server Started on %s:%d", args.host, args.port)
+ logger.info("WebSocket Server Started on %s:%d", args.host, args.port)
await server.wait_closed()
if __name__ == "__main__":
+ logging.basicConfig(level=logging.DEBUG)
asyncio.run(main())
from server import (
DEFAULT_HEARTBEAT_INTERVAL,
DEFAULT_TOTAL_COST,
+ MAX_REQUEST_ID,
AuthConfig,
AuthMode,
ChargePoint,
ServerConfig,
+ _random_request_id,
check_positive_number,
on_connect,
)
with pytest.raises(argparse.ArgumentTypeError, match="must be a number"):
check_positive_number("abc")
+ @pytest.mark.parametrize("value", ["inf", "-inf"])
+ def test_infinity_raises(self, value):
+ with pytest.raises(argparse.ArgumentTypeError, match="must be a finite number"):
+ check_positive_number(value)
+
+ def test_nan_raises(self):
+ with pytest.raises(argparse.ArgumentTypeError, match="must be a finite number"):
+ check_positive_number("nan")
+
+
+class TestRandomRequestId:
+ """Tests for MAX_REQUEST_ID constant and _random_request_id helper."""
+
+ def test_max_request_id_value(self):
+ assert MAX_REQUEST_ID == 2**31 - 1
+
+ def test_random_request_id_in_range(self):
+ for _ in range(100):
+ rid = _random_request_id()
+ assert 1 <= rid <= MAX_REQUEST_ID
+
+ def test_random_request_id_returns_int(self):
+ assert isinstance(_random_request_id(), int)
+
class TestResolveAuthStatus:
"""Tests for the _resolve_auth_status method."""
await command_charge_point._send_get_installed_certificate_ids()
assert "failed" in caplog.text.lower()
+ async def test_send_get_base_report_failure_logs(
+ self, command_charge_point, caplog
+ ):
+ caplog.set_level(logging.INFO)
+ command_charge_point.call.return_value = ocpp.v201.call_result.GetBaseReport(
+ status=GenericDeviceModelStatusEnumType.rejected
+ )
+ await command_charge_point._send_get_base_report()
+ assert "failed" in caplog.text.lower()
+
+ async def test_send_unlock_connector_failure_logs(
+ self, command_charge_point, caplog
+ ):
+ caplog.set_level(logging.INFO)
+ command_charge_point.call.return_value = ocpp.v201.call_result.UnlockConnector(
+ status=UnlockStatusEnumType.unlock_failed
+ )
+ await command_charge_point._send_unlock_connector()
+ assert "failed" in caplog.text.lower()
+
+ async def test_send_change_availability_failure_logs(
+ self, command_charge_point, caplog
+ ):
+ caplog.set_level(logging.INFO)
+ command_charge_point.call.return_value = (
+ ocpp.v201.call_result.ChangeAvailability(
+ status=ChangeAvailabilityStatusEnumType.rejected
+ )
+ )
+ await command_charge_point._send_change_availability()
+ assert "failed" in caplog.text.lower()
+
+ async def test_send_trigger_message_failure_logs(
+ self, command_charge_point, caplog
+ ):
+ caplog.set_level(logging.INFO)
+ command_charge_point.call.return_value = ocpp.v201.call_result.TriggerMessage(
+ status=TriggerMessageStatusEnumType.rejected
+ )
+ await command_charge_point._send_trigger_message()
+ assert "failed" in caplog.text.lower()
+
+ async def test_send_certificate_signed_failure_logs(
+ self, command_charge_point, caplog
+ ):
+ caplog.set_level(logging.INFO)
+ command_charge_point.call.return_value = (
+ ocpp.v201.call_result.CertificateSigned(
+ status=CertificateSignedStatusEnumType.rejected
+ )
+ )
+ await command_charge_point._send_certificate_signed()
+ assert "failed" in caplog.text.lower()
+
+ async def test_send_customer_information_failure_logs(
+ self, command_charge_point, caplog
+ ):
+ caplog.set_level(logging.INFO)
+ command_charge_point.call.return_value = (
+ ocpp.v201.call_result.CustomerInformation(
+ status=CustomerInformationStatusEnumType.rejected
+ )
+ )
+ await command_charge_point._send_customer_information()
+ assert "failed" in caplog.text.lower()
+
+ async def test_send_delete_certificate_failure_logs(
+ self, command_charge_point, caplog
+ ):
+ caplog.set_level(logging.INFO)
+ command_charge_point.call.return_value = (
+ ocpp.v201.call_result.DeleteCertificate(
+ status=DeleteCertificateStatusEnumType.failed
+ )
+ )
+ await command_charge_point._send_delete_certificate()
+ assert "failed" in caplog.text.lower()
+
+ async def test_send_get_log_failure_logs(self, command_charge_point, caplog):
+ caplog.set_level(logging.INFO)
+ command_charge_point.call.return_value = ocpp.v201.call_result.GetLog(
+ status=LogStatusEnumType.rejected
+ )
+ await command_charge_point._send_get_log()
+ assert "failed" in caplog.text.lower()
+
+ async def test_send_install_certificate_failure_logs(
+ self, command_charge_point, caplog
+ ):
+ caplog.set_level(logging.INFO)
+ command_charge_point.call.return_value = (
+ ocpp.v201.call_result.InstallCertificate(
+ status=InstallCertificateStatusEnumType.rejected
+ )
+ )
+ await command_charge_point._send_install_certificate()
+ assert "failed" in caplog.text.lower()
+
+ async def test_send_set_network_profile_failure_logs(
+ self, command_charge_point, caplog
+ ):
+ caplog.set_level(logging.INFO)
+ command_charge_point.call.return_value = (
+ ocpp.v201.call_result.SetNetworkProfile(
+ status=SetNetworkProfileStatusEnumType.rejected
+ )
+ )
+ await command_charge_point._send_set_network_profile()
+ assert "failed" in caplog.text.lower()
+
+ async def test_send_update_firmware_failure_logs(
+ self, command_charge_point, caplog
+ ):
+ caplog.set_level(logging.INFO)
+ command_charge_point.call.return_value = ocpp.v201.call_result.UpdateFirmware(
+ status=UpdateFirmwareStatusEnumType.rejected
+ )
+ await command_charge_point._send_update_firmware()
+ assert "failed" in caplog.text.lower()
+
class TestOnConnect:
"""Tests for the on_connect WebSocket connection handler."""