4 from datetime
import datetime
, timezone
5 from threading
import Timer
9 from ocpp
.routing
import on
10 from ocpp
.v201
.enums
import (
12 AuthorizationStatusType
,
14 GenericDeviceModelStatusType
,
15 RegistrationStatusType
,
19 from websockets
import ConnectionClosed
# Configure the root logger so that messages of level DEBUG and above are emitted.
logging.basicConfig(level=logging.DEBUG)
class RepeatTimer(Timer):
    """Timer that fires its callback repeatedly instead of once.

    ``threading.Timer`` calls ``function`` a single time after ``interval``
    seconds. This subclass overrides ``run`` so that ``function`` is called
    every ``interval`` seconds until ``cancel()`` is invoked.
    """

    def run(self):
        """Call ``function(*args, **kwargs)`` once per ``interval`` until cancelled."""
        # ``finished`` is the Event that ``cancel()`` sets: ``wait`` returns
        # False when the interval elapses (fire the callback) and True once
        # the timer has been cancelled (leave the loop).
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)
# Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class.
37 class ChargePoint(ocpp
.v201
.ChargePoint
):
38 _command_timer
: RepeatTimer
40 # Message handlers to receive OCPP messages.
41 @on(Action
.BootNotification
)
42 async def on_boot_notification(self
, charging_station
, reason
, **kwargs
):
43 logging
.info("Received %s", Action
.BootNotification
)
44 # Create and return a BootNotification response with the current time,
45 # an interval of 60 seconds, and an accepted status.
46 return ocpp
.v201
.call_result
.BootNotification(
47 current_time
=datetime
.now(timezone
.utc
).isoformat(),
49 status
=RegistrationStatusType
.accepted
,
53 async def on_heartbeat(self
, **kwargs
):
54 logging
.info("Received %s", Action
.Heartbeat
)
55 return ocpp
.v201
.call_result
.Heartbeat(
56 current_time
=datetime
.now(timezone
.utc
).isoformat()
59 @on(Action
.StatusNotification
)
60 async def on_status_notification(
61 self
, timestamp
, evse_id
: int, connector_id
: int, connector_status
, **kwargs
63 logging
.info("Received %s", Action
.StatusNotification
)
64 return ocpp
.v201
.call_result
.StatusNotification()
67 async def on_authorize(self
, id_token
, **kwargs
):
68 logging
.info("Received %s", Action
.Authorize
)
69 return ocpp
.v201
.call_result
.Authorize(
70 id_token_info
={"status": AuthorizationStatusType
.accepted
}
73 @on(Action
.TransactionEvent
)
74 async def on_transaction_event(
76 event_type
: TransactionEventType
,
84 case TransactionEventType
.started
:
85 logging
.info("Received %s Started", Action
.TransactionEvent
)
86 return ocpp
.v201
.call_result
.TransactionEvent(
87 id_token_info
={"status": AuthorizationStatusType
.accepted
}
89 case TransactionEventType
.updated
:
90 logging
.info("Received %s Updated", Action
.TransactionEvent
)
91 return ocpp
.v201
.call_result
.TransactionEvent(total_cost
=10)
92 case TransactionEventType
.ended
:
93 logging
.info("Received %s Ended", Action
.TransactionEvent
)
94 return ocpp
.v201
.call_result
.TransactionEvent()
96 @on(Action
.MeterValues
)
97 async def on_meter_values(self
, evse_id
: int, meter_value
, **kwargs
):
98 logging
.info("Received %s", Action
.MeterValues
)
99 return ocpp
.v201
.call_result
.MeterValues()
101 @on(Action
.GetBaseReport
)
102 async def on_get_base_report(
103 self
, request_id
: int, report_base
: ReportBaseType
, **kwargs
105 logging
.info("Received %s", Action
.GetBaseReport
)
106 return ocpp
.v201
.call_result
.GetBaseReport(
107 status
=GenericDeviceModelStatusType
.accepted
110 # Request handlers to emit OCPP messages.
111 async def _send_clear_cache(self
):
112 request
= ocpp
.v201
.call
.ClearCache()
113 response
= await self
.call(request
)
115 if response
.status
== ClearCacheStatusType
.accepted
:
116 logging
.info("%s successful", Action
.ClearCache
)
118 logging
.info("%s failed", Action
.ClearCache
)
120 async def _send_get_base_report(self
):
121 request
= ocpp
.v201
.call
.GetBaseReport(
122 request_id
=1, report_base
=ReportBaseType
.full_inventory
124 response
= await self
.call(request
)
126 if response
.status
== GenericDeviceModelStatusType
.accepted
:
127 logging
.info("%s successful", Action
.GetBaseReport
)
129 logging
.info("%s failed", Action
.GetBaseReport
)
131 async def _send_command(self
, command_name
: Action
):
132 logging
.debug("Sending OCPP command: %s", command_name
)
134 case Action
.ClearCache
:
135 await self
._send
_clear
_cache
()
136 case Action
.GetBaseReport
:
137 await self
._send
_get
_base
_report
()
139 logging
.info(f
"Not supported command {command_name}")
141 async def send_command(self
, command_name
: Action
, delay
=None, period
=None):
142 if not delay
and not period
:
143 raise ValueError("Either delay or period must be set")
145 if delay
and delay
> 0:
146 await asyncio
.sleep(delay
)
147 await self
._send
_command
(command_name
)
148 if period
and period
> 0 and not self
._command
_timer
:
149 self
._command
_timer
= RepeatTimer(
154 self
._command
_timer
.start()
155 except ConnectionClosed
:
156 self
.handle_connection_closed()
158 def handle_connection_closed(self
):
159 logging
.info("ChargePoint %s closed connection", self
.id)
160 if self
._command
_timer
:
161 self
._command
_timer
.cancel()
162 ChargePoints
.remove(self
)
163 logging
.debug("Connected ChargePoint(s): %d", len(ChargePoints
))
# Function to handle new WebSocket connections.
async def on_connect(websocket, path):
    """For every new charge point that connects, create a ChargePoint instance and start
    listening for messages.

    The connection is closed immediately when the client did not send a
    ``Sec-WebSocket-Protocol`` header, or when no subprotocol was negotiated.

    Args:
        websocket: the server-side WebSocket connection.
        path: request path; with surrounding ``/`` stripped it is used as the
            charge point id.
    """
    try:
        requested_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
    except KeyError:
        logging.info("Client hasn't requested any Subprotocol. Closing Connection")
        return await websocket.close()

    if websocket.subprotocol:
        logging.info("Protocols Matched: %s", websocket.subprotocol)
    else:
        # No subprotocol was agreed on, so the connection is closed explicitly.
        # NOTE(review): the log level of this message is not visible in the
        # original text; ``warning`` is assumed - confirm.
        logging.warning(
            "Protocols Mismatched | Expected Subprotocols: %s,"
            " but client supports %s | Closing connection",
            websocket.available_subprotocols,
            requested_protocols,
        )
        return await websocket.close()

    charge_point_id = path.strip("/")
    cp = ChargePoint(charge_point_id, websocket)
    # NOTE(review): registration reconstructed; assumes ``ChargePoints`` is a
    # set (``handle_connection_closed`` later removes the instance) - confirm.
    ChargePoints.add(cp)

    try:
        await cp.start()
    except ConnectionClosed:
        cp.handle_connection_closed()
# Main function to start the WebSocket server.
async def main():
    """Start the OCPP WebSocket server and optionally send a command.

    Command-line options:
        --command: OCPP2 command name to send to the connected charge points.
        --delay:   delay in seconds before the command is sent.
        --period:  period in seconds at which the command is repeated.
    """
    # Define argument parser
    parser = argparse.ArgumentParser(description="OCPP2 Charge Point Simulator")
    parser.add_argument("--command", type=str, help="OCPP2 Command Name")
    parser.add_argument("--delay", type=int, help="Delay in seconds")
    parser.add_argument("--period", type=int, help="Period in seconds")

    # Create the WebSocket server and specify the handler for new connections.
    server = await websockets.serve(
        on_connect,
        "127.0.0.1",  # Listen on loopback.
        9000,  # NOTE(review): port not visible in the original text - confirm.
        subprotocols=["ocpp2.0", "ocpp2.0.1"],  # Specify OCPP 2.0.1 subprotocols.
    )
    logging.info("WebSocket Server Started")

    args = parser.parse_args()

    if args.command:
        # Iterate over a snapshot: connections may be registered or removed
        # while a send is being awaited, and mutating the collection during
        # iteration would raise.
        # NOTE(review): only charge points already connected at this moment
        # receive the command - confirm this is intended.
        for cp in tuple(ChargePoints):
            # ``send_command`` is a bound method, so the instance must not be
            # passed again as the first argument.
            await asyncio.create_task(
                cp.send_command(args.command, args.delay, args.period)
            )

    # Wait for the server to close (runs indefinitely).
    await server.wait_closed()
226 # Entry point of the script.
227 if __name__
== "__main__":
228 # Run the main function to start the server.