refactor: move more OCPP command sending code to charge point class
[e-mobility-charging-stations-simulator.git] / tests / ocpp-server / server.py
... / ...
CommitLineData
1import argparse
2import asyncio
3import logging
4from datetime import datetime, timezone
5from threading import Timer
6
7import ocpp.v201
8import websockets
9from ocpp.routing import on
10from ocpp.v201.enums import (
11 Action,
12 AuthorizationStatusType,
13 ClearCacheStatusType,
14 GenericDeviceModelStatusType,
15 RegistrationStatusType,
16 ReportBaseType,
17 TransactionEventType,
18)
19from websockets import ConnectionClosed
20
21# Setting up the logging configuration to display debug level messages.
22logging.basicConfig(level=logging.DEBUG)
23
24ChargePoints = set()
25
26
27class RepeatTimer(Timer):
28 """Class that inherits from the Timer class. It will run a
29 function at regular intervals."""
30
31 def run(self):
32 while not self.finished.wait(self.interval):
33 self.function(*self.args, **self.kwargs)
34
35
36# Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class.
37class ChargePoint(ocpp.v201.ChargePoint):
38 _command_timer: RepeatTimer
39
40 # Message handlers to receive OCPP messages.
41 @on(Action.BootNotification)
42 async def on_boot_notification(self, charging_station, reason, **kwargs):
43 logging.info("Received %s", Action.BootNotification)
44 # Create and return a BootNotification response with the current time,
45 # an interval of 60 seconds, and an accepted status.
46 return ocpp.v201.call_result.BootNotification(
47 current_time=datetime.now(timezone.utc).isoformat(),
48 interval=60,
49 status=RegistrationStatusType.accepted,
50 )
51
52 @on(Action.Heartbeat)
53 async def on_heartbeat(self, **kwargs):
54 logging.info("Received %s", Action.Heartbeat)
55 return ocpp.v201.call_result.Heartbeat(
56 current_time=datetime.now(timezone.utc).isoformat()
57 )
58
59 @on(Action.StatusNotification)
60 async def on_status_notification(
61 self, timestamp, evse_id: int, connector_id: int, connector_status, **kwargs
62 ):
63 logging.info("Received %s", Action.StatusNotification)
64 return ocpp.v201.call_result.StatusNotification()
65
66 @on(Action.Authorize)
67 async def on_authorize(self, id_token, **kwargs):
68 logging.info("Received %s", Action.Authorize)
69 return ocpp.v201.call_result.Authorize(
70 id_token_info={"status": AuthorizationStatusType.accepted}
71 )
72
73 @on(Action.TransactionEvent)
74 async def on_transaction_event(
75 self,
76 event_type: TransactionEventType,
77 timestamp,
78 trigger_reason,
79 seq_no: int,
80 transaction_info,
81 **kwargs,
82 ):
83 match event_type:
84 case TransactionEventType.started:
85 logging.info("Received %s Started", Action.TransactionEvent)
86 return ocpp.v201.call_result.TransactionEvent(
87 id_token_info={"status": AuthorizationStatusType.accepted}
88 )
89 case TransactionEventType.updated:
90 logging.info("Received %s Updated", Action.TransactionEvent)
91 return ocpp.v201.call_result.TransactionEvent(total_cost=10)
92 case TransactionEventType.ended:
93 logging.info("Received %s Ended", Action.TransactionEvent)
94 return ocpp.v201.call_result.TransactionEvent()
95
96 @on(Action.MeterValues)
97 async def on_meter_values(self, evse_id: int, meter_value, **kwargs):
98 logging.info("Received %s", Action.MeterValues)
99 return ocpp.v201.call_result.MeterValues()
100
101 @on(Action.GetBaseReport)
102 async def on_get_base_report(
103 self, request_id: int, report_base: ReportBaseType, **kwargs
104 ):
105 logging.info("Received %s", Action.GetBaseReport)
106 return ocpp.v201.call_result.GetBaseReport(
107 status=GenericDeviceModelStatusType.accepted
108 )
109
110 # Request handlers to emit OCPP messages.
111 async def _send_clear_cache(self):
112 request = ocpp.v201.call.ClearCache()
113 response = await self.call(request)
114
115 if response.status == ClearCacheStatusType.accepted:
116 logging.info("%s successful", Action.ClearCache)
117 else:
118 logging.info("%s failed", Action.ClearCache)
119
120 async def _send_get_base_report(self):
121 request = ocpp.v201.call.GetBaseReport(
122 request_id=1, report_base=ReportBaseType.full_inventory
123 )
124 response = await self.call(request)
125
126 if response.status == GenericDeviceModelStatusType.accepted:
127 logging.info("%s successful", Action.GetBaseReport)
128 else:
129 logging.info("%s failed", Action.GetBaseReport)
130
131 async def _send_command(self, command_name: Action):
132 logging.debug("Sending OCPP command: %s", command_name)
133 match command_name:
134 case Action.ClearCache:
135 await self._send_clear_cache()
136 case Action.GetBaseReport:
137 await self._send_get_base_report()
138 case _:
139 logging.info(f"Not supported command {command_name}")
140
141 async def send_command(self, command_name: Action, delay=None, period=None):
142 if not delay and not period:
143 raise ValueError("Either delay or period must be set")
144 try:
145 if delay and delay > 0:
146 await asyncio.sleep(delay)
147 await self._send_command(command_name)
148 if period and period > 0 and not self._command_timer:
149 self._command_timer = RepeatTimer(
150 period,
151 self._send_command,
152 [command_name],
153 )
154 self._command_timer.start()
155 except ConnectionClosed:
156 self.handle_connection_closed()
157
158 def handle_connection_closed(self):
159 logging.info("ChargePoint %s closed connection", self.id)
160 if self._command_timer:
161 self._command_timer.cancel()
162 ChargePoints.remove(self)
163 logging.debug("Connected ChargePoint(s): %d", len(ChargePoints))
164
165
166# Function to handle new WebSocket connections.
167async def on_connect(websocket, path):
168 """For every new charge point that connects, create a ChargePoint instance and start
169 listening for messages."""
170 try:
171 requested_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
172 except KeyError:
173 logging.info("Client hasn't requested any Subprotocol. Closing Connection")
174 return await websocket.close()
175
176 if websocket.subprotocol:
177 logging.info("Protocols Matched: %s", websocket.subprotocol)
178 else:
179 logging.warning(
180 "Protocols Mismatched | Expected Subprotocols: %s,"
181 " but client supports %s | Closing connection",
182 websocket.available_subprotocols,
183 requested_protocols,
184 )
185 return await websocket.close()
186
187 charge_point_id = path.strip("/")
188 cp = ChargePoint(charge_point_id, websocket)
189
190 ChargePoints.add(cp)
191 try:
192 await cp.start()
193 except ConnectionClosed:
194 cp.handle_connection_closed()
195
196
197# Main function to start the WebSocket server.
198async def main():
199 # Define argument parser
200 parser = argparse.ArgumentParser(description="OCPP2 Charge Point Simulator")
201 parser.add_argument("--command", type=str, help="OCPP2 Command Name")
202 parser.add_argument("--delay", type=int, help="Delay in seconds")
203 parser.add_argument("--period", type=int, help="Period in seconds")
204
205 # Create the WebSocket server and specify the handler for new connections.
206 server = await websockets.serve(
207 on_connect,
208 "127.0.0.1", # Listen on loopback.
209 9000, # Port number.
210 subprotocols=["ocpp2.0", "ocpp2.0.1"], # Specify OCPP 2.0.1 subprotocols.
211 )
212 logging.info("WebSocket Server Started")
213
214 args = parser.parse_args()
215
216 if args.command:
217 for cp in ChargePoints:
218 await asyncio.create_task(
219 cp.send_command(cp, args.command, args.delay, args.period)
220 )
221
222 # Wait for the server to close (runs indefinitely).
223 await server.wait_closed()
224
225
226# Entry point of the script.
227if __name__ == "__main__":
228 # Run the main function to start the server.
229 asyncio.run(main())