X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Focpp-server%2Fserver.py;h=c8748763129300dbe54de6980f159ad784a57952;hb=22c4f1fc7fe1dc966a280de7f66e2f9fe592fbd2;hp=e8e1e6afce41bd6c4ee7798a9c90b3f04870c5be;hpb=c11be92a38d40495df5ec37b9ff946993c4dc84f;p=e-mobility-charging-stations-simulator.git diff --git a/tests/ocpp-server/server.py b/tests/ocpp-server/server.py index e8e1e6af..c8748763 100644 --- a/tests/ocpp-server/server.py +++ b/tests/ocpp-server/server.py @@ -1,50 +1,90 @@ import asyncio import logging -import websockets from datetime import datetime, timezone +from threading import Timer +import ocpp.v201 +import websockets from ocpp.routing import on -from ocpp.v201 import ChargePoint as cp -from ocpp.v201 import call_result -from ocpp.v201.enums import RegistrationStatusType, GenericDeviceModelStatusType +from ocpp.v201.enums import RegistrationStatusType, ClearCacheStatusType, AuthorizationStatusType, \ + TransactionEventType, \ + Action # Setting up the logging configuration to display debug level messages. logging.basicConfig(level=logging.DEBUG) + +class RepeatTimer(Timer): + """ Class that inherits from the Timer class. It will run a + function at regular intervals.""" + + def run(self): + while not self.finished.wait(self.interval): + self.function(*self.args, **self.kwargs) + + # Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class. -class ChargePoint(cp): - # Define a handler for the BootNotification message. - @on('BootNotification') +class ChargePoint(ocpp.v201.ChargePoint): + # Message handlers to receive OCPP messages. + @on(Action.BootNotification) async def on_boot_notification(self, charging_station, reason, **kwargs): logging.info("Received BootNotification") # Create and return a BootNotification response with the current time, - # an interval of 10 seconds, and an accepted status. - return call_result.BootNotification( - current_time = datetime.now(timezone.utc).isoformat(), - interval=10, + # an interval of 60 seconds, and an accepted status. + return ocpp.v201.call_result.BootNotification( + current_time=datetime.now(timezone.utc).isoformat(), + interval=60, status=RegistrationStatusType.accepted ) - # Define a handler for the GetBaseReport message. - @on('GetBaseReport') - async def on_get_base_report(self, request_id, report_base, **kwargs): - try: - logging.info(f"Received GetBaseReport request with RequestId: {request_id} and ReportBase: {report_base}") - - # Create a mock response for demonstration purposes, indicating the report is accepted. - response = call_result.GetBaseReport( - status=GenericDeviceModelStatusType.accepted - ) - - logging.info(f"Sending GetBaseReport response: {response}") - return response - except Exception as e: - # Log any errors that occur while handling the GetBaseReport request. - logging.error(f"Error handling GetBaseReport request: {e}", exc_info=True) - # Return a rejected status in case of error. - return call_result.GetBaseReport( - status=GenericDeviceModelStatusType.rejected - ) + @on(Action.Heartbeat) + async def on_heartbeat(self, **kwargs): + logging.info("Received Heartbeat") + return ocpp.v201.call_result.Heartbeat(current_time=datetime.now(timezone.utc).isoformat()) + + @on(Action.StatusNotification) + async def on_status_notification(self, timestamp, evse_id, connector_id, connector_status, + **kwargs): + logging.info("Received StatusNotification") + return ocpp.v201.call_result.StatusNotification() + + @on(Action.Authorize) + async def on_authorize(self, id_token, **kwargs): + logging.info("Received Authorize") + return ocpp.v201.call_result.Authorize( + id_token_info={'status': AuthorizationStatusType.accepted}) + + @on(Action.TransactionEvent) + async def on_transaction_event(self, event_type, timestamp, trigger_reason, seq_no, + transaction_info, **kwargs): + match event_type: + case TransactionEventType.started: + logging.info("Received TransactionEvent Started") + return ocpp.v201.call_result.TransactionEvent( + id_token_info={'status': AuthorizationStatusType.accepted}) + case TransactionEventType.updated: + logging.info("Received TransactionEvent Updated") + return ocpp.v201.call_result.TransactionEvent( + total_cost=10) + case TransactionEventType.ended: + logging.info("Received TransactionEvent Ended") + return ocpp.v201.call_result.TransactionEvent() + + @on(Action.MeterValues) + async def on_meter_values(self, evse_id, meter_value, **kwargs): + logging.info("Received MeterValues") + return ocpp.v201.call_result.MeterValues() + + # Request handlers to emit OCPP messages. + async def send_clear_cache(self): + request = ocpp.v201.call.ClearCache() + response = await self.call(request) + + if response.status == ClearCacheStatusType.accepted: + logging.info("Cache clearing successful") + else: + logging.info("Cache clearing failed") + # Function to handle new WebSocket connections. async def on_connect(websocket, path): @@ -71,19 +111,21 @@ async def on_connect(websocket, path): # Start the ChargePoint instance to listen for incoming messages. await cp.start() + # Main function to start the WebSocket server. async def main(): # Create the WebSocket server and specify the handler for new connections. server = await websockets.serve( on_connect, '127.0.0.1', # Listen on loopback. - 9000, # Port number. + 9000, # Port number. subprotocols=['ocpp2.0', 'ocpp2.0.1'] # Specify OCPP 2.0.1 subprotocols. ) logging.info("WebSocket Server Started") # Wait for the server to close (runs indefinitely). await server.wait_closed() + # Entry point of the script. if __name__ == '__main__': # Run the main function to start the server.