From 7a63a448818a37c85ec17712c65599b42ca85f91 Mon Sep 17 00:00:00 2001 From: Jean-Pierre Lortie Date: Thu, 26 Mar 2026 12:31:34 -0400 Subject: [PATCH] Fix post-connect stability for OCPP 1.6 chargers --- custom_components/ocpp/chargepoint.py | 130 +++++++++++++++++++------- custom_components/ocpp/ocppv16.py | 98 ++++++++++++------- tests/test_post_connect.py | 83 ++++++++++++++++ 3 files changed, 241 insertions(+), 70 deletions(-) create mode 100644 tests/test_post_connect.py diff --git a/custom_components/ocpp/chargepoint.py b/custom_components/ocpp/chargepoint.py index 5de7e886..62f77fe1 100644 --- a/custom_components/ocpp/chargepoint.py +++ b/custom_components/ocpp/chargepoint.py @@ -52,6 +52,7 @@ CONF_DEFAULT_AUTH_STATUS, CONF_ID_TAG, CONF_MONITORED_VARIABLES, + CONF_MONITORED_VARIABLES_AUTOCONFIG, CONF_CPIDS, CONFIG, DEFAULT_ENERGY_UNIT, @@ -182,6 +183,8 @@ def __init__( self.triggered_boot_notification = False self.received_boot_notification = False self.post_connect_success = False + self._post_connect_lock = asyncio.Lock() + self._post_connect_task = None self.tasks = None self._charger_reports_session_energy = False self._metrics = defaultdict(lambda: Metric(None, None)) @@ -220,41 +223,97 @@ async def fetch_supported_features(self): self._metrics[cdet.features.value].value = self._attr_supported_features _LOGGER.debug("Feature profiles returned: %s", self._attr_supported_features) - async def post_connect(self): + def schedule_post_connect(self, reason: str): + """Schedule post-connect once per active connection.""" + if self.post_connect_success: + _LOGGER.debug( + "%s: skipping post_connect from %s because setup already completed", + self.id, + reason, + ) + return + if self._post_connect_task and not self._post_connect_task.done(): + _LOGGER.debug( + "%s: post_connect already running; ignoring duplicate trigger from %s", + self.id, + reason, + ) + return + _LOGGER.debug("%s: scheduling post_connect from %s", self.id, reason) + self._post_connect_task = self.hass.async_create_task(self.post_connect(reason)) + + async def post_connect(self, reason: str = "unknown"): """Logic to be executed right after a charger connects.""" + async with self._post_connect_lock: + if self.post_connect_success: + _LOGGER.debug( + "%s: skipping post_connect from %s because setup already completed", + self.id, + reason, + ) + return - try: - self.status = STATE_OK - await self.fetch_supported_features() - num_connectors: int = await self.get_number_of_connectors() - self._metrics[cdet.connectors.value].value = num_connectors - await self.get_heartbeat_interval() - - accepted_measurands: str = await self.get_supported_measurands() - updated_entry = {**self.entry.data} - for i in range(len(updated_entry[CONF_CPIDS])): - if self.id in updated_entry[CONF_CPIDS][i]: - updated_entry[CONF_CPIDS][i][self.id][CONF_MONITORED_VARIABLES] = ( - accepted_measurands + try: + self.status = STATE_OK + _LOGGER.debug( + "%s: post_connect starting from %s (autoconfig=%s, skip_schema_validation=%s)", + self.id, + reason, + self.settings.monitored_variables_autoconfig, + self.settings.skip_schema_validation, + ) + + await self.fetch_supported_features() + num_connectors: int = await self.get_number_of_connectors() + self._metrics[cdet.connectors.value].value = num_connectors + await self.get_heartbeat_interval() + + accepted_measurands: str = await self.get_supported_measurands() + updated_entry = {**self.entry.data} + should_update_entry = False + for i in range(len(updated_entry[CONF_CPIDS])): + if self.id in updated_entry[CONF_CPIDS][i]: + charger_entry = updated_entry[CONF_CPIDS][i][self.id] + if ( + accepted_measurands + and charger_entry[CONF_MONITORED_VARIABLES] + != accepted_measurands + ): + charger_entry[CONF_MONITORED_VARIABLES] = ( + accepted_measurands + ) + should_update_entry = True + if accepted_measurands and charger_entry.get( + CONF_MONITORED_VARIABLES_AUTOCONFIG, False + ): + charger_entry[CONF_MONITORED_VARIABLES_AUTOCONFIG] = False + should_update_entry = True + break + if should_update_entry: + # if an entry differs this will unload/reload and stop/restart + # the central system/websocket + self.hass.config_entries.async_update_entry( + self.entry, data=updated_entry ) - break - # if an entry differs this will unload/reload and stop/restart the central system/websocket - self.hass.config_entries.async_update_entry(self.entry, data=updated_entry) - - await self.set_standard_configuration() - - self.post_connect_success = True - _LOGGER.debug(f"'{self.id}' post connection setup completed successfully") - - # nice to have, but not needed for integration to function - # and can cause issues with some chargers - await self.set_availability() - if prof.REM in self._attr_supported_features: - if self.received_boot_notification is False: - await self.trigger_boot_notification() - await self.trigger_status_notification() - except NotImplementedError as e: - _LOGGER.error("Configuration of the charger failed: %s", e) + + await self.set_standard_configuration() + + self.post_connect_success = True + _LOGGER.debug(f"'{self.id}' post connection setup completed successfully") + + # nice to have, but not needed for integration to function + # and can cause issues with some chargers + await self.set_availability() + if prof.REM in self._attr_supported_features: + if self.received_boot_notification is False: + await self.trigger_boot_notification() + await self.trigger_status_notification() + except NotImplementedError as e: + _LOGGER.error("Configuration of the charger failed: %s", e) + finally: + current_task = asyncio.current_task() + if self._post_connect_task is current_task: + self._post_connect_task = None async def trigger_boot_notification(self): """Trigger a boot notification.""" @@ -361,7 +420,7 @@ async def monitor_connection(self): # after 10s to allow for when a boot notification has not been received await asyncio.sleep(10) if not self.post_connect_success: - self.hass.async_create_task(self.post_connect()) + self.schedule_post_connect("monitor backstop") while connection.state is State.OPEN: try: await asyncio.sleep(self.cs_settings.websocket_ping_interval) @@ -451,6 +510,9 @@ async def run(self, tasks): async def stop(self): """Close connection and cancel ongoing tasks.""" self.status = STATE_UNAVAILABLE + if self._post_connect_task and not self._post_connect_task.done(): + self._post_connect_task.cancel() + self._post_connect_task = None if self._connection.state is State.OPEN: _LOGGER.debug(f"Closing websocket to '{self.id}'") await self._connection.close() @@ -493,7 +555,7 @@ def _register_boot_notification(self): if self.triggered_boot_notification is False: self.hass.async_create_task(self.notify_ha(f"Charger {self.id} rebooted")) if not self.post_connect_success: - self.hass.async_create_task(self.post_connect()) + self.schedule_post_connect("boot notification") async def update(self, cpid: str): """Update sensors values in HA.""" diff --git a/custom_components/ocpp/ocppv16.py b/custom_components/ocpp/ocppv16.py index a62da835..5224ecdd 100644 --- a/custom_components/ocpp/ocppv16.py +++ b/custom_components/ocpp/ocppv16.py @@ -2,7 +2,6 @@ from datetime import datetime, timedelta, UTC import logging -import asyncio import time @@ -93,7 +92,44 @@ async def get_number_of_connectors(self): async def get_heartbeat_interval(self): """Retrieve heartbeat interval from the charger and store it.""" - await self.get_configuration(ckey.heartbeat_interval.value) + try: + await self.get_configuration(ckey.heartbeat_interval.value) + except TimeoutError: + _LOGGER.warning( + "%s: timed out waiting for GetConfiguration(%s); continuing without the charger-reported heartbeat interval", + self.id, + ckey.heartbeat_interval.value, + ) + + async def _apply_measurands_configuration( + self, key: str, measurands: str, source: str + ) -> str: + """Apply the negotiated measurands without querying them again first.""" + req = call.ChangeConfiguration(key=key, value=measurands) + resp = await self.call(req) + _LOGGER.debug( + "%s: ChangeConfiguration(%s) from %s returned %s", + self.id, + key, + source, + resp.status, + ) + if resp.status == ConfigurationStatus.reboot_required: + self._requires_reboot = True + await self.notify_ha( + f"A reboot is required to apply negotiated measurands from {source}" + ) + elif resp.status not in [ + ConfigurationStatus.accepted, + ConfigurationStatus.reboot_required, + ]: + _LOGGER.warning( + "%s: failed to apply negotiated measurands from %s with status %s", + self.id, + source, + resp.status, + ) + return measurands async def get_supported_measurands(self) -> str: """Get comma-separated list of measurands supported by the charger.""" @@ -111,17 +147,22 @@ async def get_supported_measurands(self) -> str: for measurand in all_measurands.split(","): _LOGGER.debug(f"'{self.id}' trying measurand: '{measurand}'") - try: - req = call.ChangeConfiguration(key=key, value=measurand) - resp = await self.call(req) - if resp.status in cfg_ok: - _LOGGER.debug(f"'{self.id}' adding measurand: '{measurand}'") - accepted_measurands.append(measurand) - except asyncio.TimeoutError: - _LOGGER.warning(f"Timeout while configuring measurand {measurand} for {self.id}") - continue + req = call.ChangeConfiguration(key=key, value=measurand) + resp = await self.call(req) + if resp.status in cfg_ok: + _LOGGER.debug(f"'{self.id}' adding measurand: '{measurand}'") + accepted_measurands.append(measurand) accepted_measurands = ",".join(accepted_measurands) + if len(accepted_measurands) > 0: + _LOGGER.debug( + "%s: autodetected measurands=%s", + self.id, + accepted_measurands, + ) + return await self._apply_measurands_configuration( + key, accepted_measurands, "autodetect" + ) else: accepted_measurands = all_measurands @@ -132,32 +173,17 @@ async def get_supported_measurands(self) -> str: # as done when calling self.configure, the server avoids charger reboot. # Corresponding issue: https://github.com/lbbrhzn/ocpp/issues/1275 if len(accepted_measurands) > 0: - try: - req = call.ChangeConfiguration(key=key, value=accepted_measurands) - resp = await self.call(req) - _LOGGER.debug( - f"'{self.id}' measurands set manually to {accepted_measurands}" - ) - except asyncio.TimeoutError: - _LOGGER.warning(f"Timeout while setting measurands for {self.id}") - - try: - chgr_measurands = await self.get_configuration(key) - except asyncio.TimeoutError: - _LOGGER.warning(f"Timeout while getting configuration for {self.id}") - chgr_measurands = "" - - if len(accepted_measurands) > 0: - _LOGGER.debug(f"'{self.id}' allowed measurands: '{accepted_measurands}'") - try: - await self.configure(key, accepted_measurands) - except asyncio.TimeoutError: - _LOGGER.warning(f"Timeout while configuring measurands for {self.id}") - else: - _LOGGER.debug(f"'{self.id}' measurands not configurable by integration") - _LOGGER.debug(f"'{self.id}' allowed measurands: '{chgr_measurands}'") + _LOGGER.debug( + f"'{self.id}' measurands set manually to {accepted_measurands}" + ) + return await self._apply_measurands_configuration( + key, accepted_measurands, "manual configuration" + ) - return accepted_measurands + chgr_measurands = await self.get_configuration(key) + _LOGGER.debug(f"'{self.id}' measurands not configurable by integration") + _LOGGER.debug(f"'{self.id}' allowed measurands: '{chgr_measurands}'") + return chgr_measurands or "" async def set_standard_configuration(self): """Send configuration values to the charger.""" diff --git a/tests/test_post_connect.py b/tests/test_post_connect.py new file mode 100644 index 00000000..16974db5 --- /dev/null +++ b/tests/test_post_connect.py @@ -0,0 +1,83 @@ +"""Targeted tests for post-connect stability fixes.""" + +import asyncio +from types import SimpleNamespace +from unittest.mock import AsyncMock, patch + +from custom_components.ocpp.chargepoint import ChargePoint as BaseChargePoint +from custom_components.ocpp.enums import ConfigurationKey +from custom_components.ocpp.ocppv16 import ChargePoint as ChargePointV16 +from ocpp.v16.enums import ConfigurationStatus + + +async def test_schedule_post_connect_ignores_duplicate_triggers(hass): + """Only schedule one post-connect task per active connection.""" + cp = BaseChargePoint.__new__(BaseChargePoint) + cp.id = "test_cpid" + cp.hass = hass + cp.post_connect_success = False + cp._post_connect_task = None + + started = asyncio.Event() + release = asyncio.Event() + + async def fake_post_connect(reason: str): + started.set() + await release.wait() + + cp.post_connect = fake_post_connect + + with patch.object(hass, "async_create_task", wraps=hass.async_create_task) as create: + cp.schedule_post_connect("boot notification") + cp.schedule_post_connect("monitor backstop") + await asyncio.wait_for(started.wait(), timeout=1) + assert create.call_count == 1 + + release.set() + await asyncio.gather(cp._post_connect_task, return_exceptions=False) + + +async def test_get_heartbeat_interval_timeout_is_non_fatal(): + """Continue post-connect when HeartbeatInterval cannot be queried.""" + cp = ChargePointV16.__new__(ChargePointV16) + cp.id = "test_cpid" + cp.get_configuration = AsyncMock(side_effect=TimeoutError) + + await cp.get_heartbeat_interval() + + cp.get_configuration.assert_awaited_once_with( + ConfigurationKey.heartbeat_interval.value + ) + + +async def test_get_supported_measurands_autodetect_skips_follow_up_query(): + """Avoid querying MeterValuesSampledData again after autodetection.""" + cp = ChargePointV16.__new__(ChargePointV16) + cp.id = "test_cpid" + cp.settings = SimpleNamespace( + monitored_variables="Current.Import,Voltage", + monitored_variables_autoconfig=True, + ) + cp._requires_reboot = False + cp.notify_ha = AsyncMock() + cp.call = AsyncMock( + side_effect=[ + SimpleNamespace(status=ConfigurationStatus.accepted), + SimpleNamespace(status=ConfigurationStatus.accepted), + SimpleNamespace(status=ConfigurationStatus.accepted), + ] + ) + cp.get_configuration = AsyncMock( + side_effect=AssertionError("get_configuration should not be called") + ) + + result = await cp.get_supported_measurands() + + assert result == "Current.Import,Voltage" + assert cp.call.await_count == 3 + assert [item.args[0].value for item in cp.call.await_args_list] == [ + "Current.Import", + "Voltage", + "Current.Import,Voltage", + ] + cp.get_configuration.assert_not_awaited()