diff --git a/custom_components/amplipi/__init__.py b/custom_components/amplipi/__init__.py index d6a6219..843c893 100644 --- a/custom_components/amplipi/__init__.py +++ b/custom_components/amplipi/__init__.py @@ -5,21 +5,26 @@ from homeassistant.const import CONF_HOST, CONF_PORT, CONF_NAME, CONF_ID from homeassistant.core import HomeAssistant from homeassistant.helpers.aiohttp_client import async_get_clientsession -from pyamplipi.amplipi import AmpliPi +import logging +from .coordinator import AmpliPiDataClient -from .const import DOMAIN, AMPLIPI_OBJECT, CONF_VENDOR, CONF_VERSION, CONF_WEBAPP, CONF_API_PATH +from .const import DOMAIN, AMPLIPI_OBJECT, UPDATER_URL, CONF_VENDOR, CONF_VERSION, CONF_WEBAPP, CONF_API_PATH -PLATFORMS = ["media_player"] +PLATFORMS = ["media_player", "update"] +_LOGGER = logging.getLogger(__name__) async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: - - hass.data.setdefault(DOMAIN, {})[entry.entry_id] = { - AMPLIPI_OBJECT: AmpliPi( - f'http://{entry.data[CONF_HOST]}:{entry.data[CONF_PORT]}/api/', - 10, + coordinator = AmpliPiDataClient( + hass=hass, + config_entry=entry, + endpoint=f'http://{entry.data[CONF_HOST]}:{entry.data[CONF_PORT]}/api/', + timeout=10, http_session=async_get_clientsession(hass) - ), + ) + + hass.data.setdefault(DOMAIN, {})[entry.entry_id] = { + AMPLIPI_OBJECT: coordinator, CONF_VENDOR: entry.data[CONF_VENDOR], CONF_NAME: entry.data[CONF_NAME], CONF_HOST: entry.data[CONF_HOST], @@ -28,6 +33,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: CONF_VERSION: entry.data[CONF_VERSION], CONF_WEBAPP: entry.data[CONF_WEBAPP], CONF_API_PATH: entry.data[CONF_API_PATH], + UPDATER_URL: "http://{entry.data[CONF_HOST]}:5001" } await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) diff --git a/custom_components/amplipi/const.py b/custom_components/amplipi/const.py index af08716..6158b85 100644 --- a/custom_components/amplipi/const.py +++ b/custom_components/amplipi/const.py @@ -4,5 +4,6 @@ CONF_VENDOR = "vendor" CONF_VERSION = "version" AMPLIPI_OBJECT = "amplipi_object" +UPDATER_URL = "updater_url" CONF_WEBAPP = "webapp" CONF_API_PATH = "api_path" diff --git a/custom_components/amplipi/coordinator.py b/custom_components/amplipi/coordinator.py index 9f50589..99fdb3c 100644 --- a/custom_components/amplipi/coordinator.py +++ b/custom_components/amplipi/coordinator.py @@ -3,22 +3,47 @@ Used to synchronize the current AmpliPi state with all of the corresponding HA Entities """ from datetime import timedelta +from typing import Optional, Union, Callable + from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed -from .models import Status, Source, Zone, Group, Stream from homeassistant.helpers.entity_registry import async_get as async_get_entity_registry +from homeassistant.helpers import aiohttp_client +from typing import List +from pydantic import BaseModel +import async_timeout +import logging +from .models import Status, Source, Zone, Group, Stream + +from pyamplipi.amplipi import AmpliPi +from pyamplipi.models import SourceUpdate, ZoneUpdate, MultiZoneUpdate, GroupUpdate, PlayMedia, Announcement, Status as PyStatus, Source as PySource, Stream as PyStream, Group as PyGroup, Zone as PyZone, Status as PyStatus + +from .models import Status, Source, Zone, Group, Stream from .const import DOMAIN -class AmpliPiCoordinator(DataUpdateCoordinator): - def __init__(self, hass, logger, config_entry, api): +_LOGGER = logging.getLogger(__name__) + +class AmpliPiDataSchema(BaseModel): + status: Status + latest: dict + releases: List[dict] + +class AmpliPiDataClient(DataUpdateCoordinator, AmpliPi): + def __init__(self, hass, config_entry, endpoint, timeout, http_session): super().__init__( hass, - logger, + _LOGGER, config_entry=config_entry, name="hacs_amplipi", update_interval=timedelta(seconds=2), always_update=True ) - self.api = api + + AmpliPi.__init__( + self, + endpoint=endpoint, + timeout=timeout, + http_session=http_session + ) async def get_friendly_name(self, entity_id): """Look up entity in hass.states and get the friendly name""" @@ -33,26 +58,47 @@ async def get_entity_id_from_unique_id(self, unique_id: str): if entry.unique_id == unique_id: return entry.entity_id return None + + async def fetch_github_releases(self, session) -> tuple[dict, List[dict]]: + """Fetch latest and all releases from GitHub using aiohttp.""" + base_url = "https://api.github.com/repos/micro-nova/AmpliPi/releases" + try: + with async_timeout.timeout(10): + async with session.get(f"{base_url}/latest") as latest_resp: + latest: dict = await latest_resp.json() + async with session.get(base_url) as releases_resp: + releases: List[dict] = await releases_resp.json() + return latest, releases + except Exception as e: + _LOGGER.warning(f"Failed to fetch GitHub releases: {e}") + return {}, [{}] - async def _async_update_data(self) -> Status: + async def _async_update_data(self) -> AmpliPiDataSchema: """Fetch data from API endpoint and pre-process into lookup tables.""" + return await self.get_status() - async def build_entity(entity, kind: str, cls, original_name: str): - unique_id = f"{DOMAIN}_{kind}_{entity['id']}" - entity_id = await self.get_entity_id_from_unique_id(unique_id) or f"media_player.{unique_id}" - friendly_name = await self.get_friendly_name(entity_id) or original_name - return cls( - **entity, - original_name=original_name, - unique_id=unique_id, - entity_id=entity_id, - friendly_name=friendly_name, - ) + async def set_data(self, state: PyStatus) -> AmpliPiDataSchema: + """ + Take in a Status object from the AmpliPi API and add home assistant specific encoding to it before pushing it to global state. + Returns the newly encoded Status object just so that _async_update_data has something to return as well. + """ + async def build_entity(entity: Union[PySource, PyZone, PyGroup, PyStream], kind: str, cls, original_name: str): + try: + unique_id = f"{DOMAIN}_{kind}_{entity['id']}" + entity_id = await self.get_entity_id_from_unique_id(unique_id) or f"media_player.{unique_id}" + friendly_name = await self.get_friendly_name(entity_id) or original_name + return cls( + **entity, + original_name=original_name, + unique_id=unique_id, + entity_id=entity_id, + friendly_name=friendly_name, + ) + except TypeError as e: + self.logger.error(f"Original name = {original_name}, entity = {entity}") + raise TypeError(e) from e try: - state = await self.api.get_status() - state = state.dict() - state["sources"] = [ await build_entity(entity, "source", Source, f"Source {entity['id'] + 1}") for entity in state["sources"] @@ -72,8 +118,85 @@ async def build_entity(entity, kind: str, cls, original_name: str): await build_entity(entity, "stream", Stream, entity["name"]) for entity in state["streams"] ] - - return Status(**state) + + status = Status(**state) + latest = self.latest + releases = self.releases + if self.latest is None or status.info.latest_release != self.latest.get("name"): + session = aiohttp_client.async_get_clientsession(self.hass) + latest, releases = await self.fetch_github_releases(session) + + data = AmpliPiDataSchema(status=status, latest=latest, releases=releases) + self.async_set_updated_data(data) + return data except Exception as e: - raise UpdateFailed(f"Error fetching data: {e}") + raise UpdateFailed(f"Error fetching data: {e}") from e + + @property + def status(self) -> Optional[Status]: + return self.data.status if self.data is not None else None + + @property + def latest(self) -> Optional[dict]: + return self.data.latest if self.data is not None else None + + @property + def releases(self) -> Optional[List[dict]]: + return self.data.releases if self.data is not None else None + + def intecept_and_consume(func: Callable): + """Intercept the return of a function and consume the data into the data coordinator""" + async def wrapper(self, *args, **kwargs): + resp = await func(self, *args, **kwargs) + return await self.set_data(resp.dict()) + return wrapper + + @intecept_and_consume + async def get_status(self) -> AmpliPiDataSchema: + return await super().get_status() + + @intecept_and_consume + async def set_source(self, source_id: int, source_update: SourceUpdate) -> AmpliPiDataSchema: + return await super().set_source(source_id, source_update) + + @intecept_and_consume + async def set_zone(self, zone_id: int, zone_update: ZoneUpdate) -> AmpliPiDataSchema: + return await super().set_zone(zone_id, zone_update) + + @intecept_and_consume + async def set_zones(self, zone_update: MultiZoneUpdate) -> AmpliPiDataSchema: + return await super().set_zones(zone_update) + + @intecept_and_consume + async def play_media(self, media: PlayMedia) -> AmpliPiDataSchema: + return await super().play_media(media) + + @intecept_and_consume + async def set_group(self, group_id, update: GroupUpdate) -> AmpliPiDataSchema: + return await super().set_group(group_id, update) + + @intecept_and_consume + async def announce(self, announcement: Announcement, timeout: Optional[int] = None) -> AmpliPiDataSchema: + return await super().announce(announcement, timeout) + + @intecept_and_consume + async def play_stream(self, stream_id: int) -> AmpliPiDataSchema: + return await super().play_stream(stream_id) + + @intecept_and_consume + async def pause_stream(self, stream_id: int) -> AmpliPiDataSchema: + return await super().pause_stream(stream_id) + + @intecept_and_consume + async def previous_stream(self, stream_id: int) -> AmpliPiDataSchema: + return await super().previous_stream(stream_id) + + @intecept_and_consume + async def next_stream(self, stream_id: int) -> AmpliPiDataSchema: + return await super().previous_stream(stream_id) + + @intecept_and_consume + async def stop_stream(self, stream_id: int) -> AmpliPiDataSchema: + return await super().stop_stream(stream_id) + \ No newline at end of file diff --git a/custom_components/amplipi/media_player.py b/custom_components/amplipi/media_player.py index f71be7c..da264ca 100644 --- a/custom_components/amplipi/media_player.py +++ b/custom_components/amplipi/media_player.py @@ -18,9 +18,9 @@ from pyamplipi.amplipi import AmpliPi from pyamplipi.models import ZoneUpdate, SourceUpdate, GroupUpdate, Announcement, MultiZoneUpdate, PlayMedia -from .coordinator import AmpliPiCoordinator +from .coordinator import AmpliPiDataClient from .const import ( - DOMAIN, AMPLIPI_OBJECT, CONF_VENDOR, CONF_VERSION, CONF_WEBAPP, ) + DOMAIN, AMPLIPI_OBJECT, CONF_VENDOR, CONF_VERSION, CONF_WEBAPP ) from .models import Source, Group, Zone, Stream SUPPORT_AMPLIPI_DAC = ( @@ -60,34 +60,37 @@ async def async_setup_entry(hass, config_entry, async_add_entities): """Set up the AmpliPi MultiZone Audio Controller""" hass_entry = hass.data[DOMAIN][config_entry.entry_id] - amplipi: AmpliPi = hass_entry[AMPLIPI_OBJECT] + amplipi_coordinator: AmpliPiDataClient = hass_entry[AMPLIPI_OBJECT] vendor = hass_entry[CONF_VENDOR] name = hass_entry[CONF_NAME] version = hass_entry[CONF_VERSION] image_base_path = f'{hass_entry[CONF_WEBAPP]}' - data_coordinator = AmpliPiCoordinator(hass, _LOGGER, config_entry, amplipi) - - status = await data_coordinator._async_update_data() + status = amplipi_coordinator.status if amplipi_coordinator.status is not None else (await amplipi_coordinator.get_status()).status sources: list[AmpliPiMediaPlayer] = [ - AmpliPiSource(data_coordinator, DOMAIN, source, status.streams, vendor, version, image_base_path, amplipi) + AmpliPiSource(DOMAIN, source, status.streams, vendor, version, image_base_path, amplipi_coordinator) for source in status.sources] zones: list[AmpliPiMediaPlayer] = [ - AmpliPiZone(data_coordinator, DOMAIN, zone, None, status.streams, status.sources, vendor, version, image_base_path, amplipi) + AmpliPiZone(DOMAIN, zone, None, status.streams, status.sources, vendor, version, image_base_path, amplipi_coordinator) for zone in status.zones] groups: list[AmpliPiMediaPlayer] = [ - AmpliPiZone(data_coordinator, DOMAIN, None, group, status.streams, status.sources, vendor, version, image_base_path, amplipi) + AmpliPiZone(DOMAIN, None, group, status.streams, status.sources, vendor, version, image_base_path, amplipi_coordinator) for group in status.groups] streams: list[AmpliPiMediaPlayer] = [ - AmpliPiStream(data_coordinator, DOMAIN, stream, status.sources, vendor, version, image_base_path, amplipi) + AmpliPiStream(DOMAIN, stream, status.sources, vendor, version, image_base_path, amplipi_coordinator) + for stream in status.streams + ] + + streams: list[AmpliPiMediaPlayer] = [ + AmpliPiStream(DOMAIN, stream, status.sources, vendor, version, image_base_path, amplipi_coordinator) for stream in status.streams ] announcer: list[MediaPlayerEntity] = [ - AmpliPiAnnouncer(DOMAIN, vendor, version, image_base_path, amplipi) + AmpliPiAnnouncer(DOMAIN, vendor, version, image_base_path, amplipi_coordinator) ] async_add_entities(sources + zones + groups + streams + announcer) @@ -112,9 +115,6 @@ class AmpliPiMediaPlayer(MediaPlayerEntity, CoordinatorEntity): """ Parent class of all AmpliPi MediaPlayer entities. Used to enforce common variables and provide shared functionality. """ - # State repository - coordinator: AmpliPiCoordinator - # The amplipi-side id _id: int @@ -131,7 +131,7 @@ class AmpliPiMediaPlayer(MediaPlayerEntity, CoordinatorEntity): # Home assistant particulars that are populated at entity instantiation via hass _vendor: str _version: str - _client: AmpliPi + _data_client: AmpliPiDataClient _domain: str _image_base_path: str # Where the album art metadata is stored on home assistant @@ -156,8 +156,8 @@ class AmpliPiMediaPlayer(MediaPlayerEntity, CoordinatorEntity): def get_entry_by_value(self, value: str) -> Union[Source, Zone, Group, Stream, None]: """Find what dict within the state array has a given value and return said dict""" - if self.coordinator.data is not None: - for category in (self.coordinator.data.sources, self.coordinator.data.zones, self.coordinator.data.groups, self.coordinator.data.streams): + if self._data_client.status is not None: + for category in (self._data_client.status.sources, self._data_client.status.zones, self._data_client.status.groups, self._data_client.status.streams): for entry in category: if value in entry.model_dump().values(): return entry @@ -183,11 +183,11 @@ def extract_amplipi_id_from_unique_id(self, uid: str) -> Optional[int]: def available_streams(self, source: Source): """Returns the available streams (generally all of them minus three of the four RCAs) relative to the provided source""" streams: List[str] = ['None'] - if self.coordinator.data is not None: + if self._data_client.status is not None: # Excludes every RCA except for the one related to the given source RCAs = [996, 997, 998, 999] rca_selectable = RCAs[source.id] - stream_entries = self.coordinator.data.streams + stream_entries = self._data_client.status.streams if stream_entries: for entry in stream_entries: amplipi_id = self.extract_amplipi_id_from_unique_id(entry.unique_id) @@ -205,7 +205,7 @@ async def async_connect_stream_to_source(self, stream: Stream, source: Optional[ if self._current_source is not None and source is not None and source.id != self._current_source.id: raise Exception("RCA streams can only connect to sources with the same ID") - state = self.coordinator.data + state = self._data_client.status source = state.sources[get_fixed_source_id(stream)] # It would be cleaner to do the following, but pyamplipi doesn't support RCA stream's index value atm: # source = state.sources[self._stream.index] @@ -223,7 +223,7 @@ async def async_connect_stream_to_source(self, stream: Stream, source: Optional[ raise Exception("All sources are in use, disconnect a source or select one to override and try again.") if source_id is not None: - await self._client.set_source( + await self._data_client.set_source( source_id, SourceUpdate( input=f'stream={stream.id}' @@ -234,7 +234,7 @@ async def async_connect_stream_to_source(self, stream: Stream, source: Optional[ async def async_connect_zones_to_source(self, source: Source, zones: Optional[List[int]], groups: Optional[List[int]]): """Connects zones and/or groups to the provided source""" if source is not None: - await self._client.set_zones( + await self._data_client.set_zones( MultiZoneUpdate( zones=zones, groups=groups, @@ -246,13 +246,13 @@ async def async_connect_zones_to_source(self, source: Source, zones: Optional[Li async def async_connect_zones_to_stream(self, stream: Stream, zones: Optional[List[int]], groups: Optional[List[int]]): """Connects zones and/or groups to the source of the selected stream. If stream does not have a source, select one""" - state = self.coordinator.data - source_id = next((s.id for s in state.sources if s.input == f"stream={stream.id}"), None) + status = self._data_client.status + source_id = next((s.id for s in status.sources if s.input == f"stream={stream.id}"), None) if source_id is None: source_id = await self.async_connect_stream_to_source(stream) if source_id is not None: - await self.async_connect_zones_to_source(state.sources[source_id], zones, groups) + await self.async_connect_zones_to_source(status.sources[source_id], zones, groups) def build_url(self, img_url): @@ -292,7 +292,7 @@ def get_song_info(self, source): async def find_source(self) -> Source: """Find first available source and return it. If no sources are available, returns None.""" - sources = await self._client.get_sources() + sources = await self._data_client.get_sources() for source in sources: if source.input in ['', 'None', None]: return source @@ -300,9 +300,9 @@ async def find_source(self) -> Source: async def swap_source(self, old_source: int, new_source: Optional[int] = None): """Moves a stream from one source to another, ensuring all zones follow. Generally only used for RCA streams, but able to be used by anyone.""" - state = self.coordinator.data + status = self._data_client.status - moved_stream: Stream = next(filter(lambda s: state.sources[old_source].input == f"stream={s.id}", state.streams), None) + moved_stream: Stream = next(filter(lambda s: status.sources[old_source].input == f"stream={s.id}", status.streams), None) if moved_stream is not None and moved_stream.type != "rca": # RCA streams each have an associated source to output them due to hardware constraints if new_source is None: @@ -311,15 +311,15 @@ async def swap_source(self, old_source: int, new_source: Optional[int] = None): new_source = source.id if new_source is not None: - await self._client.set_source( + await self._data_client.set_source( new_source, SourceUpdate( input=f'stream={moved_stream.id}' ) ) - moved_zones = [z.id for z in state.zones if z.source_id == old_source] - await self._client.set_zones( + moved_zones = [z.id for z in status.zones if z.source_id == old_source] + await self._data_client.set_zones( MultiZoneUpdate( zones=moved_zones, update=ZoneUpdate( @@ -348,23 +348,23 @@ async def async_volume_down(self): async def async_media_play(self): if self._current_stream is not None: - await self._client.play_stream(self._current_stream.id) + await self._data_client.play_stream(self._current_stream.id) async def async_media_stop(self): if self._current_stream is not None: - await self._client.stop_stream(self._current_stream.id) + await self._data_client.stop_stream(self._current_stream.id) async def async_media_pause(self): if self._current_stream is not None: - await self._client.pause_stream(self._current_stream.id) + await self._data_client.pause_stream(self._current_stream.id) async def async_media_previous_track(self): if self._current_stream is not None: - await self._client.previous_stream(self._current_stream.id) + await self._data_client.previous_stream(self._current_stream.id) async def async_media_next_track(self): if self._current_stream is not None: - await self._client.next_stream(self._current_stream.id) + await self._data_client.next_stream(self._current_stream.id) @property def available(self): @@ -413,9 +413,9 @@ def supported_features(self): class AmpliPiSource(AmpliPiMediaPlayer): """Representation of an AmpliPi Source Input, of which 4 are supported (Hard Coded).""" - def __init__(self, coordinator: AmpliPiCoordinator, namespace: str, source: Source, streams: List[Stream], vendor: str, version: str, - image_base_path: str, client: AmpliPi): - super().__init__(coordinator) + def __init__(self, namespace: str, source: Source, streams: List[Stream], vendor: str, version: str, + image_base_path: str, client: AmpliPiDataClient): + super().__init__(client) self._streams: List[Stream] = streams self._source = source @@ -429,7 +429,7 @@ def __init__(self, coordinator: AmpliPiCoordinator, namespace: str, source: Sour # Aliased so that AmpliPiMediaPlayer functions know to use the same source while also using a variable name that doesn't imply that the source can change within a source entity self._current_source = self._source - self._client = client + self._data_client = client self._name = source.original_name self._unique_id = source.unique_id @@ -451,7 +451,7 @@ async def async_turn_on(self): async def async_turn_off(self): if self._source is not None: _LOGGER.info(f"Turning source {self._name} off, disconnecting all zones and streams") - await self._client.set_source( + await self._data_client.set_source( self._id, SourceUpdate( input='None' @@ -529,7 +529,7 @@ async def async_play_media(self, media_type, media_id, **kwargs): _LOGGER.info(f'Playing media source: {play_item} {media_id}') media_id = async_process_play_media_url(self.hass, media_id) - await self._client.play_media( + await self._data_client.play_media( PlayMedia( source_id=self._source.id, media=media_id @@ -539,14 +539,14 @@ async def async_play_media(self, media_type, media_id, **kwargs): async def async_select_source(self, source): if self._source is not None and self._source.name == source: - await self._client.set_source( + await self._data_client.set_source( self._id, SourceUpdate( input=f"stream={get_fixed_source_id(self._source)}" ) ) elif source == 'None': - await self._client.set_source( + await self._data_client.set_source( self._id, SourceUpdate( input='None' @@ -557,7 +557,7 @@ async def async_select_source(self, source): stream_hacs_entity = self.get_entry_by_value(source) stream_id = self.extract_amplipi_id_from_unique_id(stream_hacs_entity.unique_id) if stream_id is not None: - await self._client.set_source( + await self._data_client.set_source( self._id, SourceUpdate( input=f'stream={stream_id}' @@ -609,11 +609,11 @@ def device_info(self) -> DeviceInfo: def sync_state(self): """Retrieve latest state.""" _LOGGER.info(f'Retrieving state for source {self._source.id}') - state = self.coordinator.data - if state is not None: + if self._data_client.status is not None: + status = self._data_client.status try: - source = next(filter(lambda z: z.id == self._source.id, state.sources), None) - streams = state.streams + source = next(filter(lambda z: z.id == self._source.id, status.sources), None) + streams = status.streams except Exception: self._last_update_successful = False _LOGGER.error(f'Could not update source {self._source.id}') @@ -631,8 +631,8 @@ def sync_state(self): stream_id = int(self._source.input.split('=')[1]) self._current_stream = next(filter(lambda s: s.id == stream_id, self._streams), None) - self._zones = list(filter(lambda z: z.source_id == self._source.id, state.zones)) - self._groups = list(filter(lambda z: z.source_id == self._source.id, state.groups)) + self._zones = list(filter(lambda z: z.source_id == self._source.id, status.zones)) + self._groups = list(filter(lambda z: z.source_id == self._source.id, status.groups)) self.get_song_info(self._source) self._last_update_successful = True @@ -706,18 +706,18 @@ def source_list(self): return self.available_streams(self._source) async def _update_source(self, update: SourceUpdate): - await self._client.set_source(self._source.id, update) + await self._data_client.set_source(self._source.id, update) async def _update_zones(self, update: MultiZoneUpdate): - # zones = await self._client.get_zones() + # zones = await self._data_client.get_zones() # associated_zones = filter(lambda z: z.source_id == self._source.id, zones) - await self._client.set_zones(update) + await self._data_client.set_zones(update) async def _update_groups(self, update: GroupUpdate): - groups = await self._client.get_groups() + groups = await self._data_client.get_groups() associated_groups = filter(lambda g: g.source_id == self._source.id, groups) for group in associated_groups: - await self._client.set_group(group.id, update) + await self._data_client.set_group(group.id, update) @property def extra_state_attributes(self): @@ -732,11 +732,11 @@ class AmpliPiZone(AmpliPiMediaPlayer): and mute controls and the ability to change the current 'source' a zone is tied to""" - def __init__(self, coordinator: AmpliPiCoordinator, namespace: str, zone: Zone, group: Group, + def __init__(self, namespace: str, zone: Zone, group: Group, streams: List[Stream], sources: List[Source], vendor: str, version: str, image_base_path: str, - client: AmpliPi): - super().__init__(coordinator) + client: AmpliPiDataClient): + super().__init__(client) self._sources = sources self._split_group: bool = False self._domain = namespace @@ -761,7 +761,7 @@ def __init__(self, coordinator: AmpliPiCoordinator, namespace: str, zone: Zone, self._vendor = vendor self._version = version self._enabled = False - self._client = client + self._data_client = client self._attr_source_list = [ 'None', 'Source 1', @@ -890,28 +890,28 @@ def device_info(self) -> DeviceInfo: def sync_state(self): """Retrieve latest state.""" _LOGGER.info(f'Retrieving state for source {self._id}') - state = self.coordinator.data - if state is not None: + if self._data_client.status is not None: + status = self._data_client.status zone = None group = None enabled = False try: if self._group is not None: - group: Group = next(filter(lambda z: z.id == self._id, state.groups), None) + group: Group = next(filter(lambda z: z.id == self._id, status.groups), None) if not group: self._last_update_successful = False return - any_enabled_zone = next(filter(lambda z: z.id in group.zones, state.zones), None) + any_enabled_zone = next(filter(lambda z: z.id in group.zones, status.zones), None) if any_enabled_zone is not None: enabled = True - connected_sources = [state.zones[zone_index].source_id for zone_index in group.zones] + connected_sources = [status.zones[zone_index].source_id for zone_index in group.zones] # Is every zone connected to the same source? self._split_group = len(set(connected_sources)) != 1 else: - zone = next(filter(lambda z: z.id == self._id, state.zones), None) + zone = next(filter(lambda z: z.id == self._id, status.zones), None) if not zone: self._last_update_successful = False return @@ -925,7 +925,7 @@ def sync_state(self): zone_ids = [] for zone_id in self._group.zones: - for state_zone in state.zones: + for state_zone in status.zones: if state_zone.id == zone_id and not state_zone.disabled: zone_ids.append(zone_id) self._extra_attributes = {"amplipi_zones" : zone_ids} @@ -938,7 +938,7 @@ def sync_state(self): if self._group is not None: for zone_id in self._group.zones: - for state_zone in state.zones: + for state_zone in status.zones: if state_zone.id == zone_id and not state_zone.disabled: self._available = True self._available = False @@ -948,19 +948,19 @@ def sync_state(self): self._zone = zone self._group = group - self._streams = state.streams - self._sources = state.sources + self._streams = status.streams + self._sources = status.sources self._last_update_successful = True self._enabled = enabled self._current_source = None # When a zone is off it connects to source_id -2, groups also yield the source_id that all requisite zones are already connected to if self._group is not None: - self._current_source = next(filter(lambda s: self._group.source_id == s.id, state.sources), None) + self._current_source = next(filter(lambda s: self._group.source_id == s.id, status.sources), None) self._is_off = self._group.source_id == -2 elif self._zone.source_id is not None: - self._current_source = next(filter(lambda s: self._zone.source_id == s.id, state.sources), None) + self._current_source = next(filter(lambda s: self._zone.source_id == s.id, status.sources), None) self._is_off = self._zone.source_id == -2 if self._current_source is not None and 'stream=' in self._current_source.input and 'stream=local' not in self._current_source.input: @@ -1041,10 +1041,10 @@ async def async_select_source(self, source: str): await self.async_connect_zones_to_source(*args) async def _update_zone(self, update: ZoneUpdate): - await self._client.set_zone(self._id, update) + await self._data_client.set_zone(self._id, update) async def _update_group(self, update: MultiZoneUpdate): - await self._client.set_zones(update) + await self._data_client.set_zones(update) @property def source_list(self): @@ -1076,7 +1076,7 @@ async def async_play_media(self, media_type, media_id, **kwargs): #No source, see if we can find an empty one if self._current_source is None: - sources = await self._client.get_sources() + sources = await self._data_client.get_sources() for source in sources: if source is not None and source.input in ['', 'None', None]: self._current_source = source @@ -1086,7 +1086,7 @@ async def async_play_media(self, media_type, media_id, **kwargs): media_id = async_process_play_media_url(self.hass, media_id) - await self._client.play_media( + await self._data_client.play_media( PlayMedia( source_id=self._current_source.id, media=media_id, @@ -1100,11 +1100,11 @@ def extra_state_attributes(self): async def _get_extra_attributes(self): if self._group is not None: - state = self.coordinator.data + status = self._data_client.status zone_ids = [] for zone_id in self._group.zones: - for state_zone in state.zones: + for state_zone in status.zones: if state_zone.id == zone_id and not state_zone.disabled: zone_ids.append(zone_id) self._extra_attributes = {"amplipi_zones" : zone_ids} @@ -1115,10 +1115,10 @@ async def _get_extra_attributes(self): self._extra_attributes = {"amplipi_zone_id" : self._zone.id} async def _update_available(self): - state = self.coordinator.data + status = self._data_client.status if self._group is not None: for zone_id in self._group.zones: - for state_zone in state.zones: + for state_zone in status.zones: if state_zone.id == zone_id and not state_zone.disabled: return True return False @@ -1143,7 +1143,7 @@ def __init__(self, namespace: str, self._vendor = vendor self._version = version self._enabled = True - self._client = client + self._data_client = client self._last_update_successful = True self._available = True self._extra_attributes: List = [] @@ -1214,12 +1214,12 @@ async def async_browse_media(self, media_content_type=None, media_content_id=Non async def async_play_media(self, media_type, media_id, **kwargs): _LOGGER.debug(f'Play Media {media_type} {media_id} {kwargs}') if media_source.is_media_source_id(media_id): - play_item = await media_source.async_resolve_media(self.hass, media_id) + play_item = await media_source.async_resolve_media(self.hass, media_id, self.entity_id) media_id = play_item.url _LOGGER.info(f'Playing media source: {play_item} {media_id}') media_id = async_process_play_media_url(self.hass, media_id) - await self._client.announce( + await self._data_client.announce( Announcement( media=media_id, vol_f=self._volume @@ -1239,11 +1239,11 @@ class AmpliPiStream(AmpliPiMediaPlayer): and mute controls and the ability to change the current 'source' a stream is tied to""" - def __init__(self, coordinator: AmpliPiCoordinator, namespace: str, stream: Stream, + def __init__(self, namespace: str, stream: Stream, sources: List[Source], vendor: str, version: str, image_base_path: str, - client: AmpliPi): - super().__init__(coordinator) + client: AmpliPiDataClient): + super().__init__(client) self._stream: Stream = stream self._current_stream = self._stream # Make an alias for use with AmpliPiMediaPlayer functions while keeping local verbiage more correct self._current_source = None @@ -1262,7 +1262,7 @@ def __init__(self, coordinator: AmpliPiCoordinator, namespace: str, stream: Stre self._image_base_path = image_base_path self._vendor = vendor self._version = version - self._client = client + self._data_client = client self._attr_source_list = [ 'None', 'Any', @@ -1282,7 +1282,7 @@ async def _update_zones(self, update: ZoneUpdate): zones=[z.id for z in self._current_zones], update=update ) - await self._client.set_zones(multi_update) + await self._data_client.set_zones(multi_update) async def async_toggle(self): @@ -1310,7 +1310,7 @@ async def async_turn_off(self): source_id=-1, ) ) - await self._client.set_source( + await self._data_client.set_source( self._current_source.id, SourceUpdate( input='None' @@ -1373,18 +1373,18 @@ def device_info(self) -> DeviceInfo: def sync_state(self): """Retrieve latest state.""" _LOGGER.info(f'Retrieving state for stream {self._id}') - state = self.coordinator.data - if state is not None: + if self._data_client.status is not None: + status = self._data_client.status groups = [] zones = [] try: - stream = next(filter(lambda s: s.id == self._id, state.streams), None) + stream = next(filter(lambda s: s.id == self._id, status.streams), None) if stream is not None: - current_source = next((s for s in state.sources if s.input == f"stream={stream.id}"), None) + current_source = next((s for s in status.sources if s.input == f"stream={stream.id}"), None) if current_source is not None: - groups = [group for group in state.groups if group.source_id == current_source.id] - zones = [zone for zone in state.zones if zone.source_id == current_source.id] + groups = [group for group in status.groups if group.source_id == current_source.id] + zones = [zone for zone in status.zones if zone.source_id == current_source.id] else: self._last_update_successful = False return @@ -1402,7 +1402,7 @@ def sync_state(self): self._available = self._stream is not None self._stream = stream - self._sources = state.sources + self._sources = status.sources self._current_source = current_source if current_source: # Cannot be off while connected, but can be on while disconnected self._is_off = False @@ -1461,7 +1461,7 @@ async def async_select_source(self, source: Optional[str] = None): # As such, this info must be sorted and then sent down the proper logical path if source: if source == "None" and self._current_source is not None: - await self._client.set_source( + await self._data_client.set_source( self._current_source.id, SourceUpdate( input='None' @@ -1495,4 +1495,3 @@ async def _update_available(self): if self._stream is None: return False return True - \ No newline at end of file diff --git a/custom_components/amplipi/update.py b/custom_components/amplipi/update.py new file mode 100644 index 0000000..aa7dcd5 --- /dev/null +++ b/custom_components/amplipi/update.py @@ -0,0 +1,165 @@ +from homeassistant.components.update import UpdateEntity, UpdateEntityFeature +from homeassistant.helpers.update_coordinator import CoordinatorEntity +from typing import Any +import logging +import aiohttp +import json +from .coordinator import AmpliPiDataClient + +from .const import DOMAIN, UPDATER_URL, AMPLIPI_OBJECT + +_LOGGER = logging.getLogger(__name__) + +async def async_setup_entry(hass, config_entry, async_add_entities): + """Set up the AmpliPi MultiZone Audio Controller""" + hass_entry = hass.data[DOMAIN][config_entry.entry_id] + updater_url = hass_entry[UPDATER_URL] + + data_coordinator = hass_entry[AMPLIPI_OBJECT] + update: list[UpdateEntity] = [ + AmpliPiUpdate(coordinator=data_coordinator, updater_url=updater_url) + ] + + async_add_entities(update) + +class AmpliPiUpdate(CoordinatorEntity, UpdateEntity): + + def __init__(self, coordinator: AmpliPiDataClient, updater_url): + super().__init__(coordinator) + + self.title = "AmpliPi" + self._LOGGER = _LOGGER + self._updater_url = updater_url + self.entity_id = "update.amplipi_system_update" + self.domain = "update" + self._release_summary = None + self._release_url = None + + @property + def installed_version(self): + if self.coordinator.status is not None: + return getattr(self.coordinator.status.info, "version", None) + return None + + @property + def latest_version(self): + if self.coordinator.status is not None: + return getattr(self.coordinator.status.info, "latest_release", None) + return None + + @property + def release_url(self): + if self.installed_version != self.latest_version: + update_info = self.get_update_info() + if update_info is not None: + self._release_url = update_info["html_url"] + return self._release_url + + @property + def release_summary(self): + if self.installed_version != self.latest_version: + update_info = self.get_update_info() + if update_info is not None: + self._release_summary = update_info["body"] + return self._release_summary + + @property + def should_poll(self): + """Polling needed.""" + return True + + @property + def available(self): + """Is the entity able to be used by the user? Should always return True so long as the entity is loaded.""" + return True + + @property + def name(self) -> str: + return "AmpliPi System Update" + + @property + def unique_id(self) -> str: + return "amplipi_system_update" + + @property + def supported_features(self): + return UpdateEntityFeature.INSTALL | UpdateEntityFeature.RELEASE_NOTES # TODO: Support UpdateEntityFeature.SPECIFIC_VERSION + + def get_update_info(self, update: str = "latest"): + if self.coordinator.data is not None: + if update == "latest": + return self.coordinator.latest + else: + for release in self.coordinator.releases: + if release["name"] == update: + return release + return None + + async def async_install(self, version: str = "latest", backup: bool = False, **kwargs: Any): + async def watch_and_restart(): + async with aiohttp.ClientSession() as session: + async with session.get(f"{self._updater_url}/update/install/progress") as resp: + async for line in resp.content: + # Example lines: + # b'data: {"message": "starting installation", "type": "info"}\r\n' + # b'\r\n' + # b'data: {"message": "Got amplipi release: micro-nova-AmpliPi-ffcec58", "type": "info"}\r\n' + # b'\r\n' + # b'event: ping\r\n' + # b'data: 2025-06-19 14:28:19.371839\r\n' + previous_line_message = "first line" + decoded_line = line.decode("utf-8") + if "message" in decoded_line: + data_json = decoded_line[len("data: "):].strip() + data = json.loads(data_json) + + if data["message"] != " ": + if data["type"] == "info": + # It might seem odd to make something literally labeled info be printed as not info level, but this is very loud + self._LOGGER.debug(data["message"]) + elif data["type"] == "warning": + self._LOGGER.warning(data["message"]) + elif data["type"] == "failure": + self._LOGGER.error(data["message"]) + else: + self._LOGGER.info(data["message"]) + + if data["type"] in ("success", "failed"): + if data["type"] == "success": + session.post(f"{self._updater_url}/update/restart", timeout=aiohttp.ClientTimeout(total=1000)) + else: + raise Exception(f"Update failed on: {previous_line_message}") + break + previous_line_message = data["message"] + + self.in_progress = True + try: + info = self.get_update_info(version) + + async with aiohttp.ClientSession() as session: + async with session.post( + f"{self._updater_url}/update/download", + json={'url': info['tarball_url'], 'version': info['name']}, + timeout=aiohttp.ClientTimeout(total=1000) + ) as download_resp: + if download_resp.status != 200: + raise Exception("AmpliPi System Update download failed") + + async with session.get( + f"{self._updater_url}/update/install", + timeout=aiohttp.ClientTimeout(total=1000) + ) as install_resp: + if install_resp.status != 200: + raise Exception("AmpliPi System Update installation failed") + + await watch_and_restart() + + except Exception as e: + self._LOGGER.error(f"Update failed due to error: {e}") + finally: + self.in_progress = False + await self.coordinator.async_request_refresh() + + async def async_release_notes(self): + return self.release_summary + \ No newline at end of file