4 from datetime
import datetime
, timezone
5 from threading
import Timer
9 from ocpp
.routing
import on
10 from ocpp
.v201
.enums
import (
12 AuthorizationStatusType
,
14 RegistrationStatusType
,
18 from websockets
import ConnectionClosed
20 # Setting up the logging configuration to display debug level messages.
21 logging
.basicConfig(level
=logging
.DEBUG
)
26 class RepeatTimer(Timer
):
27 """Class that inherits from the Timer class. It will run a
28 function at regular intervals."""
31 while not self
.finished
.wait(self
.interval
):
32 self
.function(*self
.args
, **self
.kwargs
)
35 # Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class.
36 class ChargePoint(ocpp
.v201
.ChargePoint
):
37 # Message handlers to receive OCPP messages.
38 @on(Action
.BootNotification
)
39 async def on_boot_notification(self
, charging_station
, reason
, **kwargs
):
40 logging
.info("Received %s", Action
.BootNotification
)
41 # Create and return a BootNotification response with the current time,
42 # an interval of 60 seconds, and an accepted status.
43 return ocpp
.v201
.call_result
.BootNotification(
44 current_time
=datetime
.now(timezone
.utc
).isoformat(),
46 status
=RegistrationStatusType
.accepted
,
50 async def on_heartbeat(self
, **kwargs
):
51 logging
.info("Received %s", Action
.Heartbeat
)
52 return ocpp
.v201
.call_result
.Heartbeat(
53 current_time
=datetime
.now(timezone
.utc
).isoformat()
56 @on(Action
.StatusNotification
)
57 async def on_status_notification(
58 self
, timestamp
, evse_id
: int, connector_id
: int, connector_status
, **kwargs
60 logging
.info("Received %s", Action
.StatusNotification
)
61 return ocpp
.v201
.call_result
.StatusNotification()
64 async def on_authorize(self
, id_token
, **kwargs
):
65 logging
.info("Received %s", Action
.Authorize
)
66 return ocpp
.v201
.call_result
.Authorize(
67 id_token_info
={"status": AuthorizationStatusType
.accepted
}
70 @on(Action
.TransactionEvent
)
71 async def on_transaction_event(
73 event_type
: TransactionEventType
,
81 case TransactionEventType
.started
:
82 logging
.info("Received %s Started", Action
.TransactionEvent
)
83 return ocpp
.v201
.call_result
.TransactionEvent(
84 id_token_info
={"status": AuthorizationStatusType
.accepted
}
86 case TransactionEventType
.updated
:
87 logging
.info("Received %s Updated", Action
.TransactionEvent
)
88 return ocpp
.v201
.call_result
.TransactionEvent(total_cost
=10)
89 case TransactionEventType
.ended
:
90 logging
.info("Received %s Ended", Action
.TransactionEvent
)
91 return ocpp
.v201
.call_result
.TransactionEvent()
93 @on(Action
.MeterValues
)
94 async def on_meter_values(self
, evse_id
: int, meter_value
, **kwargs
):
95 logging
.info("Received %s", Action
.MeterValues
)
96 return ocpp
.v201
.call_result
.MeterValues()
98 @on(Action
.GetBaseReport
)
99 async def on_get_base_report(
100 self
, request_id
: int, report_base
: ReportBaseType
, **kwargs
102 logging
.info("Received GetBaseReport")
103 return ocpp
.v201
.call_result
.GetBaseReport(status
="Accepted")
105 # Request handlers to emit OCPP messages.
106 async def send_clear_cache(self
):
107 request
= ocpp
.v201
.call
.ClearCache()
108 response
= await self
.call(request
)
110 if response
.status
== ClearCacheStatusType
.accepted
:
111 logging
.info("%s successful", Action
.ClearCache
)
113 logging
.info("%s failed", Action
.ClearCache
)
115 async def send_get_base_report(self
):
116 logging
.info("Executing send_get_base_report...")
117 request
= ocpp
.v201
.call
.GetBaseReport(
118 reportBase
=ReportBaseType
.ConfigurationInventory
119 ) # Use correct ReportBaseType
121 response
= await self
.call(request
)
122 logging
.info("Send GetBaseReport")
125 response
.status
== "Accepted"
126 ): # Adjust depending on the structure of your response
127 logging
.info("Send GetBaseReport successful")
129 logging
.info("Send GetBaseReport failed")
130 except Exception as e
:
131 logging
.error(f
"Send GetBaseReport failed: {str(e)}")
132 logging
.info("send_get_base_report done.")
135 # Define argument parser
136 parser
= argparse
.ArgumentParser(description
="OCPP Charge Point Simulator")
137 parser
.add_argument("--request", type=str, help="OCPP 2 Command Name")
138 parser
.add_argument("--delay", type=int, help="Delay in seconds")
139 parser
.add_argument("--period", type=int, help="Period in seconds")
141 args
= parser
.parse_args()
144 # Function to send OCPP command
145 async def send_ocpp_command(cp
, command_name
, delay
=None, period
=None):
146 # If delay is not None, sleep for delay seconds
148 await asyncio
.sleep(delay
)
150 # If period is not None, send command repeatedly with period interval
153 if command_name
== "GetBaseReport":
154 logging
.info("GetBaseReport parser working")
155 await cp
.send_get_base_report()
157 await asyncio
.sleep(period
)
159 if command_name
== "GetBaseReport":
160 await cp
.send_get_base_report()
163 # Function to handle new WebSocket connections.
164 async def on_connect(websocket
, path
):
165 """For every new charge point that connects, create a ChargePoint instance and start
166 listening for messages."""
168 requested_protocols
= websocket
.request_headers
["Sec-WebSocket-Protocol"]
170 logging
.info("Client hasn't requested any Subprotocol. Closing Connection")
171 return await websocket
.close()
173 if websocket
.subprotocol
:
174 logging
.info("Protocols Matched: %s", websocket
.subprotocol
)
177 "Protocols Mismatched | Expected Subprotocols: %s,"
178 " but client supports %s | Closing connection",
179 websocket
.available_subprotocols
,
182 return await websocket
.close()
184 charge_point_id
= path
.strip("/")
185 cp
= ChargePoint(charge_point_id
, websocket
)
187 # Check if request argument is specified
190 send_ocpp_command(cp
, args
.request
, args
.delay
, args
.period
)
193 # Start the ChargePoint instance to listen for incoming messages.
199 except ConnectionClosed
:
200 logging
.info("ChargePoint %s closed connection", cp
.id)
201 ChargePoints
.remove(cp
)
202 logging
.debug("Connected ChargePoint(s): %d", len(ChargePoints
))
205 # Main function to start the WebSocket server.
207 # Create the WebSocket server and specify the handler for new connections.
208 server
= await websockets
.serve(
210 "127.0.0.1", # Listen on loopback.
212 subprotocols
=["ocpp2.0", "ocpp2.0.1"], # Specify OCPP 2.0.1 subprotocols.
214 logging
.info("WebSocket Server Started")
216 # Wait for the server to close (runs indefinitely).
217 await server
.wait_closed()
220 # Entry point of the script.
221 if __name__
== "__main__":
222 # Run the main function to start the server.