[autofix.ci] apply automated fixes
[e-mobility-charging-stations-simulator.git] / tests / ocpp-server / server.py
CommitLineData
b794f42e 1import argparse
fa16d389
S
2import asyncio
3import logging
fa16d389 4from datetime import datetime, timezone
aea49501 5from threading import Timer
fa16d389 6
a89844d4 7import ocpp.v201
1a0d2c47 8import websockets
fa16d389 9from ocpp.routing import on
d4aa9700
JB
10from ocpp.v201.enums import (
11 Action,
12 AuthorizationStatusType,
13 ClearCacheStatusType,
14 RegistrationStatusType,
f937c172 15 ReportBaseType,
b794f42e 16 TransactionEventType,
d4aa9700 17)
a5c2d21f 18from websockets import ConnectionClosed
fa16d389
S
19
# Emit every log record from DEBUG level upwards.
logging.basicConfig(level=logging.DEBUG)

# Registry of the charge points handled by this server; entries are added
# and removed by the connection handler.
ChargePoints = set()
24
1a0d2c47 25
class RepeatTimer(Timer):
    """Timer subclass that calls its function repeatedly.

    Instead of firing once, the function is invoked after every ``interval``
    seconds until the ``finished`` event is set (which ``cancel()`` does).
    """

    def run(self):
        while True:
            # wait() returns True as soon as the ``finished`` event is set,
            # and False when the interval elapsed without cancellation.
            stopped = self.finished.wait(self.interval)
            if stopped:
                break
            self.function(*self.args, **self.kwargs)
33
34
fa16d389 35# Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class.
a89844d4 36class ChargePoint(ocpp.v201.ChargePoint):
aea49501 37 # Message handlers to receive OCPP messages.
339f65ad 38 @on(Action.BootNotification)
d6488e8d 39 async def on_boot_notification(self, charging_station, reason, **kwargs):
7628b7e6 40 logging.info("Received %s", Action.BootNotification)
d6488e8d 41 # Create and return a BootNotification response with the current time,
5dd22b9f 42 # an interval of 60 seconds, and an accepted status.
339f65ad 43 return ocpp.v201.call_result.BootNotification(
d6488e8d 44 current_time=datetime.now(timezone.utc).isoformat(),
115f3b17 45 interval=60,
d4aa9700 46 status=RegistrationStatusType.accepted,
d6488e8d 47 )
1a0d2c47 48
115f3b17 49 @on(Action.Heartbeat)
5dd22b9f 50 async def on_heartbeat(self, **kwargs):
7628b7e6 51 logging.info("Received %s", Action.Heartbeat)
d4aa9700
JB
52 return ocpp.v201.call_result.Heartbeat(
53 current_time=datetime.now(timezone.utc).isoformat()
54 )
115f3b17 55
65c0600c 56 @on(Action.StatusNotification)
d4aa9700
JB
57 async def on_status_notification(
58 self, timestamp, evse_id: int, connector_id: int, connector_status, **kwargs
59 ):
7628b7e6 60 logging.info("Received %s", Action.StatusNotification)
65c0600c
JB
61 return ocpp.v201.call_result.StatusNotification()
62
5dd22b9f
JB
63 @on(Action.Authorize)
64 async def on_authorize(self, id_token, **kwargs):
7628b7e6 65 logging.info("Received %s", Action.Authorize)
5dd22b9f 66 return ocpp.v201.call_result.Authorize(
d4aa9700 67 id_token_info={"status": AuthorizationStatusType.accepted}
8430af0a 68 )
5dd22b9f 69
22c4f1fc 70 @on(Action.TransactionEvent)
d4aa9700
JB
71 async def on_transaction_event(
72 self,
73 event_type: TransactionEventType,
74 timestamp,
75 trigger_reason,
76 seq_no: int,
77 transaction_info,
78 **kwargs,
79 ):
22c4f1fc
JB
80 match event_type:
81 case TransactionEventType.started:
7628b7e6 82 logging.info("Received %s Started", Action.TransactionEvent)
22c4f1fc 83 return ocpp.v201.call_result.TransactionEvent(
d4aa9700 84 id_token_info={"status": AuthorizationStatusType.accepted}
8430af0a 85 )
22c4f1fc 86 case TransactionEventType.updated:
7628b7e6 87 logging.info("Received %s Updated", Action.TransactionEvent)
d4aa9700 88 return ocpp.v201.call_result.TransactionEvent(total_cost=10)
22c4f1fc 89 case TransactionEventType.ended:
7628b7e6 90 logging.info("Received %s Ended", Action.TransactionEvent)
22c4f1fc
JB
91 return ocpp.v201.call_result.TransactionEvent()
92
5dd22b9f 93 @on(Action.MeterValues)
c7f80bf9 94 async def on_meter_values(self, evse_id: int, meter_value, **kwargs):
7628b7e6 95 logging.info("Received %s", Action.MeterValues)
5dd22b9f
JB
96 return ocpp.v201.call_result.MeterValues()
97
f937c172 98 @on(Action.GetBaseReport)
acf133cd 99 async def on_get_base_report(
100 self, request_id: int, report_base: ReportBaseType, **kwargs
101 ):
7c945b4a 102 logging.info("Received %s", Action.GetBaseReport)
f937c172
S
103 return ocpp.v201.call_result.GetBaseReport(status="Accepted")
104
d6488e8d 105 # Request handlers to emit OCPP messages.
a89844d4
JB
106 async def send_clear_cache(self):
107 request = ocpp.v201.call.ClearCache()
108 response = await self.call(request)
109
110 if response.status == ClearCacheStatusType.accepted:
7628b7e6 111 logging.info("%s successful", Action.ClearCache)
a89844d4 112 else:
7628b7e6 113 logging.info("%s failed", Action.ClearCache)
1a0d2c47 114
f937c172
S
115 async def send_get_base_report(self):
116 logging.info("Executing send_get_base_report...")
b794f42e 117 request = ocpp.v201.call.GetBaseReport(
118 reportBase=ReportBaseType.ConfigurationInventory
119 ) # Use correct ReportBaseType
f937c172
S
120 try:
121 response = await self.call(request)
ba0a7592 122 logging.info("Send %s", Action.GetBaseReport)
f937c172 123
b794f42e 124 if (
125 response.status == "Accepted"
126 ): # Adjust depending on the structure of your response
f937c172
S
127 logging.info("Send GetBaseReport successful")
128 else:
129 logging.info("Send GetBaseReport failed")
130 except Exception as e:
131 logging.error(f"Send GetBaseReport failed: {str(e)}")
132 logging.info("send_get_base_report done.")
133
b794f42e 134
# Command line interface: which OCPP request to emit, and when.
parser = argparse.ArgumentParser(description="OCPP Charge Point Simulator")
parser.add_argument(
    "--request",
    type=str,
    help="OCPP 2 Command Name",
)
parser.add_argument(
    "--delay",
    type=int,
    help="Delay in seconds",
)
parser.add_argument(
    "--period",
    type=int,
    help="Period in seconds",
)

# Parsed once at import time; read by the connection handler.
args = parser.parse_args()
142
b794f42e 143
f937c172
S
144# Function to send OCPP command
145async def send_ocpp_command(cp, command_name, delay=None, period=None):
146 # If delay is not None, sleep for delay seconds
147 if delay:
148 await asyncio.sleep(delay)
149
150 # If period is not None, send command repeatedly with period interval
151 if period:
acf133cd 152
7c945b4a
S
153 async def send_command_repeatedly():
154 while True:
b5185375 155 command_name = await cp.receive_command()
7c945b4a
S
156 try:
157 match command_name:
39d32b71 158 case Action.ClearCache:
7c945b4a 159 logging.info("ClearCache parser working")
7068f7f3 160 await cp.send_clear_cache()
04578196 161 case Action.GetBaseReport:
7c945b4a 162 logging.info("GetBaseReport parser working")
12798afb 163 await cp.send_get_base_report()
7c945b4a
S
164 case _:
165 logging.warning(f"Unsupported command {command_name}")
acf133cd 166 except Exception:
167 logging.exception(
168 f"Failure while processing command {command_name}"
169 )
7c945b4a
S
170 finally:
171 await asyncio.sleep(period)
172
173 timer = RepeatTimer(period, send_command_repeatedly)
174 await timer.start()
175 await timer.wait_closed() # Wait for timer to finish before exiting
f937c172 176
f937c172 177 else:
b794f42e 178 if command_name == "GetBaseReport":
f937c172
S
179 await cp.send_get_base_report()
180
fa16d389
S
181
# Function to handle new WebSocket connections.
async def on_connect(websocket, path):
    """For every new charge point that connects, create a ChargePoint instance
    and start listening for messages.

    Args:
        websocket: Server-side WebSocket connection of the charge point.
        path: Request path; stripped of slashes, it is the charge point id.

    The connection is closed right away when the client requested no
    subprotocol or none of the supported ones. Otherwise the charge point is
    registered in ``ChargePoints`` for as long as its message loop runs, and
    the request given on the command line, if any, is sent to it.
    """
    try:
        requested_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
    except KeyError:
        logging.info("Client hasn't requested any Subprotocol. Closing Connection")
        return await websocket.close()

    if websocket.subprotocol:
        logging.info("Protocols Matched: %s", websocket.subprotocol)
    else:
        logging.warning(
            "Protocols Mismatched | Expected Subprotocols: %s,"
            " but client supports %s | Closing connection",
            websocket.available_subprotocols,
            requested_protocols,
        )
        return await websocket.close()

    charge_point_id = path.strip("/")
    cp = ChargePoint(charge_point_id, websocket)
    ChargePoints.add(cp)

    # Schedule the requested command BEFORE entering the message loop:
    # cp.start() only returns once the connection is gone. The reference is
    # kept so the task is not garbage collected and can be cancelled.
    command_task = None
    if args.request:
        command_task = asyncio.create_task(
            send_ocpp_command(cp, args.request, args.delay, args.period)
        )

    try:
        await cp.start()
    except ConnectionClosed:
        logging.info("ChargePoint %s closed connection", cp.id)
    finally:
        # Whatever ended the message loop, stop emitting commands and
        # unregister the charge point.
        if command_task is not None:
            command_task.cancel()
        ChargePoints.discard(cp)
        logging.debug("Connected ChargePoint(s): %d", len(ChargePoints))
1a0d2c47 219
fa16d389
S
220
# Main function to start the WebSocket server.
async def main():
    """Start the OCPP WebSocket server and serve until it is closed."""
    # OCPP 2.0.x subprotocols offered during the WebSocket handshake.
    supported_subprotocols = ["ocpp2.0", "ocpp2.0.1"]

    # Bind to the loopback interface on port 9000; every new connection is
    # handed to on_connect.
    ws_server = await websockets.serve(
        on_connect,
        "127.0.0.1",
        9000,
        subprotocols=supported_subprotocols,
    )
    logging.info("WebSocket Server Started")

    # Block here until the server is closed.
    await ws_server.wait_closed()
1a0d2c47 234
fa16d389
S
235
# Script entry point: start the server only when executed directly.
if __name__ == "__main__":
    # Drive main() on a fresh asyncio event loop until it returns.
    asyncio.run(main())