3 from functools
import partial
5 from datetime
import datetime
, timezone
6 from threading
import Timer
10 from ocpp
.routing
import on
11 from ocpp
.v201
.enums
import (
13 AuthorizationStatusType
,
15 RegistrationStatusType
,
19 from websockets
import ConnectionClosed
21 # Setting up the logging configuration to display debug level messages.
22 logging
.basicConfig(level
=logging
.DEBUG
)
27 class RepeatTimer(Timer
):
28 """Class that inherits from the Timer class. It will run a
29 function at regular intervals."""
32 while not self
.finished
.wait(self
.interval
):
33 self
.function(*self
.args
, **self
.kwargs
)
36 # Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class.
37 class ChargePoint(ocpp
.v201
.ChargePoint
):
38 # Message handlers to receive OCPP messages.
39 @on(Action
.BootNotification
)
40 async def on_boot_notification(self
, charging_station
, reason
, **kwargs
):
41 logging
.info("Received %s", Action
.BootNotification
)
42 # Create and return a BootNotification response with the current time,
43 # an interval of 60 seconds, and an accepted status.
44 return ocpp
.v201
.call_result
.BootNotification(
45 current_time
=datetime
.now(timezone
.utc
).isoformat(),
47 status
=RegistrationStatusType
.accepted
,
51 async def on_heartbeat(self
, **kwargs
):
52 logging
.info("Received %s", Action
.Heartbeat
)
53 return ocpp
.v201
.call_result
.Heartbeat(
54 current_time
=datetime
.now(timezone
.utc
).isoformat()
57 @on(Action
.StatusNotification
)
58 async def on_status_notification(
59 self
, timestamp
, evse_id
: int, connector_id
: int, connector_status
, **kwargs
61 logging
.info("Received %s", Action
.StatusNotification
)
62 return ocpp
.v201
.call_result
.StatusNotification()
65 async def on_authorize(self
, id_token
, **kwargs
):
66 logging
.info("Received %s", Action
.Authorize
)
67 return ocpp
.v201
.call_result
.Authorize(
68 id_token_info
={"status": AuthorizationStatusType
.accepted
}
71 @on(Action
.TransactionEvent
)
72 async def on_transaction_event(
74 event_type
: TransactionEventType
,
82 case TransactionEventType
.started
:
83 logging
.info("Received %s Started", Action
.TransactionEvent
)
84 return ocpp
.v201
.call_result
.TransactionEvent(
85 id_token_info
={"status": AuthorizationStatusType
.accepted
}
87 case TransactionEventType
.updated
:
88 logging
.info("Received %s Updated", Action
.TransactionEvent
)
89 return ocpp
.v201
.call_result
.TransactionEvent(total_cost
=10)
90 case TransactionEventType
.ended
:
91 logging
.info("Received %s Ended", Action
.TransactionEvent
)
92 return ocpp
.v201
.call_result
.TransactionEvent()
94 @on(Action
.MeterValues
)
95 async def on_meter_values(self
, evse_id
: int, meter_value
, **kwargs
):
96 logging
.info("Received %s", Action
.MeterValues
)
97 return ocpp
.v201
.call_result
.MeterValues()
99 @on(Action
.GetBaseReport
)
100 async def on_get_base_report(self
, request_id
: int, report_base
: ReportBaseType
, **kwargs
):
101 logging
.info("Received %s", Action
.GetBaseReport
)
102 return ocpp
.v201
.call_result
.GetBaseReport(
103 id_token_info
={"status": ReportBaseType
.accepted
}
106 # Request handlers to emit OCPP messages.
107 async def send_clear_cache(self
):
108 request
= ocpp
.v201
.call
.ClearCache()
109 response
= await self
.call(request
)
111 if response
.status
== ClearCacheStatusType
.accepted
:
112 logging
.info("%s successful", Action
.ClearCache
)
114 logging
.info("%s failed", Action
.ClearCache
)
116 async def send_get_base_report(self
):
117 request
= ocpp
.v201
.call
.GetBaseReport(
118 reportBase
=ReportBaseType
.ConfigurationInventory
119 ) # Use correct ReportBaseType
121 response
= await self
.call(request
)
122 logging
.info("Send %s", Action
.GetBaseReport
)
124 if response
.status
== ReportBaseType
.accepted
:
125 logging
.info("%s successful", Action
.GetBaseReport
)
127 logging
.info("%s failed", Action
.GetBaseReport
)
129 except ConnectionClosed
as e
:
130 logging
.error("Connection closed: %s", str(e
))
132 except Exception as e
:
133 logging
.error("Unexpected error occurred: %s", str(e
))
135 logging
.info("send_get_base_report done.")
139 # Function to send OCPP command
140 async def send_ocpp_command(cp
, command_name
, delay
=None, period
=None):
141 # If delay is not None, sleep for delay seconds
143 await asyncio
.sleep(delay
)
145 # If period is not None, send command repeatedly with period interval
147 async def send_command_repeatedly():
149 command_name
= await cp
.receive_command()
152 case Action
.ClearCache
:
153 logging
.info("ClearCache parser working")
154 await cp
.send_clear_cache()
155 case Action
.GetBaseReport
:
156 logging
.info("GetBaseReport parser working")
157 await cp
.send_get_base_report()
159 logging
.warning(f
"Unsupported command {command_name}")
160 except Exception as e
:
161 logging
.exception(f
"Failure while processing command {command_name}")
163 await asyncio
.sleep(period
)
165 timer
= RepeatTimer(period
, send_command_repeatedly
)
167 await timer
.wait_closed() # Wait for timer to finish before exiting
170 if command_name
== "GetBaseReport":
171 await cp
.send_get_base_report()
174 # Function to handle new WebSocket connections.
175 async def on_connect(websocket
, path
, args
):
176 """For every new charge point that connects, create a ChargePoint instance and start
177 listening for messages."""
179 requested_protocols
= websocket
.request_headers
["Sec-WebSocket-Protocol"]
181 logging
.info("Client hasn't requested any Subprotocol. Closing Connection")
182 return await websocket
.close()
184 if websocket
.subprotocol
:
185 logging
.info("Protocols Matched: %s", websocket
.subprotocol
)
188 "Protocols Mismatched | Expected Subprotocols: %s,"
189 " but client supports %s | Closing connection",
190 websocket
.available_subprotocols
,
193 return await websocket
.close()
195 charge_point_id
= path
.strip("/")
196 cp
= ChargePoint(charge_point_id
, websocket
)
201 # Check if request argument is specified
203 asyncio
.create_task(send_ocpp_command(cp
, args
.request
, args
.delay
, args
.period
))
205 except ConnectionClosed
:
206 logging
.info("ChargePoint %s closed connection", cp
.id)
207 ChargePoints
.remove(cp
)
208 logging
.debug("Connected ChargePoint(s): %d", len(ChargePoints
))
211 # Main function to start the WebSocket server.
213 # Define argument parser
214 parser
= argparse
.ArgumentParser(description
="OCPP Charge Point Simulator")
215 parser
.add_argument("--request", type=str, help="OCPP 2 Command Name")
216 parser
.add_argument("--delay", type=int, help="Delay in seconds")
217 parser
.add_argument("--period", type=int, help="Period in seconds")
219 args
= parser
.parse_args()
221 on_connect_bound
= partial(on_connect
, args
=args
) # Add args to on_connect
223 # Create the WebSocket server and specify the handler for new connections.
224 server
= await websockets
.serve(
226 "127.0.0.1", # Listen on loopback.
228 subprotocols
=["ocpp2.0", "ocpp2.0.1"], # Specify OCPP 2.0.1 subprotocols.
230 logging
.info("WebSocket Server Started")
232 # Wait for the server to close (runs indefinitely).
233 await server
.wait_closed()
236 # Entry point of the script.
237 if __name__
== "__main__":
238 # Run the main function to start the server.