Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 96 additions & 34 deletions custom_components/ocpp/chargepoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
CONF_DEFAULT_AUTH_STATUS,
CONF_ID_TAG,
CONF_MONITORED_VARIABLES,
CONF_MONITORED_VARIABLES_AUTOCONFIG,
CONF_CPIDS,
CONFIG,
DEFAULT_ENERGY_UNIT,
Expand Down Expand Up @@ -182,6 +183,8 @@ def __init__(
self.triggered_boot_notification = False
self.received_boot_notification = False
self.post_connect_success = False
self._post_connect_lock = asyncio.Lock()
self._post_connect_task = None
self.tasks = None
self._charger_reports_session_energy = False
self._metrics = defaultdict(lambda: Metric(None, None))
Expand Down Expand Up @@ -220,41 +223,97 @@ async def fetch_supported_features(self):
self._metrics[cdet.features.value].value = self._attr_supported_features
_LOGGER.debug("Feature profiles returned: %s", self._attr_supported_features)

async def post_connect(self):
def schedule_post_connect(self, reason: str):
"""Schedule post-connect once per active connection."""
if self.post_connect_success:
_LOGGER.debug(
"%s: skipping post_connect from %s because setup already completed",
self.id,
reason,
)
return
if self._post_connect_task and not self._post_connect_task.done():
_LOGGER.debug(
"%s: post_connect already running; ignoring duplicate trigger from %s",
self.id,
reason,
)
return
_LOGGER.debug("%s: scheduling post_connect from %s", self.id, reason)
self._post_connect_task = self.hass.async_create_task(self.post_connect(reason))

async def post_connect(self, reason: str = "unknown"):
"""Logic to be executed right after a charger connects."""
async with self._post_connect_lock:
if self.post_connect_success:
_LOGGER.debug(
"%s: skipping post_connect from %s because setup already completed",
self.id,
reason,
)
return

try:
self.status = STATE_OK
await self.fetch_supported_features()
num_connectors: int = await self.get_number_of_connectors()
self._metrics[cdet.connectors.value].value = num_connectors
await self.get_heartbeat_interval()

accepted_measurands: str = await self.get_supported_measurands()
updated_entry = {**self.entry.data}
for i in range(len(updated_entry[CONF_CPIDS])):
if self.id in updated_entry[CONF_CPIDS][i]:
updated_entry[CONF_CPIDS][i][self.id][CONF_MONITORED_VARIABLES] = (
accepted_measurands
try:
self.status = STATE_OK
_LOGGER.debug(
"%s: post_connect starting from %s (autoconfig=%s, skip_schema_validation=%s)",
self.id,
reason,
self.settings.monitored_variables_autoconfig,
self.settings.skip_schema_validation,
)

await self.fetch_supported_features()
num_connectors: int = await self.get_number_of_connectors()
self._metrics[cdet.connectors.value].value = num_connectors
await self.get_heartbeat_interval()

accepted_measurands: str = await self.get_supported_measurands()
updated_entry = {**self.entry.data}
should_update_entry = False
for i in range(len(updated_entry[CONF_CPIDS])):
if self.id in updated_entry[CONF_CPIDS][i]:
charger_entry = updated_entry[CONF_CPIDS][i][self.id]
if (
accepted_measurands
and charger_entry[CONF_MONITORED_VARIABLES]
!= accepted_measurands
):
charger_entry[CONF_MONITORED_VARIABLES] = (
accepted_measurands
)
should_update_entry = True
if accepted_measurands and charger_entry.get(
CONF_MONITORED_VARIABLES_AUTOCONFIG, False
):
charger_entry[CONF_MONITORED_VARIABLES_AUTOCONFIG] = False
should_update_entry = True
break
if should_update_entry:
# if an entry differs this will unload/reload and stop/restart
# the central system/websocket
self.hass.config_entries.async_update_entry(
self.entry, data=updated_entry
)
break
# if an entry differs this will unload/reload and stop/restart the central system/websocket
self.hass.config_entries.async_update_entry(self.entry, data=updated_entry)

await self.set_standard_configuration()

self.post_connect_success = True
_LOGGER.debug(f"'{self.id}' post connection setup completed successfully")

# nice to have, but not needed for integration to function
# and can cause issues with some chargers
await self.set_availability()
if prof.REM in self._attr_supported_features:
if self.received_boot_notification is False:
await self.trigger_boot_notification()
await self.trigger_status_notification()
except NotImplementedError as e:
_LOGGER.error("Configuration of the charger failed: %s", e)

await self.set_standard_configuration()

self.post_connect_success = True
_LOGGER.debug(f"'{self.id}' post connection setup completed successfully")

# nice to have, but not needed for integration to function
# and can cause issues with some chargers
await self.set_availability()
if prof.REM in self._attr_supported_features:
if self.received_boot_notification is False:
await self.trigger_boot_notification()
await self.trigger_status_notification()
except NotImplementedError as e:
_LOGGER.error("Configuration of the charger failed: %s", e)
finally:
current_task = asyncio.current_task()
if self._post_connect_task is current_task:
self._post_connect_task = None

async def trigger_boot_notification(self):
"""Trigger a boot notification."""
Expand Down Expand Up @@ -361,7 +420,7 @@ async def monitor_connection(self):
# after 10s to allow for when a boot notification has not been received
await asyncio.sleep(10)
if not self.post_connect_success:
self.hass.async_create_task(self.post_connect())
self.schedule_post_connect("monitor backstop")
while connection.state is State.OPEN:
try:
await asyncio.sleep(self.cs_settings.websocket_ping_interval)
Expand Down Expand Up @@ -451,6 +510,9 @@ async def run(self, tasks):
async def stop(self):
"""Close connection and cancel ongoing tasks."""
self.status = STATE_UNAVAILABLE
if self._post_connect_task and not self._post_connect_task.done():
self._post_connect_task.cancel()
self._post_connect_task = None
if self._connection.state is State.OPEN:
_LOGGER.debug(f"Closing websocket to '{self.id}'")
await self._connection.close()
Expand Down Expand Up @@ -493,7 +555,7 @@ def _register_boot_notification(self):
if self.triggered_boot_notification is False:
self.hass.async_create_task(self.notify_ha(f"Charger {self.id} rebooted"))
if not self.post_connect_success:
self.hass.async_create_task(self.post_connect())
self.schedule_post_connect("boot notification")

async def update(self, cpid: str):
"""Update sensors values in HA."""
Expand Down
98 changes: 62 additions & 36 deletions custom_components/ocpp/ocppv16.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from datetime import datetime, timedelta, UTC
import logging
import asyncio

import time

Expand Down Expand Up @@ -93,7 +92,44 @@ async def get_number_of_connectors(self):

async def get_heartbeat_interval(self):
"""Retrieve heartbeat interval from the charger and store it."""
await self.get_configuration(ckey.heartbeat_interval.value)
try:
await self.get_configuration(ckey.heartbeat_interval.value)
except TimeoutError:
_LOGGER.warning(
"%s: timed out waiting for GetConfiguration(%s); continuing without the charger-reported heartbeat interval",
self.id,
ckey.heartbeat_interval.value,
)

async def _apply_measurands_configuration(
self, key: str, measurands: str, source: str
) -> str:
"""Apply the negotiated measurands without querying them again first."""
req = call.ChangeConfiguration(key=key, value=measurands)
resp = await self.call(req)
_LOGGER.debug(
"%s: ChangeConfiguration(%s) from %s returned %s",
self.id,
key,
source,
resp.status,
)
if resp.status == ConfigurationStatus.reboot_required:
self._requires_reboot = True
await self.notify_ha(
f"A reboot is required to apply negotiated measurands from {source}"
)
elif resp.status not in [
ConfigurationStatus.accepted,
ConfigurationStatus.reboot_required,
]:
_LOGGER.warning(
"%s: failed to apply negotiated measurands from %s with status %s",
self.id,
source,
resp.status,
)
return measurands

async def get_supported_measurands(self) -> str:
"""Get comma-separated list of measurands supported by the charger."""
Expand All @@ -111,17 +147,22 @@ async def get_supported_measurands(self) -> str:

for measurand in all_measurands.split(","):
_LOGGER.debug(f"'{self.id}' trying measurand: '{measurand}'")
try:
req = call.ChangeConfiguration(key=key, value=measurand)
resp = await self.call(req)
if resp.status in cfg_ok:
_LOGGER.debug(f"'{self.id}' adding measurand: '{measurand}'")
accepted_measurands.append(measurand)
except asyncio.TimeoutError:
_LOGGER.warning(f"Timeout while configuring measurand {measurand} for {self.id}")
continue
req = call.ChangeConfiguration(key=key, value=measurand)
resp = await self.call(req)
if resp.status in cfg_ok:
_LOGGER.debug(f"'{self.id}' adding measurand: '{measurand}'")
accepted_measurands.append(measurand)

accepted_measurands = ",".join(accepted_measurands)
if len(accepted_measurands) > 0:
_LOGGER.debug(
"%s: autodetected measurands=%s",
self.id,
accepted_measurands,
)
return await self._apply_measurands_configuration(
key, accepted_measurands, "autodetect"
)
else:
accepted_measurands = all_measurands

Expand All @@ -132,32 +173,17 @@ async def get_supported_measurands(self) -> str:
# as done when calling self.configure, the server avoids charger reboot.
# Corresponding issue: https://github.com/lbbrhzn/ocpp/issues/1275
if len(accepted_measurands) > 0:
try:
req = call.ChangeConfiguration(key=key, value=accepted_measurands)
resp = await self.call(req)
_LOGGER.debug(
f"'{self.id}' measurands set manually to {accepted_measurands}"
)
except asyncio.TimeoutError:
_LOGGER.warning(f"Timeout while setting measurands for {self.id}")

try:
chgr_measurands = await self.get_configuration(key)
except asyncio.TimeoutError:
_LOGGER.warning(f"Timeout while getting configuration for {self.id}")
chgr_measurands = ""

if len(accepted_measurands) > 0:
_LOGGER.debug(f"'{self.id}' allowed measurands: '{accepted_measurands}'")
try:
await self.configure(key, accepted_measurands)
except asyncio.TimeoutError:
_LOGGER.warning(f"Timeout while configuring measurands for {self.id}")
else:
_LOGGER.debug(f"'{self.id}' measurands not configurable by integration")
_LOGGER.debug(f"'{self.id}' allowed measurands: '{chgr_measurands}'")
_LOGGER.debug(
f"'{self.id}' measurands set manually to {accepted_measurands}"
)
return await self._apply_measurands_configuration(
key, accepted_measurands, "manual configuration"
)

return accepted_measurands
chgr_measurands = await self.get_configuration(key)
_LOGGER.debug(f"'{self.id}' measurands not configurable by integration")
_LOGGER.debug(f"'{self.id}' allowed measurands: '{chgr_measurands}'")
return chgr_measurands or ""

async def set_standard_configuration(self):
"""Send configuration values to the charger."""
Expand Down
83 changes: 83 additions & 0 deletions tests/test_post_connect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""Targeted tests for post-connect stability fixes."""

import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch

from custom_components.ocpp.chargepoint import ChargePoint as BaseChargePoint
from custom_components.ocpp.enums import ConfigurationKey
from custom_components.ocpp.ocppv16 import ChargePoint as ChargePointV16
from ocpp.v16.enums import ConfigurationStatus


async def test_schedule_post_connect_ignores_duplicate_triggers(hass):
"""Only schedule one post-connect task per active connection."""
cp = BaseChargePoint.__new__(BaseChargePoint)
cp.id = "test_cpid"
cp.hass = hass
cp.post_connect_success = False
cp._post_connect_task = None

started = asyncio.Event()
release = asyncio.Event()

async def fake_post_connect(reason: str):
started.set()
await release.wait()

cp.post_connect = fake_post_connect

with patch.object(hass, "async_create_task", wraps=hass.async_create_task) as create:
cp.schedule_post_connect("boot notification")
cp.schedule_post_connect("monitor backstop")
await asyncio.wait_for(started.wait(), timeout=1)
assert create.call_count == 1

release.set()
await asyncio.gather(cp._post_connect_task, return_exceptions=False)


async def test_get_heartbeat_interval_timeout_is_non_fatal():
"""Continue post-connect when HeartbeatInterval cannot be queried."""
cp = ChargePointV16.__new__(ChargePointV16)
cp.id = "test_cpid"
cp.get_configuration = AsyncMock(side_effect=TimeoutError)

await cp.get_heartbeat_interval()

cp.get_configuration.assert_awaited_once_with(
ConfigurationKey.heartbeat_interval.value
)


async def test_get_supported_measurands_autodetect_skips_follow_up_query():
"""Avoid querying MeterValuesSampledData again after autodetection."""
cp = ChargePointV16.__new__(ChargePointV16)
cp.id = "test_cpid"
cp.settings = SimpleNamespace(
monitored_variables="Current.Import,Voltage",
monitored_variables_autoconfig=True,
)
cp._requires_reboot = False
cp.notify_ha = AsyncMock()
cp.call = AsyncMock(
side_effect=[
SimpleNamespace(status=ConfigurationStatus.accepted),
SimpleNamespace(status=ConfigurationStatus.accepted),
SimpleNamespace(status=ConfigurationStatus.accepted),
]
)
cp.get_configuration = AsyncMock(
side_effect=AssertionError("get_configuration should not be called")
)

result = await cp.get_supported_measurands()

assert result == "Current.Import,Voltage"
assert cp.call.await_count == 3
assert [item.args[0].value for item in cp.call.await_args_list] == [
"Current.Import",
"Voltage",
"Current.Import,Voltage",
]
cp.get_configuration.assert_not_awaited()