17e328a661ee459ce421c768f0501e71effe537e
[e-mobility-charging-stations-simulator.git] / tests / ocpp-server / server.py
1 import argparse
2 import asyncio
3 import logging
4 from datetime import datetime, timezone
5 from functools import partial
6 from random import randint
7 from typing import Optional
8
9 import ocpp.v201
10 import websockets
11 from ocpp.routing import on
12 from ocpp.v201.enums import (
13 Action,
14 AuthorizationStatusType,
15 ClearCacheStatusType,
16 GenericDeviceModelStatusType,
17 RegistrationStatusType,
18 ReportBaseType,
19 TransactionEventType,
20 )
21 from websockets import ConnectionClosed
22
23 from timer import Timer
24
25 # Setting up the logging configuration to display debug level messages.
# Emit every log record, down to DEBUG level, through the root logger.
logging.basicConfig(level=logging.DEBUG)

# Module-wide registry of ChargePoint objects; starts out empty.
ChargePoints: set = set()
29
30
31 # Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class.
32 class ChargePoint(ocpp.v201.ChargePoint):
33 _command_timer: Optional[Timer]
34
35 def __init__(self, connection):
36 super().__init__(connection.path.strip("/"), connection)
37 self._command_timer = None
38
39 # Message handlers to receive OCPP messages.
40 @on(Action.BootNotification)
41 async def on_boot_notification(self, charging_station, reason, **kwargs):
42 logging.info("Received %s", Action.BootNotification)
43 # Create and return a BootNotification response with the current time,
44 # an interval of 60 seconds, and an accepted status.
45 return ocpp.v201.call_result.BootNotification(
46 current_time=datetime.now(timezone.utc).isoformat(),
47 interval=60,
48 status=RegistrationStatusType.accepted,
49 )
50
51 @on(Action.Heartbeat)
52 async def on_heartbeat(self, **kwargs):
53 logging.info("Received %s", Action.Heartbeat)
54 return ocpp.v201.call_result.Heartbeat(
55 current_time=datetime.now(timezone.utc).isoformat()
56 )
57
58 @on(Action.StatusNotification)
59 async def on_status_notification(
60 self, timestamp, evse_id: int, connector_id: int, connector_status, **kwargs
61 ):
62 logging.info("Received %s", Action.StatusNotification)
63 return ocpp.v201.call_result.StatusNotification()
64
65 @on(Action.Authorize)
66 async def on_authorize(self, id_token, **kwargs):
67 logging.info("Received %s", Action.Authorize)
68 return ocpp.v201.call_result.Authorize(
69 id_token_info={"status": AuthorizationStatusType.accepted}
70 )
71
72 @on(Action.TransactionEvent)
73 async def on_transaction_event(
74 self,
75 event_type: TransactionEventType,
76 timestamp,
77 trigger_reason,
78 seq_no: int,
79 transaction_info,
80 **kwargs,
81 ):
82 match event_type:
83 case TransactionEventType.started:
84 logging.info("Received %s Started", Action.TransactionEvent)
85 return ocpp.v201.call_result.TransactionEvent(
86 id_token_info={"status": AuthorizationStatusType.accepted}
87 )
88 case TransactionEventType.updated:
89 logging.info("Received %s Updated", Action.TransactionEvent)
90 return ocpp.v201.call_result.TransactionEvent(total_cost=10)
91 case TransactionEventType.ended:
92 logging.info("Received %s Ended", Action.TransactionEvent)
93 return ocpp.v201.call_result.TransactionEvent()
94
95 @on(Action.MeterValues)
96 async def on_meter_values(self, evse_id: int, meter_value, **kwargs):
97 logging.info("Received %s", Action.MeterValues)
98 return ocpp.v201.call_result.MeterValues()
99
100 # Request handlers to emit OCPP messages.
101 async def _send_clear_cache(self):
102 request = ocpp.v201.call.ClearCache()
103 response = await self.call(request)
104
105 if response.status == ClearCacheStatusType.accepted:
106 logging.info("%s successful", Action.ClearCache)
107 else:
108 logging.info("%s failed", Action.ClearCache)
109
110 async def _send_get_base_report(self):
111 request = ocpp.v201.call.GetBaseReport(
112 request_id=randint(1, 100), # noqa: S311
113 report_base=ReportBaseType.full_inventory,
114 )
115 response = await self.call(request)
116
117 if response.status == GenericDeviceModelStatusType.accepted:
118 logging.info("%s successful", Action.GetBaseReport)
119 else:
120 logging.info("%s failed", Action.GetBaseReport)
121
122 async def _send_command(self, command_name: Action):
123 logging.debug("Sending OCPP command %s", command_name)
124 match command_name:
125 case Action.ClearCache:
126 await self._send_clear_cache()
127 case Action.GetBaseReport:
128 await self._send_get_base_report()
129 case _:
130 logging.info(f"Not supported command {command_name}")
131
132 async def send_command(
133 self, command_name: Action, delay: Optional[float], period: Optional[float]
134 ):
135 try:
136 if delay and not self._command_timer:
137 self._command_timer = Timer(
138 delay,
139 False,
140 self._send_command,
141 [command_name],
142 )
143 if period and not self._command_timer:
144 self._command_timer = Timer(
145 period,
146 True,
147 self._send_command,
148 [command_name],
149 )
150 except ConnectionClosed:
151 self.handle_connection_closed()
152
153 def handle_connection_closed(self):
154 logging.info("ChargePoint %s closed connection", self.id)
155 if self._command_timer:
156 self._command_timer.cancel()
157 ChargePoints.remove(self)
158 logging.debug("Connected ChargePoint(s): %d", len(ChargePoints))
159
160
161 # Function to handle new WebSocket connections.
async def on_connect(
    websocket,
    command_name: Optional[Action],
    delay: Optional[float],
    period: Optional[float],
):
    """Serve one incoming WebSocket connection as a charge point.

    The connection is closed straight away when the client sent no
    Sec-WebSocket-Protocol header or when no subprotocol was negotiated.
    Otherwise a ChargePoint is created, the optional command is scheduled,
    and the charge point is registered and run until the connection closes.
    """
    try:
        offered_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
    except KeyError:
        logging.info("Client hasn't requested any Subprotocol. Closing Connection")
        return await websocket.close()

    # Guard clause: refuse clients with which no subprotocol was agreed on.
    if not websocket.subprotocol:
        logging.warning(
            "Protocols Mismatched | Expected Subprotocols: %s,"
            " but client supports %s | Closing connection",
            websocket.available_subprotocols,
            offered_protocols,
        )
        return await websocket.close()
    logging.info("Protocols Matched: %s", websocket.subprotocol)

    charge_point = ChargePoint(websocket)
    if command_name:
        await charge_point.send_command(command_name, delay, period)

    ChargePoints.add(charge_point)

    try:
        await charge_point.start()
    except ConnectionClosed:
        charge_point.handle_connection_closed()
198
199
def check_positive_number(value: Optional[float]):
    """Convert ``value`` to a strictly positive, finite float.

    Intended as an argparse ``type=`` callable.

    Returns:
        The converted float.

    Raises:
        argparse.ArgumentTypeError: if ``value`` cannot be converted to a
            float (including ``None``), is NaN, is not greater than zero,
            or is infinite.
    """
    try:
        number = float(value)
    except (TypeError, ValueError):
        # TypeError covers None and other non-numeric objects.
        raise argparse.ArgumentTypeError("must be a number") from None
    # Written as "not > 0" rather than "<= 0" so that NaN, which fails every
    # comparison, is rejected too.
    if not number > 0:
        raise argparse.ArgumentTypeError("must be a positive number")
    if number == float("inf"):
        raise argparse.ArgumentTypeError("must be a finite number")
    return number
208
209
210 # Main function to start the WebSocket server.
async def main():
    """Parse the command-line options and run the WebSocket server.

    ``--delay`` and ``--period`` are mutually exclusive, and one of them
    becomes mandatory as soon as ``--command`` is given. The server listens
    on 127.0.0.1:9000 and this coroutine returns once the server is closed.
    """
    arg_parser = argparse.ArgumentParser(description="OCPP2 Server")
    arg_parser.add_argument("-c", "--command", type=Action, help="command name")
    schedule = arg_parser.add_mutually_exclusive_group()
    schedule.add_argument(
        "-d",
        "--delay",
        type=check_positive_number,
        help="delay in seconds",
    )
    schedule.add_argument(
        "-p",
        "--period",
        type=check_positive_number,
        help="period in seconds",
    )
    # Preliminary parse: a command is only useful with a schedule, so make
    # the delay/period group required when a command was supplied.
    known_args, _ = arg_parser.parse_known_args()
    schedule.required = known_args.command is not None

    options = arg_parser.parse_args()

    # Bind the parsed options so each new connection receives them.
    connection_handler = partial(
        on_connect,
        command_name=options.command,
        delay=options.delay,
        period=options.period,
    )
    server = await websockets.serve(
        connection_handler,
        "127.0.0.1",  # Listen on loopback.
        9000,  # Port number.
        subprotocols=["ocpp2.0", "ocpp2.0.1"],  # Accepted OCPP subprotocols.
    )
    logging.info("WebSocket Server Started")

    # Block until the server is closed (runs indefinitely).
    await server.wait_closed()
244
245
246 # Entry point of the script.
if __name__ == "__main__":
    # Executed as a script: drive the main() coroutine to completion.
    asyncio.run(main())