[autofix.ci] apply automated fixes
[e-mobility-charging-stations-simulator.git] / tests / ocpp-server / server.py
CommitLineData
b794f42e 1import argparse
fa16d389
S
2import asyncio
3import logging
fa16d389 4from datetime import datetime, timezone
1f71f83f 5from functools import partial
aea49501 6from threading import Timer
fa16d389 7
a89844d4 8import ocpp.v201
1a0d2c47 9import websockets
fa16d389 10from ocpp.routing import on
d4aa9700
JB
11from ocpp.v201.enums import (
12 Action,
13 AuthorizationStatusType,
14 ClearCacheStatusType,
15 RegistrationStatusType,
f937c172 16 ReportBaseType,
b794f42e 17 TransactionEventType,
d4aa9700 18)
a5c2d21f 19from websockets import ConnectionClosed
fa16d389
S
20
# Setting up the logging configuration to display debug level messages.
logging.basicConfig(level=logging.DEBUG)

# Module-level registry of ChargePoint objects: on_connect adds an entry when
# a station connects and removes it when the connection is reported closed.
ChargePoints = set()
25
1a0d2c47 26
class RepeatTimer(Timer):
    """Timer subclass that calls its function again and again.

    The function is invoked after every ``interval`` seconds until the
    timer's ``finished`` event is set, which is what ``cancel()`` does.
    """

    def run(self):
        """Call the function once per elapsed interval until cancelled."""
        while True:
            # wait() returns True as soon as the timer has been cancelled and
            # False when the full interval went by without a cancellation.
            cancelled = self.finished.wait(self.interval)
            if cancelled:
                break
            self.function(*self.args, **self.kwargs)
34
35
fa16d389 36# Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class.
a89844d4 37class ChargePoint(ocpp.v201.ChargePoint):
aea49501 38 # Message handlers to receive OCPP messages.
339f65ad 39 @on(Action.BootNotification)
d6488e8d 40 async def on_boot_notification(self, charging_station, reason, **kwargs):
7628b7e6 41 logging.info("Received %s", Action.BootNotification)
d6488e8d 42 # Create and return a BootNotification response with the current time,
5dd22b9f 43 # an interval of 60 seconds, and an accepted status.
339f65ad 44 return ocpp.v201.call_result.BootNotification(
d6488e8d 45 current_time=datetime.now(timezone.utc).isoformat(),
115f3b17 46 interval=60,
d4aa9700 47 status=RegistrationStatusType.accepted,
d6488e8d 48 )
1a0d2c47 49
115f3b17 50 @on(Action.Heartbeat)
5dd22b9f 51 async def on_heartbeat(self, **kwargs):
7628b7e6 52 logging.info("Received %s", Action.Heartbeat)
d4aa9700
JB
53 return ocpp.v201.call_result.Heartbeat(
54 current_time=datetime.now(timezone.utc).isoformat()
55 )
115f3b17 56
65c0600c 57 @on(Action.StatusNotification)
d4aa9700
JB
58 async def on_status_notification(
59 self, timestamp, evse_id: int, connector_id: int, connector_status, **kwargs
60 ):
7628b7e6 61 logging.info("Received %s", Action.StatusNotification)
65c0600c
JB
62 return ocpp.v201.call_result.StatusNotification()
63
5dd22b9f
JB
64 @on(Action.Authorize)
65 async def on_authorize(self, id_token, **kwargs):
7628b7e6 66 logging.info("Received %s", Action.Authorize)
5dd22b9f 67 return ocpp.v201.call_result.Authorize(
d4aa9700 68 id_token_info={"status": AuthorizationStatusType.accepted}
8430af0a 69 )
5dd22b9f 70
22c4f1fc 71 @on(Action.TransactionEvent)
d4aa9700
JB
72 async def on_transaction_event(
73 self,
74 event_type: TransactionEventType,
75 timestamp,
76 trigger_reason,
77 seq_no: int,
78 transaction_info,
79 **kwargs,
80 ):
22c4f1fc
JB
81 match event_type:
82 case TransactionEventType.started:
7628b7e6 83 logging.info("Received %s Started", Action.TransactionEvent)
22c4f1fc 84 return ocpp.v201.call_result.TransactionEvent(
d4aa9700 85 id_token_info={"status": AuthorizationStatusType.accepted}
8430af0a 86 )
22c4f1fc 87 case TransactionEventType.updated:
7628b7e6 88 logging.info("Received %s Updated", Action.TransactionEvent)
d4aa9700 89 return ocpp.v201.call_result.TransactionEvent(total_cost=10)
22c4f1fc 90 case TransactionEventType.ended:
7628b7e6 91 logging.info("Received %s Ended", Action.TransactionEvent)
22c4f1fc
JB
92 return ocpp.v201.call_result.TransactionEvent()
93
5dd22b9f 94 @on(Action.MeterValues)
c7f80bf9 95 async def on_meter_values(self, evse_id: int, meter_value, **kwargs):
7628b7e6 96 logging.info("Received %s", Action.MeterValues)
5dd22b9f
JB
97 return ocpp.v201.call_result.MeterValues()
98
f937c172 99 @on(Action.GetBaseReport)
1f71f83f 100 async def on_get_base_report(
101 self, request_id: int, report_base: ReportBaseType, **kwargs
102 ):
7c945b4a 103 logging.info("Received %s", Action.GetBaseReport)
b2254601 104 return ocpp.v201.call_result.GetBaseReport(
1f71f83f 105 id_token_info={"status": ReportBaseType.accepted}
106 )
f937c172 107
d6488e8d 108 # Request handlers to emit OCPP messages.
a89844d4
JB
109 async def send_clear_cache(self):
110 request = ocpp.v201.call.ClearCache()
111 response = await self.call(request)
112
113 if response.status == ClearCacheStatusType.accepted:
7628b7e6 114 logging.info("%s successful", Action.ClearCache)
a89844d4 115 else:
7628b7e6 116 logging.info("%s failed", Action.ClearCache)
1a0d2c47 117
f937c172 118 async def send_get_base_report(self):
b794f42e 119 request = ocpp.v201.call.GetBaseReport(
120 reportBase=ReportBaseType.ConfigurationInventory
121 ) # Use correct ReportBaseType
f937c172
S
122 try:
123 response = await self.call(request)
ba0a7592 124 logging.info("Send %s", Action.GetBaseReport)
f937c172 125
b2254601
S
126 if response.status == ReportBaseType.accepted:
127 logging.info("%s successful", Action.GetBaseReport)
f937c172 128 else:
b2254601 129 logging.info("%s failed", Action.GetBaseReport)
f937c172 130
b2254601
S
131 except ConnectionClosed as e:
132 logging.error("Connection closed: %s", str(e))
b794f42e 133
b2254601
S
134 except Exception as e:
135 logging.error("Unexpected error occurred: %s", str(e))
136
137 logging.info("send_get_base_report done.")
f937c172 138
f937c172
S
139
140# Function to send OCPP command
async def send_ocpp_command(cp, command_name, delay=None, period=None):
    """Send one OCPP request through a charge point, once or periodically.

    Args:
        cp: Object exposing the ``send_clear_cache`` and
            ``send_get_base_report`` coroutines.
        command_name: Request to send, ``"ClearCache"`` or ``"GetBaseReport"``
            (an enum member with such a value is accepted too).
        delay: Seconds to wait before the first send; falsy means no wait.
        period: Seconds between two sends; falsy means send exactly once.

    With a period the coroutine loops until its task is cancelled; a send
    that raises is logged and the loop carries on with the next tick.
    """
    # Reduce enum members to their plain value so one table serves both forms.
    wire_name = getattr(command_name, "value", command_name)
    sender_names = {
        "ClearCache": "send_clear_cache",
        "GetBaseReport": "send_get_base_report",
    }
    sender_name = sender_names.get(wire_name)
    if sender_name is None:
        logging.warning("Unsupported command %s", command_name)
        return
    send = getattr(cp, sender_name)

    # If delay is set, sleep for delay seconds before the first send.
    if delay:
        await asyncio.sleep(delay)

    if not period:
        await send()
        return

    # Stay on the event loop: a thread-based timer cannot await coroutines.
    while True:
        try:
            await send()
        except Exception:
            logging.exception("Failure while processing command %s", command_name)
        await asyncio.sleep(period)
176
fa16d389
S
177
178# Function to handle new WebSocket connections.
async def on_connect(websocket, path, args):
    """For every new charge point that connects, create a ChargePoint instance and start
    listening for messages.

    Args:
        websocket: Server-side connection of the client.
        path: Request path; stripped of slashes it is used as charge point id.
        args: Parsed command line options (``request``, ``delay``, ``period``).

    The connection is closed straight away when the client requested no
    subprotocol or when none of the requested ones was negotiated.
    """
    try:
        requested_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
    except KeyError:
        logging.info("Client hasn't requested any Subprotocol. Closing Connection")
        return await websocket.close()

    if websocket.subprotocol:
        logging.info("Protocols Matched: %s", websocket.subprotocol)
    else:
        logging.warning(
            "Protocols Mismatched | Expected Subprotocols: %s,"
            " but client supports %s | Closing connection",
            websocket.available_subprotocols,
            requested_protocols,
        )
        return await websocket.close()

    charge_point_id = path.strip("/")
    cp = ChargePoint(charge_point_id, websocket)

    ChargePoints.add(cp)
    command_task = None
    try:
        # Schedule the command BEFORE cp.start(): start() only returns once
        # the connection is over, which is too late to send anything.
        if args.request:
            # Keep a reference so the task is neither garbage collected
            # while pending nor left running after the connection ends.
            command_task = asyncio.create_task(
                send_ocpp_command(cp, args.request, args.delay, args.period)
            )
        await cp.start()
    except ConnectionClosed:
        logging.info("ChargePoint %s closed connection", cp.id)
    finally:
        if command_task is not None:
            command_task.cancel()
        # Unregister on every exit path, not just on ConnectionClosed.
        ChargePoints.discard(cp)
        logging.debug("Connected ChargePoint(s): %d", len(ChargePoints))
1a0d2c47 215
fa16d389
S
216
217# Main function to start the WebSocket server.
async def main():
    """Parse the command line, start the WebSocket server and serve until closed.

    The server listens on 127.0.0.1:9000 and offers the ``ocpp2.0`` and
    ``ocpp2.0.1`` subprotocols; the parsed options are passed to every
    ``on_connect`` call as its ``args`` keyword.
    """
    # Command line options: (flag, value type, help text).
    option_table = (
        ("--request", str, "OCPP 2 Command Name"),
        ("--delay", int, "Delay in seconds"),
        ("--period", int, "Period in seconds"),
    )
    cli = argparse.ArgumentParser(description="OCPP Charge Point Simulator")
    for flag, value_type, help_text in option_table:
        cli.add_argument(flag, type=value_type, help=help_text)
    options = cli.parse_args()

    # Bind the options so the handler keeps the (websocket, path) call shape.
    connection_handler = partial(on_connect, args=options)

    offered_subprotocols = ["ocpp2.0", "ocpp2.0.1"]
    server = await websockets.serve(
        connection_handler,
        "127.0.0.1",  # Loopback only.
        9000,
        subprotocols=offered_subprotocols,
    )
    logging.info("WebSocket Server Started")

    # Block here until the server is closed.
    await server.wait_closed()
1a0d2c47 240
fa16d389
S
241
# Entry point of the script: only runs when the file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    # Run the main coroutine to start the server; asyncio.run() returns once
    # main() has finished.
    asyncio.run(main())