poetry run python server.py
```
-The server will start listening for connections on port 9000.
+The server will start listening for connections on `127.0.0.1:9000` by default.
+
+### Server Configuration
+
+```shell
+poetry run python server.py --host <HOST> --port <PORT>
+```
+
+**Options:**
+
+- `--host <HOST>`: Server bind address (default: `127.0.0.1`)
+- `--port <PORT>`: Server port (default: `9000`)
## Running the server with OCPP command sending
- `StatusNotification` - Handle status notifications
- `TransactionEvent` - Handle transaction events (Started/Updated/Ended)
+## Charging Behavior Configuration
+
+```shell
+poetry run python server.py --boot-status <STATUS> --total-cost <COST>
+```
+
+**Options:**
+
+- `--boot-status <STATUS>`: BootNotification response status (`accepted`, `pending`, `rejected`; default: `accepted`)
+- `--total-cost <COST>`: Total cost returned in TransactionEvent.Updated responses (default: `10.0`)
+
+**Examples:**
+
+```shell
+poetry run python server.py --boot-status rejected
+poetry run python server.py --total-cost 25.50
+```
+
## Authorization Testing Modes
The server supports configurable authorization behavior for testing OCPP 2.0 authentication scenarios:
[package.extras]
dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "requests", "setuptools", "xmlschema"]
+[[package]]
+name = "pytest-asyncio"
+version = "1.3.0"
+description = "Pytest support for asyncio"
+optional = false
+python-versions = ">=3.10"
+groups = ["dev"]
+files = [
+ {file = "pytest_asyncio-1.3.0-py3-none-any.whl", hash = "sha256:611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5"},
+ {file = "pytest_asyncio-1.3.0.tar.gz", hash = "sha256:d7f52f36d231b80ee124cd216ffb19369aa168fc10095013c6b014a34d3ee9e5"},
+]
+
+[package.dependencies]
+pytest = ">=8.2,<10"
+typing-extensions = {version = ">=4.12", markers = "python_version < \"3.13\""}
+
+[package.extras]
+docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1)"]
+testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"]
+
[[package]]
name = "referencing"
version = "0.37.0"
description = "Backported and Experimental Type Hints for Python 3.9+"
optional = false
python-versions = ">=3.9"
-groups = ["main"]
+groups = ["main", "dev"]
markers = "python_version == \"3.12\""
files = [
{file = "typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548"},
[metadata]
lock-version = "2.1"
python-versions = "^3.12"
-content-hash = "4f89833c0ea50e967a4cdc8da890763e911e8ca4f2dd29a5e6e884d014cc2386"
+content-hash = "64161175b159d2fda4d43410892ac9fee77a1137720a8e3f788b70d7c44256d7"
taskipy = "^1"
ruff = "^0.15"
pytest = "^8"
+pytest-asyncio = ">=1.3.0"
[tool.taskipy.tasks]
server = "poetry run python server.py"
[tool.ruff.lint.per-file-ignores]
"test_*.py" = ["S101"]
+[tool.pytest.ini_options]
+asyncio_mode = "auto"
+
[build-system]
requires = ["poetry-core"]
+"""OCPP 2.0.1 mock server for e-mobility charging station simulator testing."""
+
import argparse
import asyncio
import logging
import ocpp.v201
import websockets
-from ocpp.exceptions import InternalError
+from ocpp.exceptions import InternalError, OCPPError
from ocpp.routing import on
from ocpp.v201.enums import (
Action,
from timer import Timer
-# Setting up the logging configuration to display debug level messages.
logging.basicConfig(level=logging.DEBUG)
-ChargePoints = set()
+# Server defaults
+DEFAULT_HOST = "127.0.0.1"
+DEFAULT_PORT = 9000
+DEFAULT_HEARTBEAT_INTERVAL = 60
+DEFAULT_TOTAL_COST = 10.0
+SUBPROTOCOLS = ["ocpp2.0", "ocpp2.0.1"]
+
+ChargePoints: set["ChargePoint"] = set()
class ChargePoint(ocpp.v201.ChargePoint):
+ """OCPP 2.0.1 charge point handler with configurable behavior for testing."""
+
_command_timer: Timer | None
_auth_config: dict
+ _boot_status: RegistrationStatusEnumType
+ _total_cost: float
- def __init__(self, connection, auth_config: dict | None = None):
+ def __init__(
+ self,
+ connection,
+ auth_config: dict | None = None,
+ boot_status: RegistrationStatusEnumType = RegistrationStatusEnumType.accepted,
+ total_cost: float = DEFAULT_TOTAL_COST,
+ ):
super().__init__(connection.path.strip("/"), connection)
self._command_timer = None
- # Auth configuration for testing different scenarios
+ self._boot_status = boot_status
+ self._total_cost = total_cost
self._auth_config = auth_config or {
- "mode": "normal", # normal, offline, whitelist, blacklist, rate_limit
+ "mode": "normal",
"whitelist": ["valid_token", "test_token", "authorized_user"],
"blacklist": ["blocked_token", "invalid_user"],
- "offline": False, # Simulate network failure
+ "offline": False,
"default_status": AuthorizationStatusEnumType.accepted,
}
)
if mode == "rate_limit":
return AuthorizationStatusEnumType.not_at_this_time
- # normal mode
return self._auth_config.get(
"default_status", AuthorizationStatusEnumType.accepted
)
- # Message handlers to receive OCPP messages.
+ # --- Incoming message handlers (CS → CSMS) ---
+
@on(Action.boot_notification)
async def on_boot_notification(self, charging_station, reason, **kwargs):
logging.info("Received %s", Action.boot_notification)
- # Create and return a BootNotification response with the current time,
- # an interval of 60 seconds, and an accepted status.
return ocpp.v201.call_result.BootNotification(
current_time=datetime.now(timezone.utc).isoformat(),
- interval=60,
- status=RegistrationStatusEnumType.accepted,
+ interval=DEFAULT_HEARTBEAT_INTERVAL,
+ status=self._boot_status,
)
@on(Action.heartbeat)
"Received %s for token: %s", Action.authorize, id_token.get("id_token")
)
- # Simulate offline mode (network failure)
if self._auth_config.get("offline", False):
logging.warning("Offline mode - simulating network failure")
raise InternalError(description="Simulated network failure")
)
case TransactionEventEnumType.updated:
logging.info("Received %s Updated", Action.transaction_event)
- return ocpp.v201.call_result.TransactionEvent(total_cost=10)
+ return ocpp.v201.call_result.TransactionEvent(
+ total_cost=self._total_cost
+ )
case TransactionEventEnumType.ended:
logging.info("Received %s Ended", Action.transaction_event)
return ocpp.v201.call_result.TransactionEvent()
logging.info("Received %s", Action.notify_customer_information)
return ocpp.v201.call_result.NotifyCustomerInformation()
- # Request handlers to emit OCPP messages.
+ # --- Outgoing commands (CSMS → CS) ---
+
async def _send_clear_cache(self):
request = ocpp.v201.call.ClearCache()
response = await self.call(request)
else:
logging.info("%s failed", Action.update_firmware)
+ # --- Command dispatch ---
+
async def _send_command(self, command_name: Action):
logging.debug("Sending OCPP command %s", command_name)
- match command_name:
- case Action.clear_cache:
- await self._send_clear_cache()
- case Action.get_base_report:
- await self._send_get_base_report()
- case Action.get_variables:
- await self._send_get_variables()
- case Action.set_variables:
- await self._send_set_variables()
- case Action.request_start_transaction:
- await self._send_request_start_transaction()
- case Action.request_stop_transaction:
- await self._send_request_stop_transaction()
- case Action.reset:
- await self._send_reset()
- case Action.unlock_connector:
- await self._send_unlock_connector()
- case Action.change_availability:
- await self._send_change_availability()
- case Action.trigger_message:
- await self._send_trigger_message()
- case Action.data_transfer:
- await self._send_data_transfer()
- case Action.certificate_signed:
- await self._send_certificate_signed()
- case Action.customer_information:
- await self._send_customer_information()
- case Action.delete_certificate:
- await self._send_delete_certificate()
- case Action.get_installed_certificate_ids:
- await self._send_get_installed_certificate_ids()
- case Action.get_log:
- await self._send_get_log()
- case Action.get_transaction_status:
- await self._send_get_transaction_status()
- case Action.install_certificate:
- await self._send_install_certificate()
- case Action.set_network_profile:
- await self._send_set_network_profile()
- case Action.update_firmware:
- await self._send_update_firmware()
- case _:
- logging.warning("Not supported command %s", command_name)
+ try:
+ match command_name:
+ case Action.clear_cache:
+ await self._send_clear_cache()
+ case Action.get_base_report:
+ await self._send_get_base_report()
+ case Action.get_variables:
+ await self._send_get_variables()
+ case Action.set_variables:
+ await self._send_set_variables()
+ case Action.request_start_transaction:
+ await self._send_request_start_transaction()
+ case Action.request_stop_transaction:
+ await self._send_request_stop_transaction()
+ case Action.reset:
+ await self._send_reset()
+ case Action.unlock_connector:
+ await self._send_unlock_connector()
+ case Action.change_availability:
+ await self._send_change_availability()
+ case Action.trigger_message:
+ await self._send_trigger_message()
+ case Action.data_transfer:
+ await self._send_data_transfer()
+ case Action.certificate_signed:
+ await self._send_certificate_signed()
+ case Action.customer_information:
+ await self._send_customer_information()
+ case Action.delete_certificate:
+ await self._send_delete_certificate()
+ case Action.get_installed_certificate_ids:
+ await self._send_get_installed_certificate_ids()
+ case Action.get_log:
+ await self._send_get_log()
+ case Action.get_transaction_status:
+ await self._send_get_transaction_status()
+ case Action.install_certificate:
+ await self._send_install_certificate()
+ case Action.set_network_profile:
+ await self._send_set_network_profile()
+ case Action.update_firmware:
+ await self._send_update_firmware()
+ case _:
+ logging.warning("Not supported command %s", command_name)
+ except TimeoutError:
+ logging.error("Timeout waiting for %s response", command_name)
+ except OCPPError as e:
+ logging.error(
+ "OCPP error sending %s: [%s] %s",
+ command_name,
+ type(e).__name__,
+ e.description,
+ )
+ except ConnectionClosed:
+ logging.warning("Connection closed while sending %s", command_name)
+ self.handle_connection_closed()
+ except Exception:
+ logging.exception("Unexpected error sending %s", command_name)
async def send_command(
self, command_name: Action, delay: float | None, period: float | None
delay,
False,
self._send_command,
- [command_name],
+ (command_name,),
)
if period and not self._command_timer:
self._command_timer = Timer(
period,
True,
self._send_command,
- [command_name],
+ (command_name,),
)
except ConnectionClosed:
self.handle_connection_closed()
logging.debug("Connected ChargePoint(s): %d", len(ChargePoints))
-# Function to handle new WebSocket connections.
async def on_connect(
websocket,
command_name: Action | None,
delay: float | None,
period: float | None,
auth_config: dict | None,
+ boot_status: RegistrationStatusEnumType,
+ total_cost: float,
):
- """For every new charge point that connects, create a ChargePoint instance and start
- listening for messages.
- """
+ """Handle new WebSocket connections from charge points."""
try:
requested_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
except KeyError:
)
return await websocket.close()
- cp = ChargePoint(websocket, auth_config)
+ cp = ChargePoint(
+ websocket,
+ auth_config=auth_config,
+ boot_status=boot_status,
+ total_cost=total_cost,
+ )
if command_name:
await cp.send_command(command_name, delay, period)
return value
-# Main function to start the WebSocket server.
async def main():
parser = argparse.ArgumentParser(description="OCPP2 Server")
parser.add_argument("-c", "--command", type=Action, help="command name")
help="period in seconds",
)
- # Auth configuration arguments
+ # Server configuration
+ parser.add_argument(
+ "--host",
+ type=str,
+ default=DEFAULT_HOST,
+ help=f"server host (default: {DEFAULT_HOST})",
+ )
+ parser.add_argument(
+ "--port",
+ type=int,
+ default=DEFAULT_PORT,
+ help=f"server port (default: {DEFAULT_PORT})",
+ )
+
+ # Charging configuration
+ parser.add_argument(
+ "--boot-status",
+ type=RegistrationStatusEnumType,
+ default=RegistrationStatusEnumType.accepted,
+ help="boot notification response status (default: accepted)",
+ )
+ parser.add_argument(
+ "--total-cost",
+ type=float,
+ default=DEFAULT_TOTAL_COST,
+ help=f"TransactionEvent.Updated total cost (default: {DEFAULT_TOTAL_COST})",
+ )
+
+ # Auth configuration
parser.add_argument(
"--auth-mode",
type=str,
help="Simulate offline/network failure mode",
)
- # Parse args to check if group.required should be set
args, _ = parser.parse_known_args()
group.required = args.command is not None
- # Re-parse with full validation
args = parser.parse_args()
- # Build auth configuration from CLI args
auth_config = {
"mode": args.auth_mode,
"whitelist": args.whitelist,
"Auth configuration: mode=%s, offline=%s", args.auth_mode, args.offline
)
- # Create the WebSocket server and specify the handler for new connections.
server = await websockets.serve(
partial(
on_connect,
delay=args.delay,
period=args.period,
auth_config=auth_config,
+ boot_status=args.boot_status,
+ total_cost=args.total_cost,
),
- "127.0.0.1", # Listen on loopback.
- 9000, # Port number.
- subprotocols=["ocpp2.0", "ocpp2.0.1"], # Specify OCPP 2.0.1 subprotocols.
+ args.host,
+ args.port,
+ subprotocols=SUBPROTOCOLS,
)
- logging.info("WebSocket Server Started")
+ logging.info("WebSocket Server Started on %s:%d", args.host, args.port)
- # Wait for the server to close (runs indefinitely).
await server.wait_closed()
-# Entry point of the script.
if __name__ == "__main__":
- # Run the main function to start the server.
asyncio.run(main())
import argparse
from typing import ClassVar
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, patch
import pytest
-from ocpp.v201.enums import AuthorizationStatusEnumType
-
-from server import ChargePoint, check_positive_number
+from ocpp.v201.enums import (
+ AuthorizationStatusEnumType,
+ DataTransferStatusEnumType,
+ GenericStatusEnumType,
+ GetCertificateStatusEnumType,
+ Iso15118EVCertificateStatusEnumType,
+ RegistrationStatusEnumType,
+ TransactionEventEnumType,
+)
+
+from server import (
+ DEFAULT_HEARTBEAT_INTERVAL,
+ DEFAULT_TOTAL_COST,
+ ChargePoint,
+ check_positive_number,
+)
@pytest.fixture
)
+@pytest.fixture
+def offline_charge_point(mock_connection):
+ """Create a ChargePoint with offline mode enabled."""
+ return ChargePoint(
+ mock_connection,
+ auth_config={
+ "mode": "normal",
+ "whitelist": [],
+ "blacklist": [],
+ "offline": True,
+ "default_status": AuthorizationStatusEnumType.accepted,
+ },
+ )
+
+
class TestCheckPositiveNumber:
"""Tests for the check_positive_number argument validator."""
assert callable(getattr(ChargePoint, method_name))
def test_incoming_handler_count(self):
- """Verify no handlers were accidentally removed."""
handler_count = sum(
1
for name in dir(ChargePoint)
assert handler_count >= len(self.EXPECTED_INCOMING_HANDLERS)
def test_outgoing_command_count(self):
- """Verify no outgoing commands were accidentally removed."""
command_count = sum(
1
for name in dir(ChargePoint)
def test_command_timer_initially_none(self, charge_point):
assert charge_point._command_timer is None
+
+ def test_default_boot_status(self, charge_point):
+ assert charge_point._boot_status == RegistrationStatusEnumType.accepted
+
+ def test_custom_boot_status(self, mock_connection):
+ cp = ChargePoint(
+ mock_connection, boot_status=RegistrationStatusEnumType.rejected
+ )
+ assert cp._boot_status == RegistrationStatusEnumType.rejected
+
+ def test_default_total_cost(self, charge_point):
+ assert charge_point._total_cost == DEFAULT_TOTAL_COST
+
+ def test_custom_total_cost(self, mock_connection):
+ cp = ChargePoint(mock_connection, total_cost=25.50)
+ assert cp._total_cost == 25.50
+
+
+# --- Async handler tests ---
+
+
+class TestBootNotificationHandler:
+ """Tests for the BootNotification incoming handler."""
+
+ async def test_returns_accepted_by_default(self, charge_point):
+ response = await charge_point.on_boot_notification(
+ charging_station={"model": "Test", "vendor_name": "TestVendor"},
+ reason="PowerUp",
+ )
+ assert response.status == RegistrationStatusEnumType.accepted
+ assert response.interval == DEFAULT_HEARTBEAT_INTERVAL
+ assert response.current_time is not None
+
+ async def test_configurable_boot_status(self, mock_connection):
+ cp = ChargePoint(
+ mock_connection, boot_status=RegistrationStatusEnumType.rejected
+ )
+ response = await cp.on_boot_notification(
+ charging_station={"model": "Test", "vendor_name": "TestVendor"},
+ reason="PowerUp",
+ )
+ assert response.status == RegistrationStatusEnumType.rejected
+
+ async def test_pending_boot_status(self, mock_connection):
+ cp = ChargePoint(
+ mock_connection, boot_status=RegistrationStatusEnumType.pending
+ )
+ response = await cp.on_boot_notification(
+ charging_station={"model": "Test", "vendor_name": "TestVendor"},
+ reason="PowerUp",
+ )
+ assert response.status == RegistrationStatusEnumType.pending
+
+
+class TestHeartbeatHandler:
+ """Tests for the Heartbeat incoming handler."""
+
+ async def test_returns_current_time(self, charge_point):
+ response = await charge_point.on_heartbeat()
+ assert response.current_time is not None
+ assert len(response.current_time) > 0
+
+
+class TestStatusNotificationHandler:
+ """Tests for the StatusNotification incoming handler."""
+
+ async def test_returns_empty_response(self, charge_point):
+ response = await charge_point.on_status_notification(
+ timestamp="2026-01-01T00:00:00Z",
+ evse_id=1,
+ connector_id=1,
+ connector_status="Available",
+ )
+ assert response is not None
+
+
+class TestAuthorizeHandler:
+ """Tests for the Authorize incoming handler."""
+
+ async def test_normal_mode_accepts(self, charge_point):
+ response = await charge_point.on_authorize(
+ id_token={"id_token": "any_token", "type": "ISO14443"}
+ )
+ assert response.id_token_info["status"] == AuthorizationStatusEnumType.accepted
+
+ async def test_whitelist_accepts_valid(self, whitelist_charge_point):
+ response = await whitelist_charge_point.on_authorize(
+ id_token={"id_token": "valid_token", "type": "ISO14443"}
+ )
+ assert response.id_token_info["status"] == AuthorizationStatusEnumType.accepted
+
+ async def test_whitelist_blocks_unknown(self, whitelist_charge_point):
+ response = await whitelist_charge_point.on_authorize(
+ id_token={"id_token": "stranger", "type": "ISO14443"}
+ )
+ assert response.id_token_info["status"] == AuthorizationStatusEnumType.blocked
+
+ async def test_offline_raises_internal_error(self, offline_charge_point):
+ from ocpp.exceptions import InternalError
+
+ with pytest.raises(InternalError):
+ await offline_charge_point.on_authorize(
+ id_token={"id_token": "any", "type": "ISO14443"}
+ )
+
+
+class TestTransactionEventHandler:
+ """Tests for the TransactionEvent incoming handler."""
+
+ async def test_started_returns_auth_status(self, charge_point):
+ response = await charge_point.on_transaction_event(
+ event_type=TransactionEventEnumType.started,
+ timestamp="2026-01-01T00:00:00Z",
+ trigger_reason="Authorized",
+ seq_no=0,
+ transaction_info={"transaction_id": "txn-001"},
+ id_token={"id_token": "test_token", "type": "ISO14443"},
+ )
+ assert response.id_token_info["status"] == AuthorizationStatusEnumType.accepted
+
+ async def test_updated_returns_total_cost(self, charge_point):
+ response = await charge_point.on_transaction_event(
+ event_type=TransactionEventEnumType.updated,
+ timestamp="2026-01-01T00:00:00Z",
+ trigger_reason="MeterValuePeriodic",
+ seq_no=1,
+ transaction_info={"transaction_id": "txn-001"},
+ )
+ assert response.total_cost == DEFAULT_TOTAL_COST
+
+ async def test_updated_uses_custom_total_cost(self, mock_connection):
+ cp = ChargePoint(mock_connection, total_cost=42.50)
+ response = await cp.on_transaction_event(
+ event_type=TransactionEventEnumType.updated,
+ timestamp="2026-01-01T00:00:00Z",
+ trigger_reason="MeterValuePeriodic",
+ seq_no=1,
+ transaction_info={"transaction_id": "txn-001"},
+ )
+ assert response.total_cost == 42.50
+
+ async def test_ended_returns_empty(self, charge_point):
+ response = await charge_point.on_transaction_event(
+ event_type=TransactionEventEnumType.ended,
+ timestamp="2026-01-01T00:00:00Z",
+ trigger_reason="StopAuthorized",
+ seq_no=2,
+ transaction_info={"transaction_id": "txn-001"},
+ )
+ assert response is not None
+
+
+class TestDataTransferHandler:
+ """Tests for the DataTransfer incoming handler."""
+
+ async def test_returns_accepted(self, charge_point):
+ response = await charge_point.on_data_transfer(vendor_id="TestVendor")
+ assert response.status == DataTransferStatusEnumType.accepted
+
+
+class TestCertificateHandlers:
+ """Tests for certificate-related incoming handlers."""
+
+ async def test_get_15118_ev_certificate(self, charge_point):
+ response = await charge_point.on_get_15118_ev_certificate(
+ iso15118_schema_version="urn:iso:15118:2:2013:MsgDef",
+ action="Install",
+ exi_request="mock_exi_data",
+ )
+ assert response.status == Iso15118EVCertificateStatusEnumType.accepted
+ assert response.exi_response == "mock_exi_response_data"
+
+ async def test_get_certificate_status(self, charge_point):
+ response = await charge_point.on_get_certificate_status(
+ ocsp_request_data={
+ "hash_algorithm": "SHA256",
+ "issuer_name_hash": "mock",
+ "issuer_key_hash": "mock",
+ "serial_number": "mock",
+ "responder_url": "https://ocsp.example.com",
+ }
+ )
+ assert response.status == GetCertificateStatusEnumType.accepted
+
+ async def test_sign_certificate(self, charge_point):
+ response = await charge_point.on_sign_certificate(csr="mock_csr_data")
+ assert response.status == GenericStatusEnumType.accepted
+
+
+class TestNotificationHandlers:
+ """Tests for notification incoming handlers with empty responses."""
+
+ async def test_meter_values(self, charge_point):
+ response = await charge_point.on_meter_values(
+ evse_id=1, meter_value=[{"timestamp": "2026-01-01T00:00:00Z"}]
+ )
+ assert response is not None
+
+ async def test_notify_report(self, charge_point):
+ response = await charge_point.on_notify_report(
+ request_id=1,
+ generated_at="2026-01-01T00:00:00Z",
+ seq_no=0,
+ )
+ assert response is not None
+
+ async def test_firmware_status_notification(self, charge_point):
+ response = await charge_point.on_firmware_status_notification(
+ status="Installed"
+ )
+ assert response is not None
+
+ async def test_log_status_notification(self, charge_point):
+ response = await charge_point.on_log_status_notification(
+ status="Uploaded", request_id=1
+ )
+ assert response is not None
+
+ async def test_security_event_notification(self, charge_point):
+ response = await charge_point.on_security_event_notification(
+ event_type="FirmwareUpdated", timestamp="2026-01-01T00:00:00Z"
+ )
+ assert response is not None
+
+ async def test_notify_customer_information(self, charge_point):
+ response = await charge_point.on_notify_customer_information(
+ data="customer_data",
+ seq_no=0,
+ generated_at="2026-01-01T00:00:00Z",
+ request_id=1,
+ )
+ assert response is not None
+
+
+class TestSendCommandErrorHandling:
+ """Tests for error handling in the command dispatch layer."""
+
+ async def test_timeout_is_caught(self, charge_point):
+ from ocpp.v201.enums import Action
+
+ with patch.object(
+ charge_point, "_send_clear_cache", side_effect=TimeoutError("timed out")
+ ):
+ await charge_point._send_command(command_name=Action.clear_cache)
+
+ async def test_ocpp_error_is_caught(self, charge_point):
+ from ocpp.exceptions import InternalError as OCPPInternalError
+ from ocpp.v201.enums import Action
+
+ with patch.object(
+ charge_point,
+ "_send_clear_cache",
+ side_effect=OCPPInternalError(description="test error"),
+ ):
+ await charge_point._send_command(command_name=Action.clear_cache)
+
+ async def test_connection_closed_is_caught(self, charge_point):
+ from ocpp.v201.enums import Action
+ from websockets.exceptions import ConnectionClosedOK
+ from websockets.frames import Close
+
+ with (
+ patch.object(
+ charge_point,
+ "_send_clear_cache",
+ side_effect=ConnectionClosedOK(
+ Close(1000, ""), Close(1000, ""), rcvd_then_sent=True
+ ),
+ ),
+ patch.object(charge_point, "handle_connection_closed"),
+ ):
+ await charge_point._send_command(command_name=Action.clear_cache)
+ charge_point.handle_connection_closed.assert_called_once()
+
+ async def test_unsupported_command_logs_warning(self, charge_point):
+ unsupported = MagicMock(value="Unsupported")
+ await charge_point._send_command(command_name=unsupported)
-"""Timer for asyncio."""
+"""Asynchronous timer with support for one-shot and repeating callbacks."""
import asyncio
+import inspect
+from collections.abc import Callable
+from typing import Any
class Timer:
+ """Asynchronous timer with one-shot and repeating callback support."""
+
def __init__(
self,
timeout: float,
repeat: bool,
- callback,
- callback_args=(),
- callback_kwargs=None,
+ callback: Callable[..., Any],
+ callback_args: tuple = (),
+ callback_kwargs: dict[str, Any] | None = None,
):
- """
- An asynchronous Timer object.
+ """Create an asynchronous timer.
Parameters
----------
- timeout: :class:`float`:
- The duration for which the timer should last.
-
- repeat: :class:`bool`:
- Whether the timer should repeat.
-
- callback: :class:`Coroutine` or `Method` or `Function`:
- An `asyncio` coroutine or a regular method that will be called as soon as
- the timer ends.
+ timeout:
+ Duration in seconds before the callback is invoked.
+ repeat:
+ Whether the timer should repeat after each invocation.
+ callback:
+ A sync or async callable to invoke when the timer fires.
+ callback_args:
+ Positional arguments passed to the callback.
+ callback_kwargs:
+ Keyword arguments passed to the callback.
- callback_args: Optional[:class:`tuple`]:
- The args to be passed to the callback.
-
- callback_kwargs: Optional[:class:`dict`]:
- The kwargs to be passed to the callback.
"""
self._timeout = timeout
self._repeat = repeat
self._callback = callback
- self._task = asyncio.create_task(self._job())
self._callback_args = callback_args
- if callback_kwargs is None:
- callback_kwargs = {}
- self._callback_kwargs = callback_kwargs
+ self._callback_kwargs = callback_kwargs or {}
+ self._task = asyncio.create_task(self._job())
- async def _job(self):
+ async def _job(self) -> None:
if self._repeat:
- while self._task.cancelled() is False:
+ while not self._task.cancelled():
await asyncio.sleep(self._timeout)
await self._call_callback()
else:
await asyncio.sleep(self._timeout)
await self._call_callback()
- async def _call_callback(self):
- if asyncio.iscoroutine(self._callback) or asyncio.iscoroutinefunction(
- self._callback
- ):
+ async def _call_callback(self) -> None:
+ if inspect.iscoroutinefunction(self._callback):
await self._callback(*self._callback_args, **self._callback_kwargs)
else:
- self._callback(*self._callback_args, **self._callback_kwargs)
+ result = self._callback(*self._callback_args, **self._callback_kwargs)
+ if inspect.isawaitable(result):
+ await result
- def cancel(self):
- """Cancels the timer. The callback will not be called."""
+ def cancel(self) -> None:
+ """Cancel the timer. The callback will not be called."""
self._task.cancel()