Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 68 additions & 17 deletions src/pooled_stream_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@
r"^\s*Input #\d+,\s*(?P<container>[\w,]+),\s*from\s+"
)

# "Output #0, mpegts, to 'pipe:1':" or "Output #0, hls, to '/path/index.m3u8':".
# Once this line is emitted by ffmpeg, every subsequent Stream # line refers to
# the encoder/muxer output rather than the source input.
_FFMPEG_OUTPUT_LINE_RE = re.compile(
r"^\s*Output #\d+,\s*(?P<container>[\w,]+),\s*to\s+"
)

# "Stream #0:0[0x100]: Video: h264 (Main) ([27][0][0][0] / 0x001B), yuv420p..."
# "Stream #0:0[0x100][0x200]: Video: hevc ..." (dual PID bracket groups in MPEG-TS)
# "Stream #0:1[0x101](eng): Audio: aac (LC) ..."
Expand Down Expand Up @@ -137,6 +144,15 @@ def __init__(
# providers reject the second connection. Empty for resolver streams
# (streamlink/yt-dlp) that don't go through ffmpeg.
self.media_info: Dict[str, Any] = {}
# Output-side counterpart describing what ffmpeg is producing right now
# (encoder codec / target resolution / muxer container) plus the live
# progress fields, which are technically output stats. Populated only
# after ffmpeg prints its "Output #" line, so it stays empty for plain
# passthrough where no ffmpeg process is involved.
self.output_media_info: Dict[str, Any] = {}
# Parser state: flips True once "Output #" is seen in stderr so the
# Stream-line parser knows where to route subsequent codec lines.
self._parsing_output: bool = False

# Broadcasting support - each client gets its own queue
self.client_queues: Dict[str, asyncio.Queue] = {}
Expand Down Expand Up @@ -539,71 +555,105 @@ def _parse_ffmpeg_input_line(self, line_str: str) -> None:
if container and "container" not in self.media_info:
self.media_info["container"] = container.upper()

def _parse_ffmpeg_output_line(self, line_str: str) -> None:
"""
Parse ffmpeg's "Output #0, FORMAT, to 'TARGET':" line. Captures the
muxer container into output_media_info and flips the parser state so
that subsequent Stream # lines are recognised as output streams. Only
the first Output block is honoured — multi-output configs (e.g. tee)
would each restate the marker but we keep the first match.
"""
match = _FFMPEG_OUTPUT_LINE_RE.match(line_str)
if not match:
return
self._parsing_output = True
container = match.group("container").strip().split(",")[0].strip()
if container and "container" not in self.output_media_info:
self.output_media_info["container"] = container.upper()

def _parse_ffmpeg_stream_line(self, line_str: str) -> None:
"""
Parse ffmpeg's "Stream #0:N[...]: Video|Audio: ..." lines for codec
details. ffmpeg prints one per stream right after the input is opened
— this is free metadata that doesn't require a second connection.
Routes into media_info or output_media_info based on whether the
"Output #" marker has been seen yet.
"""
match = _FFMPEG_STREAM_LINE_RE.search(line_str)
if not match:
return
target = self.output_media_info if self._parsing_output else self.media_info
stream_type = match.group("type").lower()
details = match.group("details")
if stream_type == "video":
# Codec name is the first token, may be followed by a parenthesised
# profile (e.g. "h264 (Main) (HEVC / 0x...)" — strip parens).
codec = details.split(",", 1)[0].strip()
codec = re.sub(r"\s*\(.*", "", codec).strip()
if codec and "video_codec" not in self.media_info:
self.media_info["video_codec"] = codec
if codec and "video_codec" not in target:
target["video_codec"] = codec
res_match = _RESOLUTION_RE.search(details)
if res_match and "resolution" not in self.media_info:
self.media_info["resolution"] = (
if res_match and "resolution" not in target:
target["resolution"] = (
f"{res_match.group('w')}x{res_match.group('h')}"
)
elif stream_type == "audio":
codec = details.split(",", 1)[0].strip()
codec = re.sub(r"\s*\(.*", "", codec).strip()
if codec and "audio_codec" not in self.media_info:
self.media_info["audio_codec"] = codec
if codec and "audio_codec" not in target:
target["audio_codec"] = codec
layout_match = _AUDIO_LAYOUT_RE.search(details)
if layout_match and "audio_channels" not in self.media_info:
self.media_info["audio_channels"] = layout_match.group("layout")
if layout_match and "audio_channels" not in target:
target["audio_channels"] = layout_match.group("layout")

def _parse_ffmpeg_progress(self, line_str: str) -> None:
"""
Extract live progress fields (bitrate, fps, frame, speed) from a single
ffmpeg stderr line and update media_info. No-op for non-progress lines.
ffmpeg stderr line. Progress numbers describe the encoder output, so
once the "Output #" marker has been seen we mirror them into
output_media_info. We also keep writing them to media_info so existing
callers (Stream Info badge row in m3u-editor PR #1089) don't regress.
No-op for non-progress lines.
"""
# Cheap pre-filter — most stderr lines aren't progress
if "bitrate=" not in line_str and "fps=" not in line_str:
return
targets = [self.media_info]
if self._parsing_output:
targets.append(self.output_media_info)
for match in _FFMPEG_PROGRESS_FIELD_RE.finditer(line_str):
key = match.group("key")
raw = match.group("val")
if key == "bitrate":
if raw.endswith("kbits/s"):
try:
self.media_info["bitrate_kbps"] = round(float(raw[:-7]), 1)
value = round(float(raw[:-7]), 1)
except ValueError:
pass
continue
for t in targets:
t["bitrate_kbps"] = value
elif key == "fps":
try:
self.media_info["fps"] = round(float(raw), 2)
value = round(float(raw), 2)
except ValueError:
pass
continue
for t in targets:
t["fps"] = value
elif key == "frame":
try:
self.media_info["frame"] = int(raw)
value = int(raw)
except ValueError:
pass
continue
for t in targets:
t["frame"] = value
elif key == "speed":
if raw.endswith("x"):
try:
self.media_info["speed"] = round(float(raw[:-1]), 2)
value = round(float(raw[:-1]), 2)
except ValueError:
pass
continue
for t in targets:
t["speed"] = value

async def _log_stderr(self):
"""Log FFmpeg stderr output and monitor for write errors and input failures"""
Expand Down Expand Up @@ -660,6 +710,7 @@ async def _log_stderr(self):
# populate stale values.
if not self.resolver_type:
self._parse_ffmpeg_input_line(line_str)
self._parse_ffmpeg_output_line(line_str)
self._parse_ffmpeg_stream_line(line_str)
self._parse_ffmpeg_progress(line_str)

Expand Down
16 changes: 16 additions & 0 deletions src/stream_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4551,6 +4551,21 @@ def _get_media_info(self, stream: "StreamInfo") -> Dict[str, Any]:
return {}
return dict(getattr(process, "media_info", {}) or {})

def _get_output_media_info(self, stream: "StreamInfo") -> Dict[str, Any]:
"""
Look up live encoder/muxer output info for a transcoded stream — the
codec/container/resolution ffmpeg has been told to produce, plus the
live progress fields that describe what's being written downstream.
Empty for non-transcoded streams (no ffmpeg process) or before ffmpeg
emits its "Output #" line.
"""
if not stream.transcode_stream_key or not self.pooled_manager:
return {}
process = self.pooled_manager.shared_processes.get(stream.transcode_stream_key)
if not process:
return {}
return dict(getattr(process, "output_media_info", {}) or {})

def get_stats(self) -> Dict:
"""Get comprehensive stats - aggregates variant stream stats into parent streams"""
# Only count non-variant streams
Expand Down Expand Up @@ -4683,6 +4698,7 @@ def get_stats(self) -> Dict:
"metadata": stream.metadata,
"headers": stream.headers,
"media_info": self._get_media_info(stream),
"output_media_info": self._get_output_media_info(stream),
}
for stream in non_variant_streams
],
Expand Down
187 changes: 187 additions & 0 deletions tests/test_media_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,193 @@ def test_parse_ffmpeg_input_line_ignores_unrelated_lines():
assert "container" not in process.media_info


def test_output_media_info_starts_empty_with_parser_state_unset():
"""Fresh processes track output info separately and start in input mode."""
process = _make_process()

assert process.output_media_info == {}
assert process._parsing_output is False


def test_parse_ffmpeg_output_line_captures_container_and_flips_state():
"""The 'Output #0, FORMAT, to TARGET:' line should populate output_media_info
and switch the parser into output mode for subsequent Stream lines."""
process = _make_process()

process._parse_ffmpeg_output_line("Output #0, mpegts, to 'pipe:1':")

assert process.output_media_info["container"] == "MPEGTS"
assert process._parsing_output is True
# Input-side dict must not be polluted with the output container.
assert "container" not in process.media_info


def test_parse_ffmpeg_output_line_takes_first_synonym():
"""Multi-synonym output containers collapse to the first canonical name."""
process = _make_process()

process._parse_ffmpeg_output_line(
"Output #0, mov,mp4,m4a,3gp,3g2,mj2, to '/tmp/out.mp4':"
)

assert process.output_media_info["container"] == "MOV"


def test_parse_ffmpeg_output_line_handles_hls_muxer():
"""HLS output ('Output #0, hls, to ...') is the common transcoded path."""
process = _make_process()

process._parse_ffmpeg_output_line("Output #0, hls, to '/tmp/abc/index.m3u8':")

assert process.output_media_info["container"] == "HLS"


def test_stream_lines_before_output_marker_go_to_input_media_info():
"""While the parser hasn't seen Output #0, codec lines describe the source."""
process = _make_process()

process._parse_ffmpeg_stream_line(
" Stream #0:0[0x100]: Video: hevc (Main), yuv420p, 1920x1080, 50 fps"
)
process._parse_ffmpeg_stream_line(
" Stream #0:1[0x101](eng): Audio: ac3, 48000 Hz, 5.1, fltp, 384 kb/s"
)

assert process.media_info["video_codec"] == "hevc"
assert process.media_info["resolution"] == "1920x1080"
assert process.media_info["audio_codec"] == "ac3"
assert process.media_info["audio_channels"] == "5.1"
assert process.output_media_info == {}


def test_stream_lines_after_output_marker_go_to_output_media_info():
"""Once Output # is seen, the encoder Stream lines populate output_media_info,
leaving the input description intact."""
process = _make_process()

process._parse_ffmpeg_stream_line(
" Stream #0:0[0x100]: Video: hevc, yuv420p, 1920x1080, 50 fps"
)
process._parse_ffmpeg_stream_line(
" Stream #0:1[0x101]: Audio: ac3, 48000 Hz, 5.1, fltp, 384 kb/s"
)
process._parse_ffmpeg_output_line("Output #0, mpegts, to 'pipe:1':")
process._parse_ffmpeg_stream_line(
" Stream #0:0: Video: h264, yuv420p, 1280x720, q=2-31, 25 fps"
)
process._parse_ffmpeg_stream_line(
" Stream #0:1: Audio: aac (LC), 48000 Hz, stereo, fltp, 128 kb/s"
)

# Source side untouched
assert process.media_info["video_codec"] == "hevc"
assert process.media_info["resolution"] == "1920x1080"
assert process.media_info["audio_codec"] == "ac3"
assert process.media_info["audio_channels"] == "5.1"
# Encoder side captured separately
assert process.output_media_info["video_codec"] == "h264"
assert process.output_media_info["resolution"] == "1280x720"
assert process.output_media_info["audio_codec"] == "aac"
assert process.output_media_info["audio_channels"] == "stereo"
assert process.output_media_info["container"] == "MPEGTS"


def test_progress_fields_dual_write_once_output_seen():
"""Progress numbers describe encoder output. Before Output #0 they only
populate media_info (so the existing UI keeps getting bitrate/fps even on
very early reads); after Output #0 they mirror into output_media_info."""
process = _make_process()

process._parse_ffmpeg_progress(
"frame=10 fps=25 bitrate=1000.0kbits/s speed=1.0x"
)
assert process.media_info["bitrate_kbps"] == 1000.0
assert process.media_info["fps"] == 25.0
assert "bitrate_kbps" not in process.output_media_info

process._parse_ffmpeg_output_line("Output #0, mpegts, to 'pipe:1':")
process._parse_ffmpeg_progress(
"frame=20 fps=30 bitrate=2000.5kbits/s speed=1.5x"
)

assert process.media_info["bitrate_kbps"] == 2000.5
assert process.media_info["fps"] == 30.0
assert process.output_media_info["bitrate_kbps"] == 2000.5
assert process.output_media_info["fps"] == 30.0
assert process.output_media_info["frame"] == 20
assert process.output_media_info["speed"] == 1.5


def test_parse_ffmpeg_output_line_does_not_clobber_existing_container():
"""Some configs may emit multiple Output # markers (e.g. tee); only the
first wins so output_media_info reflects the primary muxer."""
process = _make_process()

process._parse_ffmpeg_output_line("Output #0, mpegts, to 'pipe:1':")
process._parse_ffmpeg_output_line("Output #1, hls, to '/tmp/idx.m3u8':")

assert process.output_media_info["container"] == "MPEGTS"


def test_get_output_media_info_returns_empty_for_non_transcoded_streams():
"""Plain HTTP-proxy streams have no ffmpeg process — output_media_info must
be empty so the editor knows not to render the Output Info badge row."""
from stream_manager import StreamInfo, StreamManager
from datetime import datetime, timezone

manager = StreamManager.__new__(StreamManager)
manager.pooled_manager = None

stream = StreamInfo(
stream_id="plain-stream",
original_url="http://example.com/plain.ts",
created_at=datetime.now(timezone.utc),
last_access=datetime.now(timezone.utc),
)

assert manager._get_output_media_info(stream) == {}


def test_get_output_media_info_pulls_from_linked_pooled_process():
"""Transcoded streams expose the linked process's output_media_info dict so
encoder-side stats reach the API response alongside the input description."""
from stream_manager import StreamInfo, StreamManager
from datetime import datetime, timezone

manager = StreamManager.__new__(StreamManager)
pooled = MagicMock()
fake_process = MagicMock()
fake_process.output_media_info = {
"resolution": "1280x720",
"video_codec": "h264",
"audio_codec": "aac",
"audio_channels": "stereo",
"container": "MPEGTS",
"fps": 25.0,
"bitrate_kbps": 2500.0,
}
pooled.shared_processes = {"key-out": fake_process}
manager.pooled_manager = pooled

stream = StreamInfo(
stream_id="t-stream",
original_url="http://example.com/t.ts",
created_at=datetime.now(timezone.utc),
last_access=datetime.now(timezone.utc),
transcode_stream_key="key-out",
)

info = manager._get_output_media_info(stream)

assert info["resolution"] == "1280x720"
assert info["video_codec"] == "h264"
assert info["audio_codec"] == "aac"
assert info["audio_channels"] == "stereo"
assert info["container"] == "MPEGTS"
assert info["fps"] == 25.0
assert info["bitrate_kbps"] == 2500.0


def test_force_stop_stream_does_not_clobber_concurrent_reinsertion():
"""
Regression for failover race: force_stop_stream must tear down the process
Expand Down
Loading