test(ocpp-server): add RepeatTimer class to allow to emit OCPP messages on a regular
[e-mobility-charging-stations-simulator.git] / tests / ocpp-server / server.py
CommitLineData
fa16d389
S
1import asyncio
2import logging
fa16d389 3from datetime import datetime, timezone
aea49501 4from threading import Timer
d6488e8d 5from typing import Sequence
fa16d389 6
a89844d4 7import ocpp.v201
1a0d2c47 8import websockets
fa16d389 9from ocpp.routing import on
fa16d389 10from ocpp.v201 import call_result
a89844d4 11from ocpp.v201.enums import RegistrationStatusType, ClearCacheStatusType
fa16d389
S
12
# Configure the root logger once at import time so DEBUG and higher
# messages are emitted.
logging.basicConfig(level=logging.DEBUG)
15
1a0d2c47 16
aea49501
JB
class RepeatTimer(Timer):
    """Timer that calls its function repeatedly instead of only once.

    After every ``interval`` seconds the stored function is invoked with the
    stored positional and keyword arguments. The loop ends as soon as the
    ``finished`` event is set (which is what ``cancel()`` does).
    """

    def run(self):
        while True:
            # wait() returns True once the event is set, i.e. on cancel.
            if self.finished.wait(self.interval):
                break
            self.function(*self.args, **self.kwargs)
24
25
# ChargePoint specialisation built on the OCPP 2.0.1 ChargePoint class.
class ChargePoint(ocpp.v201.ChargePoint):
    """OCPP 2.0.1 charge point connection with the handlers used by this server."""

    # Message handlers to receive OCPP messages.
    @on('BootNotification')
    async def on_boot_notification(self, charging_station, reason, **kwargs):
        """Answer a BootNotification request.

        The reply carries the current UTC time in ISO 8601 form, an interval
        of 10 and the ``accepted`` registration status.
        """
        logging.info("Received BootNotification")
        now_utc = datetime.now(timezone.utc)
        return call_result.BootNotification(
            current_time=now_utc.isoformat(),
            interval=10,
            status=RegistrationStatusType.accepted,
        )

    # Request handlers to emit OCPP messages.
    async def send_clear_cache(self):
        """Send a ClearCache request and log whether it was accepted."""
        response = await self.call(ocpp.v201.call.ClearCache())

        if response.status != ClearCacheStatusType.accepted:
            logging.info("Cache clearing failed")
            return
        logging.info("Cache clearing successful")
1a0d2c47 49
fa16d389
S
50
# Handler invoked for each new WebSocket connection.
async def on_connect(websocket, path):
    """Handle one incoming charge point connection.

    The connection is closed straight away when the client sent no
    ``Sec-WebSocket-Protocol`` header, or when no subprotocol was agreed on.
    Otherwise a ChargePoint named after the request path is created and
    started so that it listens for incoming messages.
    """
    try:
        requested_protocols = websocket.request_headers['Sec-WebSocket-Protocol']
    except KeyError:
        logging.info("Client hasn't requested any Subprotocol. Closing Connection")
        return await websocket.close()

    # Guard clause: nothing was negotiated, so report both sides and bail out.
    if not websocket.subprotocol:
        logging.warning('Protocols Mismatched | Expected Subprotocols: %s,'
                        ' but client supports %s | Closing connection',
                        websocket.available_subprotocols,
                        requested_protocols)
        return await websocket.close()

    logging.info("Protocols Matched: %s", websocket.subprotocol)

    # The request path without its surrounding slashes is the charge point id.
    station = ChargePoint(path.strip('/'), websocket)

    # Start the ChargePoint instance to listen for incoming messages.
    await station.start()
1a0d2c47 75
fa16d389
S
76
# Main function to start the WebSocket server.
async def main():
    """Start the OCPP WebSocket server and wait until it is closed.

    The server listens on 127.0.0.1:9000, offers the ``ocpp2.0`` and
    ``ocpp2.0.1`` subprotocols and passes every new connection to
    ``on_connect``.
    """
    # Create the WebSocket server and specify the handler for new connections.
    server = await websockets.serve(
        on_connect,
        '127.0.0.1',  # Listen on loopback.
        9000,  # Port number.
        # Must be an actual sequence of subprotocol names; subscripting
        # typing.Sequence builds a type expression, not a list of strings.
        subprotocols=['ocpp2.0', 'ocpp2.0.1'],
    )
    logging.info("WebSocket Server Started")
    # Wait for the server to close (runs indefinitely).
    await server.wait_closed()
1a0d2c47 89
fa16d389
S
90
# Script entry point: only start the server when executed directly,
# not when this module is imported.
if __name__ == '__main__':
    asyncio.run(main())