From 6fc2f1bc2ae77ecec2b01efdb4ef5c2b9badfa55 Mon Sep 17 00:00:00 2001 From: warbs816 Date: Sun, 3 May 2026 17:20:22 +0100 Subject: [PATCH 1/2] feat: Expose output_media_info for transcoded stream encoder side MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pairs with the m3u-editor side: surfaces what ffmpeg is *producing* (encoder codec, target resolution, muxer container) alongside the existing media_info which describes the source ffmpeg is reading. The Stream Monitor UI's existing "Stream Info" badge row reflects the input; the editor adds an "Output Info" row to render this new payload separately. The parser now tracks state — once ffmpeg's "Output #" line is observed, subsequent "Stream #N: Video|Audio:" lines populate output_media_info instead of media_info. Live progress fields (bitrate/fps) describe encoder output and are dual-written so existing Stream Info readers keep working without coupling to a coordinated client release. --- src/pooled_stream_manager.py | 85 ++++++++++++---- src/stream_manager.py | 16 +++ tests/test_media_info.py | 187 +++++++++++++++++++++++++++++++++++ 3 files changed, 271 insertions(+), 17 deletions(-) diff --git a/src/pooled_stream_manager.py b/src/pooled_stream_manager.py index a76d3bb..c701fce 100644 --- a/src/pooled_stream_manager.py +++ b/src/pooled_stream_manager.py @@ -28,6 +28,13 @@ r"^\s*Input #\d+,\s*(?P<container>[\w,]+),\s*from\s+" ) +# "Output #0, mpegts, to 'pipe:1':" or "Output #0, hls, to '/path/index.m3u8':". +# Once this line is emitted by ffmpeg, every subsequent Stream # line refers to +# the encoder/muxer output rather than the source input. +_FFMPEG_OUTPUT_LINE_RE = re.compile( + r"^\s*Output #\d+,\s*(?P<container>[\w,]+),\s*to\s+" +) + # "Stream #0:0[0x100]: Video: h264 (Main) ([27][0][0][0] / 0x001B), yuv420p..." # "Stream #0:0[0x100][0x200]: Video: hevc ..." (dual PID bracket groups in MPEG-TS) # "Stream #0:1[0x101](eng): Audio: aac (LC) ..."
@@ -102,6 +109,15 @@ def __init__( # providers reject the second connection. Empty for resolver streams # (streamlink/yt-dlp) that don't go through ffmpeg. self.media_info: Dict[str, Any] = {} + # Output-side counterpart describing what ffmpeg is producing right now + # (encoder codec / target resolution / muxer container) plus the live + # progress fields, which are technically output stats. Populated only + # after ffmpeg prints its "Output #" line, so it stays empty for plain + # passthrough where no ffmpeg process is involved. + self.output_media_info: Dict[str, Any] = {} + # Parser state: flips True once "Output #" is seen in stderr so the + # Stream-line parser knows where to route subsequent codec lines. + self._parsing_output: bool = False # Broadcasting support - each client gets its own queue self.client_queues: Dict[str, asyncio.Queue] = {} @@ -504,15 +520,34 @@ def _parse_ffmpeg_input_line(self, line_str: str) -> None: if container and "container" not in self.media_info: self.media_info["container"] = container.upper() + def _parse_ffmpeg_output_line(self, line_str: str) -> None: + """ + Parse ffmpeg's "Output #0, FORMAT, to 'TARGET':" line. Captures the + muxer container into output_media_info and flips the parser state so + that subsequent Stream # lines are recognised as output streams. Only + the first Output block is honoured — multi-output configs (e.g. tee) + would each restate the marker but we keep the first match. + """ + match = _FFMPEG_OUTPUT_LINE_RE.match(line_str) + if not match: + return + self._parsing_output = True + container = match.group("container").strip().split(",")[0].strip() + if container and "container" not in self.output_media_info: + self.output_media_info["container"] = container.upper() + def _parse_ffmpeg_stream_line(self, line_str: str) -> None: """ Parse ffmpeg's "Stream #0:N[...]: Video|Audio: ..." lines for codec details. 
ffmpeg prints one per stream right after the input is opened — this is free metadata that doesn't require a second connection. + Routes into media_info or output_media_info based on whether the + "Output #" marker has been seen yet. """ match = _FFMPEG_STREAM_LINE_RE.search(line_str) if not match: return + target = self.output_media_info if self._parsing_output else self.media_info stream_type = match.group("type").lower() details = match.group("details") if stream_type == "video": @@ -520,55 +555,70 @@ def _parse_ffmpeg_stream_line(self, line_str: str) -> None: # profile (e.g. "h264 (Main) (HEVC / 0x...)" — strip parens). codec = details.split(",", 1)[0].strip() codec = re.sub(r"\s*\(.*", "", codec).strip() - if codec and "video_codec" not in self.media_info: - self.media_info["video_codec"] = codec + if codec and "video_codec" not in target: + target["video_codec"] = codec res_match = _RESOLUTION_RE.search(details) - if res_match and "resolution" not in self.media_info: - self.media_info["resolution"] = ( + if res_match and "resolution" not in target: + target["resolution"] = ( f"{res_match.group('w')}x{res_match.group('h')}" ) elif stream_type == "audio": codec = details.split(",", 1)[0].strip() codec = re.sub(r"\s*\(.*", "", codec).strip() - if codec and "audio_codec" not in self.media_info: - self.media_info["audio_codec"] = codec + if codec and "audio_codec" not in target: + target["audio_codec"] = codec layout_match = _AUDIO_LAYOUT_RE.search(details) - if layout_match and "audio_channels" not in self.media_info: - self.media_info["audio_channels"] = layout_match.group("layout") + if layout_match and "audio_channels" not in target: + target["audio_channels"] = layout_match.group("layout") def _parse_ffmpeg_progress(self, line_str: str) -> None: """ Extract live progress fields (bitrate, fps, frame, speed) from a single - ffmpeg stderr line and update media_info. No-op for non-progress lines. + ffmpeg stderr line. 
Progress numbers describe the encoder output, so + once the "Output #" marker has been seen we mirror them into + output_media_info. We also keep writing them to media_info so existing + callers (Stream Info badge row in m3u-editor PR #1089) don't regress. + No-op for non-progress lines. """ # Cheap pre-filter — most stderr lines aren't progress if "bitrate=" not in line_str and "fps=" not in line_str: return + targets = [self.media_info] + if self._parsing_output: + targets.append(self.output_media_info) for match in _FFMPEG_PROGRESS_FIELD_RE.finditer(line_str): key = match.group("key") raw = match.group("val") if key == "bitrate": if raw.endswith("kbits/s"): try: - self.media_info["bitrate_kbps"] = round(float(raw[:-7]), 1) + value = round(float(raw[:-7]), 1) except ValueError: - pass + continue + for t in targets: + t["bitrate_kbps"] = value elif key == "fps": try: - self.media_info["fps"] = round(float(raw), 2) + value = round(float(raw), 2) except ValueError: - pass + continue + for t in targets: + t["fps"] = value elif key == "frame": try: - self.media_info["frame"] = int(raw) + value = int(raw) except ValueError: - pass + continue + for t in targets: + t["frame"] = value elif key == "speed": if raw.endswith("x"): try: - self.media_info["speed"] = round(float(raw[:-1]), 2) + value = round(float(raw[:-1]), 2) except ValueError: - pass + continue + for t in targets: + t["speed"] = value async def _log_stderr(self): """Log FFmpeg stderr output and monitor for write errors and input failures""" @@ -650,6 +700,7 @@ async def _log_stderr(self): # populate stale values. 
if not self.resolver_type: self._parse_ffmpeg_input_line(line_str) + self._parse_ffmpeg_output_line(line_str) self._parse_ffmpeg_stream_line(line_str) self._parse_ffmpeg_progress(line_str) diff --git a/src/stream_manager.py b/src/stream_manager.py index cb68a1f..94fe93b 100644 --- a/src/stream_manager.py +++ b/src/stream_manager.py @@ -4551,6 +4551,21 @@ def _get_media_info(self, stream: "StreamInfo") -> Dict[str, Any]: return {} return dict(getattr(process, "media_info", {}) or {}) + def _get_output_media_info(self, stream: "StreamInfo") -> Dict[str, Any]: + """ + Look up live encoder/muxer output info for a transcoded stream — the + codec/container/resolution ffmpeg has been told to produce, plus the + live progress fields that describe what's being written downstream. + Empty for non-transcoded streams (no ffmpeg process) or before ffmpeg + emits its "Output #" line. + """ + if not stream.transcode_stream_key or not self.pooled_manager: + return {} + process = self.pooled_manager.shared_processes.get(stream.transcode_stream_key) + if not process: + return {} + return dict(getattr(process, "output_media_info", {}) or {}) + def get_stats(self) -> Dict: """Get comprehensive stats - aggregates variant stream stats into parent streams""" # Only count non-variant streams @@ -4683,6 +4698,7 @@ def get_stats(self) -> Dict: "metadata": stream.metadata, "headers": stream.headers, "media_info": self._get_media_info(stream), + "output_media_info": self._get_output_media_info(stream), } for stream in non_variant_streams ], diff --git a/tests/test_media_info.py b/tests/test_media_info.py index 907c36e..b6ad47f 100644 --- a/tests/test_media_info.py +++ b/tests/test_media_info.py @@ -246,6 +246,193 @@ def test_parse_ffmpeg_input_line_ignores_unrelated_lines(): assert "container" not in process.media_info +def test_output_media_info_starts_empty_with_parser_state_unset(): + """Fresh processes track output info separately and start in input mode.""" + process = _make_process() + 
+ assert process.output_media_info == {} + assert process._parsing_output is False + + +def test_parse_ffmpeg_output_line_captures_container_and_flips_state(): + """The 'Output #0, FORMAT, to TARGET:' line should populate output_media_info + and switch the parser into output mode for subsequent Stream lines.""" + process = _make_process() + + process._parse_ffmpeg_output_line("Output #0, mpegts, to 'pipe:1':") + + assert process.output_media_info["container"] == "MPEGTS" + assert process._parsing_output is True + # Input-side dict must not be polluted with the output container. + assert "container" not in process.media_info + + +def test_parse_ffmpeg_output_line_takes_first_synonym(): + """Multi-synonym output containers collapse to the first canonical name.""" + process = _make_process() + + process._parse_ffmpeg_output_line( + "Output #0, mov,mp4,m4a,3gp,3g2,mj2, to '/tmp/out.mp4':" + ) + + assert process.output_media_info["container"] == "MOV" + + +def test_parse_ffmpeg_output_line_handles_hls_muxer(): + """HLS output ('Output #0, hls, to ...') is the common transcoded path.""" + process = _make_process() + + process._parse_ffmpeg_output_line("Output #0, hls, to '/tmp/abc/index.m3u8':") + + assert process.output_media_info["container"] == "HLS" + + +def test_stream_lines_before_output_marker_go_to_input_media_info(): + """While the parser hasn't seen Output #0, codec lines describe the source.""" + process = _make_process() + + process._parse_ffmpeg_stream_line( + " Stream #0:0[0x100]: Video: hevc (Main), yuv420p, 1920x1080, 50 fps" + ) + process._parse_ffmpeg_stream_line( + " Stream #0:1[0x101](eng): Audio: ac3, 48000 Hz, 5.1, fltp, 384 kb/s" + ) + + assert process.media_info["video_codec"] == "hevc" + assert process.media_info["resolution"] == "1920x1080" + assert process.media_info["audio_codec"] == "ac3" + assert process.media_info["audio_channels"] == "5.1" + assert process.output_media_info == {} + + +def 
test_stream_lines_after_output_marker_go_to_output_media_info(): + """Once Output # is seen, the encoder Stream lines populate output_media_info, + leaving the input description intact.""" + process = _make_process() + + process._parse_ffmpeg_stream_line( + " Stream #0:0[0x100]: Video: hevc, yuv420p, 1920x1080, 50 fps" + ) + process._parse_ffmpeg_stream_line( + " Stream #0:1[0x101]: Audio: ac3, 48000 Hz, 5.1, fltp, 384 kb/s" + ) + process._parse_ffmpeg_output_line("Output #0, mpegts, to 'pipe:1':") + process._parse_ffmpeg_stream_line( + " Stream #0:0: Video: h264, yuv420p, 1280x720, q=2-31, 25 fps" + ) + process._parse_ffmpeg_stream_line( + " Stream #0:1: Audio: aac (LC), 48000 Hz, stereo, fltp, 128 kb/s" + ) + + # Source side untouched + assert process.media_info["video_codec"] == "hevc" + assert process.media_info["resolution"] == "1920x1080" + assert process.media_info["audio_codec"] == "ac3" + assert process.media_info["audio_channels"] == "5.1" + # Encoder side captured separately + assert process.output_media_info["video_codec"] == "h264" + assert process.output_media_info["resolution"] == "1280x720" + assert process.output_media_info["audio_codec"] == "aac" + assert process.output_media_info["audio_channels"] == "stereo" + assert process.output_media_info["container"] == "MPEGTS" + + +def test_progress_fields_dual_write_once_output_seen(): + """Progress numbers describe encoder output. 
Before Output #0 they only + populate media_info (so the existing UI keeps getting bitrate/fps even on + very early reads); after Output #0 they mirror into output_media_info.""" + process = _make_process() + + process._parse_ffmpeg_progress( + "frame=10 fps=25 bitrate=1000.0kbits/s speed=1.0x" + ) + assert process.media_info["bitrate_kbps"] == 1000.0 + assert process.media_info["fps"] == 25.0 + assert "bitrate_kbps" not in process.output_media_info + + process._parse_ffmpeg_output_line("Output #0, mpegts, to 'pipe:1':") + process._parse_ffmpeg_progress( + "frame=20 fps=30 bitrate=2000.5kbits/s speed=1.5x" + ) + + assert process.media_info["bitrate_kbps"] == 2000.5 + assert process.media_info["fps"] == 30.0 + assert process.output_media_info["bitrate_kbps"] == 2000.5 + assert process.output_media_info["fps"] == 30.0 + assert process.output_media_info["frame"] == 20 + assert process.output_media_info["speed"] == 1.5 + + +def test_parse_ffmpeg_output_line_does_not_clobber_existing_container(): + """Some configs may emit multiple Output # markers (e.g. 
tee); only the + first wins so output_media_info reflects the primary muxer.""" + process = _make_process() + + process._parse_ffmpeg_output_line("Output #0, mpegts, to 'pipe:1':") + process._parse_ffmpeg_output_line("Output #1, hls, to '/tmp/idx.m3u8':") + + assert process.output_media_info["container"] == "MPEGTS" + + +def test_get_output_media_info_returns_empty_for_non_transcoded_streams(): + """Plain HTTP-proxy streams have no ffmpeg process — output_media_info must + be empty so the editor knows not to render the Output Info badge row.""" + from stream_manager import StreamInfo, StreamManager + from datetime import datetime, timezone + + manager = StreamManager.__new__(StreamManager) + manager.pooled_manager = None + + stream = StreamInfo( + stream_id="plain-stream", + original_url="http://example.com/plain.ts", + created_at=datetime.now(timezone.utc), + last_access=datetime.now(timezone.utc), + ) + + assert manager._get_output_media_info(stream) == {} + + +def test_get_output_media_info_pulls_from_linked_pooled_process(): + """Transcoded streams expose the linked process's output_media_info dict so + encoder-side stats reach the API response alongside the input description.""" + from stream_manager import StreamInfo, StreamManager + from datetime import datetime, timezone + + manager = StreamManager.__new__(StreamManager) + pooled = MagicMock() + fake_process = MagicMock() + fake_process.output_media_info = { + "resolution": "1280x720", + "video_codec": "h264", + "audio_codec": "aac", + "audio_channels": "stereo", + "container": "MPEGTS", + "fps": 25.0, + "bitrate_kbps": 2500.0, + } + pooled.shared_processes = {"key-out": fake_process} + manager.pooled_manager = pooled + + stream = StreamInfo( + stream_id="t-stream", + original_url="http://example.com/t.ts", + created_at=datetime.now(timezone.utc), + last_access=datetime.now(timezone.utc), + transcode_stream_key="key-out", + ) + + info = manager._get_output_media_info(stream) + + assert info["resolution"] == 
"1280x720" + assert info["video_codec"] == "h264" + assert info["audio_codec"] == "aac" + assert info["audio_channels"] == "stereo" + assert info["container"] == "MPEGTS" + assert info["fps"] == 25.0 + assert info["bitrate_kbps"] == 2500.0 + + def test_force_stop_stream_does_not_clobber_concurrent_reinsertion(): """ Regression for failover race: force_stop_stream must tear down the process From c0770235883c70f3c0d227f2468cf985a2f21f6a Mon Sep 17 00:00:00 2001 From: warbs816 Date: Tue, 5 May 2026 00:36:34 +0100 Subject: [PATCH 2/2] fix: Stop treating ffmpeg's HLS segment-end EOF as an upstream failure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ffmpeg 8.1 (linuxserver/ffmpeg:latest as of May 2026) writes "Error reading HTTP response: End of file" to stderr at the end of every HLS segment fetch — the HTTP layer closes when the segment body is exhausted and -reconnect 1 silently reopens the connection for the next segment. ffmpeg 8.0.1 didn't surface this; 8.1 does, and it tripped the existing "end of file" entry in input_error_patterns, causing the proxy to fail over every ~10 seconds during normal playback of any HLS source. Drop the bare "end of file" string. Real upstream loss is still caught by: - the more specific input patterns (connection refused, server returned 4xx/5xx, error opening input, etc. — emitted when reconnect actually fails) - the low-bitrate / silence detectors in stream_manager.py, which trigger on actual data starvation regardless of stderr. Also: extract write/input pattern lists to module-level tuples so they can be referenced by tests and don't get rebuilt per stderr-reader call. 
--- src/pooled_stream_manager.py | 64 +++++++++++++++++++++--------------- tests/test_failover.py | 60 +++++++++++++++++++++++++-------- 2 files changed, 83 insertions(+), 41 deletions(-) diff --git a/src/pooled_stream_manager.py b/src/pooled_stream_manager.py index a76d3bb..9a777ec 100644 --- a/src/pooled_stream_manager.py +++ b/src/pooled_stream_manager.py @@ -45,6 +45,41 @@ # "1280x720" or "1920x1080 [SAR 1:1 DAR 16:9]" inside a Stream line. _RESOLUTION_RE = re.compile(r"\b(?P<w>\d{2,5})x(?P<h>\d{2,5})\b") +# stderr substrings (case-insensitive match) that signal ffmpeg has been told +# to write somewhere it can't. These mark the stream as failed for cleanup. +_FFMPEG_WRITE_ERROR_PATTERNS = ( + "no space left on device", + "permission denied", + "i/o error", + "disk full", + "cannot write", + "failed to open", + "error writing", +) + +# stderr substrings (case-insensitive match) that signal a genuine upstream +# input failure and should trip failover. We deliberately do NOT include the +# bare "end of file" string: ffmpeg 8.1 writes "Error reading HTTP response: +# End of file" on every HLS segment fetch end (the HTTP layer closes when the +# segment body is exhausted) and immediately reconnects via -reconnect 1, so +# that line is normal traffic — not a stream failure. Genuine upstream loss +# is still caught by: +# * the more specific patterns below ("connection refused", "server returned +# 4xx/5xx", "error opening input", etc. — emitted when reconnect fails) +# * the low-bitrate / silence detectors in stream_manager.py, which trigger +# on actual data starvation regardless of what stderr says. +_FFMPEG_INPUT_ERROR_PATTERNS = ( + "error opening input", + "failed to resolve hostname", + "connection refused", + "connection timed out", + "input/output error", + "server returned 4", # Matches 403, 404, etc. + "server returned 5", # Matches 500, 502, 503, etc.
+ "invalid data found", + "protocol not found", +) + logger = logging.getLogger(__name__) try: @@ -576,31 +611,6 @@ async def _log_stderr(self): return try: - # Monitor FFmpeg stderr for various error conditions - write_error_patterns = [ - "no space left on device", - "permission denied", - "i/o error", - "disk full", - "cannot write", - "failed to open", - "error writing", - ] - - # Input/connection error patterns that should trigger failover - input_error_patterns = [ - "error opening input", - "failed to resolve hostname", - "connection refused", - "connection timed out", - "input/output error", - "server returned 4", # Matches 403, 404, etc. - "server returned 5", # Matches 500, 502, 503, etc. - "invalid data found", - "protocol not found", - "end of file", - ] - # Read stderr in small chunks and buffer lines ourselves to avoid # asyncio.StreamReader's LimitOverrunError when ffmpeg writes very # long lines without a newline (which results in the message @@ -656,7 +666,7 @@ async def _log_stderr(self): line_lower = line_str.lower() # Check for write errors - for pattern in write_error_patterns: + for pattern in _FFMPEG_WRITE_ERROR_PATTERNS: if pattern in line_lower: logger.error( f"FFmpeg write error detected for {self.stream_id}: {line_str}" @@ -666,7 +676,7 @@ async def _log_stderr(self): break # Check for input/connection errors that should trigger failover - for pattern in input_error_patterns: + for pattern in _FFMPEG_INPUT_ERROR_PATTERNS: if pattern in line_lower: logger.error( f"FFmpeg input error detected for {self.stream_id}: {line_str}" diff --git a/tests/test_failover.py b/tests/test_failover.py index fe158d0..45e35db 100644 --- a/tests/test_failover.py +++ b/tests/test_failover.py @@ -452,7 +452,10 @@ class TestFFmpegErrorPatterns: """Test FFmpeg stderr error pattern detection""" def test_input_error_patterns(self): - """Test that SharedTranscodingProcess detects input errors""" + """Test that SharedTranscodingProcess detects input errors via the + 
module-level pattern tuple.""" + from pooled_stream_manager import _FFMPEG_INPUT_ERROR_PATTERNS + # These are the error patterns we should detect error_patterns = [ "Error opening input file https://example.com/stream.m3u8", @@ -465,21 +468,20 @@ def test_input_error_patterns(self): "Protocol not found", ] - # Verify each pattern would be caught for error_msg in error_patterns: assert any( - pattern in error_msg.lower() - for pattern in [ - "error opening input", - "failed to resolve hostname", - "connection refused", - "connection timed out", - "server returned 4", - "server returned 5", - "invalid data found", - "protocol not found", - ] - ) + pattern in error_msg.lower() for pattern in _FFMPEG_INPUT_ERROR_PATTERNS + ), f"Expected to match an input error pattern: {error_msg}" + + def test_input_error_patterns_excludes_bare_end_of_file(self): + """Regression: ffmpeg 8.1 prints 'Error reading HTTP response: End of file' + on every HLS segment fetch end while -reconnect 1 silently reconnects, + so 'end of file' must not be a failover trigger on its own. Real upstream + loss is caught by the more specific patterns + the data-starvation + detector in stream_manager.""" + from pooled_stream_manager import _FFMPEG_INPUT_ERROR_PATTERNS + + assert "end of file" not in _FFMPEG_INPUT_ERROR_PATTERNS @pytest.mark.asyncio async def test_stderr_monitor_detects_input_error(self): @@ -511,6 +513,36 @@ async def test_stderr_monitor_detects_input_error(self): # Should have marked the stream as input_failed assert process.status == "input_failed" + @pytest.mark.asyncio + async def test_stderr_monitor_ignores_segment_end_eof(self): + """Regression for ffmpeg 8.1: a normal HLS segment-fetch EOF must NOT + flip status to input_failed. 
ffmpeg 8.1 logs this on every segment end + and recovers via -reconnect 1, so it isn't a failure.""" + process = SharedTranscodingProcess( + stream_id="test_stream_eof", + url="http://example.com/stream.m3u8", + profile="test", + ffmpeg_args=["-i", "{input_url}", "-c", "copy", "pipe:1"], + user_agent="test-agent", + ) + + mock_process = Mock() + mock_stderr = AsyncMock() + + eof_line = ( + b"[http @ 0x7f202400c740] Error reading HTTP response: End of file\n" + ) + mock_stderr.read = AsyncMock(side_effect=[eof_line, b""]) + + mock_process.stderr = mock_stderr + mock_process.returncode = None + process.process = mock_process + + await process._log_stderr() + + # Status must remain "starting" (init default), not flip to input_failed. + assert process.status != "input_failed" + class TestFailoverStats: """Test failover statistics tracking"""