feat: modified OCPP2.0 test
[e-mobility-charging-stations-simulator.git] / tests / ocpp-server / server.py
1 import argparse
2 import asyncio
3 import logging
4 from datetime import datetime, timezone
5 from threading import Timer
6
7 import ocpp.v201
8 import websockets
9 from ocpp.routing import on
10 from ocpp.v201.enums import (
11 Action,
12 AuthorizationStatusType,
13 ClearCacheStatusType,
14 RegistrationStatusType,
15 ReportBaseType,
16 TransactionEventType,
17 )
18 from websockets import ConnectionClosed
19
# Emit everything down to DEBUG level on the root logger.
logging.basicConfig(level=logging.DEBUG)

# Registry of the ChargePoint instances that are currently connected.
ChargePoints: set = set()
24
25
class RepeatTimer(Timer):
    """Timer variant that keeps firing its callback once per interval.

    The callback is invoked with the ``args``/``kwargs`` given to the
    constructor each time ``interval`` seconds elapse, until ``cancel()``
    sets the inherited ``finished`` event.
    """

    def run(self):
        while True:
            # wait() returns True as soon as the timer has been cancelled.
            cancelled = self.finished.wait(self.interval)
            if cancelled:
                break
            self.function(*self.args, **self.kwargs)
33
34
35 # Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class.
36 class ChargePoint(ocpp.v201.ChargePoint):
37 # Message handlers to receive OCPP messages.
38 @on(Action.BootNotification)
39 async def on_boot_notification(self, charging_station, reason, **kwargs):
40 logging.info("Received %s", Action.BootNotification)
41 # Create and return a BootNotification response with the current time,
42 # an interval of 60 seconds, and an accepted status.
43 return ocpp.v201.call_result.BootNotification(
44 current_time=datetime.now(timezone.utc).isoformat(),
45 interval=60,
46 status=RegistrationStatusType.accepted,
47 )
48
49 @on(Action.Heartbeat)
50 async def on_heartbeat(self, **kwargs):
51 logging.info("Received %s", Action.Heartbeat)
52 return ocpp.v201.call_result.Heartbeat(
53 current_time=datetime.now(timezone.utc).isoformat()
54 )
55
56 @on(Action.StatusNotification)
57 async def on_status_notification(
58 self, timestamp, evse_id: int, connector_id: int, connector_status, **kwargs
59 ):
60 logging.info("Received %s", Action.StatusNotification)
61 return ocpp.v201.call_result.StatusNotification()
62
63 @on(Action.Authorize)
64 async def on_authorize(self, id_token, **kwargs):
65 logging.info("Received %s", Action.Authorize)
66 return ocpp.v201.call_result.Authorize(
67 id_token_info={"status": AuthorizationStatusType.accepted}
68 )
69
70 @on(Action.TransactionEvent)
71 async def on_transaction_event(
72 self,
73 event_type: TransactionEventType,
74 timestamp,
75 trigger_reason,
76 seq_no: int,
77 transaction_info,
78 **kwargs,
79 ):
80 match event_type:
81 case TransactionEventType.started:
82 logging.info("Received %s Started", Action.TransactionEvent)
83 return ocpp.v201.call_result.TransactionEvent(
84 id_token_info={"status": AuthorizationStatusType.accepted}
85 )
86 case TransactionEventType.updated:
87 logging.info("Received %s Updated", Action.TransactionEvent)
88 return ocpp.v201.call_result.TransactionEvent(total_cost=10)
89 case TransactionEventType.ended:
90 logging.info("Received %s Ended", Action.TransactionEvent)
91 return ocpp.v201.call_result.TransactionEvent()
92
93 @on(Action.MeterValues)
94 async def on_meter_values(self, evse_id: int, meter_value, **kwargs):
95 logging.info("Received %s", Action.MeterValues)
96 return ocpp.v201.call_result.MeterValues()
97
98 @on(Action.GetBaseReport)
99 async def on_get_base_report(self, request_id: int, report_base: ReportBaseType, **kwargs):
100 logging.info("Received %s", Action.GetBaseReport)
101 return ocpp.v201.call_result.GetBaseReport(status="Accepted")
102
103 # Request handlers to emit OCPP messages.
104 async def send_clear_cache(self):
105 request = ocpp.v201.call.ClearCache()
106 response = await self.call(request)
107
108 if response.status == ClearCacheStatusType.accepted:
109 logging.info("%s successful", Action.ClearCache)
110 else:
111 logging.info("%s failed", Action.ClearCache)
112
113 async def send_get_base_report(self):
114 logging.info("Executing send_get_base_report...")
115 request = ocpp.v201.call.GetBaseReport(
116 reportBase=ReportBaseType.ConfigurationInventory
117 ) # Use correct ReportBaseType
118 try:
119 response = await self.call(request)
120 logging.info("Send GetBaseReport")
121
122 if (
123 response.status == "Accepted"
124 ): # Adjust depending on the structure of your response
125 logging.info("Send GetBaseReport successful")
126 else:
127 logging.info("Send GetBaseReport failed")
128 except Exception as e:
129 logging.error(f"Send GetBaseReport failed: {str(e)}")
130 logging.info("send_get_base_report done.")
131
132
# Command-line interface: which OCPP request to emit, and when.
parser = argparse.ArgumentParser(
    description="OCPP Charge Point Simulator",
)
parser.add_argument(
    "--request",
    type=str,
    help="OCPP 2 Command Name",
)
parser.add_argument(
    "--delay",
    type=int,
    help="Delay in seconds",
)
parser.add_argument(
    "--period",
    type=int,
    help="Period in seconds",
)

# Parsed once at import time; read later by the connection handler.
args = parser.parse_args()
140
141
142 # Function to send OCPP command
143 async def send_ocpp_command(cp, command_name, delay=None, period=None):
144 # If delay is not None, sleep for delay seconds
145 if delay:
146 await asyncio.sleep(delay)
147
148 # If period is not None, send command repeatedly with period interval
149 if period:
150 async def send_command_repeatedly():
151 while True:
152 command_name = await charge_point.receive_command()
153 try:
154 match command_name:
155 case 'ClearCache':
156 logging.info("ClearCache parser working")
157 await charge_point.send_clear_cache()
158 case 'GetBaseReport':
159 logging.info("GetBaseReport parser working")
160 await charge_point.send_get_base_report()
161 case _:
162 logging.warning(f"Unsupported command {command_name}")
163 except Exception as e:
164 logging.exception(f"Failure while processing command {command_name}")
165 finally:
166 await asyncio.sleep(period)
167
168 timer = RepeatTimer(period, send_command_repeatedly)
169 await timer.start()
170 await timer.wait_closed() # Wait for timer to finish before exiting
171
172 else:
173 if command_name == "GetBaseReport":
174 await cp.send_get_base_report()
175
176
177 # Function to handle new WebSocket connections.
async def on_connect(websocket, path):
    """Handle one incoming WebSocket connection from a charge point.

    The connection is closed straight away when the client requested no
    subprotocol or none was negotiated. Otherwise a ChargePoint named after
    the request path is registered, the command requested on the command
    line (if any) is scheduled, and messages are processed until the
    connection ends.
    """
    try:
        requested_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
    except KeyError:
        logging.info("Client hasn't requested any Subprotocol. Closing Connection")
        return await websocket.close()

    if websocket.subprotocol:
        logging.info("Protocols Matched: %s", websocket.subprotocol)
    else:
        logging.warning(
            "Protocols Mismatched | Expected Subprotocols: %s,"
            " but client supports %s | Closing connection",
            websocket.available_subprotocols,
            requested_protocols,
        )
        return await websocket.close()

    charge_point_id = path.strip("/")
    cp = ChargePoint(charge_point_id, websocket)

    ChargePoints.add(cp)
    command_task = None
    try:
        # cp.start() only returns once the connection is over, so the
        # command has to be scheduled before awaiting it. Keeping the task
        # reference stops it from being garbage-collected mid-flight.
        if args.request:
            command_task = asyncio.create_task(
                send_ocpp_command(cp, args.request, args.delay, args.period)
            )
        await cp.start()
    except ConnectionClosed:
        logging.info("ChargePoint %s closed connection", cp.id)
    finally:
        # Runs on every exit path so that neither the command task nor the
        # registry entry outlives the connection.
        if command_task is not None:
            command_task.cancel()
        ChargePoints.discard(cp)
        logging.debug("Connected ChargePoint(s): %d", len(ChargePoints))
212
213
214 # Main function to start the WebSocket server.
async def main():
    """Start the WebSocket server on 127.0.0.1:9000 and wait until it closes."""
    ws_server = await websockets.serve(
        on_connect,  # Invoked for each new connection.
        host="127.0.0.1",
        port=9000,
        subprotocols=["ocpp2.0", "ocpp2.0.1"],
    )
    logging.info("WebSocket Server Started")

    # Block here for as long as the server is open.
    await ws_server.wait_closed()
227
228
# Entry point of the script.
if __name__ == "__main__":
    try:
        # Run the main function to start the server.
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop this test server; exit without
        # a traceback.
        logging.info("WebSocket Server stopped")