diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d578d6a4..17d03ac70 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * System * Upgraded volume calculations to preserve relative positions when hitting the min or max setting via source volume bar * Update our spotify provider `go-librespot` to `0.7.3` + * Added volume matching between AmpliPi and Spotify and vice-versa # 0.4.11 * System diff --git a/amplipi/streams/__init__.py b/amplipi/streams/__init__.py index f094c9bd8..d141999a6 100644 --- a/amplipi/streams/__init__.py +++ b/amplipi/streams/__init__.py @@ -65,7 +65,7 @@ def build_stream(stream: models.Stream, mock: bool = False, validate: bool = True) -> AnyStream: """ Build a stream from the generic arguments given in stream, discriminated by stream.type - we are waiting on Pydantic's implemenatation of discriminators to fully integrate streams into our model definitions + we are waiting on Pydantic's implementation of discriminators to fully integrate streams into our model definitions """ # pylint: disable=too-many-return-statements args = stream.dict(exclude_none=True) diff --git a/amplipi/streams/base_streams.py b/amplipi/streams/base_streams.py index b8645d237..e158204b5 100644 --- a/amplipi/streams/base_streams.py +++ b/amplipi/streams/base_streams.py @@ -5,6 +5,7 @@ import logging from amplipi import models from amplipi import utils +from amplipi import app logger = logging.getLogger(__name__) logger.level = logging.DEBUG @@ -62,6 +63,24 @@ def __init__(self, stype: str, name: str, only_src=None, disabled: bool = False, if validate: self.validate_stream(name=name, mock=mock, **kwargs) + def get_zone_data(self): + if self.src is not None: + ctrl = app.get_ctrl() + state = ctrl.get_state() + return [zone for zone in state.zones if zone.source_id == self.src] + + @property + def connected_zones(self) -> List[int]: + connected_zones = self.get_zone_data() + return [zone.id for zone in connected_zones] + + @property + def volume(self) -> float: + connected_zones = self.get_zone_data() + if connected_zones: + return sum([zone.vol_f for zone in connected_zones]) / len(connected_zones) + return 0 + def __del__(self): self.disconnect() @@ -242,7 +261,7 @@ def deactivate(self): raise Exception(f'Failed to deactivate {self.name}: {e}') from e finally: self.state = "disconnected" # make this look like a normal stream for now - if 'vsrc' in self.__dir__() and self.vsrc: + if 'vsrc' in self.__dir__() and self.vsrc is not None: vsrc = self.vsrc self.vsrc = None vsources.free(vsrc) diff --git a/amplipi/streams/spotify_connect.py b/amplipi/streams/spotify_connect.py index 34e8b105d..460d8ed12 100644 --- a/amplipi/streams/spotify_connect.py +++ b/amplipi/streams/spotify_connect.py @@ -2,19 +2,27 @@ import io import os +import threading import re import sys import subprocess import time from typing import ClassVar, Optional import yaml +import logging +import json from amplipi import models, utils -from .base_streams import PersistentStream, InvalidStreamField, logger +from .base_streams import PersistentStream, InvalidStreamField from .. import tasks # Our subprocesses run behind the scenes, is there a more standard way to do this? # pylint: disable=consider-using-with +logger = logging.getLogger(__name__) +logger.level = logging.DEBUG +sh = logging.StreamHandler(sys.stdout) +logger.addHandler(sh) + class SpotifyConnect(PersistentStream): """ A SpotifyConnect Stream based off librespot-go """ @@ -33,9 +41,30 @@ def __init__(self, name: str, disabled: bool = False, mock: bool = False, valida self._log_file: Optional[io.TextIOBase] = None self._api_port: int self.proc2: Optional[subprocess.Popen] = None + self.volume_sync_process: Optional[subprocess.Popen] = None # Runs the actual vol sync script + self.volume_watcher_process: Optional[threading.Thread] = None # Populates the fifo that the vol sync script depends on + self.src_config_folder: Optional[str] = None self.meta_file: str = '' - self.max_volume: int = 100 # default configuration from 'volume_steps' - self.last_volume: float = 0 + self._volume_fifo = None + + def watch_vol(self): + """Creates and supplies a FIFO with volume data for volume sync""" + while True: + try: + if self.src is not None: + if self._volume_fifo is None and self.src_config_folder is not None: + fifo_path = f"{self.src_config_folder}/vol" + if not os.path.isfile(fifo_path): + os.mkfifo(fifo_path) + self._volume_fifo = os.open(fifo_path, os.O_WRONLY, os.O_NONBLOCK) + data = json.dumps({ + 'zones': self.connected_zones, + 'volume': self.volume, + }) + os.write(self._volume_fifo, bytearray(f"{data}\r\n", encoding="utf8")) + except Exception as e: + logger.error(f"{self.name} volume thread ran into exception: {e}") + time.sleep(0.1) def reconfig(self, **kwargs): self.validate_stream(**kwargs) @@ -52,9 +81,9 @@ def _activate(self, vsrc: int): """ Connect to a given audio source """ - src_config_folder = f'{utils.get_folder("config")}/srcs/v{vsrc}' + self.src_config_folder = f'{utils.get_folder("config")}/srcs/v{vsrc}' try: - os.remove(f'{src_config_folder}/currentSong') + os.remove(f'{self.src_config_folder}/currentSong') except FileNotFoundError: pass self._connect_time = time.time() @@ -78,16 +107,16 @@ def _activate(self, vsrc: int): } # make all of the necessary dir(s) & files - os.makedirs(src_config_folder, exist_ok=True) + os.makedirs(self.src_config_folder, exist_ok=True) - config_file = f'{src_config_folder}/config.yml' + config_file = f'{self.src_config_folder}/config.yml' with open(config_file, 'w', encoding='utf8') as f: f.write(yaml.dump(config)) - self.meta_file = f'{src_config_folder}/metadata.json' + self.meta_file = f'{self.src_config_folder}/metadata.json' - self._log_file = open(f'{src_config_folder}/log', mode='w', encoding='utf8') - player_args = f"{utils.get_folder('streams')}/go-librespot --config_dir {src_config_folder}".split(' ') + self._log_file = open(f'{self.src_config_folder}/log', mode='w', encoding='utf8') + player_args = f"{utils.get_folder('streams')}/go-librespot --config_dir {self.src_config_folder}".split(' ') logger.debug(f'spotify player args: {player_args}') self.proc = subprocess.Popen(args=player_args, stdin=subprocess.PIPE, @@ -99,20 +128,45 @@ def _activate(self, vsrc: int): logger.info(f'{self.name}: starting metadata reader: {meta_args}') self.proc2 = subprocess.Popen(args=meta_args, stdout=self._log_file, stderr=self._log_file) + vol_sync = f"{utils.get_folder('streams')}/spotify_volume_handler.py" + vol_args = [sys.executable, vol_sync, str(self._api_port), self.src_config_folder, "--debug"] + logger.info(f'{self.name}: starting vol synchronizer: {vol_args}') + self.volume_sync_process = subprocess.Popen(args=vol_args, stdout=self._log_file, stderr=self._log_file) + + self.volume_watcher_process = threading.Thread(target=self.watch_vol, daemon=True) + self.volume_watcher_process.start() + def _deactivate(self): if self._is_running(): self.proc.stdin.close() logger.info(f'{self.name}: stopping player') + + # Call terminate on all processes self.proc.terminate() self.proc2.terminate() + if self.volume_sync_process: + self.volume_sync_process.terminate() + + # Ensure the processes have closed, by force if necessary if self.proc.wait(1) != 0: logger.info(f'{self.name}: killing player') self.proc.kill() + if self.proc2.wait(1) != 0: logger.info(f'{self.name}: killing metadata reader') self.proc2.kill() + + if self.volume_sync_process: + if self.volume_sync_process.wait(1) != 0: + logger.info(f'{self.name}: killing volume synchronizer') + self.volume_sync_process.kill() + + # Validate on the way out self.proc.communicate() self.proc2.communicate() + if self.volume_sync_process: + self.volume_sync_process.communicate() + if self.proc and self._log_file: # prevent checking _log_file when it may not exist, thanks validation! self._log_file.close() if self.src: @@ -121,8 +175,12 @@ def _deactivate(self): except Exception as e: logger.exception(f'{self.name}: Error removing config files: {e}') self._disconnect() + self.proc = None self.proc2 = None + self.volume_sync_process = None + self.volume_watcher_process = None + self._volume_fifo = None def info(self) -> models.SourceInfo: source = models.SourceInfo( @@ -190,10 +248,3 @@ def validate_stream(self, **kwargs): NAME = r"[a-zA-Z0-9][A-Za-z0-9\- ]*[a-zA-Z0-9]" if 'name' in kwargs and not re.fullmatch(NAME, kwargs['name']): raise InvalidStreamField("name", "Invalid stream name") - - def sync_volume(self, volume: float) -> None: - """ Set the volume of amplipi to the Spotify Connect stream""" - if volume != self.last_volume: - url = f"http://localhost:{self._api_port}/" - self.last_volume = volume # update last_volume for future syncs - tasks.post.delay(url + 'volume', data={'volume': int(volume * self.max_volume)}) diff --git a/streams/spotify_volume_handler.py b/streams/spotify_volume_handler.py new file mode 100644 index 000000000..4a0aa89fb --- /dev/null +++ b/streams/spotify_volume_handler.py @@ -0,0 +1,79 @@ +"""Script for synchronizing AmpliPi and Spotify volumes""" +import argparse +import json +import logging +import sys + +import websockets +import requests + +from volume_synchronizer import VolSyncDispatcher, StreamWatcher, VolEvents +from spot_connect_meta import Event + + +class SpotifyWatcher(StreamWatcher): + """A class that watches and tracks changes to spotify-side volume""" + + def __init__(self, api_port: int): + super().__init__() + + self.api_port: int = api_port + """What port is go-librespot running on? Typically set to 3678 + vsrc.""" + + async def watch_vol(self): + """Watch the go-librespot websocket endpoint for volume change events and update AmpliPi volume info accordingly""" + try: + # pylint: disable=E1101 + # E1101: Module 'websockets' has no 'connect' member (no-member) + async with websockets.connect(f"ws://localhost:{self.api_port}/events", open_timeout=5) as websocket: + while True: + try: + msg = await websocket.recv() + event = Event.from_json(json.loads(msg)) + if event.event_type == "volume": + last_volume = float(self.volume) if self.volume is not None else None + self.volume = event.data.value / 100 # Translate spotify volume (0 - 100) to amplipi volume (0 - 1) + + self.logger.debug(f"Spotify volume changed from {last_volume} to {self.volume}") + if last_volume is not None and self.volume != last_volume: + self.schedule_event(VolEvents.CHANGE_AMPLIPI) + elif event.event_type == "will_play" and self.volume is None: + self.schedule_event(VolEvents.CHANGE_STREAM) # Intercept the event that occurs when a song starts playing and use that as a trigger for the initial state sync + + except Exception as e: + self.logger.exception(f"Error: {e}") + return + except Exception as e: + self.logger.exception(f"Error: {e}") + return + + def set_vol(self, new_vol: float, vol_set_point: float) -> float: + """Update Spotify's volume slider""" + try: + if new_vol is None: + return vol_set_point + + if abs(new_vol - vol_set_point) <= 0.005 and self.volume is not None: + self.logger.debug("Ignored minor AmpliPi -> Spotify change") + return vol_set_point + + url = f"http://localhost:{self.api_port}/player/volume" + spot_vol = int(new_vol * 100) + self.logger.debug(f"Setting Spotify volume to {new_vol} from {self.volume}") + requests.post(url, json={"volume": spot_vol}, timeout=5) + return new_vol + except Exception as e: + self.logger.exception(f"Exception: {e}") + + +if __name__ == "__main__": + + parser = argparse.ArgumentParser(description="Read metadata from a given URL and write it to a file.") + + parser.add_argument("port", help="port that go-librespot is running on", type=int) + parser.add_argument("config_dir", help="The directory of the vsrc config", type=str) + parser.add_argument("--debug", action="store_true", help="Change log level from WARNING to DEBUG") + + args = parser.parse_args() + + handler = VolSyncDispatcher(SpotifyWatcher(api_port=args.port), args.config_dir, args.debug) diff --git a/streams/volume_synchronizer.py b/streams/volume_synchronizer.py new file mode 100644 index 000000000..87a9fe93a --- /dev/null +++ b/streams/volume_synchronizer.py @@ -0,0 +1,208 @@ +"""Classes for synchronizing AmpliPi volume with the internal volume of a given stream""" +import json +import asyncio +import threading +import queue +import logging +import os +from typing import Callable, List, Optional +from enum import Enum +import requests + + +class VolEvents(Enum): + CHANGE_STREAM = "change_stream" + CHANGE_AMPLIPI = "change_amplipi" + + +class StreamWatcher: + """ + A class that is used as a blueprint for stream volume watchers + Child classes must provide the following functions. Both of these functions are automatically used by the VolSyncDispatcher, so there's no need to do anything with them: + + watch_vol: a function that contains a while True loop that collects the remote volume, sets self.volume, and calls self.schedule_event(VolEvents.CHANGE_AMPLIPI) when the volume changes + + set_vol: a function that takes the new_volume as well as the previous volume_set_point (both floats) and returns the new set point volume (either the previous set point if the change was unsuccessful or the newly recognized volume) + """ + + def __init__(self): + self.schedule_event: Callable[[VolEvents]] + """Event scheduler function provided by VolSyncDispatcher, has limited valid inputs that can be seen in the VolEvents enum""" + + self._volume: float = None + """Value between 0 and 1, or None if not yet initialized by the upstream""" + + self.logger: logging.Logger + """logging.Logger instance provided by VolSyncDispatcher""" + + self.thread: threading.Thread = threading.Thread(target=self.run_async_watch, daemon=True) + self.thread.start() + + @property + def volume(self) -> Optional[float]: + """Value between 0 and 1, or None if not yet initialized by the upstream""" + return self._volume + + @volume.setter + def volume(self, value: float) -> None: + if 0 > value or value > 1: + raise ValueError("Volume must be between 0 and 1") + self._volume = value + + def run_async_watch(self): + """Middleman function for creating an asyncio run inside of a new threading.Thread""" + asyncio.run(self.watch_vol()) + + async def watch_vol(self): + """A function to be implemented by child classes that must contain a while True loop and do self.schedule_event(VolEvents.CHANGE_AMPLIPI) when new_vol != old_vol""" + raise NotImplementedError("Function must be implemented by child classes") + + def set_vol(self, new_vol: float, vol_set_point: float) -> float: + """A function to be implemented by child classes to update the stream's volume and returns the new set point volume""" + raise NotImplementedError("Function must be implemented by child classes") + + +class AmpliPiWatcher: + """ + A class to watch changes to a streams vol fifo and change the volume of connected zones + Already fully handled by VolSyncDispatcher and should not be used by itself + """ + + def __init__(self, config_dir: str, schedule_event: Callable, logger: logging.Logger): + self.schedule_event: Callable[[VolEvents]] = schedule_event + """Event scheduler function provided by VolSyncDispatcher, has limited valid inputs that can be seen in the VolEvents enum""" + + self.logger: logging.Logger = logger + self.volume: float = None + self.config_dir: str = config_dir + + self.connected_zones: List[int] = [] + """List of zone ids, used to send volume change requests to these connected zones""" + + self.thread = threading.Thread(target=self.get_vol, daemon=True) + self.thread.start() + + def get_vol(self): + """ + Read the volume FIFO from .config/amplipi/srcs/v{vsrc}/vol to load the currently connected zones and the averaged volume of them + If the read volume is different than the previous volume, send a volume change event to the stream + """ + with open(f'{self.config_dir}/vol', 'r') as fifo: + while True: + data = json.loads(fifo.readline().strip()) + if self.volume != data["volume"]: + self.volume = data["volume"] + self.schedule_event(VolEvents.CHANGE_STREAM) + self.connected_zones = data["zones"] + + def set_vol(self, stream_volume: float, vol_set_point: float): + """Update AmpliPi's volume to match the stream volume""" + try: + if stream_volume is None: + return vol_set_point + + if abs(stream_volume - self.volume) <= 0.005: + self.logger.debug("Ignored minor Stream -> AmpliPi change") + return vol_set_point + + delta = float(stream_volume - self.volume) + expected_volume = self.volume + delta + self.logger.debug(f"Setting AmpliPi volume to {expected_volume} from {self.volume}") + requests.patch( + "http://localhost/api/zones", + json={ + "zones": self.connected_zones, + "update": {"vol_delta_f": delta, "mute": False}, + }, + timeout=5, + ) + return expected_volume + except Exception as e: + self.logger.exception(f"Exception: {e}") + + +class VolSyncDispatcher: + """ + Volume synchronizer for AmpliPi and another volume-providing stream. + + stream: A fully constructed instance of a class that extends StreamWatcher + + config_dir: the path to the .config/amplipi/srcs/v{vsrc} folder for this persistent stream + + debug: bool that decides whether log level is DEBUG (if True) or WARNING (if False). False by default. + + Example Usage: + + class SomeStreamWatcher(StreamWatcher): + __init__(**kwargs): + super().__init__() + ... + + async def get_vol(self) -> None: + ... + self.volume = new_volume + + def set_vol(self, new_vol: float, vol_set_point: float) -> float: + ... + return new_vol if change_successful else vol_set_point + + {build standard argparse flow here, containing args for your constructor as well as config_dir and --debug} + + handler = VolSyncDispatcher(SomeStreamWatcher(**kwargs), args.config_dir, args.debug) + """ + + # All you need to do to use this class is build a StreamWatcher extension and then follow the above example with a simple argparse flow, everything else is handled automatically + + def __init__(self, stream: StreamWatcher, config_dir: str, debug=False): + logfile = f"{config_dir}/vol_log" + if not os.path.exists(logfile): + with open(logfile, "w", encoding="utf-8") as f: + f.write("") + + self.logger = logging.getLogger(__name__) + self.logger.setLevel(logging.DEBUG if debug else logging.WARNING) + fh = logging.FileHandler(logfile) + self.logger.addHandler(fh) + + self.event_queue = queue.Queue() + self.amplipi = AmpliPiWatcher(config_dir, self.schedule_event, self.logger) + + self.stream: StreamWatcher = stream + + # Set these directly so children don't need to add them to their constructors + self.stream.logger = self.logger + self.stream.schedule_event = self.schedule_event + + self.vol_set_point = self.amplipi.volume + self.event_loop() + + def schedule_event(self, event_type: VolEvents): + """When an event occurs in a child, that child can use this callback function to schedule the response to said event in the event queue""" + self.event_queue.put(event_type) + + def event_loop(self): + """Watch for events coming from amplipi and the stream to then change the volume of the other""" + while True: + try: + if self.vol_set_point is None: + # While vol_set_point is set to amplipi vol by default, sometimes amplipi vol isn't read at initialization + # In that case, set the set point to AmpliPi volume and then set remote vol to match + self.vol_set_point = float(self.amplipi.volume) + if self.amplipi.volume is not None: + self.schedule_event(VolEvents.CHANGE_STREAM) + + event = self.event_queue.get() + if event == VolEvents.CHANGE_AMPLIPI: + self.logger.debug(f"Attempting to set AmpliPi volume to {self.stream.volume}") + self.vol_set_point = self.amplipi.set_vol(self.stream.volume, self.vol_set_point) + elif event == VolEvents.CHANGE_STREAM: + self.logger.debug(f"Attempting to set remote volume to {self.amplipi.volume}") + self.vol_set_point = self.stream.set_vol(self.amplipi.volume, self.vol_set_point) + except queue.Empty: + continue + except (KeyboardInterrupt, SystemExit): + self.logger.exception("Exiting...") + break + except Exception as e: + self.logger.exception(f"Exception: {e}") + continue diff --git a/web/src/pages/Home/Home.jsx b/web/src/pages/Home/Home.jsx index 0f28f48bc..b80b0a72b 100644 --- a/web/src/pages/Home/Home.jsx +++ b/web/src/pages/Home/Home.jsx @@ -119,7 +119,7 @@ const Home = () => { // on apply, we want to call onApply={async (customSourceId) => { const ret = await executeApplyAction(customSourceId); - if(ret.ok){ret.json().then(s => setSystemState(s))}; + if(ret.ok){ret.json().then(s => setSystemState(s));} }} onClose={() => setZonesModalOpen(false)} /> @@ -129,7 +129,7 @@ const Home = () => { sourceId={nextAvailableSource} onApply={async (customSourceId) => { const ret = await executeApplyAction(customSourceId); - if(ret.ok){ret.json().then(s => setSystemState(s))}; + if(ret.ok){ret.json().then(s => setSystemState(s));} }} onClose={() => setStreamerOutputModalOpen(false)} />