3 from datetime
import datetime
, timezone
4 from threading
import Timer
9 from ocpp
.routing
import on
10 from ocpp
.v201
.enums
import (
12 AuthorizationStatusType
,
14 RegistrationStatusType
,
# Configure the root logger at import time with a DEBUG threshold, so that
# debug-level messages (and everything above) are displayed.
logging.basicConfig(level=logging.DEBUG)
class RepeatTimer(Timer):
    """Timer variant that keeps calling its function at regular intervals.

    A plain ``Timer`` fires once; this subclass overrides ``run`` so the
    callback is invoked again after every ``interval`` until the timer's
    ``finished`` event is set.
    """

    def run(self):
        """Invoke ``function(*args, **kwargs)`` once per elapsed interval."""
        while True:
            # wait() doubles as the sleep: a truthy result means the
            # ``finished`` event was set, so the loop must end.
            stopped = self.finished.wait(self.interval)
            if stopped:
                break
            self.function(*self.args, **self.kwargs)
32 # Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class.
33 class ChargePoint(ocpp
.v201
.ChargePoint
):
34 # Message handlers to receive OCPP messages.
35 @on(Action
.BootNotification
)
36 async def on_boot_notification(self
, charging_station
, reason
, **kwargs
):
37 logging
.info("Received BootNotification")
38 # Create and return a BootNotification response with the current time,
39 # an interval of 60 seconds, and an accepted status.
40 return ocpp
.v201
.call_result
.BootNotification(
41 current_time
=datetime
.now(timezone
.utc
).isoformat(),
43 status
=RegistrationStatusType
.accepted
,
47 async def on_heartbeat(self
, **kwargs
):
48 logging
.info("Received Heartbeat")
49 return ocpp
.v201
.call_result
.Heartbeat(
50 current_time
=datetime
.now(timezone
.utc
).isoformat()
53 @on(Action
.StatusNotification
)
54 async def on_status_notification(
55 self
, timestamp
, evse_id
: int, connector_id
: int, connector_status
, **kwargs
57 logging
.info("Received StatusNotification")
58 return ocpp
.v201
.call_result
.StatusNotification()
61 async def on_authorize(self
, id_token
, **kwargs
):
62 logging
.info("Received Authorize")
63 return ocpp
.v201
.call_result
.Authorize(
64 id_token_info
={"status": AuthorizationStatusType
.accepted
}
67 @on(Action
.TransactionEvent
)
68 async def on_transaction_event(
70 event_type
: TransactionEventType
,
78 case TransactionEventType
.started
:
79 logging
.info("Received TransactionEvent Started")
80 return ocpp
.v201
.call_result
.TransactionEvent(
81 id_token_info
={"status": AuthorizationStatusType
.accepted
}
83 case TransactionEventType
.updated
:
84 logging
.info("Received TransactionEvent Updated")
85 return ocpp
.v201
.call_result
.TransactionEvent(total_cost
=10)
86 case TransactionEventType
.ended
:
87 logging
.info("Received TransactionEvent Ended")
88 return ocpp
.v201
.call_result
.TransactionEvent()
90 @on(Action
.MeterValues
)
91 async def on_meter_values(self
, evse_id
: int, meter_value
, **kwargs
):
92 logging
.info("Received MeterValues")
93 return ocpp
.v201
.call_result
.MeterValues()
95 @on(Action
.GetBaseReport
)
96 async def on_get_base_report(self
, request_id
: int, report_base
: ReportBaseType
, **kwargs
):
97 logging
.info("Received GetBaseReport")
98 return ocpp
.v201
.call_result
.GetBaseReport(status
="Accepted")
100 # Request handlers to emit OCPP messages.
101 async def send_clear_cache(self
):
102 request
= ocpp
.v201
.call
.ClearCache()
103 response
= await self
.call(request
)
105 if response
.status
== ClearCacheStatusType
.accepted
:
106 logging
.info("Cache clearing successful")
108 logging
.info("Cache clearing failed")
110 async def send_get_base_report(self
):
111 logging
.info("Executing send_get_base_report...")
112 request
= ocpp
.v201
.call
.GetBaseReport(reportBase
=ReportBaseType
.ConfigurationInventory
) # Use correct ReportBaseType
114 response
= await self
.call(request
)
115 logging
.info("Send GetBaseReport")
117 if response
.status
== "Accepted": # Adjust depending on the structure of your response
118 logging
.info("Send GetBaseReport successful")
120 logging
.info("Send GetBaseReport failed")
121 except Exception as e
:
122 logging
.error(f
"Send GetBaseReport failed: {str(e)}")
123 logging
.info("send_get_base_report done.")
# Define argument parser
parser = argparse.ArgumentParser(description='OCPP Charge Point Simulator')

# (flag, value type, help text) for each supported command-line option.
for _flag, _value_type, _help_text in (
    ('--request', str, 'OCPP 2 Command Name'),
    ('--delay', int, 'Delay in seconds'),
    ('--period', int, 'Period in seconds'),
):
    parser.add_argument(_flag, type=_value_type, help=_help_text)

# Parsed once at import time; exposed as the module-level ``args``.
args = parser.parse_args()
# Helper that sends a single instance of a named OCPP command.
async def _dispatch_ocpp_command(cp, command_name):
    """Send ``command_name`` once through ``cp``.

    Returns:
        True if the command is supported and was dispatched, False otherwise.
    """
    if command_name == 'GetBaseReport':
        logging.info("GetBaseReport parser working")
        await cp.send_get_base_report()
        return True
    logging.warning("Unsupported OCPP command requested: %s", command_name)
    return False


# Function to send OCPP command
async def send_ocpp_command(cp, command_name, delay=None, period=None):
    """Send an OCPP command once, or repeatedly at a fixed period.

    Args:
        cp: Object exposing the awaitable ``send_get_base_report()``.
        command_name: Name of the command; only 'GetBaseReport' is supported.
        delay: Seconds to wait before the first send; skipped when None or
            not positive.
        period: Seconds between repeated sends. When None or not positive the
            command is sent exactly once.
    """
    # If delay is given, sleep for delay seconds before the first send.
    if delay is not None and delay > 0:
        await asyncio.sleep(delay)

    # Without a usable period this is a one-shot send. Treating a
    # non-positive period as "once" avoids a loop that never really sleeps.
    if period is None or period <= 0:
        await _dispatch_ocpp_command(cp, command_name)
        return

    # Otherwise send the command repeatedly with period interval.
    while True:
        if not await _dispatch_ocpp_command(cp, command_name):
            # Repeating an unsupported command would only loop doing nothing.
            return
        await asyncio.sleep(period)
# Function to handle new WebSocket connections.
async def on_connect(websocket, path):
    """For every new charge point that connects, create a ChargePoint instance and start
    listening for messages.

    The connection is closed straight away when the client sent no
    ``Sec-WebSocket-Protocol`` header or when no subprotocol was agreed on.
    If a ``--request`` argument was given, the requested command is sent in a
    background task that is cancelled once the connection handler finishes.

    Args:
        websocket: The accepted WebSocket connection.
        path: Request path; with surrounding slashes stripped it is used as
            the charge point id.
    """
    try:
        requested_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
    except KeyError:
        logging.info("Client hasn't requested any Subprotocol. Closing Connection")
        return await websocket.close()

    if websocket.subprotocol:
        logging.info("Protocols Matched: %s", websocket.subprotocol)
    else:
        logging.warning(
            "Protocols Mismatched | Expected Subprotocols: %s,"
            " but client supports %s | Closing connection",
            websocket.available_subprotocols,
            requested_protocols,
        )
        return await websocket.close()

    charge_point_id = path.strip("/")
    cp = ChargePoint(charge_point_id, websocket)

    # Check if request argument is specified
    command_task = None
    if args.request:
        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage-collected mid-run.
        command_task = asyncio.create_task(
            send_ocpp_command(cp, args.request, args.delay, args.period)
        )

    try:
        # Start the ChargePoint instance to listen for incoming messages.
        await cp.start()
    finally:
        # Stop the (possibly periodic) sender once this connection is done;
        # cancelling an already finished task has no effect.
        if command_task is not None:
            command_task.cancel()
# Main function to start the WebSocket server.
async def main():
    """Start the WebSocket server and block until it is closed."""
    # Create the WebSocket server and specify the handler for new connections.
    ws_server = await websockets.serve(
        on_connect,
        "127.0.0.1",  # Listen on loopback.
        # NOTE(review): the port line is not visible in the reviewed excerpt;
        # 9000 is assumed - confirm against the deployed configuration.
        9000,
        subprotocols=["ocpp2.0", "ocpp2.0.1"],  # Specify OCPP 2.0.1 subprotocols.
    )
    logging.info("WebSocket Server Started")

    # Wait for the server to close (runs indefinitely).
    await ws_server.wait_closed()
206 # Entry point of the script.
207 if __name__
== "__main__":
208 # Run the main function to start the server.