fix(ocpp-server): add asyncio compatible timer implementation and use it
[e-mobility-charging-stations-simulator.git] / tests / ocpp-server / server.py
1 import argparse
2 import asyncio
3 import logging
4 from datetime import datetime, timezone
5 from functools import partial
6 from typing import Optional
7
8 import ocpp.v201
9 import websockets
10 from ocpp.routing import on
11 from ocpp.v201.enums import (
12 Action,
13 AuthorizationStatusType,
14 ClearCacheStatusType,
15 GenericDeviceModelStatusType,
16 RegistrationStatusType,
17 ReportBaseType,
18 TransactionEventType,
19 )
20 from websockets import ConnectionClosed
21
22 from timer import Timer
23
# Route debug-level (and more severe) records through the root logger.
logging.basicConfig(level=logging.DEBUG)

# Registry of the ChargePoint instances that are currently tracked as connected.
ChargePoints: set = set()
28
29
30 # Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class.
31 class ChargePoint(ocpp.v201.ChargePoint):
32 _command_timer: Optional[Timer]
33
34 def __init__(self, connection):
35 super().__init__(connection.path.strip("/"), connection)
36 self._command_timer = None
37
38 # Message handlers to receive OCPP messages.
39 @on(Action.BootNotification)
40 async def on_boot_notification(self, charging_station, reason, **kwargs):
41 logging.info("Received %s", Action.BootNotification)
42 # Create and return a BootNotification response with the current time,
43 # an interval of 60 seconds, and an accepted status.
44 return ocpp.v201.call_result.BootNotification(
45 current_time=datetime.now(timezone.utc).isoformat(),
46 interval=60,
47 status=RegistrationStatusType.accepted,
48 )
49
50 @on(Action.Heartbeat)
51 async def on_heartbeat(self, **kwargs):
52 logging.info("Received %s", Action.Heartbeat)
53 return ocpp.v201.call_result.Heartbeat(
54 current_time=datetime.now(timezone.utc).isoformat()
55 )
56
57 @on(Action.StatusNotification)
58 async def on_status_notification(
59 self, timestamp, evse_id: int, connector_id: int, connector_status, **kwargs
60 ):
61 logging.info("Received %s", Action.StatusNotification)
62 return ocpp.v201.call_result.StatusNotification()
63
64 @on(Action.Authorize)
65 async def on_authorize(self, id_token, **kwargs):
66 logging.info("Received %s", Action.Authorize)
67 return ocpp.v201.call_result.Authorize(
68 id_token_info={"status": AuthorizationStatusType.accepted}
69 )
70
71 @on(Action.TransactionEvent)
72 async def on_transaction_event(
73 self,
74 event_type: TransactionEventType,
75 timestamp,
76 trigger_reason,
77 seq_no: int,
78 transaction_info,
79 **kwargs,
80 ):
81 match event_type:
82 case TransactionEventType.started:
83 logging.info("Received %s Started", Action.TransactionEvent)
84 return ocpp.v201.call_result.TransactionEvent(
85 id_token_info={"status": AuthorizationStatusType.accepted}
86 )
87 case TransactionEventType.updated:
88 logging.info("Received %s Updated", Action.TransactionEvent)
89 return ocpp.v201.call_result.TransactionEvent(total_cost=10)
90 case TransactionEventType.ended:
91 logging.info("Received %s Ended", Action.TransactionEvent)
92 return ocpp.v201.call_result.TransactionEvent()
93
94 @on(Action.MeterValues)
95 async def on_meter_values(self, evse_id: int, meter_value, **kwargs):
96 logging.info("Received %s", Action.MeterValues)
97 return ocpp.v201.call_result.MeterValues()
98
99 # Request handlers to emit OCPP messages.
100 async def _send_clear_cache(self):
101 request = ocpp.v201.call.ClearCache()
102 response = await self.call(request)
103
104 if response.status == ClearCacheStatusType.accepted:
105 logging.info("%s successful", Action.ClearCache)
106 else:
107 logging.info("%s failed", Action.ClearCache)
108
109 async def _send_get_base_report(self):
110 request = ocpp.v201.call.GetBaseReport(
111 request_id=1, report_base=ReportBaseType.full_inventory
112 )
113 response = await self.call(request)
114
115 if response.status == GenericDeviceModelStatusType.accepted:
116 logging.info("%s successful", Action.GetBaseReport)
117 else:
118 logging.info("%s failed", Action.GetBaseReport)
119
120 async def _send_command(self, command_name: Action):
121 logging.debug("Sending OCPP command: %s", command_name)
122 match command_name:
123 case Action.ClearCache:
124 await self._send_clear_cache()
125 case Action.GetBaseReport:
126 await self._send_get_base_report()
127 case _:
128 logging.info(f"Not supported command {command_name}")
129
130 async def send_command(
131 self, command_name: Action, delay: Optional[float], period: Optional[float]
132 ):
133 if not delay and not period:
134 raise ValueError("Either delay or period must be defined")
135 if delay and delay <= 0:
136 raise ValueError("Delay must be a positive number")
137 if period and period <= 0:
138 raise ValueError("Period must be a positive number")
139 try:
140 if delay and not self._command_timer:
141 self._command_timer = Timer(
142 delay,
143 False,
144 self._send_command,
145 [command_name],
146 )
147 if period and not self._command_timer:
148 self._command_timer = Timer(
149 period,
150 True,
151 self._send_command,
152 [command_name],
153 )
154 except ConnectionClosed:
155 self.handle_connection_closed()
156
157 def handle_connection_closed(self):
158 logging.info("ChargePoint %s closed connection", self.id)
159 if self._command_timer:
160 self._command_timer.cancel()
161 ChargePoints.remove(self)
162 logging.debug("Connected ChargePoint(s): %d", len(ChargePoints))
163
164
# Connection handler invoked for each incoming WebSocket connection.
async def on_connect(
    websocket,
    command_name: Optional[Action],
    delay: Optional[float],
    period: Optional[float],
):
    """Validate the subprotocol handshake, then serve the charge point.

    The connection is closed right away when the client sent no
    ``Sec-WebSocket-Protocol`` header or when no subprotocol was agreed on.
    Otherwise a ChargePoint is created, the optional command is scheduled,
    the instance is registered in ``ChargePoints`` and its message loop runs
    until the connection is closed.
    """
    try:
        client_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
    except KeyError:
        logging.info("Client hasn't requested any Subprotocol. Closing Connection")
        return await websocket.close()

    # Guard clause: without an agreed subprotocol there is nothing to serve.
    if not websocket.subprotocol:
        logging.warning(
            "Protocols Mismatched | Expected Subprotocols: %s, but client "
            "supports %s | Closing connection",
            websocket.available_subprotocols,
            client_protocols,
        )
        return await websocket.close()
    logging.info("Protocols Matched: %s", websocket.subprotocol)

    charge_point = ChargePoint(websocket)
    if command_name:
        await charge_point.send_command(command_name, delay, period)

    ChargePoints.add(charge_point)

    try:
        await charge_point.start()
    except ConnectionClosed:
        charge_point.handle_connection_closed()
201
202
# Main function to start the WebSocket server.
async def main():
    """Parse the command line and run the OCPP 2 WebSocket server.

    Options:
        -c/--command: OCPP command (an ``Action`` value) to send to every
            charge point that connects.
        -d/--delay: delay in seconds for the command.
        -p/--period: period in seconds for the command.

    When a command is requested, the timing options are validated here so a
    bad configuration is reported once at startup (argparse usage error,
    exit status 2) instead of raising inside every connection handler.
    """
    parser = argparse.ArgumentParser(description="OCPP2 Server")
    parser.add_argument("-c", "--command", type=Action, help="OCPP2 Command Name")
    parser.add_argument("-d", "--delay", type=float, help="Delay in seconds")
    parser.add_argument("-p", "--period", type=float, help="Period in seconds")

    args = parser.parse_args()

    # Delay and period are only used when a command has to be sent.
    if args.command:
        if args.delay is None and args.period is None:
            parser.error("--command requires --delay or --period")
        if args.delay is not None and args.delay <= 0:
            parser.error("--delay must be a positive number")
        if args.period is not None and args.period <= 0:
            parser.error("--period must be a positive number")

    # Create the WebSocket server and specify the handler for new connections.
    server = await websockets.serve(
        partial(
            on_connect, command_name=args.command, delay=args.delay, period=args.period
        ),
        "127.0.0.1",  # Listen on loopback.
        9000,  # Port number.
        subprotocols=["ocpp2.0", "ocpp2.0.1"],  # Specify OCPP 2.0.1 subprotocols.
    )
    logging.info("WebSocket Server Started")

    # Wait for the server to close (runs indefinitely).
    await server.wait_closed()
225
226
# Entry point of the script.
if __name__ == "__main__":
    # Run the main function to start the server. The server runs until it is
    # interrupted, so treat Ctrl+C as a normal shutdown instead of letting
    # the KeyboardInterrupt traceback reach the terminal.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("WebSocket Server Stopped")