diff --git a/deps.txt b/deps.txt new file mode 100644 index 0000000..59ac1da --- /dev/null +++ b/deps.txt @@ -0,0 +1,83 @@ +# Lufus External Dependencies (Non-Python) + +## System Privileges +- pkexec (polkit) +- sudo +- runuser (util-linux) + +## Flashing & Imaging +- dd (coreutils) +- sync (coreutils) +- cp (coreutils) +- rm (coreutils) +- mkdir (coreutils) + +## Partitioning & Disk Info +- lsblk (util-linux) +- parted +- sfdisk (util-linux) +- partprobe (parted) +- udevadm (systemd/udev) +- wipefs (util-linux) +- blockdev (util-linux) +- badblocks (e2fsprogs) + +## Filesystem Tools +- mkfs.vfat, fatlabel (dosfstools) +- mkfs.exfat (exfatprogs or exfat-utils) +- mkfs.ntfs / mkntfs, ntfslabel (ntfs-3g) +- mkfs.ext4, e2label (e2fsprogs) +- mkudffs, udflabel (udftools) + +## Mounting & ISO Handling +- mount (util-linux) +- umount (util-linux) +- 7z (p7zip-full) - Used for Windows ISO detection +- isoinfo (genisoimage) - Fallback for Windows ISO detection + +## Process & System Management +- lsof +- fuser (psmisc) +- pgrep (procps) +- which (debianutils or similar) +- stty (coreutils) +- xdg-user-dir (xdg-user-dirs) +- xdg-open (xdg-utils) + +## Windows Image Operations (WIM) +- wimlib-imagex (wimlib / wimtools) +- wimmountrw (wimlib) +- wimunmount (wimlib) + +## Bootloader Installation +- grub-install (grub2 / grub-common) + +## GUI & System Libraries (PyQt6 Requirements) +- libgl1 +- libx11-6 +- libxcb1 +- libxrender1 +- fontconfig +- libfreetype6 +- libxext6 +- libxrandr2 +- libxcursor1 +- libxi6 +- libxfixes3 +- libxcomposite1 +- libxdamage1 + +## Package Managers (Used for auto-installing missing tools) +- apt-get (Debian/Ubuntu) +- dnf (Fedora/RHEL) +- pacman (Arch) +- zypper (openSUSE) + +## Python Environment +- python3 +- python3-pip +- python3-venv + +## Other (from requirements-system.txt) +- wget +- file diff --git a/src/lufus/drives/find_usb.py b/src/lufus/drives/find_usb.py index f748656..999d3d6 100644 --- a/src/lufus/drives/find_usb.py +++ b/src/lufus/drives/find_usb.py @@ -1,6 +1,6 @@ import psutil import os -import subprocess +import pyudev import getpass from lufus import state from lufus.lufus_logging import get_logger @@ -43,6 +43,7 @@ def find_usb() -> dict[str, str]: dir_set = set(all_directories) # Check each partition to see if it matches our potential mount points + context = pyudev.Context() for part in psutil.disk_partitions(all=True): if part.mountpoint not in dir_set: continue @@ -53,12 +54,11 @@ def find_usb() -> dict[str, str]: label = None try: - label = subprocess.check_output( - ["lsblk", "-d", "-n", "-o", "LABEL", device_node], - text=True, - timeout=5, - ).strip() - except (subprocess.CalledProcessError, subprocess.TimeoutExpired): + # Using os.stat to get device number as per requirements + st = os.stat(device_node) + device = pyudev.Devices.from_device_number(context, "block", st.st_rdev) + label = device.get("ID_FS_LABEL") + except Exception: pass if not label: diff --git a/src/lufus/drives/get_usb_info.py b/src/lufus/drives/get_usb_info.py index f9c7d03..10e9ef2 100644 --- a/src/lufus/drives/get_usb_info.py +++ b/src/lufus/drives/get_usb_info.py @@ -1,6 +1,6 @@ import psutil import os -import subprocess +import pyudev from typing import TypedDict from lufus.lufus_logging import get_logger @@ -25,20 +25,20 @@ def get_usb_info(usb_path: str) -> USBDeviceInfo | None: log.warning("Could not find device node for USB path: %s", usb_path) return None - size_output = subprocess.check_output( - ["lsblk", "-d", "-n", "-b", "-o", "SIZE", device_node], - text=True, - timeout=5, - ).strip() - - usb_size = int(size_output) if size_output.isdigit() else 0 - if not size_output.isdigit(): - log.warning("Could not parse device size: %r", size_output) + context = pyudev.Context() + # Using os.stat to get device number as per requirements + st = os.stat(device_node) + device = pyudev.Devices.from_device_number(context, "block", st.st_rdev) + + # Size in bytes: udev attributes 'size' is in 512-byte sectors + size_attr = device.attributes.get("size") + usb_size = int(size_attr) * 512 if size_attr else 0 + + label = device.get("ID_FS_LABEL") if usb_size > 32 * 1024**3: log.warning("USB device is large (%d bytes); confirm before flashing.", usb_size) - label = subprocess.check_output(["lsblk", "-d", "-n", "-o", "LABEL", device_node], text=True, timeout=5).strip() if not label: label = os.path.basename(usb_path) @@ -49,15 +49,9 @@ def get_usb_info(usb_path: str) -> USBDeviceInfo | None: } log.info("USB Info: %s", usb_info) return usb_info - except subprocess.TimeoutExpired as e: - log.error("Timed out getting USB info for %s: %s", usb_path, e) - return None except PermissionError: log.error("Permission denied when trying to get USB info: %s", usb_path) return None - except subprocess.CalledProcessError as e: - log.error("Error getting USB info: %s", e) - return None except Exception as err: log.error("Unexpected error getting USB info: %s", err) return None diff --git a/src/lufus/writing/flash_usb.py b/src/lufus/writing/flash_usb.py index 50e038d..2cd0437 100644 --- a/src/lufus/writing/flash_usb.py +++ b/src/lufus/writing/flash_usb.py @@ -51,6 +51,8 @@ def _status(msg: str) -> None: try: iso_size = os.path.getsize(iso_path) _status(f"File size: {iso_size:,} bytes ({iso_size / (1024**3):.2f} GiB)") + if progress_cb: + progress_cb(2) if iso_path.lower().endswith(".iso"): _status(f"Validating ISO9660 signature for: {iso_path}") @@ -62,8 +64,15 @@ def _status(msg: str) -> None: else: _status(f"Not an ISO file ({os.path.basename(iso_path)}), skipping ISO signature check") + if progress_cb: + progress_cb(5) + _status("Checking if image contains installation markers...") - if is_windows_iso(iso_path): + iso_type = detect_iso_type(iso_path) + if progress_cb: + progress_cb(8) + + if iso_type == IsoType.WINDOWS: _status("Windows Installation media detected, routing to flash_windows (ISO mode)") return flash_windows( device, @@ -73,12 +82,14 @@ def _status(msg: str) -> None: status_cb=status_cb, ) - iso_type = detect_iso_type(iso_path) if iso_type == IsoType.LINUX: _status("Linux Installation media detected, will use dd for flashing") else: _status("Generic or unknown image, will use dd for flashing") + if progress_cb: + progress_cb(10) + dd_args = [ "dd", f"if={iso_path}", @@ -93,7 +104,11 @@ def _status(msg: str) -> None: _status(f"Writing {iso_size:,} bytes to {shlex.quote(device)}, this may take several minutes...") try: - process = subprocess.Popen(dd_args, stderr=subprocess.PIPE, stdout=subprocess.DEVNULL) + # Use LC_ALL=C to ensure "bytes" is the keyword for progress parsing + # and set a consistent output format across different locales. + env = os.environ.copy() + env["LC_ALL"] = "C" + process = subprocess.Popen(dd_args, stderr=subprocess.PIPE, stdout=subprocess.DEVNULL, env=env) except FileNotFoundError: log.error("Flash failed: 'dd' utility not found. Install coreutils.") _status("Flash failed: 'dd' utility not found. Install coreutils.") @@ -104,12 +119,22 @@ def _status(msg: str) -> None: buf = b"" last_pct = -1 while True: - chunk = process.stderr.readline() + # Read in small chunks to handle \r progress updates from dd without blocking + # until a newline (\n) is received. status=progress usually emits \r. + try: + chunk = process.stderr.read(128) + except Exception as e: + log.warning("Error reading dd stderr: %s", e) + break + if not chunk: break buf += chunk + # Split by \r or \n to catch all progress updates parts = re.split(rb"[\r\n]", buf) + # The last part might be incomplete, keep it in the buffer buf = parts[-1] + for line in parts[:-1]: line = line.strip() if not line: @@ -117,14 +142,20 @@ def _status(msg: str) -> None: m = re.match(rb"^(\d+)\s+bytes", line) if m and iso_size > 0: bytes_done = int(m.group(1)) - pct = min(int(bytes_done * 100 / iso_size), 99) + # Scale progress to 10-95% range to leave room for early steps and final sync + pct_raw = min(int(bytes_done * 100 / iso_size), 100) + pct = 10 + int(pct_raw * 0.85) + if pct != last_pct: - _status(f"dd progress: {bytes_done:,} / {iso_size:,} bytes ({pct}%)") + _status(f"dd progress: {bytes_done:,} / {iso_size:,} bytes ({pct_raw}%)") last_pct = pct if progress_cb: progress_cb(pct) else: - log.warning("dd stderr: %s", line.decode("utf-8", errors="replace")) + # Filter out common dd output lines to avoid logging noise + line_str = line.decode("utf-8", errors="replace") + if not any(x in line_str for x in ["records in", "records out", "copied"]): + log.warning("dd stderr: %s", line_str) process.wait() _status(f"dd process exited with return code {process.returncode}") diff --git a/src/lufus/writing/install_ventoy.py b/src/lufus/writing/install_ventoy.py index 1171d8a..16cb6de 100644 --- a/src/lufus/writing/install_ventoy.py +++ b/src/lufus/writing/install_ventoy.py @@ -162,7 +162,7 @@ def install_grub(target_device: str) -> bool: # Refresh kernel table subprocess.run(["partprobe", target_device], check=False) subprocess.run(["udevadm", "settle"], check=False) - subprocess.run(["sync"], check=True) + os.sync() # Wait for device nodes to be created by udev efi_part = f"{target_device}{sep}2" diff --git a/src/lufus/writing/windows/flash.py b/src/lufus/writing/windows/flash.py index a68895f..690915f 100644 --- a/src/lufus/writing/windows/flash.py +++ b/src/lufus/writing/windows/flash.py @@ -71,12 +71,12 @@ def _fix_efi_bootloader(efi_mount): log.info("EFI bootloader fix: BOOTX64.EFI not found, will attempt to create at %s", boot_dir) bootx64 = os.path.join(boot_dir, "BOOTX64.EFI") - run_cmd(["sudo", "mkdir", "-p", boot_dir]) + os.makedirs(boot_dir, exist_ok=True) log.info("EFI bootloader fix: created directory %s", boot_dir) src = _find_path_case_insensitive(efi_mount, "EFI", "Microsoft", "Boot", "bootmgfw.efi") if src: - run_cmd(["sudo", "cp", src, bootx64]) + shutil.copy2(src, bootx64) log.info("EFI bootloader fix: copied %s -> %s", src, bootx64) return @@ -155,7 +155,7 @@ def _copy_file(src: str, dst: str) -> str: def _find_ntfs_tool(status_cb=None) -> str | None: """Find mkfs.ntfs/mkntfs, installing ntfs-3g if needed. Returns command name or None.""" for candidate in ["mkfs.ntfs", "mkntfs"]: - if subprocess.run(["which", candidate], capture_output=True).returncode == 0: + if shutil.which(candidate): return candidate if status_cb: @@ -167,13 +167,13 @@ def _find_ntfs_tool(status_cb=None) -> str | None: ["zypper", "install", "-y", "ntfs-3g"], ] for pm_cmd in pkg_managers: - if subprocess.run(["which", pm_cmd[0]], capture_output=True).returncode == 0: + if shutil.which(pm_cmd[0]): run_cmd(["sudo"] + pm_cmd) break # stop after the first working package manager # Re-check after installation attempt for candidate in ["mkfs.ntfs", "mkntfs"]: - if subprocess.run(["which", candidate], capture_output=True).returncode == 0: + if shutil.which(candidate): return candidate # installation succeeded return None @@ -181,7 +181,7 @@ def _find_ntfs_tool(status_cb=None) -> str | None: def _ensure_wimlib(status_cb=None) -> None: """Install wimlib-imagex if not present. Raises FileNotFoundError if it can't be found after install.""" - if subprocess.run(["which", "wimlib-imagex"], capture_output=True).returncode == 0: + if shutil.which("wimlib-imagex"): return if status_cb: status_cb("wimlib-imagex not found, attempting to install...") @@ -192,10 +192,10 @@ def _ensure_wimlib(status_cb=None) -> None: ["zypper", "install", "-y", "wimtools"], ] for pm_cmd in pkg_managers: - if subprocess.run(["which", pm_cmd[0]], capture_output=True).returncode == 0: + if shutil.which(pm_cmd[0]): run_cmd(["sudo"] + pm_cmd) break - if subprocess.run(["which", "wimlib-imagex"], capture_output=True).returncode != 0: + if not shutil.which("wimlib-imagex"): raise FileNotFoundError( "wimlib-imagex not found. Install manually: sudo pacman -S wimlib / sudo apt install wimtools" ) @@ -271,20 +271,34 @@ def _copy_efi_boot_files(iso_mount, mount_efi, _status): if efi_src: efi_items = os.listdir(efi_src) _status(f"Found EFI/ with {len(efi_items)} items: {efi_items}") - run_cmd(["sudo", "cp", "-r"] + [os.path.join(efi_src, i) for i in efi_items] + [mount_efi]) + for item in efi_items: + src_path = os.path.join(efi_src, item) + dst_path = os.path.join(mount_efi, "EFI", item) + if os.path.isdir(src_path): + shutil.copytree(src_path, dst_path, dirs_exist_ok=True) + else: + os.makedirs(os.path.dirname(dst_path), exist_ok=True) + shutil.copy2(src_path, dst_path) _status("Copied EFI/ tree to EFI partition") else: _status("WARNING: No EFI directory found in ISO - drive may not be UEFI bootable") boot_src = _find_path_case_insensitive(iso_mount, "boot") if boot_src: - run_cmd(["sudo", "cp", "-r"] + [os.path.join(boot_src, i) for i in os.listdir(boot_src)] + [mount_efi]) + for item in os.listdir(boot_src): + src_path = os.path.join(boot_src, item) + dst_path = os.path.join(mount_efi, "boot", item) + if os.path.isdir(src_path): + shutil.copytree(src_path, dst_path, dirs_exist_ok=True) + else: + os.makedirs(os.path.dirname(dst_path), exist_ok=True) + shutil.copy2(src_path, dst_path) _status("Copied boot/ tree to EFI partition") for fname in ["bootmgr", "bootmgr.efi"]: src = _find_path_case_insensitive(iso_mount, fname) if src: - run_cmd(["sudo", "cp", src, f"{mount_efi}/{fname}"]) + shutil.copy2(src, os.path.join(mount_efi, fname)) _status(f"Copied {fname} to EFI partition root") _fix_efi_bootloader(mount_efi) @@ -412,7 +426,7 @@ def _status(msg): # Step 7: Sync _status("Syncing all writes to disk...") - run_cmd(["sudo", "sync"]) + os.sync() _emit(97) _status("Sync complete") diff --git a/src/lufus/writing/windows/tweaks.py b/src/lufus/writing/windows/tweaks.py index 095e78a..fe3daac 100644 --- a/src/lufus/writing/windows/tweaks.py +++ b/src/lufus/writing/windows/tweaks.py @@ -1,3 +1,4 @@ +# Not tested, at least by me, I don't remember when that was added, and never used it nor really know what it's supposed to do. """Windows installation customization functions. These modify Windows installation media (boot.wim, autounattend.xml) @@ -9,6 +10,7 @@ import re import subprocess import os +import shutil from lufus.utils import get_mount_and_drive from lufus import state from lufus.lufus_logging import get_logger @@ -54,7 +56,7 @@ def win_hardware_bypass(): cmd_string = "\n".join(commands) + "\n" log.info("win_hardware_bypass: injecting registry keys into boot.wim at %s...", mount) try: - subprocess.run(["mkdir", "/media/tempwinmnt"], check=True) + os.makedirs("/media/tempwinmnt", exist_ok=True) subprocess.run(["wimmountrw", f"{mount}/sources/boot.wim", "2", "/media/tempwinmnt"], check=True) subprocess.run( ["chntpw", "e", "/media/tempwinmnt/Windows/System32/config/SYSTEM"], @@ -64,7 +66,7 @@ def win_hardware_bypass(): check=True, ) subprocess.run(["wimunmount", "/media/tempwinmnt", "--commit"], check=True) - subprocess.run(["rm", "-rf", "/media/tempwinmnt"], check=True) + shutil.rmtree("/media/tempwinmnt", ignore_errors=True) log.info("win_hardware_bypass: registry keys injected successfully.") except subprocess.CalledProcessError as e: log.error("win_hardware_bypass: CalledProcessError: %s", e.stderr) @@ -79,7 +81,7 @@ def win_local_acc(): cmd_string = "\n".join(commands) + "\n" log.info("win_local_acc: bypassing online account requirement at %s...", mount) try: - subprocess.run(["mkdir", "/media/tempwinmnt"], check=True) + os.makedirs("/media/tempwinmnt", exist_ok=True) subprocess.run(["wimmountrw", f"{mount}/sources/boot.wim", "2", "/media/tempwinmnt"], check=True) subprocess.run( ["chntpw", "e", "/media/tempwinmnt/Windows/System32/config/SOFTWARE"], @@ -89,7 +91,7 @@ def win_local_acc(): check=True, ) subprocess.run(["wimunmount", "/media/tempwinmnt", "--commit"], check=True) - subprocess.run(["rm", "-rf", "/media/tempwinmnt"], check=True) + shutil.rmtree("/media/tempwinmnt", ignore_errors=True) log.info("win_local_acc: online account bypass applied successfully.") except subprocess.CalledProcessError as e: log.error("win_local_acc: CalledProcessError: %s", e.stderr) diff --git a/tests/test_find_usb.py b/tests/test_find_usb.py index fd0abe2..037fe5b 100644 --- a/tests/test_find_usb.py +++ b/tests/test_find_usb.py @@ -1,8 +1,9 @@ from __future__ import annotations -import subprocess import sys +import os from pathlib import Path from types import SimpleNamespace +from unittest.mock import MagicMock ROOT = Path(__file__).resolve().parents[1] SRC = ROOT / "src" @@ -15,6 +16,7 @@ def test_find_usb_returns_mount_to_label_mapping(monkeypatch) -> None: user = "testuser" mount_path = f"/media/{user}/MY_USB" + device_node = "/dev/sdb1" monkeypatch.setattr(find_usb_module.getpass, "getuser", lambda: user) monkeypatch.setattr( @@ -35,21 +37,34 @@ def test_find_usb_returns_mount_to_label_mapping(monkeypatch) -> None: monkeypatch.setattr( find_usb_module.psutil, "disk_partitions", - lambda *args, **kwargs: [SimpleNamespace(mountpoint=mount_path, device="/dev/sdb1")], - ) - monkeypatch.setattr( - find_usb_module.subprocess, - "check_output", - lambda *args, **kwargs: "lufus_USB\n", + lambda *args, **kwargs: [SimpleNamespace(mountpoint=mount_path, device=device_node)], ) + # Mock os.stat safely + os_stat_orig = os.stat + def mock_os_stat(p): + if str(p).startswith("/dev/"): + mock_stat = MagicMock() + mock_stat.st_rdev = 1234 + return mock_stat + return os_stat_orig(p) + monkeypatch.setattr(find_usb_module.os, "stat", mock_os_stat) + + # Mock pyudev + mock_context = MagicMock() + mock_device = MagicMock() + mock_device.get.return_value = "lufus_USB" + monkeypatch.setattr(find_usb_module.pyudev, "Context", lambda: mock_context) + monkeypatch.setattr(find_usb_module.pyudev.Devices, "from_device_number", lambda ctx, type, num: mock_device) + result = find_usb_module.find_usb() assert result == {mount_path: "lufus_USB"} -def test_find_usb_falls_back_to_dir_name_when_lsblk_fails(monkeypatch) -> None: +def test_find_usb_falls_back_to_dir_name_when_pyudev_fails(monkeypatch) -> None: user = "testuser" mount_path = f"/media/{user}/NO_LABEL" + device_node = "/dev/sdc1" monkeypatch.setattr(find_usb_module.getpass, "getuser", lambda: user) monkeypatch.setattr( @@ -70,13 +85,23 @@ def test_find_usb_falls_back_to_dir_name_when_lsblk_fails(monkeypatch) -> None: monkeypatch.setattr( find_usb_module.psutil, "disk_partitions", - lambda *args, **kwargs: [SimpleNamespace(mountpoint=mount_path, device="/dev/sdc1")], + lambda *args, **kwargs: [SimpleNamespace(mountpoint=mount_path, device=device_node)], ) - def raise_lsblk_error(*args, **kwargs): - raise subprocess.CalledProcessError(returncode=1, cmd="lsblk") - - monkeypatch.setattr(find_usb_module.subprocess, "check_output", raise_lsblk_error) + # Mock os.stat safely + os_stat_orig = os.stat + def mock_os_stat(p): + if str(p).startswith("/dev/"): + mock_stat = MagicMock() + mock_stat.st_rdev = 5678 + return mock_stat + return os_stat_orig(p) + monkeypatch.setattr(find_usb_module.os, "stat", mock_os_stat) + + # Mock pyudev to fail + mock_context = MagicMock() + monkeypatch.setattr(find_usb_module.pyudev, "Context", lambda: mock_context) + monkeypatch.setattr(find_usb_module.pyudev.Devices, "from_device_number", MagicMock(side_effect=Exception("udev fail"))) result = find_usb_module.find_usb() assert result == {mount_path: "NO_LABEL"} diff --git a/tests/test_flash_usb_and_find_usb_fixes.py b/tests/test_flash_usb_and_find_usb_fixes.py index 2181687..a288923 100644 --- a/tests/test_flash_usb_and_find_usb_fixes.py +++ b/tests/test_flash_usb_and_find_usb_fixes.py @@ -285,9 +285,10 @@ def test_empty_device_node_does_not_overwrite_states_dn(self, monkeypatch): class TestFindUsbHappyPath: - def test_find_usb_returns_label_from_lsblk(self, monkeypatch): + def test_find_usb_returns_label_from_udev(self, monkeypatch): user = "testuser" mount_path = f"/media/{user}/MY_USB" + device_node = "/dev/sdb1" monkeypatch.setattr(find_usb_module.getpass, "getuser", lambda: user) monkeypatch.setattr( @@ -308,14 +309,26 @@ def test_find_usb_returns_label_from_lsblk(self, monkeypatch): monkeypatch.setattr( find_usb_module.psutil, "disk_partitions", - lambda *args, **kwargs: [SimpleNamespace(mountpoint=mount_path, device="/dev/sdb1")], - ) - monkeypatch.setattr( - find_usb_module.subprocess, - "check_output", - lambda *a, **kw: "MY_LABEL\n", + lambda *args, **kwargs: [SimpleNamespace(mountpoint=mount_path, device=device_node)], ) + import os as real_os + from unittest.mock import MagicMock + os_stat_orig = find_usb_module.os.stat + def mock_os_stat(p): + if str(p).startswith("/dev/"): + m = MagicMock() + m.st_rdev = 1234 + return m + return os_stat_orig(p) + monkeypatch.setattr(find_usb_module.os, "stat", mock_os_stat) + + mock_context = MagicMock() + mock_device = MagicMock() + mock_device.get.return_value = "MY_LABEL" + monkeypatch.setattr(find_usb_module.pyudev, "Context", lambda: mock_context) + monkeypatch.setattr(find_usb_module.pyudev.Devices, "from_device_number", lambda ctx, type, num: mock_device) + result = find_usb_module.find_usb() assert result == {mount_path: "MY_LABEL"} diff --git a/tests/test_get_usb_info.py b/tests/test_get_usb_info.py index 3ba2361..a9582c4 100644 --- a/tests/test_get_usb_info.py +++ b/tests/test_get_usb_info.py @@ -1,8 +1,8 @@ from __future__ import annotations -import subprocess import sys from pathlib import Path from types import SimpleNamespace +from unittest.mock import MagicMock ROOT = Path(__file__).resolve().parents[1] SRC = ROOT / "src" @@ -32,14 +32,23 @@ def test_get_usb_info_returns_expected_dictionary(monkeypatch) -> None: lambda *args, **kwargs: [SimpleNamespace(mountpoint=mount_path, device=device_node)], ) - def fake_check_output(cmd, text=True, timeout=5): - if cmd[-2:] == ["SIZE", device_node]: - return str(16 * 1024**3) - if cmd[-2:] == ["LABEL", device_node]: - return "MYUSB\n" - raise AssertionError(f"Unexpected command: {cmd}") - - monkeypatch.setattr(get_usb_info_module.subprocess, "check_output", fake_check_output) + # Mock os.stat safely + os_stat_orig = get_usb_info_module.os.stat + def mock_os_stat(p): + if str(p).startswith("/dev/"): + mock_stat = MagicMock() + mock_stat.st_rdev = 1234 + return mock_stat + return os_stat_orig(p) + monkeypatch.setattr(get_usb_info_module.os, "stat", mock_os_stat) + + # Mock pyudev + mock_context = MagicMock() + mock_device = MagicMock() + mock_device.attributes = {"size": str((16 * 1024**3) // 512)} + mock_device.get.return_value = "MYUSB" + monkeypatch.setattr(get_usb_info_module.pyudev, "Context", lambda: mock_context) + monkeypatch.setattr(get_usb_info_module.pyudev.Devices, "from_device_number", lambda ctx, type, num: mock_device) result = get_usb_info_module.get_usb_info(mount_path) assert result == { @@ -59,20 +68,29 @@ def test_get_usb_info_uses_mount_basename_when_label_is_empty(monkeypatch) -> No lambda *args, **kwargs: [SimpleNamespace(mountpoint=mount_path, device=device_node)], ) - def fake_check_output(cmd, text=True, timeout=5): - if cmd[-2:] == ["SIZE", device_node]: - return str(8 * 1024**3) - if cmd[-2:] == ["LABEL", device_node]: - return "\n" - raise AssertionError(f"Unexpected command: {cmd}") - - monkeypatch.setattr(get_usb_info_module.subprocess, "check_output", fake_check_output) + # Mock os.stat safely + os_stat_orig = get_usb_info_module.os.stat + def mock_os_stat(p): + if str(p).startswith("/dev/"): + mock_stat = MagicMock() + mock_stat.st_rdev = 5678 + return mock_stat + return os_stat_orig(p) + monkeypatch.setattr(get_usb_info_module.os, "stat", mock_os_stat) + + # Mock pyudev + mock_context = MagicMock() + mock_device = MagicMock() + mock_device.attributes = {"size": str((8 * 1024**3) // 512)} + mock_device.get.return_value = None + monkeypatch.setattr(get_usb_info_module.pyudev, "Context", lambda: mock_context) + monkeypatch.setattr(get_usb_info_module.pyudev.Devices, "from_device_number", lambda ctx, type, num: mock_device) result = get_usb_info_module.get_usb_info(mount_path) assert result["label"] == "NO_LABEL" -def test_get_usb_info_returns_empty_when_lsblk_fails(monkeypatch) -> None: +def test_get_usb_info_returns_empty_when_pyudev_fails(monkeypatch) -> None: mount_path = "/media/testuser/USB" device_node = "/dev/sdb1" @@ -82,9 +100,13 @@ def test_get_usb_info_returns_empty_when_lsblk_fails(monkeypatch) -> None: lambda *args, **kwargs: [SimpleNamespace(mountpoint=mount_path, device=device_node)], ) - def raise_lsblk_error(*args, **kwargs): - raise subprocess.CalledProcessError(returncode=1, cmd="lsblk") - - monkeypatch.setattr(get_usb_info_module.subprocess, "check_output", raise_lsblk_error) + # Mock os.stat safely + os_stat_orig = get_usb_info_module.os.stat + def mock_os_stat(p): + if str(p).startswith("/dev/"): + raise Exception("stat fail") + return os_stat_orig(p) + monkeypatch.setattr(get_usb_info_module.os, "stat", mock_os_stat) + # This should return None because of the catch-all Exception in get_usb_info assert get_usb_info_module.get_usb_info(mount_path) is None diff --git a/tests/test_get_usb_info_and_detect_windows_fixes.py b/tests/test_get_usb_info_and_detect_windows_fixes.py index 2ab845f..fa8ae69 100644 --- a/tests/test_get_usb_info_and_detect_windows_fixes.py +++ b/tests/test_get_usb_info_and_detect_windows_fixes.py @@ -13,19 +13,33 @@ from lufus.drives.get_usb_info import get_usb_info import lufus.writing.windows.detect as dw_module from lufus.writing.windows.detect import _label_is_windows, _read_iso_label, is_windows_iso +from unittest.mock import MagicMock +import os as real_os def _fake_partitions(mount, device): return lambda all=False: [SimpleNamespace(mountpoint=mount, device=device)] -def _fake_check_output(size="1000000000", label="MY_USB"): - def impl(cmd, **kwargs): - if "SIZE" in cmd: - return size + "\n" - return label + "\n" +def _mock_os_stat_and_pyudev(monkeypatch, size="1000000000", label="MY_USB"): + # Mock os.stat safely + os_stat_orig = gui_module.os.stat + def mock_os_stat(p): + if str(p).startswith("/dev/"): + m = MagicMock() + m.st_rdev = 1234 + return m + return os_stat_orig(p) + monkeypatch.setattr(gui_module.os, "stat", mock_os_stat) - return impl + # Mock pyudev + mock_context = MagicMock() + mock_device = MagicMock() + # size in 512-byte sectors + mock_device.attributes = {"size": str(int(size) // 512)} + mock_device.get.return_value = label + monkeypatch.setattr(gui_module.pyudev, "Context", lambda: mock_context) + monkeypatch.setattr(gui_module.pyudev.Devices, "from_device_number", lambda ctx, type, num: mock_device) class Testget_usb_infoNormalisedMountPath: @@ -36,14 +50,14 @@ class Testget_usb_infoNormalisedMountPath: def test_trailing_slash_is_stripped(self, monkeypatch): monkeypatch.setattr(gui_module.psutil, "disk_partitions", _fake_partitions("/media/u/USB/", "/dev/sdb1")) - monkeypatch.setattr(gui_module.subprocess, "check_output", _fake_check_output()) + _mock_os_stat_and_pyudev(monkeypatch) result = get_usb_info("/media/u/USB/") assert result["mount_path"] == "/media/u/USB" def test_normalised_path_matches_normpath(self, monkeypatch, tmp_path): mount = str(tmp_path) monkeypatch.setattr(gui_module.psutil, "disk_partitions", _fake_partitions(mount, "/dev/sdc1")) - monkeypatch.setattr(gui_module.subprocess, "check_output", _fake_check_output()) + _mock_os_stat_and_pyudev(monkeypatch) result = get_usb_info(mount) import os @@ -67,28 +81,6 @@ def fake_dp(all=False): assert calls.get("all") is True -class Testget_usb_infoTimeoutExpired: - """TimeoutExpired was previously swallowed by the broad Exception handler - with a generic message. It must now be caught explicitly. - """ - - def test_returns_empty_dict_on_timeout(self, monkeypatch): - monkeypatch.setattr(gui_module.psutil, "disk_partitions", _fake_partitions("/media/u/USB", "/dev/sdb1")) - - def raise_timeout(*args, **kwargs): - raise subprocess.TimeoutExpired(cmd="lsblk", timeout=5) - - monkeypatch.setattr(gui_module.subprocess, "check_output", raise_timeout) - result = get_usb_info("/media/u/USB") - assert result is None - - def test_timeout_handler_is_explicit(self): - import inspect - - src = inspect.getsource(get_usb_info) - assert "TimeoutExpired" in src - - class Testget_usb_infoForElse: """When no partition matches the mount path, get_usb_info must return {}.""" @@ -100,7 +92,7 @@ def test_returns_empty_when_no_match(self, monkeypatch): class TestLabelIsWindowsDeadBranch: """'or label.startswith("WINDOWS")' was dead code — every "WINDOWS…" - string already starts with "WIN". The redundant check must be gone. + string already starts with "WIN". The redundant check must be gone. Idk why I don't like that, but eh, it works... """ def test_windows_prefix_still_detected(self):