diff --git a/plugins/multiview/__init__.py b/plugins/multiview/__init__.py index 4fc48ff..abd4d48 100644 --- a/plugins/multiview/__init__.py +++ b/plugins/multiview/__init__.py @@ -132,27 +132,7 @@ def fields(self): settings = cfg.settings except Exception: settings = {} - fields = _config().build_plugin_fields(settings) - return [self._pyav_status_field()] + fields - - def _pyav_status_field(self) -> dict: - """Info field showing whether the PyAV media engine is installed.""" - try: - deps = self._deps() - arch = deps.detect_arch() - if not arch: - import platform - desc = (f"⚠ Unsupported CPU architecture ({platform.machine()}); " - f"PyAV is unavailable, streaming will not work.") - elif deps.pyav_status(arch): - desc = f"✓ PyAV {deps.pyav_status(arch)} installed for {arch}." - else: - desc = (f"⚠ PyAV is NOT installed for {arch}. Run the " - f"'Install PyAV' action below before streaming.") - except Exception as e: # noqa: BLE001 - desc = f"PyAV status unknown: {e}" - return {"id": "_pyav_status", "label": "Media Engine (PyAV)", - "type": "info", "description": desc} + return _config().build_plugin_fields(settings) # Action dispatcher diff --git a/plugins/multiview/channel.py b/plugins/multiview/channel.py new file mode 100644 index 0000000..2c626b3 --- /dev/null +++ b/plugins/multiview/channel.py @@ -0,0 +1,327 @@ +"""Channel decoder: one child IPTV stream demuxed into video and audio buffers. + +Imported by compositor_worker.py (the subprocess entry point). Lives here so +the PyAV/numpy dependency and the YUV compositing utilities are co-located with +the class that uses them, separate from the encoder and orchestration code. +""" + +import os +import platform +import sys +import threading +import time + +# Vendored PyAV is shipped per-platform under vendor//; pick the one +# matching this machine and put it on the path before importing av. +_VENDOR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "vendor") +_ARCH_DIR = { + "x86_64": "linux-x86_64", "amd64": "linux-x86_64", + "aarch64": "linux-aarch64", "arm64": "linux-aarch64", +}.get(platform.machine().lower()) +if _ARCH_DIR and os.path.isdir(os.path.join(_VENDOR, _ARCH_DIR)): + sys.path.insert(0, os.path.join(_VENDOR, _ARCH_DIR)) + +import numpy as np # noqa: E402 +try: + import av # noqa: E402 (vendored, installed on demand) +except ImportError: + sys.stderr.write( + f"[mvworker] FATAL: PyAV not installed for arch '{platform.machine()}' " + f"(expected {_VENDOR}/{_ARCH_DIR}). Open the Multiview plugin settings and " + f"run the 'Install PyAV' action.\n") + raise + +TILE_STALE_SECS = 1.5 +RECONNECT_BASE = 2.0 # first retry delay (seconds) +RECONNECT_MAX = 60.0 # cap on per-retry delay +RECONNECT_RETRIES = 12 # consecutive failures before giving up (~8 min total) +AUDIO_RATE = 48000 +AUDIO_LAYOUT = "stereo" + +# Tolerate flaky IPTV (skip corrupt packets, ignore decode errors, generous +# probe) and bound I/O so a dead child errors and retries instead of hanging. +# Matches what the old ffmpeg tile decoders used; PyAV's strict defaults choke +# on partial/corrupt mpegts ("Invalid data found when processing input"). +DECODE_OPTS = { + "fflags": "+discardcorrupt+genpts", + "analyzeduration": "5000000", + "probesize": "5000000", + "err_detect": "ignore_err", + "rw_timeout": "15000000", # 15s I/O timeout (microseconds) +} + + +def log(msg): + sys.stderr.write(f"[mvworker] {msg}\n") + sys.stderr.flush() + + +def yuv_planes_from_frame(frame, w, h): + """Extract (Y, U, V) as contiguous numpy arrays from a yuv420p VideoFrame, + stripping each plane's stride padding.""" + p0, p1, p2 = frame.planes + Y = np.frombuffer(memoryview(p0), np.uint8).reshape(h, p0.line_size)[:, :w] + U = np.frombuffer(memoryview(p1), np.uint8).reshape(h // 2, p1.line_size)[:, :w // 2] + V = np.frombuffer(memoryview(p2), np.uint8).reshape(h // 2, p2.line_size)[:, :w // 2] + return Y.copy(), U.copy(), V.copy() + + +def black_planes(w, h): + return (np.zeros((h, w), np.uint8), + np.full((h // 2, w // 2), 128, np.uint8), + np.full((h // 2, w // 2), 128, np.uint8)) + + +def _yuv_planes(buf, w, h): + """(Y, U, V) plane views into a flat yuv420p buffer (Y|U|V byte order).""" + ysize = w * h + csize = (w // 2) * (h // 2) + Y = buf[:ysize].reshape(h, w) + U = buf[ysize:ysize + csize].reshape(h // 2, w // 2) + V = buf[ysize + csize:ysize + 2 * csize].reshape(h // 2, w // 2) + return Y, U, V + + +def _even(v): + return max(2, (int(v) // 2) * 2) + + +def fit_into_tile(frame, w, h): + """Scale a decoded frame into a w x h yuv420p tile preserving aspect ratio, + centered on black (letterbox/pillarbox) - matches the old scale+pad behavior.""" + sw, sh = frame.width, frame.height + if sw <= 0 or sh <= 0: + return black_planes(w, h) + scale = min(w / sw, h / sh) + tw, th = _even(sw * scale), _even(sh * scale) + tw, th = min(tw, w), min(th, h) + sf = frame.reformat(width=tw, height=th, format="yuv420p") + sy, su, sv = yuv_planes_from_frame(sf, tw, th) + Y, U, V = black_planes(w, h) + ox = ((w - tw) // 2) & ~1 + oy = ((h - th) // 2) & ~1 + Y[oy:oy + th, ox:ox + tw] = sy + U[oy // 2:oy // 2 + th // 2, ox // 2:ox // 2 + tw // 2] = su + V[oy // 2:oy // 2 + th // 2, ox // 2:ox // 2 + tw // 2] = sv + return (Y, U, V) + + +class Channel: + """One child channel: ONE realsrc connection, demuxed into this tile's video + and (if this channel supplies audio) its audio track. Decoding each channel + once (instead of separate video+audio connections) halves the load on the + provider/proxy, which was corrupting the video under multiview load.""" + + def __init__(self, spec): + self.url = spec["url"] + self.x, self.y = spec["x"], spec["y"] + self.w, self.h = spec["w"], spec["h"] + self.name = spec.get("name", "") + self.provides_audio = bool(spec.get("audio", False)) + self.lang = spec.get("lang", "und") + self.featured = bool(spec.get("featured", False)) + self.fallback = black_planes(self.w, self.h) + self.latest = self.fallback + self.fresh_until = 0.0 + logo = spec.get("logo") + if logo: + threading.Thread(target=self._load_logo, args=(logo,), daemon=True).start() + self.running = True + self.vcount = 0 # decoded video frames (for rate diagnostics) + # audio buffer (only used when provides_audio) + self.alock = threading.Lock() + self.aframes = [] # list of (pts_s: float|None, ndarray(n,2) int16) + self.abuffered = 0 + # video PTS clock anchor — updated by run(), read by audio_pts_now() + self.clk_pts: "float | None" = None + self.clk_wall: "float | None" = None + + def _make_fallback(self, logo): + Y, U, V = black_planes(self.w, self.h) + if logo: + try: + with av.open(logo) as c: + for frame in c.decode(video=0): + # Scale to fit within one-third of the tile, preserving aspect ratio. + max_w = (self.w // 3) & ~1 + max_h = (self.h // 3) & ~1 + scale = min(max_w / frame.width, max_h / frame.height) + lw = _even(frame.width * scale) + lh = _even(frame.height * scale) + # Decode as RGBA so transparent areas composite cleanly over black. + # Use to_ndarray() -- planes[0] has stride padding that makes + # raw frombuffer shapes wrong for non-aligned widths. + rf = frame.reformat(width=lw, height=lh, format="rgba") + arr = rf.to_ndarray(format="rgba") # (lh, lw, 4), stride-free + alpha = arr[:, :, 3:4].astype(np.float32) / 255.0 + rgb = (arr[:, :, :3] * alpha).astype(np.uint8) + rgb_frame = av.VideoFrame.from_ndarray(rgb, format="rgb24") + lf = rgb_frame.reformat(format="yuv420p") + ly, lu, lv = yuv_planes_from_frame(lf, lw, lh) + oy = ((self.h - lh) // 2) & ~1 + ox = ((self.w - lw) // 2) & ~1 + Y[oy:oy + lh, ox:ox + lw] = ly + U[oy // 2:(oy + lh) // 2, ox // 2:(ox + lw) // 2] = lu + V[oy // 2:(oy + lh) // 2, ox // 2:(ox + lw) // 2] = lv + break + except Exception as e: # noqa: BLE001 + log(f"logo decode failed for {self.name}: {e}") + return (Y, U, V) + + def _load_logo(self, logo): + """Load logo in background and swap self.fallback when ready.""" + fb = self._make_fallback(logo) + self.fallback = fb # CPython GIL makes tuple attr swap atomic + if self.fresh_until == 0.0: # no real video yet; update latest too + self.latest = fb + + def run(self): + failures = 0 + while self.running: + if failures >= RECONNECT_RETRIES: + log(f"channel {self.name}: giving up after {RECONNECT_RETRIES} failed retries") + break + cont = None + # Flush stale audio and reset the PTS clock before each new + # connection so old samples never bleed into the new stream. + with self.alock: + self.aframes.clear() + self.abuffered = 0 + self.clk_pts = None + self.clk_wall = None + vcount_before = self.vcount + try: + cont = av.open(self.url, options=DECODE_OPTS) + vs = cont.streams.video[0] + # Multi-threaded decode so 1080p sources keep up with the output + # rate (single-threaded PyAV decode runs ~22-27fps -> slow motion). + vs.thread_type = "AUTO" + vs.codec_context.thread_count = 3 + # Sources are 1080p60 but we output 30fps; skip non-reference + # (B) frames at decode to cut decode CPU on the box, which + # otherwise saturates (3x 1080p60 decode + encode). + try: + vs.codec_context.skip_frame = "NONREF" + except Exception: + pass + # Lower-effort decode for non-featured tiles: skip the deblocking + # loop filter. Big decode-CPU saving; the minor blockiness is + # hidden by downscaling small tiles. The featured tile keeps full + # deblocking so it stays sharp. + if not self.featured: + try: + vs.codec_context.skip_loop_filter = "ALL" + except Exception: + pass + streams = [vs] + res = None + aus = None + if self.provides_audio and cont.streams.audio: + aus = cont.streams.audio[0] + streams.append(aus) + res = av.AudioResampler(format="s16", layout=AUDIO_LAYOUT, rate=AUDIO_RATE) + try: + for packet in cont.demux(*streams): + if not self.running: + break + if packet.dts is None: + continue + if packet.stream.type == "video": + for frame in packet.decode(): + if frame.pts is not None: + pts_s = float(frame.pts * vs.time_base) + now = time.monotonic() + if self.clk_pts is None: + self.clk_pts, self.clk_wall = pts_s, now + else: + gap = (self.clk_wall + pts_s - self.clk_pts) - now + if 0 < gap < 2.0: + time.sleep(gap) + elif gap <= -2.0: + self.clk_pts, self.clk_wall = pts_s, time.monotonic() + self.latest = fit_into_tile(frame, self.w, self.h) + self.fresh_until = time.monotonic() + TILE_STALE_SECS + self.vcount += 1 + elif res is not None and packet.stream.type == "audio": + for frame in packet.decode(): + pts_s = (float(frame.pts * aus.time_base) + if frame.pts is not None else None) + for rf in res.resample(frame): + a = rf.to_ndarray() + a = a.reshape(-1, 2) if a.shape[0] == 1 else a.T + with self.alock: + self.aframes.append((pts_s, a.astype(np.int16))) + self.abuffered += a.shape[0] + self._trim() + finally: + if res is not None: + try: + res.close() + except Exception: + pass + except Exception as e: # noqa: BLE001 + log(f"channel {self.name} ended: {e}") + finally: + if cont is not None: + try: + cont.close() + except Exception: + pass + if self.vcount > vcount_before: + failures = 0 + else: + failures += 1 + if self.running and failures < RECONNECT_RETRIES: + delay = min(RECONNECT_BASE * (2 ** (failures - 1)), RECONNECT_MAX) + log(f"channel {self.name}: retry {failures}/{RECONNECT_RETRIES} in {delay:.0f}s") + time.sleep(delay) + + def current(self): + if time.monotonic() < self.fresh_until: + return self.latest + return self.fallback + + def _trim(self): + cap = AUDIO_RATE * 2 # ~2s + while self.abuffered > cap and self.aframes: + _, drop = self.aframes.pop(0) + self.abuffered -= drop.shape[0] + + def audio_pts_now(self) -> "float | None": + """Current source PTS (seconds) implied by the video clock anchor.""" + if self.clk_pts is None or self.clk_wall is None: + return None + return self.clk_pts + (time.monotonic() - self.clk_wall) + + def _align_to_pts(self, pts_limit: float): + """Discard buffered audio chunks that end before pts_limit.""" + with self.alock: + while self.aframes: + pts_s, chunk = self.aframes[0] + if pts_s is None: + break + if pts_s + chunk.shape[0] / AUDIO_RATE < pts_limit: + self.aframes.pop(0) + self.abuffered -= chunk.shape[0] + else: + break + + def take(self, nsamples: int) -> np.ndarray: + """Return exactly nsamples of int16 (nsamples, 2), silence-padded.""" + out = np.zeros((nsamples, 2), np.int16) + filled = 0 + with self.alock: + while filled < nsamples and self.aframes: + pts_s, chunk = self.aframes[0] + need = nsamples - filled + if chunk.shape[0] <= need: + out[filled:filled + chunk.shape[0]] = chunk + self.aframes.pop(0) + self.abuffered -= chunk.shape[0] + filled += chunk.shape[0] + else: + out[filled:] = chunk[:need] + self.aframes[0] = (pts_s, chunk[need:]) + self.abuffered -= need + filled = nsamples + return out diff --git a/plugins/multiview/compositor_worker.py b/plugins/multiview/compositor_worker.py index 2344b16..b4710ac 100644 --- a/plugins/multiview/compositor_worker.py +++ b/plugins/multiview/compositor_worker.py @@ -2,358 +2,34 @@ Spawned by server._serve_stream as `python compositor_worker.py `. It runs as a plain CPython process (so real threads parallelize across cores and -nothing fights Dispatcharr's gevent hub), using vendored PyAV to: - - * decode each child channel in its own thread, keeping that tile's latest frame - (or a logo/black fallback card when the child is stale/down), - * composite a numpy canvas at the target fps (latest-frame, so a slow/dead child - never stalls the grid: no synchronous barrier), - * decode + resample each selected channel's audio into per-track buffers, - * encode one libx264 video stream + N ac3 audio tracks to MPEG-TS on stdout. - -The plugin's gevent server just reads this process's stdout and forwards it to -Dispatcharr (the proven low-volume boundary). Config schema (argv[1] JSON): +nothing fights Dispatcharr's gevent hub). Config schema (argv[1] JSON): {"out_w","out_h","fps","bitrate","crf","preset", "tiles":[{"url","x","y","w","h","logo","name"}...], "audio":[{"url","name","lang"}...]} + +The Channel class (decode, YUV compositing, audio buffering) and its PyAV/numpy +dependencies live in channel.py. Encoder construction and hardware detection live +in parameters.py. This module handles the main compositing loop. """ import json import os -import platform import subprocess import sys import threading import time -from fractions import Fraction -# Vendored PyAV is shipped per-platform under vendor//; pick the one -# matching this machine and put it on the path before importing av. -_VENDOR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "vendor") -_ARCH_DIR = { - "x86_64": "linux-x86_64", "amd64": "linux-x86_64", - "aarch64": "linux-aarch64", "arm64": "linux-aarch64", -}.get(platform.machine().lower()) -if _ARCH_DIR and os.path.isdir(os.path.join(_VENDOR, _ARCH_DIR)): - sys.path.insert(0, os.path.join(_VENDOR, _ARCH_DIR)) +# channel.py sets up the vendored PyAV sys.path as a side effect of import; +# numpy must be imported after so it finds the vendored build. +from channel import Channel, AUDIO_RATE, AUDIO_LAYOUT, log, _yuv_planes # noqa: E402 import numpy as np # noqa: E402 -try: - import av # noqa: E402 (vendored, installed on demand) -except ImportError: - sys.stderr.write( - f"[mvworker] FATAL: PyAV not installed for arch '{platform.machine()}' " - f"(expected {_VENDOR}/{_ARCH_DIR}). Open the Multiview plugin settings and " - f"run the 'Install PyAV' action.\n") - raise - -TILE_STALE_SECS = 1.5 -RECONNECT_BASE = 2.0 # first retry delay (seconds) -RECONNECT_MAX = 60.0 # cap on per-retry delay -RECONNECT_RETRIES = 12 # consecutive failures before giving up (~8 min total) -AUDIO_RATE = 48000 -AUDIO_LAYOUT = "stereo" - -# Tolerate flaky IPTV (skip corrupt packets, ignore decode errors, generous -# probe) and bound I/O so a dead child errors and retries instead of hanging. -# Matches what the old ffmpeg tile decoders used; PyAV's strict defaults choke -# on partial/corrupt mpegts ("Invalid data found when processing input"). -DECODE_OPTS = { - "fflags": "+discardcorrupt+genpts", - "analyzeduration": "5000000", - "probesize": "5000000", - "err_detect": "ignore_err", - "rw_timeout": "15000000", # 15s I/O timeout (microseconds) -} - - -def log(msg): - sys.stderr.write(f"[mvworker] {msg}\n") - sys.stderr.flush() - - -def fps_fraction(fps: str) -> Fraction: - if "/" in fps: - a, b = fps.split("/") - return Fraction(int(a), int(b)) - return Fraction(int(fps), 1) - - -def yuv_planes_from_frame(frame, w, h): - """Extract (Y, U, V) as contiguous numpy arrays from a yuv420p VideoFrame, - stripping each plane's stride padding.""" - p0, p1, p2 = frame.planes - Y = np.frombuffer(memoryview(p0), np.uint8).reshape(h, p0.line_size)[:, :w] - U = np.frombuffer(memoryview(p1), np.uint8).reshape(h // 2, p1.line_size)[:, :w // 2] - V = np.frombuffer(memoryview(p2), np.uint8).reshape(h // 2, p2.line_size)[:, :w // 2] - return Y.copy(), U.copy(), V.copy() - - -def black_planes(w, h): - return (np.zeros((h, w), np.uint8), - np.full((h // 2, w // 2), 128, np.uint8), - np.full((h // 2, w // 2), 128, np.uint8)) - - -def _yuv_planes(buf, w, h): - """(Y, U, V) plane views into a flat yuv420p buffer (Y|U|V byte order).""" - ysize = w * h - csize = (w // 2) * (h // 2) - Y = buf[:ysize].reshape(h, w) - U = buf[ysize:ysize + csize].reshape(h // 2, w // 2) - V = buf[ysize + csize:ysize + 2 * csize].reshape(h // 2, w // 2) - return Y, U, V - - -def _even(v): - return max(2, (int(v) // 2) * 2) - - -def fit_into_tile(frame, w, h): - """Scale a decoded frame into a w x h yuv420p tile preserving aspect ratio, - centered on black (letterbox/pillarbox) - matches the old scale+pad behavior.""" - sw, sh = frame.width, frame.height - if sw <= 0 or sh <= 0: - return black_planes(w, h) - scale = min(w / sw, h / sh) - tw, th = _even(sw * scale), _even(sh * scale) - tw, th = min(tw, w), min(th, h) - sf = frame.reformat(width=tw, height=th, format="yuv420p") - sy, su, sv = yuv_planes_from_frame(sf, tw, th) - Y, U, V = black_planes(w, h) - ox = ((w - tw) // 2) & ~1 - oy = ((h - th) // 2) & ~1 - Y[oy:oy + th, ox:ox + tw] = sy - U[oy // 2:oy // 2 + th // 2, ox // 2:ox // 2 + tw // 2] = su - V[oy // 2:oy // 2 + th // 2, ox // 2:ox // 2 + tw // 2] = sv - return (Y, U, V) - - -# ---------------------------------------------------------------- channels - -class Channel: - """One child channel: ONE realsrc connection, demuxed into this tile's video - and (if this channel supplies audio) its audio track. Decoding each channel - once (instead of separate video+audio connections) halves the load on the - provider/proxy, which was corrupting the video under multiview load.""" - - def __init__(self, spec): - self.url = spec["url"] - self.x, self.y = spec["x"], spec["y"] - self.w, self.h = spec["w"], spec["h"] - self.name = spec.get("name", "") - self.provides_audio = bool(spec.get("audio", False)) - self.lang = spec.get("lang", "und") - self.featured = bool(spec.get("featured", False)) - self.fallback = black_planes(self.w, self.h) - self.latest = self.fallback - self.fresh_until = 0.0 - logo = spec.get("logo") - if logo: - threading.Thread(target=self._load_logo, args=(logo,), daemon=True).start() - self.running = True - self.vcount = 0 # decoded video frames (for rate diagnostics) - # audio buffer (only used when provides_audio) - self.alock = threading.Lock() - self.aframes = [] # list of (pts_s: float|None, ndarray(n,2) int16) - self.abuffered = 0 - # video PTS clock anchor — updated by run(), read by audio_pts_now() - self.clk_pts: "float | None" = None - self.clk_wall: "float | None" = None - - def _make_fallback(self, logo): - Y, U, V = black_planes(self.w, self.h) - if logo: - try: - with av.open(logo) as c: - for frame in c.decode(video=0): - # Scale to fit within one-third of the tile, preserving aspect ratio. - max_w = (self.w // 3) & ~1 - max_h = (self.h // 3) & ~1 - scale = min(max_w / frame.width, max_h / frame.height) - lw = _even(frame.width * scale) - lh = _even(frame.height * scale) - # Decode as RGBA so transparent areas composite cleanly over black. - # Use to_ndarray() -- planes[0] has stride padding that makes - # raw frombuffer shapes wrong for non-aligned widths. - rf = frame.reformat(width=lw, height=lh, format="rgba") - arr = rf.to_ndarray(format="rgba") # (lh, lw, 4), stride-free - alpha = arr[:, :, 3:4].astype(np.float32) / 255.0 - rgb = (arr[:, :, :3] * alpha).astype(np.uint8) - rgb_frame = av.VideoFrame.from_ndarray(rgb, format="rgb24") - lf = rgb_frame.reformat(format="yuv420p") - ly, lu, lv = yuv_planes_from_frame(lf, lw, lh) - oy = ((self.h - lh) // 2) & ~1 - ox = ((self.w - lw) // 2) & ~1 - Y[oy:oy + lh, ox:ox + lw] = ly - U[oy // 2:(oy + lh) // 2, ox // 2:(ox + lw) // 2] = lu - V[oy // 2:(oy + lh) // 2, ox // 2:(ox + lw) // 2] = lv - break - except Exception as e: # noqa: BLE001 - log(f"logo decode failed for {self.name}: {e}") - return (Y, U, V) - - def _load_logo(self, logo): - """Load logo in background and swap self.fallback when ready.""" - fb = self._make_fallback(logo) - self.fallback = fb # CPython GIL makes tuple attr swap atomic - if self.fresh_until == 0.0: # no real video yet; update latest too - self.latest = fb - - def run(self): - failures = 0 - while self.running: - if failures >= RECONNECT_RETRIES: - log(f"channel {self.name}: giving up after {RECONNECT_RETRIES} failed retries") - break - cont = None - # Flush stale audio and reset the PTS clock before each new - # connection so old samples never bleed into the new stream. - with self.alock: - self.aframes.clear() - self.abuffered = 0 - self.clk_pts = None - self.clk_wall = None - vcount_before = self.vcount - try: - cont = av.open(self.url, options=DECODE_OPTS) - vs = cont.streams.video[0] - # Multi-threaded decode so 1080p sources keep up with the output - # rate (single-threaded PyAV decode runs ~22-27fps -> slow motion). - vs.thread_type = "AUTO" - vs.codec_context.thread_count = 3 - # Sources are 1080p60 but we output 30fps; skip non-reference - # (B) frames at decode to cut decode CPU on the box, which - # otherwise saturates (3x 1080p60 decode + encode). - try: - vs.codec_context.skip_frame = "NONREF" - except Exception: - pass - # Lower-effort decode for non-featured tiles: skip the deblocking - # loop filter. Big decode-CPU saving; the minor blockiness is - # hidden by downscaling small tiles. The featured tile keeps full - # deblocking so it stays sharp. - if not self.featured: - try: - vs.codec_context.skip_loop_filter = "ALL" - except Exception: - pass - streams = [vs] - res = None - aus = None - if self.provides_audio and cont.streams.audio: - aus = cont.streams.audio[0] - streams.append(aus) - res = av.AudioResampler(format="s16", layout=AUDIO_LAYOUT, rate=AUDIO_RATE) - try: - for packet in cont.demux(*streams): - if not self.running: - break - if packet.dts is None: - continue - if packet.stream.type == "video": - for frame in packet.decode(): - if frame.pts is not None: - pts_s = float(frame.pts * vs.time_base) - now = time.monotonic() - if self.clk_pts is None: - self.clk_pts, self.clk_wall = pts_s, now - else: - gap = (self.clk_wall + pts_s - self.clk_pts) - now - if 0 < gap < 2.0: - time.sleep(gap) - elif gap <= -2.0: - self.clk_pts, self.clk_wall = pts_s, time.monotonic() - self.latest = fit_into_tile(frame, self.w, self.h) - self.fresh_until = time.monotonic() + TILE_STALE_SECS - self.vcount += 1 - elif res is not None and packet.stream.type == "audio": - for frame in packet.decode(): - pts_s = (float(frame.pts * aus.time_base) - if frame.pts is not None else None) - for rf in res.resample(frame): - a = rf.to_ndarray() - a = a.reshape(-1, 2) if a.shape[0] == 1 else a.T - with self.alock: - self.aframes.append((pts_s, a.astype(np.int16))) - self.abuffered += a.shape[0] - self._trim() - finally: - if res is not None: - try: - res.close() - except Exception: - pass - except Exception as e: # noqa: BLE001 - log(f"channel {self.name} ended: {e}") - finally: - if cont is not None: - try: - cont.close() - except Exception: - pass - if self.vcount > vcount_before: - failures = 0 - else: - failures += 1 - if self.running and failures < RECONNECT_RETRIES: - delay = min(RECONNECT_BASE * (2 ** (failures - 1)), RECONNECT_MAX) - log(f"channel {self.name}: retry {failures}/{RECONNECT_RETRIES} in {delay:.0f}s") - time.sleep(delay) - def current(self): - if time.monotonic() < self.fresh_until: - return self.latest - return self.fallback +from parameters import fps_fraction, build_encoder_cmd, validate_encoder # noqa: E402 - def _trim(self): - cap = AUDIO_RATE * 2 # ~2s - while self.abuffered > cap and self.aframes: - _, drop = self.aframes.pop(0) - self.abuffered -= drop.shape[0] - def audio_pts_now(self) -> "float | None": - """Current source PTS (seconds) implied by the video clock anchor.""" - if self.clk_pts is None or self.clk_wall is None: - return None - return self.clk_pts + (time.monotonic() - self.clk_wall) - - def _align_to_pts(self, pts_limit: float): - """Discard buffered audio chunks that end before pts_limit.""" - with self.alock: - while self.aframes: - pts_s, chunk = self.aframes[0] - if pts_s is None: - break - if pts_s + chunk.shape[0] / AUDIO_RATE < pts_limit: - self.aframes.pop(0) - self.abuffered -= chunk.shape[0] - else: - break - - def take(self, nsamples: int) -> np.ndarray: - """Return exactly nsamples of int16 (nsamples, 2), silence-padded.""" - out = np.zeros((nsamples, 2), np.int16) - filled = 0 - with self.alock: - while filled < nsamples and self.aframes: - pts_s, chunk = self.aframes[0] - need = nsamples - filled - if chunk.shape[0] <= need: - out[filled:filled + chunk.shape[0]] = chunk - self.aframes.pop(0) - self.abuffered -= chunk.shape[0] - filled += chunk.shape[0] - else: - out[filled:] = chunk[:need] - self.aframes[0] = (pts_s, chunk[need:]) - self.abuffered -= need - filled = nsamples - return out - - -# ---------------------------------------------------------------- encoder +# ---------------------------------------------------------------- compositing helpers def _write_all(fd, data): mv = memoryview(data) @@ -366,89 +42,40 @@ def _write_all(fd, data): return True -def _nvenc_available() -> bool: - try: - r = subprocess.run( - ["ffmpeg", "-hide_banner", "-encoders"], - capture_output=True, text=True, timeout=5, - ) - return "h264_nvenc" in r.stdout - except Exception: - return False - - -def build_encoder_cmd(cfg, out_w, out_h, audio_read): - bitrate = int(cfg.get("bitrate", 8000)) - gop = max(2, round(float(fps_fraction(cfg["fps"])) * 2)) - encoder = cfg.get("video_encoder", "libx264") - preset = cfg.get("preset") or ("p4" if encoder == "h264_nvenc" else "ultrafast") - cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error", - # Cap muxer/filter threads so it doesn't grab every core and starve - # the PyAV decoders (3x 1080p60 decode already loads the box). - "-threads", str(cfg.get("enc_threads", 4)), - "-f", "rawvideo", "-pix_fmt", "yuv420p", "-s", f"{out_w}x{out_h}", - "-r", cfg["fps"], "-thread_queue_size", "512", "-i", "pipe:0"] - for r in audio_read: - cmd += ["-f", "s16le", "-ar", str(AUDIO_RATE), "-ac", "2", - "-thread_queue_size", "512", "-i", f"pipe:{r}"] - cmd += ["-map", "0:v:0"] - for i in range(len(audio_read)): - cmd += ["-map", f"{i + 1}:a:0"] - # VBV CBR: constant bitrate regardless of content complexity. CRF (VBR) - # produces near-zero bitrate for static/logo content; IPTV players drain - # their receive buffer faster than realtime when the rate is very low, - # causing fast-forward. CBR pads with filler NAL units to hold constant - # rate. bufsize = 0.5x target keeps encode latency low. - # -muxrate is NOT used: CBR already guarantees constant output rate; - # -muxrate adds MPEG-TS null packets that shift the PCR clock away from - # video PTS, causing player sync issues. - if encoder == "h264_nvenc": - # NVENC CBR via -rc cbr (pads with filler NAL units, same guarantee as - # x264 CBR). -minrate, -keyint_min, -sc_threshold are x264-only. - cmd += ["-c:v", "h264_nvenc", "-preset", preset, - "-rc", "cbr", - "-pix_fmt", "yuv420p", - "-b:v", f"{bitrate}k", - "-maxrate", f"{bitrate}k", - "-bufsize", f"{bitrate // 2}k", - "-g", str(gop)] - else: - cmd += ["-c:v", "libx264", "-preset", preset, - "-pix_fmt", "yuv420p", - "-b:v", f"{bitrate}k", - "-minrate", f"{bitrate}k", - "-maxrate", f"{bitrate}k", - "-bufsize", f"{bitrate // 2}k", - "-g", str(gop), "-keyint_min", str(gop), "-sc_threshold", "0"] - if audio_read: - cmd += ["-c:a", "ac3", "-b:a", "192k"] - cmd += ["-max_muxing_queue_size", "1024", - "-mpegts_flags", "+pat_pmt_at_frames+resend_headers+initial_discontinuity", - "-flush_packets", "1", "-f", "mpegts", "pipe:1"] - return cmd - - def audio_feeder(track, fd, stop): CHUNK = int(AUDIO_RATE * 0.02) # 960 samples = 20ms per tick SILENCE = np.zeros((CHUNK, 2), dtype=np.int16) - # Phase 1: wait for the video PTS clock to establish, then snap the audio - # buffer to that position. This discards any audio Dispatcharr pre-buffered - # ahead of realtime before we start constant-rate output. + start = None + written = 0 + snapped = False + was_valid = False + while not stop.is_set(): pts_now = track.audio_pts_now() - if pts_now is not None: + + if pts_now is None: + if was_valid: + # Clock just went None -- reconnect in progress; reset snap state + # so we re-anchor when the new stream establishes its first frame. + snapped = False + start = None + written = 0 + was_valid = False + _write_all(fd, SILENCE.tobytes()) + time.sleep(0.02) + continue + + if not snapped: + # New clock available (startup or post-reconnect): snap audio buffer + # to current video PTS and reset wall-clock counters. track._align_to_pts(pts_now - 0.10) - break - _write_all(fd, SILENCE.tobytes()) - time.sleep(0.02) + start = time.monotonic() + written = 0 + snapped = True + + was_valid = True - # Phase 2: constant wall-clock rate. Smooth output is more important than - # perfect PTS tracking; the reconnect flush in Channel.run() handles the - # stale-audio problem, so wall-clock pacing is safe here. - start = time.monotonic() - written = 0 - while not stop.is_set(): target = int((time.monotonic() - start) * AUDIO_RATE) need = target - written if need > 0: @@ -478,9 +105,7 @@ def main(): audio_pipes = [os.pipe() for _ in audio_chs] audio_read = [r for (r, _w) in audio_pipes] enc_out_r, enc_out_w = os.pipe() - if cfg.get("video_encoder") == "h264_nvenc" and not _nvenc_available(): - sys.exit("h264_nvenc selected but ffmpeg reports no NVENC encoder -- " - "check NVIDIA driver and ffmpeg build") + validate_encoder(cfg.get("video_encoder", "libx264")) cmd = build_encoder_cmd(cfg, out_w, out_h, audio_read) for i, a in enumerate(audio_chs): cmd[-1:-1] = [f"-metadata:s:a:{i}", f"title={a.name}", diff --git a/plugins/multiview/config.py b/plugins/multiview/config.py index e4c5fc3..e5e23e3 100644 --- a/plugins/multiview/config.py +++ b/plugins/multiview/config.py @@ -3,13 +3,6 @@ import json import os -# Constants - -PLUGIN_DB_KEY = "multiview" - -DEFAULT_SERVER_PORT = 9292 -DEFAULT_SERVER_HOST = "127.0.0.1" - def _load_plugin_config() -> dict: config_path = os.path.join(os.path.dirname(__file__), "plugin.json") @@ -22,6 +15,8 @@ def _load_plugin_config() -> dict: _ENCODER_OPTIONS = [ {"value": "libx264", "label": "Software (libx264)"}, {"value": "h264_nvenc", "label": "NVIDIA NVENC (h264_nvenc)"}, + {"value": "h264_qsv", "label": "Intel QSV (h264_qsv)"}, + {"value": "h264_vaapi", "label": "Intel/AMD VAAPI (h264_vaapi)"}, ] @@ -82,10 +77,25 @@ def _load_plugin_config() -> dict: "type": "select", "default": "libx264", "options": [], # populated from _ENCODER_OPTIONS in build_plugin_fields - "description": "Software (libx264) or NVIDIA GPU (h264_nvenc). NVENC requires an NVIDIA GPU with driver support.", + "description": "Software encoder (libx264) or hardware GPU encoder. NVENC requires NVIDIA GPU; QSV/VAAPI require Intel/AMD GPU with /dev/dri support.", } # Per-encoder quality / preset fields +# +# ENCODER_PRESETS maps encoder name -> (valid_preset_set, default_preset). +# server.py imports this for validation; values must stay in sync with the +# option lists in the field builders below. +ENCODER_PRESETS: dict[str, tuple[frozenset, str]] = {} + + +def _register_presets(encoder: str, fields_fn): + """Populate ENCODER_PRESETS from a field builder's options list.""" + for f in fields_fn(): + if f.get("id") == "encoder_preset": + vals = frozenset(o["value"] for o in f.get("options", [])) + ENCODER_PRESETS[encoder] = (vals, f.get("default", "")) + return + def _x264_fields() -> list: return [ @@ -127,11 +137,39 @@ def _nvenc_fields() -> list: ] +def _qsv_fields() -> list: + return [ + { + "id": "encoder_preset", + "label": "Encoder Preset", + "type": "select", "default": "medium", + "options": [ + {"value": "veryfast", "label": "Very Fast (lowest quality)"}, + {"value": "faster", "label": "Faster"}, + {"value": "fast", "label": "Fast"}, + {"value": "medium", "label": "Medium (recommended)"}, + {"value": "slow", "label": "Slow (higher quality)"}, + ], + "description": "QSV encode speed vs quality. Medium is recommended for live multiview.", + }, + ] + + +def _vaapi_fields() -> list: + return [] + + _ENCODER_EXTRA_FIELDS = { "libx264": _x264_fields, "h264_nvenc": _nvenc_fields, + "h264_qsv": _qsv_fields, + "h264_vaapi": _vaapi_fields, } +# Populate ENCODER_PRESETS from the field definitions above. +for _enc, _fn in _ENCODER_EXTRA_FIELDS.items(): + _register_presets(_enc, _fn) + _MULTIVIEW_COUNT_FIELD = { "id": "multiview_count", "label": "Number of Multiview Layouts", @@ -154,6 +192,89 @@ def _nvenc_fields() -> list: ] +def _get_multiview_profile_params() -> str: + """Return the ffmpeg parameters string for the globally-enabled default stream profile.""" + try: + from core.models import CoreSettings, StreamProfile + default_id = CoreSettings.get_default_stream_profile_id() + profile = StreamProfile.objects.filter(id=default_id).first() + return profile.parameters if profile else "" + except Exception: + return "" + + +def _build_warnings_fields(settings: dict) -> list: + """Return warning info fields for the settings page. Empty list = no warnings = section hidden.""" + warnings = [] + + try: + from . import deps as _deps + import platform as _platform + arch = _deps.detect_arch() + if not arch: + warnings.append({ + "id": "_warn_pyav_arch", "label": "Media Engine (PyAV)", "type": "info", + "description": (f"Unsupported CPU architecture ({_platform.machine()}); " + f"PyAV is unavailable, streaming will not work."), + }) + elif not _deps.pyav_status(arch): + warnings.append({ + "id": "_warn_pyav_missing", "label": "Media Engine (PyAV)", "type": "info", + "description": (f"PyAV is NOT installed for {arch}. Run the " + f"'Install PyAV' action below before streaming."), + }) + except Exception as e: + warnings.append({ + "id": "_warn_pyav_unknown", "label": "Media Engine (PyAV)", "type": "info", + "description": f"PyAV status unknown: {e}", + }) + + params = _get_multiview_profile_params() + if params and any(t in params for t in ("-c copy", "-c:a copy", "-codec:a copy", "acodec copy")): + warnings.append({ + "id": "_warn_audio_copy", + "label": "Audio: multi-track will be dropped", + "type": "info", + "description": ( + "The default stream profile uses audio copy (-c copy) without mapping " + "all tracks. Multi-track audio from multiview will be silently dropped " + "-- players will only see one audio track. Fix: create a stream profile " + "that includes '-map 0' or '-map 0:a' and set it as the default." + ), + }) + + encoder = settings.get("video_encoder", "libx264") + if encoder == "libx264": + mv_count = max(1, int(settings.get("multiview_count", 1))) + heavy_layouts = [ + n for n in range(1, mv_count + 1) + if max(2, int(settings.get(f"multiview_{n}_channel_count", 4))) > 3 + ] + if heavy_layouts: + layout_str = ", ".join(f"Layout {n}" for n in heavy_layouts) + warnings.append({ + "id": "_warn_sw_encode", + "label": "Performance: software encoding with 4+ streams", + "type": "info", + "description": ( + f"{layout_str} has more than 3 streams configured with software " + f"encoding (libx264). This is CPU-intensive and may cause dropped " + f"frames or slow-motion output. Enable a hardware encoder " + f"(NVENC, QSV, VAAPI) in Video Settings if available." + ), + }) + + if not warnings: + return [] + + return [{ + "id": "_warnings_header", + "label": "── Warnings ──────────────────────────", + "type": "info", + "description": "Use the refresh button (top-right) or restart Dispatcharr to re-check warnings.", + }] + warnings + + def _get_multiview_channel_ids() -> set: """Return the set of Channel IDs that belong to the Dispatcharr Multiview M3U account.""" try: @@ -400,6 +521,14 @@ def _build_multiview_block(n: int, ch_count: int, selector_type: str = "classic" return fields +_VIDEO_SETTINGS_HEADER = { + "id": "_video_settings_header", + "label": "── Video Settings ───────────────────────", + "type": "info", + "description": "", +} + + def build_plugin_fields(settings: dict) -> list: """Build the full field list based on current settings.""" mv_count = max(1, int(settings.get("multiview_count", 1))) @@ -408,7 +537,9 @@ def build_plugin_fields(settings: dict) -> list: enc_field = dict(_VIDEO_ENCODER_FIELD) enc_field["options"] = _ENCODER_OPTIONS - fields = list(_GLOBAL_FIELDS) + fields = _build_warnings_fields(settings) + fields.append(_VIDEO_SETTINGS_HEADER) + fields.extend(_GLOBAL_FIELDS) fields.append(enc_field) extra_fn = _ENCODER_EXTRA_FIELDS.get(encoder, _x264_fields) diff --git a/plugins/multiview/dispatcharr.py b/plugins/multiview/dispatcharr.py index fe8b77f..a208b5c 100644 --- a/plugins/multiview/dispatcharr.py +++ b/plugins/multiview/dispatcharr.py @@ -49,49 +49,53 @@ def live_stream(channel_id): streams via StreamGenerator with channel_initializing=True (so Dispatcharr's own connect/buffer wait applies). Always removes the client on exit, whether the consumer disconnects (GeneratorExit) or the stream ends. + + If the channel is already managed by live_proxy (e.g. mid-fallback or in its + shutdown grace period), we skip initialize_channel() so we don't reset its + stream selection back to URL 1. We just join the existing channel as a new + client and let live_proxy continue its retry/fallback cycle. """ from apps.proxy.live_proxy.server import ProxyServer from apps.proxy.live_proxy.services.channel_service import ChannelService from apps.proxy.live_proxy.url_utils import get_stream_info_for_switch from apps.proxy.live_proxy.output.ts.generator import StreamGenerator - info = get_stream_info_for_switch(channel_id) - if not info or info.get("error"): - err = (info or {}).get("error", "no stream info") - logger.warning(f"multiview: channel {channel_id} unavailable: {err}") - return - - ok = ChannelService.initialize_channel( - channel_id, - info["url"], - info.get("user_agent"), - transcode=info.get("transcode", False), - stream_profile_value=info.get("stream_profile"), - stream_id=info.get("stream_id"), - m3u_profile_id=info.get("m3u_profile_id"), - stream_name=info.get("stream_name"), - ) - if not ok: - logger.warning(f"multiview: initialize_channel failed for {channel_id}") - return - proxy = ProxyServer.get_instance() - client_id = str(_uuid.uuid4()) client_manager = proxy.client_managers.get(channel_id) + if client_manager is None: - logger.warning(f"multiview: no client manager for channel {channel_id}") - return + info = get_stream_info_for_switch(channel_id) + if not info or info.get("error"): + err = (info or {}).get("error", "no stream info") + logger.warning(f"multiview: channel {channel_id} unavailable: {err}") + return + + ok = ChannelService.initialize_channel( + channel_id, + info["url"], + info.get("user_agent"), + transcode=info.get("transcode", False), + stream_profile_value=info.get("stream_profile"), + stream_id=info.get("stream_id"), + m3u_profile_id=info.get("m3u_profile_id"), + stream_name=info.get("stream_name"), + ) + if not ok: + logger.warning(f"multiview: initialize_channel failed for {channel_id}") + return + + client_manager = proxy.client_managers.get(channel_id) + if client_manager is None: + logger.warning(f"multiview: no client manager for channel {channel_id} after init") + return + + client_id = str(_uuid.uuid4()) client_manager.add_client( client_id, USER_AGENT, user_agent=USER_AGENT, user=None, output_format="mpegts", ) - # client_manager.add_client( - # client_id, CLIENT_IP, - # user_agent=USER_AGENT, user=None, output_format="mpegts", - # ) - try: buffer = proxy.get_buffer(channel_id) except Exception as e: # noqa: BLE001 diff --git a/plugins/multiview/epg.py b/plugins/multiview/epg.py index 940546c..6886887 100644 --- a/plugins/multiview/epg.py +++ b/plugins/multiview/epg.py @@ -43,6 +43,115 @@ def _fmt_xmltv_time(dt) -> str: return utc.strftime("%Y%m%d%H%M%S +0000") +def _emit_custom_props(cp: dict, lines: list) -> None: + """Append XMLTV inner elements derived from a ProgramData.custom_properties dict.""" + if not cp: + return + + for cat in cp.get("categories") or []: + lines.append(f" {html.escape(str(cat))}") + + for kw in cp.get("keywords") or []: + lines.append(f" {html.escape(str(kw))}") + + # episode-num: custom_properties stores 1-based values; xmltv_ns is 0-based. + season = cp.get("season") + episode = cp.get("episode") + if season is not None or episode is not None: + s = (int(season) - 1) if season is not None else "" + e = (int(episode) - 1) if episode is not None else "" + lines.append(f' {s}.{e}.') + if cp.get("onscreen_episode"): + lines.append(f' {html.escape(str(cp["onscreen_episode"]))}') + if cp.get("dd_progid"): + lines.append(f' {html.escape(str(cp["dd_progid"]))}') + for ext_sys in ("thetvdb.com_id", "themoviedb.org_id", "imdb.com_id"): + if cp.get(ext_sys): + tag = ext_sys.replace("_id", "") + lines.append(f' {html.escape(str(cp[ext_sys]))}') + + if cp.get("date"): + lines.append(f" {html.escape(str(cp['date']))}") + if cp.get("country"): + lines.append(f" {html.escape(str(cp['country']))}") + if cp.get("language"): + lines.append(f" {html.escape(str(cp['language']))}") + if cp.get("original_language"): + lines.append(f" {html.escape(str(cp['original_language']))}") + + if cp.get("icon"): + lines.append(f' ') + + for img in cp.get("images") or []: + attrs = "" + for attr in ("type", "size", "orient", "system"): + if img.get(attr): + attrs += f' {attr}="{html.escape(str(img[attr]))}"' + if img.get("url"): + lines.append(f" {html.escape(str(img['url']))}") + + if cp.get("rating"): + sys_attr = f' system="{html.escape(str(cp["rating_system"]))}"' if cp.get("rating_system") else "" + lines.append(f" {html.escape(str(cp['rating']))}") + for sr in cp.get("star_ratings") or []: + sys_attr = f' system="{html.escape(str(sr["system"]))}"' if sr.get("system") else "" + lines.append(f" {html.escape(str(sr.get('value', '')))}") + + if cp.get("previously_shown"): + details = cp.get("previously_shown_details") or {} + attrs = "" + if details.get("start"): + attrs += f' start="{html.escape(str(details["start"]))}"' + if details.get("channel"): + attrs += f' channel="{html.escape(str(details["channel"]))}"' + lines.append(f" ") + if cp.get("premiere"): + text = cp.get("premiere_text") or "" + lines.append(f" {html.escape(text)}" if text else " ") + if cp.get("new"): + lines.append(" ") + if cp.get("live"): + lines.append(" ") + if cp.get("last_chance"): + text = cp.get("last_chance_text") or "" + lines.append(f" {html.escape(text)}" if text else " ") + + length = cp.get("length") + if length and length.get("value"): + units_attr = f' units="{html.escape(str(length["units"]))}"' if length.get("units") else "" + lines.append(f" {html.escape(str(length['value']))}") + + video = cp.get("video") + if video: + lines.append(" ") + + audio = cp.get("audio") + if audio: + lines.append(" ") + + for sub in cp.get("subtitles") or []: + type_attr = f' type="{html.escape(str(sub["type"]))}"' if sub.get("type") else "" + lang = html.escape(str(sub["language"])) if sub.get("language") else "" + inner = f"{lang}" if lang else "" + lines.append(f" {inner}") + + for review in cp.get("reviews") or []: + attrs = "" + for attr in ("type", "source", "reviewer"): + if review.get(attr): + attrs += f' {attr}="{html.escape(str(review[attr]))}"' + content = html.escape(str(review.get("content", ""))) + lines.append(f" {content}") + + def _build_xmltv(settings: dict, mv_count: int, window_start, window_end) -> str: chunk = timedelta(hours=4) @@ -72,6 +181,7 @@ def _build_xmltv(settings: dict, mv_count: int, window_start, window_end) -> str .order_by("start_time") ) if programs: + ch_list = ", ".join(resolve_channel_names(settings, n)) for prog in programs: lines.append( f' str lines.append(f" {html.escape(prog.title)}") if prog.sub_title: lines.append(f" {html.escape(prog.sub_title)}") - if prog.description: - lines.append(f" {html.escape(prog.description)}") + desc = prog.description or "" + if ch_list: + desc = (desc + "\n(" + ch_list + ")") if desc else "(" + ch_list + ")" + if desc: + lines.append(f" {html.escape(desc)}") + _emit_custom_props(prog.custom_properties or {}, lines) lines.append(" ") forwarded = True except Exception: diff --git a/plugins/multiview/ffmpeg.py b/plugins/multiview/ffmpeg.py deleted file mode 100644 index 97ed32c..0000000 --- a/plugins/multiview/ffmpeg.py +++ /dev/null @@ -1,29 +0,0 @@ -"""Output media-settings helpers (resolution / frame rate). - -The actual decode/compose/encode pipeline lives in compositor_worker.py (PyAV + -a libx264 ffmpeg subprocess). This module just parses the user's output settings -for server._worker_config. -""" - -_FPS_CHOICES = {"24", "25", "30", "50", "60", "30000/1001", "60000/1001"} - - -def fps_string(settings: dict) -> str: - """Output/sampling frame rate as a string (validated against the choices).""" - v = str(settings.get("output_fps") or "30") - return v if v in _FPS_CHOICES else "30" - - -def fps_value(fps: str) -> float: - if "/" in fps: - a, b = fps.split("/") - return float(a) / float(b) - return float(fps) - - -def _parse_resolution(settings: dict) -> tuple: - try: - w, h = (int(x) for x in (settings.get("output_resolution") or "1920x1080").split("x")) - return w, h - except Exception: - return 1920, 1080 diff --git a/plugins/multiview/parameters.py b/plugins/multiview/parameters.py new file mode 100644 index 0000000..3a3188f --- /dev/null +++ b/plugins/multiview/parameters.py @@ -0,0 +1,134 @@ +"""FFmpeg encoder parameter construction and validation for the compositor worker.""" + +import glob +import subprocess +import sys +from fractions import Fraction + +try: + from .config import ENCODER_PRESETS +except ImportError: + from config import ENCODER_PRESETS # script context (compositor_worker.py) + +# Must match channel.py AUDIO_RATE. +AUDIO_RATE = 48000 + + +def fps_fraction(fps: str) -> Fraction: + if "/" in fps: + a, b = fps.split("/") + return Fraction(int(a), int(b)) + return Fraction(int(fps), 1) + + +def _find_dri_device() -> str: + devices = sorted(glob.glob("/dev/dri/render*")) + return devices[0] if devices else "/dev/dri/renderD128" + + +def _encoder_available(name: str) -> bool: + try: + r = subprocess.run( + ["ffmpeg", "-hide_banner", "-encoders"], + capture_output=True, text=True, timeout=5, + ) + return name in r.stdout + except Exception: + return False + + +def resolve_preset(encoder: str, saved) -> str: + """Return a validated preset string for the given encoder.""" + valid, default = ENCODER_PRESETS.get(encoder, (frozenset(), "ultrafast")) + return saved if (valid and saved in valid) else default + + +def validate_encoder(encoder: str) -> None: + """sys.exit if the selected hardware encoder is unavailable.""" + _checks = { + "h264_nvenc": ("NVENC", "check NVIDIA driver and ffmpeg build"), + "h264_qsv": ("QSV", "check Intel driver and ffmpeg build"), + "h264_vaapi": ("VAAPI", "check GPU driver and ffmpeg build"), + } + if encoder in _checks and not _encoder_available(encoder): + name, hint = _checks[encoder] + sys.exit(f"{encoder} selected but ffmpeg reports no {name} encoder -- {hint}") + + +def build_encoder_cmd(cfg, out_w, out_h, audio_read) -> list: + bitrate = int(cfg.get("bitrate", 8000)) + gop = max(2, round(float(fps_fraction(cfg["fps"])) * 2)) + encoder = cfg.get("video_encoder", "libx264") + preset = resolve_preset(encoder, cfg.get("preset")) + + cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error"] + + # Hardware device init must precede inputs. + if encoder == "h264_vaapi": + cmd += ["-vaapi_device", _find_dri_device()] + elif encoder == "h264_qsv": + cmd += ["-init_hw_device", f"qsv=hw:{_find_dri_device()}", "-filter_hw_device", "hw"] + + # Cap muxer/filter threads so it doesn't grab every core and starve + # the PyAV decoders (3x 1080p60 decode already loads the box). + cmd += ["-threads", str(cfg.get("enc_threads", 4)), + "-f", "rawvideo", "-pix_fmt", "yuv420p", "-s", f"{out_w}x{out_h}", + "-r", cfg["fps"], "-thread_queue_size", "512", "-i", "pipe:0"] + for r in audio_read: + cmd += ["-f", "s16le", "-ar", str(AUDIO_RATE), "-ac", "2", + "-thread_queue_size", "512", "-i", f"pipe:{r}"] + cmd += ["-map", "0:v:0"] + for i in range(len(audio_read)): + cmd += ["-map", f"{i + 1}:a:0"] + + # VBV CBR: constant bitrate regardless of content complexity. CRF (VBR) + # produces near-zero bitrate for static/logo content; IPTV players drain + # their receive buffer faster than realtime when the rate is very low, + # causing fast-forward. CBR pads with filler NAL units to hold constant + # rate. bufsize = 0.5x target keeps encode latency low. + # -muxrate is NOT used: CBR already guarantees constant output rate; + # -muxrate adds MPEG-TS null packets that shift the PCR clock away from + # video PTS, causing player sync issues. + if encoder == "h264_nvenc": + # NVENC CBR via -rc cbr (pads with filler NAL units, same guarantee as + # x264 CBR). -minrate, -keyint_min, -sc_threshold are x264-only. + cmd += ["-c:v", "h264_nvenc", "-preset", preset, + "-rc", "cbr", + "-pix_fmt", "yuv420p", + "-b:v", f"{bitrate}k", + "-maxrate", f"{bitrate}k", + "-bufsize", f"{bitrate // 2}k", + "-g", str(gop)] + elif encoder == "h264_vaapi": + # VAAPI CBR: yuv420p input must be converted to nv12 before hwupload. + # -rc_mode CBR enforces constant rate; driver pads output to hold bitrate. + cmd += ["-vf", "format=nv12,hwupload", + "-c:v", "h264_vaapi", + "-rc_mode", "CBR", + "-b:v", f"{bitrate}k", + "-maxrate", f"{bitrate}k", + "-bufsize", f"{bitrate // 2}k", + "-g", str(gop)] + elif encoder == "h264_qsv": + # QSV CBR: hwupload sends software frames to QSV device initialized above. + cmd += ["-vf", "format=nv12,hwupload=extra_hw_frames=64", + "-c:v", "h264_qsv", + "-preset", preset, + "-b:v", f"{bitrate}k", + "-maxrate", f"{bitrate}k", + "-bufsize", f"{bitrate // 2}k", + "-g", str(gop)] + else: + cmd += ["-c:v", "libx264", "-preset", preset, + "-pix_fmt", "yuv420p", + "-b:v", f"{bitrate}k", + "-minrate", f"{bitrate}k", + "-maxrate", f"{bitrate}k", + "-bufsize", f"{bitrate // 2}k", + "-g", str(gop), "-keyint_min", str(gop), "-sc_threshold", "0"] + if audio_read: + cmd += ["-c:a", "ac3", "-b:a", "192k"] + cmd += ["-max_muxing_queue_size", "1024", + "-mpegts_flags", "+pat_pmt_at_frames+resend_headers+initial_discontinuity", + "-flush_packets", "1", "-f", "mpegts", "pipe:1"] + return cmd diff --git a/plugins/multiview/plugin.json b/plugins/multiview/plugin.json index 1983845..068eda5 100644 --- a/plugins/multiview/plugin.json +++ b/plugins/multiview/plugin.json @@ -4,7 +4,7 @@ "description": "Tile multiple Dispatcharr channel streams into multi-view outputs using FFmpeg", "author": "sethwv", - "version": "0.2.2", + "version": "0.2.3", "min_dispatcharr_version": "v0.27.0", "discord_thread": "https://discord.com/channels/1340492560220684331/1509200002407465001", diff --git a/plugins/multiview/server.py b/plugins/multiview/server.py index 8e8e977..b24e343 100644 --- a/plugins/multiview/server.py +++ b/plugins/multiview/server.py @@ -26,169 +26,29 @@ from . import dispatcharr as _dispatcharr from . import layouts as _layouts -from .ffmpeg import _parse_resolution, fps_string +from .parameters import resolve_preset logger = logging.getLogger(__name__) -CHUNK_SIZE = 65536 -_WORKER = os.path.join(os.path.dirname(os.path.abspath(__file__)), "compositor_worker.py") +_FPS_CHOICES = {"24", "25", "30", "50", "60", "30000/1001", "60000/1001"} -_server_instance = None -_mv_keepalives: dict = {} - - - -def _mv_keepalive_ensure(channel_id: str, proxy_server) -> "str | None": - """Register keepalive client (if absent), increment ref-count, and spawn drainer.""" - import time as _t - import uuid as _u - if channel_id in _mv_keepalives: - cid, last, refs, drainer = _mv_keepalives[channel_id] - _mv_keepalives[channel_id] = (cid, last, refs + 1, drainer) - return cid - mgr = proxy_server.client_managers.get(channel_id) - if mgr is None: - return None - cid = str(_u.uuid4()) - try: - if mgr.add_client(cid, "127.0.0.1", "multiview-keepalive", None, "mpegts", None): - drainer = None - try: - import gevent as _gv_ka - drainer = _gv_ka.spawn(_mv_keepalive_drain, channel_id, cid, proxy_server) - except Exception: - pass - _mv_keepalives[channel_id] = (cid, _t.time(), 1, drainer) - logger.info(f"Keepalive registered for {channel_id}") - return cid - except Exception as e: - logger.warning(f"Keepalive register failed for {channel_id}: {e}") - return None +def fps_string(settings: dict) -> str: + v = str(settings.get("output_fps") or "30") + return v if v in _FPS_CHOICES else "30" -def _mv_keepalive_drain(channel_id: str, cid: str, proxy_server) -> None: - """Background greenlet: drain data for the keepalive client to prevent - Dispatcharr ghost-removal (last_active must stay fresh). - Only runs a StreamGenerator when ref_count == 0 (no active streaming - consumer). While the streaming client is connected, it sleeps cheaply — - the streaming client's own generator keeps last_active current.""" +def _parse_resolution(settings: dict) -> tuple: try: - import gevent as _gv - from apps.proxy.live_proxy.output.ts.generator import StreamGenerator as _SG - except ImportError: - return - - while True: - entry = _mv_keepalives.get(channel_id) - if entry is None or entry[0] != cid: - return - - _, _, refs, _ = entry - if refs > 0: - # Streaming client is active; sleep, then refresh keepalive's - # last_active so Dispatcharr's ghost detector doesn't remove it. - _gv.sleep(1.0) - try: - import time as _t_ka - _mgr_ka = proxy_server.client_managers.get(channel_id) - if _mgr_ka and _mgr_ka.redis_client: - _mgr_ka.redis_client.hset( - f"live:channel:{channel_id}:clients:{cid}", - "last_active", str(_t_ka.time()), - ) - except Exception: - pass - continue - - # refs == 0: no streaming consumer — drain to keep last_active fresh. - buf = proxy_server.get_buffer(channel_id) - if buf is None: - _gv.sleep(0.5) - continue - - mgr = proxy_server.client_managers.get(channel_id) - if mgr is None: - _gv.sleep(0.5) - continue - - # Re-register if StreamGenerator._cleanup() removed the slot last iteration. - mgr.add_client(cid, "127.0.0.1", "multiview-keepalive", None, "mpegts", None) - - try: - gen = _SG( - channel_id=channel_id, - client_id=cid, - client_ip="127.0.0.1", - client_user_agent="multiview-keepalive", - channel_initializing=False, - buffer=buf, - ) - for _chunk in gen.generate(): - _gv.sleep(0) - entry = _mv_keepalives.get(channel_id) - if entry is None or entry[0] != cid: - return - if entry[2] > 0: - # Streaming client reconnected; stop draining. - break - except Exception: - _gv.sleep(0.1) - - -def _mv_keepalive_release(channel_id: str) -> None: - """Decrement ref-count; TTL starts counting when it reaches zero.""" - import time as _t - if channel_id in _mv_keepalives: - cid, _, refs, drainer = _mv_keepalives[channel_id] - _mv_keepalives[channel_id] = (cid, _t.time(), max(0, refs - 1), drainer) - - -def _mv_keepalive_cleanup(proxy_server, ttl: float = 30.0) -> None: - """Remove keepalive clients idle (ref_count==0) longer than ttl seconds. - Kills drainer greenlet and removes Dispatcharr client slot.""" - import time as _t - now = _t.time() - stale = [ - u for u, (_, last, refs, _drainer) in list(_mv_keepalives.items()) - if refs == 0 and now - last > ttl - ] - for uid in stale: - entry = _mv_keepalives.pop(uid, None) - if entry: - cid, _, _, drainer = entry - try: - if drainer is not None: - drainer.kill(block=False) - except Exception: - pass - try: - mgr = proxy_server.client_managers.get(uid) - if mgr: - mgr.remove_client(cid) - logger.info(f"Keepalive expired and removed for {uid}") - except Exception: - pass + w, h = (int(x) for x in (settings.get("output_resolution") or "1920x1080").split("x")) + return w, h + except Exception: + return 1920, 1080 +CHUNK_SIZE = 65536 +_WORKER = os.path.join(os.path.dirname(os.path.abspath(__file__)), "compositor_worker.py") -def _mv_keepalive_shutdown(proxy_server) -> None: - """Kill all drainer greenlets and remove all keepalive clients immediately. - Called when the multiview composition stops to avoid channels lingering.""" - entries = list(_mv_keepalives.items()) - _mv_keepalives.clear() - for uid, (cid, _, _, drainer) in entries: - try: - if drainer is not None: - drainer.kill(block=False) - except Exception: - pass - try: - mgr = proxy_server.client_managers.get(uid) - if mgr: - mgr.remove_client(cid) - logger.info(f"Keepalive shutdown: removed {uid}") - except Exception: - pass +_server_instance = None def _python_exe() -> str: @@ -233,11 +93,8 @@ def _lang_code(name: str) -> str: words = clean.split() if len(words) <= 1: return ((words[0] if words else "unk") + " ")[:3].lower() - sig = [w for w in words if len(w) >= 2 and w.isupper()] - if sig: - return ("".join(sig) + " ")[:3].lower() - initials = "".join(w[0] for w in words if w) - return (initials + " ")[:3].lower() + parts = [w if (w.isupper() and len(w) >= 2) else w[0] for w in words] + return ("".join(parts) + " ")[:3].lower() def _deduplicate_lang_codes(names: list) -> list: @@ -256,23 +113,6 @@ def _deduplicate_lang_codes(names: list) -> list: return result -def _audio_metadata_args(audio_source: str, channel_names: list, n: int) -> list: - args = [] - if audio_source == "all": - lang_codes = _deduplicate_lang_codes(channel_names or []) - for i, (name, code) in enumerate(zip(channel_names or [], lang_codes)): - args += [f"-metadata:s:a:{i}", f"title={name}", - f"-metadata:s:a:{i}", f"language={code}"] - else: - audio_idx = int(audio_source) if str(audio_source).isdigit() else 0 - audio_idx = max(0, min(audio_idx, n - 1)) - if channel_names and audio_idx < len(channel_names): - name = channel_names[audio_idx] - args += ["-metadata:s:a:0", f"title={name}", - "-metadata:s:a:0", f"language={_lang_code(name)}"] - return args - - def _channel_logo(ch) -> "str | None": """URL or path for the channel's logo, passable to av.open().""" try: @@ -388,14 +228,7 @@ def _worker_config(self, tiles, layout, audio_source, settings) -> dict: }) encoder = settings.get("video_encoder") or "libx264" - _nvenc_presets = {"p1", "p2", "p3", "p4", "p5", "p6", "p7"} - _x264_presets = {"ultrafast", "superfast", "veryfast", "faster", "fast", "medium", "slow"} - if encoder == "h264_nvenc": - saved = settings.get("encoder_preset") - preset = saved if saved in _nvenc_presets else "p4" - else: - saved = settings.get("encoder_preset") - preset = saved if saved in _x264_presets else "ultrafast" + preset = resolve_preset(encoder, settings.get("encoder_preset")) return { "out_w": out_w, "out_h": out_h, "fps": fps_string(settings), "bitrate": int(settings.get("output_bitrate") or 8000), @@ -441,16 +274,28 @@ def _serve_realsrc(self, raw_id: str, start_response): start_response("404 Not Found", [("Content-Type", "text/plain")]) return [b"Unknown channel\n"] - gen = _dispatcharr.live_stream(channel_uuid) - try: - first = next(gen) - except StopIteration: - # live_stream returned without yielding -- proxy not ready yet. - # Return 503 so the compositor worker sees a clean HTTP error and - # retries via its backoff, rather than "Invalid data found when - # processing input" from a 200 with an empty body. - start_response("503 Service Unavailable", [("Content-Type", "text/plain")]) - return [b"Channel not ready\n"] + # Retry briefly while the live_proxy warms up the channel after container + # start. live_stream() cleans up its client registration in its finally + # block before raising StopIteration, so retrying is safe. Keep retries + # low (2) to avoid spamming logs when a channel is genuinely unavailable. + first = None + gen = None + for attempt in range(3): + gen = _dispatcharr.live_stream(channel_uuid) + try: + first = next(gen) + break + except StopIteration: + if attempt < 2: + try: + import gevent as _gv + _gv.sleep(1.0) + except ImportError: + import time as _t + _t.sleep(1.0) + else: + start_response("503 Service Unavailable", [("Content-Type", "text/plain")]) + return [b"Channel not ready\n"] start_response("200 OK", [ ("Content-Type", "video/mp2t"), @@ -533,32 +378,37 @@ def start(self) -> bool: sock.bind((self.host, self.port)) sock.close() except OSError as e: - logger.error(f"Cannot bind to {self.host}:{self.port}: {e}") + logger.info(f"Multiview: port {self.port} already taken, skipping ({e})") return False try: from gevent import pywsgi import gevent as _gevent - - def _run(): - try: - self._server = pywsgi.WSGIServer( - (self.host, self.port), self.wsgi_app, log=None, - ) - self.running = True - set_server(self) - self._server.serve_forever() - except Exception as e: # noqa: BLE001 - logger.error(f"Multiview server crashed: {e}", exc_info=True) - finally: - self.running = False - - self._greenlet = _gevent.spawn(_run) - return True except ImportError: logger.error("gevent is not installed; cannot start multiview server") return False + def _run(): + try: + self._server = pywsgi.WSGIServer( + (self.host, self.port), self.wsgi_app, log=None, + ) + self.running = True + set_server(self) + self._server.serve_forever() + except OSError as e: + # EADDRINUSE here means a concurrent worker won the race between + # our test-bind above and this re-bind -- expected on multi-worker + # startup, not an error. + logger.info(f"Multiview: port {self.port} taken by concurrent worker ({e})") + except Exception as e: # noqa: BLE001 + logger.error(f"Multiview server crashed: {e}", exc_info=True) + finally: + self.running = False + + self._greenlet = _gevent.spawn(_run) + return True + def stop(self): if self._server: try: