diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index 6b4da64..bdded4f 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -19,8 +19,6 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Obtain dependency projects - run: git clone https://github.com/desultory/zenlib - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v3 with: @@ -30,8 +28,21 @@ jobs: run: | python -m venv venv venv/bin/pip install --upgrade pip - venv/bin/pip install ./zenlib - venv/bin/pip install . + + - name: Obtain dependency projects + run: | + mkdir ../deps/zenlib -p + git clone https://github.com/desultory/zenlib ../deps/zenlib + venv/bin/pip install ../deps/zenlib + + - name: Run mypy type checks + run: | + venv/bin/pip install mypy + venv/bin/mypy ../pycpio + + - name: Install pycpio + run: venv/bin/pip install . + - name: Run unit tests run: | cd tests diff --git a/pyproject.toml b/pyproject.toml index 3a72496..96af1e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "pycpio" -version = "1.6.0" +version = "1.6.1" authors = [ { name="Desultory", email="dev@pyl.onl" }, @@ -18,6 +18,9 @@ classifiers = [ ] dependencies = ["zenlib >= 3.0.2"] +[options.package_data] +pycpio = "py.typed" + [project.optional-dependencies] zstd = ["zstandard"] diff --git a/src/pycpio/cpio/archive.py b/src/pycpio/cpio/archive.py index 8c4d1ac..5ce301d 100644 --- a/src/pycpio/cpio/archive.py +++ b/src/pycpio/cpio/archive.py @@ -10,13 +10,12 @@ from pycpio.cpio.file import CPIO_File from pycpio.cpio.symlink import CPIO_Symlink from pycpio.header import HEADER_NEW -from zenlib.logging import loggify +from zenlib.logging import LoggerMixIn from zenlib.util import colorize as c_ from zenlib.util import handle_plural -@loggify -class CPIOArchive(dict): +class CPIOArchive(dict, LoggerMixIn): def __setitem__(self, name, value): if name in self: raise AttributeError(f"Entry already exists: {c_(name, 'red')}") @@ -115,6 +114,7 @@ def __getitem__(self, name): return super().__getitem__(self._normalize_name(name)) def __init__(self, structure=HEADER_NEW, reproducible=False, *args, **kwargs): + self.init_logger(args, kwargs) self.structure = structure self.reproducible = reproducible self.inodes = {} diff --git a/src/pycpio/cpio/common.py b/src/pycpio/cpio/common.py index 06b52f1..e9273ae 100644 --- a/src/pycpio/cpio/common.py +++ b/src/pycpio/cpio/common.py @@ -3,12 +3,12 @@ MAX_INODES = 0xFFFFFFFF -def pad_cpio(size, align=4): +def pad_cpio(size: int, align: int = 4): """Pad size to align bytes.""" return ((size + align - 1) & ~(align - 1)) - size -def get_new_inode(existing_inodes): +def get_new_inode(existing_inodes: dict[str, list[int]]): """Get a new inode number.""" if not existing_inodes: return 1 diff --git a/src/pycpio/cpio/data.py b/src/pycpio/cpio/data.py index 1e07e55..56afe97 100644 --- a/src/pycpio/cpio/data.py +++ b/src/pycpio/cpio/data.py @@ -44,22 +44,31 @@ def from_dir(path: Path, parent=None, relative=False, *args, **kwargs): kwargs["name"] = str(path) data = [] - data.append(CPIOData.from_path(path=path, relative=relative, *args, **kwargs)) + top_kwargs = kwargs.copy() + top_kwargs["path"] = path + top_kwargs["relative"] = relative + data.append(CPIOData.from_path(*args, **top_kwargs)) for child in path.iterdir(): + child_kwargs = kwargs.copy() + if parent: child_path = parent / child else: child_path = child if relative: - kwargs["name"] = str(child_path.relative_to(relative)) + child_kwargs["name"] = str(child_path.relative_to(relative)) else: - kwargs["name"] = str(child_path) + child_kwargs["name"] = str(child_path) + + child_kwargs["path"] = child_path + child_kwargs["relative"] = relative if child.is_dir() and not child.is_symlink(): - data.extend(CPIOData.from_dir(path=child_path, parent=parent, relative=relative, *args, **kwargs)) + child_kwargs["parent"] = parent + data.extend(CPIOData.from_dir(*args, **child_kwargs)) else: - data.append(CPIOData.from_path(path=child_path, relative=relative, *args, **kwargs)) + data.append(CPIOData.from_path(*args, **child_kwargs)) return data @staticmethod @@ -92,7 +101,6 @@ def from_path(path: Path, relative=False, resolve_symlink=False, *args, **kwargs if not path.exists(): raise ValueError("Path does not exist: %s" % path) - kwargs["path"] = path # If a name is provided, use it, otherwise, use the path, if relative is provided, use the relative path if name := kwargs.pop("name", None): kwargs["name"] = name @@ -118,7 +126,7 @@ def from_path(path: Path, relative=False, resolve_symlink=False, *args, **kwargs kwargs["rdevminor"] = kwargs.pop("rdevminor", os.minor(path.stat(follow_symlinks=resolve_symlink).st_rdev)) header = CPIOHeader(*args, **kwargs) - data = CPIOData.get_subtype(b"", header, *args, **kwargs) + data = CPIOData.get_subtype(b"", header, path=path, *args, **kwargs) if logger := kwargs.get("logger"): logger.debug(f"Created CPIO entry from path: {data}") diff --git a/src/pycpio/header/cpioheader.py b/src/pycpio/header/cpioheader.py index 243db83..424cd42 100644 --- a/src/pycpio/header/cpioheader.py +++ b/src/pycpio/header/cpioheader.py @@ -6,15 +6,15 @@ from pycpio.header.header_funcs import get_header_from_magic, get_magic_from_header from pycpio.header.headers import HEADER_NEW from pycpio.masks import print_permissions, resolve_mode_bytes, resolve_permissions -from zenlib.logging import loggify +from zenlib.logging import LoggerMixIn from zenlib.util import colorize as c_ -@loggify -class CPIOHeader: +class CPIOHeader(LoggerMixIn): """CPIO HEADER, can be initialized from a segment of header data with or without a structure definition.""" def __init__(self, header_data=b"", overrides={}, *args, **kwargs): + self.init_logger(args, kwargs) self.overrides = overrides if header_data: self.logger.debug("Creating CPIOEntry from header data: %s", header_data) @@ -138,8 +138,9 @@ def process_overrides(self) -> None: if hasattr(self, attribute): self.logger.log(5, "[%s] Pre-override: %s" % (attribute, getattr(self, attribute))) if attribute == "mode": + mode = int(getattr(self, "mode", b"00000000"), 16) # Mask the mode, then add the override - value = (int(self.mode, 16) & 0o7777000) | (self.overrides[attribute] & 0o777) + value = (mode & 0o7777000) | (self.overrides[attribute] & 0o777) else: value = self.overrides[attribute] self.logger.debug("[%s] Setting override: %s" % (attribute, value)) @@ -148,13 +149,13 @@ def process_overrides(self) -> None: def _read_bytes(self, num_bytes: int) -> bytes: """Read the specified number of bytes from the data, incrementing the offset, then returning the data.""" data = self.data[self.offset : self.offset + num_bytes] - self.logger.log(5, "Read %d bytes: %s" % (num_bytes, data)) + self.logger.log(5, "Read %s bytes: %r" % (num_bytes, data)) self.offset += num_bytes return data def add_data(self, data: bytes) -> None: """Add the file data to the object.""" - self.logger.debug("Adding data: %s" % data) + self.logger.debug("Adding data: %r" % data) self.data += data def parse_header(self): diff --git a/src/pycpio/header/header_funcs.py b/src/pycpio/header/header_funcs.py index 3081ab4..dcc856c 100644 --- a/src/pycpio/header/header_funcs.py +++ b/src/pycpio/header/header_funcs.py @@ -10,10 +10,10 @@ def get_header_from_magic(magic: bytes) -> dict: for magic, header_type in lookup_table.items(): if magic == magic: return header_type - raise ValueError("Unknown magic number: %s" % magic) + raise ValueError("Unknown magic number: %r" % magic) -def get_magic_from_header(header: dict) -> dict: +def get_magic_from_header(header: dict) -> bytes: """Return the magic number for the given header format.""" for magic, header_type in lookup_table.items(): if header_type == header: diff --git a/src/pycpio/main.py b/src/pycpio/main.py index a6f5997..43767ac 100755 --- a/src/pycpio/main.py +++ b/src/pycpio/main.py @@ -61,19 +61,6 @@ def main(): raise ValueError("Character device requires minor number") c.add_chardev(chardev_path, major, minor) - if append_file := kwargs.get("append"): - cmdargs = { - "relative": kwargs.get("relative"), - "path": Path(append_file), - "name": kwargs.get("name"), - "absolute": kwargs.get("absolute"), - } - - c.append_cpio(**cmdargs) - - if recursive_path := kwargs.get("recursive"): - cmdargs = {"relative": kwargs.get("relative"), "path": Path(recursive_path)} - c.append_recursive(**cmdargs) if output_file := kwargs.get("output"): compression = kwargs.get("compress") diff --git a/src/pycpio/masks/__init__.py b/src/pycpio/masks/__init__.py index 6a4256f..607b2a1 100644 --- a/src/pycpio/masks/__init__.py +++ b/src/pycpio/masks/__init__.py @@ -1,5 +1,11 @@ +from pycpio.masks.modes import CPIOModes, mode_bytes_from_path, resolve_mode_bytes from pycpio.masks.permissions import Permissions, print_permissions, resolve_permissions -from pycpio.masks.modes import CPIOModes, resolve_mode_bytes, mode_bytes_from_path -__all__ = [Permissions, print_permissions, resolve_permissions, - CPIOModes, resolve_mode_bytes, mode_bytes_from_path] +__all__ = [ + "Permissions", + "print_permissions", + "resolve_permissions", + "CPIOModes", + "resolve_mode_bytes", + "mode_bytes_from_path", +] diff --git a/src/pycpio/masks/modes.py b/src/pycpio/masks/modes.py index c91e43f..ebee392 100644 --- a/src/pycpio/masks/modes.py +++ b/src/pycpio/masks/modes.py @@ -1,6 +1,6 @@ from enum import Enum from pathlib import Path - +from typing import Union class CPIOModes(Enum): """ @@ -16,9 +16,11 @@ class CPIOModes(Enum): FIFO = 0o010000 # FIFO -def resolve_mode_bytes(mode: bytes) -> CPIOModes: +def resolve_mode_bytes(mode: bytes) -> Union[CPIOModes, None]: """ Resolve the mode mask from the given bytes. + + If the mode is 0, return None (trailer). """ mode_int = int(mode, 16) # Handle the trailer @@ -29,14 +31,16 @@ def resolve_mode_bytes(mode: bytes) -> CPIOModes: if cpiomode.value & mode_int == cpiomode.value: return cpiomode - raise ValueError(f"Unknown mode: {mode}") + raise ValueError("Unknown mode: %r" % mode) -def mode_bytes_from_path(file_path: Path) -> CPIOModes: +def mode_bytes_from_path(file_path: Path) -> int: """ Gets the mode type bytes from the given path. The order of the checks is important, as some types are subsets of others. + + returns the integer value of the mode. """ if file_path.is_symlink(): return CPIOModes.Symlink.value diff --git a/src/pycpio/py.typed b/src/pycpio/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/src/pycpio/pycpio.py b/src/pycpio/pycpio.py index 48196eb..94445d4 100644 --- a/src/pycpio/pycpio.py +++ b/src/pycpio/pycpio.py @@ -1,7 +1,7 @@ from pathlib import Path from typing import Union -from zenlib.logging import loggify +from zenlib.logging import LoggerMixIn from pycpio.cpio import CPIOArchive, CPIOData from pycpio.header import HEADER_NEW @@ -10,11 +10,11 @@ from pycpio.writer import CPIOWriter -@loggify -class PyCPIO: +class PyCPIO(LoggerMixIn): """A class for using CPIO archives.""" def __init__(self, structure=HEADER_NEW, reproducible=False, *args, **kwargs): + self.init_logger(args, kwargs) self.structure = structure self.reproducible = reproducible self.overrides = {} @@ -25,7 +25,7 @@ def __init__(self, structure=HEADER_NEW, reproducible=False, *args, **kwargs): self.logger.info("[%s] Setting override: %s" % (attr, value)) self.overrides[attr] = value - def append_cpio(self, path: Path, name: str = None, *args, **kwargs): + def append_cpio(self, path: Path, name: Union[str, None] = None, *args, **kwargs): """Appends a file or directory to the CPIO archive.""" kwargs.update({"path": path, "structure": self.structure, "overrides": self.overrides, "logger": self.logger}) if name: @@ -47,9 +47,9 @@ def add_symlink(self, name: str, target: str): def add_chardev(self, name: str, major: int, minor: int, *args, **kwargs): """Adds a character device to the CPIO archive.""" - self._build_cpio_entry( - name=name, entry_type=CPIOModes["CharDev"].value, rdevmajor=major, rdevminor=minor, *args, **kwargs - ) + kwargs["name"] = name + kwargs["entry_type"] = CPIOModes["CharDev"].value + self._build_cpio_entry(rdevmajor=major, rdevminor=minor, *args, **kwargs) def read_cpio_file(self, file_path: Path): """Creates a CPIOReader object and reads the file.""" @@ -67,7 +67,7 @@ def list_files(self): """Returns a list of files in the CPIO archive.""" return str(self.entries.list()) - def _build_cpio_entry(self, name: str, entry_type: CPIOModes, data=None, *args, **kwargs): + def _build_cpio_entry(self, name: str, entry_type: int, data=None, *args, **kwargs): """Creates a CPIOData object and adds it to the CPIO archive.""" overrides = self.overrides.copy() if mode := kwargs.pop("mode", None): diff --git a/src/pycpio/reader/reader.py b/src/pycpio/reader/reader.py index 6269f19..25ace2a 100644 --- a/src/pycpio/reader/reader.py +++ b/src/pycpio/reader/reader.py @@ -1,13 +1,12 @@ from pathlib import Path -from typing import Union +from typing import Generator, Union, Any from pycpio.cpio import CPIOArchive, CPIOData, pad_cpio from pycpio.header import CPIOHeader -from zenlib.logging import loggify +from zenlib.logging import LoggerMixIn -@loggify -class CPIOReader: +class CPIOReader(LoggerMixIn): """ A class for reading CPIO archives. Takes a file path as input, and reads it into self.raw_cpio. @@ -15,54 +14,60 @@ class CPIOReader: Once processed, the files are stored in self.entries, which is a dictionary of CPIO entries. """ - def __init__(self, input_file: Union[Path, str], overrides={}, *args, **kwargs): - self.file_path = Path(input_file) + def __init__(self, input_file: Union[Path, str], overrides: Union[dict[str, Any], None] = None, *args, **kwargs): + if overrides is None: + overrides = {} + self.init_logger(args, kwargs) + self.file_path: Union[Path, int] = Path(input_file) if self.file_path == Path('-'): - # stdin + # opening just '0' indicates stdin self.file_path = 0 else: # normal file assert self.file_path.exists(), "File does not exist: %s" % self.file_path + self.data_bytes: bytes = b"" - self.overrides = overrides - self.entries = CPIOArchive(logger=self.logger) + self.overrides: dict[str, Any] = overrides + self.entries: CPIOArchive = CPIOArchive(logger=self.logger) + self.offset: int = 0 self.read_cpio_file() self.process_cpio_file() - def _read_bytes(self, num_bytes: int, pad=False): + def _read_bytes(self, num_bytes: int, pad: bool = False) -> bytes: """Reads num_bytes from self.raw_cpio, starting at self.offset.""" if not num_bytes: return b"" - data = self.cpio_file[self.offset : self.offset + num_bytes] + data = self.data_bytes[self.offset : self.offset + num_bytes] if len(data) > 256: - self.logger.debug("Read %s bytes: %s...%s" % (num_bytes, data[:128], data[-128:])) + self.logger.debug("Read %d bytes: %r...%r" % (num_bytes, data[:128], data[-128:])) else: - self.logger.debug("Read %s bytes: %s" % (num_bytes, data)) + self.logger.debug("Read %d bytes: %r" % (num_bytes, data)) self.offset += num_bytes if pad: pad_size = pad_cpio(self.offset) - self.logger.debug("Padding offset by %s bytes" % pad_size) + self.logger.debug("Padding offset by %d bytes" % pad_size) self.offset += pad_size return data - def read_cpio_file(self): + def read_cpio_file(self) -> None: """ - Reads a CPIO archive into self.cpio_file. + Reads a CPIO archive into self.data_bytes. Resets the offset to 0, preparing for processing. """ self.logger.debug("Reading file: %s" % self.file_path) with open(self.file_path, "rb") as cpio_file: - self.cpio_file = cpio_file.read() - self.logger.info("[%s] Read bytes: %s" % (self.file_path, len(self.cpio_file))) + self.data_bytes = cpio_file.read() + self.logger.info("[%s] Read bytes: %d" % (self.file_path, len(self.data_bytes))) - self.logger.debug("Setting offset to 0") - self.offset = 0 + if self.offset != 0: + self.logger.debug("Resetting read offset to 0") + self.offset = 0 - def process_cpio_header(self) -> CPIOHeader: + def process_cpio_header(self) -> Union[CPIOHeader, None]: """Processes a single CPIO header from self.raw_cpio.""" header_data = self._read_bytes(110) @@ -73,8 +78,7 @@ def process_cpio_header(self) -> CPIOHeader: header = CPIOHeader(**kwargs) except ValueError as e: self.logger.error("Failed to process header: %s" % e) - self.logger.info("[%s] Header data at offset %d: %s" % (self.file_path, self.offset, header_data)) - return + return self.logger.info("[%s] Header data at offset %d: %r" % (self.file_path, self.offset, header_data)) # Get the filename now that we know the size filename_data = self._read_bytes(int(header.namesize, 16), pad=True) @@ -83,27 +87,27 @@ def process_cpio_header(self) -> CPIOHeader: # If it's the trailer, break if not header.mode_type: - self.logger.info("Trailer detected at offset: %s" % self.offset) - return + return self.logger.info("Trailer detected at offset: %s" % self.offset) return header - def process_cpio_data(self): - """Processes the file object self.cpio_file, yielding CPIOData objects.""" - while self.offset < len(self.cpio_file): + def process_cpio_data(self) -> Generator[CPIOData, None, None]: + """Processes the file object self.data_bytes, yielding CPIOData objects.""" + while self.offset < len(self.data_bytes): self.logger.debug("At offset: %s" % self.offset) if header := self.process_cpio_header(): - kwargs = {"data": self._read_bytes(int(header.filesize, 16), pad=True), "header": header} - yield CPIOData.get_subtype(**kwargs) + filesize = int(getattr(header, "filesize", "0"), 16) + data = self._read_bytes(filesize, pad=True) + yield CPIOData(header=header, data=data, logger=self.logger) else: break else: self.logger.warning("Reached end of file without finding trailer") - def process_cpio_file(self): + def process_cpio_file(self) -> None: """ Processes a CPIO archive. - Uses reads data from self.cpio_file, and processes it into CPIOData objects. + Uses reads data from self.data_bytes, and processes it into CPIOData objects. When opjects are processed, the internal offset is updated. Processed objects are stored in self.entries. """ diff --git a/src/pycpio/writer/writer.py b/src/pycpio/writer/writer.py index 1fdb280..1e0445b 100644 --- a/src/pycpio/writer/writer.py +++ b/src/pycpio/writer/writer.py @@ -1,6 +1,7 @@ from lzma import CHECK_CRC32 from os import fsync from pathlib import Path +from typing import Union from pycpio.header import HEADER_NEW, CPIOHeader from pycpio.errors import UnavailableCompression @@ -18,7 +19,7 @@ class CPIOWriter: def __init__( self, cpio_entries: list, - output_file: Path, + output_file: Union[str, Path], structure=None, compression=False, compression_level=10,