Skip to content
83 changes: 83 additions & 0 deletions deps.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Lufus External Dependencies (Non-Python)

## System Privileges
- pkexec (polkit)
- sudo
- runuser (util-linux)

## Flashing & Imaging
- dd (coreutils)
- sync (coreutils)
- cp (coreutils)
- rm (coreutils)
- mkdir (coreutils)

## Partitioning & Disk Info
- lsblk (util-linux)
- parted
- sfdisk (util-linux)
- partprobe (parted)
- udevadm (systemd/udev)
- wipefs (util-linux)
- blockdev (util-linux)
- badblocks (e2fsprogs)

## Filesystem Tools
- mkfs.vfat, fatlabel (dosfstools)
- mkfs.exfat (exfatprogs or exfat-utils)
- mkfs.ntfs / mkntfs, ntfslabel (ntfs-3g)
- mkfs.ext4, e2label (e2fsprogs)
- mkudffs, udflabel (udftools)

## Mounting & ISO Handling
- mount (util-linux)
- umount (util-linux)
- 7z (p7zip-full) - Used for Windows ISO detection
- isoinfo (genisoimage) - Fallback for Windows ISO detection

## Process & System Management
- lsof
- fuser (psmisc)
- pgrep (procps)
- which (debianutils or similar)
- stty (coreutils)
- xdg-user-dir (xdg-user-dirs)
- xdg-open (xdg-utils)

## Windows Image Operations (WIM)
- wimlib-imagex (wimlib / wimtools)
- wimmountrw (wimlib)
- wimunmount (wimlib)

## Bootloader Installation
- grub-install (grub2 / grub-common)

## GUI & System Libraries (PyQt6 Requirements)
- libgl1
- libx11-6
- libxcb1
- libxrender1
- fontconfig
- libfreetype6
- libxext6
- libxrandr2
- libxcursor1
- libxi6
- libxfixes3
- libxcomposite1
- libxdamage1

## Package Managers (Used for auto-installing missing tools)
- apt-get (Debian/Ubuntu)
- dnf (Fedora/RHEL)
- pacman (Arch)
- zypper (openSUSE)

## Python Environment
- python3
- python3-pip
- python3-venv

## Other (from requirements-system.txt)
- wget
- file
14 changes: 7 additions & 7 deletions src/lufus/drives/find_usb.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import psutil
import os
import subprocess
import pyudev
import getpass
from lufus import state
from lufus.lufus_logging import get_logger
Expand Down Expand Up @@ -43,6 +43,7 @@ def find_usb() -> dict[str, str]:
dir_set = set(all_directories)

# Check each partition to see if it matches our potential mount points
context = pyudev.Context()
for part in psutil.disk_partitions(all=True):
if part.mountpoint not in dir_set:
continue
Expand All @@ -53,12 +54,11 @@ def find_usb() -> dict[str, str]:

label = None
try:
label = subprocess.check_output(
["lsblk", "-d", "-n", "-o", "LABEL", device_node],
text=True,
timeout=5,
).strip()
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
# Using os.stat to get device number as per requirements
st = os.stat(device_node)
device = pyudev.Devices.from_device_number(context, "block", st.st_rdev)
label = device.get("ID_FS_LABEL")
except Exception:
pass

if not label:
Expand Down
28 changes: 11 additions & 17 deletions src/lufus/drives/get_usb_info.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import psutil
import os
import subprocess
import pyudev
from typing import TypedDict
from lufus.lufus_logging import get_logger

Expand All @@ -25,20 +25,20 @@ def get_usb_info(usb_path: str) -> USBDeviceInfo | None:
log.warning("Could not find device node for USB path: %s", usb_path)
return None

size_output = subprocess.check_output(
["lsblk", "-d", "-n", "-b", "-o", "SIZE", device_node],
text=True,
timeout=5,
).strip()

usb_size = int(size_output) if size_output.isdigit() else 0
if not size_output.isdigit():
log.warning("Could not parse device size: %r", size_output)
context = pyudev.Context()
# Using os.stat to get device number as per requirements
st = os.stat(device_node)
device = pyudev.Devices.from_device_number(context, "block", st.st_rdev)

# Size in bytes: udev attributes 'size' is in 512-byte sectors
size_attr = device.attributes.get("size")
usb_size = int(size_attr) * 512 if size_attr else 0

label = device.get("ID_FS_LABEL")

if usb_size > 32 * 1024**3:
log.warning("USB device is large (%d bytes); confirm before flashing.", usb_size)

label = subprocess.check_output(["lsblk", "-d", "-n", "-o", "LABEL", device_node], text=True, timeout=5).strip()
if not label:
label = os.path.basename(usb_path)

Expand All @@ -49,15 +49,9 @@ def get_usb_info(usb_path: str) -> USBDeviceInfo | None:
}
log.info("USB Info: %s", usb_info)
return usb_info
except subprocess.TimeoutExpired as e:
log.error("Timed out getting USB info for %s: %s", usb_path, e)
return None
except PermissionError:
log.error("Permission denied when trying to get USB info: %s", usb_path)
return None
except subprocess.CalledProcessError as e:
log.error("Error getting USB info: %s", e)
return None
except Exception as err:
log.error("Unexpected error getting USB info: %s", err)
return None
45 changes: 38 additions & 7 deletions src/lufus/writing/flash_usb.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def _status(msg: str) -> None:
try:
iso_size = os.path.getsize(iso_path)
_status(f"File size: {iso_size:,} bytes ({iso_size / (1024**3):.2f} GiB)")
if progress_cb:
progress_cb(2)

if iso_path.lower().endswith(".iso"):
_status(f"Validating ISO9660 signature for: {iso_path}")
Expand All @@ -62,8 +64,15 @@ def _status(msg: str) -> None:
else:
_status(f"Not an ISO file ({os.path.basename(iso_path)}), skipping ISO signature check")

if progress_cb:
progress_cb(5)

_status("Checking if image contains installation markers...")
if is_windows_iso(iso_path):
iso_type = detect_iso_type(iso_path)
if progress_cb:
progress_cb(8)

if iso_type == IsoType.WINDOWS:
_status("Windows Installation media detected, routing to flash_windows (ISO mode)")
return flash_windows(
device,
Expand All @@ -73,12 +82,14 @@ def _status(msg: str) -> None:
status_cb=status_cb,
)

iso_type = detect_iso_type(iso_path)
if iso_type == IsoType.LINUX:
_status("Linux Installation media detected, will use dd for flashing")
else:
_status("Generic or unknown image, will use dd for flashing")

if progress_cb:
progress_cb(10)

dd_args = [
"dd",
f"if={iso_path}",
Expand All @@ -93,7 +104,11 @@ def _status(msg: str) -> None:
_status(f"Writing {iso_size:,} bytes to {shlex.quote(device)}, this may take several minutes...")

try:
process = subprocess.Popen(dd_args, stderr=subprocess.PIPE, stdout=subprocess.DEVNULL)
# Use LC_ALL=C to ensure "bytes" is the keyword for progress parsing
# and set a consistent output format across different locales.
env = os.environ.copy()
env["LC_ALL"] = "C"
process = subprocess.Popen(dd_args, stderr=subprocess.PIPE, stdout=subprocess.DEVNULL, env=env)
except FileNotFoundError:
log.error("Flash failed: 'dd' utility not found. Install coreutils.")
_status("Flash failed: 'dd' utility not found. Install coreutils.")
Expand All @@ -104,27 +119,43 @@ def _status(msg: str) -> None:
buf = b""
last_pct = -1
while True:
chunk = process.stderr.readline()
# Read in small chunks to handle \r progress updates from dd without blocking
# until a newline (\n) is received. status=progress usually emits \r.
try:
chunk = process.stderr.read(128)
except Exception as e:
log.warning("Error reading dd stderr: %s", e)
break

if not chunk:
break
buf += chunk
# Split by \r or \n to catch all progress updates
parts = re.split(rb"[\r\n]", buf)
# The last part might be incomplete, keep it in the buffer
buf = parts[-1]

for line in parts[:-1]:
line = line.strip()
if not line:
continue
m = re.match(rb"^(\d+)\s+bytes", line)
if m and iso_size > 0:
bytes_done = int(m.group(1))
pct = min(int(bytes_done * 100 / iso_size), 99)
# Scale progress to 10-95% range to leave room for early steps and final sync
pct_raw = min(int(bytes_done * 100 / iso_size), 100)
pct = 10 + int(pct_raw * 0.85)

if pct != last_pct:
_status(f"dd progress: {bytes_done:,} / {iso_size:,} bytes ({pct}%)")
_status(f"dd progress: {bytes_done:,} / {iso_size:,} bytes ({pct_raw}%)")
last_pct = pct
if progress_cb:
progress_cb(pct)
else:
log.warning("dd stderr: %s", line.decode("utf-8", errors="replace"))
# Filter out common dd output lines to avoid logging noise
line_str = line.decode("utf-8", errors="replace")
if not any(x in line_str for x in ["records in", "records out", "copied"]):
log.warning("dd stderr: %s", line_str)

process.wait()
_status(f"dd process exited with return code {process.returncode}")
Expand Down
2 changes: 1 addition & 1 deletion src/lufus/writing/install_ventoy.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def install_grub(target_device: str) -> bool:
# Refresh kernel table
subprocess.run(["partprobe", target_device], check=False)
subprocess.run(["udevadm", "settle"], check=False)
subprocess.run(["sync"], check=True)
os.sync()

# Wait for device nodes to be created by udev
efi_part = f"{target_device}{sep}2"
Expand Down
38 changes: 26 additions & 12 deletions src/lufus/writing/windows/flash.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ def _fix_efi_bootloader(efi_mount):

log.info("EFI bootloader fix: BOOTX64.EFI not found, will attempt to create at %s", boot_dir)
bootx64 = os.path.join(boot_dir, "BOOTX64.EFI")
run_cmd(["sudo", "mkdir", "-p", boot_dir])
os.makedirs(boot_dir, exist_ok=True)
log.info("EFI bootloader fix: created directory %s", boot_dir)

src = _find_path_case_insensitive(efi_mount, "EFI", "Microsoft", "Boot", "bootmgfw.efi")
if src:
run_cmd(["sudo", "cp", src, bootx64])
shutil.copy2(src, bootx64)
log.info("EFI bootloader fix: copied %s -> %s", src, bootx64)
return

Expand Down Expand Up @@ -155,7 +155,7 @@ def _copy_file(src: str, dst: str) -> str:
def _find_ntfs_tool(status_cb=None) -> str | None:
"""Find mkfs.ntfs/mkntfs, installing ntfs-3g if needed. Returns command name or None."""
for candidate in ["mkfs.ntfs", "mkntfs"]:
if subprocess.run(["which", candidate], capture_output=True).returncode == 0:
if shutil.which(candidate):
return candidate

if status_cb:
Expand All @@ -167,21 +167,21 @@ def _find_ntfs_tool(status_cb=None) -> str | None:
["zypper", "install", "-y", "ntfs-3g"],
]
for pm_cmd in pkg_managers:
if subprocess.run(["which", pm_cmd[0]], capture_output=True).returncode == 0:
if shutil.which(pm_cmd[0]):
run_cmd(["sudo"] + pm_cmd)
break # stop after the first working package manager

# Re-check after installation attempt
for candidate in ["mkfs.ntfs", "mkntfs"]:
if subprocess.run(["which", candidate], capture_output=True).returncode == 0:
if shutil.which(candidate):
return candidate # installation succeeded

return None


def _ensure_wimlib(status_cb=None) -> None:
"""Install wimlib-imagex if not present. Raises FileNotFoundError if it can't be found after install."""
if subprocess.run(["which", "wimlib-imagex"], capture_output=True).returncode == 0:
if shutil.which("wimlib-imagex"):
return
if status_cb:
status_cb("wimlib-imagex not found, attempting to install...")
Expand All @@ -192,10 +192,10 @@ def _ensure_wimlib(status_cb=None) -> None:
["zypper", "install", "-y", "wimtools"],
]
for pm_cmd in pkg_managers:
if subprocess.run(["which", pm_cmd[0]], capture_output=True).returncode == 0:
if shutil.which(pm_cmd[0]):
run_cmd(["sudo"] + pm_cmd)
break
if subprocess.run(["which", "wimlib-imagex"], capture_output=True).returncode != 0:
if not shutil.which("wimlib-imagex"):
raise FileNotFoundError(
"wimlib-imagex not found. Install manually: sudo pacman -S wimlib / sudo apt install wimtools"
)
Expand Down Expand Up @@ -271,20 +271,34 @@ def _copy_efi_boot_files(iso_mount, mount_efi, _status):
if efi_src:
efi_items = os.listdir(efi_src)
_status(f"Found EFI/ with {len(efi_items)} items: {efi_items}")
run_cmd(["sudo", "cp", "-r"] + [os.path.join(efi_src, i) for i in efi_items] + [mount_efi])
for item in efi_items:
src_path = os.path.join(efi_src, item)
dst_path = os.path.join(mount_efi, "EFI", item)
if os.path.isdir(src_path):
shutil.copytree(src_path, dst_path, dirs_exist_ok=True)
else:
os.makedirs(os.path.dirname(dst_path), exist_ok=True)
shutil.copy2(src_path, dst_path)
_status("Copied EFI/ tree to EFI partition")
else:
_status("WARNING: No EFI directory found in ISO - drive may not be UEFI bootable")

boot_src = _find_path_case_insensitive(iso_mount, "boot")
if boot_src:
run_cmd(["sudo", "cp", "-r"] + [os.path.join(boot_src, i) for i in os.listdir(boot_src)] + [mount_efi])
for item in os.listdir(boot_src):
src_path = os.path.join(boot_src, item)
dst_path = os.path.join(mount_efi, "boot", item)
if os.path.isdir(src_path):
shutil.copytree(src_path, dst_path, dirs_exist_ok=True)
else:
os.makedirs(os.path.dirname(dst_path), exist_ok=True)
shutil.copy2(src_path, dst_path)
_status("Copied boot/ tree to EFI partition")

for fname in ["bootmgr", "bootmgr.efi"]:
src = _find_path_case_insensitive(iso_mount, fname)
if src:
run_cmd(["sudo", "cp", src, f"{mount_efi}/{fname}"])
shutil.copy2(src, os.path.join(mount_efi, fname))
_status(f"Copied {fname} to EFI partition root")

_fix_efi_bootloader(mount_efi)
Expand Down Expand Up @@ -412,7 +426,7 @@ def _status(msg):

# Step 7: Sync
_status("Syncing all writes to disk...")
run_cmd(["sudo", "sync"])
os.sync()
_emit(97)
_status("Sync complete")

Expand Down
Loading
Loading