]> Piment Noir Git Repositories - e-mobility-charging-stations-simulator.git/blob - tests/ocpp-server/server.py
9dce9e1b67c618b0ca9f1d113accc2a4da2f3c7f
[e-mobility-charging-stations-simulator.git] / tests / ocpp-server / server.py
1 import argparse
2 import asyncio
3 import logging
4 from datetime import datetime, timezone
5 from functools import partial
6 from random import randint
7 from typing import Optional
8
9 import ocpp.v201
10 import websockets
11 from ocpp.routing import on
12 from ocpp.v201.enums import (
13 Action,
14 AuthorizationStatusEnumType,
15 ClearCacheStatusEnumType,
16 GenericDeviceModelStatusEnumType,
17 RegistrationStatusEnumType,
18 ReportBaseEnumType,
19 TransactionEventEnumType,
20 )
21 from websockets import ConnectionClosed
22
23 from timer import Timer
24
# Root logger configuration: show everything from DEBUG level upwards.
logging.basicConfig(level=logging.DEBUG)

# Registry of the ChargePoint instances that are currently connected.
ChargePoints: set = set()
29
30
31 class ChargePoint(ocpp.v201.ChargePoint):
32 _command_timer: Optional[Timer]
33
34 def __init__(self, connection):
35 super().__init__(connection.path.strip("/"), connection)
36 self._command_timer = None
37
38 # Message handlers to receive OCPP messages.
39 @on(Action.boot_notification)
40 async def on_boot_notification(self, charging_station, reason, **kwargs):
41 logging.info("Received %s", Action.boot_notification)
42 # Create and return a BootNotification response with the current time,
43 # an interval of 60 seconds, and an accepted status.
44 return ocpp.v201.call_result.BootNotification(
45 current_time=datetime.now(timezone.utc).isoformat(),
46 interval=60,
47 status=RegistrationStatusEnumType.accepted,
48 )
49
50 @on(Action.heartbeat)
51 async def on_heartbeat(self, **kwargs):
52 logging.info("Received %s", Action.heartbeat)
53 return ocpp.v201.call_result.Heartbeat(
54 current_time=datetime.now(timezone.utc).isoformat()
55 )
56
57 @on(Action.status_notification)
58 async def on_status_notification(
59 self, timestamp, evse_id: int, connector_id: int, connector_status, **kwargs
60 ):
61 logging.info("Received %s", Action.status_notification)
62 return ocpp.v201.call_result.StatusNotification()
63
64 @on(Action.authorize)
65 async def on_authorize(self, id_token, **kwargs):
66 logging.info("Received %s", Action.authorize)
67 return ocpp.v201.call_result.Authorize(
68 id_token_info={"status": AuthorizationStatusEnumType.accepted}
69 )
70
71 @on(Action.transaction_event)
72 async def on_transaction_event(
73 self,
74 event_type: TransactionEventEnumType,
75 timestamp,
76 trigger_reason,
77 seq_no: int,
78 transaction_info,
79 **kwargs,
80 ):
81 match event_type:
82 case TransactionEventEnumType.started:
83 logging.info("Received %s Started", Action.transaction_event)
84 return ocpp.v201.call_result.TransactionEvent(
85 id_token_info={"status": AuthorizationStatusEnumType.accepted}
86 )
87 case TransactionEventEnumType.updated:
88 logging.info("Received %s Updated", Action.transaction_event)
89 return ocpp.v201.call_result.TransactionEvent(total_cost=10)
90 case TransactionEventEnumType.ended:
91 logging.info("Received %s Ended", Action.transaction_event)
92 return ocpp.v201.call_result.TransactionEvent()
93
94 @on(Action.meter_values)
95 async def on_meter_values(self, evse_id: int, meter_value, **kwargs):
96 logging.info("Received %s", Action.meter_values)
97 return ocpp.v201.call_result.MeterValues()
98
99 # Request handlers to emit OCPP messages.
100 async def _send_clear_cache(self):
101 request = ocpp.v201.call.ClearCache()
102 response = await self.call(request)
103
104 if response.status == ClearCacheStatusEnumType.accepted:
105 logging.info("%s successful", Action.clear_cache)
106 else:
107 logging.info("%s failed", Action.clear_cache)
108
109 async def _send_get_base_report(self):
110 request = ocpp.v201.call.GetBaseReport(
111 request_id=randint(1, 100), # noqa: S311
112 report_base=ReportBaseEnumType.full_inventory,
113 )
114 response = await self.call(request)
115
116 if response.status == GenericDeviceModelStatusEnumType.accepted:
117 logging.info("%s successful", Action.get_base_report)
118 else:
119 logging.info("%s failed", Action.get_base_report)
120
121 async def _send_command(self, command_name: Action):
122 logging.debug("Sending OCPP command %s", command_name)
123 match command_name:
124 case Action.clear_cache:
125 await self._send_clear_cache()
126 case Action.get_base_report:
127 await self._send_get_base_report()
128 case _:
129 logging.info(f"Not supported command {command_name}")
130
131 async def send_command(
132 self, command_name: Action, delay: Optional[float], period: Optional[float]
133 ):
134 try:
135 if delay and not self._command_timer:
136 self._command_timer = Timer(
137 delay,
138 False,
139 self._send_command,
140 [command_name],
141 )
142 if period and not self._command_timer:
143 self._command_timer = Timer(
144 period,
145 True,
146 self._send_command,
147 [command_name],
148 )
149 except ConnectionClosed:
150 self.handle_connection_closed()
151
152 def handle_connection_closed(self):
153 logging.info("ChargePoint %s closed connection", self.id)
154 if self._command_timer:
155 self._command_timer.cancel()
156 ChargePoints.remove(self)
157 logging.debug("Connected ChargePoint(s): %d", len(ChargePoints))
158
159
160 # Function to handle new WebSocket connections.
async def on_connect(
    websocket,
    command_name: Optional[Action],
    delay: Optional[float],
    period: Optional[float],
):
    """Handle a newly opened WebSocket connection.

    The connection is closed right away when the client requested no
    subprotocol or when no subprotocol was negotiated. Otherwise a
    ChargePoint is created for the socket, the optional command is
    scheduled, the charge point is added to ``ChargePoints`` and its
    message loop runs until the connection closes.
    """
    try:
        offered_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
    except KeyError:
        logging.info("Client hasn't requested any Subprotocol. Closing Connection")
        return await websocket.close()

    # Guard clause: no negotiated subprotocol means nothing in common.
    if not websocket.subprotocol:
        logging.warning(
            "Protocols Mismatched | Expected Subprotocols: %s,"
            " but client supports %s | Closing connection",
            websocket.available_subprotocols,
            offered_protocols,
        )
        return await websocket.close()
    logging.info("Protocols Matched: %s", websocket.subprotocol)

    charge_point = ChargePoint(websocket)
    if command_name:
        await charge_point.send_command(command_name, delay, period)

    ChargePoints.add(charge_point)

    try:
        await charge_point.start()
    except ConnectionClosed:
        charge_point.handle_connection_closed()
197
198
def check_positive_number(value: Optional[float]):
    """Convert ``value`` to a strictly positive, finite float.

    Intended as an argparse ``type=`` callable.

    Args:
        value: Raw option value; anything ``float()`` can parse.

    Returns:
        The parsed float.

    Raises:
        argparse.ArgumentTypeError: If ``value`` is not a number, is zero or
            negative, or is NaN or infinite.
    """
    # Function-scope import keeps this block self-contained.
    import math

    try:
        number = float(value)
    except (TypeError, ValueError):
        # TypeError covers None and other inputs float() cannot take at all.
        raise argparse.ArgumentTypeError("must be a number") from None
    if number <= 0:
        raise argparse.ArgumentTypeError("must be a positive number")
    if not math.isfinite(number):
        # float() parses "nan" and "inf", and both slip past the <= 0 check.
        raise argparse.ArgumentTypeError("must be a finite number")
    return number
207
208
209 # Main function to start the WebSocket server.
async def main():
    """Parse the command line, start the WebSocket server and wait for it to close.

    Options: ``-c/--command`` names the command to send, and the mutually
    exclusive ``-d/--delay`` and ``-p/--period`` give its timing in seconds.
    One of the timing options becomes mandatory once a command is given.
    """
    parser = argparse.ArgumentParser(description="OCPP2 Server")
    parser.add_argument("-c", "--command", type=Action, help="command name")

    timing = parser.add_mutually_exclusive_group()
    timing.add_argument(
        "-d", "--delay", type=check_positive_number, help="delay in seconds"
    )
    timing.add_argument(
        "-p", "--period", type=check_positive_number, help="period in seconds"
    )
    # A first, lenient parse tells whether a command was given; only then
    # does one of the timing options become required.
    preliminary, _unknown = parser.parse_known_args()
    timing.required = preliminary.command is not None

    args = parser.parse_args()

    # Bind the command line settings to the per-connection handler.
    connection_handler = partial(
        on_connect,
        command_name=args.command,
        delay=args.delay,
        period=args.period,
    )

    # Create the WebSocket server and specify the handler for new connections.
    server = await websockets.serve(
        connection_handler,
        "127.0.0.1",  # Listen on loopback.
        9000,  # Port number.
        subprotocols=["ocpp2.0", "ocpp2.0.1"],  # Specify OCPP 2.0.1 subprotocols.
    )
    logging.info("WebSocket Server Started")

    # Wait for the server to close (runs indefinitely).
    await server.wait_closed()
243
244
# Script entry point: start the server only when this file is executed directly.
if __name__ == "__main__":
    # Drive the main() coroutine to completion.
    asyncio.run(main())