The server will start listening for connections on port 9000.
+## Running the server with command sending
+
+You can also specify a command and a period duration with the --request and --period options respectively when running the server. The server will then send your chosen command to the connected client every period seconds.
+
+### GetBaseReport Command
+
+To run the server and send a GetBaseReport command every 5 seconds, use:
+
+```shell
+poetry run task server --command GetBaseReport --period 5
+```
+
+### ClearCache Command
+
+To run the server and send a ClearCache command every 5 seconds, use:
+
+```shell
+poetry run task server --command ClearCache --period 5
+```
+
+Please be mindful that these commands were examples according to the provided scenario, the available commands and their syntax might vary depending on the ocpp version and the implemented functionalities on your server.
+
## Overview of the Server Scripts
### Server.py
+import argparse
import asyncio
import logging
from datetime import datetime, timezone
AuthorizationStatusType,
ClearCacheStatusType,
RegistrationStatusType,
+ ReportBaseType,
TransactionEventType,
)
from websockets import ConnectionClosed
logging.info("Received %s", Action.MeterValues)
return ocpp.v201.call_result.MeterValues()
+ @on(Action.GetBaseReport)
+ async def on_get_base_report(
+ self, request_id: int, report_base: ReportBaseType, **kwargs
+ ):
+ logging.info("Received %s", Action.GetBaseReport)
+ return ocpp.v201.call_result.GetBaseReport(
+ id_token_info={"status": ReportBaseType.accepted}
+ )
+
# Request handlers to emit OCPP messages.
async def send_clear_cache(self):
request = ocpp.v201.call.ClearCache()
else:
logging.info("%s failed", Action.ClearCache)
+ async def send_get_base_report(self):
+ request = ocpp.v201.call.GetBaseReport(
+ reportBase=ReportBaseType.ConfigurationInventory
+ )
+ response = await self.call(request)
+
+ if response.status == ReportBaseType.accepted:
+ logging.info("%s successful", Action.GetBaseReport)
+ else:
+ logging.info("%s failed", Action.GetBaseReport)
+
+
+# Function to send OCPP command
+async def send_ocpp_command(cp, command_name, delay=None, period=None):
+ try:
+ match command_name:
+ case Action.ClearCache:
+ logging.info("%s Send:", Action.ClearCache)
+ await cp.send_clear_cache()
+ case Action.GetBaseReport:
+ logging.info("%s Send:", Action.GetBaseReport)
+ await cp.send_get_base_report()
+ except Exception:
+ logging.exception(
+ f"Not supported or Failure while processing command {command_name}"
+ )
+
+ if delay:
+ await asyncio.sleep(delay)
+
+ if period:
+ my_timer = RepeatTimer(
+ period, asyncio.create_task, [cp.send_ocpp_command(command_name)]
+ )
+ my_timer.start()
+
# Function to handle new WebSocket connections.
async def on_connect(websocket, path):
charge_point_id = path.strip("/")
cp = ChargePoint(charge_point_id, websocket)
+
ChargePoints.add(cp)
try:
await cp.start()
+
except ConnectionClosed:
logging.info("ChargePoint %s closed connection", cp.id)
ChargePoints.remove(cp)
# Main function to start the WebSocket server.
async def main():
+ # Define argument parser
+ parser = argparse.ArgumentParser(description="OCPP2 Charge Point Simulator")
+ parser.add_argument("--command", type=str, help="OCPP2 Command Name")
+ parser.add_argument("--delay", type=int, help="Delay in seconds")
+ parser.add_argument("--period", type=int, help="Period in seconds")
+
# Create the WebSocket server and specify the handler for new connections.
server = await websockets.serve(
on_connect,
subprotocols=["ocpp2.0", "ocpp2.0.1"], # Specify OCPP 2.0.1 subprotocols.
)
logging.info("WebSocket Server Started")
+
+ args = parser.parse_args()
+
+ if args.command:
+ for cp in ChargePoints:
+ asyncio.create_task(
+ send_ocpp_command(cp, args.command, args.delay, args.period)
+ )
+
# Wait for the server to close (runs indefinitely).
await server.wait_closed()