diff --git a/src/pooled_stream_manager.py b/src/pooled_stream_manager.py index c357e8e..f479c60 100644 --- a/src/pooled_stream_manager.py +++ b/src/pooled_stream_manager.py @@ -5,6 +5,7 @@ import asyncio import json +import re import time import uuid import hashlib @@ -14,6 +15,36 @@ import os import tempfile +# Live ffmpeg progress fields written to stderr (e.g. +# "frame= 243 fps= 30 q=28.0 size= 1152kB time=00:00:08.13 bitrate=1162.5kbits/s speed=1.01x") +_FFMPEG_PROGRESS_FIELD_RE = re.compile( + r"\b(?P<key>frame|fps|bitrate|speed|q|size|time)\s*=\s*(?P<val>\S+)" +) + +# "Input #0, mpegts, from 'http://...':" or +# "Input #0, mov,mp4,m4a,3gp,3g2,mj2, from 'file.mp4':" — captures the comma- +# separated container synonyms list (split into the canonical name later). +_FFMPEG_INPUT_LINE_RE = re.compile( + r"^\s*Input #\d+,\s*(?P<container>[\w,]+),\s*from\s+" +) + +# "Stream #0:0[0x100]: Video: h264 (Main) ([27][0][0][0] / 0x001B), yuv420p..." +# "Stream #0:0[0x100][0x200]: Video: hevc ..." (dual PID bracket groups in MPEG-TS) +# "Stream #0:1[0x101](eng): Audio: aac (LC) ..." +_FFMPEG_STREAM_LINE_RE = re.compile( + r"Stream #\d+:\d+(?:\[[^\]]+\])*(?:\([^)]+\))?:\s+" + r"(?P<type>Video|Audio):\s+(?P<details>.+)$"
+) + +# Audio channel layout: "stereo", "mono", "5.1", "5.1(side)", "5.1(back)", "7.1", "quad", etc. +# The (?:\([^)]*\))? tolerates the variant suffixes ffmpeg appends to 5.1/7.1. +_AUDIO_LAYOUT_RE = re.compile( + r",\s*(?P<layout>stereo|mono|5\.1|7\.1|quad)(?:\([^)]*\))?[,\s]" +) + +# "1280x720" or "1920x1080 [SAR 1:1 DAR 16:9]" inside a Stream line. +_RESOLUTION_RE = re.compile(r"\b(?P<w>\d{2,5})x(?P<h>\d{2,5})\b") + logger = logging.getLogger(__name__) try: @@ -63,6 +94,14 @@ def __init__( self.last_access = time.time() self.total_bytes_served = 0 self.status = "starting" + # Live media info parsed from the active ffmpeg process's own stderr — + # codec/container/resolution/audio from the "Input #" and "Stream #" + # lines printed at startup, plus live bitrate/fps from the periodic + # progress line. We do NOT run a separate ffprobe against the source + # because that would double the upstream connection count and IPTV + # providers reject the second connection. Empty for resolver streams + # (streamlink/yt-dlp) that don't go through ffmpeg. + self.media_info: Dict[str, Any] = {} # Broadcasting support - each client gets its own queue self.client_queues: Dict[str, asyncio.Queue] = {} @@ -347,7 +386,8 @@ async def start_process(self): self.status = "running" logger.info(f"Shared FFmpeg process started with PID: {self.process.pid}") - # Start stderr logging task + # Start stderr logging task — also parses codec/resolution/bitrate + # from ffmpeg's own output (no extra upstream connections). asyncio.create_task(self._log_stderr()) # Start broadcaster task to read from FFmpeg and send to all clients @@ -452,6 +492,84 @@ async def _hls_watch_loop(self): except Exception as e: logger.debug(f"HLS watch loop ended for {self.stream_id}: {e}") + def _parse_ffmpeg_input_line(self, line_str: str) -> None: + """ + Parse ffmpeg's "Input #0, FORMAT, from 'URL':" line for the container + format. ffmpeg prints this once per input when it opens the stream.
+ """ + match = _FFMPEG_INPUT_LINE_RE.match(line_str) + if not match: + return + container = match.group("container").strip().split(",")[0].strip() + if container and "container" not in self.media_info: + self.media_info["container"] = container.upper() + + def _parse_ffmpeg_stream_line(self, line_str: str) -> None: + """ + Parse ffmpeg's "Stream #0:N[...]: Video|Audio: ..." lines for codec + details. ffmpeg prints one per stream right after the input is opened + — this is free metadata that doesn't require a second connection. + """ + match = _FFMPEG_STREAM_LINE_RE.search(line_str) + if not match: + return + stream_type = match.group("type").lower() + details = match.group("details") + if stream_type == "video": + # Codec name is the first token, may be followed by a parenthesised + # profile (e.g. "h264 (Main) (HEVC / 0x...)" — strip parens). + codec = details.split(",", 1)[0].strip() + codec = re.sub(r"\s*\(.*", "", codec).strip() + if codec and "video_codec" not in self.media_info: + self.media_info["video_codec"] = codec + res_match = _RESOLUTION_RE.search(details) + if res_match and "resolution" not in self.media_info: + self.media_info["resolution"] = ( + f"{res_match.group('w')}x{res_match.group('h')}" + ) + elif stream_type == "audio": + codec = details.split(",", 1)[0].strip() + codec = re.sub(r"\s*\(.*", "", codec).strip() + if codec and "audio_codec" not in self.media_info: + self.media_info["audio_codec"] = codec + layout_match = _AUDIO_LAYOUT_RE.search(details) + if layout_match and "audio_channels" not in self.media_info: + self.media_info["audio_channels"] = layout_match.group("layout") + + def _parse_ffmpeg_progress(self, line_str: str) -> None: + """ + Extract live progress fields (bitrate, fps, frame, speed) from a single + ffmpeg stderr line and update media_info. No-op for non-progress lines. 
+ """ + # Cheap pre-filter — most stderr lines aren't progress + if "bitrate=" not in line_str and "fps=" not in line_str: + return + for match in _FFMPEG_PROGRESS_FIELD_RE.finditer(line_str): + key = match.group("key") + raw = match.group("val") + if key == "bitrate": + if raw.endswith("kbits/s"): + try: + self.media_info["bitrate_kbps"] = round(float(raw[:-7]), 1) + except ValueError: + pass + elif key == "fps": + try: + self.media_info["fps"] = round(float(raw), 2) + except ValueError: + pass + elif key == "frame": + try: + self.media_info["frame"] = int(raw) + except ValueError: + pass + elif key == "speed": + if raw.endswith("x"): + try: + self.media_info["speed"] = round(float(raw[:-1]), 2) + except ValueError: + pass + async def _log_stderr(self): """Log FFmpeg stderr output and monitor for write errors and input failures""" if not self.process or not self.process.stderr: @@ -502,15 +620,36 @@ async def _log_stderr(self): buf += chunk - # Split on newline and process full lines - while b"\n" in buf: - line, buf = buf.split(b"\n", 1) + # Split on \n OR \r — ffmpeg writes its periodic stats line with + # a trailing \r so it overwrites in-place in a terminal. Without + # splitting on \r those stats lines are buffered until the next + # newline arrives and we miss the live bitrate/fps updates. + while True: + n_idx = buf.find(b"\n") + r_idx = buf.find(b"\r") + if n_idx == -1 and r_idx == -1: + break + if n_idx == -1: + idx = r_idx + elif r_idx == -1: + idx = n_idx + else: + idx = min(n_idx, r_idx) + line, buf = buf[:idx], buf[idx + 1 :] line_str = line.decode("utf-8", errors="ignore").strip() if not line_str: continue - # Log FFmpeg output (you could parse stats here) + # Log FFmpeg output and capture media info / live progress. + # Skip parsers for resolver (yt-dlp/streamlink) stderr — their + # output format is not ffmpeg's, so the regexes won't match but + # a coincidental "fps=" or "bitrate=" in debug output could + # populate stale values. 
logger.debug(f"FFmpeg [{self.stream_id}]: {line_str}") + if not self.resolver_type: + self._parse_ffmpeg_input_line(line_str) + self._parse_ffmpeg_stream_line(line_str) + self._parse_ffmpeg_progress(line_str) line_lower = line_str.lower() @@ -1293,16 +1432,21 @@ async def remove_client_from_stream(self, client_id: str): async def force_stop_stream(self, stream_key: str): """ Immediately stop a stream and its FFmpeg process without grace period. - Used when explicitly deleting a stream via API. + Used when explicitly deleting a stream via API and during failover. + + Pops the entry up front and cleans up the captured reference so a + concurrent get_or_create_shared_stream that re-inserts at the same key + (e.g. a failover client racing this teardown) isn't torn down by the + trailing cleanup. """ - if stream_key not in self.shared_processes: + process = self.shared_processes.pop(stream_key, None) + if process is None: logger.info(f"Stream {stream_key} not found in local processes") return False logger.info( f"Force stopping stream {stream_key} and terminating FFmpeg process" ) - process = self.shared_processes[stream_key] # Remove all clients from this stream immediately clients_to_remove = list(process.clients.keys()) @@ -1311,8 +1455,24 @@ async def force_stop_stream(self, stream_key: str): if client_id in self.client_streams: del self.client_streams[client_id] - # Immediately cleanup the FFmpeg process - await self._cleanup_local_process(stream_key) + # Tear down the captured process — NOT whatever is at stream_key now. + await process.cleanup() + + # Mirror the auxiliary cleanup that _cleanup_local_process does so the + # state stays consistent regardless of which path called us. 
+ if stream_key in self.stream_key_to_id: + del self.stream_key_to_id[stream_key] + if self.redis_client: + try: + redis_key = f"stream:{stream_key}" + await self.redis_client.delete(redis_key) + await self.redis_client.srem( + f"worker:{self.worker_id}:streams", redis_key + ) + except Exception as e: + logger.warning( + f"Error cleaning up Redis state for stream {stream_key}: {e}" + ) logger.info(f"Stream {stream_key} force stopped, FFmpeg process terminated") return True diff --git a/src/stream_manager.py b/src/stream_manager.py index ae59cc8..cb68a1f 100644 --- a/src/stream_manager.py +++ b/src/stream_manager.py @@ -4315,20 +4315,27 @@ async def _try_update_failover_url( stream_info.failover_event.clear() stream_info.failover_event.set() - # For transcoded streams, stop and restart the transcoding process + # For transcoded streams, stop and restart the transcoding process. + # Clear the stream key BEFORE awaiting force_stop_stream — otherwise a + # client coroutine woken by failover_event can read the still-set + # transcode_stream_key and pass it as reuse_stream_key, which creates a + # new SharedTranscodingProcess at the same key. The tail-end + # _cleanup_local_process inside force_stop_stream then pops whatever is + # at that key (the brand-new process) and tears down its ffmpeg, killing + # the failover stream a couple of seconds after it started. 
if stream_info.is_transcoded and self.pooled_manager: - try: - # Stop the old transcoding process - if stream_info.transcode_stream_key: + old_stream_key = stream_info.transcode_stream_key + stream_info.transcode_stream_key = None + if old_stream_key: + try: logger.info( - f"Stopping transcoding process for failover: {stream_info.transcode_stream_key}" + f"Stopping transcoding process for failover: {old_stream_key}" ) - await self.pooled_manager.force_stop_stream( - stream_info.transcode_stream_key + await self.pooled_manager.force_stop_stream(old_stream_key) + except Exception as e: + logger.error( + f"Error stopping transcoding process during failover: {e}" ) - stream_info.transcode_stream_key = None - except Exception as e: - logger.error(f"Error stopping transcoding process during failover: {e}") # Emit failover event await self._emit_event( @@ -4531,6 +4538,19 @@ async def _monitor_connection_idle_time(self): client_info.idle_warning_logged = False client_info.idle_error_logged = False + def _get_media_info(self, stream: "StreamInfo") -> Dict[str, Any]: + """ + Look up live media info (codec/container/resolution/bitrate/fps) for a + stream. Returns an empty dict for non-transcoded streams since live + ffmpeg metadata only exists when ffmpeg is the active producer. 
+ """ + if not stream.transcode_stream_key or not self.pooled_manager: + return {} + process = self.pooled_manager.shared_processes.get(stream.transcode_stream_key) + if not process: + return {} + return dict(getattr(process, "media_info", {}) or {}) + def get_stats(self) -> Dict: """Get comprehensive stats - aggregates variant stream stats into parent streams""" # Only count non-variant streams @@ -4662,6 +4682,7 @@ def get_stats(self) -> Dict: "last_access": stream.last_access.isoformat(), "metadata": stream.metadata, "headers": stream.headers, + "media_info": self._get_media_info(stream), } for stream in non_variant_streams ], diff --git a/tests/test_media_info.py b/tests/test_media_info.py new file mode 100644 index 0000000..907c36e --- /dev/null +++ b/tests/test_media_info.py @@ -0,0 +1,297 @@ +""" +Unit tests for live media_info population on transcoded streams. + +These tests cover the parsers that extract codec/container/resolution/audio +info and live progress (bitrate/fps/frame/speed) from ffmpeg's own stderr +output, and verify media_info propagates through stream_manager.get_stats() +so the m3u-editor UI can display live badges. We deliberately do NOT run a +separate ffprobe against the source URL — that doubles the upstream connection +count and trips per-user limits at IPTV providers. 
+""" + +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from unittest.mock import MagicMock # noqa: E402 + + +def _make_process(): + """Build a minimal SharedTranscodingProcess for parser-level assertions.""" + from pooled_stream_manager import SharedTranscodingProcess + + return SharedTranscodingProcess( + stream_id="test-stream", + url="http://example.com/test.ts", + profile="default", + ffmpeg_args=["-i", "input", "-c", "copy", "-f", "mpegts", "pipe:1"], + ) + + +def test_media_info_starts_empty(): + """Fresh processes should have an empty media_info dict, not None.""" + process = _make_process() + + assert process.media_info == {} + + +def test_parse_ffmpeg_progress_extracts_live_fields(): + """A typical ffmpeg stats line should populate bitrate/fps/frame/speed.""" + process = _make_process() + + line = ( + "frame= 243 fps= 30 q=28.0 size= 1152kB " + "time=00:00:08.13 bitrate=1162.5kbits/s speed=1.01x" + ) + process._parse_ffmpeg_progress(line) + + assert process.media_info["bitrate_kbps"] == 1162.5 + assert process.media_info["fps"] == 30.0 + assert process.media_info["frame"] == 243 + assert process.media_info["speed"] == 1.01 + + +def test_parse_ffmpeg_progress_ignores_non_progress_lines(): + """Header/info lines should not contribute progress values.""" + process = _make_process() + + process._parse_ffmpeg_progress("Input #0, mpegts, from 'http://example.com':") + process._parse_ffmpeg_progress(" Duration: N/A, start: 1.400000, bitrate: N/A") + + # The "bitrate: N/A" form is not in kbits/s and should be skipped. 
+ assert "bitrate_kbps" not in process.media_info + assert "fps" not in process.media_info + + +def test_parse_ffmpeg_progress_updates_overwrite_previous_values(): + """Each new progress line should overwrite the prior live snapshot.""" + process = _make_process() + + process._parse_ffmpeg_progress("frame=10 fps=25 bitrate=1000.0kbits/s speed=1.0x") + process._parse_ffmpeg_progress("frame=20 fps=30 bitrate=2000.5kbits/s speed=1.5x") + + assert process.media_info["frame"] == 20 + assert process.media_info["fps"] == 30.0 + assert process.media_info["bitrate_kbps"] == 2000.5 + assert process.media_info["speed"] == 1.5 + + +def test_get_media_info_returns_empty_for_non_transcoded_streams(): + """ + Plain HTTP-proxy streams (no ffmpeg) must return empty media_info — the + UI relies on this to hide metadata badges when there's nothing live to show. + """ + from stream_manager import StreamInfo, StreamManager + from datetime import datetime, timezone + + manager = StreamManager.__new__(StreamManager) + manager.pooled_manager = None + + stream = StreamInfo( + stream_id="plain-stream", + original_url="http://example.com/plain.ts", + created_at=datetime.now(timezone.utc), + last_access=datetime.now(timezone.utc), + ) + + assert manager._get_media_info(stream) == {} + + +def test_get_media_info_pulls_from_linked_pooled_process(): + """ + Transcoded streams should surface the linked SharedTranscodingProcess's + media_info dict so live ffmpeg data reaches the API response. 
+ """ + from stream_manager import StreamInfo, StreamManager + from datetime import datetime, timezone + + manager = StreamManager.__new__(StreamManager) + pooled = MagicMock() + fake_process = MagicMock() + fake_process.media_info = { + "resolution": "1920x1080", + "video_codec": "h264", + "fps": 30.0, + "bitrate_kbps": 4500.0, + } + pooled.shared_processes = {"key-abc": fake_process} + manager.pooled_manager = pooled + + stream = StreamInfo( + stream_id="t-stream", + original_url="http://example.com/t.ts", + created_at=datetime.now(timezone.utc), + last_access=datetime.now(timezone.utc), + transcode_stream_key="key-abc", + ) + + info = manager._get_media_info(stream) + + assert info["resolution"] == "1920x1080" + assert info["video_codec"] == "h264" + assert info["fps"] == 30.0 + assert info["bitrate_kbps"] == 4500.0 + + +def test_parse_ffmpeg_input_line_extracts_container(): + """The 'Input #0, FORMAT, from URL:' line should populate container.""" + process = _make_process() + + process._parse_ffmpeg_input_line( + "Input #0, mpegts, from 'http://example.com/stream.ts':" + ) + + assert process.media_info["container"] == "MPEGTS" + + +def test_parse_ffmpeg_input_line_takes_first_synonym(): + """When ffmpeg lists multiple format synonyms, take the first one.""" + process = _make_process() + + process._parse_ffmpeg_input_line( + "Input #0, mov,mp4,m4a,3gp,3g2,mj2, from 'file.mp4':" + ) + + assert process.media_info["container"] == "MOV" + + +def test_parse_ffmpeg_stream_line_extracts_video_codec_and_resolution(): + """Video stream lines populate video_codec and resolution.""" + process = _make_process() + + process._parse_ffmpeg_stream_line( + " Stream #0:0[0x100]: Video: h264 (Main) ([27][0][0][0] / 0x001B), " + "yuv420p(progressive), 1280x720 [SAR 1:1 DAR 16:9], 50 fps, 50 tbr, 90k tbn" + ) + + assert process.media_info["video_codec"] == "h264" + assert process.media_info["resolution"] == "1280x720" + + +def 
test_parse_ffmpeg_stream_line_extracts_audio_codec_and_channels(): + """Audio stream lines populate audio_codec and audio_channels.""" + process = _make_process() + + process._parse_ffmpeg_stream_line( + " Stream #0:1[0x101](eng): Audio: aac (LC) ([15][0][0][0] / 0x000F), " + "48000 Hz, stereo, fltp, 192 kb/s" + ) + + assert process.media_info["audio_codec"] == "aac" + assert process.media_info["audio_channels"] == "stereo" + + +def test_parse_ffmpeg_stream_line_handles_5_1_channel_layout(): + """5.1 surround layout should map correctly.""" + process = _make_process() + + process._parse_ffmpeg_stream_line( + " Stream #0:1: Audio: ac3, 48000 Hz, 5.1, fltp, 384 kb/s" + ) + + assert process.media_info["audio_channels"] == "5.1" + + +def test_parse_ffmpeg_stream_line_handles_dual_pid_brackets(): + """ + MPEG-TS streams commonly emit two consecutive PID bracket groups + (e.g. [0x100][0x200]). The regex must match both so video_codec and + resolution are populated — regression guard for the ? → * fix. + """ + process = _make_process() + + process._parse_ffmpeg_stream_line( + " Stream #0:0[0x100][0x200]: Video: hevc (Main), " + "yuv420p(tv, bt709), 1920x1080, 50 fps, 50 tbr, 90k tbn" + ) + + assert process.media_info["video_codec"] == "hevc" + assert process.media_info["resolution"] == "1920x1080" + + +def test_parse_ffmpeg_stream_line_handles_5_1_side_channel_layout(): + """5.1(side) channel layout variant should map to '5.1'.""" + process = _make_process() + + process._parse_ffmpeg_stream_line( + " Stream #0:1: Audio: eac3, 48000 Hz, 5.1(side), fltp, 384 kb/s" + ) + + assert process.media_info["audio_channels"] == "5.1" + + +def test_parse_ffmpeg_stream_line_does_not_clobber_existing_codec(): + """ + The first Video stream wins so we don't overwrite with secondary streams + (e.g. embedded thumbnails). Live progress fields (fps/bitrate) are still + free to update because they're handled by _parse_ffmpeg_progress. 
+ """ + process = _make_process() + process.media_info["video_codec"] = "h264" + + process._parse_ffmpeg_stream_line(" Stream #0:2: Video: png, rgba, 256x256") + + assert process.media_info["video_codec"] == "h264" + + +def test_parse_ffmpeg_input_line_ignores_unrelated_lines(): + """Non-Input lines should be no-ops.""" + process = _make_process() + + process._parse_ffmpeg_input_line( + "frame= 243 fps= 30 bitrate=1162.5kbits/s speed=1.01x" + ) + + assert "container" not in process.media_info + + +def test_force_stop_stream_does_not_clobber_concurrent_reinsertion(): + """ + Regression for failover race: force_stop_stream must tear down the process + it was called for, even if a concurrent get_or_create_shared_stream + re-inserts a brand-new SharedTranscodingProcess at the same stream_key + while the old one is being awaited. Otherwise the freshly-started failover + transcoder gets killed seconds after it starts and the stream goes dark. + """ + import asyncio + from pooled_stream_manager import PooledStreamManager + + async def _run(): + # Build a manager without the redis/event-loop setup; we only exercise + # the in-memory force_stop_stream path. + manager = PooledStreamManager.__new__(PooledStreamManager) + manager.shared_processes = {} + manager.client_streams = {} + manager.stream_key_to_id = {} + manager.redis_client = None + manager.worker_id = "test-worker" + + # The "old" process — replace its async methods with stubs so we don't + # actually need a running ffmpeg. + old = _make_process() + old.process = None # cleanup() short-circuits when there's no process + old.clients = {"client-a": 0.0} + + async def remove_client(client_id): + # During this await, another coroutine swaps in a new process at + # the same stream_key — simulating the client reconnect that + # caused the original race. 
+ new = _make_process() + new.media_info["video_codec"] = "h264" + manager.shared_processes["key-X"] = new + old.clients.pop(client_id, None) + + old.remove_client = remove_client + manager.shared_processes["key-X"] = old + + await manager.force_stop_stream("key-X") + + # The new process inserted mid-flight must still be present — only the + # captured 'old' reference should have been torn down. + survivor = manager.shared_processes.get("key-X") + assert survivor is not None, "force_stop_stream destroyed the new process" + assert survivor is not old + assert survivor.media_info.get("video_codec") == "h264" + + asyncio.run(_run())