diff --git a/src/pooled_stream_manager.py b/src/pooled_stream_manager.py index 9a777ec..5b4e9f4 100644 --- a/src/pooled_stream_manager.py +++ b/src/pooled_stream_manager.py @@ -28,6 +28,13 @@ r"^\s*Input #\d+,\s*(?P[\w,]+),\s*from\s+" ) +# "Output #0, mpegts, to 'pipe:1':" or "Output #0, hls, to '/path/index.m3u8':". +# Once this line is emitted by ffmpeg, every subsequent Stream # line refers to +# the encoder/muxer output rather than the source input. +_FFMPEG_OUTPUT_LINE_RE = re.compile( + r"^\s*Output #\d+,\s*(?P[\w,]+),\s*to\s+" +) + # "Stream #0:0[0x100]: Video: h264 (Main) ([27][0][0][0] / 0x001B), yuv420p..." # "Stream #0:0[0x100][0x200]: Video: hevc ..." (dual PID bracket groups in MPEG-TS) # "Stream #0:1[0x101](eng): Audio: aac (LC) ..." @@ -137,6 +144,15 @@ def __init__( # providers reject the second connection. Empty for resolver streams # (streamlink/yt-dlp) that don't go through ffmpeg. self.media_info: Dict[str, Any] = {} + # Output-side counterpart describing what ffmpeg is producing right now + # (encoder codec / target resolution / muxer container) plus the live + # progress fields, which are technically output stats. Populated only + # after ffmpeg prints its "Output #" line, so it stays empty for plain + # passthrough where no ffmpeg process is involved. + self.output_media_info: Dict[str, Any] = {} + # Parser state: flips True once "Output #" is seen in stderr so the + # Stream-line parser knows where to route subsequent codec lines. + self._parsing_output: bool = False # Broadcasting support - each client gets its own queue self.client_queues: Dict[str, asyncio.Queue] = {} @@ -539,15 +555,34 @@ def _parse_ffmpeg_input_line(self, line_str: str) -> None: if container and "container" not in self.media_info: self.media_info["container"] = container.upper() + def _parse_ffmpeg_output_line(self, line_str: str) -> None: + """ + Parse ffmpeg's "Output #0, FORMAT, to 'TARGET':" line. Captures the + muxer container into output_media_info and flips the parser state so + that subsequent Stream # lines are recognised as output streams. Only + the first Output block is honoured — multi-output configs (e.g. tee) + would each restate the marker but we keep the first match. + """ + match = _FFMPEG_OUTPUT_LINE_RE.match(line_str) + if not match: + return + self._parsing_output = True + container = match.group("container").strip().split(",")[0].strip() + if container and "container" not in self.output_media_info: + self.output_media_info["container"] = container.upper() + def _parse_ffmpeg_stream_line(self, line_str: str) -> None: """ Parse ffmpeg's "Stream #0:N[...]: Video|Audio: ..." lines for codec details. ffmpeg prints one per stream right after the input is opened — this is free metadata that doesn't require a second connection. + Routes into media_info or output_media_info based on whether the + "Output #" marker has been seen yet. """ match = _FFMPEG_STREAM_LINE_RE.search(line_str) if not match: return + target = self.output_media_info if self._parsing_output else self.media_info stream_type = match.group("type").lower() details = match.group("details") if stream_type == "video": @@ -555,55 +590,70 @@ def _parse_ffmpeg_stream_line(self, line_str: str) -> None: # profile (e.g. "h264 (Main) (HEVC / 0x...)" — strip parens). codec = details.split(",", 1)[0].strip() codec = re.sub(r"\s*\(.*", "", codec).strip() - if codec and "video_codec" not in self.media_info: - self.media_info["video_codec"] = codec + if codec and "video_codec" not in target: + target["video_codec"] = codec res_match = _RESOLUTION_RE.search(details) - if res_match and "resolution" not in self.media_info: - self.media_info["resolution"] = ( + if res_match and "resolution" not in target: + target["resolution"] = ( f"{res_match.group('w')}x{res_match.group('h')}" ) elif stream_type == "audio": codec = details.split(",", 1)[0].strip() codec = re.sub(r"\s*\(.*", "", codec).strip() - if codec and "audio_codec" not in self.media_info: - self.media_info["audio_codec"] = codec + if codec and "audio_codec" not in target: + target["audio_codec"] = codec layout_match = _AUDIO_LAYOUT_RE.search(details) - if layout_match and "audio_channels" not in self.media_info: - self.media_info["audio_channels"] = layout_match.group("layout") + if layout_match and "audio_channels" not in target: + target["audio_channels"] = layout_match.group("layout") def _parse_ffmpeg_progress(self, line_str: str) -> None: """ Extract live progress fields (bitrate, fps, frame, speed) from a single - ffmpeg stderr line and update media_info. No-op for non-progress lines. + ffmpeg stderr line. Progress numbers describe the encoder output, so + once the "Output #" marker has been seen we mirror them into + output_media_info. We also keep writing them to media_info so existing + callers (Stream Info badge row in m3u-editor PR #1089) don't regress. + No-op for non-progress lines. """ # Cheap pre-filter — most stderr lines aren't progress if "bitrate=" not in line_str and "fps=" not in line_str: return + targets = [self.media_info] + if self._parsing_output: + targets.append(self.output_media_info) for match in _FFMPEG_PROGRESS_FIELD_RE.finditer(line_str): key = match.group("key") raw = match.group("val") if key == "bitrate": if raw.endswith("kbits/s"): try: - self.media_info["bitrate_kbps"] = round(float(raw[:-7]), 1) + value = round(float(raw[:-7]), 1) except ValueError: - pass + continue + for t in targets: + t["bitrate_kbps"] = value elif key == "fps": try: - self.media_info["fps"] = round(float(raw), 2) + value = round(float(raw), 2) except ValueError: - pass + continue + for t in targets: + t["fps"] = value elif key == "frame": try: - self.media_info["frame"] = int(raw) + value = int(raw) except ValueError: - pass + continue + for t in targets: + t["frame"] = value elif key == "speed": if raw.endswith("x"): try: - self.media_info["speed"] = round(float(raw[:-1]), 2) + value = round(float(raw[:-1]), 2) except ValueError: - pass + continue + for t in targets: + t["speed"] = value async def _log_stderr(self): """Log FFmpeg stderr output and monitor for write errors and input failures""" @@ -660,6 +710,7 @@ async def _log_stderr(self): # populate stale values. if not self.resolver_type: self._parse_ffmpeg_input_line(line_str) + self._parse_ffmpeg_output_line(line_str) self._parse_ffmpeg_stream_line(line_str) self._parse_ffmpeg_progress(line_str) diff --git a/src/stream_manager.py b/src/stream_manager.py index cb68a1f..94fe93b 100644 --- a/src/stream_manager.py +++ b/src/stream_manager.py @@ -4551,6 +4551,21 @@ def _get_media_info(self, stream: "StreamInfo") -> Dict[str, Any]: return {} return dict(getattr(process, "media_info", {}) or {}) + def _get_output_media_info(self, stream: "StreamInfo") -> Dict[str, Any]: + """ + Look up live encoder/muxer output info for a transcoded stream — the + codec/container/resolution ffmpeg has been told to produce, plus the + live progress fields that describe what's being written downstream. + Empty for non-transcoded streams (no ffmpeg process) or before ffmpeg + emits its "Output #" line. + """ + if not stream.transcode_stream_key or not self.pooled_manager: + return {} + process = self.pooled_manager.shared_processes.get(stream.transcode_stream_key) + if not process: + return {} + return dict(getattr(process, "output_media_info", {}) or {}) + def get_stats(self) -> Dict: """Get comprehensive stats - aggregates variant stream stats into parent streams""" # Only count non-variant streams @@ -4683,6 +4698,7 @@ def get_stats(self) -> Dict: "metadata": stream.metadata, "headers": stream.headers, "media_info": self._get_media_info(stream), + "output_media_info": self._get_output_media_info(stream), } for stream in non_variant_streams ], diff --git a/tests/test_media_info.py b/tests/test_media_info.py index 907c36e..b6ad47f 100644 --- a/tests/test_media_info.py +++ b/tests/test_media_info.py @@ -246,6 +246,193 @@ def test_parse_ffmpeg_input_line_ignores_unrelated_lines(): assert "container" not in process.media_info +def test_output_media_info_starts_empty_with_parser_state_unset(): + """Fresh processes track output info separately and start in input mode.""" + process = _make_process() + + assert process.output_media_info == {} + assert process._parsing_output is False + + +def test_parse_ffmpeg_output_line_captures_container_and_flips_state(): + """The 'Output #0, FORMAT, to TARGET:' line should populate output_media_info + and switch the parser into output mode for subsequent Stream lines.""" + process = _make_process() + + process._parse_ffmpeg_output_line("Output #0, mpegts, to 'pipe:1':") + + assert process.output_media_info["container"] == "MPEGTS" + assert process._parsing_output is True + # Input-side dict must not be polluted with the output container. + assert "container" not in process.media_info + + +def test_parse_ffmpeg_output_line_takes_first_synonym(): + """Multi-synonym output containers collapse to the first canonical name.""" + process = _make_process() + + process._parse_ffmpeg_output_line( + "Output #0, mov,mp4,m4a,3gp,3g2,mj2, to '/tmp/out.mp4':" + ) + + assert process.output_media_info["container"] == "MOV" + + +def test_parse_ffmpeg_output_line_handles_hls_muxer(): + """HLS output ('Output #0, hls, to ...') is the common transcoded path.""" + process = _make_process() + + process._parse_ffmpeg_output_line("Output #0, hls, to '/tmp/abc/index.m3u8':") + + assert process.output_media_info["container"] == "HLS" + + +def test_stream_lines_before_output_marker_go_to_input_media_info(): + """While the parser hasn't seen Output #0, codec lines describe the source.""" + process = _make_process() + + process._parse_ffmpeg_stream_line( + " Stream #0:0[0x100]: Video: hevc (Main), yuv420p, 1920x1080, 50 fps" + ) + process._parse_ffmpeg_stream_line( + " Stream #0:1[0x101](eng): Audio: ac3, 48000 Hz, 5.1, fltp, 384 kb/s" + ) + + assert process.media_info["video_codec"] == "hevc" + assert process.media_info["resolution"] == "1920x1080" + assert process.media_info["audio_codec"] == "ac3" + assert process.media_info["audio_channels"] == "5.1" + assert process.output_media_info == {} + + +def test_stream_lines_after_output_marker_go_to_output_media_info(): + """Once Output # is seen, the encoder Stream lines populate output_media_info, + leaving the input description intact.""" + process = _make_process() + + process._parse_ffmpeg_stream_line( + " Stream #0:0[0x100]: Video: hevc, yuv420p, 1920x1080, 50 fps" + ) + process._parse_ffmpeg_stream_line( + " Stream #0:1[0x101]: Audio: ac3, 48000 Hz, 5.1, fltp, 384 kb/s" + ) + process._parse_ffmpeg_output_line("Output #0, mpegts, to 'pipe:1':") + process._parse_ffmpeg_stream_line( + " Stream #0:0: Video: h264, yuv420p, 1280x720, q=2-31, 25 fps" + ) + process._parse_ffmpeg_stream_line( + " Stream #0:1: Audio: aac (LC), 48000 Hz, stereo, fltp, 128 kb/s" + ) + + # Source side untouched + assert process.media_info["video_codec"] == "hevc" + assert process.media_info["resolution"] == "1920x1080" + assert process.media_info["audio_codec"] == "ac3" + assert process.media_info["audio_channels"] == "5.1" + # Encoder side captured separately + assert process.output_media_info["video_codec"] == "h264" + assert process.output_media_info["resolution"] == "1280x720" + assert process.output_media_info["audio_codec"] == "aac" + assert process.output_media_info["audio_channels"] == "stereo" + assert process.output_media_info["container"] == "MPEGTS" + + +def test_progress_fields_dual_write_once_output_seen(): + """Progress numbers describe encoder output. Before Output #0 they only + populate media_info (so the existing UI keeps getting bitrate/fps even on + very early reads); after Output #0 they mirror into output_media_info.""" + process = _make_process() + + process._parse_ffmpeg_progress( + "frame=10 fps=25 bitrate=1000.0kbits/s speed=1.0x" + ) + assert process.media_info["bitrate_kbps"] == 1000.0 + assert process.media_info["fps"] == 25.0 + assert "bitrate_kbps" not in process.output_media_info + + process._parse_ffmpeg_output_line("Output #0, mpegts, to 'pipe:1':") + process._parse_ffmpeg_progress( + "frame=20 fps=30 bitrate=2000.5kbits/s speed=1.5x" + ) + + assert process.media_info["bitrate_kbps"] == 2000.5 + assert process.media_info["fps"] == 30.0 + assert process.output_media_info["bitrate_kbps"] == 2000.5 + assert process.output_media_info["fps"] == 30.0 + assert process.output_media_info["frame"] == 20 + assert process.output_media_info["speed"] == 1.5 + + +def test_parse_ffmpeg_output_line_does_not_clobber_existing_container(): + """Some configs may emit multiple Output # markers (e.g. tee); only the + first wins so output_media_info reflects the primary muxer.""" + process = _make_process() + + process._parse_ffmpeg_output_line("Output #0, mpegts, to 'pipe:1':") + process._parse_ffmpeg_output_line("Output #1, hls, to '/tmp/idx.m3u8':") + + assert process.output_media_info["container"] == "MPEGTS" + + +def test_get_output_media_info_returns_empty_for_non_transcoded_streams(): + """Plain HTTP-proxy streams have no ffmpeg process — output_media_info must + be empty so the editor knows not to render the Output Info badge row.""" + from stream_manager import StreamInfo, StreamManager + from datetime import datetime, timezone + + manager = StreamManager.__new__(StreamManager) + manager.pooled_manager = None + + stream = StreamInfo( + stream_id="plain-stream", + original_url="http://example.com/plain.ts", + created_at=datetime.now(timezone.utc), + last_access=datetime.now(timezone.utc), + ) + + assert manager._get_output_media_info(stream) == {} + + +def test_get_output_media_info_pulls_from_linked_pooled_process(): + """Transcoded streams expose the linked process's output_media_info dict so + encoder-side stats reach the API response alongside the input description.""" + from stream_manager import StreamInfo, StreamManager + from datetime import datetime, timezone + + manager = StreamManager.__new__(StreamManager) + pooled = MagicMock() + fake_process = MagicMock() + fake_process.output_media_info = { + "resolution": "1280x720", + "video_codec": "h264", + "audio_codec": "aac", + "audio_channels": "stereo", + "container": "MPEGTS", + "fps": 25.0, + "bitrate_kbps": 2500.0, + } + pooled.shared_processes = {"key-out": fake_process} + manager.pooled_manager = pooled + + stream = StreamInfo( + stream_id="t-stream", + original_url="http://example.com/t.ts", + created_at=datetime.now(timezone.utc), + last_access=datetime.now(timezone.utc), + transcode_stream_key="key-out", + ) + + info = manager._get_output_media_info(stream) + + assert info["resolution"] == "1280x720" + assert info["video_codec"] == "h264" + assert info["audio_codec"] == "aac" + assert info["audio_channels"] == "stereo" + assert info["container"] == "MPEGTS" + assert info["fps"] == 25.0 + assert info["bitrate_kbps"] == 2500.0 + + def test_force_stop_stream_does_not_clobber_concurrent_reinsertion(): """ Regression for failover race: force_stop_stream must tear down the process