# Define a ChargePoint class inheriting from the OCPP 2.0.1 ChargePoint class.
class ChargePoint(ocpp.v201.ChargePoint):
+ _command_timer: RepeatTimer
+
# Message handlers to receive OCPP messages.
@on(Action.BootNotification)
async def on_boot_notification(self, charging_station, reason, **kwargs):
)
# Request handlers to emit OCPP messages.
- async def send_clear_cache(self):
+ async def _send_clear_cache(self):
request = ocpp.v201.call.ClearCache()
response = await self.call(request)
else:
logging.info("%s failed", Action.ClearCache)
- async def send_get_base_report(self):
+ async def _send_get_base_report(self):
request = ocpp.v201.call.GetBaseReport(
request_id=1, report_base=ReportBaseType.full_inventory
)
else:
logging.info("%s failed", Action.GetBaseReport)
- async def send_command(self, command_name: Action):
+ async def _send_command(self, command_name: Action):
logging.debug("Sending OCPP command: %s", command_name)
match command_name:
case Action.ClearCache:
- await self.send_clear_cache()
+ await self._send_clear_cache()
case Action.GetBaseReport:
- await self.send_get_base_report()
+ await self._send_get_base_report()
case _:
logging.info(f"Not supported command {command_name}")
-
-# Function to send OCPP command
-async def send_ocpp_command(cp, command_name, delay=None, period=None):
- try:
- if delay:
- await asyncio.sleep(delay)
- cp.send_command(command_name)
- if period:
- command_timer = RepeatTimer(
- period,
- cp.send_command,
- [command_name],
- )
- command_timer.start()
- except ConnectionClosed:
- logging.info("ChargePoint %s closed connection", cp.id)
- ChargePoints.remove(cp)
+ async def send_command(self, command_name: Action, delay=None, period=None):
+ if not delay and not period:
+ raise ValueError("Either delay or period must be set")
+ try:
+ if delay and delay > 0:
+ await asyncio.sleep(delay)
+ await self._send_command(command_name)
+ if period and period > 0 and not self._command_timer:
+ self._command_timer = RepeatTimer(
+ period,
+ self._send_command,
+ [command_name],
+ )
+ self._command_timer.start()
+ except ConnectionClosed:
+ self.handle_connection_closed()
+
+ def handle_connection_closed(self):
+ logging.info("ChargePoint %s closed connection", self.id)
+ if self._command_timer:
+ self._command_timer.cancel()
+ ChargePoints.remove(self)
logging.debug("Connected ChargePoint(s): %d", len(ChargePoints))
try:
await cp.start()
except ConnectionClosed:
- logging.info("ChargePoint %s closed connection", cp.id)
- ChargePoints.remove(cp)
- logging.debug("Connected ChargePoint(s): %d", len(ChargePoints))
+ cp.handle_connection_closed()
# Main function to start the WebSocket server.
if args.command:
for cp in ChargePoints:
await asyncio.create_task(
- send_ocpp_command(cp, args.command, args.delay, args.period)
+ cp.send_command(cp, args.command, args.delay, args.period)
)
# Wait for the server to close (runs indefinitely).