diff --git a/src/exportmi.py b/src/exportmi.py index 90ef45ba7..72ee2eba8 100644 --- a/src/exportmi.py +++ b/src/exportmi.py @@ -1,5 +1,6 @@ # Upload Assistant © 2025 Audionut & wastaken7 — Licensed under UAPL v1.0 import asyncio +import glob import json import os import platform @@ -489,7 +490,17 @@ def filter_mediainfo(data: dict[str, Any]) -> dict[str, Any]: # Use standard MediaInfo library for non-DVD or when specialized CLI not available media_info_json = MediaInfo.parse(video, output="JSON") media_info_dict = json.loads(media_info_json) - + + # .VOB must be parsed for scan type on DVD's as .IFO does not contain that field + # from the older DVD-compatible version of MediaInfo + if is_dvd: + scan_from_vob = await _probe_vob_scantype(video, mediainfo_cmd, debug=debug) + if scan_from_vob: + for track in media_info_dict.get("media", {}).get("track", []): + if track.get("@type") == "Video" and not track.get("ScanType"): + track["ScanType"] = scan_from_vob + break + filtered_info = filter_mediainfo(media_info_dict) async with aiofiles.open(f"{base_dir}/tmp/{folder_id}/MediaInfo.json", "w", encoding="utf-8") as export: @@ -511,6 +522,64 @@ def filter_mediainfo(data: dict[str, Any]) -> dict[str, Any]: return mi +async def _probe_vob_scantype(video: str, mediainfo_cmd: str | None, debug: bool = False) -> str: + """Return ScanType from the largest .VOB in the same dir as `video`, or "." (current directory)""" + try: + folder = os.path.dirname(video) or "." + # try heuristic sibling first, then glob fallback + base = os.path.basename(video) + candidates = [] + for candidate in (base.replace('_0.IFO', '_1.VOB'), base.replace('.IFO', '.VOB')): + p = os.path.join(folder, candidate) + if os.path.exists(p): + candidates.append(p) + if not candidates: + candidates = glob.glob(os.path.join(folder, "*.VOB")) + if not candidates: + return "" + + # choose largest VOB (most likely main) + chosen = max(candidates, key=lambda p: os.path.getsize(p) if os.path.exists(p) else 0) + + # try CLI first (faster/more consistent), else pymediainfo + json_text = None + if mediainfo_cmd: + try: + proc = await asyncio.to_thread(subprocess.run, [mediainfo_cmd, "--Output=JSON", chosen], + capture_output=True, text=True, timeout=30) + if proc.returncode == 0 and proc.stdout: + json_text = proc.stdout + except Exception: + json_text = None + + if not json_text: + try: + json_text = MediaInfo.parse(chosen, output="JSON") + except Exception: + json_text = None + + if not json_text: + return "" + + mi = json.loads(json_text) if isinstance(json_text, str) else json_text + for track in mi.get("media", {}).get("track", []): + if track.get("@type") == "Video": + val = track.get("ScanType") or track.get("scanType") or "" + # handle nested dicts + if isinstance(val, dict): + for key in ("#text", "text", "value", "@value", "Name", "String"): + if key in val and val[key]: + val = val[key] + break + else: + val = "" + return str(val) if val else "" + except Exception: + if debug: + console.print("[yellow]Probe VOB scantype failed[/yellow]") + return "" + + def validate_mediainfo(meta: dict[str, Any], debug: bool, settings: bool = False) -> bool: if not any(str(f).lower().endswith(".mkv") for f in meta.get("filelist", [])): if debug: