return ocpp.v201.call_result.MeterValues()
@on(Action.GetBaseReport)
- async def on_get_base_report(self, request_id: int, report_base: ReportBaseType, **kwargs):
+ async def on_get_base_report(
+ self, request_id: int, report_base: ReportBaseType, **kwargs
+ ):
logging.info("Received %s", Action.GetBaseReport)
return ocpp.v201.call_result.GetBaseReport(status="Accepted")
) # Use correct ReportBaseType
try:
response = await self.call(request)
- logging.info("Send GetBaseReport")
+ logging.info("Send %s", Action.GetBaseReport)
if (
response.status == "Accepted"
# If period is not None, send command repeatedly with period interval
if period:
+
async def send_command_repeatedly():
while True:
- command_name = await charge_point.receive_command()
+ command_name = await cp.receive_command()
try:
match command_name:
- case 'ClearCache':
+ case Action.ClearCache:
logging.info("ClearCache parser working")
- await charge_point.send_clear_cache()
- case 'GetBaseReport':
+ await cp.send_clear_cache()
+ case Action.GetBaseReport:
logging.info("GetBaseReport parser working")
- await charge_point.send_get_base_report()
+ await cp.send_get_base_report()
case _:
logging.warning(f"Unsupported command {command_name}")
- except Exception as e:
- logging.exception(f"Failure while processing command {command_name}")
+ except Exception:
+ logging.exception(
+ f"Failure while processing command {command_name}"
+ )
finally:
await asyncio.sleep(period)
await cp.start()
# Check if request argument is specified
if args.request:
- asyncio.create_task(send_ocpp_command(cp, args.request, args.delay, args.period))
+ asyncio.create_task(
+ send_ocpp_command(cp, args.request, args.delay, args.period)
+ )
except ConnectionClosed:
logging.info("ChargePoint %s closed connection", cp.id)