From 61d5ef4a7acd5928257744c70cca7b207f5ad898 Mon Sep 17 00:00:00 2001 From: Steven Engelbert Date: Fri, 21 Mar 2025 13:55:13 -0400 Subject: [PATCH 1/4] Implement Streams as entities, plug that reality into AmpliPiCoordinator and models.py Move supported features to the parent class Fix errors caused by moving supported_features property to parent class Create helper functions to handle the rca-source relationship fill stream extra_attributes with potentially useful data Move helper functions from class to global context Merge AmpliPi client object with AmpliPiCoordinator to get even faster polling --- custom_components/amplipi/__init__.py | 20 ++-- custom_components/amplipi/coordinator.py | 122 ++++++++++++++++++---- custom_components/amplipi/media_player.py | 69 ++++++------ 3 files changed, 151 insertions(+), 60 deletions(-) diff --git a/custom_components/amplipi/__init__.py b/custom_components/amplipi/__init__.py index d6a6219..e874f29 100644 --- a/custom_components/amplipi/__init__.py +++ b/custom_components/amplipi/__init__.py @@ -5,21 +5,27 @@ from homeassistant.const import CONF_HOST, CONF_PORT, CONF_NAME, CONF_ID from homeassistant.core import HomeAssistant from homeassistant.helpers.aiohttp_client import async_get_clientsession -from pyamplipi.amplipi import AmpliPi +import logging +from .coordinator import AmpliPiCoordinator from .const import DOMAIN, AMPLIPI_OBJECT, CONF_VENDOR, CONF_VERSION, CONF_WEBAPP, CONF_API_PATH PLATFORMS = ["media_player"] +_LOGGER = logging.getLogger(__name__) async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: - - hass.data.setdefault(DOMAIN, {})[entry.entry_id] = { - AMPLIPI_OBJECT: AmpliPi( - f'http://{entry.data[CONF_HOST]}:{entry.data[CONF_PORT]}/api/', - 10, + coordinator = AmpliPiCoordinator( + hass=hass, + config_entry=entry, + logger=_LOGGER, + endpoint=f'http://{entry.data[CONF_HOST]}:{entry.data[CONF_PORT]}/api/', + timeout=10, http_session=async_get_clientsession(hass) - ), + ) + + hass.data.setdefault(DOMAIN, {})[entry.entry_id] = { + AMPLIPI_OBJECT: coordinator, CONF_VENDOR: entry.data[CONF_VENDOR], CONF_NAME: entry.data[CONF_NAME], CONF_HOST: entry.data[CONF_HOST], diff --git a/custom_components/amplipi/coordinator.py b/custom_components/amplipi/coordinator.py index 9f50589..1fe0f59 100644 --- a/custom_components/amplipi/coordinator.py +++ b/custom_components/amplipi/coordinator.py @@ -3,13 +3,19 @@ Used to synchronize the current AmpliPi state with all of the corresponding HA Entities """ from datetime import timedelta +from typing import Optional, Union + from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed -from .models import Status, Source, Zone, Group, Stream from homeassistant.helpers.entity_registry import async_get as async_get_entity_registry + +from pyamplipi.amplipi import AmpliPi +from pyamplipi.models import SourceUpdate, ZoneUpdate, MultiZoneUpdate, GroupUpdate, PlayMedia, Announcement, Status as PyStatus, Source as PySource, Stream as PyStream, Group as PyGroup, Zone as PyZone, Status as PyStatus + +from .models import Status, Source, Zone, Group, Stream from .const import DOMAIN -class AmpliPiCoordinator(DataUpdateCoordinator): - def __init__(self, hass, logger, config_entry, api): +class AmpliPiCoordinator(DataUpdateCoordinator, AmpliPi): + def __init__(self, hass, logger, config_entry, endpoint, timeout, http_session): super().__init__( hass, logger, @@ -18,7 +24,13 @@ def __init__(self, hass, logger, config_entry, api): update_interval=timedelta(seconds=2), always_update=True ) - self.api = api + + AmpliPi.__init__( + self, + endpoint=endpoint, + timeout=timeout, + http_session=http_session + ) async def get_friendly_name(self, entity_id): """Look up entity in hass.states and get the friendly name""" @@ -36,23 +48,31 @@ async def get_entity_id_from_unique_id(self, unique_id: str): async def _async_update_data(self) -> Status: """Fetch data from API endpoint and pre-process into lookup tables.""" + return await self.get_status() + - async def build_entity(entity, kind: str, cls, original_name: str): - unique_id = f"{DOMAIN}_{kind}_{entity['id']}" - entity_id = await self.get_entity_id_from_unique_id(unique_id) or f"media_player.{unique_id}" - friendly_name = await self.get_friendly_name(entity_id) or original_name - return cls( - **entity, - original_name=original_name, - unique_id=unique_id, - entity_id=entity_id, - friendly_name=friendly_name, - ) + async def set_data(self, state: PyStatus) -> Status: + """ + Take in a Status object from the AmpliPi API and add home assistant specific encoding to it before pushing it to global state. + Returns the newly encoded Status object just so that _async_update_data has something to return as well. + """ + async def build_entity(entity: Union[PySource, PyZone, PyGroup, PyStream], kind: str, cls, original_name: str): + try: + unique_id = f"{DOMAIN}_{kind}_{entity['id']}" + entity_id = await self.get_entity_id_from_unique_id(unique_id) or f"media_player.{unique_id}" + friendly_name = await self.get_friendly_name(entity_id) or original_name + return cls( + **entity, + original_name=original_name, + unique_id=unique_id, + entity_id=entity_id, + friendly_name=friendly_name, + ) + except TypeError as e: + self.logger.error(f"Original name = {original_name}, entity = {entity}") + raise TypeError(e) from e try: - state = await self.api.get_status() - state = state.dict() - state["sources"] = [ await build_entity(entity, "source", Source, f"Source {entity['id'] + 1}") for entity in state["sources"] @@ -73,7 +93,69 @@ async def build_entity(entity, kind: str, cls, original_name: str): for entity in state["streams"] ] - return Status(**state) + status = Status(**state) + self.async_set_updated_data(status) + return status except Exception as e: - raise UpdateFailed(f"Error fetching data: {e}") + raise UpdateFailed(f"Error fetching data: {e}") from e + + async def get_status(self) -> Status: + resp = await super().get_status() + status = await self.set_data(resp.dict()) + return status + + async def set_source(self, source_id: int, source_update: SourceUpdate) -> Status: + resp = await super().set_source(source_id, source_update) + status = await self.set_data(resp.dict()) + return status + + async def set_zone(self, zone_id: int, zone_update: ZoneUpdate) -> Status: + resp = await super().set_zone(zone_id, zone_update) + status = await self.set_data(resp.dict()) + return status + + async def set_zones(self, zone_update: MultiZoneUpdate) -> Status: + resp = await super().set_zones(zone_update) + status = await self.set_data(resp.dict()) + return status + + async def play_media(self, media: PlayMedia) -> Status: + resp = await super().play_media(media) + status = await self.set_data(resp.dict()) + return status + + async def set_group(self, group_id, update: GroupUpdate) -> Status: + resp = await super().set_group(group_id, update) + status = await self.set_data(resp.dict()) + return status + + async def announce(self, announcement: Announcement, timeout: Optional[int] = None) -> Status: + resp = await super().announce(announcement, timeout) + status = await self.set_data(resp.dict()) + return status + + async def play_stream(self, stream_id: int) -> Status: + resp = await super().play_stream(stream_id) + status = await self.set_data(resp.dict()) + return status + + async def pause_stream(self, stream_id: int) -> Status: + resp = await super().pause_stream(stream_id) + status = await self.set_data(resp.dict()) + return status + + async def previous_stream(self, stream_id: int) -> Status: + resp = await super().previous_stream(stream_id) + status = await self.set_data(resp.dict()) + return status + + async def next_stream(self, stream_id: int) -> Status: + resp = await super().previous_stream(stream_id) + status = await self.set_data(resp.dict()) + return status + + async def stop_stream(self, stream_id: int) -> Status: + resp = await super().stop_stream(stream_id) + status = await self.set_data(resp.dict()) + return status diff --git a/custom_components/amplipi/media_player.py b/custom_components/amplipi/media_player.py index f71be7c..86bae67 100644 --- a/custom_components/amplipi/media_player.py +++ b/custom_components/amplipi/media_player.py @@ -17,11 +17,13 @@ from homeassistant.helpers.entity import DeviceInfo from pyamplipi.amplipi import AmpliPi from pyamplipi.models import ZoneUpdate, SourceUpdate, GroupUpdate, Announcement, MultiZoneUpdate, PlayMedia +from pyamplipi.models import ZoneUpdate, SourceUpdate, GroupUpdate, Announcement, MultiZoneUpdate, PlayMedia from .coordinator import AmpliPiCoordinator from .const import ( DOMAIN, AMPLIPI_OBJECT, CONF_VENDOR, CONF_VERSION, CONF_WEBAPP, ) from .models import Source, Group, Zone, Stream +from .models import Source, Group, Zone, Stream SUPPORT_AMPLIPI_DAC = ( MediaPlayerEntityFeature.SELECT_SOURCE @@ -60,37 +62,41 @@ async def async_setup_entry(hass, config_entry, async_add_entities): """Set up the AmpliPi MultiZone Audio Controller""" hass_entry = hass.data[DOMAIN][config_entry.entry_id] - amplipi: AmpliPi = hass_entry[AMPLIPI_OBJECT] + amplipi_coordinator: AmpliPiCoordinator = hass_entry[AMPLIPI_OBJECT] vendor = hass_entry[CONF_VENDOR] name = hass_entry[CONF_NAME] version = hass_entry[CONF_VERSION] image_base_path = f'{hass_entry[CONF_WEBAPP]}' - data_coordinator = AmpliPiCoordinator(hass, _LOGGER, config_entry, amplipi) - - status = await data_coordinator._async_update_data() + status = amplipi_coordinator.data if amplipi_coordinator.data is not None else await amplipi_coordinator.get_status() sources: list[AmpliPiMediaPlayer] = [ - AmpliPiSource(data_coordinator, DOMAIN, source, status.streams, vendor, version, image_base_path, amplipi) + AmpliPiSource(DOMAIN, source, status.streams, vendor, version, image_base_path, amplipi_coordinator) for source in status.sources] zones: list[AmpliPiMediaPlayer] = [ - AmpliPiZone(data_coordinator, DOMAIN, zone, None, status.streams, status.sources, vendor, version, image_base_path, amplipi) + AmpliPiZone(DOMAIN, zone, None, status.streams, status.sources, vendor, version, image_base_path, amplipi_coordinator) for zone in status.zones] groups: list[AmpliPiMediaPlayer] = [ - AmpliPiZone(data_coordinator, DOMAIN, None, group, status.streams, status.sources, vendor, version, image_base_path, amplipi) + AmpliPiZone(DOMAIN, None, group, status.streams, status.sources, vendor, version, image_base_path, amplipi_coordinator) for group in status.groups] + streams: list[AmpliPiMediaPlayer] = [ + AmpliPiStream(DOMAIN, stream, status.sources, vendor, version, image_base_path, amplipi_coordinator) + for stream in status.streams + ] + streams: list[AmpliPiMediaPlayer] = [ AmpliPiStream(data_coordinator, DOMAIN, stream, status.sources, vendor, version, image_base_path, amplipi) for stream in status.streams ] announcer: list[MediaPlayerEntity] = [ - AmpliPiAnnouncer(DOMAIN, vendor, version, image_base_path, amplipi) + AmpliPiAnnouncer(DOMAIN, vendor, version, image_base_path, amplipi_coordinator) ] async_add_entities(sources + zones + groups + streams + announcer) + async_add_entities(sources + zones + groups + streams + announcer) async def async_remove_entry(hass, entry) -> None: @@ -112,9 +118,6 @@ class AmpliPiMediaPlayer(MediaPlayerEntity, CoordinatorEntity): """ Parent class of all AmpliPi MediaPlayer entities. Used to enforce common variables and provide shared functionality. """ - # State repository - coordinator: AmpliPiCoordinator - # The amplipi-side id _id: int @@ -131,7 +134,7 @@ class AmpliPiMediaPlayer(MediaPlayerEntity, CoordinatorEntity): # Home assistant particulars that are populated at entity instantiation via hass _vendor: str _version: str - _client: AmpliPi + _client: AmpliPiCoordinator _domain: str _image_base_path: str # Where the album art metadata is stored on home assistant @@ -156,8 +159,8 @@ class AmpliPiMediaPlayer(MediaPlayerEntity, CoordinatorEntity): def get_entry_by_value(self, value: str) -> Union[Source, Zone, Group, Stream, None]: """Find what dict within the state array has a given value and return said dict""" - if self.coordinator.data is not None: - for category in (self.coordinator.data.sources, self.coordinator.data.zones, self.coordinator.data.groups, self.coordinator.data.streams): + if self._client.data is not None: + for category in (self._client.data.sources, self._client.data.zones, self._client.data.groups, self._client.data.streams): for entry in category: if value in entry.model_dump().values(): return entry @@ -183,11 +186,11 @@ def extract_amplipi_id_from_unique_id(self, uid: str) -> Optional[int]: def available_streams(self, source: Source): """Returns the available streams (generally all of them minus three of the four RCAs) relative to the provided source""" streams: List[str] = ['None'] - if self.coordinator.data is not None: + if self._client.data is not None: # Excludes every RCA except for the one related to the given source RCAs = [996, 997, 998, 999] rca_selectable = RCAs[source.id] - stream_entries = self.coordinator.data.streams + stream_entries = self._client.data.streams if stream_entries: for entry in stream_entries: amplipi_id = self.extract_amplipi_id_from_unique_id(entry.unique_id) @@ -205,7 +208,7 @@ async def async_connect_stream_to_source(self, stream: Stream, source: Optional[ if self._current_source is not None and source is not None and source.id != self._current_source.id: raise Exception("RCA streams can only connect to sources with the same ID") - state = self.coordinator.data + state = self._client.data source = state.sources[get_fixed_source_id(stream)] # It would be cleaner to do the following, but pyamplipi doesn't support RCA stream's index value atm: # source = state.sources[self._stream.index] @@ -246,7 +249,7 @@ async def async_connect_zones_to_source(self, source: Source, zones: Optional[Li async def async_connect_zones_to_stream(self, stream: Stream, zones: Optional[List[int]], groups: Optional[List[int]]): """Connects zones and/or groups to the source of the selected stream. If stream does not have a source, select one""" - state = self.coordinator.data + state = self._client.data source_id = next((s.id for s in state.sources if s.input == f"stream={stream.id}"), None) if source_id is None: source_id = await self.async_connect_stream_to_source(stream) @@ -300,7 +303,7 @@ async def find_source(self) -> Source: async def swap_source(self, old_source: int, new_source: Optional[int] = None): """Moves a stream from one source to another, ensuring all zones follow. Generally only used for RCA streams, but able to be used by anyone.""" - state = self.coordinator.data + state = self._client.data moved_stream: Stream = next(filter(lambda s: state.sources[old_source].input == f"stream={s.id}", state.streams), None) if moved_stream is not None and moved_stream.type != "rca": @@ -413,9 +416,9 @@ def supported_features(self): class AmpliPiSource(AmpliPiMediaPlayer): """Representation of an AmpliPi Source Input, of which 4 are supported (Hard Coded).""" - def __init__(self, coordinator: AmpliPiCoordinator, namespace: str, source: Source, streams: List[Stream], vendor: str, version: str, - image_base_path: str, client: AmpliPi): - super().__init__(coordinator) + def __init__(self, namespace: str, source: Source, streams: List[Stream], vendor: str, version: str, + image_base_path: str, client: AmpliPiCoordinator): + super().__init__(client) self._streams: List[Stream] = streams self._source = source @@ -609,7 +612,7 @@ def device_info(self) -> DeviceInfo: def sync_state(self): """Retrieve latest state.""" _LOGGER.info(f'Retrieving state for source {self._source.id}') - state = self.coordinator.data + state = self._client.data if state is not None: try: source = next(filter(lambda z: z.id == self._source.id, state.sources), None) @@ -732,11 +735,11 @@ class AmpliPiZone(AmpliPiMediaPlayer): and mute controls and the ability to change the current 'source' a zone is tied to""" - def __init__(self, coordinator: AmpliPiCoordinator, namespace: str, zone: Zone, group: Group, + def __init__(self, namespace: str, zone: Zone, group: Group, streams: List[Stream], sources: List[Source], vendor: str, version: str, image_base_path: str, - client: AmpliPi): - super().__init__(coordinator) + client: AmpliPiCoordinator): + super().__init__(client) self._sources = sources self._split_group: bool = False self._domain = namespace @@ -890,7 +893,7 @@ def device_info(self) -> DeviceInfo: def sync_state(self): """Retrieve latest state.""" _LOGGER.info(f'Retrieving state for source {self._id}') - state = self.coordinator.data + state = self._client.data if state is not None: zone = None group = None @@ -1100,7 +1103,7 @@ def extra_state_attributes(self): async def _get_extra_attributes(self): if self._group is not None: - state = self.coordinator.data + state = self._client.data zone_ids = [] for zone_id in self._group.zones: @@ -1115,7 +1118,7 @@ async def _get_extra_attributes(self): self._extra_attributes = {"amplipi_zone_id" : self._zone.id} async def _update_available(self): - state = self.coordinator.data + state = self._client.data if self._group is not None: for zone_id in self._group.zones: for state_zone in state.zones: @@ -1239,11 +1242,11 @@ class AmpliPiStream(AmpliPiMediaPlayer): and mute controls and the ability to change the current 'source' a stream is tied to""" - def __init__(self, coordinator: AmpliPiCoordinator, namespace: str, stream: Stream, + def __init__(self, namespace: str, stream: Stream, sources: List[Source], vendor: str, version: str, image_base_path: str, - client: AmpliPi): - super().__init__(coordinator) + client: AmpliPiCoordinator): + super().__init__(client) self._stream: Stream = stream self._current_stream = self._stream # Make an alias for use with AmpliPiMediaPlayer functions while keeping local verbiage more correct self._current_source = None @@ -1373,7 +1376,7 @@ def device_info(self) -> DeviceInfo: def sync_state(self): """Retrieve latest state.""" _LOGGER.info(f'Retrieving state for stream {self._id}') - state = self.coordinator.data + state = self._client.data if state is not None: groups = [] zones = [] From d8d773f2324d770c4018efe167fe0c2f174cdfc3 Mon Sep 17 00:00:00 2001 From: Steven Engelbert Date: Tue, 1 Jul 2025 14:55:52 -0400 Subject: [PATCH 2/4] Rename AmpliPiCoordinator to reflect that it now contains the client, rename the _client attribute to _data_client for the same reason --- custom_components/amplipi/__init__.py | 4 +- custom_components/amplipi/coordinator.py | 2 +- custom_components/amplipi/media_player.py | 102 +++++++++++----------- 3 files changed, 54 insertions(+), 54 deletions(-) diff --git a/custom_components/amplipi/__init__.py b/custom_components/amplipi/__init__.py index e874f29..7958cad 100644 --- a/custom_components/amplipi/__init__.py +++ b/custom_components/amplipi/__init__.py @@ -6,7 +6,7 @@ from homeassistant.core import HomeAssistant from homeassistant.helpers.aiohttp_client import async_get_clientsession import logging -from .coordinator import AmpliPiCoordinator +from .coordinator import AmpliPiDataClient from .const import DOMAIN, AMPLIPI_OBJECT, CONF_VENDOR, CONF_VERSION, CONF_WEBAPP, CONF_API_PATH @@ -15,7 +15,7 @@ _LOGGER = logging.getLogger(__name__) async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: - coordinator = AmpliPiCoordinator( + coordinator = AmpliPiDataClient( hass=hass, config_entry=entry, logger=_LOGGER, diff --git a/custom_components/amplipi/coordinator.py b/custom_components/amplipi/coordinator.py index 1fe0f59..02e55bc 100644 --- a/custom_components/amplipi/coordinator.py +++ b/custom_components/amplipi/coordinator.py @@ -14,7 +14,7 @@ from .models import Status, Source, Zone, Group, Stream from .const import DOMAIN -class AmpliPiCoordinator(DataUpdateCoordinator, AmpliPi): +class AmpliPiDataClient(DataUpdateCoordinator, AmpliPi): def __init__(self, hass, logger, config_entry, endpoint, timeout, http_session): super().__init__( hass, diff --git a/custom_components/amplipi/media_player.py b/custom_components/amplipi/media_player.py index 86bae67..381485d 100644 --- a/custom_components/amplipi/media_player.py +++ b/custom_components/amplipi/media_player.py @@ -19,7 +19,7 @@ from pyamplipi.models import ZoneUpdate, SourceUpdate, GroupUpdate, Announcement, MultiZoneUpdate, PlayMedia from pyamplipi.models import ZoneUpdate, SourceUpdate, GroupUpdate, Announcement, MultiZoneUpdate, PlayMedia -from .coordinator import AmpliPiCoordinator +from .coordinator import AmpliPiDataClient from .const import ( DOMAIN, AMPLIPI_OBJECT, CONF_VENDOR, CONF_VERSION, CONF_WEBAPP, ) from .models import Source, Group, Zone, Stream @@ -62,7 +62,7 @@ async def async_setup_entry(hass, config_entry, async_add_entities): """Set up the AmpliPi MultiZone Audio Controller""" hass_entry = hass.data[DOMAIN][config_entry.entry_id] - amplipi_coordinator: AmpliPiCoordinator = hass_entry[AMPLIPI_OBJECT] + amplipi_coordinator: AmpliPiDataClient = hass_entry[AMPLIPI_OBJECT] vendor = hass_entry[CONF_VENDOR] name = hass_entry[CONF_NAME] version = hass_entry[CONF_VERSION] @@ -87,7 +87,7 @@ async def async_setup_entry(hass, config_entry, async_add_entities): ] streams: list[AmpliPiMediaPlayer] = [ - AmpliPiStream(data_coordinator, DOMAIN, stream, status.sources, vendor, version, image_base_path, amplipi) + AmpliPiStream(DOMAIN, stream, status.sources, vendor, version, image_base_path, amplipi_coordinator) for stream in status.streams ] @@ -134,7 +134,7 @@ class AmpliPiMediaPlayer(MediaPlayerEntity, CoordinatorEntity): # Home assistant particulars that are populated at entity instantiation via hass _vendor: str _version: str - _client: AmpliPiCoordinator + _data_client: AmpliPiDataClient _domain: str _image_base_path: str # Where the album art metadata is stored on home assistant @@ -159,8 +159,8 @@ class AmpliPiMediaPlayer(MediaPlayerEntity, CoordinatorEntity): def get_entry_by_value(self, value: str) -> Union[Source, Zone, Group, Stream, None]: """Find what dict within the state array has a given value and return said dict""" - if self._client.data is not None: - for category in (self._client.data.sources, self._client.data.zones, self._client.data.groups, self._client.data.streams): + if self._data_client.data is not None: + for category in (self._data_client.data.sources, self._data_client.data.zones, self._data_client.data.groups, self._data_client.data.streams): for entry in category: if value in entry.model_dump().values(): return entry @@ -186,11 +186,11 @@ def extract_amplipi_id_from_unique_id(self, uid: str) -> Optional[int]: def available_streams(self, source: Source): """Returns the available streams (generally all of them minus three of the four RCAs) relative to the provided source""" streams: List[str] = ['None'] - if self._client.data is not None: + if self._data_client.data is not None: # Excludes every RCA except for the one related to the given source RCAs = [996, 997, 998, 999] rca_selectable = RCAs[source.id] - stream_entries = self._client.data.streams + stream_entries = self._data_client.data.streams if stream_entries: for entry in stream_entries: amplipi_id = self.extract_amplipi_id_from_unique_id(entry.unique_id) @@ -208,7 +208,7 @@ async def async_connect_stream_to_source(self, stream: Stream, source: Optional[ if self._current_source is not None and source is not None and source.id != self._current_source.id: raise Exception("RCA streams can only connect to sources with the same ID") - state = self._client.data + state = self._data_client.data source = state.sources[get_fixed_source_id(stream)] # It would be cleaner to do the following, but pyamplipi doesn't support RCA stream's index value atm: # source = state.sources[self._stream.index] @@ -226,7 +226,7 @@ async def async_connect_stream_to_source(self, stream: Stream, source: Optional[ raise Exception("All sources are in use, disconnect a source or select one to override and try again.") if source_id is not None: - await self._client.set_source( + await self._data_client.set_source( source_id, SourceUpdate( input=f'stream={stream.id}' @@ -237,7 +237,7 @@ async def async_connect_stream_to_source(self, stream: Stream, source: Optional[ async def async_connect_zones_to_source(self, source: Source, zones: Optional[List[int]], groups: Optional[List[int]]): """Connects zones and/or groups to the provided source""" if source is not None: - await self._client.set_zones( + await self._data_client.set_zones( MultiZoneUpdate( zones=zones, groups=groups, @@ -249,7 +249,7 @@ async def async_connect_zones_to_source(self, source: Source, zones: Optional[Li async def async_connect_zones_to_stream(self, stream: Stream, zones: Optional[List[int]], groups: Optional[List[int]]): """Connects zones and/or groups to the source of the selected stream. If stream does not have a source, select one""" - state = self._client.data + state = self._data_client.data source_id = next((s.id for s in state.sources if s.input == f"stream={stream.id}"), None) if source_id is None: source_id = await self.async_connect_stream_to_source(stream) @@ -295,7 +295,7 @@ def get_song_info(self, source): async def find_source(self) -> Source: """Find first available source and return it. If no sources are available, returns None.""" - sources = await self._client.get_sources() + sources = await self._data_client.get_sources() for source in sources: if source.input in ['', 'None', None]: return source @@ -303,7 +303,7 @@ async def find_source(self) -> Source: async def swap_source(self, old_source: int, new_source: Optional[int] = None): """Moves a stream from one source to another, ensuring all zones follow. Generally only used for RCA streams, but able to be used by anyone.""" - state = self._client.data + state = self._data_client.data moved_stream: Stream = next(filter(lambda s: state.sources[old_source].input == f"stream={s.id}", state.streams), None) if moved_stream is not None and moved_stream.type != "rca": @@ -314,7 +314,7 @@ async def swap_source(self, old_source: int, new_source: Optional[int] = None): new_source = source.id if new_source is not None: - await self._client.set_source( + await self._data_client.set_source( new_source, SourceUpdate( input=f'stream={moved_stream.id}' @@ -322,7 +322,7 @@ async def swap_source(self, old_source: int, new_source: Optional[int] = None): ) moved_zones = [z.id for z in state.zones if z.source_id == old_source] - await self._client.set_zones( + await self._data_client.set_zones( MultiZoneUpdate( zones=moved_zones, update=ZoneUpdate( @@ -351,23 +351,23 @@ async def async_volume_down(self): async def async_media_play(self): if self._current_stream is not None: - await self._client.play_stream(self._current_stream.id) + await self._data_client.play_stream(self._current_stream.id) async def async_media_stop(self): if self._current_stream is not None: - await self._client.stop_stream(self._current_stream.id) + await self._data_client.stop_stream(self._current_stream.id) async def async_media_pause(self): if self._current_stream is not None: - await self._client.pause_stream(self._current_stream.id) + await self._data_client.pause_stream(self._current_stream.id) async def async_media_previous_track(self): if self._current_stream is not None: - await self._client.previous_stream(self._current_stream.id) + await self._data_client.previous_stream(self._current_stream.id) async def async_media_next_track(self): if self._current_stream is not None: - await self._client.next_stream(self._current_stream.id) + await self._data_client.next_stream(self._current_stream.id) @property def available(self): @@ -417,7 +417,7 @@ class AmpliPiSource(AmpliPiMediaPlayer): """Representation of an AmpliPi Source Input, of which 4 are supported (Hard Coded).""" def __init__(self, namespace: str, source: Source, streams: List[Stream], vendor: str, version: str, - image_base_path: str, client: AmpliPiCoordinator): + image_base_path: str, client: AmpliPiDataClient): super().__init__(client) self._streams: List[Stream] = streams self._source = source @@ -432,7 +432,7 @@ def __init__(self, namespace: str, source: Source, streams: List[Stream], vendor # Aliased so that AmpliPiMediaPlayer functions know to use the same source while also using a variable name that doesn't imply that the source can change within a source entity self._current_source = self._source - self._client = client + self._data_client = client self._name = source.original_name self._unique_id = source.unique_id @@ -454,7 +454,7 @@ async def async_turn_on(self): async def async_turn_off(self): if self._source is not None: _LOGGER.info(f"Turning source {self._name} off, disconnecting all zones and streams") - await self._client.set_source( + await self._data_client.set_source( self._id, SourceUpdate( input='None' @@ -532,7 +532,7 @@ async def async_play_media(self, media_type, media_id, **kwargs): _LOGGER.info(f'Playing media source: {play_item} {media_id}') media_id = async_process_play_media_url(self.hass, media_id) - await self._client.play_media( + await self._data_client.play_media( PlayMedia( source_id=self._source.id, media=media_id @@ -542,14 +542,14 @@ async def async_play_media(self, media_type, media_id, **kwargs): async def async_select_source(self, source): if self._source is not None and self._source.name == source: - await self._client.set_source( + await self._data_client.set_source( self._id, SourceUpdate( input=f"stream={get_fixed_source_id(self._source)}" ) ) elif source == 'None': - await self._client.set_source( + await self._data_client.set_source( self._id, SourceUpdate( input='None' @@ -560,7 +560,7 @@ async def async_select_source(self, source): stream_hacs_entity = self.get_entry_by_value(source) stream_id = self.extract_amplipi_id_from_unique_id(stream_hacs_entity.unique_id) if stream_id is not None: - await self._client.set_source( + await self._data_client.set_source( self._id, SourceUpdate( input=f'stream={stream_id}' @@ -612,7 +612,7 @@ def device_info(self) -> DeviceInfo: def sync_state(self): """Retrieve latest state.""" _LOGGER.info(f'Retrieving state for source {self._source.id}') - state = self._client.data + state = self._data_client.data if state is not None: try: source = next(filter(lambda z: z.id == self._source.id, state.sources), None) @@ -709,18 +709,18 @@ def source_list(self): return self.available_streams(self._source) async def _update_source(self, update: SourceUpdate): - await self._client.set_source(self._source.id, update) + await self._data_client.set_source(self._source.id, update) async def _update_zones(self, update: MultiZoneUpdate): - # zones = await self._client.get_zones() + # zones = await self._data_client.get_zones() # associated_zones = filter(lambda z: z.source_id == self._source.id, zones) - await self._client.set_zones(update) + await self._data_client.set_zones(update) async def _update_groups(self, update: GroupUpdate): - groups = await self._client.get_groups() + groups = await self._data_client.get_groups() associated_groups = filter(lambda g: g.source_id == self._source.id, groups) for group in associated_groups: - await self._client.set_group(group.id, update) + await self._data_client.set_group(group.id, update) @property def extra_state_attributes(self): @@ -738,7 +738,7 @@ class AmpliPiZone(AmpliPiMediaPlayer): def __init__(self, namespace: str, zone: Zone, group: Group, streams: List[Stream], sources: List[Source], vendor: str, version: str, image_base_path: str, - client: AmpliPiCoordinator): + client: AmpliPiDataClient): super().__init__(client) self._sources = sources self._split_group: bool = False @@ -764,7 +764,7 @@ def __init__(self, namespace: str, zone: Zone, group: Group, self._vendor = vendor self._version = version self._enabled = False - self._client = client + self._data_client = client self._attr_source_list = [ 'None', 'Source 1', @@ -893,7 +893,7 @@ def device_info(self) -> DeviceInfo: def sync_state(self): """Retrieve latest state.""" _LOGGER.info(f'Retrieving state for source {self._id}') - state = self._client.data + state = self._data_client.data if state is not None: zone = None group = None @@ -1044,10 +1044,10 @@ async def async_select_source(self, source: str): await self.async_connect_zones_to_source(*args) async def _update_zone(self, update: ZoneUpdate): - await self._client.set_zone(self._id, update) + await self._data_client.set_zone(self._id, update) async def _update_group(self, update: MultiZoneUpdate): - await self._client.set_zones(update) + await self._data_client.set_zones(update) @property def source_list(self): @@ -1079,7 +1079,7 @@ async def async_play_media(self, media_type, media_id, **kwargs): #No source, see if we can find an empty one if self._current_source is None: - sources = await self._client.get_sources() + sources = await self._data_client.get_sources() for source in sources: if source is not None and source.input in ['', 'None', None]: self._current_source = source @@ -1089,7 +1089,7 @@ async def async_play_media(self, media_type, media_id, **kwargs): media_id = async_process_play_media_url(self.hass, media_id) - await self._client.play_media( + await self._data_client.play_media( PlayMedia( source_id=self._current_source.id, media=media_id, @@ -1103,7 +1103,7 @@ def extra_state_attributes(self): async def _get_extra_attributes(self): if self._group is not None: - state = self._client.data + state = self._data_client.data zone_ids = [] for zone_id in self._group.zones: @@ -1118,7 +1118,7 @@ async def _get_extra_attributes(self): self._extra_attributes = {"amplipi_zone_id" : self._zone.id} async def _update_available(self): - state = self._client.data + state = self._data_client.data if self._group is not None: for zone_id in self._group.zones: for state_zone in state.zones: @@ -1146,7 +1146,7 @@ def __init__(self, namespace: str, self._vendor = vendor self._version = version self._enabled = True - self._client = client + self._data_client = client self._last_update_successful = True self._available = True self._extra_attributes: List = [] @@ -1222,7 +1222,7 @@ async def async_play_media(self, media_type, media_id, **kwargs): _LOGGER.info(f'Playing media source: {play_item} {media_id}') media_id = async_process_play_media_url(self.hass, media_id) - await self._client.announce( + await self._data_client.announce( Announcement( media=media_id, vol_f=self._volume @@ -1245,7 +1245,7 @@ class AmpliPiStream(AmpliPiMediaPlayer): def __init__(self, namespace: str, stream: Stream, sources: List[Source], vendor: str, version: str, image_base_path: str, - client: AmpliPiCoordinator): + client: AmpliPiDataClient): super().__init__(client) self._stream: Stream = stream self._current_stream = self._stream # Make an alias for use with AmpliPiMediaPlayer functions while keeping local verbiage more correct @@ -1265,7 +1265,7 @@ def __init__(self, namespace: str, stream: Stream, self._image_base_path = image_base_path self._vendor = vendor self._version = version - self._client = client + self._data_client = client self._attr_source_list = [ 'None', 'Any', @@ -1285,7 +1285,7 @@ async def _update_zones(self, update: ZoneUpdate): zones=[z.id for z in self._current_zones], update=update ) - await self._client.set_zones(multi_update) + await self._data_client.set_zones(multi_update) async def async_toggle(self): @@ -1313,7 +1313,7 @@ async def async_turn_off(self): source_id=-1, ) ) - await self._client.set_source( + await self._data_client.set_source( self._current_source.id, SourceUpdate( input='None' @@ -1376,7 +1376,7 @@ def device_info(self) -> DeviceInfo: def sync_state(self): """Retrieve latest state.""" _LOGGER.info(f'Retrieving state for stream {self._id}') - state = self._client.data + state = self._data_client.data if state is not None: groups = [] zones = [] @@ -1464,7 +1464,7 @@ async def async_select_source(self, source: Optional[str] = None): # As such, this info must be sorted and then sent down the proper logical path if source: if source == "None" and self._current_source is not None: - await self._client.set_source( + await self._data_client.set_source( self._current_source.id, SourceUpdate( input='None' From 604b0f8537b81ef8442619e270a89a4bf64767d2 Mon Sep 17 00:00:00 2001 From: Steven Engelbert Date: Wed, 2 Jul 2025 12:44:25 -0400 Subject: [PATCH 3/4] Make intercepted functions more convenient with a wrapper that reduces boilerplate --- custom_components/amplipi/coordinator.py | 70 +++++++++++------------- 1 file changed, 33 insertions(+), 37 deletions(-) diff --git a/custom_components/amplipi/coordinator.py b/custom_components/amplipi/coordinator.py index 02e55bc..4ad6687 100644 --- a/custom_components/amplipi/coordinator.py +++ b/custom_components/amplipi/coordinator.py @@ -3,7 +3,7 @@ Used to synchronize the current AmpliPi state with all of the corresponding HA Entities """ from datetime import timedelta -from typing import Optional, Union +from typing import Optional, Union, Callable from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from homeassistant.helpers.entity_registry import async_get as async_get_entity_registry @@ -100,62 +100,58 @@ async def build_entity(entity: Union[PySource, PyZone, PyGroup, PyStream], kind: except Exception as e: raise UpdateFailed(f"Error fetching data: {e}") from e + def intecept_and_consume(func: Callable): + """Intercept the return of a function and consume the data into the data coordinator""" + async def wrapper(self, *args, **kwargs): + resp = await func(self, *args, **kwargs) + return await self.set_data(resp.dict()) + return wrapper + + @intecept_and_consume async def get_status(self) -> Status: - resp = await super().get_status() - status = await self.set_data(resp.dict()) - return status + return await super().get_status() + @intecept_and_consume async def set_source(self, source_id: int, source_update: SourceUpdate) -> Status: - resp = await super().set_source(source_id, source_update) - status = await self.set_data(resp.dict()) - return status + return await super().set_source(source_id, source_update) + @intecept_and_consume async def set_zone(self, zone_id: int, zone_update: ZoneUpdate) -> Status: - resp = await super().set_zone(zone_id, zone_update) - status = await self.set_data(resp.dict()) - return status + return await super().set_zone(zone_id, zone_update) + @intecept_and_consume async def set_zones(self, zone_update: MultiZoneUpdate) -> Status: - resp = await super().set_zones(zone_update) - status = await self.set_data(resp.dict()) - return status + return await super().set_zones(zone_update) + @intecept_and_consume async def play_media(self, media: PlayMedia) -> Status: - resp = await super().play_media(media) - status = await self.set_data(resp.dict()) - return status + return await super().play_media(media) + @intecept_and_consume async def set_group(self, group_id, update: GroupUpdate) -> Status: - resp = await super().set_group(group_id, update) - status = await self.set_data(resp.dict()) - return status + return await super().set_group(group_id, update) + @intecept_and_consume async def announce(self, announcement: Announcement, timeout: Optional[int] = None) -> Status: - resp = await super().announce(announcement, timeout) - status = await self.set_data(resp.dict()) - return status + return await super().announce(announcement, timeout) + @intecept_and_consume async def play_stream(self, stream_id: int) -> Status: - resp = await super().play_stream(stream_id) - status = await self.set_data(resp.dict()) - return status + return await super().play_stream(stream_id) + @intecept_and_consume async def pause_stream(self, stream_id: int) -> Status: - resp = await super().pause_stream(stream_id) - status = await self.set_data(resp.dict()) - return status + return await super().pause_stream(stream_id) + @intecept_and_consume async def previous_stream(self, stream_id: int) -> Status: - resp = await super().previous_stream(stream_id) - status = await self.set_data(resp.dict()) - return status + return await super().previous_stream(stream_id) + @intecept_and_consume async def next_stream(self, stream_id: int) -> Status: - resp = await super().previous_stream(stream_id) - status = await self.set_data(resp.dict()) - return status + return await super().previous_stream(stream_id) + @intecept_and_consume async def stop_stream(self, stream_id: int) -> Status: - resp = await super().stop_stream(stream_id) - status = await self.set_data(resp.dict()) - return status + return await super().stop_stream(stream_id) + \ No newline at end of file From c1b9aa87d74ff795ffb6f91f62171d031cb2bd99 Mon Sep 17 00:00:00 2001 From: Steven Engelbert Date: Fri, 21 Mar 2025 13:55:13 -0400 Subject: [PATCH 4/4] Add updater entity, change coordinator to house data from github relating to what releases are available --- custom_components/amplipi/__init__.py | 6 +- custom_components/amplipi/const.py | 1 + custom_components/amplipi/coordinator.py | 85 ++++++++--- custom_components/amplipi/media_player.py | 90 ++++++------ custom_components/amplipi/update.py | 165 ++++++++++++++++++++++ 5 files changed, 277 insertions(+), 70 deletions(-) create mode 100644 custom_components/amplipi/update.py diff --git a/custom_components/amplipi/__init__.py b/custom_components/amplipi/__init__.py index 7958cad..843c893 100644 --- a/custom_components/amplipi/__init__.py +++ b/custom_components/amplipi/__init__.py @@ -8,9 +8,9 @@ import logging from .coordinator import AmpliPiDataClient -from .const import DOMAIN, AMPLIPI_OBJECT, CONF_VENDOR, CONF_VERSION, CONF_WEBAPP, CONF_API_PATH +from .const import DOMAIN, AMPLIPI_OBJECT, UPDATER_URL, CONF_VENDOR, CONF_VERSION, CONF_WEBAPP, CONF_API_PATH -PLATFORMS = ["media_player"] +PLATFORMS = ["media_player", "update"] _LOGGER = logging.getLogger(__name__) @@ -18,7 +18,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: coordinator = AmpliPiDataClient( hass=hass, config_entry=entry, - logger=_LOGGER, endpoint=f'http://{entry.data[CONF_HOST]}:{entry.data[CONF_PORT]}/api/', timeout=10, http_session=async_get_clientsession(hass) @@ -34,6 +33,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: CONF_VERSION: entry.data[CONF_VERSION], CONF_WEBAPP: entry.data[CONF_WEBAPP], CONF_API_PATH: entry.data[CONF_API_PATH], + UPDATER_URL: "http://{entry.data[CONF_HOST]}:5001" } await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) diff --git a/custom_components/amplipi/const.py b/custom_components/amplipi/const.py index af08716..6158b85 100644 --- a/custom_components/amplipi/const.py +++ b/custom_components/amplipi/const.py @@ -4,5 +4,6 @@ CONF_VENDOR = "vendor" CONF_VERSION = "version" AMPLIPI_OBJECT = "amplipi_object" +UPDATER_URL = "updater_url" CONF_WEBAPP = "webapp" CONF_API_PATH = "api_path" diff --git a/custom_components/amplipi/coordinator.py b/custom_components/amplipi/coordinator.py index 4ad6687..99fdb3c 100644 --- a/custom_components/amplipi/coordinator.py +++ b/custom_components/amplipi/coordinator.py @@ -7,6 +7,12 @@ from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from homeassistant.helpers.entity_registry import async_get as async_get_entity_registry +from homeassistant.helpers import aiohttp_client +from typing import List +from pydantic import BaseModel +import async_timeout +import logging +from .models import Status, Source, Zone, Group, Stream from pyamplipi.amplipi import AmpliPi from pyamplipi.models import SourceUpdate, ZoneUpdate, MultiZoneUpdate, GroupUpdate, PlayMedia, Announcement, Status as PyStatus, Source as PySource, Stream as PyStream, Group as PyGroup, Zone as PyZone, Status as PyStatus @@ -14,11 +20,18 @@ from .models import Status, Source, Zone, Group, Stream from .const import DOMAIN +_LOGGER = logging.getLogger(__name__) + +class AmpliPiDataSchema(BaseModel): + status: Status + latest: dict + releases: List[dict] + class AmpliPiDataClient(DataUpdateCoordinator, AmpliPi): - def __init__(self, hass, logger, config_entry, endpoint, timeout, http_session): + def __init__(self, hass, config_entry, endpoint, timeout, http_session): super().__init__( hass, - logger, + _LOGGER, config_entry=config_entry, name="hacs_amplipi", update_interval=timedelta(seconds=2), @@ -45,13 +58,26 @@ async def get_entity_id_from_unique_id(self, unique_id: str): if entry.unique_id == unique_id: return entry.entity_id return None + + async def fetch_github_releases(self, session) -> tuple[dict, List[dict]]: + """Fetch latest and all releases from GitHub using aiohttp.""" + base_url = "https://api.github.com/repos/micro-nova/AmpliPi/releases" + try: + with async_timeout.timeout(10): + async with session.get(f"{base_url}/latest") as latest_resp: + latest: dict = await latest_resp.json() + async with session.get(base_url) as releases_resp: + releases: List[dict] = await releases_resp.json() + return latest, releases + except Exception as e: + _LOGGER.warning(f"Failed to fetch GitHub releases: {e}") + return {}, [{}] - async def _async_update_data(self) -> Status: + async def _async_update_data(self) -> AmpliPiDataSchema: """Fetch data from API endpoint and pre-process into lookup tables.""" return await self.get_status() - - async def set_data(self, state: PyStatus) -> Status: + async def set_data(self, state: PyStatus) -> AmpliPiDataSchema: """ Take in a Status object from the AmpliPi API and add home assistant specific encoding to it before pushing it to global state. Returns the newly encoded Status object just so that _async_update_data has something to return as well. @@ -92,14 +118,33 @@ async def build_entity(entity: Union[PySource, PyZone, PyGroup, PyStream], kind: await build_entity(entity, "stream", Stream, entity["name"]) for entity in state["streams"] ] - + status = Status(**state) - self.async_set_updated_data(status) - return status + latest = self.latest + releases = self.releases + if self.latest is None or status.info.latest_release != self.latest.get("name"): + session = aiohttp_client.async_get_clientsession(self.hass) + latest, releases = await self.fetch_github_releases(session) + + data = AmpliPiDataSchema(status=status, latest=latest, releases=releases) + self.async_set_updated_data(data) + return data except Exception as e: raise UpdateFailed(f"Error fetching data: {e}") from e + @property + def status(self) -> Optional[Status]: + return self.data.status if self.data is not None else None + + @property + def latest(self) -> Optional[dict]: + return self.data.latest if self.data is not None else None + + @property + def releases(self) -> Optional[List[dict]]: + return self.data.releases if self.data is not None else None + def intecept_and_consume(func: Callable): """Intercept the return of a function and consume the data into the data coordinator""" async def wrapper(self, *args, **kwargs): @@ -108,50 +153,50 @@ async def wrapper(self, *args, **kwargs): return wrapper @intecept_and_consume - async def get_status(self) -> Status: + async def get_status(self) -> AmpliPiDataSchema: return await super().get_status() @intecept_and_consume - async def set_source(self, source_id: int, source_update: SourceUpdate) -> Status: + async def set_source(self, source_id: int, source_update: SourceUpdate) -> AmpliPiDataSchema: return await super().set_source(source_id, source_update) @intecept_and_consume - async def set_zone(self, zone_id: int, zone_update: ZoneUpdate) -> Status: + async def set_zone(self, zone_id: int, zone_update: ZoneUpdate) -> AmpliPiDataSchema: return await super().set_zone(zone_id, zone_update) @intecept_and_consume - async def set_zones(self, zone_update: MultiZoneUpdate) -> Status: + async def set_zones(self, zone_update: MultiZoneUpdate) -> AmpliPiDataSchema: return await super().set_zones(zone_update) @intecept_and_consume - async def play_media(self, media: PlayMedia) -> Status: + async def play_media(self, media: PlayMedia) -> AmpliPiDataSchema: return await super().play_media(media) @intecept_and_consume - async def set_group(self, group_id, update: GroupUpdate) -> Status: + async def set_group(self, group_id, update: GroupUpdate) -> AmpliPiDataSchema: return await super().set_group(group_id, update) @intecept_and_consume - async def announce(self, announcement: Announcement, timeout: Optional[int] = None) -> Status: + async def announce(self, announcement: Announcement, timeout: Optional[int] = None) -> AmpliPiDataSchema: return await super().announce(announcement, timeout) @intecept_and_consume - async def play_stream(self, stream_id: int) -> Status: + async def play_stream(self, stream_id: int) -> AmpliPiDataSchema: return await super().play_stream(stream_id) @intecept_and_consume - async def pause_stream(self, stream_id: int) -> Status: + async def pause_stream(self, stream_id: int) -> AmpliPiDataSchema: return await super().pause_stream(stream_id) @intecept_and_consume - async def previous_stream(self, stream_id: int) -> Status: + async def previous_stream(self, stream_id: int) -> AmpliPiDataSchema: return await super().previous_stream(stream_id) @intecept_and_consume - async def next_stream(self, stream_id: int) -> Status: + async def next_stream(self, stream_id: int) -> AmpliPiDataSchema: return await super().previous_stream(stream_id) @intecept_and_consume - async def stop_stream(self, stream_id: int) -> Status: + async def stop_stream(self, stream_id: int) -> AmpliPiDataSchema: return await super().stop_stream(stream_id) \ No newline at end of file diff --git a/custom_components/amplipi/media_player.py b/custom_components/amplipi/media_player.py index 381485d..da264ca 100644 --- a/custom_components/amplipi/media_player.py +++ b/custom_components/amplipi/media_player.py @@ -17,12 +17,10 @@ from homeassistant.helpers.entity import DeviceInfo from pyamplipi.amplipi import AmpliPi from pyamplipi.models import ZoneUpdate, SourceUpdate, GroupUpdate, Announcement, MultiZoneUpdate, PlayMedia -from pyamplipi.models import ZoneUpdate, SourceUpdate, GroupUpdate, Announcement, MultiZoneUpdate, PlayMedia from .coordinator import AmpliPiDataClient from .const import ( - DOMAIN, AMPLIPI_OBJECT, CONF_VENDOR, CONF_VERSION, CONF_WEBAPP, ) -from .models import Source, Group, Zone, Stream + DOMAIN, AMPLIPI_OBJECT, CONF_VENDOR, CONF_VERSION, CONF_WEBAPP ) from .models import Source, Group, Zone, Stream SUPPORT_AMPLIPI_DAC = ( @@ -68,7 +66,7 @@ async def async_setup_entry(hass, config_entry, async_add_entities): version = hass_entry[CONF_VERSION] image_base_path = f'{hass_entry[CONF_WEBAPP]}' - status = amplipi_coordinator.data if amplipi_coordinator.data is not None else await amplipi_coordinator.get_status() + status = amplipi_coordinator.status if amplipi_coordinator.status is not None else (await amplipi_coordinator.get_status()).status sources: list[AmpliPiMediaPlayer] = [ AmpliPiSource(DOMAIN, source, status.streams, vendor, version, image_base_path, amplipi_coordinator) for source in status.sources] @@ -96,7 +94,6 @@ async def async_setup_entry(hass, config_entry, async_add_entities): ] async_add_entities(sources + zones + groups + streams + announcer) - async_add_entities(sources + zones + groups + streams + announcer) async def async_remove_entry(hass, entry) -> None: @@ -159,8 +156,8 @@ class AmpliPiMediaPlayer(MediaPlayerEntity, CoordinatorEntity): def get_entry_by_value(self, value: str) -> Union[Source, Zone, Group, Stream, None]: """Find what dict within the state array has a given value and return said dict""" - if self._data_client.data is not None: - for category in (self._data_client.data.sources, self._data_client.data.zones, self._data_client.data.groups, self._data_client.data.streams): + if self._data_client.status is not None: + for category in (self._data_client.status.sources, self._data_client.status.zones, self._data_client.status.groups, self._data_client.status.streams): for entry in category: if value in entry.model_dump().values(): return entry @@ -186,11 +183,11 @@ def extract_amplipi_id_from_unique_id(self, uid: str) -> Optional[int]: def available_streams(self, source: Source): """Returns the available streams (generally all of them minus three of the four RCAs) relative to the provided source""" streams: List[str] = ['None'] - if self._data_client.data is not None: + if self._data_client.status is not None: # Excludes every RCA except for the one related to the given source RCAs = [996, 997, 998, 999] rca_selectable = RCAs[source.id] - stream_entries = self._data_client.data.streams + stream_entries = self._data_client.status.streams if stream_entries: for entry in stream_entries: amplipi_id = self.extract_amplipi_id_from_unique_id(entry.unique_id) @@ -208,7 +205,7 @@ async def async_connect_stream_to_source(self, stream: Stream, source: Optional[ if self._current_source is not None and source is not None and source.id != self._current_source.id: raise Exception("RCA streams can only connect to sources with the same ID") - state = self._data_client.data + state = self._data_client.status source = state.sources[get_fixed_source_id(stream)] # It would be cleaner to do the following, but pyamplipi doesn't support RCA stream's index value atm: # source = state.sources[self._stream.index] @@ -249,13 +246,13 @@ async def async_connect_zones_to_source(self, source: Source, zones: Optional[Li async def async_connect_zones_to_stream(self, stream: Stream, zones: Optional[List[int]], groups: Optional[List[int]]): """Connects zones and/or groups to the source of the selected stream. If stream does not have a source, select one""" - state = self._data_client.data - source_id = next((s.id for s in state.sources if s.input == f"stream={stream.id}"), None) + status = self._data_client.status + source_id = next((s.id for s in status.sources if s.input == f"stream={stream.id}"), None) if source_id is None: source_id = await self.async_connect_stream_to_source(stream) if source_id is not None: - await self.async_connect_zones_to_source(state.sources[source_id], zones, groups) + await self.async_connect_zones_to_source(status.sources[source_id], zones, groups) def build_url(self, img_url): @@ -303,9 +300,9 @@ async def find_source(self) -> Source: async def swap_source(self, old_source: int, new_source: Optional[int] = None): """Moves a stream from one source to another, ensuring all zones follow. Generally only used for RCA streams, but able to be used by anyone.""" - state = self._data_client.data + status = self._data_client.status - moved_stream: Stream = next(filter(lambda s: state.sources[old_source].input == f"stream={s.id}", state.streams), None) + moved_stream: Stream = next(filter(lambda s: status.sources[old_source].input == f"stream={s.id}", status.streams), None) if moved_stream is not None and moved_stream.type != "rca": # RCA streams each have an associated source to output them due to hardware constraints if new_source is None: @@ -321,7 +318,7 @@ async def swap_source(self, old_source: int, new_source: Optional[int] = None): ) ) - moved_zones = [z.id for z in state.zones if z.source_id == old_source] + moved_zones = [z.id for z in status.zones if z.source_id == old_source] await self._data_client.set_zones( MultiZoneUpdate( zones=moved_zones, @@ -612,11 +609,11 @@ def device_info(self) -> DeviceInfo: def sync_state(self): """Retrieve latest state.""" _LOGGER.info(f'Retrieving state for source {self._source.id}') - state = self._data_client.data - if state is not None: + if self._data_client.status is not None: + status = self._data_client.status try: - source = next(filter(lambda z: z.id == self._source.id, state.sources), None) - streams = state.streams + source = next(filter(lambda z: z.id == self._source.id, status.sources), None) + streams = status.streams except Exception: self._last_update_successful = False _LOGGER.error(f'Could not update source {self._source.id}') @@ -634,8 +631,8 @@ def sync_state(self): stream_id = int(self._source.input.split('=')[1]) self._current_stream = next(filter(lambda s: s.id == stream_id, self._streams), None) - self._zones = list(filter(lambda z: z.source_id == self._source.id, state.zones)) - self._groups = list(filter(lambda z: z.source_id == self._source.id, state.groups)) + self._zones = list(filter(lambda z: z.source_id == self._source.id, status.zones)) + self._groups = list(filter(lambda z: z.source_id == self._source.id, status.groups)) self.get_song_info(self._source) self._last_update_successful = True @@ -893,28 +890,28 @@ def device_info(self) -> DeviceInfo: def sync_state(self): """Retrieve latest state.""" _LOGGER.info(f'Retrieving state for source {self._id}') - state = self._data_client.data - if state is not None: + if self._data_client.status is not None: + status = self._data_client.status zone = None group = None enabled = False try: if self._group is not None: - group: Group = next(filter(lambda z: z.id == self._id, state.groups), None) + group: Group = next(filter(lambda z: z.id == self._id, status.groups), None) if not group: self._last_update_successful = False return - any_enabled_zone = next(filter(lambda z: z.id in group.zones, state.zones), None) + any_enabled_zone = next(filter(lambda z: z.id in group.zones, status.zones), None) if any_enabled_zone is not None: enabled = True - connected_sources = [state.zones[zone_index].source_id for zone_index in group.zones] + connected_sources = [status.zones[zone_index].source_id for zone_index in group.zones] # Is every zone connected to the same source? self._split_group = len(set(connected_sources)) != 1 else: - zone = next(filter(lambda z: z.id == self._id, state.zones), None) + zone = next(filter(lambda z: z.id == self._id, status.zones), None) if not zone: self._last_update_successful = False return @@ -928,7 +925,7 @@ def sync_state(self): zone_ids = [] for zone_id in self._group.zones: - for state_zone in state.zones: + for state_zone in status.zones: if state_zone.id == zone_id and not state_zone.disabled: zone_ids.append(zone_id) self._extra_attributes = {"amplipi_zones" : zone_ids} @@ -941,7 +938,7 @@ def sync_state(self): if self._group is not None: for zone_id in self._group.zones: - for state_zone in state.zones: + for state_zone in status.zones: if state_zone.id == zone_id and not state_zone.disabled: self._available = True self._available = False @@ -951,19 +948,19 @@ def sync_state(self): self._zone = zone self._group = group - self._streams = state.streams - self._sources = state.sources + self._streams = status.streams + self._sources = status.sources self._last_update_successful = True self._enabled = enabled self._current_source = None # When a zone is off it connects to source_id -2, groups also yield the source_id that all requisite zones are already connected to if self._group is not None: - self._current_source = next(filter(lambda s: self._group.source_id == s.id, state.sources), None) + self._current_source = next(filter(lambda s: self._group.source_id == s.id, status.sources), None) self._is_off = self._group.source_id == -2 elif self._zone.source_id is not None: - self._current_source = next(filter(lambda s: self._zone.source_id == s.id, state.sources), None) + self._current_source = next(filter(lambda s: self._zone.source_id == s.id, status.sources), None) self._is_off = self._zone.source_id == -2 if self._current_source is not None and 'stream=' in self._current_source.input and 'stream=local' not in self._current_source.input: @@ -1103,11 +1100,11 @@ def extra_state_attributes(self): async def _get_extra_attributes(self): if self._group is not None: - state = self._data_client.data + status = self._data_client.status zone_ids = [] for zone_id in self._group.zones: - for state_zone in state.zones: + for state_zone in status.zones: if state_zone.id == zone_id and not state_zone.disabled: zone_ids.append(zone_id) self._extra_attributes = {"amplipi_zones" : zone_ids} @@ -1118,10 +1115,10 @@ async def _get_extra_attributes(self): self._extra_attributes = {"amplipi_zone_id" : self._zone.id} async def _update_available(self): - state = self._data_client.data + status = self._data_client.status if self._group is not None: for zone_id in self._group.zones: - for state_zone in state.zones: + for state_zone in status.zones: if state_zone.id == zone_id and not state_zone.disabled: return True return False @@ -1217,7 +1214,7 @@ async def async_browse_media(self, media_content_type=None, media_content_id=Non async def async_play_media(self, media_type, media_id, **kwargs): _LOGGER.debug(f'Play Media {media_type} {media_id} {kwargs}') if media_source.is_media_source_id(media_id): - play_item = await media_source.async_resolve_media(self.hass, media_id) + play_item = await media_source.async_resolve_media(self.hass, media_id, self.entity_id) media_id = play_item.url _LOGGER.info(f'Playing media source: {play_item} {media_id}') @@ -1376,18 +1373,18 @@ def device_info(self) -> DeviceInfo: def sync_state(self): """Retrieve latest state.""" _LOGGER.info(f'Retrieving state for stream {self._id}') - state = self._data_client.data - if state is not None: + if self._data_client.status is not None: + status = self._data_client.status groups = [] zones = [] try: - stream = next(filter(lambda s: s.id == self._id, state.streams), None) + stream = next(filter(lambda s: s.id == self._id, status.streams), None) if stream is not None: - current_source = next((s for s in state.sources if s.input == f"stream={stream.id}"), None) + current_source = next((s for s in status.sources if s.input == f"stream={stream.id}"), None) if current_source is not None: - groups = [group for group in state.groups if group.source_id == current_source.id] - zones = [zone for zone in state.zones if zone.source_id == current_source.id] + groups = [group for group in status.groups if group.source_id == current_source.id] + zones = [zone for zone in status.zones if zone.source_id == current_source.id] else: self._last_update_successful = False return @@ -1405,7 +1402,7 @@ def sync_state(self): self._available = self._stream is not None self._stream = stream - self._sources = state.sources + self._sources = status.sources self._current_source = current_source if current_source: # Cannot be off while connected, but can be on while disconnected self._is_off = False @@ -1498,4 +1495,3 @@ async def _update_available(self): if self._stream is None: return False return True - \ No newline at end of file diff --git a/custom_components/amplipi/update.py b/custom_components/amplipi/update.py new file mode 100644 index 0000000..aa7dcd5 --- /dev/null +++ b/custom_components/amplipi/update.py @@ -0,0 +1,165 @@ +from homeassistant.components.update import UpdateEntity, UpdateEntityFeature +from homeassistant.helpers.update_coordinator import CoordinatorEntity +from typing import Any +import logging +import aiohttp +import json +from .coordinator import AmpliPiDataClient + +from .const import DOMAIN, UPDATER_URL, AMPLIPI_OBJECT + +_LOGGER = logging.getLogger(__name__) + +async def async_setup_entry(hass, config_entry, async_add_entities): + """Set up the AmpliPi MultiZone Audio Controller""" + hass_entry = hass.data[DOMAIN][config_entry.entry_id] + updater_url = hass_entry[UPDATER_URL] + + data_coordinator = hass_entry[AMPLIPI_OBJECT] + update: list[UpdateEntity] = [ + AmpliPiUpdate(coordinator=data_coordinator, updater_url=updater_url) + ] + + async_add_entities(update) + +class AmpliPiUpdate(CoordinatorEntity, UpdateEntity): + + def __init__(self, coordinator: AmpliPiDataClient, updater_url): + super().__init__(coordinator) + + self.title = "AmpliPi" + self._LOGGER = _LOGGER + self._updater_url = updater_url + self.entity_id = "update.amplipi_system_update" + self.domain = "update" + self._release_summary = None + self._release_url = None + + @property + def installed_version(self): + if self.coordinator.status is not None: + return getattr(self.coordinator.status.info, "version", None) + return None + + @property + def latest_version(self): + if self.coordinator.status is not None: + return getattr(self.coordinator.status.info, "latest_release", None) + return None + + @property + def release_url(self): + if self.installed_version != self.latest_version: + update_info = self.get_update_info() + if update_info is not None: + self._release_url = update_info["html_url"] + return self._release_url + + @property + def release_summary(self): + if self.installed_version != self.latest_version: + update_info = self.get_update_info() + if update_info is not None: + self._release_summary = update_info["body"] + return self._release_summary + + @property + def should_poll(self): + """Polling needed.""" + return True + + @property + def available(self): + """Is the entity able to be used by the user? Should always return True so long as the entity is loaded.""" + return True + + @property + def name(self) -> str: + return "AmpliPi System Update" + + @property + def unique_id(self) -> str: + return "amplipi_system_update" + + @property + def supported_features(self): + return UpdateEntityFeature.INSTALL | UpdateEntityFeature.RELEASE_NOTES # TODO: Support UpdateEntityFeature.SPECIFIC_VERSION + + def get_update_info(self, update: str = "latest"): + if self.coordinator.data is not None: + if update == "latest": + return self.coordinator.latest + else: + for release in self.coordinator.releases: + if release["name"] == update: + return release + return None + + async def async_install(self, version: str = "latest", backup: bool = False, **kwargs: Any): + async def watch_and_restart(): + async with aiohttp.ClientSession() as session: + async with session.get(f"{self._updater_url}/update/install/progress") as resp: + async for line in resp.content: + # Example lines: + # b'data: {"message": "starting installation", "type": "info"}\r\n' + # b'\r\n' + # b'data: {"message": "Got amplipi release: micro-nova-AmpliPi-ffcec58", "type": "info"}\r\n' + # b'\r\n' + # b'event: ping\r\n' + # b'data: 2025-06-19 14:28:19.371839\r\n' + previous_line_message = "first line" + decoded_line = line.decode("utf-8") + if "message" in decoded_line: + data_json = decoded_line[len("data: "):].strip() + data = json.loads(data_json) + + if data["message"] != " ": + if data["type"] == "info": + # It might seem odd to make something literally labeled info be printed as not info level, but this is very loud + self._LOGGER.debug(data["message"]) + elif data["type"] == "warning": + self._LOGGER.warning(data["message"]) + elif data["type"] == "failure": + self._LOGGER.error(data["message"]) + else: + self._LOGGER.info(data["message"]) + + if data["type"] in ("success", "failed"): + if data["type"] == "success": + session.post(f"{self._updater_url}/update/restart", timeout=aiohttp.ClientTimeout(total=1000)) + else: + raise Exception(f"Update failed on: {previous_line_message}") + break + previous_line_message = data["message"] + + self.in_progress = True + try: + info = self.get_update_info(version) + + async with aiohttp.ClientSession() as session: + async with session.post( + f"{self._updater_url}/update/download", + json={'url': info['tarball_url'], 'version': info['name']}, + timeout=aiohttp.ClientTimeout(total=1000) + ) as download_resp: + if download_resp.status != 200: + raise Exception("AmpliPi System Update download failed") + + async with session.get( + f"{self._updater_url}/update/install", + timeout=aiohttp.ClientTimeout(total=1000) + ) as install_resp: + if install_resp.status != 200: + raise Exception("AmpliPi System Update installation failed") + + await watch_and_restart() + + except Exception as e: + self._LOGGER.error(f"Update failed due to error: {e}") + finally: + self.in_progress = False + await self.coordinator.async_request_refresh() + + async def async_release_notes(self): + return self.release_summary + \ No newline at end of file