# 45a7668f33ca9a7d18c0a34a2cd71494cc1a11f1
# [e-mobility-charging-stations-simulator.git] / tests / ocpp-server / server.py
import argparse
import asyncio
import logging
from datetime import datetime, timezone
from functools import partial
from threading import Timer

import ocpp.v201
import websockets
from ocpp.routing import on
from ocpp.v201.enums import (
    Action,
    AuthorizationStatusType,
    ClearCacheStatusType,
    GenericDeviceModelStatusType,
    RegistrationStatusType,
    ReportBaseType,
    TransactionEventType,
)
from websockets import ConnectionClosed
20
# Setting up the logging configuration to display debug level messages.
logging.basicConfig(level=logging.DEBUG)

# Registry of the ChargePoint instances created by on_connect(); entries are
# added when a charging station connects and removed when its connection ends.
ChargePoints = set()
25
26
class RepeatTimer(Timer):
    """Timer variant that keeps firing: the target function is invoked again
    after every interval until the timer is cancelled."""

    def run(self):
        # `finished` is the event that cancel() sets; wait() returns True as
        # soon as it is set, and False when the interval elapsed without it.
        stop_requested = self.finished
        while True:
            if stop_requested.wait(self.interval):
                break
            self.function(*self.args, **self.kwargs)
34
35
36 # Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class.
37 class ChargePoint(ocpp.v201.ChargePoint):
38 # Message handlers to receive OCPP messages.
39 @on(Action.BootNotification)
40 async def on_boot_notification(self, charging_station, reason, **kwargs):
41 logging.info("Received %s", Action.BootNotification)
42 # Create and return a BootNotification response with the current time,
43 # an interval of 60 seconds, and an accepted status.
44 return ocpp.v201.call_result.BootNotification(
45 current_time=datetime.now(timezone.utc).isoformat(),
46 interval=60,
47 status=RegistrationStatusType.accepted,
48 )
49
50 @on(Action.Heartbeat)
51 async def on_heartbeat(self, **kwargs):
52 logging.info("Received %s", Action.Heartbeat)
53 return ocpp.v201.call_result.Heartbeat(
54 current_time=datetime.now(timezone.utc).isoformat()
55 )
56
57 @on(Action.StatusNotification)
58 async def on_status_notification(
59 self, timestamp, evse_id: int, connector_id: int, connector_status, **kwargs
60 ):
61 logging.info("Received %s", Action.StatusNotification)
62 return ocpp.v201.call_result.StatusNotification()
63
64 @on(Action.Authorize)
65 async def on_authorize(self, id_token, **kwargs):
66 logging.info("Received %s", Action.Authorize)
67 return ocpp.v201.call_result.Authorize(
68 id_token_info={"status": AuthorizationStatusType.accepted}
69 )
70
71 @on(Action.TransactionEvent)
72 async def on_transaction_event(
73 self,
74 event_type: TransactionEventType,
75 timestamp,
76 trigger_reason,
77 seq_no: int,
78 transaction_info,
79 **kwargs,
80 ):
81 match event_type:
82 case TransactionEventType.started:
83 logging.info("Received %s Started", Action.TransactionEvent)
84 return ocpp.v201.call_result.TransactionEvent(
85 id_token_info={"status": AuthorizationStatusType.accepted}
86 )
87 case TransactionEventType.updated:
88 logging.info("Received %s Updated", Action.TransactionEvent)
89 return ocpp.v201.call_result.TransactionEvent(total_cost=10)
90 case TransactionEventType.ended:
91 logging.info("Received %s Ended", Action.TransactionEvent)
92 return ocpp.v201.call_result.TransactionEvent()
93
94 @on(Action.MeterValues)
95 async def on_meter_values(self, evse_id: int, meter_value, **kwargs):
96 logging.info("Received %s", Action.MeterValues)
97 return ocpp.v201.call_result.MeterValues()
98
99 @on(Action.GetBaseReport)
100 async def on_get_base_report(self, request_id: int, report_base: ReportBaseType, **kwargs):
101 logging.info("Received %s", Action.GetBaseReport)
102 return ocpp.v201.call_result.GetBaseReport(
103 id_token_info={"status": ReportBaseType.accepted}
104 )
105
106 # Request handlers to emit OCPP messages.
107 async def send_clear_cache(self):
108 request = ocpp.v201.call.ClearCache()
109 response = await self.call(request)
110
111 if response.status == ClearCacheStatusType.accepted:
112 logging.info("%s successful", Action.ClearCache)
113 else:
114 logging.info("%s failed", Action.ClearCache)
115
116 async def send_get_base_report(self):
117 request = ocpp.v201.call.GetBaseReport(
118 reportBase=ReportBaseType.ConfigurationInventory
119 ) # Use correct ReportBaseType
120 try:
121 response = await self.call(request)
122 logging.info("Send %s", Action.GetBaseReport)
123
124 if response.status == ReportBaseType.accepted:
125 logging.info("%s successful", Action.GetBaseReport)
126 else:
127 logging.info("%s failed", Action.GetBaseReport)
128
129 except ConnectionClosed as e:
130 logging.error("Connection closed: %s", str(e))
131
132 except Exception as e:
133 logging.error("Unexpected error occurred: %s", str(e))
134
135 logging.info("send_get_base_report done.")
136
137
138
# Function to send OCPP command
async def send_ocpp_command(cp, command_name, delay=None, period=None):
    """Send the OCPP command ``command_name`` through ``cp``, once or periodically.

    Args:
        cp: charge point object providing the ``send_clear_cache()`` and
            ``send_get_base_report()`` coroutines.
        command_name: OCPP action name, "ClearCache" or "GetBaseReport".
            An unsupported name is logged and nothing is sent.
        delay: optional number of seconds to wait before the first send.
        period: optional number of seconds between sends. When falsy the
            command is sent exactly once; otherwise it is re-sent every
            ``period`` seconds until the task is cancelled or the connection
            is reported closed.
    """
    # If delay is not None, sleep for delay seconds
    if delay:
        await asyncio.sleep(delay)

    # Accept either a plain string or an enum member whose value is the name.
    action = getattr(command_name, "value", command_name)
    senders = {
        "ClearCache": cp.send_clear_cache,
        "GetBaseReport": cp.send_get_base_report,
    }
    sender = senders.get(action)
    if sender is None:
        logging.warning("Unsupported command %s", action)
        return

    # Everything stays on the event loop: a threading.Timer cannot await a
    # coroutine, so periodic sending is a plain asyncio sleep loop.
    while True:
        try:
            await sender()
        except ConnectionClosed:
            # Nothing more can be sent on a closed connection.
            logging.info("Connection closed, stop sending %s", action)
            return
        except Exception:
            # Keep a periodic sender alive across a single failed attempt.
            logging.exception("Failure while processing command %s", action)

        if not period:
            return
        await asyncio.sleep(period)
172
173
# Function to handle new WebSocket connections.
async def on_connect(websocket, path, args):
    """For every new charge point that connects, create a ChargePoint instance and start
    listening for messages.

    Args:
        websocket: the server-side connection of the charging station.
        path: request path; with the slashes stripped it is the charge point id.
        args: parsed command line arguments (``request``, ``delay``, ``period``).

    Returns:
        The result of ``websocket.close()`` when the connection is refused
        because of a missing or unsupported subprotocol, otherwise ``None``
        once the connection has ended.
    """
    try:
        requested_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
    except KeyError:
        logging.info("Client hasn't requested any Subprotocol. Closing Connection")
        return await websocket.close()

    if websocket.subprotocol:
        logging.info("Protocols Matched: %s", websocket.subprotocol)
    else:
        logging.warning(
            "Protocols Mismatched | Expected Subprotocols: %s,"
            " but client supports %s | Closing connection",
            websocket.available_subprotocols,
            requested_protocols,
        )
        return await websocket.close()

    charge_point_id = path.strip("/")
    cp = ChargePoint(charge_point_id, websocket)

    ChargePoints.add(cp)

    # cp.start() only returns when the connection ends, so the command task
    # has to be scheduled before awaiting it. The reference is kept so the
    # task can be cancelled when the station disconnects.
    command_task = None
    if args.request:
        command_task = asyncio.create_task(
            send_ocpp_command(cp, args.request, args.delay, args.period)
        )

    try:
        await cp.start()
    except ConnectionClosed:
        logging.info("ChargePoint %s closed connection", cp.id)
    finally:
        # Clean up on every exit path, not only on ConnectionClosed.
        if command_task is not None:
            command_task.cancel()
        ChargePoints.discard(cp)
        logging.debug("Connected ChargePoint(s): %d", len(ChargePoints))
209
210
# Main function to start the WebSocket server.
async def main():
    """Parse the command line, start the WebSocket server and serve until it closes."""
    # Command line options: (flag, value type, help text).
    option_specs = (
        ("--request", str, "OCPP 2 Command Name"),
        ("--delay", int, "Delay in seconds"),
        ("--period", int, "Period in seconds"),
    )
    cli = argparse.ArgumentParser(description="OCPP Charge Point Simulator")
    for flag, value_type, help_text in option_specs:
        cli.add_argument(flag, type=value_type, help=help_text)
    options = cli.parse_args()

    # Bind the parsed options so the connection handler receives them as `args`.
    handler = partial(on_connect, args=options)

    # Listen on loopback, port 9000, offering the OCPP 2.0 / 2.0.1 subprotocols.
    listen_host = "127.0.0.1"
    listen_port = 9000
    offered_subprotocols = ["ocpp2.0", "ocpp2.0.1"]
    server = await websockets.serve(
        handler,
        listen_host,
        listen_port,
        subprotocols=offered_subprotocols,
    )
    logging.info("WebSocket Server Started")

    # Block here until the server is closed (normally: forever).
    await server.wait_closed()
234
235
# Entry point of the script: only runs when the file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    # Run the main coroutine on a new event loop until the server closes.
    asyncio.run(main())