Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 170 additions & 10 deletions src/pooled_stream_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import asyncio
import json
import re
import time
import uuid
import hashlib
Expand All @@ -14,6 +15,36 @@
import os
import tempfile

# Live ffmpeg progress fields written to stderr (e.g.
# "frame= 243 fps= 30 q=28.0 size= 1152kB time=00:00:08.13 bitrate=1162.5kbits/s speed=1.01x")
_FFMPEG_PROGRESS_FIELD_RE = re.compile(
r"\b(?P<key>frame|fps|bitrate|speed|q|size|time)\s*=\s*(?P<val>\S+)"
)

# "Input #0, mpegts, from 'http://...':" or
# "Input #0, mov,mp4,m4a,3gp,3g2,mj2, from 'file.mp4':" — captures the comma-
# separated container synonyms list (split into the canonical name later).
_FFMPEG_INPUT_LINE_RE = re.compile(
r"^\s*Input #\d+,\s*(?P<container>[\w,]+),\s*from\s+"
)

# "Stream #0:0[0x100]: Video: h264 (Main) ([27][0][0][0] / 0x001B), yuv420p..."
# "Stream #0:0[0x100][0x200]: Video: hevc ..." (dual PID bracket groups in MPEG-TS)
# "Stream #0:1[0x101](eng): Audio: aac (LC) ..."
_FFMPEG_STREAM_LINE_RE = re.compile(
r"Stream #\d+:\d+(?:\[[^\]]+\])*(?:\([^)]+\))?:\s+"
r"(?P<type>Video|Audio):\s+(?P<details>.+)$"
)

# Audio channel layout: "stereo", "mono", "5.1", "5.1(side)", "5.1(back)", "7.1", "quad", etc.
# The (?:\([^)]*\))? tolerates the variant suffixes ffmpeg appends to 5.1/7.1.
_AUDIO_LAYOUT_RE = re.compile(
r",\s*(?P<layout>stereo|mono|5\.1|7\.1|quad)(?:\([^)]*\))?[,\s]"
)

# "1280x720" or "1920x1080 [SAR 1:1 DAR 16:9]" inside a Stream line.
_RESOLUTION_RE = re.compile(r"\b(?P<w>\d{2,5})x(?P<h>\d{2,5})\b")

logger = logging.getLogger(__name__)

try:
Expand Down Expand Up @@ -63,6 +94,14 @@ def __init__(
self.last_access = time.time()
self.total_bytes_served = 0
self.status = "starting"
# Live media info parsed from the active ffmpeg process's own stderr —
# codec/container/resolution/audio from the "Input #" and "Stream #"
# lines printed at startup, plus live bitrate/fps from the periodic
# progress line. We do NOT run a separate ffprobe against the source
# because that would double the upstream connection count and IPTV
# providers reject the second connection. Empty for resolver streams
# (streamlink/yt-dlp) that don't go through ffmpeg.
self.media_info: Dict[str, Any] = {}

# Broadcasting support - each client gets its own queue
self.client_queues: Dict[str, asyncio.Queue] = {}
Expand Down Expand Up @@ -347,7 +386,8 @@ async def start_process(self):
self.status = "running"
logger.info(f"Shared FFmpeg process started with PID: {self.process.pid}")

# Start stderr logging task
# Start stderr logging task — also parses codec/resolution/bitrate
# from ffmpeg's own output (no extra upstream connections).
asyncio.create_task(self._log_stderr())

# Start broadcaster task to read from FFmpeg and send to all clients
Expand Down Expand Up @@ -452,6 +492,84 @@ async def _hls_watch_loop(self):
except Exception as e:
logger.debug(f"HLS watch loop ended for {self.stream_id}: {e}")

def _parse_ffmpeg_input_line(self, line_str: str) -> None:
"""
Parse ffmpeg's "Input #0, FORMAT, from 'URL':" line for the container
format. ffmpeg prints this once per input when it opens the stream.
"""
match = _FFMPEG_INPUT_LINE_RE.match(line_str)
if not match:
return
container = match.group("container").strip().split(",")[0].strip()
if container and "container" not in self.media_info:
self.media_info["container"] = container.upper()

def _parse_ffmpeg_stream_line(self, line_str: str) -> None:
"""
Parse ffmpeg's "Stream #0:N[...]: Video|Audio: ..." lines for codec
details. ffmpeg prints one per stream right after the input is opened
— this is free metadata that doesn't require a second connection.
"""
match = _FFMPEG_STREAM_LINE_RE.search(line_str)
if not match:
return
stream_type = match.group("type").lower()
details = match.group("details")
if stream_type == "video":
# Codec name is the first token, may be followed by a parenthesised
# profile (e.g. "h264 (Main) (HEVC / 0x...)" — strip parens).
codec = details.split(",", 1)[0].strip()
codec = re.sub(r"\s*\(.*", "", codec).strip()
if codec and "video_codec" not in self.media_info:
self.media_info["video_codec"] = codec
res_match = _RESOLUTION_RE.search(details)
if res_match and "resolution" not in self.media_info:
self.media_info["resolution"] = (
f"{res_match.group('w')}x{res_match.group('h')}"
)
elif stream_type == "audio":
codec = details.split(",", 1)[0].strip()
codec = re.sub(r"\s*\(.*", "", codec).strip()
if codec and "audio_codec" not in self.media_info:
self.media_info["audio_codec"] = codec
layout_match = _AUDIO_LAYOUT_RE.search(details)
if layout_match and "audio_channels" not in self.media_info:
self.media_info["audio_channels"] = layout_match.group("layout")

def _parse_ffmpeg_progress(self, line_str: str) -> None:
"""
Extract live progress fields (bitrate, fps, frame, speed) from a single
ffmpeg stderr line and update media_info. No-op for non-progress lines.
"""
# Cheap pre-filter — most stderr lines aren't progress
if "bitrate=" not in line_str and "fps=" not in line_str:
return
for match in _FFMPEG_PROGRESS_FIELD_RE.finditer(line_str):
key = match.group("key")
raw = match.group("val")
if key == "bitrate":
if raw.endswith("kbits/s"):
try:
self.media_info["bitrate_kbps"] = round(float(raw[:-7]), 1)
except ValueError:
pass
elif key == "fps":
try:
self.media_info["fps"] = round(float(raw), 2)
except ValueError:
pass
elif key == "frame":
try:
self.media_info["frame"] = int(raw)
except ValueError:
pass
elif key == "speed":
if raw.endswith("x"):
try:
self.media_info["speed"] = round(float(raw[:-1]), 2)
except ValueError:
pass

async def _log_stderr(self):
"""Log FFmpeg stderr output and monitor for write errors and input failures"""
if not self.process or not self.process.stderr:
Expand Down Expand Up @@ -502,15 +620,36 @@ async def _log_stderr(self):

buf += chunk

# Split on newline and process full lines
while b"\n" in buf:
line, buf = buf.split(b"\n", 1)
# Split on \n OR \r — ffmpeg writes its periodic stats line with
# a trailing \r so it overwrites in-place in a terminal. Without
# splitting on \r those stats lines are buffered until the next
# newline arrives and we miss the live bitrate/fps updates.
while True:
n_idx = buf.find(b"\n")
r_idx = buf.find(b"\r")
if n_idx == -1 and r_idx == -1:
break
if n_idx == -1:
idx = r_idx
elif r_idx == -1:
idx = n_idx
else:
idx = min(n_idx, r_idx)
line, buf = buf[:idx], buf[idx + 1 :]
line_str = line.decode("utf-8", errors="ignore").strip()
if not line_str:
continue

# Log FFmpeg output (you could parse stats here)
# Log FFmpeg output and capture media info / live progress.
# Skip parsers for resolver (yt-dlp/streamlink) stderr — their
# output format is not ffmpeg's, so the regexes won't match but
# a coincidental "fps=" or "bitrate=" in debug output could
# populate stale values.
logger.debug(f"FFmpeg [{self.stream_id}]: {line_str}")
if not self.resolver_type:
self._parse_ffmpeg_input_line(line_str)
self._parse_ffmpeg_stream_line(line_str)
self._parse_ffmpeg_progress(line_str)

line_lower = line_str.lower()

Expand Down Expand Up @@ -1293,16 +1432,21 @@ async def remove_client_from_stream(self, client_id: str):
async def force_stop_stream(self, stream_key: str):
"""
Immediately stop a stream and its FFmpeg process without grace period.
Used when explicitly deleting a stream via API.
Used when explicitly deleting a stream via API and during failover.

Pops the entry up front and cleans up the captured reference so a
concurrent get_or_create_shared_stream that re-inserts at the same key
(e.g. a failover client racing this teardown) isn't torn down by the
trailing cleanup.
"""
if stream_key not in self.shared_processes:
process = self.shared_processes.pop(stream_key, None)
if process is None:
logger.info(f"Stream {stream_key} not found in local processes")
return False

logger.info(
f"Force stopping stream {stream_key} and terminating FFmpeg process"
)
process = self.shared_processes[stream_key]

# Remove all clients from this stream immediately
clients_to_remove = list(process.clients.keys())
Expand All @@ -1311,8 +1455,24 @@ async def force_stop_stream(self, stream_key: str):
if client_id in self.client_streams:
del self.client_streams[client_id]

# Immediately cleanup the FFmpeg process
await self._cleanup_local_process(stream_key)
# Tear down the captured process — NOT whatever is at stream_key now.
await process.cleanup()

# Mirror the auxiliary cleanup that _cleanup_local_process does so the
# state stays consistent regardless of which path called us.
if stream_key in self.stream_key_to_id:
del self.stream_key_to_id[stream_key]
if self.redis_client:
try:
redis_key = f"stream:{stream_key}"
await self.redis_client.delete(redis_key)
await self.redis_client.srem(
f"worker:{self.worker_id}:streams", redis_key
)
except Exception as e:
logger.warning(
f"Error cleaning up Redis state for stream {stream_key}: {e}"
)

logger.info(f"Stream {stream_key} force stopped, FFmpeg process terminated")
return True
Expand Down
41 changes: 31 additions & 10 deletions src/stream_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4315,20 +4315,27 @@ async def _try_update_failover_url(
stream_info.failover_event.clear()
stream_info.failover_event.set()

# For transcoded streams, stop and restart the transcoding process
# For transcoded streams, stop and restart the transcoding process.
# Clear the stream key BEFORE awaiting force_stop_stream — otherwise a
# client coroutine woken by failover_event can read the still-set
# transcode_stream_key and pass it as reuse_stream_key, which creates a
# new SharedTranscodingProcess at the same key. The tail-end
# _cleanup_local_process inside force_stop_stream then pops whatever is
# at that key (the brand-new process) and tears down its ffmpeg, killing
# the failover stream a couple of seconds after it started.
if stream_info.is_transcoded and self.pooled_manager:
try:
# Stop the old transcoding process
if stream_info.transcode_stream_key:
old_stream_key = stream_info.transcode_stream_key
stream_info.transcode_stream_key = None
if old_stream_key:
try:
logger.info(
f"Stopping transcoding process for failover: {stream_info.transcode_stream_key}"
f"Stopping transcoding process for failover: {old_stream_key}"
)
await self.pooled_manager.force_stop_stream(
stream_info.transcode_stream_key
await self.pooled_manager.force_stop_stream(old_stream_key)
except Exception as e:
logger.error(
f"Error stopping transcoding process during failover: {e}"
)
stream_info.transcode_stream_key = None
except Exception as e:
logger.error(f"Error stopping transcoding process during failover: {e}")

# Emit failover event
await self._emit_event(
Expand Down Expand Up @@ -4531,6 +4538,19 @@ async def _monitor_connection_idle_time(self):
client_info.idle_warning_logged = False
client_info.idle_error_logged = False

def _get_media_info(self, stream: "StreamInfo") -> Dict[str, Any]:
"""
Look up live media info (codec/container/resolution/bitrate/fps) for a
stream. Returns an empty dict for non-transcoded streams since live
ffmpeg metadata only exists when ffmpeg is the active producer.
"""
if not stream.transcode_stream_key or not self.pooled_manager:
return {}
process = self.pooled_manager.shared_processes.get(stream.transcode_stream_key)
if not process:
return {}
return dict(getattr(process, "media_info", {}) or {})

def get_stats(self) -> Dict:
"""Get comprehensive stats - aggregates variant stream stats into parent streams"""
# Only count non-variant streams
Expand Down Expand Up @@ -4662,6 +4682,7 @@ def get_stats(self) -> Dict:
"last_access": stream.last_access.isoformat(),
"metadata": stream.metadata,
"headers": stream.headers,
"media_info": self._get_media_info(stream),
}
for stream in non_variant_streams
],
Expand Down
Loading
Loading