From fed42b138e0d14fc49ce3dd2a76840c4f1da9765 Mon Sep 17 00:00:00 2001 From: Ashley Camba Garrido Date: Mon, 17 Aug 2020 14:41:51 +0200 Subject: [PATCH 01/18] Add minimal working cli with plugin support --- mixtape/__init__.py | 21 ++++++++- mixtape/cli.py | 36 ++++++++++++++ mixtape/core.py | 33 +++++++++++++ mixtape/hookspecs.py | 110 +++++++++++++++++++++++++++++++++++++++++++ mixtape/players.py | 10 ++-- req-install.txt | 3 +- setup.py | 3 ++ 7 files changed, 210 insertions(+), 6 deletions(-) create mode 100644 mixtape/cli.py create mode 100644 mixtape/core.py create mode 100644 mixtape/hookspecs.py diff --git a/mixtape/__init__.py b/mixtape/__init__.py index 6bf7c45..f78e6c9 100644 --- a/mixtape/__init__.py +++ b/mixtape/__init__.py @@ -1,3 +1,20 @@ -from .players import AsyncPlayer +from .core import BoomBox -__all__ = ["AsyncPlayer"] +from typing import (Any, Callable, List, Mapping, MutableMapping, Optional, + Tuple, Type, TypeVar) + +import pluggy + +from . import hookspecs + +hookimpl = pluggy.HookimplMarker("mixtape") + +def load_plugin_manager(plugins=None): + """Init mixtape plugin manager""" + + if plugins is None: + plugins = [] + pm = pluggy.PluginManager(__name__) + pm.add_hookspecs(hookspecs) + pm.load_setuptools_entrypoints(group=__name__) + return pm \ No newline at end of file diff --git a/mixtape/cli.py b/mixtape/cli.py new file mode 100644 index 0000000..189b9b0 --- /dev/null +++ b/mixtape/cli.py @@ -0,0 +1,36 @@ +import click +import asyncio +import signal +import gi + +gi.require_version("Gst", "1.0") +from gi.repository import Gst + +from . import BoomBox, load_plugin_manager + +class PluggyCLI(click.Command): + """Click command that adds options from pluggy hooks""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.pm = load_plugin_manager() + # we add an option for every plugin to disable + for plugin in self.pm.get_plugins(): + name = self.pm.get_name(plugin) + option = click.Option([f'--{name}/--no-{name}'], default=True) + self.params.append(option) + +@click.command(cls=PluggyCLI) +@click.argument('description', nargs=-1, type=click.UNPROCESSED, required=True) +@click.pass_context +def play(ctx, description, **kwargs): + description = " ".join(description) + Gst.init(None) + async def main(description, pm): + pipeline = BoomBox.parse_description(description) + player = BoomBox(pipeline, pm) + player.setup() + await player.play_until_eos() + player.teardown() + asyncio.run(main(description, ctx.command.pm)) + diff --git a/mixtape/core.py b/mixtape/core.py new file mode 100644 index 0000000..cf138f9 --- /dev/null +++ b/mixtape/core.py @@ -0,0 +1,33 @@ +import attr +import pluggy + +from typing import Type, Optional, Any + +from .players import Player, PlayerType, Gst + +@attr.s +class BoomBox(Player): + "Boom boom" + pm: Type[pluggy.PluginManager] = attr.ib(repr=False) + + @property + def hook(self) -> Any: + """Convenience shortcut for pm hook""" + return self.pm.hook + + def setup(self) -> None: + super().setup() + self.hook.mixtape_setup(player=self) + + def tearddown(self) -> None: + self.hook.mixtape_teardown(player=self) + super().setup() + + async def set_state(self, state: Gst.State) -> Gst.StateChangeReturn: + self.hook.mixtape_before_state_changed(player=self, state=state) + ret = await super().set_state(state) + self.hook.mixtape_on_state_changed(player=self, state=state) + return ret + + + \ No newline at end of file diff --git a/mixtape/hookspecs.py b/mixtape/hookspecs.py new file mode 100644 index 0000000..1ddad23 --- /dev/null +++ b/mixtape/hookspecs.py @@ -0,0 +1,110 @@ +import attr +from typing import Any, Iterable +from pluggy import HookspecMarker +from .players import PlayerType, Gst + +hookspec = HookspecMarker("mixtape") + +@attr.s +class Option: + name: str = attr.ib() + required: bool = attr.ib(default=False) + type: Any = attr.ib(default=str) + +# plugins and config + + +# @hookspec +# def mixtape_addhooks(): +# """ +# Register a plugin hook +# """ + +@hookspec +def mixtape_addoptions(player: PlayerType) -> Iterable[Option]: + """ + Register an option + """ + +# @hookspec +# def mixtape_plugin_registered(player, pipeline, options): +# pass + +# @hookspec +# def mixtape_plugin_autoload(player, pipeline, options): +# pass + + + +# @hookspec +# def mixtape_configure(): +# pass + +# pipeline creation and signals + +# @hookspec +# def mixtape_create_pipeline(player): +# pass + +# @hookspec +# def mixtape_on_element_added(player, element): +# pass + +# @hookspec +# def mixtape_on_deep_element_added(player, element): +# pass + +# @hookspec +# def mixtape_on_deep_element_removed(player, element): +# pass + + +# player init and teardown + +@hookspec +def mixtape_setup(player: PlayerType): + """ + Hook called on player setup + """ + +@hookspec +def mixtape_teardown(player: PlayerType): + """ + Hook called on player teardown + """ + + +# pipeline control and event hooks + +@hookspec +def mixtape_before_state_changed(player: PlayerType, state: Gst.State): + """ + Hook called before a `set_state` call. + """ + +@hookspec +def mixtape_on_state_changed(player: PlayerType, state: Gst.State): + """ + Hook called on state changed + """ + +def mixtape_on_bus_message(player: PlayerType, msg: Gst.Message): + """ + Hook called on bus message + """ + +# @hookspec +# def mixtape_on_eos(player: PlayerType): +# pass + + +# player actions and properties + + +# @hookspec +# def mixtape_register_method(): +# pass + +# @hookspec +# def mixtape_register_property(): +# pass \ No newline at end of file diff --git a/mixtape/players.py b/mixtape/players.py index ed2dc8a..237f4c2 100644 --- a/mixtape/players.py +++ b/mixtape/players.py @@ -237,10 +237,14 @@ async def create(cls: Type[PlayerType], pipeline: Gst.Pipeline) -> PlayerType: @classmethod async def from_description(cls: Type[PlayerType], description: str) -> PlayerType: """Player factory from a pipeline description""" - pipeline = Gst.parse_launch(description) - assert isinstance(pipeline, Gst.Pipeline) - return await cls.create(pipeline=pipeline) + return await cls.create(pipeline=cls.parse_description(description)) + @staticmethod + def parse_description(description: str) -> Gst.Pipeline: + pipeline = Gst.parse_launch(description) + if not isinstance(pipeline, Gst.Pipeline): + raise ValueError("Invalid pipeline description") + return pipeline class AsyncPlayer(Player): def __init_subclass__(cls) -> None: diff --git a/req-install.txt b/req-install.txt index 87cbba5..b2ef188 100644 --- a/req-install.txt +++ b/req-install.txt @@ -1,2 +1,3 @@ attrs -beppu \ No newline at end of file +beppu +click \ No newline at end of file diff --git a/setup.py b/setup.py index 03f1e66..22d01de 100644 --- a/setup.py +++ b/setup.py @@ -35,6 +35,9 @@ def parse_requirements(filename): tests_require=TEST_DEPS, extras_require=EXTRAS, install_requires=INSTALL_DEPS, + entry_points={ + 'console_scripts': ['mixtape = mixtape.cli:play'], + }, classifiers=[ "Development Status :: 2 - Pre-Alpha", "Intended Audience :: Developers", From bd8e76d109f5ea839ebe68813b4a797ff356e95c Mon Sep 17 00:00:00 2001 From: Ashley Camba Garrido Date: Thu, 27 Aug 2020 15:42:45 +0200 Subject: [PATCH 02/18] Add experimental changes to core --- mixtape/__init__.py | 22 +-- mixtape/cli.py | 95 ++++++++++--- mixtape/core.py | 316 ++++++++++++++++++++++++++++++++++++++++--- mixtape/hookspecs.py | 97 ++++--------- mixtape/players.py | 245 +-------------------------------- mypy.ini | 3 + req-install.txt | 3 +- setup.py | 4 +- tests/conftest.py | 2 +- tests/test_core.py | 57 ++++++++ tests/test_player.py | 2 +- 11 files changed, 479 insertions(+), 367 deletions(-) create mode 100644 tests/test_core.py diff --git a/mixtape/__init__.py b/mixtape/__init__.py index f78e6c9..e272c70 100644 --- a/mixtape/__init__.py +++ b/mixtape/__init__.py @@ -1,20 +1,24 @@ -from .core import BoomBox - -from typing import (Any, Callable, List, Mapping, MutableMapping, Optional, - Tuple, Type, TypeVar) +from typing import ( + Any, + Sequence, + Optional, +) import pluggy from . import hookspecs +from .core import BoomBox, Player -hookimpl = pluggy.HookimplMarker("mixtape") +hookimpl = pluggy.HookimplMarker(__name__) -def load_plugin_manager(plugins=None): - """Init mixtape plugin manager""" +__all__ = ["hookimpl", "hookspecs", "Player", "BoomBox", "load_mixtape_plugins"] + +def load_mixtape_plugins(plugins: Optional[Sequence[Any]] = None) -> pluggy.PluginManager: + """Init mixtape plugin manager""" if plugins is None: plugins = [] pm = pluggy.PluginManager(__name__) pm.add_hookspecs(hookspecs) - pm.load_setuptools_entrypoints(group=__name__) - return pm \ No newline at end of file + pm.load_setuptools_entrypoints(group=__name__) + return pm diff --git a/mixtape/cli.py b/mixtape/cli.py index 189b9b0..0cc9503 100644 --- a/mixtape/cli.py +++ b/mixtape/cli.py @@ -1,36 +1,87 @@ +# type: ignore import click import asyncio -import signal import gi +import logging +import colorlog +from prompt_toolkit import HTML +from prompt_toolkit.patch_stdout import patch_stdout +from prompt_toolkit import PromptSession gi.require_version("Gst", "1.0") from gi.repository import Gst -from . import BoomBox, load_plugin_manager +from . import BoomBox, Player, load_mixtape_plugins -class PluggyCLI(click.Command): +logger = logging.getLogger(__name__) +handler = colorlog.StreamHandler() +handler.setFormatter( + colorlog.ColoredFormatter( + "(%(asctime)s) [%(log_color)s%(levelname)s] | %(name)s | %(message)s [%(threadName)-10s]" + ) +) + +# get root logger +logger = logging.getLogger() +logger.handlers = [] +logger.addHandler(handler) +logger.setLevel(logging.INFO) + + +class MixtapeCommand(click.Command): """Click command that adds options from pluggy hooks""" - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.pm = load_plugin_manager() - # we add an option for every plugin to disable - for plugin in self.pm.get_plugins(): - name = self.pm.get_name(plugin) - option = click.Option([f'--{name}/--no-{name}'], default=True) - self.params.append(option) - -@click.command(cls=PluggyCLI) -@click.argument('description', nargs=-1, type=click.UNPROCESSED, required=True) + def make_context(self, info_name, args, parent=None, **extra): + """Override make_context to add Boombox early""" + for key, value in click.core.iteritems(self.context_settings): # noqa: B301 + if key not in extra: + extra[key] = value + ctx = click.Context(self, info_name=info_name, parent=parent, **extra) + ctx.pm = load_mixtape_plugins() + with ctx.scope(cleanup=False): + self.parse_args(ctx, args) + return ctx + + def get_params(self, ctx): + """Add plugin params to cli interface""" + params = super().get_params(ctx) + for plugin in ctx.pm.get_plugins(): + name = ctx.pm.get_name(plugin) + option = click.Option([f"--{name}/--no-{name}"], default=True) + params.append(option) + return params + + +def bottom_toolbar(): + """Returns formatted bottom toolbar""" + return HTML('Mixtape !') + + +@click.command(cls=MixtapeCommand) +@click.argument("description", nargs=-1, type=click.UNPROCESSED, required=True) @click.pass_context def play(ctx, description, **kwargs): description = " ".join(description) - Gst.init(None) - async def main(description, pm): - pipeline = BoomBox.parse_description(description) - player = BoomBox(pipeline, pm) - player.setup() - await player.play_until_eos() - player.teardown() - asyncio.run(main(description, ctx.command.pm)) + async def main(description): + Gst.init(None) + player = await Player.from_description(description) + boombox = BoomBox(player=player, pm=ctx.pm) + help_text = "Press key:" + boombox.setup() + session = PromptSession() + while True: + with patch_stdout(): + + result = await session.prompt_async( + help_text, bottom_toolbar=lambda: bottom_toolbar + ) + print("You said: %s" % result) + if result == "p": + await boombox.play() + if result == "s": + await boombox.stop() + break + boombox.teardown() + + asyncio.run(main(description)) diff --git a/mixtape/core.py b/mixtape/core.py index cf138f9..248c08a 100644 --- a/mixtape/core.py +++ b/mixtape/core.py @@ -1,33 +1,315 @@ +import asyncio +import itertools +import logging +from typing import Any, Callable, List, MutableMapping, Tuple, Type, TypeVar + import attr +import gi import pluggy -from typing import Type, Optional, Any +gi.require_version("Gst", "1.0") +from gi.repository import Gst + +from .events import PlayerEvents +from .exceptions import PlayerNotConfigured, PlayerPipelineError, PlayerSetStateError + + +logger = logging.getLogger(__name__) + +PlayerType = TypeVar("PlayerType", bound="Player") + + +# from . import hookspecs + + +class Context: + """Plugin shared state object.""" + + def __init__(self) -> None: + self.properties: MutableMapping[str, Any] = dict() + self.commands: MutableMapping[str, Any] = dict() + + def register_property(self, name: str, value: Any) -> None: + self.properties[name] = value + + def register_command(self, name: str, value: Any) -> None: + self.commands[name] = value -from .players import Player, PlayerType, Gst @attr.s -class BoomBox(Player): - "Boom boom" - pm: Type[pluggy.PluginManager] = attr.ib(repr=False) +class Player: + """Player base player""" + + pipeline: Gst.Pipeline = attr.ib(validator=attr.validators.instance_of(Gst.Pipeline)) + events: PlayerEvents = attr.ib(init=False, default=attr.Factory(PlayerEvents)) + handlers: MutableMapping[Gst.MessageType, Callable[[Gst.Bus, Gst.Message], None]] = attr.ib( + init=False, repr=False + ) + + @handlers.default + def _handlers(self) -> MutableMapping[Gst.MessageType, Callable[[Gst.Bus, Gst.Message], None]]: + return { + Gst.MessageType.QOS: self._on_qos, + Gst.MessageType.ERROR: self._on_error, + Gst.MessageType.EOS: self._on_eos, + Gst.MessageType.STATE_CHANGED: self._on_state_changed, + Gst.MessageType.ASYNC_DONE: self._on_async_done, + } + + def __del__(self) -> None: + """ + Make sure that the gstreamer pipeline is always cleaned up + """ + if self.state is not Gst.State.NULL: + self.teardown() @property - def hook(self) -> Any: - """Convenience shortcut for pm hook""" - return self.pm.hook + def bus(self) -> Gst.Bus: + """Convenience property for the pipeline Gst.Bus""" + return self.pipeline.get_bus() + + @property + def state(self) -> Gst.State: + """Convenience property for the current pipeline Gst.State""" + return self.pipeline.get_state(0)[1] + + @property + def sinks(self) -> List[Any]: + """Returns all sink elements""" + return list(self.pipeline.iterate_sinks()) + + @property + def sources(self) -> List[Any]: + """Return all source elements""" + return list(self.pipeline.iterate_sources()) + + @property + def elements(self) -> List[Any]: + """Return all pipeline elements""" + return list(self.pipeline.iterate_elements()) + + def get_elements_by_gtype(self, gtype: Any) -> List[Any]: + """Return all elements in pipeline that match gtype""" + return [e for e in self.elements if e.get_factory().get_element_type() == gtype] def setup(self) -> None: - super().setup() - self.hook.mixtape_setup(player=self) + """Setup needs a running asyncio loop""" + loop = asyncio.get_running_loop() + pollfd = self.bus.get_pollfd() + loop.add_reader(pollfd.fd, self._handle) + self.events.setup.set() - def tearddown(self) -> None: - self.hook.mixtape_teardown(player=self) - super().setup() + def teardown(self) -> None: + """Cleanup player references to loop and gst resources""" + if self.state is not Gst.State.NULL: + self.pipeline.set_state(Gst.State.NULL) + logger.debug("Teardown set state to null") + logger.debug("Removing pollfd") + loop = asyncio.get_running_loop() + pollfd = self.bus.get_pollfd() + loop.remove_reader(pollfd.fd) + self.events.teardown.set() + + # controls async def set_state(self, state: Gst.State) -> Gst.StateChangeReturn: - self.hook.mixtape_before_state_changed(player=self, state=state) - ret = await super().set_state(state) - self.hook.mixtape_on_state_changed(player=self, state=state) + """Async set state""" + if self.state == state: + raise ValueError("Pipeline state is already in state %s.", state) + if not self.events.setup.is_set(): + raise PlayerNotConfigured("Setting state before setup is not allowed.") + ret = self.pipeline.set_state(state) + if ret == Gst.StateChangeReturn.FAILURE: + raise PlayerSetStateError + if ret == Gst.StateChangeReturn.ASYNC: + await self.events.wait_for_state(state) + return ret + + async def ready(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: + """Async override of base.ready""" + return await self.set_state(Gst.State.READY) + + async def play(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: + """Async override of base.play""" + return await self.set_state(Gst.State.PLAYING) + + async def pause(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: + """Async override of base.pause""" + return await self.set_state(Gst.State.PAUSED) + + async def stop(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: + """Async override of base.stop""" + return await self.set_state(Gst.State.NULL) + + # -- utility methods -- # + + async def send_eos(self) -> bool: + """Send eos to pipeline and await event""" + ret = self.pipeline.send_event(Gst.Event.new_eos()) + await self.events.eos.wait() return ret + async def play_until_eos(self) -> None: + """Play until eos or an error""" + await self.play() + await asyncio.wait( + {self.events.eos.wait(), self.events.error.wait()}, return_when=asyncio.FIRST_COMPLETED + ) + + # -- bus message handling -- # + + def _handle(self) -> None: + """ + Asyncio reader callback, called when a message is available on + the bus. + """ + msg = self.bus.pop() + if msg: + handler = self.handlers.get(msg.type, self._on_unhandled_msg) + handler(self.bus, msg) + + def _on_state_changed( + self, bus: Gst.Bus, message: Gst.Message + ) -> None: # pylint: disable=unused-argument + """ + Handler for `state_changed` messages + By default will only log to `debug` + """ + old, new, _ = message.parse_state_changed() + + if message.src != self.pipeline: + return + logger.info( + "State changed from %s to %s", + Gst.Element.state_get_name(old), + Gst.Element.state_get_name(new), + ) + + self.events.pick_state(new) + + def _on_error(self, bus: Gst.Bus, message: Gst.Message) -> None: + """ + Handler for `error` messages + By default it will parse the error message, + log to `error` and append to `self.errors` + """ + err, debug = message.parse_error() + logger.error( + "Error received from element %s:%s on %s", message.src.get_name(), err.message, bus + ) + if debug is not None: + logger.error("Debugging information: %s", debug) + + self.teardown() + self.events.error.set() + raise PlayerPipelineError(err) + + def _on_eos(self, bus: Gst.Bus, message: Gst.Message) -> None: + """ + Handler for eos messages + By default it sets the eos event + """ + logger.info("Received EOS message on bus") + self.events.eos.set() + + def _on_async_done(self, bus: Gst.Bus, message: Gst.Message) -> None: + """ + Handler for `async_done` messages + By default, it will pop any futures available in `self.futures` + and call their result. + """ + msg = message.parse_async_done() + logger.debug("Unhandled ASYNC_DONE message: %s on %s", msg, bus) + + def _on_unhandled_msg(self, bus: Gst.Bus, message: Gst.Message) -> None: + """ + Handler for all other messages. + By default will just log with `debug` + """ + logger.debug("Unhandled msg: %s on %s", message.type, bus) + + def _on_qos(self, bus: Gst.Bus, message: Gst.Message) -> None: + """ + Handler for `qos` messages + By default it will parse the error message, + log to `error` and append to `self.errors` + """ + live, running_time, stream_time, timestamp, duration = message.parse_qos() + logger.warning( + "Qos message: live:%s - running:%s - stream:%s - timestamp:%s - duration:%s received from %s on %s", + live, + running_time, + stream_time, + timestamp, + duration, + message.src.get_name(), + bus, + ) + + @classmethod + async def create(cls: Type[PlayerType], pipeline: Gst.Pipeline) -> PlayerType: + """Player factory from a given pipeline that calls setup by default""" + player = cls(pipeline) + player.setup() + return player - \ No newline at end of file + @classmethod + async def from_description(cls: Type[PlayerType], description: str) -> PlayerType: + """Player factory from a pipeline description""" + return await cls.create(pipeline=cls.parse_description(description)) + + @staticmethod + def parse_description(description: str) -> Gst.Pipeline: + pipeline = Gst.parse_launch(description) + if not isinstance(pipeline, Gst.Pipeline): + raise ValueError("Invalid pipeline description") + return pipeline + + +class BoomBox: + """ + Facade object that orchestrates plugin callbacks + and exposes plugin commands and properties. + """ + + DEFAULT_PLAYER_COMMANDS: List[str] = ["play", "pause", "stop", "ready"] + DEFAULT_PLAYER_ATTRIBUTES: List[Any] = [] + + def __init__(self, player: Player, pm: Type[pluggy.PluginManager]): + self._player = player + self._pm = pm + self._context = Context() + for cmd in self.DEFAULT_PLAYER_COMMANDS: + self._context.register_command(cmd, getattr(self._player, cmd)) + results = self._hook.mixtape_register_commands(player=self._player, ctx=self._context) + results = list(itertools.chain(*results)) + for name, method in results: + self._context.register_command(name, method) + + def __getattr__(self, name: str) -> Any: + """ + Expose methods and properties from plugins + """ + try: + return {**self._context.properties, **self._context.commands}[name] + except KeyError: + raise AttributeError + + @property + def _hook(self) -> Any: + """Convenience shortcut for pm hook""" + return self._pm.hook + + def setup(self) -> None: + self._player.setup() + self._hook.mixtape_setup(player=self._player, ctx=self._context) + + def teardown(self) -> None: + self._hook.mixtape_teardown(player=self._player, ctx=self._context) + self._player.teardown() + + async def set_state(self, state: Gst.State) -> Gst.StateChangeReturn: + self._hook.mixtape_before_state_changed(player=self._player, ctx=self._context, state=state) + ret = await self._player.set_state(state) + self._hook.mixtape_on_state_changed(player=self._player, ctx=self._context, state=state) + return ret diff --git a/mixtape/hookspecs.py b/mixtape/hookspecs.py index 1ddad23..7a92df0 100644 --- a/mixtape/hookspecs.py +++ b/mixtape/hookspecs.py @@ -1,74 +1,29 @@ -import attr -from typing import Any, Iterable -from pluggy import HookspecMarker -from .players import PlayerType, Gst +# type: ignore +from typing import Any, Callable, cast, TypeVar +import pluggy +from .core import Context, Player +import gi -hookspec = HookspecMarker("mixtape") +gi.require_version("Gst", "1.0") +from gi.repository import Gst -@attr.s -class Option: - name: str = attr.ib() - required: bool = attr.ib(default=False) - type: Any = attr.ib(default=str) -# plugins and config - - -# @hookspec -# def mixtape_addhooks(): -# """ -# Register a plugin hook -# """ - -@hookspec -def mixtape_addoptions(player: PlayerType) -> Iterable[Option]: - """ - Register an option - """ - -# @hookspec -# def mixtape_plugin_registered(player, pipeline, options): -# pass - -# @hookspec -# def mixtape_plugin_autoload(player, pipeline, options): -# pass - - - -# @hookspec -# def mixtape_configure(): -# pass - -# pipeline creation and signals - -# @hookspec -# def mixtape_create_pipeline(player): -# pass - -# @hookspec -# def mixtape_on_element_added(player, element): -# pass - -# @hookspec -# def mixtape_on_deep_element_added(player, element): -# pass - -# @hookspec -# def mixtape_on_deep_element_removed(player, element): -# pass +F = TypeVar("F", bound=Callable[..., Any]) +hookspec = cast(Callable[[F], F], pluggy.HookspecMarker("mixtape")) # player init and teardown + @hookspec -def mixtape_setup(player: PlayerType): +def mixtape_setup(player: Player, ctx: Context): """ Hook called on player setup """ + @hookspec -def mixtape_teardown(player: PlayerType): +def mixtape_teardown(player: Player, ctx: Context): """ Hook called on player teardown """ @@ -76,22 +31,30 @@ def mixtape_teardown(player: PlayerType): # pipeline control and event hooks + @hookspec -def mixtape_before_state_changed(player: PlayerType, state: Gst.State): +def mixtape_before_state_changed(player: Player, ctx: Context, state: Gst.State): """ Hook called before a `set_state` call. """ + @hookspec -def mixtape_on_state_changed(player: PlayerType, state: Gst.State): +def mixtape_on_state_changed(player: Player, ctx: Context, state: Gst.State): """ Hook called on state changed """ -def mixtape_on_bus_message(player: PlayerType, msg: Gst.Message): - """ - Hook called on bus message - """ + +@hookspec +def mixtape_register_commands(player: Player, ctx: Context): + pass + + +# def mixtape_on_bus_message(player: Player,ctx: Context, msg: Gst.Message): +# """ +# Hook called on bus message +# """ # @hookspec # def mixtape_on_eos(player: PlayerType): @@ -101,10 +64,6 @@ def mixtape_on_bus_message(player: PlayerType, msg: Gst.Message): # player actions and properties -# @hookspec -# def mixtape_register_method(): -# pass - # @hookspec # def mixtape_register_property(): -# pass \ No newline at end of file +# pass diff --git a/mixtape/players.py b/mixtape/players.py index 237f4c2..605515a 100644 --- a/mixtape/players.py +++ b/mixtape/players.py @@ -1,250 +1,7 @@ -import asyncio -import logging -from typing import Any, Type, TypeVar, Tuple, List, Callable, MutableMapping -import attr import warnings -import gi -gi.require_version("Gst", "1.0") -from gi.repository import Gst +from mixtape import Player -from .exceptions import PlayerSetStateError, PlayerPipelineError, PlayerNotConfigured -from .events import PlayerEvents - -logger = logging.getLogger(__name__) - -PlayerType = TypeVar("PlayerType", bound="Player") -AsyncPlayerType = TypeVar("AsyncPlayerType", bound="AsyncPlayer") - - -@attr.s -class Player: - """Player base player""" - - pipeline: Gst.Pipeline = attr.ib(validator=attr.validators.instance_of(Gst.Pipeline)) - events: PlayerEvents = attr.ib(init=False, default=attr.Factory(PlayerEvents)) - handlers: MutableMapping[Gst.MessageType, Callable[[Gst.Bus, Gst.Message], None]] = attr.ib( - init=False, repr=False - ) - - @handlers.default - def _handlers(self) -> MutableMapping[Gst.MessageType, Callable[[Gst.Bus, Gst.Message], None]]: - return { - Gst.MessageType.QOS: self._on_qos, - Gst.MessageType.ERROR: self._on_error, - Gst.MessageType.EOS: self._on_eos, - Gst.MessageType.STATE_CHANGED: self._on_state_changed, - Gst.MessageType.ASYNC_DONE: self._on_async_done, - } - - def __del__(self) -> None: - """ - Make sure that the gstreamer pipeline is always cleaned up - """ - if self.state is not Gst.State.NULL: - self.teardown() - - @property - def bus(self) -> Gst.Bus: - """Convenience property for the pipeline Gst.Bus""" - return self.pipeline.get_bus() - - @property - def state(self) -> Gst.State: - """Convenience property for the current pipeline Gst.State""" - return self.pipeline.get_state(0)[1] - - @property - def sinks(self) -> List[Any]: - """Returns all sink elements""" - return list(self.pipeline.iterate_sinks()) - - @property - def sources(self) -> List[Any]: - """Return all source elements""" - return list(self.pipeline.iterate_sources()) - - @property - def elements(self) -> List[Any]: - """Return all pipeline elements""" - return list(self.pipeline.iterate_elements()) - - def get_elements_by_gtype(self, gtype: Any) -> List[Any]: - """Return all elements in pipeline that match gtype""" - return [e for e in self.elements if e.get_factory().get_element_type() == gtype] - - def setup(self) -> None: - """Setup needs a running asyncio loop""" - loop = asyncio.get_running_loop() - pollfd = self.bus.get_pollfd() - loop.add_reader(pollfd.fd, self._handle) - self.events.setup.set() - - def teardown(self) -> None: - """Cleanup player references to loop and gst resources""" - if self.state is not Gst.State.NULL: - self.pipeline.set_state(Gst.State.NULL) - logger.debug("Teardown set state to null") - logger.debug("Removing pollfd") - loop = asyncio.get_running_loop() - pollfd = self.bus.get_pollfd() - loop.remove_reader(pollfd.fd) - self.events.teardown.set() - - # controls - - async def set_state(self, state: Gst.State) -> Gst.StateChangeReturn: - """Async set state""" - if self.state == state: - raise ValueError("Pipeline state is already in state %s.", state) - if not self.events.setup.is_set(): - raise PlayerNotConfigured("Setting state before setup is not allowed.") - ret = self.pipeline.set_state(state) - if ret == Gst.StateChangeReturn.FAILURE: - raise PlayerSetStateError - if ret == Gst.StateChangeReturn.ASYNC: - await self.events.wait_for_state(state) - return ret - - async def ready(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: - """Async override of base.ready""" - return await self.set_state(Gst.State.READY) - - async def play(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: - """Async override of base.play""" - return await self.set_state(Gst.State.PLAYING) - - async def pause(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: - """Async override of base.pause""" - return await self.set_state(Gst.State.PAUSED) - - async def stop(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: - """Async override of base.stop""" - return await self.set_state(Gst.State.NULL) - - # -- utility methods -- # - - async def send_eos(self) -> bool: - """Send eos to pipeline and await event""" - ret = self.pipeline.send_event(Gst.Event.new_eos()) - await self.events.eos.wait() - return ret - - async def play_until_eos(self) -> None: - """Play until eos or an error""" - await self.play() - await asyncio.wait( - {self.events.eos.wait(), self.events.error.wait()}, return_when=asyncio.FIRST_COMPLETED - ) - - # -- bus message handling -- # - - def _handle(self) -> None: - """ - Asyncio reader callback, called when a message is available on - the bus. - """ - msg = self.bus.pop() - if msg: - handler = self.handlers.get(msg.type, self._on_unhandled_msg) - handler(self.bus, msg) - - def _on_state_changed( - self, bus: Gst.Bus, message: Gst.Message - ) -> None: # pylint: disable=unused-argument - """ - Handler for `state_changed` messages - By default will only log to `debug` - """ - old, new, _ = message.parse_state_changed() - - if message.src != self.pipeline: - return - logger.info( - "State changed from %s to %s", - Gst.Element.state_get_name(old), - Gst.Element.state_get_name(new), - ) - - self.events.pick_state(new) - - def _on_error(self, bus: Gst.Bus, message: Gst.Message) -> None: - """ - Handler for `error` messages - By default it will parse the error message, - log to `error` and append to `self.errors` - """ - err, debug = message.parse_error() - logger.error( - "Error received from element %s:%s on %s", message.src.get_name(), err.message, bus - ) - if debug is not None: - logger.error("Debugging information: %s", debug) - - self.teardown() - self.events.error.set() - raise PlayerPipelineError(err) - - def _on_eos(self, bus: Gst.Bus, message: Gst.Message) -> None: - """ - Handler for eos messages - By default it sets the eos event - """ - logger.info("EOS message: %s received from pipeline on %s", message, bus) - self.events.eos.set() - - def _on_async_done(self, bus: Gst.Bus, message: Gst.Message) -> None: - """ - Handler for `async_done` messages - By default, it will pop any futures available in `self.futures` - and call their result. - """ - msg = message.parse_async_done() - logger.debug("Unhandled ASYNC_DONE message: %s on %s", msg, bus) - - def _on_unhandled_msg(self, bus: Gst.Bus, message: Gst.Message) -> None: - """ - Handler for all other messages. - By default will just log with `debug` - """ - logger.debug("Unhandled msg: %s on %s", message.type, bus) - - def _on_qos(self, bus: Gst.Bus, message: Gst.Message) -> None: - """ - Handler for `qos` messages - By default it will parse the error message, - log to `error` and append to `self.errors` - """ - live, running_time, stream_time, timestamp, duration = message.parse_qos() - logger.warning( - "Qos message: live:%s - running:%s - stream:%s - timestamp:%s - duration:%s received from %s on %s", - live, - running_time, - stream_time, - timestamp, - duration, - message.src.get_name(), - bus, - ) - - @classmethod - async def create(cls: Type[PlayerType], pipeline: Gst.Pipeline) -> PlayerType: - """Player factory from a given pipeline that calls setup by default""" - player = cls(pipeline) - player.setup() - return player - - @classmethod - async def from_description(cls: Type[PlayerType], description: str) -> PlayerType: - """Player factory from a pipeline description""" - return await cls.create(pipeline=cls.parse_description(description)) - - @staticmethod - def parse_description(description: str) -> Gst.Pipeline: - pipeline = Gst.parse_launch(description) - if not isinstance(pipeline, Gst.Pipeline): - raise ValueError("Invalid pipeline description") - return pipeline class AsyncPlayer(Player): def __init_subclass__(cls) -> None: diff --git a/mypy.ini b/mypy.ini index 999eff9..c802076 100644 --- a/mypy.ini +++ b/mypy.ini @@ -51,4 +51,7 @@ ignore_missing_imports = True ignore_missing_imports = True [mypy-pampy.*] +ignore_missing_imports = True + +[mypy-pluggy.*] ignore_missing_imports = True \ No newline at end of file diff --git a/req-install.txt b/req-install.txt index b2ef188..501a5a6 100644 --- a/req-install.txt +++ b/req-install.txt @@ -1,3 +1,4 @@ attrs beppu -click \ No newline at end of file +click==7.1.2 +prompt-toolkit==3.0.6 diff --git a/setup.py b/setup.py index 22d01de..1901447 100644 --- a/setup.py +++ b/setup.py @@ -35,9 +35,7 @@ def parse_requirements(filename): tests_require=TEST_DEPS, extras_require=EXTRAS, install_requires=INSTALL_DEPS, - entry_points={ - 'console_scripts': ['mixtape = mixtape.cli:play'], - }, + entry_points={"console_scripts": ["mixtape = mixtape.cli:play"]}, classifiers=[ "Development Status :: 2 - Pre-Alpha", "Intended Audience :: Developers", diff --git a/tests/conftest.py b/tests/conftest.py index f7b24b0..413dbe9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,7 +21,7 @@ # flake8 plugin is way too verbose def pytest_configure(config): - logging.getLogger("flake8").setLevel(logging.WARN) + logging.getLogger("flake8").setLevel(logging.ERROR) logging.getLogger("bandit").setLevel(logging.WARN) logging.getLogger("blib2to3").setLevel(logging.WARN) logging.getLogger("stevedore").setLevel(logging.WARN) diff --git a/tests/test_core.py b/tests/test_core.py new file mode 100644 index 0000000..699dc88 --- /dev/null +++ b/tests/test_core.py @@ -0,0 +1,57 @@ +# type: ignore +import pytest +import asyncio +from mixtape.core import BoomBox + +from mixtape import load_mixtape_plugins, hookimpl + + +class ExamplePlugin: + def __init__(self): + self.state = None + + def setup_plugin_state(self): + """Initial setup""" + self.state = "Something not None" + + async def clear(self): + """Reset calibration on demand""" + + async def call(self): + """Call calibration on demand""" + + @hookimpl + def mixtape_setup(self, player, ctx): + # self.pipeline.get_element + self.setup_plugin_state() + + @hookimpl + def mixtape_register_commands(self, player, ctx): + return [("clear", self.clear), ("call", self.call)] + + +@pytest.mark.asyncio +async def test_boombox_plugin_hooks(player, pipeline, mocker): + p = player(pipeline=pipeline) + pm = load_mixtape_plugins() + + assert not pm.get_plugins(), "Plugins should return empty set" + + plugin = ExamplePlugin() + + pm.register(plugin) + + assert pm.get_plugins(), "We now should have one plugin" + + b = BoomBox(player=p, pm=pm) + b.setup() + + assert plugin.state == "Something not None" + + await b.play() + await asyncio.sleep(3) + await b.call() + + await b.stop() + await b.clear() + assert b diff --git a/tests/test_player.py b/tests/test_player.py index a5611ff..cc4bd6b 100644 --- a/tests/test_player.py +++ b/tests/test_player.py @@ -7,7 +7,7 @@ gi.require_version("Gst", "1.0") from gi.repository import Gst -from mixtape.players import Player +from mixtape import Player from mixtape.exceptions import PlayerSetStateError, PlayerNotConfigured From 2c0326fd1d5afb812ec2fb6a678a2dd515bae94d Mon Sep 17 00:00:00 2001 From: lima-tango Date: Fri, 28 Aug 2020 18:04:30 +0200 Subject: [PATCH 03/18] Add hacky command mapping and toolbar prompt --- mixtape/cli.py | 38 ++++++++++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/mixtape/cli.py b/mixtape/cli.py index 0cc9503..ffdd689 100644 --- a/mixtape/cli.py +++ b/mixtape/cli.py @@ -7,6 +7,8 @@ from prompt_toolkit import HTML from prompt_toolkit.patch_stdout import patch_stdout from prompt_toolkit import PromptSession +import string +from typing import Dict, List gi.require_version("Gst", "1.0") from gi.repository import Gst @@ -52,9 +54,26 @@ def get_params(self, ctx): return params -def bottom_toolbar(): +def bottom_toolbar(keyboard_mapping: Dict): """Returns formatted bottom toolbar""" - return HTML('Mixtape !') + available_commands = ', '.join(f'{v} [{k}]' for k,v in keyboard_mapping.items()) + return HTML(f'Mixtape {available_commands}!') + + +def get_key_command_mapping(commands: List) -> Dict: + """Returns a mapping of keys to commands""" + keyboard_mapping = {} + for command in commands: + if command[0] not in keyboard_mapping.keys(): + keyboard_mapping[command[0]] = command + else: + available_letters = [l for l in string.ascii_lowercase if l not in keyboard_mapping.keys()] + try: + keyboard_mapping[available_letters[0]] = command + except IndexError: + raise Exception('More commands than lowercase letters!') + return keyboard_mapping + @click.command(cls=MixtapeCommand) @@ -69,19 +88,22 @@ async def main(description): boombox = BoomBox(player=player, pm=ctx.pm) help_text = "Press key:" boombox.setup() + + commands = boombox._context.commands + key_command_mapping = get_key_command_mapping(list(commands)) session = PromptSession() while True: with patch_stdout(): + toolbar = bottom_toolbar(key_command_mapping) result = await session.prompt_async( - help_text, bottom_toolbar=lambda: bottom_toolbar + help_text, bottom_toolbar=toolbar ) print("You said: %s" % result) - if result == "p": - await boombox.play() - if result == "s": - await boombox.stop() - break + if result in key_command_mapping: + await commands[key_command_mapping[result]]() + if key_command_mapping[result] == 'stop': + break boombox.teardown() asyncio.run(main(description)) From 5396f95385750dc2f68c82f019a94e455736483c Mon Sep 17 00:00:00 2001 From: Ashley Camba Garrido Date: Mon, 28 Sep 2020 12:59:43 +0200 Subject: [PATCH 04/18] Add fixes to adapt to registering commands --- mixtape/cli.py | 50 +++++++++++++++++++++----------------------- mixtape/core.py | 30 +++++++++++++++++++------- mixtape/hookspecs.py | 12 +++++++++++ 3 files changed, 59 insertions(+), 33 deletions(-) diff --git a/mixtape/cli.py b/mixtape/cli.py index ffdd689..56722b9 100644 --- a/mixtape/cli.py +++ b/mixtape/cli.py @@ -74,7 +74,29 @@ def get_key_command_mapping(commands: List) -> Dict: raise Exception('More commands than lowercase letters!') return keyboard_mapping - +async def main(description, ctx): + Gst.init(None) + player = await Player.from_description(description) + boombox = BoomBox(player=player, pm=ctx.pm) + help_text = "Press key:" + boombox.setup() + + commands = boombox._context.commands + key_command_mapping = get_key_command_mapping(list(commands)) + session = PromptSession() + while True: + with patch_stdout(): + + toolbar = bottom_toolbar(key_command_mapping) + result = await session.prompt_async( + help_text, bottom_toolbar=toolbar + ) + print("You said: %s" % result) + if result in key_command_mapping: + await commands[key_command_mapping[result]]() + if key_command_mapping[result] == 'stop': + break + boombox.teardown() @click.command(cls=MixtapeCommand) @click.argument("description", nargs=-1, type=click.UNPROCESSED, required=True) @@ -82,28 +104,4 @@ def get_key_command_mapping(commands: List) -> Dict: def play(ctx, description, **kwargs): description = " ".join(description) - async def main(description): - Gst.init(None) - player = await Player.from_description(description) - boombox = BoomBox(player=player, pm=ctx.pm) - help_text = "Press key:" - boombox.setup() - - commands = boombox._context.commands - key_command_mapping = get_key_command_mapping(list(commands)) - session = PromptSession() - while True: - with patch_stdout(): - - toolbar = bottom_toolbar(key_command_mapping) - result = await session.prompt_async( - help_text, bottom_toolbar=toolbar - ) - print("You said: %s" % result) - if result in key_command_mapping: - await commands[key_command_mapping[result]]() - if key_command_mapping[result] == 'stop': - break - boombox.teardown() - - asyncio.run(main(description)) + asyncio.run(main(description, ctx)) diff --git a/mixtape/core.py b/mixtape/core.py index 248c08a..83da761 100644 --- a/mixtape/core.py +++ b/mixtape/core.py @@ -35,6 +35,8 @@ def register_property(self, name: str, value: Any) -> None: def register_command(self, name: str, value: Any) -> None: self.commands[name] = value + def clear_commands(self): + self.commands = {} @attr.s class Player: @@ -279,12 +281,14 @@ def __init__(self, player: Player, pm: Type[pluggy.PluginManager]): self._player = player self._pm = pm self._context = Context() - for cmd in self.DEFAULT_PLAYER_COMMANDS: - self._context.register_command(cmd, getattr(self._player, cmd)) - results = self._hook.mixtape_register_commands(player=self._player, ctx=self._context) - results = list(itertools.chain(*results)) - for name, method in results: - self._context.register_command(name, method) + # init all the plugins + self._hook.mixtape_plugin_init(player=self._player, ctx=self._context) + + # rename and monkeypatch default set state + self._player._set_state = self._player.set_state + self._player.set_state = self.set_state + # register initial commands + self._register_commands() def __getattr__(self, name: str) -> Any: """ @@ -300,6 +304,16 @@ def _hook(self) -> Any: """Convenience shortcut for pm hook""" return self._pm.hook + def _register_commands(self) -> None: + # register all the commands + self._context.clear_commands() + for cmd in self.DEFAULT_PLAYER_COMMANDS: + self._context.register_command(cmd, getattr(self._player, cmd)) + results = self._hook.mixtape_register_commands(player=self._player, ctx=self._context) + results = list(itertools.chain(*results)) + for name, method in results: + self._context.register_command(name, method) + def setup(self) -> None: self._player.setup() self._hook.mixtape_setup(player=self._player, ctx=self._context) @@ -310,6 +324,8 @@ def teardown(self) -> None: async def set_state(self, state: Gst.State) -> Gst.StateChangeReturn: self._hook.mixtape_before_state_changed(player=self._player, ctx=self._context, state=state) - ret = await self._player.set_state(state) + ret = await self._player._set_state(state) self._hook.mixtape_on_state_changed(player=self._player, ctx=self._context, state=state) + logger.info("Registering commands on state change") + self._register_commands() return ret diff --git a/mixtape/hookspecs.py b/mixtape/hookspecs.py index 7a92df0..57702a8 100644 --- a/mixtape/hookspecs.py +++ b/mixtape/hookspecs.py @@ -11,6 +11,15 @@ F = TypeVar("F", bound=Callable[..., Any]) hookspec = cast(Callable[[F], F], pluggy.HookspecMarker("mixtape")) +# plugin + +@hookspec +def mixtape_plugin_init(player: Player, ctx: Context): + pass + +@hookspec +def mixtape_plugin_autoload(player: Player, ctx: Context): + pass # player init and teardown @@ -50,6 +59,9 @@ def mixtape_on_state_changed(player: Player, ctx: Context, state: Gst.State): def mixtape_register_commands(player: Player, ctx: Context): pass +# @hookspec +# def mixtape_register_conditions(player: Player, ctx: Context): +# pass # def mixtape_on_bus_message(player: Player,ctx: Context, msg: Gst.Message): # """ From 7aebaef85bbcbaff435bca34094dda8b82fd8821 Mon Sep 17 00:00:00 2001 From: lima-tango Date: Tue, 29 Sep 2020 14:36:57 +0200 Subject: [PATCH 05/18] Add Command class --- mixtape/core.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/mixtape/core.py b/mixtape/core.py index 83da761..5bc150d 100644 --- a/mixtape/core.py +++ b/mixtape/core.py @@ -38,6 +38,18 @@ def register_command(self, name: str, value: Any) -> None: def clear_commands(self): self.commands = {} + +@attr.s +class Command: + name: str = attr.ib() + method: Callable = attr.ib() + availability_check: Callable = attr.ib(default=None) + + def register_command(self, ctx): + if self.availability_check and self.availability_check(): + ctx.register_command(self.name, self.method) + + @attr.s class Player: """Player base player""" @@ -311,8 +323,8 @@ def _register_commands(self) -> None: self._context.register_command(cmd, getattr(self._player, cmd)) results = self._hook.mixtape_register_commands(player=self._player, ctx=self._context) results = list(itertools.chain(*results)) - for name, method in results: - self._context.register_command(name, method) + for command in results: + command.register_command(self._context) def setup(self) -> None: self._player.setup() From e42968173913f442cbc5740a3b4f9545878a983e Mon Sep 17 00:00:00 2001 From: lima-tango Date: Tue, 29 Sep 2020 13:06:43 +0000 Subject: [PATCH 06/18] Use Command class in test_core --- tests/test_core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_core.py b/tests/test_core.py index 699dc88..c6ad194 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -4,6 +4,7 @@ from mixtape.core import BoomBox from mixtape import load_mixtape_plugins, hookimpl +from mixtape.core import Command class ExamplePlugin: @@ -27,7 +28,7 @@ def mixtape_setup(self, player, ctx): @hookimpl def mixtape_register_commands(self, player, ctx): - return [("clear", self.clear), ("call", self.call)] + return [Command("clear", self.clear), Command("call", self.call)] @pytest.mark.asyncio From a57d6f13e3c5f6a637ea882b5d5d6bbb83e7d660 Mon Sep 17 00:00:00 2001 From: lima-tango Date: Tue, 6 Oct 2020 12:54:27 +0000 Subject: [PATCH 07/18] Add options to context and add hookspec --- mixtape/core.py | 9 ++++++++- mixtape/hookspecs.py | 8 ++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/mixtape/core.py b/mixtape/core.py index 5bc150d..6e384f2 100644 --- a/mixtape/core.py +++ b/mixtape/core.py @@ -1,7 +1,7 @@ import asyncio import itertools import logging -from typing import Any, Callable, List, MutableMapping, Tuple, Type, TypeVar +from typing import Any, Callable, List, MutableMapping, Tuple, Type, TypeVar, Dict import attr import gi @@ -28,6 +28,7 @@ class Context: def __init__(self) -> None: self.properties: MutableMapping[str, Any] = dict() self.commands: MutableMapping[str, Any] = dict() + self.options: MutableMapping[str, Any] = dict() def register_property(self, name: str, value: Any) -> None: self.properties[name] = value @@ -38,6 +39,10 @@ def register_command(self, name: str, value: Any) -> None: def clear_commands(self): self.commands = {} + def add_option(self, option: Dict): + name = option.pop('name') + self.options[name] = option + @attr.s class Command: @@ -293,6 +298,8 @@ def __init__(self, player: Player, pm: Type[pluggy.PluginManager]): self._player = player self._pm = pm self._context = Context() + # add all options + self._hook.mixtape_add_options(player=self._player, ctx=self._context) # init all the plugins self._hook.mixtape_plugin_init(player=self._player, ctx=self._context) diff --git a/mixtape/hookspecs.py b/mixtape/hookspecs.py index 57702a8..47c9b1c 100644 --- a/mixtape/hookspecs.py +++ b/mixtape/hookspecs.py @@ -21,6 +21,14 @@ def mixtape_plugin_init(player: Player, ctx: Context): def mixtape_plugin_autoload(player: Player, ctx: Context): pass +# interface options + +@hookspec +def mixtape_add_option(player: Player, ctx: Context): + """ + Hook called on setup to add interface options exposed by plug-ins. + """ + # player init and teardown From 6f8be6f8a1f6bc3dfbcc6c62cf42e0586ea9859a Mon Sep 17 00:00:00 2001 From: lima-tango Date: Wed, 7 Oct 2020 10:49:55 +0000 Subject: [PATCH 08/18] Add hooks for adding options and getting pipeline --- mixtape/cli.py | 20 +++++++++++++++----- mixtape/core.py | 5 ++--- mixtape/hookspecs.py | 10 ++++++++-- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/mixtape/cli.py b/mixtape/cli.py index 56722b9..751ecda 100644 --- a/mixtape/cli.py +++ b/mixtape/cli.py @@ -51,6 +51,11 @@ def get_params(self, ctx): name = ctx.pm.get_name(plugin) option = click.Option([f"--{name}/--no-{name}"], default=True) params.append(option) + plugin_options = ctx.pm.hook.mixtape_add_options() + for option_group in plugin_options: + for option in option_group: + o = click.Option([f"--{option['name']}"]) + params.append(o) return params @@ -74,10 +79,14 @@ def get_key_command_mapping(commands: List) -> Dict: raise Exception('More commands than lowercase letters!') return keyboard_mapping -async def main(description, ctx): +async def main(description, ctx, options): Gst.init(None) - player = await Player.from_description(description) - boombox = BoomBox(player=player, pm=ctx.pm) + + pipeline_string = ctx.pm.hook.mixtape_get_pipeline(description=description, options=options) + if not pipeline_string: + pipeline_string = description + player = await Player.from_description(pipeline_string) + boombox = BoomBox(player=player, pm=ctx.pm, options=options) help_text = "Press key:" boombox.setup() @@ -99,9 +108,10 @@ async def main(description, ctx): boombox.teardown() @click.command(cls=MixtapeCommand) -@click.argument("description", nargs=-1, type=click.UNPROCESSED, required=True) +@click.argument("description", nargs=-1, type=click.UNPROCESSED, required=False) @click.pass_context def play(ctx, description, **kwargs): description = " ".join(description) - asyncio.run(main(description, ctx)) + asyncio.run(main(description, ctx, kwargs)) + diff --git a/mixtape/core.py b/mixtape/core.py index 6e384f2..2527bf9 100644 --- a/mixtape/core.py +++ b/mixtape/core.py @@ -294,12 +294,11 @@ class BoomBox: DEFAULT_PLAYER_COMMANDS: List[str] = ["play", "pause", "stop", "ready"] DEFAULT_PLAYER_ATTRIBUTES: List[Any] = [] - def __init__(self, player: Player, pm: Type[pluggy.PluginManager]): + def __init__(self, player: Player, pm: Type[pluggy.PluginManager], options:dict): self._player = player self._pm = pm + self._options = options self._context = Context() - # add all options - self._hook.mixtape_add_options(player=self._player, ctx=self._context) # init all the plugins self._hook.mixtape_plugin_init(player=self._player, ctx=self._context) diff --git a/mixtape/hookspecs.py b/mixtape/hookspecs.py index 47c9b1c..8ece284 100644 --- a/mixtape/hookspecs.py +++ b/mixtape/hookspecs.py @@ -18,17 +18,23 @@ def mixtape_plugin_init(player: Player, ctx: Context): pass @hookspec -def mixtape_plugin_autoload(player: Player, ctx: Context): +def mixtape_plugin_autoload(): pass # interface options @hookspec -def mixtape_add_option(player: Player, ctx: Context): +def mixtape_add_options(): """ Hook called on setup to add interface options exposed by plug-ins. """ +@hookspec(firstresult=True) +def mixtape_get_pipeline(description, options): + """ + Hook allowing a plugin to return a pipeline + """ + # player init and teardown From 96ac8977bcb288eb51697a4e37ab1ce3664e1217 Mon Sep 17 00:00:00 2001 From: lima-tango Date: Fri, 9 Oct 2020 12:15:17 +0000 Subject: [PATCH 09/18] Update handling of options --- mixtape/core.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/mixtape/core.py b/mixtape/core.py index 2527bf9..00243e5 100644 --- a/mixtape/core.py +++ b/mixtape/core.py @@ -39,9 +39,8 @@ def register_command(self, name: str, value: Any) -> None: def clear_commands(self): self.commands = {} - def add_option(self, option: Dict): - name = option.pop('name') - self.options[name] = option + def add_option(self, name: str, value: Any) -> None: + self.options[name] = value @attr.s @@ -79,8 +78,8 @@ def __del__(self) -> None: """ Make sure that the gstreamer pipeline is always cleaned up """ - if self.state is not Gst.State.NULL: - self.teardown() + # if self.state is not Gst.State.NULL: + # self.teardown() @property def bus(self) -> Gst.Bus: @@ -299,6 +298,8 @@ def __init__(self, player: Player, pm: Type[pluggy.PluginManager], options:dict) self._pm = pm self._options = options self._context = Context() + for name, value in options.items(): + self._context.add_option(name, value) # init all the plugins self._hook.mixtape_plugin_init(player=self._player, ctx=self._context) From 5867bb68336cbc6256b85dece3a2b92ee90bee32 Mon Sep 17 00:00:00 2001 From: lima-tango Date: Mon, 12 Oct 2020 11:10:15 +0000 Subject: [PATCH 10/18] Update available commands in prompt --- mixtape/cli.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/mixtape/cli.py b/mixtape/cli.py index 751ecda..7544ece 100644 --- a/mixtape/cli.py +++ b/mixtape/cli.py @@ -90,12 +90,11 @@ async def main(description, ctx, options): help_text = "Press key:" boombox.setup() - commands = boombox._context.commands - key_command_mapping = get_key_command_mapping(list(commands)) session = PromptSession() while True: with patch_stdout(): - + commands = boombox._context.commands + key_command_mapping = get_key_command_mapping(list(commands)) toolbar = bottom_toolbar(key_command_mapping) result = await session.prompt_async( help_text, bottom_toolbar=toolbar From 52ce1b1b64e9828d41b524404e2dbe36d62e6ad7 Mon Sep 17 00:00:00 2001 From: lima-tango Date: Mon, 12 Oct 2020 12:31:48 +0000 Subject: [PATCH 11/18] Fix Command registration default --- mixtape/core.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/mixtape/core.py b/mixtape/core.py index 00243e5..b6e7171 100644 --- a/mixtape/core.py +++ b/mixtape/core.py @@ -47,10 +47,16 @@ def add_option(self, name: str, value: Any) -> None: class Command: name: str = attr.ib() method: Callable = attr.ib() - availability_check: Callable = attr.ib(default=None) + availability_check: Callable = attr.ib() + + @availability_check.default + def default_for_availability_check(self): + def default_true(): + return True + return default_true def register_command(self, ctx): - if self.availability_check and self.availability_check(): + if self.availability_check(): ctx.register_command(self.name, self.method) From 76cf1edd9b07f616f1df4981f88242a25b51d05a Mon Sep 17 00:00:00 2001 From: lima-tango Date: Mon, 12 Oct 2020 12:34:02 +0000 Subject: [PATCH 12/18] Fix BoomBox options default --- mixtape/core.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/mixtape/core.py b/mixtape/core.py index b6e7171..f05bfa9 100644 --- a/mixtape/core.py +++ b/mixtape/core.py @@ -299,12 +299,14 @@ class BoomBox: DEFAULT_PLAYER_COMMANDS: List[str] = ["play", "pause", "stop", "ready"] DEFAULT_PLAYER_ATTRIBUTES: List[Any] = [] - def __init__(self, player: Player, pm: Type[pluggy.PluginManager], options:dict): + def __init__(self, player: Player, pm: Type[pluggy.PluginManager], options: Dict=None): self._player = player self._pm = pm - self._options = options + self._options = options + if self._options is None: + self._options = dict() self._context = Context() - for name, value in options.items(): + for name, value in self._options.items(): self._context.add_option(name, value) # init all the plugins self._hook.mixtape_plugin_init(player=self._player, ctx=self._context) From b2071a22450a41db3f735db85ebaafa03115d847 Mon Sep 17 00:00:00 2001 From: lima-tango Date: Mon, 12 Oct 2020 16:01:25 +0200 Subject: [PATCH 13/18] Refactor BoomBox init Initialize Boombox with player or description or pipeline options, where description is a pipeline string and pipeline selects a predefined pipeline string exposed by a plugin. Add test for this initialization. --- mixtape/cli.py | 7 ++----- mixtape/core.py | 13 ++++++++++++- mixtape/hookspecs.py | 2 +- tests/test_core.py | 16 ++++++++++++++++ 4 files changed, 31 insertions(+), 7 deletions(-) diff --git a/mixtape/cli.py b/mixtape/cli.py index 7544ece..89fb519 100644 --- a/mixtape/cli.py +++ b/mixtape/cli.py @@ -82,11 +82,8 @@ def get_key_command_mapping(commands: List) -> Dict: async def main(description, ctx, options): Gst.init(None) - pipeline_string = ctx.pm.hook.mixtape_get_pipeline(description=description, options=options) - if not pipeline_string: - pipeline_string = description - player = await Player.from_description(pipeline_string) - boombox = BoomBox(player=player, pm=ctx.pm, options=options) + options["description"] = description + boombox = BoomBox(player=None, pm=ctx.pm, options=options) help_text = "Press key:" boombox.setup() diff --git a/mixtape/core.py b/mixtape/core.py index f05bfa9..d379234 100644 --- a/mixtape/core.py +++ b/mixtape/core.py @@ -299,7 +299,7 @@ class BoomBox: DEFAULT_PLAYER_COMMANDS: List[str] = ["play", "pause", "stop", "ready"] DEFAULT_PLAYER_ATTRIBUTES: List[Any] = [] - def __init__(self, player: Player, pm: Type[pluggy.PluginManager], options: Dict=None): + def __init__(self, player: Player, pm: Type[pluggy.PluginManager], options: Dict = None): self._player = player self._pm = pm self._options = options @@ -311,6 +311,17 @@ def __init__(self, player: Player, pm: Type[pluggy.PluginManager], options: Dict # init all the plugins self._hook.mixtape_plugin_init(player=self._player, ctx=self._context) + if not self._player: + pipeline_str = self._hook.mixtape_get_pipeline(ctx=self._context) + if not pipeline_str: + pipeline_str = self._context.options.get('description') + try: + pipeline = Gst.parse_launch(pipeline_str) + except TypeError: + #TODO raise correct Exception + raise Exception("PlayerNotAvailable: No pipeline description") + self._player = Player(pipeline=pipeline) + # rename and monkeypatch default set state self._player._set_state = self._player.set_state self._player.set_state = self.set_state diff --git a/mixtape/hookspecs.py b/mixtape/hookspecs.py index 8ece284..a3239e9 100644 --- a/mixtape/hookspecs.py +++ b/mixtape/hookspecs.py @@ -30,7 +30,7 @@ def mixtape_add_options(): """ @hookspec(firstresult=True) -def mixtape_get_pipeline(description, options): +def mixtape_get_pipeline(ctx: Context): """ Hook allowing a plugin to return a pipeline """ diff --git a/tests/test_core.py b/tests/test_core.py index c6ad194..6fefc70 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -8,6 +8,8 @@ class ExamplePlugin: + PIPELINE_STR = "videotestsrc ! fakesink" + def __init__(self): self.state = None @@ -30,6 +32,12 @@ def mixtape_setup(self, player, ctx): def mixtape_register_commands(self, player, ctx): return [Command("clear", self.clear), Command("call", self.call)] + @hookimpl + def mixtape_get_pipeline(self, ctx): + pipeline_name = ctx.options.get("pipeline") + if pipeline_name == "test": + return self.PIPELINE_STR + @pytest.mark.asyncio async def test_boombox_plugin_hooks(player, pipeline, mocker): @@ -56,3 +64,11 @@ async def test_boombox_plugin_hooks(player, pipeline, mocker): await b.stop() await b.clear() assert b + + +def test_init_boombox_with_predefined_pipeline_from_plugin(player): + pm = load_mixtape_plugins() + plugin = ExamplePlugin() + pm.register(plugin) + b = BoomBox(player=None, pm=pm, options={"pipeline": "test"}) + assert b From e5ab2dfb40c223393cb8d6e97bfad3eda668268e Mon Sep 17 00:00:00 2001 From: lima-tango Date: Tue, 13 Oct 2020 07:08:11 +0000 Subject: [PATCH 14/18] Refacter Boombox constructor with kwargs --- mixtape/cli.py | 2 +- mixtape/core.py | 2 +- tests/test_core.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/mixtape/cli.py b/mixtape/cli.py index 89fb519..126abc5 100644 --- a/mixtape/cli.py +++ b/mixtape/cli.py @@ -83,7 +83,7 @@ async def main(description, ctx, options): Gst.init(None) options["description"] = description - boombox = BoomBox(player=None, pm=ctx.pm, options=options) + boombox = BoomBox(player=None, pm=ctx.pm, **options) help_text = "Press key:" boombox.setup() diff --git a/mixtape/core.py b/mixtape/core.py index d379234..3071487 100644 --- a/mixtape/core.py +++ b/mixtape/core.py @@ -299,7 +299,7 @@ class BoomBox: DEFAULT_PLAYER_COMMANDS: List[str] = ["play", "pause", "stop", "ready"] DEFAULT_PLAYER_ATTRIBUTES: List[Any] = [] - def __init__(self, player: Player, pm: Type[pluggy.PluginManager], options: Dict = None): + def __init__(self, player: Player, pm: Type[pluggy.PluginManager], **options): self._player = player self._pm = pm self._options = options diff --git a/tests/test_core.py b/tests/test_core.py index 6fefc70..6b8cc1f 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -70,5 +70,5 @@ def test_init_boombox_with_predefined_pipeline_from_plugin(player): pm = load_mixtape_plugins() plugin = ExamplePlugin() pm.register(plugin) - b = BoomBox(player=None, pm=pm, options={"pipeline": "test"}) + b = BoomBox(player=None, pm=pm, pipeline="test") assert b From 788333ee23e450cb5f55f8f8d48a55aa717a7145 Mon Sep 17 00:00:00 2001 From: lima-tango Date: Tue, 13 Oct 2020 08:40:55 +0000 Subject: [PATCH 15/18] Add hook for each state --- mixtape/core.py | 9 ++++++++- mixtape/hookspecs.py | 25 +++++++++++++++++++++++-- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/mixtape/core.py b/mixtape/core.py index 3071487..3d51f81 100644 --- a/mixtape/core.py +++ b/mixtape/core.py @@ -363,7 +363,14 @@ def teardown(self) -> None: async def set_state(self, state: Gst.State) -> Gst.StateChangeReturn: self._hook.mixtape_before_state_changed(player=self._player, ctx=self._context, state=state) ret = await self._player._set_state(state) - self._hook.mixtape_on_state_changed(player=self._player, ctx=self._context, state=state) + if ret == Gst.State.NULL: + self._hook.mixtape_on_state_changed_to_NULL(player=self._player, ctx=self._context) + elif ret == Gst.State.PAUSED: + self._hook.mixtape_on_state_changed_to_PAUSED(player=self._player, ctx=self._context) + elif ret == Gst.State.READY: + self._hook.mixtape_on_state_changed_to_READY(player=self._player, ctx=self._context) + elif ret == Gst.State.PLAYING: + self._hook.mixtape_on_state_changed_to_PLAYING(player=self._player, ctx=self._context) logger.info("Registering commands on state change") self._register_commands() return ret diff --git a/mixtape/hookspecs.py b/mixtape/hookspecs.py index a3239e9..43d1a6c 100644 --- a/mixtape/hookspecs.py +++ b/mixtape/hookspecs.py @@ -63,9 +63,30 @@ def mixtape_before_state_changed(player: Player, ctx: Context, state: Gst.State) @hookspec -def mixtape_on_state_changed(player: Player, ctx: Context, state: Gst.State): +def mixtape_on_state_changed_to_NULL(player: Player, ctx: Context): """ - Hook called on state changed + Hook called on state changed to NULL + """ + + +@hookspec +def mixtape_on_state_changed_to_READY(player: Player, ctx: Context): + """ + Hook called on state changed to READY + """ + + +@hookspec +def mixtape_on_state_changed_to_PAUSED(player: Player, ctx: Context): + """ + Hook called on state changed to PAUSED + """ + + +@hookspec +def mixtape_on_state_changed_to_PLAYING(player: Player, ctx: Context): + """ + Hook called on state changed to PLAYING """ From 4cdf69c18733dbf86e65ab95253cae66b233155a Mon Sep 17 00:00:00 2001 From: lima-tango Date: Wed, 14 Oct 2020 08:03:52 +0000 Subject: [PATCH 16/18] Fix on_state_changed and BoomBox init --- mixtape/core.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/mixtape/core.py b/mixtape/core.py index 3d51f81..b469b93 100644 --- a/mixtape/core.py +++ b/mixtape/core.py @@ -303,14 +303,13 @@ def __init__(self, player: Player, pm: Type[pluggy.PluginManager], **options): self._player = player self._pm = pm self._options = options + self._context = Context() + # init options if self._options is None: self._options = dict() - self._context = Context() for name, value in self._options.items(): self._context.add_option(name, value) - # init all the plugins - self._hook.mixtape_plugin_init(player=self._player, ctx=self._context) - + # init player if not self._player: pipeline_str = self._hook.mixtape_get_pipeline(ctx=self._context) if not pipeline_str: @@ -322,6 +321,9 @@ def __init__(self, player: Player, pm: Type[pluggy.PluginManager], **options): raise Exception("PlayerNotAvailable: No pipeline description") self._player = Player(pipeline=pipeline) + # init all the plugins + self._hook.mixtape_plugin_init(player=self._player, ctx=self._context) + # rename and monkeypatch default set state self._player._set_state = self._player.set_state self._player.set_state = self.set_state @@ -363,13 +365,14 @@ def teardown(self) -> None: async def set_state(self, state: Gst.State) -> Gst.StateChangeReturn: self._hook.mixtape_before_state_changed(player=self._player, ctx=self._context, state=state) ret = await self._player._set_state(state) - if ret == Gst.State.NULL: + # check if ret is SUCCESS + if state == Gst.State.NULL: self._hook.mixtape_on_state_changed_to_NULL(player=self._player, ctx=self._context) - elif ret == Gst.State.PAUSED: + elif state == Gst.State.PAUSED: self._hook.mixtape_on_state_changed_to_PAUSED(player=self._player, ctx=self._context) - elif ret == Gst.State.READY: + elif state == Gst.State.READY: self._hook.mixtape_on_state_changed_to_READY(player=self._player, ctx=self._context) - elif ret == Gst.State.PLAYING: + elif state == Gst.State.PLAYING: self._hook.mixtape_on_state_changed_to_PLAYING(player=self._player, ctx=self._context) logger.info("Registering commands on state change") self._register_commands() From ba3c225d96e3213fecafaa6a967899c49e76e116 Mon Sep 17 00:00:00 2001 From: Ashley Camba Garrido Date: Thu, 14 Jan 2021 19:58:27 +0100 Subject: [PATCH 17/18] Add initial plugin spec --- mixtape/__init__.py | 25 +-- mixtape/base.py | 114 ----------- mixtape/boombox.py | 210 +++++++++++++++++++ mixtape/cli.py | 113 ----------- mixtape/core.py | 379 ----------------------------------- mixtape/events.py | 6 +- mixtape/exceptions.py | 6 +- mixtape/features/__init__.py | 0 mixtape/features/cmdline.py | 0 mixtape/features/console.py | 0 mixtape/features/dbus.py | 0 mixtape/features/http.py | 0 mixtape/hookspecs.py | 116 ----------- mixtape/players.py | 240 +++++++++++++++++++++- mypy.ini | 7 +- pytest.ini | 2 +- tests/conftest.py | 13 +- tests/test_boombox.py | 22 ++ tests/test_core.py | 74 ------- tests/test_events.py | 0 tests/test_player.py | 5 +- 21 files changed, 491 insertions(+), 841 deletions(-) delete mode 100644 mixtape/base.py create mode 100644 mixtape/boombox.py delete mode 100644 mixtape/cli.py delete mode 100644 mixtape/core.py delete mode 100644 mixtape/features/__init__.py delete mode 100644 mixtape/features/cmdline.py delete mode 100644 mixtape/features/console.py delete mode 100644 mixtape/features/dbus.py delete mode 100644 mixtape/features/http.py delete mode 100644 mixtape/hookspecs.py create mode 100644 tests/test_boombox.py delete mode 100644 tests/test_core.py delete mode 100644 tests/test_events.py diff --git a/mixtape/__init__.py b/mixtape/__init__.py index e272c70..3ee3775 100644 --- a/mixtape/__init__.py +++ b/mixtape/__init__.py @@ -1,24 +1,5 @@ -from typing import ( - Any, - Sequence, - Optional, -) +from .players import Player +from .boombox import BoomBox, hookspec -import pluggy -from . import hookspecs -from .core import BoomBox, Player - -hookimpl = pluggy.HookimplMarker(__name__) - -__all__ = ["hookimpl", "hookspecs", "Player", "BoomBox", "load_mixtape_plugins"] - - -def load_mixtape_plugins(plugins: Optional[Sequence[Any]] = None) -> pluggy.PluginManager: - """Init mixtape plugin manager""" - if plugins is None: - plugins = [] - pm = pluggy.PluginManager(__name__) - pm.add_hookspecs(hookspecs) - pm.load_setuptools_entrypoints(group=__name__) - return pm +__all__ = ["Player", "BoomBox", "hookspec"] diff --git a/mixtape/base.py b/mixtape/base.py deleted file mode 100644 index 2171d7b..0000000 --- a/mixtape/base.py +++ /dev/null @@ -1,114 +0,0 @@ -import logging -from typing import Tuple, Type, TypeVar - -import attr -import gi - -gi.require_version("Gst", "1.0") -from gi.repository import Gst - -from .exceptions import PlayerSetStateError - - -logger = logging.getLogger(__name__) - -BasePlayerType = TypeVar("BasePlayerType", bound="BasePlayer") - - -@attr.s -class BasePlayer: - """Player base player""" - - # TODO: configuration for set_auto_flush_bus - # as the application depends on async bus messages - # we might want to handle flushing the bus ourselves, - # otherwise setting the pipeline to `Gst.State.NULL` - # flushes the bus including the state change messages - # self.pipeline.set_auto_flush_bus(False) - - pipeline: Gst.Pipeline = attr.ib() - init: bool = attr.ib(init=False, default=False) - - def __del__(self) -> None: - """ - Make sure that the gstreamer pipeline is always cleaned up - """ - if self.state is not Gst.State.NULL: - logger.warning("Player cleanup on destructor") - self.teardown() - - @property - def bus(self) -> Gst.Bus: - """Convenience property for the pipeline Gst.Bus""" - return self.pipeline.get_bus() - - @property - def state(self) -> Gst.State: - """Convenience property for the current pipeline Gst.State""" - return self.pipeline.get_state(0)[1] - - def set_state(self, state: Gst.State) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: - """Set pipeline state""" - if not self.init: - logger.warning("Calling set_state without calling setup. Trying to do this now.") - self.setup() - ret = self.pipeline.set_state(state) - if ret == Gst.StateChangeReturn.FAILURE: - raise PlayerSetStateError - return ret - - def setup(self) -> None: - """ - Player setup: meant to be used with hooks or subclassed - Call super() after custom code. - """ - self.init = True - - def teardown(self) -> None: - """Player teardown: by default sets the pipeline to Gst.State.NULL""" - if self.state is not Gst.State.NULL: - self.set_state(Gst.State.NULL) - - def ready(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: - """Set pipeline to state to Gst.State.READY""" - return self.set_state(Gst.State.READY) - - def play(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: - """Set pipeline to state to Gst.State.PLAY""" - return self.set_state(Gst.State.PLAYING) - - def pause(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: - """Set pipeline to state to Gst.State.PAUSED""" - return self.set_state(Gst.State.PAUSED) - - # fmt: off - def stop(self, send_eos: bool = False, teardown: bool = False) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: - """Set pipeline to state to Gst.State.NULL, with the option of sending eos and teardown""" - # fmt: on - if send_eos: - self.send_eos() - - ret = self.set_state(Gst.State.NULL) - - if teardown: - self.teardown() - - return ret - - def send_eos(self) -> bool: - """Send a eos event to the pipeline""" - return self.pipeline.send_event(Gst.Event.new_eos()) - - @classmethod - def create(cls: Type[BasePlayerType], pipeline: Gst.Pipeline) -> BasePlayerType: - """Player factory from a given pipeline that calls setup by default""" - player = cls(pipeline) - player.setup() - return player - - @classmethod - def from_description(cls: Type[BasePlayerType], description: str) -> BasePlayerType: - """Player factory from a pipeline description""" - pipeline = Gst.parse_launch(description) - assert isinstance(pipeline, Gst.Pipeline) - return cls.create(pipeline=pipeline) diff --git a/mixtape/boombox.py b/mixtape/boombox.py new file mode 100644 index 0000000..f86b3c5 --- /dev/null +++ b/mixtape/boombox.py @@ -0,0 +1,210 @@ +import logging + +from collections import ChainMap, UserDict +from typing import Any, List, Mapping, Optional, Tuple + +import attr +import gi +import simplug + +gi.require_version("Gst", "1.0") +from gi.repository import Gst + +from .players import Player +from .exceptions import BoomBoxNotConfigured + +logger = logging.getLogger(__name__) + + +hookspec = simplug.Simplug("mixtape") + + +class Context(UserDict[Any, Any]): + """ + Application state object + """ + + +class PluginSpec: + """Mixtape plugin namespace""" + + @hookspec.spec + def mixtape_plugin_init(self, ctx: Context) -> None: + """Called""" + + @hookspec.spec + def mixtape_add_pipelines(self, ctx: Context) -> Mapping[str, Any]: + """ + Hook allowing a plugin to return a pipeline + """ + + # player init and teardown + + @hookspec.spec + def mixtape_player_setup(self, ctx: Context, player: Player) -> None: + """ + Hook called on player setup + """ + + @hookspec.spec + def mixtape_player_teardown(self, ctx: Context, player: Player) -> None: + """ + Hook called on player teardown + """ + + # pipeline control and event hooks + + @hookspec.spec + async def mixtape_on_message(self, ctx: Context, player: Player, message: Gst.Message) -> None: + """ + Generic hook for all bus messages + """ + + @hookspec.spec + async def mixtape_before_state_changed( + self, ctx: Context, player: Player, state: Gst.State + ) -> None: + """ + Hook called before a `set_state` call. + """ + + @hookspec.spec + async def mixtape_on_ready(self, ctx: Context, player: Player) -> None: + """ + Shortcut Hook called on state changed to READY + """ + + @hookspec.spec + async def mixtape_on_pause(self, ctx: Context, player: Player) -> None: + """ + Shortcut Hook called on state changed to PAUSED + """ + + @hookspec.spec + async def mixtape_on_play(self, ctx: Context, player: Player) -> None: + """ + Shortcut Hook called on state changed to PLAYING + """ + + @hookspec.spec + async def mixtape_on_stop(self, ctx: Context, player: Player) -> None: + """ + Hook called on state changed to NULL + """ + + # asyncio player events + + @hookspec.spec + async def mixtape_on_eos(self, ctx: Context, player: Player) -> None: + """ + Hook called on eos + """ + + @hookspec.spec + async def mixtape_on_error(self, ctx: Context, player: Player) -> None: + """ + Hook called on bus message error + """ + + +@attr.s +class BoomBox: + """ + Facade object that orchestrates plugin callbacks + and exposes plugin events and properties. + """ + + player: Player = attr.ib() + context: Context = attr.ib(repr=False) + _hookspec: simplug.Simplug = attr.ib(repr=False) + + @property + def _hooks(self) -> simplug.SimplugHooks: + """Shortcut property for plugin hooks""" + return self._hookspec.hooks + + def setup(self) -> None: + """Wrapper for player setup""" + self.player.setup() + self._hooks.mixtape_player_setup(ctx=self.context, player=self.player) + + def teardown(self) -> None: + """wrapper for player teardown""" + self._hooks.mixtape_player_teardown(ctx=self.context, player=self.player) + self.player.teardown() + + async def ready(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: + """Wrapper for player ready""" + await self._hooks.mixtape_before_state_changed( + ctx=self.context, player=self.player, state=Gst.State.READY + ) + ret = await self.player.ready() + await self._hooks.mixtape_on_ready(ctx=self.context, player=self.player) + return ret + + async def pause(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: + """Wrapper for player pause""" + await self._hooks.mixtape_before_state_changed( + ctx=self.context, player=self.player, state=Gst.State.PAUSED + ) + ret = await self.player.pause() + await self._hooks.mixtape_on_pause(ctx=self.context, player=self.player) + return ret + + async def play(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: + """Wrapper for player play""" + await self._hooks.mixtape_before_state_changed( + ctx=self.context, player=self.player, state=Gst.State.PLAYING + ) + ret = await self.player.play() + await self._hooks.mixtape_on_play(ctx=self.context, player=self.player) + return ret + + async def stop(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: + """Wrapper for player stop""" + await self._hooks.mixtape_before_state_changed( + ctx=self.context, player=self.player, state=Gst.State.NULL + ) + ret = await self.player.stop() + await self._hooks.mixtape_on_stop(ctx=self.context, player=self.player) + return ret + + @classmethod + async def init( + cls, + player: Optional[Any] = None, + context: Optional[Context] = None, + plugins: Optional[List[Any]] = None, + **settings: Any, + ) -> "BoomBox": + """Boombox async init method""" + + # load plugins + if plugins: + for plugin in plugins: + hookspec.register(plugin) + else: + hookspec.load_entrypoints("mixtape") + + # init context + + if context is None: + context = Context() + + # init plugins + + hookspec.hooks.mixtape_plugin_init(ctx=Context) + + # init player + + if not player: + context["pipelines"] = ChainMap(*hookspec.hooks.mixtape_add_pipelines(ctx=context)) + + try: + description = context["pipelines"][settings["name"]] + except AttributeError: + raise BoomBoxNotConfigured("Pipeline needed explicitly or provided by hook") + else: + player = await Player.from_description(description) + + return cls(player, context, hookspec) diff --git a/mixtape/cli.py b/mixtape/cli.py deleted file mode 100644 index 126abc5..0000000 --- a/mixtape/cli.py +++ /dev/null @@ -1,113 +0,0 @@ -# type: ignore -import click -import asyncio -import gi -import logging -import colorlog -from prompt_toolkit import HTML -from prompt_toolkit.patch_stdout import patch_stdout -from prompt_toolkit import PromptSession -import string -from typing import Dict, List - -gi.require_version("Gst", "1.0") -from gi.repository import Gst - -from . import BoomBox, Player, load_mixtape_plugins - -logger = logging.getLogger(__name__) -handler = colorlog.StreamHandler() -handler.setFormatter( - colorlog.ColoredFormatter( - "(%(asctime)s) [%(log_color)s%(levelname)s] | %(name)s | %(message)s [%(threadName)-10s]" - ) -) - -# get root logger -logger = logging.getLogger() -logger.handlers = [] -logger.addHandler(handler) -logger.setLevel(logging.INFO) - - -class MixtapeCommand(click.Command): - """Click command that adds options from pluggy hooks""" - - def make_context(self, info_name, args, parent=None, **extra): - """Override make_context to add Boombox early""" - for key, value in click.core.iteritems(self.context_settings): # noqa: B301 - if key not in extra: - extra[key] = value - ctx = click.Context(self, info_name=info_name, parent=parent, **extra) - ctx.pm = load_mixtape_plugins() - with ctx.scope(cleanup=False): - self.parse_args(ctx, args) - return ctx - - def get_params(self, ctx): - """Add plugin params to cli interface""" - params = super().get_params(ctx) - for plugin in ctx.pm.get_plugins(): - name = ctx.pm.get_name(plugin) - option = click.Option([f"--{name}/--no-{name}"], default=True) - params.append(option) - plugin_options = ctx.pm.hook.mixtape_add_options() - for option_group in plugin_options: - for option in option_group: - o = click.Option([f"--{option['name']}"]) - params.append(o) - return params - - -def bottom_toolbar(keyboard_mapping: Dict): - """Returns formatted bottom toolbar""" - available_commands = ', '.join(f'{v} [{k}]' for k,v in keyboard_mapping.items()) - return HTML(f'Mixtape {available_commands}!') - - -def get_key_command_mapping(commands: List) -> Dict: - """Returns a mapping of keys to commands""" - keyboard_mapping = {} - for command in commands: - if command[0] not in keyboard_mapping.keys(): - keyboard_mapping[command[0]] = command - else: - available_letters = [l for l in string.ascii_lowercase if l not in keyboard_mapping.keys()] - try: - keyboard_mapping[available_letters[0]] = command - except IndexError: - raise Exception('More commands than lowercase letters!') - return keyboard_mapping - -async def main(description, ctx, options): - Gst.init(None) - - options["description"] = description - boombox = BoomBox(player=None, pm=ctx.pm, **options) - help_text = "Press key:" - boombox.setup() - - session = PromptSession() - while True: - with patch_stdout(): - commands = boombox._context.commands - key_command_mapping = get_key_command_mapping(list(commands)) - toolbar = bottom_toolbar(key_command_mapping) - result = await session.prompt_async( - help_text, bottom_toolbar=toolbar - ) - print("You said: %s" % result) - if result in key_command_mapping: - await commands[key_command_mapping[result]]() - if key_command_mapping[result] == 'stop': - break - boombox.teardown() - -@click.command(cls=MixtapeCommand) -@click.argument("description", nargs=-1, type=click.UNPROCESSED, required=False) -@click.pass_context -def play(ctx, description, **kwargs): - description = " ".join(description) - - asyncio.run(main(description, ctx, kwargs)) - diff --git a/mixtape/core.py b/mixtape/core.py deleted file mode 100644 index b469b93..0000000 --- a/mixtape/core.py +++ /dev/null @@ -1,379 +0,0 @@ -import asyncio -import itertools -import logging -from typing import Any, Callable, List, MutableMapping, Tuple, Type, TypeVar, Dict - -import attr -import gi -import pluggy - -gi.require_version("Gst", "1.0") -from gi.repository import Gst - -from .events import PlayerEvents -from .exceptions import PlayerNotConfigured, PlayerPipelineError, PlayerSetStateError - - -logger = logging.getLogger(__name__) - -PlayerType = TypeVar("PlayerType", bound="Player") - - -# from . import hookspecs - - -class Context: - """Plugin shared state object.""" - - def __init__(self) -> None: - self.properties: MutableMapping[str, Any] = dict() - self.commands: MutableMapping[str, Any] = dict() - self.options: MutableMapping[str, Any] = dict() - - def register_property(self, name: str, value: Any) -> None: - self.properties[name] = value - - def register_command(self, name: str, value: Any) -> None: - self.commands[name] = value - - def clear_commands(self): - self.commands = {} - - def add_option(self, name: str, value: Any) -> None: - self.options[name] = value - - -@attr.s -class Command: - name: str = attr.ib() - method: Callable = attr.ib() - availability_check: Callable = attr.ib() - - @availability_check.default - def default_for_availability_check(self): - def default_true(): - return True - return default_true - - def register_command(self, ctx): - if self.availability_check(): - ctx.register_command(self.name, self.method) - - -@attr.s -class Player: - """Player base player""" - - pipeline: Gst.Pipeline = attr.ib(validator=attr.validators.instance_of(Gst.Pipeline)) - events: PlayerEvents = attr.ib(init=False, default=attr.Factory(PlayerEvents)) - handlers: MutableMapping[Gst.MessageType, Callable[[Gst.Bus, Gst.Message], None]] = attr.ib( - init=False, repr=False - ) - - @handlers.default - def _handlers(self) -> MutableMapping[Gst.MessageType, Callable[[Gst.Bus, Gst.Message], None]]: - return { - Gst.MessageType.QOS: self._on_qos, - Gst.MessageType.ERROR: self._on_error, - Gst.MessageType.EOS: self._on_eos, - Gst.MessageType.STATE_CHANGED: self._on_state_changed, - Gst.MessageType.ASYNC_DONE: self._on_async_done, - } - - def __del__(self) -> None: - """ - Make sure that the gstreamer pipeline is always cleaned up - """ - # if self.state is not Gst.State.NULL: - # self.teardown() - - @property - def bus(self) -> Gst.Bus: - """Convenience property for the pipeline Gst.Bus""" - return self.pipeline.get_bus() - - @property - def state(self) -> Gst.State: - """Convenience property for the current pipeline Gst.State""" - return self.pipeline.get_state(0)[1] - - @property - def sinks(self) -> List[Any]: - """Returns all sink elements""" - return list(self.pipeline.iterate_sinks()) - - @property - def sources(self) -> List[Any]: - """Return all source elements""" - return list(self.pipeline.iterate_sources()) - - @property - def elements(self) -> List[Any]: - """Return all pipeline elements""" - return list(self.pipeline.iterate_elements()) - - def get_elements_by_gtype(self, gtype: Any) -> List[Any]: - """Return all elements in pipeline that match gtype""" - return [e for e in self.elements if e.get_factory().get_element_type() == gtype] - - def setup(self) -> None: - """Setup needs a running asyncio loop""" - loop = asyncio.get_running_loop() - pollfd = self.bus.get_pollfd() - loop.add_reader(pollfd.fd, self._handle) - self.events.setup.set() - - def teardown(self) -> None: - """Cleanup player references to loop and gst resources""" - if self.state is not Gst.State.NULL: - self.pipeline.set_state(Gst.State.NULL) - logger.debug("Teardown set state to null") - logger.debug("Removing pollfd") - loop = asyncio.get_running_loop() - pollfd = self.bus.get_pollfd() - loop.remove_reader(pollfd.fd) - self.events.teardown.set() - - # controls - - async def set_state(self, state: Gst.State) -> Gst.StateChangeReturn: - """Async set state""" - if self.state == state: - raise ValueError("Pipeline state is already in state %s.", state) - if not self.events.setup.is_set(): - raise PlayerNotConfigured("Setting state before setup is not allowed.") - ret = self.pipeline.set_state(state) - if ret == Gst.StateChangeReturn.FAILURE: - raise PlayerSetStateError - if ret == Gst.StateChangeReturn.ASYNC: - await self.events.wait_for_state(state) - return ret - - async def ready(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: - """Async override of base.ready""" - return await self.set_state(Gst.State.READY) - - async def play(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: - """Async override of base.play""" - return await self.set_state(Gst.State.PLAYING) - - async def pause(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: - """Async override of base.pause""" - return await self.set_state(Gst.State.PAUSED) - - async def stop(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: - """Async override of base.stop""" - return await self.set_state(Gst.State.NULL) - - # -- utility methods -- # - - async def send_eos(self) -> bool: - """Send eos to pipeline and await event""" - ret = self.pipeline.send_event(Gst.Event.new_eos()) - await self.events.eos.wait() - return ret - - async def play_until_eos(self) -> None: - """Play until eos or an error""" - await self.play() - await asyncio.wait( - {self.events.eos.wait(), self.events.error.wait()}, return_when=asyncio.FIRST_COMPLETED - ) - - # -- bus message handling -- # - - def _handle(self) -> None: - """ - Asyncio reader callback, called when a message is available on - the bus. - """ - msg = self.bus.pop() - if msg: - handler = self.handlers.get(msg.type, self._on_unhandled_msg) - handler(self.bus, msg) - - def _on_state_changed( - self, bus: Gst.Bus, message: Gst.Message - ) -> None: # pylint: disable=unused-argument - """ - Handler for `state_changed` messages - By default will only log to `debug` - """ - old, new, _ = message.parse_state_changed() - - if message.src != self.pipeline: - return - logger.info( - "State changed from %s to %s", - Gst.Element.state_get_name(old), - Gst.Element.state_get_name(new), - ) - - self.events.pick_state(new) - - def _on_error(self, bus: Gst.Bus, message: Gst.Message) -> None: - """ - Handler for `error` messages - By default it will parse the error message, - log to `error` and append to `self.errors` - """ - err, debug = message.parse_error() - logger.error( - "Error received from element %s:%s on %s", message.src.get_name(), err.message, bus - ) - if debug is not None: - logger.error("Debugging information: %s", debug) - - self.teardown() - self.events.error.set() - raise PlayerPipelineError(err) - - def _on_eos(self, bus: Gst.Bus, message: Gst.Message) -> None: - """ - Handler for eos messages - By default it sets the eos event - """ - logger.info("Received EOS message on bus") - self.events.eos.set() - - def _on_async_done(self, bus: Gst.Bus, message: Gst.Message) -> None: - """ - Handler for `async_done` messages - By default, it will pop any futures available in `self.futures` - and call their result. - """ - msg = message.parse_async_done() - logger.debug("Unhandled ASYNC_DONE message: %s on %s", msg, bus) - - def _on_unhandled_msg(self, bus: Gst.Bus, message: Gst.Message) -> None: - """ - Handler for all other messages. - By default will just log with `debug` - """ - logger.debug("Unhandled msg: %s on %s", message.type, bus) - - def _on_qos(self, bus: Gst.Bus, message: Gst.Message) -> None: - """ - Handler for `qos` messages - By default it will parse the error message, - log to `error` and append to `self.errors` - """ - live, running_time, stream_time, timestamp, duration = message.parse_qos() - logger.warning( - "Qos message: live:%s - running:%s - stream:%s - timestamp:%s - duration:%s received from %s on %s", - live, - running_time, - stream_time, - timestamp, - duration, - message.src.get_name(), - bus, - ) - - @classmethod - async def create(cls: Type[PlayerType], pipeline: Gst.Pipeline) -> PlayerType: - """Player factory from a given pipeline that calls setup by default""" - player = cls(pipeline) - player.setup() - return player - - @classmethod - async def from_description(cls: Type[PlayerType], description: str) -> PlayerType: - """Player factory from a pipeline description""" - return await cls.create(pipeline=cls.parse_description(description)) - - @staticmethod - def parse_description(description: str) -> Gst.Pipeline: - pipeline = Gst.parse_launch(description) - if not isinstance(pipeline, Gst.Pipeline): - raise ValueError("Invalid pipeline description") - return pipeline - - -class BoomBox: - """ - Facade object that orchestrates plugin callbacks - and exposes plugin commands and properties. - """ - - DEFAULT_PLAYER_COMMANDS: List[str] = ["play", "pause", "stop", "ready"] - DEFAULT_PLAYER_ATTRIBUTES: List[Any] = [] - - def __init__(self, player: Player, pm: Type[pluggy.PluginManager], **options): - self._player = player - self._pm = pm - self._options = options - self._context = Context() - # init options - if self._options is None: - self._options = dict() - for name, value in self._options.items(): - self._context.add_option(name, value) - # init player - if not self._player: - pipeline_str = self._hook.mixtape_get_pipeline(ctx=self._context) - if not pipeline_str: - pipeline_str = self._context.options.get('description') - try: - pipeline = Gst.parse_launch(pipeline_str) - except TypeError: - #TODO raise correct Exception - raise Exception("PlayerNotAvailable: No pipeline description") - self._player = Player(pipeline=pipeline) - - # init all the plugins - self._hook.mixtape_plugin_init(player=self._player, ctx=self._context) - - # rename and monkeypatch default set state - self._player._set_state = self._player.set_state - self._player.set_state = self.set_state - # register initial commands - self._register_commands() - - def __getattr__(self, name: str) -> Any: - """ - Expose methods and properties from plugins - """ - try: - return {**self._context.properties, **self._context.commands}[name] - except KeyError: - raise AttributeError - - @property - def _hook(self) -> Any: - """Convenience shortcut for pm hook""" - return self._pm.hook - - def _register_commands(self) -> None: - # register all the commands - self._context.clear_commands() - for cmd in self.DEFAULT_PLAYER_COMMANDS: - self._context.register_command(cmd, getattr(self._player, cmd)) - results = self._hook.mixtape_register_commands(player=self._player, ctx=self._context) - results = list(itertools.chain(*results)) - for command in results: - command.register_command(self._context) - - def setup(self) -> None: - self._player.setup() - self._hook.mixtape_setup(player=self._player, ctx=self._context) - - def teardown(self) -> None: - self._hook.mixtape_teardown(player=self._player, ctx=self._context) - self._player.teardown() - - async def set_state(self, state: Gst.State) -> Gst.StateChangeReturn: - self._hook.mixtape_before_state_changed(player=self._player, ctx=self._context, state=state) - ret = await self._player._set_state(state) - # check if ret is SUCCESS - if state == Gst.State.NULL: - self._hook.mixtape_on_state_changed_to_NULL(player=self._player, ctx=self._context) - elif state == Gst.State.PAUSED: - self._hook.mixtape_on_state_changed_to_PAUSED(player=self._player, ctx=self._context) - elif state == Gst.State.READY: - self._hook.mixtape_on_state_changed_to_READY(player=self._player, ctx=self._context) - elif state == Gst.State.PLAYING: - self._hook.mixtape_on_state_changed_to_PLAYING(player=self._player, ctx=self._context) - logger.info("Registering commands on state change") - self._register_commands() - return ret diff --git a/mixtape/events.py b/mixtape/events.py index de0879f..79e85b8 100644 --- a/mixtape/events.py +++ b/mixtape/events.py @@ -10,6 +10,8 @@ class States(enum.Enum): + """Python enum representing the Gst.Pipeline states""" + VOID_PENDING = 0 NULL = 1 READY = 2 @@ -36,10 +38,10 @@ class TearDownEvent(asyncio.Event): @attr.s(auto_attribs=True, slots=True, frozen=True) class PlayerEvents: """ - Async event syncronization flags + Player async event synchronization flags """ - # simple events + # core events setup: asyncio.Event = attr.Factory(SetupEvent) eos: asyncio.Event = attr.Factory(EosEvent) error: asyncio.Event = attr.Factory(ErrorEvent) diff --git a/mixtape/exceptions.py b/mixtape/exceptions.py index df08c2f..8cfcf27 100644 --- a/mixtape/exceptions.py +++ b/mixtape/exceptions.py @@ -1,5 +1,5 @@ class MixTapeError(Exception): - """Mixtape exception base clase""" + """Mixtape exception base class""" class PlayerNotConfigured(MixTapeError): @@ -16,3 +16,7 @@ class PlayerSetStateError(MixTapeError): class PlayerPipelineError(MixTapeError): """Error originating from Gst Pipeline""" + + +class BoomBoxNotConfigured(MixTapeError): + """Boombox improperly configured""" diff --git a/mixtape/features/__init__.py b/mixtape/features/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/mixtape/features/cmdline.py b/mixtape/features/cmdline.py deleted file mode 100644 index e69de29..0000000 diff --git a/mixtape/features/console.py b/mixtape/features/console.py deleted file mode 100644 index e69de29..0000000 diff --git a/mixtape/features/dbus.py b/mixtape/features/dbus.py deleted file mode 100644 index e69de29..0000000 diff --git a/mixtape/features/http.py b/mixtape/features/http.py deleted file mode 100644 index e69de29..0000000 diff --git a/mixtape/hookspecs.py b/mixtape/hookspecs.py deleted file mode 100644 index 43d1a6c..0000000 --- a/mixtape/hookspecs.py +++ /dev/null @@ -1,116 +0,0 @@ -# type: ignore -from typing import Any, Callable, cast, TypeVar -import pluggy -from .core import Context, Player -import gi - -gi.require_version("Gst", "1.0") -from gi.repository import Gst - - -F = TypeVar("F", bound=Callable[..., Any]) -hookspec = cast(Callable[[F], F], pluggy.HookspecMarker("mixtape")) - -# plugin - -@hookspec -def mixtape_plugin_init(player: Player, ctx: Context): - pass - -@hookspec -def mixtape_plugin_autoload(): - pass - -# interface options - -@hookspec -def mixtape_add_options(): - """ - Hook called on setup to add interface options exposed by plug-ins. - """ - -@hookspec(firstresult=True) -def mixtape_get_pipeline(ctx: Context): - """ - Hook allowing a plugin to return a pipeline - """ - -# player init and teardown - - -@hookspec -def mixtape_setup(player: Player, ctx: Context): - """ - Hook called on player setup - """ - - -@hookspec -def mixtape_teardown(player: Player, ctx: Context): - """ - Hook called on player teardown - """ - - -# pipeline control and event hooks - - -@hookspec -def mixtape_before_state_changed(player: Player, ctx: Context, state: Gst.State): - """ - Hook called before a `set_state` call. - """ - - -@hookspec -def mixtape_on_state_changed_to_NULL(player: Player, ctx: Context): - """ - Hook called on state changed to NULL - """ - - -@hookspec -def mixtape_on_state_changed_to_READY(player: Player, ctx: Context): - """ - Hook called on state changed to READY - """ - - -@hookspec -def mixtape_on_state_changed_to_PAUSED(player: Player, ctx: Context): - """ - Hook called on state changed to PAUSED - """ - - -@hookspec -def mixtape_on_state_changed_to_PLAYING(player: Player, ctx: Context): - """ - Hook called on state changed to PLAYING - """ - - -@hookspec -def mixtape_register_commands(player: Player, ctx: Context): - pass - -# @hookspec -# def mixtape_register_conditions(player: Player, ctx: Context): -# pass - -# def mixtape_on_bus_message(player: Player,ctx: Context, msg: Gst.Message): -# """ -# Hook called on bus message -# """ - -# @hookspec -# def mixtape_on_eos(player: PlayerType): -# pass - - -# player actions and properties - - -# @hookspec -# def mixtape_register_property(): -# pass diff --git a/mixtape/players.py b/mixtape/players.py index 605515a..3a2ef2b 100644 --- a/mixtape/players.py +++ b/mixtape/players.py @@ -1,6 +1,244 @@ +import asyncio +import logging import warnings +from typing import Any, Callable, List, MutableMapping, Tuple, Type, TypeVar -from mixtape import Player +import attr +import gi + +gi.require_version("Gst", "1.0") +from gi.repository import Gst + +from .events import PlayerEvents +from .exceptions import PlayerNotConfigured, PlayerPipelineError, PlayerSetStateError + +logger = logging.getLogger(__name__) + +P = TypeVar("P", bound="Player") + + +@attr.s +class Player: + """Player base player""" + + pipeline: Gst.Pipeline = attr.ib(validator=attr.validators.instance_of(Gst.Pipeline)) + events: PlayerEvents = attr.ib(init=False, default=attr.Factory(PlayerEvents)) + handlers: MutableMapping[Gst.MessageType, Callable[[Gst.Bus, Gst.Message], None]] = attr.ib( + init=False, repr=False + ) + + @handlers.default + def _handlers(self) -> MutableMapping[Gst.MessageType, Callable[[Gst.Bus, Gst.Message], None]]: + return { + Gst.MessageType.QOS: self._on_qos, + Gst.MessageType.ERROR: self._on_error, + Gst.MessageType.EOS: self._on_eos, + Gst.MessageType.STATE_CHANGED: self._on_state_changed, + Gst.MessageType.ASYNC_DONE: self._on_async_done, + } + + def __del__(self) -> None: + """ + Make sure that the gstreamer pipeline is always cleaned up + """ + + @property + def bus(self) -> Gst.Bus: + """Convenience property for the pipeline Gst.Bus""" + return self.pipeline.get_bus() + + @property + def state(self) -> Gst.State: + """Convenience property for the current pipeline Gst.State""" + return self.pipeline.get_state(0)[1] + + @property + def sinks(self) -> List[Any]: + """Returns all sink elements""" + return list(self.pipeline.iterate_sinks()) + + @property + def sources(self) -> List[Any]: + """Return all source elements""" + return list(self.pipeline.iterate_sources()) + + @property + def elements(self) -> List[Any]: + """Return all pipeline elements""" + return list(self.pipeline.iterate_elements()) + + def get_elements_by_gtype(self, gtype: Any) -> List[Any]: + """Return all elements in pipeline that match gtype""" + return [e for e in self.elements if e.get_factory().get_element_type() == gtype] + + def setup(self) -> None: + """Setup needs a running asyncio loop""" + loop = asyncio.get_running_loop() + pollfd = self.bus.get_pollfd() + loop.add_reader(pollfd.fd, self._handle) + self.events.setup.set() + + def teardown(self) -> None: + """Cleanup player references to loop and gst resources""" + if self.state is not Gst.State.NULL: + self.pipeline.set_state(Gst.State.NULL) + logger.debug("Teardown set state to null") + logger.debug("Removing pollfd") + loop = asyncio.get_running_loop() + pollfd = self.bus.get_pollfd() + loop.remove_reader(pollfd.fd) + self.events.teardown.set() + + # controls + + async def set_state(self, state: Gst.State) -> Gst.StateChangeReturn: + """Async set state""" + if self.state == state: + raise ValueError("Pipeline state is already in state %s.", state) + if not self.events.setup.is_set(): + raise PlayerNotConfigured("Setting state before setup is not allowed.") + ret = self.pipeline.set_state(state) + if ret == Gst.StateChangeReturn.FAILURE: + raise PlayerSetStateError + if ret == Gst.StateChangeReturn.ASYNC: + await self.events.wait_for_state(state) + return ret + + async def ready(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: + """Async override of base.ready""" + return await self.set_state(Gst.State.READY) + + async def play(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: + """Async override of base.play""" + return await self.set_state(Gst.State.PLAYING) + + async def pause(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: + """Async override of base.pause""" + return await self.set_state(Gst.State.PAUSED) + + async def stop(self) -> Tuple[Gst.StateChangeReturn, Gst.State, Gst.State]: + """Async override of base.stop""" + return await self.set_state(Gst.State.NULL) + + # -- utility methods -- # + + async def send_eos(self) -> bool: + """Send eos to pipeline and await event""" + ret = self.pipeline.send_event(Gst.Event.new_eos()) + await self.events.eos.wait() + return ret + + async def play_until_eos(self) -> None: + """Play until eos or an error""" + await self.play() + await asyncio.wait( + {self.events.eos.wait(), self.events.error.wait()}, return_when=asyncio.FIRST_COMPLETED + ) + + # -- bus message handling -- # + + def _handle(self) -> None: + """ + Asyncio reader callback, called when a message is available on + the bus. + """ + msg = self.bus.pop() + if msg: + handler = self.handlers.get(msg.type, self._on_unhandled_msg) + handler(self.bus, msg) + + def _on_state_changed( + self, bus: Gst.Bus, message: Gst.Message + ) -> None: # pylint: disable=unused-argument + """ + Handler for `state_changed` messages + By default will only log to `debug` + """ + old, new, _ = message.parse_state_changed() + + if message.src != self.pipeline: + return + logger.info( + "State changed from %s to %s", + Gst.Element.state_get_name(old), + Gst.Element.state_get_name(new), + ) + + self.events.pick_state(new) + + def _on_error(self, bus: Gst.Bus, message: Gst.Message) -> None: + """ + Handler for `error` messages + By default it will parse the error message, + log to `error` and append to `self.errors` + """ + err, debug = message.parse_error() + logger.error( + "Error received from element %s:%s on %s", message.src.get_name(), err.message, bus + ) + if debug is not None: + logger.error("Debugging information: %s", debug) + + self.teardown() + self.events.error.set() + raise PlayerPipelineError(err) + + def _on_eos(self, bus: Gst.Bus, message: Gst.Message) -> None: + """ + Handler for eos messages + By default it sets the eos event + """ + logger.info("Received EOS message on bus") + self.events.eos.set() + + def _on_async_done(self, bus: Gst.Bus, message: Gst.Message) -> None: + """ + Handler for `async_done` messages + By default, it will pop any futures available in `self.futures` + and call their result. + """ + msg = message.parse_async_done() + logger.debug("Unhandled ASYNC_DONE message: %s on %s", msg, bus) + + def _on_unhandled_msg(self, bus: Gst.Bus, message: Gst.Message) -> None: + """ + Handler for all other messages. + By default will just log with `debug` + """ + logger.debug("Unhandled msg: %s on %s", message.type, bus) + + def _on_qos(self, bus: Gst.Bus, message: Gst.Message) -> None: + """ + Handler for `qos` messages + By default it will parse the error message, + log to `error` and append to `self.errors` + """ + live, running_time, stream_time, timestamp, duration = message.parse_qos() + logger.warning( + "Qos message: live:%s - running:%s - stream:%s - timestamp:%s - duration:%s received from %s on %s", + live, + running_time, + stream_time, + timestamp, + duration, + message.src.get_name(), + bus, + ) + + @classmethod + async def create(cls: Type[P], pipeline: Gst.Pipeline) -> P: + """Player factory from a given pipeline that calls setup by default""" + player = cls(pipeline) + player.setup() + return player + + @classmethod + async def from_description(cls: Type[P], description: str) -> P: + """Player factory from a pipeline description""" + pipeline = Gst.parse_launch(description) + if not isinstance(pipeline, Gst.Pipeline): + raise ValueError("Invalid pipeline description") + return await cls.create(pipeline) class AsyncPlayer(Player): diff --git a/mypy.ini b/mypy.ini index c802076..5cd063b 100644 --- a/mypy.ini +++ b/mypy.ini @@ -24,7 +24,7 @@ disallow_any_generics=True disallow_incomplete_defs = True disallow_subclassing_any = True -disallow_untyped_decorators = True +disallow_untyped_decorators = False disallow_untyped_defs = True check_untyped_defs=True @@ -50,8 +50,5 @@ ignore_missing_imports = True [mypy-beppu.*] ignore_missing_imports = True -[mypy-pampy.*] +[mypy-simplug.*] ignore_missing_imports = True - -[mypy-pluggy.*] -ignore_missing_imports = True \ No newline at end of file diff --git a/pytest.ini b/pytest.ini index f3f947b..a44ebac 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,4 @@ [pytest] addopts = -ra --tb=short --cov-report=xml --cov --mypy --flake8 --pylint --black timeout = 20 -timeout_method = thread \ No newline at end of file +timeout_method = thread diff --git a/tests/conftest.py b/tests/conftest.py index 413dbe9..9269057 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -48,17 +48,8 @@ def pipeline(Gst): return pipeline -@pytest.fixture -def error_pipeline(Gst): - """Error pipeline""" - ERROR_PIPELINE_DESCRIPTION = "filesrc ! queue ! fakesink" - pipeline = Gst.parse_launch(ERROR_PIPELINE_DESCRIPTION) - assert isinstance(pipeline, Gst.Pipeline) - return pipeline - - @pytest.fixture def player(Gst): - from mixtape.players import AsyncPlayer + from mixtape import Player - return AsyncPlayer + return Player diff --git a/tests/test_boombox.py b/tests/test_boombox.py new file mode 100644 index 0000000..5f159ce --- /dev/null +++ b/tests/test_boombox.py @@ -0,0 +1,22 @@ +# type: ignore +import pytest +import asyncio +from mixtape import BoomBox, hookspec + + +@pytest.mark.asyncio +async def test_boombox_usage(Gst): + class PluginA: + @hookspec.impl + def mixtape_add_pipelines(self, ctx): + return {"simple": "videotestsrc ! queue ! fakesink"} + + class PluginB: + @hookspec.impl + def mixtape_add_pipelines(self, ctx): + return {"error": "filesink ! queue ! fakesink"} + + b = await BoomBox.init(plugins=[PluginA, PluginB], name="simple") + await b.play() + await asyncio.sleep(5) + await b.stop() diff --git a/tests/test_core.py b/tests/test_core.py deleted file mode 100644 index 6b8cc1f..0000000 --- a/tests/test_core.py +++ /dev/null @@ -1,74 +0,0 @@ -# type: ignore -import pytest -import asyncio -from mixtape.core import BoomBox - -from mixtape import load_mixtape_plugins, hookimpl -from mixtape.core import Command - - -class ExamplePlugin: - PIPELINE_STR = "videotestsrc ! fakesink" - - def __init__(self): - self.state = None - - def setup_plugin_state(self): - """Initial setup""" - self.state = "Something not None" - - async def clear(self): - """Reset calibration on demand""" - - async def call(self): - """Call calibration on demand""" - - @hookimpl - def mixtape_setup(self, player, ctx): - # self.pipeline.get_element - self.setup_plugin_state() - - @hookimpl - def mixtape_register_commands(self, player, ctx): - return [Command("clear", self.clear), Command("call", self.call)] - - @hookimpl - def mixtape_get_pipeline(self, ctx): - pipeline_name = ctx.options.get("pipeline") - if pipeline_name == "test": - return self.PIPELINE_STR - - -@pytest.mark.asyncio -async def test_boombox_plugin_hooks(player, pipeline, mocker): - p = player(pipeline=pipeline) - pm = load_mixtape_plugins() - - assert not pm.get_plugins(), "Plugins should return empty set" - - plugin = ExamplePlugin() - - pm.register(plugin) - - assert pm.get_plugins(), "We now should have one plugin" - - b = BoomBox(player=p, pm=pm) - b.setup() - - assert plugin.state == "Something not None" - - await b.play() - await asyncio.sleep(3) - await b.call() - - await b.stop() - await b.clear() - assert b - - -def test_init_boombox_with_predefined_pipeline_from_plugin(player): - pm = load_mixtape_plugins() - plugin = ExamplePlugin() - pm.register(plugin) - b = BoomBox(player=None, pm=pm, pipeline="test") - assert b diff --git a/tests/test_events.py b/tests/test_events.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/test_player.py b/tests/test_player.py index cc4bd6b..2e13987 100644 --- a/tests/test_player.py +++ b/tests/test_player.py @@ -32,8 +32,9 @@ async def test_error_state_change_before_setup(pipeline): @pytest.mark.asyncio -async def test_gst_error_on_start_exception(error_pipeline): - player = Player(error_pipeline) +async def test_gst_error_on_start_exception(Gst): + pipeline = Gst.parse_launch("filesrc ! fakesink") + player = Player(pipeline) player.setup() with pytest.raises(PlayerSetStateError): From 468b10135a11341fd0eabfaf0414e417e421caef Mon Sep 17 00:00:00 2001 From: Ashley Camba Garrido Date: Thu, 14 Jan 2021 20:14:26 +0100 Subject: [PATCH 18/18] Add simplug to requirements --- req-install.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/req-install.txt b/req-install.txt index 501a5a6..d29117c 100644 --- a/req-install.txt +++ b/req-install.txt @@ -1,4 +1,5 @@ attrs beppu click==7.1.2 +simplug==0.0.6 prompt-toolkit==3.0.6