From 32582d0b2f29121c2d322478f4f108f6851b58e8 Mon Sep 17 00:00:00 2001 From: warbs816 Date: Sat, 2 May 2026 01:16:57 +0100 Subject: [PATCH 1/4] feat: Expose live media info for transcoded streams MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Captures live ffmpeg progress (bitrate, fps, frame, speed) from each SharedTranscodingProcess's stderr by extending the line-buffer to split on \r as well as \n (ffmpeg writes its periodic stats line with a trailing \r so it overwrites in-place in a terminal — without the \r split they got buffered until the next newline arrived and we missed every update). On stream start we additionally fire-and-forget an ffprobe against the source URL so codec/container/resolution/audio get populated without delaying the start. The merged media_info dict is exposed in GET /streams alongside existing per-stream stats so the m3u-editor UI can render Dispatcharr-style badges. media_info is empty for plain HTTP-proxy streams since there's no live ffmpeg producer. --- src/pooled_stream_manager.py | 182 ++++++++++++++++++++++++++++++++++- src/stream_manager.py | 16 +++ tests/test_media_info.py | 154 +++++++++++++++++++++++++++++ 3 files changed, 348 insertions(+), 4 deletions(-) create mode 100644 tests/test_media_info.py diff --git a/src/pooled_stream_manager.py b/src/pooled_stream_manager.py index c357e8e..c860436 100644 --- a/src/pooled_stream_manager.py +++ b/src/pooled_stream_manager.py @@ -5,6 +5,7 @@ import asyncio import json +import re import time import uuid import hashlib @@ -14,6 +15,12 @@ import os import tempfile +# Live ffmpeg progress fields written to stderr (e.g. 
+# "frame= 243 fps= 30 q=28.0 size= 1152kB time=00:00:08.13 bitrate=1162.5kbits/s speed=1.01x") +_FFMPEG_PROGRESS_FIELD_RE = re.compile( + r"\b(?P<key>frame|fps|bitrate|speed|q|size|time)\s*=\s*(?P<val>\S+)" +) + logger = logging.getLogger(__name__) try: @@ -63,6 +70,11 @@ def __init__( self.last_access = time.time() self.total_bytes_served = 0 self.status = "starting" + # Live media info populated from ffprobe (codec/container/resolution/audio) + # and the active ffmpeg process's stderr (bitrate/fps/frame/speed). Empty + # for resolver streams (streamlink/yt-dlp) that don't go through ffmpeg. + self.media_info: Dict[str, Any] = {} + self._probe_task: Optional[asyncio.Task] = None # Broadcasting support - each client gets its own queue self.client_queues: Dict[str, asyncio.Queue] = {} @@ -350,6 +362,11 @@ async def start_process(self): # Start stderr logging task asyncio.create_task(self._log_stderr()) + # Probe the input async to populate codec/container/resolution. + # Fire-and-forget — never await this; stream startup must not block + # on ffprobe (it can take a few seconds against slow upstreams). + self._probe_task = asyncio.create_task(self._probe_input_async()) + # Start broadcaster task to read from FFmpeg and send to all clients if self.mode == "stdout": self._broadcaster_task = asyncio.create_task(self._broadcast_loop()) @@ -452,6 +469,149 @@ async def _hls_watch_loop(self): except Exception as e: logger.debug(f"HLS watch loop ended for {self.stream_id}: {e}") + async def _probe_input_async(self): + """ + Run ffprobe against the source URL on a background task and populate + media_info with codec/container/resolution/audio info. Fire-and-forget; + callers must not await this so stream startup is never delayed. 
+ """ + cmd = [ + "ffprobe", + "-v", + "quiet", + "-print_format", + "json", + "-show_streams", + "-show_format", + ] + if ( + self.user_agent + and isinstance(self.url, str) + and ("://" in self.url and not self.url.startswith("file://")) + ): + cmd.extend(["-user_agent", self.user_agent]) + if ( + self.headers + and isinstance(self.url, str) + and ("://" in self.url and not self.url.startswith("file://")) + ): + header_str = "".join([f"{k}: {v}\r\n" for k, v in self.headers.items()]) + cmd.extend(["-headers", header_str]) + cmd.append(self.url) + + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=15) + except asyncio.TimeoutError: + logger.debug( + f"ffprobe timed out for stream {self.stream_id}; skipping media_info population" + ) + try: + proc.kill() + except Exception: + pass + return + + if proc.returncode != 0 or not stdout: + return + + try: + data = json.loads(stdout.decode("utf-8", errors="ignore")) + except json.JSONDecodeError: + return + + video = next( + (s for s in data.get("streams", []) if s.get("codec_type") == "video"), + None, + ) + audio = next( + (s for s in data.get("streams", []) if s.get("codec_type") == "audio"), + None, + ) + fmt = data.get("format", {}) or {} + + info: Dict[str, Any] = {} + if video: + width = video.get("width") + height = video.get("height") + if width and height: + info["resolution"] = f"{width}x{height}" + if video.get("codec_name"): + info["video_codec"] = video["codec_name"] + # Static fps from input — gets overwritten by live -progress data later + rate = video.get("avg_frame_rate") or video.get("r_frame_rate") + if rate and "/" in rate: + try: + num, den = rate.split("/", 1) + if float(den) > 0: + info["fps"] = round(float(num) / float(den), 2) + except ValueError: + pass + if audio: + if audio.get("codec_name"): + info["audio_codec"] = audio["codec_name"] + 
channels = audio.get("channels") + if channels: + info["audio_channels"] = { + 1: "mono", + 2: "stereo", + 6: "5.1", + 8: "7.1", + }.get(int(channels), str(channels)) + container = fmt.get("format_name") + if container: + # ffprobe returns comma-separated synonyms (e.g. "mov,mp4,m4a") + info["container"] = container.split(",")[0].upper() + + # Merge — never clobber live fields already populated by ffmpeg progress + for key, value in info.items(): + self.media_info.setdefault(key, value) + except FileNotFoundError: + logger.warning("ffprobe binary not found; cannot populate media_info") + except Exception as e: + logger.debug( + f"Error probing input for stream {self.stream_id}: {e}" + ) + + def _parse_ffmpeg_progress(self, line_str: str) -> None: + """ + Extract live progress fields (bitrate, fps, frame, speed) from a single + ffmpeg stderr line and update media_info. No-op for non-progress lines. + """ + # Cheap pre-filter — most stderr lines aren't progress + if "bitrate=" not in line_str and "fps=" not in line_str: + return + for match in _FFMPEG_PROGRESS_FIELD_RE.finditer(line_str): + key = match.group("key") + raw = match.group("val") + if key == "bitrate": + if raw.endswith("kbits/s"): + try: + self.media_info["bitrate_kbps"] = round(float(raw[:-7]), 1) + except ValueError: + pass + elif key == "fps": + try: + self.media_info["fps"] = round(float(raw), 2) + except ValueError: + pass + elif key == "frame": + try: + self.media_info["frame"] = int(raw) + except ValueError: + pass + elif key == "speed": + if raw.endswith("x"): + try: + self.media_info["speed"] = round(float(raw[:-1]), 2) + except ValueError: + pass + async def _log_stderr(self): """Log FFmpeg stderr output and monitor for write errors and input failures""" if not self.process or not self.process.stderr: @@ -502,15 +662,29 @@ async def _log_stderr(self): buf += chunk - # Split on newline and process full lines - while b"\n" in buf: - line, buf = buf.split(b"\n", 1) + # Split on \n OR \r — ffmpeg 
writes its periodic stats line with + # a trailing \r so it overwrites in-place in a terminal. Without + # splitting on \r those stats lines are buffered until the next + # newline arrives and we miss the live bitrate/fps updates. + while True: + n_idx = buf.find(b"\n") + r_idx = buf.find(b"\r") + if n_idx == -1 and r_idx == -1: + break + if n_idx == -1: + idx = r_idx + elif r_idx == -1: + idx = n_idx + else: + idx = min(n_idx, r_idx) + line, buf = buf[:idx], buf[idx + 1 :] line_str = line.decode("utf-8", errors="ignore").strip() if not line_str: continue - # Log FFmpeg output (you could parse stats here) + # Log FFmpeg output and capture live progress fields logger.debug(f"FFmpeg [{self.stream_id}]: {line_str}") + self._parse_ffmpeg_progress(line_str) line_lower = line_str.lower() diff --git a/src/stream_manager.py b/src/stream_manager.py index ae59cc8..cbd827b 100644 --- a/src/stream_manager.py +++ b/src/stream_manager.py @@ -4531,6 +4531,21 @@ async def _monitor_connection_idle_time(self): client_info.idle_warning_logged = False client_info.idle_error_logged = False + def _get_media_info(self, stream: "StreamInfo") -> Dict[str, Any]: + """ + Look up live media info (codec/container/resolution/bitrate/fps) for a + stream. Returns an empty dict for non-transcoded streams since live + ffmpeg metadata only exists when ffmpeg is the active producer. 
+ """ + if not stream.transcode_stream_key or not self.pooled_manager: + return {} + process = self.pooled_manager.shared_processes.get( + stream.transcode_stream_key + ) + if not process: + return {} + return dict(getattr(process, "media_info", {}) or {}) + def get_stats(self) -> Dict: """Get comprehensive stats - aggregates variant stream stats into parent streams""" # Only count non-variant streams @@ -4662,6 +4677,7 @@ def get_stats(self) -> Dict: "last_access": stream.last_access.isoformat(), "metadata": stream.metadata, "headers": stream.headers, + "media_info": self._get_media_info(stream), } for stream in non_variant_streams ], diff --git a/tests/test_media_info.py b/tests/test_media_info.py new file mode 100644 index 0000000..2539b87 --- /dev/null +++ b/tests/test_media_info.py @@ -0,0 +1,154 @@ +""" +Unit tests for live media_info population on transcoded streams. + +These tests cover the parser that extracts ffmpeg progress fields from stderr, +and verify that media_info propagates through stream_manager.get_stats() so the +m3u-editor UI can display live codec/bitrate/fps badges. 
+""" + +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +import pytest +from unittest.mock import MagicMock + + +def _make_process(): + """Build a minimal SharedTranscodingProcess for parser-level assertions.""" + from pooled_stream_manager import SharedTranscodingProcess + + return SharedTranscodingProcess( + stream_id="test-stream", + url="http://example.com/test.ts", + profile="default", + ffmpeg_args=["-i", "input", "-c", "copy", "-f", "mpegts", "pipe:1"], + ) + + +def test_media_info_starts_empty(): + """Fresh processes should have an empty media_info dict, not None.""" + process = _make_process() + + assert process.media_info == {} + + +def test_parse_ffmpeg_progress_extracts_live_fields(): + """A typical ffmpeg stats line should populate bitrate/fps/frame/speed.""" + process = _make_process() + + line = ( + "frame= 243 fps= 30 q=28.0 size= 1152kB " + "time=00:00:08.13 bitrate=1162.5kbits/s speed=1.01x" + ) + process._parse_ffmpeg_progress(line) + + assert process.media_info["bitrate_kbps"] == 1162.5 + assert process.media_info["fps"] == 30.0 + assert process.media_info["frame"] == 243 + assert process.media_info["speed"] == 1.01 + + +def test_parse_ffmpeg_progress_ignores_non_progress_lines(): + """Header/info lines should not contribute progress values.""" + process = _make_process() + + process._parse_ffmpeg_progress("Input #0, mpegts, from 'http://example.com':") + process._parse_ffmpeg_progress(" Duration: N/A, start: 1.400000, bitrate: N/A") + + # The "bitrate: N/A" form is not in kbits/s and should be skipped. 
+ assert "bitrate_kbps" not in process.media_info + assert "fps" not in process.media_info + + +def test_parse_ffmpeg_progress_updates_overwrite_previous_values(): + """Each new progress line should overwrite the prior live snapshot.""" + process = _make_process() + + process._parse_ffmpeg_progress("frame=10 fps=25 bitrate=1000.0kbits/s speed=1.0x") + process._parse_ffmpeg_progress("frame=20 fps=30 bitrate=2000.5kbits/s speed=1.5x") + + assert process.media_info["frame"] == 20 + assert process.media_info["fps"] == 30.0 + assert process.media_info["bitrate_kbps"] == 2000.5 + assert process.media_info["speed"] == 1.5 + + +def test_get_media_info_returns_empty_for_non_transcoded_streams(): + """ + Plain HTTP-proxy streams (no ffmpeg) must return empty media_info — the + UI relies on this to hide metadata badges when there's nothing live to show. + """ + sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + from stream_manager import StreamInfo, StreamManager + from datetime import datetime, timezone + + manager = StreamManager.__new__(StreamManager) + manager.pooled_manager = None + + stream = StreamInfo( + stream_id="plain-stream", + original_url="http://example.com/plain.ts", + created_at=datetime.now(timezone.utc), + last_access=datetime.now(timezone.utc), + ) + + assert manager._get_media_info(stream) == {} + + +def test_get_media_info_pulls_from_linked_pooled_process(): + """ + Transcoded streams should surface the linked SharedTranscodingProcess's + media_info dict so live ffmpeg data reaches the API response. 
+ """ + from stream_manager import StreamInfo, StreamManager + from datetime import datetime, timezone + + manager = StreamManager.__new__(StreamManager) + pooled = MagicMock() + fake_process = MagicMock() + fake_process.media_info = { + "resolution": "1920x1080", + "video_codec": "h264", + "fps": 30.0, + "bitrate_kbps": 4500.0, + } + pooled.shared_processes = {"key-abc": fake_process} + manager.pooled_manager = pooled + + stream = StreamInfo( + stream_id="t-stream", + original_url="http://example.com/t.ts", + created_at=datetime.now(timezone.utc), + last_access=datetime.now(timezone.utc), + transcode_stream_key="key-abc", + ) + + info = manager._get_media_info(stream) + + assert info["resolution"] == "1920x1080" + assert info["video_codec"] == "h264" + assert info["fps"] == 30.0 + assert info["bitrate_kbps"] == 4500.0 + + +@pytest.mark.asyncio +async def test_probe_input_async_does_not_block_on_missing_ffprobe(monkeypatch): + """ + If ffprobe isn't installed, _probe_input_async must swallow FileNotFoundError + rather than propagate it — the stream must still play even if probing fails. + """ + import asyncio + + process = _make_process() + + async def _raise(*args, **kwargs): + raise FileNotFoundError("ffprobe not on PATH") + + monkeypatch.setattr(asyncio, "create_subprocess_exec", _raise) + + # Should complete cleanly without raising. 
+ await process._probe_input_async() + + assert process.media_info == {} From 87542d755d535c13c13c8ce52616995416f3a2b8 Mon Sep 17 00:00:00 2001 From: warbs816 Date: Sat, 2 May 2026 01:46:15 +0100 Subject: [PATCH 2/4] fix: Drop ffprobe; capture media info from ffmpeg's own stderr instead MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Running ffprobe against the source URL alongside ffmpeg doubled the upstream connection count, which trips per-user concurrent-connection limits at IPTV providers — the provider rejects the second connection (or both, looking like a duplicate session) and ffmpeg dies seconds after starting. That broke failover end-to-end: the old process was force-stopped on failover, the new one would also die almost immediately, and the client connection timed out before recovery. ffmpeg already prints all the codec/container/resolution/audio info we need in its own stderr at startup ("Input #0, FORMAT, from URL:" and "Stream #N:N: Video|Audio: ..." lines). Parse those instead — zero extra connections, and the data reflects what ffmpeg actually negotiated rather than what a separate probe might see. Live progress (bitrate/fps/frame/speed) was already coming from the periodic stats line, that path is unchanged. --- src/pooled_stream_manager.py | 185 +++++++++++++---------------------- tests/test_media_info.py | 101 +++++++++++++++---- 2 files changed, 154 insertions(+), 132 deletions(-) diff --git a/src/pooled_stream_manager.py b/src/pooled_stream_manager.py index c860436..84c5503 100644 --- a/src/pooled_stream_manager.py +++ b/src/pooled_stream_manager.py @@ -21,6 +21,23 @@ r"\b(?Pframe|fps|bitrate|speed|q|size|time)\s*=\s*(?P\S+)" ) +# "Input #0, mpegts, from 'http://...':" or +# "Input #0, mov,mp4,m4a,3gp,3g2,mj2, from 'file.mp4':" — captures the comma- +# separated container synonyms list (split into the canonical name later). 
+_FFMPEG_INPUT_LINE_RE = re.compile( + r"^\s*Input #\d+,\s*(?P<container>[\w,]+),\s*from\s+" +) + +# "Stream #0:0[0x100]: Video: h264 (Main) ([27][0][0][0] / 0x001B), yuv420p..." +# "Stream #0:1[0x101](eng): Audio: aac (LC) ..." +_FFMPEG_STREAM_LINE_RE = re.compile( + r"Stream #\d+:\d+(?:\[[^\]]+\])?(?:\([^)]+\))?:\s+" + r"(?P<type>Video|Audio):\s+(?P<details>.+)$" +) + +# "1280x720" or "1920x1080 [SAR 1:1 DAR 16:9]" inside a Stream line. +_RESOLUTION_RE = re.compile(r"\b(?P<w>\d{2,5})x(?P<h>\d{2,5})\b") + logger = logging.getLogger(__name__) try: @@ -70,11 +87,14 @@ def __init__( self.last_access = time.time() self.total_bytes_served = 0 self.status = "starting" - # Live media info populated from ffprobe (codec/container/resolution/audio) - # and the active ffmpeg process's stderr (bitrate/fps/frame/speed). Empty - # for resolver streams (streamlink/yt-dlp) that don't go through ffmpeg. + # Live media info parsed from the active ffmpeg process's own stderr — + # codec/container/resolution/audio from the "Input #" and "Stream #" + # lines printed at startup, plus live bitrate/fps from the periodic + # progress line. We do NOT run a separate ffprobe against the source + # because that would double the upstream connection count and IPTV + # providers reject the second connection. Empty for resolver streams + # (streamlink/yt-dlp) that don't go through ffmpeg. self.media_info: Dict[str, Any] = {} - self._probe_task: Optional[asyncio.Task] = None # Broadcasting support - each client gets its own queue self.client_queues: Dict[str, asyncio.Queue] = {} @@ -359,14 +379,10 @@ async def start_process(self): self.status = "running" logger.info(f"Shared FFmpeg process started with PID: {self.process.pid}") - # Start stderr logging task + # Start stderr logging task — also parses codec/resolution/bitrate + # from ffmpeg's own output (no extra upstream connections). asyncio.create_task(self._log_stderr()) - # Probe the input async to populate codec/container/resolution. - # Fire-and-forget — never await this; stream startup must not block - # on ffprobe (it can take a few seconds against slow upstreams). 
- self._probe_task = asyncio.create_task(self._probe_input_async()) - # Start broadcaster task to read from FFmpeg and send to all clients if self.mode == "stdout": self._broadcaster_task = asyncio.create_task(self._broadcast_loop()) @@ -469,114 +485,51 @@ async def _hls_watch_loop(self): except Exception as e: logger.debug(f"HLS watch loop ended for {self.stream_id}: {e}") - async def _probe_input_async(self): + def _parse_ffmpeg_input_line(self, line_str: str) -> None: """ - Run ffprobe against the source URL on a background task and populate - media_info with codec/container/resolution/audio info. Fire-and-forget; - callers must not await this so stream startup is never delayed. + Parse ffmpeg's "Input #0, FORMAT, from 'URL':" line for the container + format. ffmpeg prints this once per input when it opens the stream. """ - cmd = [ - "ffprobe", - "-v", - "quiet", - "-print_format", - "json", - "-show_streams", - "-show_format", - ] - if ( - self.user_agent - and isinstance(self.url, str) - and ("://" in self.url and not self.url.startswith("file://")) - ): - cmd.extend(["-user_agent", self.user_agent]) - if ( - self.headers - and isinstance(self.url, str) - and ("://" in self.url and not self.url.startswith("file://")) - ): - header_str = "".join([f"{k}: {v}\r\n" for k, v in self.headers.items()]) - cmd.extend(["-headers", header_str]) - cmd.append(self.url) + match = _FFMPEG_INPUT_LINE_RE.match(line_str) + if not match: + return + container = match.group("container").strip().split(",")[0].strip() + if container: + self.media_info["container"] = container.upper() - try: - proc = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - try: - stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=15) - except asyncio.TimeoutError: - logger.debug( - f"ffprobe timed out for stream {self.stream_id}; skipping media_info population" + def _parse_ffmpeg_stream_line(self, line_str: str) -> None: + 
""" + Parse ffmpeg's "Stream #0:N[...]: Video|Audio: ..." lines for codec + details. ffmpeg prints one per stream right after the input is opened + — this is free metadata that doesn't require a second connection. + """ + match = _FFMPEG_STREAM_LINE_RE.search(line_str) + if not match: + return + stream_type = match.group("type").lower() + details = match.group("details") + if stream_type == "video": + # Codec name is the first token, may be followed by a parenthesised + # profile (e.g. "h264 (Main) (HEVC / 0x...)" — strip parens). + codec = details.split(",", 1)[0].strip() + codec = re.sub(r"\s*\(.*", "", codec).strip() + if codec and "video_codec" not in self.media_info: + self.media_info["video_codec"] = codec + res_match = _RESOLUTION_RE.search(details) + if res_match and "resolution" not in self.media_info: + self.media_info["resolution"] = ( + f"{res_match.group('w')}x{res_match.group('h')}" ) - try: - proc.kill() - except Exception: - pass - return - - if proc.returncode != 0 or not stdout: - return - - try: - data = json.loads(stdout.decode("utf-8", errors="ignore")) - except json.JSONDecodeError: - return - - video = next( - (s for s in data.get("streams", []) if s.get("codec_type") == "video"), - None, - ) - audio = next( - (s for s in data.get("streams", []) if s.get("codec_type") == "audio"), - None, - ) - fmt = data.get("format", {}) or {} - - info: Dict[str, Any] = {} - if video: - width = video.get("width") - height = video.get("height") - if width and height: - info["resolution"] = f"{width}x{height}" - if video.get("codec_name"): - info["video_codec"] = video["codec_name"] - # Static fps from input — gets overwritten by live -progress data later - rate = video.get("avg_frame_rate") or video.get("r_frame_rate") - if rate and "/" in rate: - try: - num, den = rate.split("/", 1) - if float(den) > 0: - info["fps"] = round(float(num) / float(den), 2) - except ValueError: - pass - if audio: - if audio.get("codec_name"): - info["audio_codec"] = 
audio["codec_name"] - channels = audio.get("channels") - if channels: - info["audio_channels"] = { - 1: "mono", - 2: "stereo", - 6: "5.1", - 8: "7.1", - }.get(int(channels), str(channels)) - container = fmt.get("format_name") - if container: - # ffprobe returns comma-separated synonyms (e.g. "mov,mp4,m4a") - info["container"] = container.split(",")[0].upper() - - # Merge — never clobber live fields already populated by ffmpeg progress - for key, value in info.items(): - self.media_info.setdefault(key, value) - except FileNotFoundError: - logger.warning("ffprobe binary not found; cannot populate media_info") - except Exception as e: - logger.debug( - f"Error probing input for stream {self.stream_id}: {e}" - ) + elif stream_type == "audio": + codec = details.split(",", 1)[0].strip() + codec = re.sub(r"\s*\(.*", "", codec).strip() + if codec and "audio_codec" not in self.media_info: + self.media_info["audio_codec"] = codec + for layout in ("stereo", "mono", "5.1", "7.1", "quad"): + if f", {layout}," in details or f", {layout} " in details: + if "audio_channels" not in self.media_info: + self.media_info["audio_channels"] = layout + break def _parse_ffmpeg_progress(self, line_str: str) -> None: """ @@ -682,8 +635,10 @@ async def _log_stderr(self): if not line_str: continue - # Log FFmpeg output and capture live progress fields + # Log FFmpeg output and capture media info / live progress logger.debug(f"FFmpeg [{self.stream_id}]: {line_str}") + self._parse_ffmpeg_input_line(line_str) + self._parse_ffmpeg_stream_line(line_str) self._parse_ffmpeg_progress(line_str) line_lower = line_str.lower() diff --git a/tests/test_media_info.py b/tests/test_media_info.py index 2539b87..b079cad 100644 --- a/tests/test_media_info.py +++ b/tests/test_media_info.py @@ -1,9 +1,12 @@ """ Unit tests for live media_info population on transcoded streams. 
-These tests cover the parser that extracts ffmpeg progress fields from stderr, -and verify that media_info propagates through stream_manager.get_stats() so the -m3u-editor UI can display live codec/bitrate/fps badges. +These tests cover the parsers that extract codec/container/resolution/audio +info and live progress (bitrate/fps/frame/speed) from ffmpeg's own stderr +output, and verify media_info propagates through stream_manager.get_stats() +so the m3u-editor UI can display live badges. We deliberately do NOT run a +separate ffprobe against the source URL — that doubles the upstream connection +count and trips per-user limits at IPTV providers. """ import sys @@ -11,8 +14,7 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) -import pytest -from unittest.mock import MagicMock +from unittest.mock import MagicMock # noqa: E402 def _make_process(): @@ -133,22 +135,87 @@ def test_get_media_info_pulls_from_linked_pooled_process(): assert info["bitrate_kbps"] == 4500.0 -@pytest.mark.asyncio -async def test_probe_input_async_does_not_block_on_missing_ffprobe(monkeypatch): +def test_parse_ffmpeg_input_line_extracts_container(): + """The 'Input #0, FORMAT, from URL:' line should populate container.""" + process = _make_process() + + process._parse_ffmpeg_input_line( + "Input #0, mpegts, from 'http://example.com/stream.ts':" + ) + + assert process.media_info["container"] == "MPEGTS" + + +def test_parse_ffmpeg_input_line_takes_first_synonym(): + """When ffmpeg lists multiple format synonyms, take the first one.""" + process = _make_process() + + process._parse_ffmpeg_input_line( + "Input #0, mov,mp4,m4a,3gp,3g2,mj2, from 'file.mp4':" + ) + + assert process.media_info["container"] == "MOV" + + +def test_parse_ffmpeg_stream_line_extracts_video_codec_and_resolution(): + """Video stream lines populate video_codec and resolution.""" + process = _make_process() + + process._parse_ffmpeg_stream_line( + " Stream #0:0[0x100]: Video: h264 (Main) 
([27][0][0][0] / 0x001B), " + "yuv420p(progressive), 1280x720 [SAR 1:1 DAR 16:9], 50 fps, 50 tbr, 90k tbn" + ) + + assert process.media_info["video_codec"] == "h264" + assert process.media_info["resolution"] == "1280x720" + + +def test_parse_ffmpeg_stream_line_extracts_audio_codec_and_channels(): + """Audio stream lines populate audio_codec and audio_channels.""" + process = _make_process() + + process._parse_ffmpeg_stream_line( + " Stream #0:1[0x101](eng): Audio: aac (LC) ([15][0][0][0] / 0x000F), " + "48000 Hz, stereo, fltp, 192 kb/s" + ) + + assert process.media_info["audio_codec"] == "aac" + assert process.media_info["audio_channels"] == "stereo" + + +def test_parse_ffmpeg_stream_line_handles_5_1_channel_layout(): + """5.1 surround layout should map correctly.""" + process = _make_process() + + process._parse_ffmpeg_stream_line( + " Stream #0:1: Audio: ac3, 48000 Hz, 5.1, fltp, 384 kb/s" + ) + + assert process.media_info["audio_channels"] == "5.1" + + +def test_parse_ffmpeg_stream_line_does_not_clobber_existing_codec(): """ - If ffprobe isn't installed, _probe_input_async must swallow FileNotFoundError - rather than propagate it — the stream must still play even if probing fails. + The first Video stream wins so we don't overwrite with secondary streams + (e.g. embedded thumbnails). Live progress fields (fps/bitrate) are still + free to update because they're handled by _parse_ffmpeg_progress. """ - import asyncio - process = _make_process() + process.media_info["video_codec"] = "h264" - async def _raise(*args, **kwargs): - raise FileNotFoundError("ffprobe not on PATH") + process._parse_ffmpeg_stream_line( + " Stream #0:2: Video: png, rgba, 256x256" + ) - monkeypatch.setattr(asyncio, "create_subprocess_exec", _raise) + assert process.media_info["video_codec"] == "h264" - # Should complete cleanly without raising. 
- await process._probe_input_async() - assert process.media_info == {} +def test_parse_ffmpeg_input_line_ignores_unrelated_lines(): + """Non-Input lines should be no-ops.""" + process = _make_process() + + process._parse_ffmpeg_input_line( + "frame= 243 fps= 30 bitrate=1162.5kbits/s speed=1.01x" + ) + + assert "container" not in process.media_info From c84f3b9fa9c165059d8913ad6d55f3b38d20b687 Mon Sep 17 00:00:00 2001 From: warbs816 Date: Sat, 2 May 2026 02:11:06 +0100 Subject: [PATCH 3/4] fix: Stop failover racing with the new transcoding process it just started MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two concurrent issues caused the freshly-started failover ffmpeg to die two seconds after it started, leaving the stream alive in name only (is_active true, no data flowing, media_info empty). 1. _try_update_failover_url awaited force_stop_stream before clearing stream_info.transcode_stream_key. While that await was in flight the client's streaming generator woke from failover_event, read the still-set key, and called get_or_create_shared_stream(reuse_stream_key=OLD_KEY, url=NEW_URL) — which detected url_changed, cleaned up the old process, and stood up a new SharedTranscodingProcess at the same key with the new URL. Clear the key first so concurrent readers see None and generate a fresh key from the new URL. 2. force_stop_stream took a reference to the old process, awaited per-client removal, then called _cleanup_local_process(stream_key) at the end. By the time that ran the dict could already hold the brand-new process the reconnecting client just installed at the same key, and we'd tear it down instead. Pop and capture up front; clean up the captured reference. 
--- src/pooled_stream_manager.py | 31 ++++++++++++++++++---- src/stream_manager.py | 27 ++++++++++++------- tests/test_media_info.py | 51 ++++++++++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 15 deletions(-) diff --git a/src/pooled_stream_manager.py b/src/pooled_stream_manager.py index 84c5503..7b38d89 100644 --- a/src/pooled_stream_manager.py +++ b/src/pooled_stream_manager.py @@ -1422,16 +1422,21 @@ async def remove_client_from_stream(self, client_id: str): async def force_stop_stream(self, stream_key: str): """ Immediately stop a stream and its FFmpeg process without grace period. - Used when explicitly deleting a stream via API. + Used when explicitly deleting a stream via API and during failover. + + Pops the entry up front and cleans up the captured reference so a + concurrent get_or_create_shared_stream that re-inserts at the same key + (e.g. a failover client racing this teardown) isn't torn down by the + trailing cleanup. """ - if stream_key not in self.shared_processes: + process = self.shared_processes.pop(stream_key, None) + if process is None: logger.info(f"Stream {stream_key} not found in local processes") return False logger.info( f"Force stopping stream {stream_key} and terminating FFmpeg process" ) - process = self.shared_processes[stream_key] # Remove all clients from this stream immediately clients_to_remove = list(process.clients.keys()) @@ -1440,8 +1445,24 @@ async def force_stop_stream(self, stream_key: str): if client_id in self.client_streams: del self.client_streams[client_id] - # Immediately cleanup the FFmpeg process - await self._cleanup_local_process(stream_key) + # Tear down the captured process — NOT whatever is at stream_key now. + await process.cleanup() + + # Mirror the auxiliary cleanup that _cleanup_local_process does so the + # state stays consistent regardless of which path called us. 
+ if stream_key in self.stream_key_to_id: + del self.stream_key_to_id[stream_key] + if self.redis_client: + try: + redis_key = f"stream:{stream_key}" + await self.redis_client.delete(redis_key) + await self.redis_client.srem( + f"worker:{self.worker_id}:streams", redis_key + ) + except Exception as e: + logger.warning( + f"Error cleaning up Redis state for stream {stream_key}: {e}" + ) logger.info(f"Stream {stream_key} force stopped, FFmpeg process terminated") return True diff --git a/src/stream_manager.py b/src/stream_manager.py index cbd827b..da579a6 100644 --- a/src/stream_manager.py +++ b/src/stream_manager.py @@ -4315,20 +4315,27 @@ async def _try_update_failover_url( stream_info.failover_event.clear() stream_info.failover_event.set() - # For transcoded streams, stop and restart the transcoding process + # For transcoded streams, stop and restart the transcoding process. + # Clear the stream key BEFORE awaiting force_stop_stream — otherwise a + # client coroutine woken by failover_event can read the still-set + # transcode_stream_key and pass it as reuse_stream_key, which creates a + # new SharedTranscodingProcess at the same key. The tail-end + # _cleanup_local_process inside force_stop_stream then pops whatever is + # at that key (the brand-new process) and tears down its ffmpeg, killing + # the failover stream a couple of seconds after it started. 
if stream_info.is_transcoded and self.pooled_manager: - try: - # Stop the old transcoding process - if stream_info.transcode_stream_key: + old_stream_key = stream_info.transcode_stream_key + stream_info.transcode_stream_key = None + if old_stream_key: + try: logger.info( - f"Stopping transcoding process for failover: {stream_info.transcode_stream_key}" + f"Stopping transcoding process for failover: {old_stream_key}" ) - await self.pooled_manager.force_stop_stream( - stream_info.transcode_stream_key + await self.pooled_manager.force_stop_stream(old_stream_key) + except Exception as e: + logger.error( + f"Error stopping transcoding process during failover: {e}" ) - stream_info.transcode_stream_key = None - except Exception as e: - logger.error(f"Error stopping transcoding process during failover: {e}") # Emit failover event await self._emit_event( diff --git a/tests/test_media_info.py b/tests/test_media_info.py index b079cad..33de52a 100644 --- a/tests/test_media_info.py +++ b/tests/test_media_info.py @@ -219,3 +219,54 @@ def test_parse_ffmpeg_input_line_ignores_unrelated_lines(): ) assert "container" not in process.media_info + + +def test_force_stop_stream_does_not_clobber_concurrent_reinsertion(): + """ + Regression for failover race: force_stop_stream must tear down the process + it was called for, even if a concurrent get_or_create_shared_stream + re-inserts a brand-new SharedTranscodingProcess at the same stream_key + while the old one is being awaited. Otherwise the freshly-started failover + transcoder gets killed seconds after it starts and the stream goes dark. + """ + import asyncio + from pooled_stream_manager import PooledStreamManager + + async def _run(): + # Build a manager without the redis/event-loop setup; we only exercise + # the in-memory force_stop_stream path. 
+ manager = PooledStreamManager.__new__(PooledStreamManager) + manager.shared_processes = {} + manager.client_streams = {} + manager.stream_key_to_id = {} + manager.redis_client = None + manager.worker_id = "test-worker" + + # The "old" process — replace its async methods with stubs so we don't + # actually need a running ffmpeg. + old = _make_process() + old.process = None # cleanup() short-circuits when there's no process + old.clients = {"client-a": 0.0} + + async def remove_client(client_id): + # During this await, another coroutine swaps in a new process at + # the same stream_key — simulating the client reconnect that + # caused the original race. + new = _make_process() + new.media_info["video_codec"] = "h264" + manager.shared_processes["key-X"] = new + old.clients.pop(client_id, None) + + old.remove_client = remove_client + manager.shared_processes["key-X"] = old + + await manager.force_stop_stream("key-X") + + # The new process inserted mid-flight must still be present — only the + # captured 'old' reference should have been torn down. + survivor = manager.shared_processes.get("key-X") + assert survivor is not None, "force_stop_stream destroyed the new process" + assert survivor is not old + assert survivor.media_info.get("video_codec") == "h264" + + asyncio.run(_run()) From b7c988a6b299b8f5479245991921864b43bddf2c Mon Sep 17 00:00:00 2001 From: Shaun Parkison Date: Sat, 2 May 2026 08:20:59 -0600 Subject: [PATCH 4/4] chore: Optimize audio regex for stream metadata scrape --- src/pooled_stream_manager.py | 32 +++++++++++++++++++++----------- src/stream_manager.py | 4 +--- tests/test_media_info.py | 33 +++++++++++++++++++++++++++++---- 3 files changed, 51 insertions(+), 18 deletions(-) diff --git a/src/pooled_stream_manager.py b/src/pooled_stream_manager.py index 7b38d89..f479c60 100644 --- a/src/pooled_stream_manager.py +++ b/src/pooled_stream_manager.py @@ -29,12 +29,19 @@ ) # "Stream #0:0[0x100]: Video: h264 (Main) ([27][0][0][0] / 0x001B), yuv420p..." 
+# "Stream #0:0[0x100][0x200]: Video: hevc ..." (dual PID bracket groups in MPEG-TS) # "Stream #0:1[0x101](eng): Audio: aac (LC) ..." _FFMPEG_STREAM_LINE_RE = re.compile( - r"Stream #\d+:\d+(?:\[[^\]]+\])?(?:\([^)]+\))?:\s+" + r"Stream #\d+:\d+(?:\[[^\]]+\])*(?:\([^)]+\))?:\s+" r"(?PVideo|Audio):\s+(?P<details>.+)$" ) +# Audio channel layout: "stereo", "mono", "5.1", "5.1(side)", "5.1(back)", "7.1", "quad", etc. +# The (?:\([^)]*\))? tolerates the variant suffixes ffmpeg appends to 5.1/7.1. +_AUDIO_LAYOUT_RE = re.compile( + r",\s*(?P<layout>stereo|mono|5\.1|7\.1|quad)(?:\([^)]*\))?[,\s]" +) + # "1280x720" or "1920x1080 [SAR 1:1 DAR 16:9]" inside a Stream line. _RESOLUTION_RE = re.compile(r"\b(?P\d{2,5})x(?P\d{2,5})\b") @@ -494,7 +501,7 @@ def _parse_ffmpeg_input_line(self, line_str: str) -> None: if not match: return container = match.group("container").strip().split(",")[0].strip() - if container: + if container and "container" not in self.media_info: self.media_info["container"] = container.upper() def _parse_ffmpeg_stream_line(self, line_str: str) -> None: @@ -525,11 +532,9 @@ def _parse_ffmpeg_stream_line(self, line_str: str) -> None: codec = re.sub(r"\s*\(.*", "", codec).strip() if codec and "audio_codec" not in self.media_info: self.media_info["audio_codec"] = codec - for layout in ("stereo", "mono", "5.1", "7.1", "quad"): - if f", {layout}," in details or f", {layout} " in details: - if "audio_channels" not in self.media_info: - self.media_info["audio_channels"] = layout - break + layout_match = _AUDIO_LAYOUT_RE.search(details) + if layout_match and "audio_channels" not in self.media_info: + self.media_info["audio_channels"] = layout_match.group("layout") def _parse_ffmpeg_progress(self, line_str: str) -> None: """ @@ -635,11 +640,16 @@ async def _log_stderr(self): if not line_str: continue - # Log FFmpeg output and capture media info / live progress + # Log FFmpeg output and capture media info / live progress. + # Skip parsers for resolver (yt-dlp/streamlink) stderr — their + # output format is not ffmpeg's, so the regexes won't match but + # a coincidental "fps=" or "bitrate=" in debug output could + # populate stale values.
logger.debug(f"FFmpeg [{self.stream_id}]: {line_str}") - self._parse_ffmpeg_input_line(line_str) - self._parse_ffmpeg_stream_line(line_str) - self._parse_ffmpeg_progress(line_str) + if not self.resolver_type: + self._parse_ffmpeg_input_line(line_str) + self._parse_ffmpeg_stream_line(line_str) + self._parse_ffmpeg_progress(line_str) line_lower = line_str.lower() diff --git a/src/stream_manager.py b/src/stream_manager.py index da579a6..cb68a1f 100644 --- a/src/stream_manager.py +++ b/src/stream_manager.py @@ -4546,9 +4546,7 @@ def _get_media_info(self, stream: "StreamInfo") -> Dict[str, Any]: """ if not stream.transcode_stream_key or not self.pooled_manager: return {} - process = self.pooled_manager.shared_processes.get( - stream.transcode_stream_key - ) + process = self.pooled_manager.shared_processes.get(stream.transcode_stream_key) if not process: return {} return dict(getattr(process, "media_info", {}) or {}) diff --git a/tests/test_media_info.py b/tests/test_media_info.py index 33de52a..907c36e 100644 --- a/tests/test_media_info.py +++ b/tests/test_media_info.py @@ -82,7 +82,6 @@ def test_get_media_info_returns_empty_for_non_transcoded_streams(): Plain HTTP-proxy streams (no ffmpeg) must return empty media_info — the UI relies on this to hide metadata badges when there's nothing live to show. """ - sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) from stream_manager import StreamInfo, StreamManager from datetime import datetime, timezone @@ -194,6 +193,34 @@ def test_parse_ffmpeg_stream_line_handles_5_1_channel_layout(): assert process.media_info["audio_channels"] == "5.1" +def test_parse_ffmpeg_stream_line_handles_dual_pid_brackets(): + """ + MPEG-TS streams commonly emit two consecutive PID bracket groups + (e.g. [0x100][0x200]). The regex must match both so video_codec and + resolution are populated — regression guard for the ? → * fix. 
+ """ + process = _make_process() + + process._parse_ffmpeg_stream_line( + " Stream #0:0[0x100][0x200]: Video: hevc (Main), " + "yuv420p(tv, bt709), 1920x1080, 50 fps, 50 tbr, 90k tbn" + ) + + assert process.media_info["video_codec"] == "hevc" + assert process.media_info["resolution"] == "1920x1080" + + +def test_parse_ffmpeg_stream_line_handles_5_1_side_channel_layout(): + """5.1(side) channel layout variant should map to '5.1'.""" + process = _make_process() + + process._parse_ffmpeg_stream_line( + " Stream #0:1: Audio: eac3, 48000 Hz, 5.1(side), fltp, 384 kb/s" + ) + + assert process.media_info["audio_channels"] == "5.1" + + def test_parse_ffmpeg_stream_line_does_not_clobber_existing_codec(): """ The first Video stream wins so we don't overwrite with secondary streams @@ -203,9 +230,7 @@ def test_parse_ffmpeg_stream_line_does_not_clobber_existing_codec(): process = _make_process() process.media_info["video_codec"] = "h264" - process._parse_ffmpeg_stream_line( - " Stream #0:2: Video: png, rgba, 256x256" - ) + process._parse_ffmpeg_stream_line(" Stream #0:2: Video: png, rgba, 256x256") assert process.media_info["video_codec"] == "h264"