test: align OCPP2 server messages
[e-mobility-charging-stations-simulator.git] / tests / ocpp-server / server.py
CommitLineData
fa16d389
S
1import asyncio
2import logging
fa16d389 3from datetime import datetime, timezone
d6488e8d 4from typing import Sequence
fa16d389 5
a89844d4 6import ocpp.v201
1a0d2c47 7import websockets
fa16d389 8from ocpp.routing import on
fa16d389 9from ocpp.v201 import call_result
a89844d4 10from ocpp.v201.enums import RegistrationStatusType, ClearCacheStatusType
fa16d389
S
11
12# Setting up the logging configuration to display debug level messages.
13logging.basicConfig(level=logging.DEBUG)
14
1a0d2c47 15
fa16d389 16# Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class.
a89844d4 17class ChargePoint(ocpp.v201.ChargePoint):
d6488e8d
JB
18 # Message handlers to receive OCPP message.
19 @on('BootNotification')
20 async def on_boot_notification(self, charging_station, reason, **kwargs):
21 logging.info("Received BootNotification")
22 # Create and return a BootNotification response with the current time,
23 # an interval of 10 seconds, and an accepted status.
24 return call_result.BootNotification(
25 current_time=datetime.now(timezone.utc).isoformat(),
26 interval=10,
27 status=RegistrationStatusType.accepted
28 )
1a0d2c47 29
d6488e8d 30 # Request handlers to emit OCPP messages.
a89844d4
JB
31 async def send_clear_cache(self):
32 request = ocpp.v201.call.ClearCache()
33 response = await self.call(request)
34
35 if response.status == ClearCacheStatusType.accepted:
e1c2dac9 36 logging.info("Cache clearing successful")
a89844d4
JB
37 else:
38 logging.info("Cache clearing failed")
1a0d2c47 39
fa16d389
S
40
41# Function to handle new WebSocket connections.
42async def on_connect(websocket, path):
d6488e8d
JB
43 """ For every new charge point that connects, create a ChargePoint instance and start
44 listening for messages."""
45 try:
46 requested_protocols = websocket.request_headers['Sec-WebSocket-Protocol']
47 except KeyError:
48 logging.info("Client hasn't requested any Subprotocol. Closing Connection")
49 return await websocket.close()
1a0d2c47 50
d6488e8d
JB
51 if websocket.subprotocol:
52 logging.info("Protocols Matched: %s", websocket.subprotocol)
53 else:
54 logging.warning('Protocols Mismatched | Expected Subprotocols: %s,'
55 ' but client supports %s | Closing connection',
56 websocket.available_subprotocols,
57 requested_protocols)
58 return await websocket.close()
1a0d2c47 59
d6488e8d
JB
60 charge_point_id = path.strip('/')
61 cp = ChargePoint(charge_point_id, websocket)
1a0d2c47 62
d6488e8d
JB
63 # Start the ChargePoint instance to listen for incoming messages.
64 await cp.start()
1a0d2c47 65
fa16d389
S
66
67# Main function to start the WebSocket server.
68async def main():
d6488e8d
JB
69 # Create the WebSocket server and specify the handler for new connections.
70 server = await websockets.serve(
71 on_connect,
72 '127.0.0.1', # Listen on loopback.
73 9000, # Port number.
74 subprotocols=Sequence['ocpp2.0', 'ocpp2.0.1'] # Specify OCPP 2.0.1 subprotocols.
75 )
76 logging.info("WebSocket Server Started")
77 # Wait for the server to close (runs indefinitely).
78 await server.wait_closed()
1a0d2c47 79
fa16d389
S
80
81# Entry point of the script.
82if __name__ == '__main__':
d6488e8d
JB
83 # Run the main function to start the server.
84 asyncio.run(main())