refactor(ocpp-server): format code with black
[e-mobility-charging-stations-simulator.git] / tests / ocpp-server / server.py
CommitLineData
fa16d389
S
1import asyncio
2import logging
fa16d389 3from datetime import datetime, timezone
aea49501 4from threading import Timer
fa16d389 5
a89844d4 6import ocpp.v201
1a0d2c47 7import websockets
fa16d389 8from ocpp.routing import on
d4aa9700
JB
9from ocpp.v201.enums import (
10 Action,
11 AuthorizationStatusType,
12 ClearCacheStatusType,
13 RegistrationStatusType,
14 TransactionEventType,
15)
fa16d389
S
16
17# Setting up the logging configuration to display debug level messages.
18logging.basicConfig(level=logging.DEBUG)
19
1a0d2c47 20
aea49501 21class RepeatTimer(Timer):
d4aa9700 22 """Class that inherits from the Timer class. It will run a
aea49501
JB
23 function at regular intervals."""
24
25 def run(self):
26 while not self.finished.wait(self.interval):
27 self.function(*self.args, **self.kwargs)
28
29
fa16d389 30# Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class.
a89844d4 31class ChargePoint(ocpp.v201.ChargePoint):
aea49501 32 # Message handlers to receive OCPP messages.
339f65ad 33 @on(Action.BootNotification)
d6488e8d
JB
34 async def on_boot_notification(self, charging_station, reason, **kwargs):
35 logging.info("Received BootNotification")
36 # Create and return a BootNotification response with the current time,
5dd22b9f 37 # an interval of 60 seconds, and an accepted status.
339f65ad 38 return ocpp.v201.call_result.BootNotification(
d6488e8d 39 current_time=datetime.now(timezone.utc).isoformat(),
115f3b17 40 interval=60,
d4aa9700 41 status=RegistrationStatusType.accepted,
d6488e8d 42 )
1a0d2c47 43
115f3b17 44 @on(Action.Heartbeat)
5dd22b9f 45 async def on_heartbeat(self, **kwargs):
115f3b17 46 logging.info("Received Heartbeat")
d4aa9700
JB
47 return ocpp.v201.call_result.Heartbeat(
48 current_time=datetime.now(timezone.utc).isoformat()
49 )
115f3b17 50
65c0600c 51 @on(Action.StatusNotification)
d4aa9700
JB
52 async def on_status_notification(
53 self, timestamp, evse_id: int, connector_id: int, connector_status, **kwargs
54 ):
65c0600c
JB
55 logging.info("Received StatusNotification")
56 return ocpp.v201.call_result.StatusNotification()
57
5dd22b9f
JB
58 @on(Action.Authorize)
59 async def on_authorize(self, id_token, **kwargs):
60 logging.info("Received Authorize")
61 return ocpp.v201.call_result.Authorize(
d4aa9700 62 id_token_info={"status": AuthorizationStatusType.accepted}
8430af0a 63 )
5dd22b9f 64
22c4f1fc 65 @on(Action.TransactionEvent)
d4aa9700
JB
66 async def on_transaction_event(
67 self,
68 event_type: TransactionEventType,
69 timestamp,
70 trigger_reason,
71 seq_no: int,
72 transaction_info,
73 **kwargs,
74 ):
22c4f1fc
JB
75 match event_type:
76 case TransactionEventType.started:
77 logging.info("Received TransactionEvent Started")
78 return ocpp.v201.call_result.TransactionEvent(
d4aa9700 79 id_token_info={"status": AuthorizationStatusType.accepted}
8430af0a 80 )
22c4f1fc
JB
81 case TransactionEventType.updated:
82 logging.info("Received TransactionEvent Updated")
d4aa9700 83 return ocpp.v201.call_result.TransactionEvent(total_cost=10)
22c4f1fc
JB
84 case TransactionEventType.ended:
85 logging.info("Received TransactionEvent Ended")
86 return ocpp.v201.call_result.TransactionEvent()
87
5dd22b9f 88 @on(Action.MeterValues)
c7f80bf9 89 async def on_meter_values(self, evse_id: int, meter_value, **kwargs):
5dd22b9f
JB
90 logging.info("Received MeterValues")
91 return ocpp.v201.call_result.MeterValues()
92
d6488e8d 93 # Request handlers to emit OCPP messages.
a89844d4
JB
94 async def send_clear_cache(self):
95 request = ocpp.v201.call.ClearCache()
96 response = await self.call(request)
97
98 if response.status == ClearCacheStatusType.accepted:
e1c2dac9 99 logging.info("Cache clearing successful")
a89844d4
JB
100 else:
101 logging.info("Cache clearing failed")
1a0d2c47 102
fa16d389
S
103
104# Function to handle new WebSocket connections.
105async def on_connect(websocket, path):
d4aa9700 106 """For every new charge point that connects, create a ChargePoint instance and start
d6488e8d
JB
107 listening for messages."""
108 try:
d4aa9700 109 requested_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
d6488e8d
JB
110 except KeyError:
111 logging.info("Client hasn't requested any Subprotocol. Closing Connection")
112 return await websocket.close()
1a0d2c47 113
d6488e8d
JB
114 if websocket.subprotocol:
115 logging.info("Protocols Matched: %s", websocket.subprotocol)
116 else:
d4aa9700
JB
117 logging.warning(
118 "Protocols Mismatched | Expected Subprotocols: %s,"
119 " but client supports %s | Closing connection",
120 websocket.available_subprotocols,
121 requested_protocols,
122 )
d6488e8d 123 return await websocket.close()
1a0d2c47 124
d4aa9700 125 charge_point_id = path.strip("/")
d6488e8d 126 cp = ChargePoint(charge_point_id, websocket)
1a0d2c47 127
d6488e8d
JB
128 # Start the ChargePoint instance to listen for incoming messages.
129 await cp.start()
1a0d2c47 130
fa16d389
S
131
132# Main function to start the WebSocket server.
133async def main():
d6488e8d
JB
134 # Create the WebSocket server and specify the handler for new connections.
135 server = await websockets.serve(
136 on_connect,
d4aa9700 137 "127.0.0.1", # Listen on loopback.
d6488e8d 138 9000, # Port number.
d4aa9700 139 subprotocols=["ocpp2.0", "ocpp2.0.1"], # Specify OCPP 2.0.1 subprotocols.
d6488e8d
JB
140 )
141 logging.info("WebSocket Server Started")
142 # Wait for the server to close (runs indefinitely).
143 await server.wait_closed()
1a0d2c47 144
fa16d389
S
145
146# Entry point of the script.
d4aa9700 147if __name__ == "__main__":
d6488e8d
JB
148 # Run the main function to start the server.
149 asyncio.run(main())