diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 66b981c..69a1454 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -39,4 +39,4 @@ jobs: python-version: "3.14" - uses: astral-sh/setup-uv@v7 - - run: uvx --with tox-uv tox -e style + - run: uvx --with tox-uv tox -e style,typecheck diff --git a/docs/history.rst b/docs/history.rst index 39665a7..b6cad5c 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -2,19 +2,35 @@ History ======= -5.6.0 (2026-03-16) +5.6.0 (2026-03-18) ------------------ +* ``which()`` looks at current venv ``bin/`` folder first + +* ``ClickRunner.default_main`` and ``ClickRunner.context_wrapper`` replace deprecated ``cli.default_main`` and ``cli.context`` + +* Removed: + + * ``assert_printed()`` from ``CapturedStream``, ``TrackedOutput``, and ``ClickRunner`` + + * ``runez.SYS_INFO.platform_id.is_windows`` + + * ``runez.inspector.AutoInstall`` + + * ``runez.inspector.ImportTime`` + +* Not exposing in top-level ``runez`` import (use ``from runez. import ...`` if needed): + + * ``AdaptedProperty``, ``ascii``, ``config`` , ``is_subfolder``, ``PsInfo``, ``Slotted`` + +* Simplified ``runez.schema`` internals + * Turned on flake8-simplify and tryceratops, fixed corresponding issues * Added tests * Modernized GH actions and project metadata -* Removed ``is_subfolder()`` from ``runez.file`` - -* Removed ``assert_printed()`` from ``CapturedStream``, ``TrackedOutput``, and ``ClickRunner`` - * ``ClickRunner.run()`` now accepts ``pathlib.Path`` objects for ``main``/script args * Fixed operator precedence bug in ``Version.__le__()`` @@ -27,6 +43,8 @@ History * Using ``extractall(filter="data")`` from stdlib for path traversal protection +* Adapted type declarations, pyright now reports 0 errors + 5.5.0 (2026-03-12) ------------------ diff --git a/pyproject.toml b/pyproject.toml index c5a23ac..3c00d46 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,10 @@ dynamic = ["version"] [project.urls] Source = "https://github.com/codrsquad/runez" +[tool.pyright] +pythonVersion = "3.10" +include = ["src"] + [tool.ruff] cache-dir = ".tox/.ruff_cache" line-length = 140 diff --git a/setup.py b/setup.py index fcbaa9f..cd92c61 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ from setuptools import setup setup( - setup_requires="setupmeta", + setup_requires=["setupmeta"], versioning="dev", ) diff --git a/src/runez/__init__.py b/src/runez/__init__.py index a0fd887..6b106a4 100644 --- a/src/runez/__init__.py +++ b/src/runez/__init__.py @@ -3,7 +3,7 @@ """ # fmt: off -from runez import ascii, click, config, date, file, program, serialize, system +from runez import click, date, file, program, serialize, system from runez.colors import ActivateColors, ColorManager as color from runez.colors.named import black, blue, brown, gray, green, orange, plain, purple, red, teal, white, yellow from runez.colors.named import blink, bold, dim, invert, italic, strikethrough, underline @@ -15,16 +15,16 @@ from runez.file import basename, checksum, ensure_folder, parent_folder, readlines, TempFolder, to_path, touch, write from runez.file import compress, copy, decompress, delete, filesize, ls_dir, move, symlink from runez.logsetup import LogManager as log, ProgressBar -from runez.program import check_pid, is_executable, make_executable, PsInfo, run, shell, which -from runez.serialize import from_json, read_json, represented_json, save_json, Serializable -from runez.system import abort, abort_if, AdaptedProperty, cached_property, uncolored, Undefined, UNSET, wcswidth -from runez.system import Anchored, CaptureOutput, CurrentFolder, OverrideDryrun, Slotted, TempArgv, TrackedOutput +from runez.program import check_pid, is_executable, make_executable, run, shell, which +from runez.serialize import from_json, json_sanitized, read_json, represented_json, save_json, Serializable +from runez.system import abort, abort_if, cached_property, uncolored, Undefined, UNSET, wcswidth +from runez.system import Anchored, CaptureOutput, CurrentFolder, OverrideDryrun, TempArgv, TrackedOutput from runez.system import capped, decode, DEV, flattened, joined, quoted, resolved_path, short, stringified, SYS_INFO from runez.system import first_line, get_version, is_basetype, is_iterable, ltattr __all__ = [ # noqa: RUF022, grouped and sorted by provenance module "DRYRUN", - "ascii", "click", "config", "date", "file", "program", "serialize", "system", + "click", "date", "file", "program", "serialize", "system", "ActivateColors", "color", "black", "blue", "brown", "gray", "green", "orange", "plain", "purple", "red", "teal", "white", "yellow", "blink", "bold", "dim", "invert", "italic", "strikethrough", "underline", @@ -36,10 +36,10 @@ "basename", "checksum", "ensure_folder", "parent_folder", "readlines", "TempFolder", "to_path", "touch", "write", "compress", "copy", "decompress", "delete", "filesize", "ls_dir", "move", "symlink", "log", "ProgressBar", - "check_pid", "is_executable", "make_executable", "PsInfo", "run", "shell", "which", - "from_json", "read_json", "represented_json", "save_json", "Serializable", - "abort", "abort_if", "AdaptedProperty", "cached_property", "uncolored", "Undefined", "UNSET", "wcswidth", - "Anchored", "CaptureOutput", "CurrentFolder", "OverrideDryrun", "Slotted", "TempArgv", "TrackedOutput", + "check_pid", "is_executable", "make_executable", "run", "shell", "which", + "from_json", "json_sanitized", "read_json", "represented_json", "save_json", "Serializable", + "abort", "abort_if", "cached_property", "uncolored", "Undefined", "UNSET", "wcswidth", + "Anchored", "CaptureOutput", "CurrentFolder", "OverrideDryrun", "TempArgv", "TrackedOutput", "capped", "decode", "DEV", "flattened", "joined", "quoted", "resolved_path", "short", "stringified", "SYS_INFO", "first_line", "get_version", "is_basetype", "is_iterable", "ltattr" ] diff --git a/src/runez/__main__.py b/src/runez/__main__.py index 2f8780d..a75b86a 100644 --- a/src/runez/__main__.py +++ b/src/runez/__main__.py @@ -5,14 +5,11 @@ import contextlib import logging import os -import re import sys -import sysconfig import time import runez from runez.ascii import AsciiAnimation -from runez.inspector import ImportTime from runez.render import NAMED_BORDERS, PrettyTable @@ -61,63 +58,6 @@ def cmd_diagnostics(): print(PrettyTable.two_column_diagnostics(runez.SYS_INFO.diagnostics(), available, border=args.border)) -def cmd_import_speed(): - """Show average import time of top-level python packages installed in this venv""" - parser = runez.cli.parser() - parser.add_argument("--all", action="store_true", help="Show all.") - parser.add_argument("--border", choices=NAMED_BORDERS, default="reddit", help="Use custom border.") - parser.add_argument("--iterations", "-i", type=int, default=3, help="Number of measurements to average.") - parser.add_argument("name", nargs="*", help="Names of modules to show (by default: all).") - args = parser.parse_args() - names = runez.flattened(args.name, split=",") - if args.all: - names.extend(_interesting_top_levels()) - - if not names: - sys.exit("Please specify module names, or use --all") - - names = sorted(set(names)) - times = [] - fastest = None - slowest = None - for name in names: - t = ImportTime(name, iterations=args.iterations) - times.append(t) - if t.cumulative is None: - continue - - if fastest is None or (t.cumulative < fastest.cumulative): - fastest = t - - if slowest is None or t.cumulative > slowest.cumulative: - slowest = t - - table = PrettyTable("Module,-X cumulative,Elapsed,Vs fastest,Note", border=args.border) - table.header[3].align = "center" - mid = _get_mid(times) - for t in sorted(times): - if t.cumulative is None: - c = e = f = None - - else: - factor = t.elapsed / fastest.elapsed - c = runez.represented_duration(t.cumulative / 1000000, span=-2) - e = runez.represented_duration(t.elapsed, span=-2) - f = "x%.2f" % factor - if t is fastest: - f = "" - - elif t is slowest: - f = runez.red(f) - - elif t.elapsed and t.elapsed > mid: - f = runez.orange(f) - - table.add_row(t.module_name, c, e, f, t.problem or "") - - print(table) - - def cmd_passthrough(): """ Capture pass-through test @@ -172,10 +112,10 @@ def cmd_progress_bar(): else: runez.log.trace("At iteration %s" % i) - if args.verbose and i % 10 == 0: # pragma: no cover + if args.verbose and i % 10 == 0: print("iteration %s" % runez.bold(i)) - if i == 42: # pragma: no cover + if i == 42: runez.log.progress.show("some progress msg") # debug() and trace() messages don't appear any more after this for _ in runez.ProgressBar(range(10)): time.sleep(0.1) @@ -193,16 +133,6 @@ def cmd_progress_bar(): time.sleep(args.sleep) -def _get_mid(times): - elapsed = 0 - times = [t for t in times if t.elapsed] - if times: - times = sorted(times, key=lambda x: -x.elapsed) # Don't fail if no elapsed available - elapsed = times[int(len(times) / 2)].elapsed - - return elapsed - - def main(): runez.cli.run_cmds() @@ -210,7 +140,6 @@ def main(): def _show_fgcolors(bg=runez.plain, border=None): print("") table = PrettyTable("Color,Blink,Bold,Dim,Invert,Italic,Strikethrough,Underline", border=border) - table.header.style = "bold" for color in runez.color.fg: color_name = color.name text = color(color.name) @@ -227,24 +156,5 @@ def _show_fgcolors(bg=runez.plain, border=None): print(table) -def _interesting_top_levels(): - """ - Convenience for `-mrunez import-time --all` command - Return list of import top level names, ignoring things like top levels starting with an `_`, and other uninteresting libs - """ - uninteresting = re.compile(r"^(_|pip|pkg_resources|pydev|pytest|setuptools).*$") - result = set() - base = sysconfig.get_path("purelib") - for item in runez.ls_dir(base): - if item.name.endswith(".dist-info") and item.is_dir(): - top_levels = item / "top_level.txt" - if top_levels.exists(): - for line in runez.readlines(top_levels): - if line and not uninteresting.match(line): - result.add(line) - - return sorted(result) - - if __name__ == "__main__": main() diff --git a/src/runez/click.py b/src/runez/click.py index 75e74a3..c6970e5 100644 --- a/src/runez/click.py +++ b/src/runez/click.py @@ -14,12 +14,13 @@ def main(debug, dryrun, log, ...): import logging import os import sys +import types try: import click except ImportError: # pragma: no cover, click used only if installed - click = None + click: types.ModuleType = None # type: ignore[assignment] import runez.config from runez.colors import ColorManager @@ -126,7 +127,6 @@ def run_cmds(cls, prog=None): prog = short(caller.folder) epilog = PrettyTable(2) - epilog.header[0].style = "bold" for cmd, func in available_commands.items(): epilog.add_row(" " + cmd, first_line(func.__doc__, default="")) @@ -134,7 +134,7 @@ def run_cmds(cls, prog=None): cls._prog = prog or package parser = cls.parser(epilog=epilog, help=caller.module_docstring, prog=prog) if cls.version and package: - parser.add_argument(*cls.version, action="version", version=get_version(package), help="Show version and exit") + parser.add_argument(*cls.version, action="version", version=get_version(package) or "0.0.0", help="Show version and exit") if cls.color: parser.add_argument(*cls.color, action="store_true", help="Do not use colors (even if on tty)") diff --git a/src/runez/colors/__init__.py b/src/runez/colors/__init__.py index 959d7a8..8d22121 100644 --- a/src/runez/colors/__init__.py +++ b/src/runez/colors/__init__.py @@ -11,22 +11,31 @@ class ActivateColors: - """Context manager for temporarily overriding coloring""" + """Context manager to temporarily override coloring""" - def __init__(self, enable=True, flavor=None): + def __init__(self, enable: "bool | PlainBackend | type[PlainBackend] | str | None" = True, flavor=None): + """ + Temporarily override coloring + + Parameters + ---------- + enable: Enable colored output + flavor (str | None): Flavor to use (neutral, light or dark) + """ if enable is True and DEV.current_test(): # This allows to have easily reproducible tests (same color backend used in tests by default) enable = "testing" self.enable = enable self.flavor = flavor - self.prev = None def __enter__(self): - self.prev = ColorManager.activate_colors(self.enable, flavor=self.flavor) + self.prev = ColorManager.backend, ColorManager.bg, ColorManager.fg, ColorManager.style + ColorManager.activate_colors(self.enable, flavor=self.flavor) def __exit__(self, *_): - ColorManager.backend, ColorManager.bg, ColorManager.fg, ColorManager.style = self.prev + if self.prev is not None: + ColorManager.backend, ColorManager.bg, ColorManager.fg, ColorManager.style = self.prev class PlainBackend: @@ -61,10 +70,10 @@ def adjusted_size(text, size=0): class ColorManager: """Holds current global coloring backend and bg, fg color and style implementations""" - backend = None # type: PlainBackend - bg = None # type: NamedColors - fg = None # type: NamedColors - style = None # type: NamedStyles + backend: PlainBackend + bg: "NamedColors" + fg: "NamedColors" + style: "NamedStyles" @classmethod def cast_color(cls, name, source=None, strict=True): @@ -134,16 +143,14 @@ def is_coloring(cls): def activate_colors(cls, enable=None, flavor=None): """ Args: - enable (bool | type(PlainBackend) | None): Set colored output on or off + enable (bool | PlainBackend | type(PlainBackend) | str | None): Set colored output on or off flavor (str | None): Flavor to use (neutral, light or dark) """ if enable is None: enable = SYS_INFO.terminal.is_stdout_tty - prev = cls.backend, cls.bg, cls.fg, cls.style cls.backend = _detect_backend(enable, flavor=flavor) cls.bg, cls.fg, cls.style = cls.backend.named_triplet() - return prev @classmethod def adjusted_size(cls, text, size=0): @@ -161,13 +168,13 @@ def adjusted_size(cls, text, size=0): class Renderable: """A render-able (color or style) named object""" - def __init__(self, name): + def __init__(self, name: str): self.name = name def __repr__(self): return self.name - def __call__(self, text, size=None): + def __call__(self, text: str, size=None) -> str: """ Allows for convenient call of the form: @@ -178,7 +185,7 @@ def __call__(self, text, size=None): text = short(text, size=size) if size else stringified(text) return self.rendered(text) if text else "" - def rendered(self, text): + def rendered(self, text: str) -> str: return text @@ -206,13 +213,18 @@ def __init__(self, cls=None, params=None, **color_names): class NamedColors(NamedRenderables): """Set of registered named colors""" - __slots__ = ["black", "blue", "brown", "gray", "green", "orange", "plain", "purple", "red", "teal", "white", "yellow"] + __slots__ = ("black", "blue", "brown", "gray", "green", "orange", "plain", "purple", "red", "teal", "white", "yellow") class NamedStyles(NamedRenderables): """Set of registered named styles""" - __slots__ = ["blink", "bold", "dim", "invert", "italic", "strikethrough", "underline"] + __slots__ = ("blink", "bold", "dim", "invert", "italic", "strikethrough", "underline") + + +# Initialize ColorManager with plain defaults (overridden by activate_colors() at import time) +ColorManager.backend = PlainBackend() +ColorManager.bg, ColorManager.fg, ColorManager.style = ColorManager.backend.named_triplet() def _detect_backend(enable, flavor=None): diff --git a/src/runez/colors/terminal.py b/src/runez/colors/terminal.py index 9202347..f93196f 100644 --- a/src/runez/colors/terminal.py +++ b/src/runez/colors/terminal.py @@ -68,13 +68,13 @@ def truecolor(cls, offset, r, g, b, brighten): class AnsiColor(Renderable): """Defines a color, with associated tty codes, plus a `name` that can potentially be used by other kinds of backends""" - def __init__(self, name, rgb, ansi=None, flavor=None): + def __init__(self, name: str, rgb: int, ansi: str, flavor: str): """ Args: - name (str): Color name (example: blue) - rgb (int | None): RGB value (example: 0x0000ff) - ansi (str): Ansi codeset to use (ansi16, ansi256 or truecolor) - flavor (str): Flavor to use (neutral, light or dark) + name: Color name (example: blue) + rgb: RGB value (example: 0x0000ff) + ansi: Ansi codeset to use (ansi16, ansi256 or truecolor) + flavor: Flavor to use (neutral, light or dark) """ super().__init__(name) self.rgb = rgb diff --git a/src/runez/config.py b/src/runez/config.py index c9ddfe8..24c4d6d 100644 --- a/src/runez/config.py +++ b/src/runez/config.py @@ -4,6 +4,7 @@ Usage example: import runez + import runez.config # One time initialization, call this from your main() @runez.click.command() @@ -25,7 +26,7 @@ def main(): from runez.file import readlines from runez.logsetup import LogManager from runez.serialize import from_json -from runez.system import capped, decode, stringified, SYS_INFO +from runez.system import capped, stringified, SYS_INFO class Configuration: @@ -358,11 +359,11 @@ class ConfigProvider: def __repr__(self): return self.provider_id() - def __len__(self): + def __len__(self) -> int: return 0 @property - def values(self): + def values(self) -> dict | None: """dict: values in this provider""" return None @@ -374,7 +375,7 @@ def provider_id(self): """Id of this provider (there can only be one active at a time)""" return self.__class__.__name__.replace("Provider", "").lower() - def get(self, key): + def get(self, key) -> str | None: """ Args: key (str): Key to lookup @@ -426,7 +427,7 @@ def get(self, key): try: path = os.path.join(self.folder, key) with open(path) as fh: - return decode(fh.read()) + return fh.read() except (OSError, IOError): return None diff --git a/src/runez/conftest.py b/src/runez/conftest.py index ca4a28d..f150eed 100644 --- a/src/runez/conftest.py +++ b/src/runez/conftest.py @@ -61,7 +61,7 @@ def __init__(self, adjust_tmp=True): self.temp_folder = None self.abort_exception = None self.old_cwd = None - self.old_env_vars = None + self.old_env_vars: dict[str, str] = {} # populated in __enter__ def __enter__(self): WrappedHandler.isolation += 1 @@ -115,13 +115,13 @@ def cli(): Example usage: - from runez.conftest import cli + from runez.conftest import ClickRunner from my_cli import main - cli.default_main = main # Handy if you have only one main + ClickRunner.default_main = main # Handy if you have only one main def test_help(cli): - cli.main = main # Not needed if `cli.default_main` was set + cli.main = main # Not needed if `ClickRunner.default_main` was set cli.run("--help") assert cli.succeeded assert cli.match("Usage:") @@ -129,15 +129,17 @@ def test_help(cli): # or more specifically assert "Usage:" in cli.logged.stdout """ - with cli.context() as context: - yield ClickRunner(context=context) - + wrapper = ClickRunner.context_wrapper + deprecated_context = getattr(cli, "context", None) + if deprecated_context is not None: # pragma: no cover, for temporary backwards compat + import warnings -# Comes in handy for click apps with only one main entry point -cli.default_main = None + msg = "Please use `ClickRunner.context_wrapper = ...` instead of `cli.context = ...`" + warnings.warn(msg, DeprecationWarning, stacklevel=1) + wrapper = deprecated_context -# Can be customized by users, wraps cli (fixture) runs in given context -cli.context = TempFolder + with wrapper() as context: + yield ClickRunner(context=context) @pytest.fixture @@ -197,26 +199,29 @@ def clean_accumulated_logs(cls): class ClickWrapper: """Wrap click invoke, when click is available, otherwise just call provided function""" - def __init__(self, stdout, stderr, exit_code, exception): - self.stdout = stdout - self.stderr = stderr - self.exit_code = exit_code - self.exception = exception + def __init__(self, stdout=None, stderr=None, exit_code=None, exception=None): + self.stdout: str | None = stdout + self.stderr: str | None = stderr + self.exit_code: int | None = exit_code + self.exception: BaseException | None = exception class ClickRunner: """Allows to provide a test-friendly fixture around testing click entry-points""" - args: list = None # Arguments used in last run() - exit_code: int = None # Exit code of last run() + default_main = None # Class-level default main entry point, set from user's conftest.py + context_wrapper = TempFolder # Class-level context manager to use for cli fixture runs + + args: list | None = None # Arguments used in last run() + exit_code: int | None = None # Exit code of last run() logged: TrackedOutput # Captured log from last run() - main: callable = None # Optional, override default_main for this runner instance - trace: bool = None # Optional, enable trace logging for this runner instance + main = None # Optional, override default_main for this runner instance + trace: bool | None = None # Optional, enable trace logging for this runner instance def __init__(self, context=None): """ Args: - context (callable | None): Context (example: temp folder) this click run was invoked under + context: Active context instance (example: temp folder) this click run was invoked under """ self.context = context @@ -268,7 +273,16 @@ def run(self, *args, exe=None, main=UNSET, trace=UNSET): main (callable | None): Optional, override current self.main just for this run trace (bool): If True, enable trace logging """ - main = _R.rdefault(main, self.main or cli.default_main) + main = _R.rdefault(main, self.main or ClickRunner.default_main) + if not main: # pragma: no cover, for temporary backwards compat + # Backwards compatibility: grab deprecated `cli.default_main` if present + main = getattr(cli, "default_main", None) + if main: + import warnings + + msg = "Please use `ClickRunner.default_main = ...` instead of `cli.default_main = ...`" + warnings.warn(msg, DeprecationWarning, stacklevel=2) + if len(args) == 1 and hasattr(args[0], "split"): # Convenience: allow to provide full command as one string argument args = args[0].split() @@ -284,10 +298,10 @@ def run(self, *args, exe=None, main=UNSET, trace=UNSET): if isinstance(result.exception, AssertionError): raise result.exception - if result.stdout: + if result.stdout and logged.stdout is not None: logged.stdout.buffer.write(result.stdout) - if result.stderr: + if result.stderr and logged.stderr is not None: logged.stderr.buffer.write(result.stderr) if result.exception and not isinstance(result.exception, SystemExit): @@ -403,19 +417,19 @@ def _resolved_script(self, script): return path def _run_main(self, main, args): - if _ClickCommand is not None and isinstance(main, _ClickCommand): + if _ClickCommand is not None and isinstance(main, _ClickCommand) and _CliRunner is not None: if "LANG" not in os.environ: # Avoid click complaining about unicode for tests that mock env vars - os.environ["LANG"] = "en_US.UTF-8" + os.environ.setdefault("LANG", "en_US.UTF-8") runner = _CliRunner() r = runner.invoke(main, args=args) - return ClickWrapper(r.output, None, r.exit_code, r.exception) + return ClickWrapper(stdout=r.output, exit_code=r.exit_code, exception=r.exception) if callable(main): - result = ClickWrapper(None, None, None, None) + result = ClickWrapper() try: - result.stdout = main() + result.stdout = stringified(main()) result.exit_code = 0 except AssertionError: @@ -440,11 +454,11 @@ def _run_main(self, main, args): script = self._resolved_script(main) assert script, "Can't find script '%s', invalid main" % main r = runez.run(sys.executable, script, *args, fatal=False) - return ClickWrapper(r.output, r.error, r.exit_code, r.exc_info) + return ClickWrapper(stdout=r.output, stderr=r.error, exit_code=r.exit_code, exception=r.exc_info) class RunSpec(Slotted): - __slots__ = ["regex", "stderr", "stdout"] + __slots__ = ("regex", "stderr", "stdout") def _get_defaults(self): return UNSET diff --git a/src/runez/convert.py b/src/runez/convert.py index ccb705a..b2466db 100644 --- a/src/runez/convert.py +++ b/src/runez/convert.py @@ -313,7 +313,7 @@ def wordified(text, delimiter="_", normalize=None): return delimiter.join(words(text, normalize=normalize)) -def words(text, normalize=None, split="_", decamel=False): +def words(text, normalize=None, split: str | None = "_", decamel=False): """Words extracted from `text` (split on underscore character as well by default) Args: @@ -408,7 +408,7 @@ def __init__(self, name, start=0, end=0): self.name = name self.start = start self.end = end - self.next = None + self.next: _TabularInterval | None = None def __repr__(self): # pragma: no cover, for debugging return "%s [%s:%s]" % (self.name, self.start or "", self.end or "") diff --git a/src/runez/date.py b/src/runez/date.py index 845167f..64a4820 100644 --- a/src/runez/date.py +++ b/src/runez/date.py @@ -53,8 +53,7 @@ def __repr__(self): return self.name def __eq__(self, other): - if isinstance(other, datetime.tzinfo): - return self.offset == other.utcoffset(datetime.datetime.now(tz=UTC)) + return isinstance(other, datetime.tzinfo) and self.offset == other.utcoffset(datetime.datetime.now(tz=UTC)) def utcoffset(self, *_): return self.offset @@ -426,8 +425,9 @@ def _date_from_text(text, epocher, tz=UNSET): if m: tz = UTC if tz is UNSET else tz offset = to_seconds(text) - now = datetime.datetime.now(tz=tz) - return to_datetime(to_epoch(now) - offset, tz=tz) + epoch = to_epoch(datetime.datetime.now(tz=tz)) + if epoch is not None and offset is not None: + return to_datetime(epoch - offset, tz=tz) return None diff --git a/src/runez/file.py b/src/runez/file.py index 39d7f29..0ec180e 100644 --- a/src/runez/file.py +++ b/src/runez/file.py @@ -5,8 +5,9 @@ import tempfile import time from pathlib import Path +from typing import overload -from runez.system import _R, abort, Anchored, decode, flattened, resolved_path, short, SYMBOLIC_TMP, SYS_INFO, UNSET +from runez.system import _R, abort, Anchored, flattened, resolved_path, short, SYMBOLIC_TMP, SYS_INFO, UNSET def basename(path, extension_marker=os.extsep, follow=False): @@ -33,29 +34,24 @@ def basename(path, extension_marker=os.extsep, follow=False): return path -def checksum(path, hash=hashlib.sha256, blocksize=65536): +def checksum(path, hash=hashlib.sha256, blocksize=65536) -> str: """ Args: path (str | Path | None): Path to file - hash: Hash algorithm to use - blocksize (int): + hash (callable): Hash algorithm to use (eg hashlib.sha256) + blocksize (int): Read block size Returns: (str): Hex-digest """ - if isinstance(hash, str): - hash = getattr(hashlib, hash) - - if callable(hash): - hash = hash() - + h = hash() with open(path, "rb") as fh: buf = fh.read(blocksize) while len(buf) > 0: - hash.update(buf) + h.update(buf) buf = fh.read(blocksize) - return hash.hexdigest() + return h.hexdigest() def copy(source, destination, ignore=None, overwrite=True, fatal=True, logger=UNSET, dryrun=UNSET): @@ -288,9 +284,9 @@ def readlines(path, first=None, errors="ignore", fatal=False, logger=False, tran if first == 0: return - line = decode(line) if transform: line = transform(line) + yield line first -= 1 @@ -302,10 +298,18 @@ def readlines(path, first=None, errors="ignore", fatal=False, logger=False, tran _R.hlog(logger, message, exc_info=e) +@overload +def to_path(path: "str | Path", no_spaces=False) -> Path: ... + + +@overload +def to_path(path: None, no_spaces=False) -> None: ... + + def to_path(path, no_spaces=False): """ Args: - path (str | Path): Path to convert + path (str | Path | None): Path to convert no_spaces (type | bool | None): If True-ish, abort if 'path' contains a space Returns: @@ -689,9 +693,7 @@ def _file_op(source, destination, func, overwrite, fatal, logger, dryrun, must_e if ignore is not None: if not callable(ignore): given = ignore - - def ignore(*_): - return given + ignore = lambda *_: given # noqa: E731 extra["ignore"] = ignore diff --git a/src/runez/heartbeat.py b/src/runez/heartbeat.py index d67e90b..0ed7a8a 100644 --- a/src/runez/heartbeat.py +++ b/src/runez/heartbeat.py @@ -74,7 +74,7 @@ class Heartbeat: _lock = threading.Lock() _thread = None # Background daemon thread used to periodically execute the tasks - _last_execution = None # Epoch when last task execution completed + _last_execution: float = 0 # Epoch when last task execution completed _sleep_delay = 1 # How many seconds we're currently sleeping until next task @classmethod diff --git a/src/runez/http.py b/src/runez/http.py index fd6c5da..a2ee7cf 100644 --- a/src/runez/http.py +++ b/src/runez/http.py @@ -22,8 +22,10 @@ response = MY_CLIENT.get("api/v1/....", fatal=False, dryrun=False) """ +import abc import contextlib import functools +import hashlib import json import os import re @@ -36,7 +38,7 @@ from runez.system import _R, abort, DEV, find_caller, joined, short, stringified, SYS_INFO, UNSET -def urljoin(base, url): +def urljoin(base, url) -> str: """Join base + url, considering that `base` is intended to be the base url of a REST end point""" if not base: return url @@ -177,16 +179,16 @@ class GlobalHttpCalls: _original_urlopen = None - def __init__(self, allowed): + def __init__(self, allowed: bool): """We're used as a context manager""" - self.allowed = allowed - self.was_allowed = None + self.is_allowed = allowed + self.was_allowed = True def __repr__(self): - return "allowed" if self.allowed else "forbidden" + return "allowed" if self.is_allowed else "forbidden" def __enter__(self): - self.was_allowed = self.allow(self.allowed) + self.was_allowed = self.allow(self.is_allowed) return self def __exit__(self, *_): @@ -198,7 +200,7 @@ def intentionally_disabled(*_, **kwargs): raise ForbiddenHttpError(kwargs.get("url")) @classmethod - def is_forbidden(cls): + def is_forbidden(cls) -> bool: """Are outgoing http calls currently allowed?""" return cls._original_urlopen is not None @@ -225,7 +227,7 @@ def inner(*args, **kwargs): return inner @classmethod - def allow(cls, allowed=True): + def allow(cls, allowed=True) -> bool: """Allow outgoing http(s) calls""" from urllib3.connectionpool import HTTPConnectionPool @@ -242,7 +244,7 @@ def allow(cls, allowed=True): return was_allowed @classmethod - def forbid(cls): + def forbid(cls) -> bool: """Forbid outgoing http(s) calls""" return cls.allow(False) @@ -327,8 +329,8 @@ def text(self): class MockedHandlerStack: def __init__(self): - self.handler = None - self.ms = None + self.handler: type[RestHandler] | None = None + self.ms: tuple[type, str] | None = None self.specs = {} self.spec_stack = [] self.original_send_function = None @@ -339,18 +341,20 @@ def __repr__(self): nested = " [depth: %s]" % len(self.spec_stack) if self.spec_stack else "" return "%s mock %s, %s specs%s" % (name, status, len(self.specs), nested) - def register_handler(self, handler): - if self.handler: + def register_handler(self, handler: type["RestHandler"]) -> None: + if self.handler is not None: assert self.handler is handler, "Mocks targeting multiple handlers is not supported" return - self.ms = handler.intercept(None) + self.ms = handler.ms_adapter() self.handler = handler - def _intercept(self, *args, **kwargs): + def _intercept(self, *args, **kwargs) -> object: + assert self.handler is not None return self.handler.intercept(self, *args, **kwargs) def start(self, specs): + assert self.ms is not None if not self.spec_stack: assert self.original_send_function is None target, name = self.ms @@ -361,6 +365,7 @@ def start(self, specs): self.specs.update(specs) def stop(self): + assert self.ms is not None self.specs = self.spec_stack.pop() if not self.spec_stack: assert self.original_send_function is not None @@ -417,7 +422,7 @@ def __init__(self, handler, base_url, specs): self.handler = handler self.specs = specs or {} self.key = None - self.stack = None + self.stack: MockedHandlerStack | None = None if base_url: assert isinstance(base_url, str) assert isinstance(self.specs, dict) @@ -432,8 +437,9 @@ def start(self): self.stack.start(self.specs) def stop(self): - self.stack.stop() - self.stack = None + if self.stack is not None: + self.stack.stop() + self.stack = None def __call__(self, func): """ @@ -479,7 +485,7 @@ def __init__(self, method, url, raw_response): self.method = method self.url = url self.raw_response = raw_response - self.status_code = raw_response.status_code + self.status_code = int(getattr(raw_response, "status_code", 0)) def __repr__(self): return "" % self.status_code @@ -577,11 +583,13 @@ def test_foo(): return MockWrapper(cls, base_url, specs) @classmethod - def new_session(cls, **session_spec): - """New session for 'session_spec'""" + @abc.abstractmethod + def new_session(cls, *args, **kwargs) -> object: + """New session""" @classmethod - def raw_response(cls, session, method, url, **passed_through): + @abc.abstractmethod + def raw_response(cls, session, method, url, **passed_through) -> object: """ Args: session: Session as obtained via new_session() call from this handler @@ -591,7 +599,8 @@ def raw_response(cls, session, method, url, **passed_through): """ @classmethod - def to_rest_response(cls, method, url, raw_response): + @abc.abstractmethod + def to_rest_response(cls, method, url, raw_response) -> "RestResponse": """ Args: method (str): Underlying method to call (GET, PUT, POST, etc) @@ -603,14 +612,20 @@ def to_rest_response(cls, method, url, raw_response): """ @classmethod - def intercept(cls, mock_caller, *_, **__): + @abc.abstractmethod + def ms_adapter(cls) -> tuple[type, str]: + """A tuple of (adapter_class, send_function_name) used to intercept outgoing requests for mocking""" + + @classmethod + @abc.abstractmethod + def intercept(cls, mock_caller, *args, **kwargs) -> object: """ Args: - mock_caller (MockedHandlerStack | None): If provided: effectively intercept, if None: return target+name of function to replace + mock_caller (MockedHandlerStack): Mock caller to use for intercepting requests """ @classmethod - def user_agent(cls): + def user_agent(cls) -> str: """Default user agent to use""" return "%s/%s (%s)" % (SYS_INFO.program_name, SYS_INFO.program_version, SYS_INFO.platform_info) @@ -665,12 +680,14 @@ def to_rest_response(cls, method, url, raw_response): return RestResponse(method, url, raw_response) @classmethod - def intercept(cls, mock_caller, *args, **__): - if mock_caller is None: - from requests.adapters import HTTPAdapter + def ms_adapter(cls): + """A tuple of which class to use to create an adapter and name of `send` function""" + from requests.adapters import HTTPAdapter - return HTTPAdapter, "send" + return HTTPAdapter, "send" + @classmethod + def intercept(cls, mock_caller, *args, **__): from requests import Response request = args[0] @@ -734,7 +751,7 @@ def sub_client(self, relative_url): url, headers=self.headers, timeout=self.timeout, user_agent=self.user_agent, handler=self.handler, session=self.session ) - def full_url(self, url): + def full_url(self, url) -> str: """ Args: url (str): Relative URL @@ -744,7 +761,7 @@ def full_url(self, url): """ return urljoin(self.base_url, url) - def decompress(self, url, destination, simplify=False, fatal=True, logger=UNSET, dryrun=UNSET, **kwargs): + def decompress(self, url, destination, simplify=False, fatal=True, logger=UNSET, dryrun=UNSET, **kwargs) -> RestResponse: """ Args: url (str): URL of .tar.gz to unpack (may be absolute, or relative to self.base_url) @@ -768,7 +785,7 @@ def decompress(self, url, destination, simplify=False, fatal=True, logger=UNSET, return response - def download(self, url, destination, fatal=True, logger=UNSET, dryrun=UNSET, **kwargs): + def download(self, url, destination, fatal=True, logger=UNSET, dryrun=UNSET, **kwargs) -> RestResponse: """ Args: url (str): URL of resource to download (may be absolute, or relative to self.base_url) @@ -790,16 +807,17 @@ def download(self, url, destination, fatal=True, logger=UNSET, dryrun=UNSET, **k with open(destination, "wb") as fh: fh.write(response.content) - if hash_checksum: - downloaded_checksum = checksum(destination, hash=hash_algo) + if hash_checksum and hash_algo and hasattr(hashlib, hash_algo): + downloaded_checksum = checksum(destination, hash=getattr(hashlib, hash_algo)) if downloaded_checksum != hash_checksum: delete(destination, fatal=False, logger=logger) msg = "%s differs for %s: expecting %s, got %s" % (hash_algo, short(destination), hash_checksum, downloaded_checksum) - return abort(msg, fatal=fatal, logger=logger) + response.status_code = 400 + return abort(msg, fatal=fatal, return_value=response, logger=logger) return response - def get_response(self, url, fatal=False, logger=False, **kwargs): + def get_response(self, url, fatal=False, logger=False, **kwargs) -> RestResponse: """ Args: url (str): Remote URL (may be absolute, or relative to self.base_url) @@ -812,7 +830,7 @@ def get_response(self, url, fatal=False, logger=False, **kwargs): """ return self._get_response("GET", url, fatal, logger, **kwargs) - def delete(self, url, fatal=True, logger=UNSET, dryrun=UNSET, **kwargs): + def delete(self, url, fatal=True, logger=UNSET, dryrun=UNSET, **kwargs) -> RestResponse: """Same as underlying .delete(), but respecting 'dryrun' mode Args: @@ -962,7 +980,7 @@ def std_diskcache(directory=UNSET, default_expire=UNSET, size_limit=UNSET): (CacheWrapper): Object wrapping this cache """ with contextlib.suppress(ImportError): - from diskcache import Cache + from diskcache import Cache # type: ignore[import-not-found] # optional dependency if directory is UNSET: directory = None @@ -1003,7 +1021,7 @@ def _decomposed_checksum_url(cls, url): return None, None, url - def _get_response(self, method, url, fatal, logger, dryrun=False, state=None, action=None, **kwargs): + def _get_response(self, method, url, fatal, logger, dryrun=False, state=None, action=None, **kwargs) -> RestResponse: """ Args: method (str): Underlying method to call @@ -1057,7 +1075,7 @@ def _get_response(self, method, url, fatal, logger, dryrun=False, state=None, ac _R.hlog(logger, msg) - if cache_key is not None and response.ok and self.cache_wrapper.is_cachable_method(method): + if cache_key is not None and self.cache_wrapper is not None and response.ok and self.cache_wrapper.is_cachable_method(method): self.cache_wrapper.set(cache_key, response, expire=cache_expire) return response diff --git a/src/runez/inspector.py b/src/runez/inspector.py index d1f07fd..c3c38fa 100644 --- a/src/runez/inspector.py +++ b/src/runez/inspector.py @@ -7,13 +7,8 @@ """ import importlib -import sys -import time -from functools import wraps -from runez.convert import to_int -from runez.program import run -from runez.system import abort_if, find_caller, py_mimic, SYS_INFO +from runez.system import find_caller def auto_import_siblings(skip=None, caller=None): @@ -67,90 +62,6 @@ def foo(...): return imported -class AutoInstall: - """ - Decorator to trigger just-in-time pip installation of a requirement (if/when needed), example usage: - - from runez.inspector import AutoInstall - - @AutoInstall("requests") - def fetch(url): - import requests - ... - """ - - def __init__(self, top_level, package_name=None): - """Decorator creation""" - self.top_level = top_level - self.package_name = package_name or top_level - - def ensure_installed(self): - """Ensure that self.top_level is installed (install it if need be)""" - try: - importlib.import_module(self.top_level) - - except ImportError: - abort_if(not SYS_INFO.venv_bin_folder, "Can't auto-install '%s' outside of a virtual environment" % self.package_name) - r = run(sys.executable, "-mpip", "install", self.package_name, fatal=False, dryrun=False) - abort_if(r.failed, "Can't auto-install '%s': %s" % (self.package_name, r.full_output)) - - def __call__(self, target): - """Decorator invoked with decorated function 'target'""" - - @wraps(target) - def inner(*args, **kwargs): - self.ensure_installed() - return target(*args, **kwargs) - - py_mimic(target, inner) - return inner - - -class ImportTime: - """Measure average import time of a given top-level package""" - - def __init__(self, module_name, iterations=3): - self.module_name = module_name - self.elapsed = None - self.cumulative = None - self.problem = None - cumulative = 0 - started = time.time() - for _ in range(iterations): - c = self._get_importtime() - if not c: - return - - cumulative += c - - self.elapsed = (time.time() - started) / iterations - self.cumulative = cumulative / iterations - - def __repr__(self): - return "%s %.3g" % (self.module_name, self.elapsed or 0) - - def __lt__(self, other): - if isinstance(other, ImportTime) and self.cumulative and other.cumulative: - return self.cumulative < other.cumulative - - def _get_importtime(self): - result = run(sys.executable, "-Ximporttime", "-c", "import %s" % self.module_name, fatal=None) - if result.failed: - lines = result.error.splitlines() - self.problem = lines[-1] if lines else "-Ximporttime failed" - return None - - cumulative = 0 - for line in result.error.splitlines(): # python -Ximporttime outputs to stderr - items = line.split("|") - if items and len(items) > 1: - c = to_int(items[1]) - if c: - cumulative = max(cumulative, c) - - return cumulative - - def _should_auto_import(module_name, skip): """ Args: diff --git a/src/runez/logsetup.py b/src/runez/logsetup.py index e1523d3..ade693d 100644 --- a/src/runez/logsetup.py +++ b/src/runez/logsetup.py @@ -10,17 +10,17 @@ import threading import time from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler -from typing import List, Optional +from typing import Callable, List, Optional from runez.ascii import AsciiAnimation from runez.convert import to_bytesize, to_int from runez.date import local_timezone, represented_duration from runez.file import parent_folder from runez.system import ( + _PyMimicked, _R, abort_if, cached_property, - decode, DEV, find_caller, flattened, @@ -68,20 +68,10 @@ def formatted(message, *args, **named_values): class ProgressHandler(logging.Handler): """Used to capture logging chatter and show it as progress""" - level = logging.DEBUG - - @classmethod - def handle(cls, record): + def handle(self, record: logging.LogRecord) -> bool: """Intercept all log chatter and show it as progress message""" LogManager.progress._show_debug(record.getMessage()) - - @classmethod - def emit(cls, record): - """Not needed""" - - @classmethod - def createLock(cls): - """Not needed""" + return True class ProgressBar: @@ -94,12 +84,12 @@ def __init__(self, iterable=None, total=None, columns=8, frames=" ▏▎▍▌ self.frame_count = len(frames) - 2 self.per_frame = self.per_char / self.frame_count if self.frame_count > 0 else None self.iterable = iterable - self.parent = None # type: Optional[ProgressBar] # Parent progress bar, if any - if total is None and hasattr(iterable, "__len__"): + self.parent: ProgressBar | None = None # Parent progress bar, if any + if not total and iterable and hasattr(iterable, "__len__"): total = len(iterable) - self.total = total - self.n = None + self.total = total or 1 + self.n = 0 def __repr__(self): return "%s/%s" % (self.n, self.total) @@ -122,26 +112,21 @@ def __exit__(self, *_): def start(self): """Start tracking progress with this progressbar""" - if self.n is None: - self.n = 0 - LogManager.progress._add_progress_bar(self) + self.n = 0 + LogManager.progress._add_progress_bar(self) def stop(self): """Stop / cleanup this progressbar""" - if self.n is not None: - self.n = None - LogManager.progress._remove_progress_bar(self) + self.n = 0 + LogManager.progress._remove_progress_bar(self) def update(self, n=1): """Manually update the progress bar, advance progress by 'n'""" self.n += n - def rendered(self): + def rendered(self) -> str: """Called in spinner thread (lock already acquired)""" - if self.n is None or not self.total: - return None - - percent = max(round(100.0 * self.n / self.total), 0) + percent = max(0, round(100.0 * self.n / max(1, self.total))) blanks = 0 if percent >= 100: percent = 100 @@ -169,11 +154,11 @@ def _remove_parent(self, parent): class _SpinnerComponent: def __init__(self, fps, source, color, adapter=None): self.adapter = adapter - self.source = source # type: callable + self.source = source self.color = color self.update_delay = 1.0 / fps self.next_update = 0 - self.current_text = None # type: Optional[str] + self.current_text: str | None = None def add_text(self, line, columns): """(int): size of text added to 'line' (lock already acquired)""" @@ -253,13 +238,13 @@ def __init__(self): self._current_line = None self._fps = 60.0 # Higher fps for _run(), to reduce flickering as much as possible self._has_progress_line = False - self._msg_show = None # type: Optional[str] # Message coming from show() calls - self._msg_debug = None # type: Optional[str] # Message coming from trace() or debug() calls - self._progress_bar = None # type: Optional[ProgressBar] - self._state = None # type: Optional[_SpinnerState] - self._stderr_write = None - self._stdout_write = None - self._thread = None # type: Optional[threading.Thread] # Background daemon thread used to display progress + self._msg_show: str | None = None # Message coming from show() calls + self._msg_debug: str | None = None # Message coming from trace() or debug() calls + self._progress_bar: ProgressBar | None = None + self._state: _SpinnerState | None = None + self._stderr_write: Callable[[str], int] | None = None + self._stdout_write: Callable[[str], int] | None = None + self._thread: threading.Thread | None = None # Background daemon thread used to display progress def show(self, message): """ @@ -303,6 +288,7 @@ def stop(self): return self._thread = None + self._state = None attempts = 10 while attempts > 0: @@ -320,10 +306,10 @@ def stop(self): self._clear_line() self._has_progress_line = False - if sys.stdout.write == self._on_stdout: + if self._stdout_write is not None and sys.stdout.write == self._on_stdout: sys.stdout.write = self._stdout_write - if sys.stderr.write == self._on_stderr: + if self._stderr_write is not None and sys.stderr.write == self._on_stderr: sys.stderr.write = self._stderr_write self._stderr_write = None @@ -340,10 +326,7 @@ def _show_debug(self, message): def _get_message(self): """Called in spinner thread (lock already acquired)""" - if self._msg_show is not None: - return self._msg_show - - return self._msg_debug + return self._msg_show or self._msg_debug def _get_progress(self): """Called in spinner thread (lock already acquired)""" @@ -372,27 +355,25 @@ def _original_write(stream): if SYS_INFO.terminal.isatty(stream): return getattr(stream, "write", None) - def _clean_write(self, write, message): + def _clean_write(self, write, message: str) -> int: """Output 'message' using 'write' function, ensure any pending progress line is cleared first""" - if message: - message = decode(message) - with self._lock: - if self._has_progress_line: - self._clear_line() - self._has_progress_line = False + with self._lock: + if self._has_progress_line: + self._clear_line() + self._has_progress_line = False - if self._has_progress_line is False and message.endswith("\n"): - self._has_progress_line = None + if self._has_progress_line is False and message.endswith("\n"): + self._has_progress_line = None - write(message) + return write(message) - def _on_stdout(self, message): + def _on_stdout(self, s: str) -> int: """Intercepted print() or sys.stdout.write()""" - self._clean_write(self._stdout_write, message) + return self._clean_write(self._stdout_write, s) - def _on_stderr(self, message): + def _on_stderr(self, s: str) -> int: """Intercepted sys.stderr.write()""" - self._clean_write(self._stderr_write, message) + return self._clean_write(self._stderr_write, s) def _clear_line(self): """Called in spinner thread (lock already acquired)""" @@ -400,29 +381,32 @@ def _clear_line(self): def _write(self, text): """Called in any thread (lock already acquired)""" - self._stderr_write(text) + if self._stderr_write is not None: + self._stderr_write(text) def _run(self): """Background thread handling progress reporting and animation""" try: - sleep_delay = 1 / self._fps - frequency = int(self._fps / self._state.max_fps) - 1 - countdown = 0 - line = None - while self._thread: - time.sleep(sleep_delay) - countdown -= 1 - if countdown < 0 or self._has_progress_line is None: - with self._lock: - if countdown < 0: - countdown = frequency - line = self._state.get_line(time.time()) - - if line: - self._clear_line() - self._write(line) - self._write("\r") - self._has_progress_line = True + state = self._state + if state is not None: + sleep_delay = 1 / self._fps + frequency = int(self._fps / state.max_fps) - 1 + countdown = 0 + line = None + while self._thread: + time.sleep(sleep_delay) + countdown -= 1 + if countdown < 0 or self._has_progress_line is None: + with self._lock: + if countdown < 0: + countdown = frequency + line = state.get_line(time.time()) + + if line: + self._clear_line() + self._write(line) + self._write("\r") + self._has_progress_line = True finally: self.is_running = False @@ -453,7 +437,7 @@ class LoggingSnapshot(Slotted): Take a snapshot of parts we're modifying in the 'logging' module, in order to be able to restore it as it was """ - __slots__ = ["_srcfile", "critical", "debug", "error", "exception", "fatal", "info", "warning"] + __slots__ = ("_srcfile", "critical", "debug", "error", "exception", "fatal", "info", "warning") def _seed(self): """Seed initial fields""" @@ -478,7 +462,7 @@ class LogSpec(Slotted): """ # See setup()'s docstring for meaning of each field - __slots__ = [ + __slots__ = ( "appname", "basename", "console_format", @@ -496,7 +480,7 @@ class LogSpec(Slotted): "rotate_count", "timezone", "tmp", - ] + ) @property def argv(self): @@ -615,10 +599,11 @@ def filter(self, record): class Timeit: """Measure how long a decorated function, or context, took took to run""" + function_name: str | None = None + def __init__(self, function=None, color="bold", logger=UNSET, fmt="{function} took {elapsed}"): self.__func__ = None - self.function_name = None - self.start_time = None + self.start_time: float = 0 self.color = color self.logger = logger self.fmt = fmt @@ -627,7 +612,7 @@ def __init__(self, function=None, color="bold", logger=UNSET, fmt="{function} to self.__func__ = function self.function_name = "%s()" % function.__qualname__ - else: + elif isinstance(function, str): self.function_name = function def __get__(self, instance, owner): @@ -737,6 +722,7 @@ class LogManager: _lock = threading.RLock() _logging_snapshot = LoggingSnapshot() + _progress_handler: Optional[ProgressHandler] = None @classmethod def set_debug(cls, debug): @@ -905,7 +891,7 @@ def greet(cls, greetings): def clean_handlers(cls): """Remove all non-runez logging handlers""" for h in list(logging.root.handlers): - if h is not cls.console_handler and h is not cls.file_handler and h is not ProgressHandler: + if h is not cls.console_handler and h is not cls.file_handler and h is not cls._progress_handler: logging.root.removeHandler(h) @classmethod @@ -1064,12 +1050,15 @@ def _props(cls): @classmethod def _auto_enable_progress_handler(cls): + if cls._progress_handler is None: + cls._progress_handler = ProgressHandler(level=logging.DEBUG) + if cls.progress.is_running: - if ProgressHandler not in logging.root.handlers: - logging.root.handlers.append(ProgressHandler) + if cls._progress_handler not in logging.root.handlers: + logging.root.handlers.append(cls._progress_handler) - elif ProgressHandler in logging.root.handlers: - logging.root.handlers.remove(ProgressHandler) + elif cls._progress_handler in logging.root.handlers: + logging.root.handlers.remove(cls._progress_handler) @classmethod def _update_used_formats(cls): @@ -1087,7 +1076,7 @@ def _setup_console_handler(cls): target = cls.spec.console_stream existing = cls.console_handler if existing is None or _get_fmt(existing) != fmt or existing.level != level or existing.stream != target: - if existing is not None: + if existing is not None and cls.handlers is not None: cls.handlers.remove(existing) logging.root.removeHandler(existing) @@ -1101,7 +1090,7 @@ def _setup_file_handler(cls): target = cls.spec.usable_location() existing = cls.file_handler if existing is None or _get_fmt(existing) != fmt or existing.level != level or existing.baseFilename != target: - if existing is not None: + if existing is not None and cls.handlers is not None: cls.handlers.remove(existing) logging.root.removeHandler(existing) @@ -1117,7 +1106,9 @@ def _add_handler(cls, new_handler, fmt, level): new_handler.setLevel(level) logging.root.addHandler(new_handler) - cls.handlers.append(new_handler) + if cls.handlers is not None: + cls.handlers.append(new_handler) + return new_handler @classmethod @@ -1159,12 +1150,14 @@ def _fix_logging_shortcuts(cls): """ if cls.is_using_format("%(context)"): cls.context.enable(True) - for handler in cls.handlers: - handler.addFilter(cls.context.filter) + if cls.context.filter is not None and cls.handlers is not None: + for handler in cls.handlers: + handler.addFilter(cls.context.filter) else: - for handler in cls.handlers: - handler.removeFilter(cls.context.filter) + if cls.context.filter is not None and cls.handlers is not None: + for handler in cls.handlers: + handler.removeFilter(cls.context.filter) cls.context.enable(False) @@ -1189,7 +1182,7 @@ def _fix_logging_shortcuts(cls): logging.log = _LogWrap.log -class _LogWrap: +class _LogWrap(_PyMimicked): """Allows to correctly report caller file/function/line from convenience calls such as logging.info()""" def __init__(self, level, exc_info=None): @@ -1199,12 +1192,11 @@ def __init__(self, level, exc_info=None): @staticmethod def log(level, msg, *args, **kwargs): - getframe = getattr(sys, "_getframe", None) offset = kwargs.pop("_stack_offset", 1) - name = getframe(offset).f_globals.get("__name__") + name = sys._getframe(offset).f_globals.get("__name__") logger = logging.getLogger(name) try: - logging.currentframe = lambda: getframe(3 + offset) + logging.currentframe = lambda: sys._getframe(3 + offset) logger.log(level, msg, *args, **kwargs) finally: diff --git a/src/runez/program.py b/src/runez/program.py index 9093a6d..3aa72c5 100644 --- a/src/runez/program.py +++ b/src/runez/program.py @@ -14,7 +14,7 @@ import sys import tempfile import termios -from io import BytesIO +from io import StringIO from select import select from runez.convert import parsed_tabular, to_int @@ -24,7 +24,7 @@ class PsInfo: """Summary info about a process, as given by `ps -f` command""" - info = None # type: dict # Info returned by `ps` + info: dict | None = None # Info returned by `ps` def __init__(self, pid=None): """ @@ -186,18 +186,6 @@ def check_pid(pid): if not pid: # No support for kill pid 0, as that is not the intent of this function, and it's not cross-platform return False - if SYS_INFO.platform_id.is_windows: # pragma: no cover - import ctypes - - kernel32 = ctypes.windll.kernel32 - SYNCHRONIZE = 0x100000 - process = kernel32.OpenProcess(SYNCHRONIZE, 0, pid) - if process: - kernel32.CloseHandle(process) - return True - - return False - try: os.kill(pid, 0) @@ -223,9 +211,10 @@ def daemonize(): os._exit(0) devnull_fd = os.open(os.devnull, os.O_RDWR) - os.dup2(devnull_fd, sys.__stdin__.fileno()) - os.dup2(devnull_fd, sys.__stdout__.fileno()) - os.dup2(devnull_fd, sys.__stderr__.fileno()) + for stream in (sys.__stdin__, sys.__stdout__, sys.__stderr__): + if stream is not None: + os.dup2(devnull_fd, stream.fileno()) + os.close(devnull_fd) @@ -237,9 +226,6 @@ def is_executable(path): Returns: (bool): True if file exists and is executable """ - if SYS_INFO.platform_id.is_windows: # pragma: no cover - return bool(_windows_exe(path)) - return path and os.path.isfile(path) and os.access(path, os.X_OK) @@ -315,8 +301,9 @@ def run( args = flattened(args, shellify=True) full_path = which(program) - result = RunResult(audit=RunAudit(full_path or program, args, popen_args)) - description = result.audit.run_description(short_exe=short_exe) + audit = RunAudit(full_path or program, args, popen_args) + result = RunResult(audit=audit) + description = audit.run_description(short_exe=short_exe) if background: description += " &" @@ -326,7 +313,7 @@ def run( description = _R.colored(description, "bold") if _R.hdry(dryrun, logger, "run: %s" % description): - result.audit.dryrun = True + audit.dryrun = True result.exit_code = 0 if stdout is not None: result.output = "[dryrun] %s" % description # Properly simulate a successful run @@ -358,8 +345,8 @@ def run( with _WrappedArgs([full_path, *args]) as wrapped_args: try: p, out, err = _run_popen(wrapped_args, popen_args, passthrough, fatal, stdout, stderr) - result.output = decode(out, strip=strip) - result.error = decode(err, strip=strip) + result.output = decode(out or "", strip=strip) + result.error = decode(err or "", strip=strip) result.pid = p.pid result.exit_code = p.returncode @@ -480,27 +467,28 @@ def run_description(self, short_exe=UNSET): class RunResult: """Holds result of a runez.run()""" - def __init__(self, output=None, error=None, code=1, audit=None): + def __init__(self, output=None, error=None, code=1, audit: "RunAudit | None" = None): """ Args: output (str | None): Captured output (on stdout), if any error (str | None): Captured error output (on stderr), if any code (int): Exit code - audit (RunAudit): Optional audit object recording what run this was related to + audit (RunAudit | None): Optional audit object recording what run this was related to """ self.output = output self.error = error self.exit_code = code - self.exc_info = None # Exception that occurred during the run, if any - self.pid = None # Pid of spawned process, if any + self.exc_info: BaseException | None = None # Exception that occurred during the run, if any + self.pid: int | None = None # Pid of spawned process, if any self.audit = audit def __repr__(self): return "RunResult(exit_code=%s)" % self.exit_code def __eq__(self, other): - if isinstance(other, RunResult): - return self.output == other.output and self.error == other.error and self.exit_code == other.exit_code + return ( + isinstance(other, RunResult) and self.output == other.output and self.error == other.error and self.exit_code == other.exit_code + ) def __bool__(self): return self.exit_code == 0 @@ -535,30 +523,26 @@ def which(program, ignore_own_venv=False): program = str(program) if os.path.basename(program) != program: + # Full path given: ok if it's an executable program = resolved_path(program) - if SYS_INFO.platform_id.is_windows: # pragma: no cover - return _windows_exe(program) - return program if is_executable(program) else None + if not ignore_own_venv: + # Look at our own venv first + venv_program = SYS_INFO.venv_bin_path(program) + if venv_program and is_executable(venv_program): + return venv_program + for p in os.environ.get("PATH", "").split(os.pathsep): fp = os.path.join(p, program) - if SYS_INFO.platform_id.is_windows: # pragma: no cover - fp = _windows_exe(fp) - - if ( - fp - and (not ignore_own_venv or not SYS_INFO.venv_bin_folder or not fp.startswith(SYS_INFO.venv_bin_folder)) - and is_executable(fp) - ): + if (not ignore_own_venv or not SYS_INFO.venv_bin_folder or not fp.startswith(SYS_INFO.venv_bin_folder)) and is_executable(fp): return fp + # Finally, look at current folder too program = os.path.join(os.getcwd(), program) if is_executable(program): return program - return None - def require_installed(program, instructions=None, platform=None): """Raise an exception if 'program' is not available on PATH, show instructions on how to install it @@ -641,12 +625,12 @@ def _read_data(fd, length=1024): def _run_popen(args, popen_args, passthrough, fatal, stdout, stderr): """Run subprocess.Popen(), capturing output accordingly""" if not passthrough: - p = subprocess.Popen(args, stdout=stdout, stderr=stderr, **popen_args) # noqa: S603 + p = subprocess.Popen(args, stdout=stdout, stderr=stderr, text=True, **popen_args) # noqa: S603 if fatal is None and stdout is None and stderr is None: return p, None, None # Don't wait on spawned process out, err = p.communicate() - return p, decode(out), decode(err) + return p, out, err # Capture output, but also let it pass-through as-is to the terminal stdout_r, stdout_w = pty.openpty() @@ -662,10 +646,10 @@ def _run_popen(args, popen_args, passthrough, fatal, stdout, stderr): else: passthrough = None - stdout_buffer = BytesIO() - stderr_buffer = BytesIO() + stdout_buffer = StringIO() + stderr_buffer = StringIO() - with subprocess.Popen(args, stdout=stdout_w, stderr=stderr_w, **popen_args) as p: # noqa: S603 + with subprocess.Popen(args, stdout=stdout_w, stderr=stderr_w, text=True, **popen_args) as p: # noqa: S603 os.close(stdout_w) os.close(stderr_w) readable = [stdout_r, stderr_r] @@ -681,11 +665,11 @@ def _run_popen(args, popen_args, passthrough, fatal, stdout, stderr): _safe_write(passthrough, text) if fd == stdout_r: _safe_write(sys.stdout, text, flush=sys.stdout.buffer) - _safe_write(stdout_buffer, data) + _safe_write(stdout_buffer, text) else: _safe_write(sys.stderr, text, flush=sys.stderr.buffer) - _safe_write(stderr_buffer, data) + _safe_write(stderr_buffer, text) except OSError as e: if e.errno != errno.EIO: # On some OS-es, EIO means EOF @@ -715,20 +699,6 @@ def _safe_write(target, data, flush=None): flush.flush() -def _windows_exe(path): # pragma: no cover - if path: - if os.path.isfile(path): - return path - - for extension in (".exe", ".bat"): - fpath = path - if not fpath.lower().endswith(extension): - fpath += extension - - if os.path.isfile(fpath): - return fpath - - class _WrappedArgs: """Context manager to temporarily work around https://youtrack.jetbrains.com/issue/PY-40692""" @@ -738,7 +708,7 @@ def __init__(self, args): def __enter__(self): args = self.args - needs_wrap = not SYS_INFO.platform_id.is_windows and "PYCHARM_HOSTED" in os.environ and len(args) > 1 + needs_wrap = "PYCHARM_HOSTED" in os.environ and len(args) > 1 if needs_wrap and "python" in args[0] and args[1][:2] in ("-m", "-X", "-c"): self.tmp_folder = os.path.realpath(tempfile.mkdtemp()) wrapper = os.path.join(self.tmp_folder, "pydev-wrapper.sh") diff --git a/src/runez/pyenv.py b/src/runez/pyenv.py index 4c49c85..aacb6e2 100644 --- a/src/runez/pyenv.py +++ b/src/runez/pyenv.py @@ -2,7 +2,7 @@ import os import re from pathlib import Path -from typing import ClassVar +from typing import ClassVar, Optional from runez.file import ls_dir from runez.program import is_executable, run @@ -63,7 +63,8 @@ def from_basename(cls, basename, source=None, last_modified=None, size=None): Returns: (ArtifactInfo | None): Parsed artifact info, if any """ - is_wheel = wheel_build_number = tags = None + is_wheel = False + wheel_build_number = tags = None m = PypiStd.RX_SDIST.match(basename) if not m: m = PypiStd.RX_WHEEL.match(basename) @@ -178,7 +179,7 @@ def __lt__(self, other): def satisfies(self, other): """Does this spec satisfy 'other'?""" if isinstance(other, PythonSpec) and self.family == other.family and self.freethreading == other.freethreading: - if other.is_min_spec: + if other.is_min_spec and other.version is not None: return self.version >= other.version return self.canonical.startswith(other.canonical) @@ -193,10 +194,11 @@ def represented(self, color=None, compact=CPYTHON): (str): Textual representation of this spec """ text = self.canonical - if compact and (compact is True or self.family in compact): + if compact and self.version is not None and (compact is True or self.family in compact): text = self.version.text if self.freethreading: text += "t" + if self.is_min_spec: text += "+" @@ -298,7 +300,7 @@ class PythonDepot: p = my_depot.find_python("3.10") """ - _preferred_python = None # type: PythonInstallation # Preferred python to use, if configured + _preferred_python: Optional["PythonInstallation"] = None # Preferred python to use, if configured def __init__(self, *locations): """ @@ -419,12 +421,16 @@ class Version: text: str - def __init__(self, text, max_parts=5, canonical=False): + def __init__(self, text, max_parts=5, canonical: bool | None = False): """ - Args: - text (str | None): Text to be parsed - max_parts (int): Maximum number of parts (components) to consider version valid - canonical (bool | None): None: loose parsing, False: strict parsing, version left as-is, True: Turn into canonical PEP-440 + Parameters + ---------- + text : str | None + Text to be parsed + max_parts : int + Maximum number of parts (components) to consider version valid + canonical : bool | None + None: loose parsing, False: strict parsing, version left as-is, True: Turn into canonical PEP-440 """ self.given_text = text self.given_components = None # Components as given by 'text' @@ -458,7 +464,7 @@ def __init__(self, text, max_parts=5, canonical=False): if pre: rel = rel_num = None # rc.post does not count as .post (but a .post.dev does) - components = [int(c) for c in m.group("main").split(".")] + components: list[int | str] = [int(c) for c in m.group("main").split(".")] if len(components) > max_parts: return # Invalid version, too many parts @@ -471,7 +477,7 @@ def __init__(self, text, max_parts=5, canonical=False): components.append(rel or "") self.components = tuple(components) if canonical is True: - self.text = self.pep_440 + self.text = self.pep_440 or "" @classmethod def extracted_from_text(cls, text): @@ -489,13 +495,13 @@ def extracted_from_text(cls, text): return v @classmethod - def from_object(cls, obj): + def from_object(cls, obj) -> Optional["Version"]: """ Args: obj: Object to turn into a Version, if possible Returns: - (Version | None): Corresponding version object, if valid + Corresponding version object, if valid """ if obj: if isinstance(obj, Version): @@ -839,7 +845,7 @@ class PythonInstallationLocation: def __init__(self, location): self.location = location - self._preferred_python = UNSET # type: PythonInstallation # Auto-selected preferred python from this location + self._preferred_python: PythonInstallation | None = UNSET # Auto-selected preferred python from this location def __repr__(self): return short(self.location) @@ -987,7 +993,7 @@ class PythonSimpleInspection: _cached: ClassVar = {} - def __init__(self, version=None, machine=None, problem=None, freethreading=None): + def __init__(self, version=None, machine=None, problem=None, freethreading=False): self.version = version self.machine = machine self.problem = problem diff --git a/src/runez/render.py b/src/runez/render.py index 6cedb5c..9c4fd85 100644 --- a/src/runez/render.py +++ b/src/runez/render.py @@ -1,8 +1,9 @@ +import inspect import os from runez.colors import ColorManager from runez.convert import to_int -from runez.system import _R, AdaptedProperty, flattened, is_iterable, joined, short, Slotted, stringified, SYS_INFO, UNSET, wcswidth +from runez.system import _R, flattened, joined, short, Slotted, stringified, SYS_INFO, UNSET, wcswidth NAMED_BORDERS = { "ascii": "rstgrid,t:+++=,m:+++-", @@ -127,7 +128,7 @@ def aerated(cls, text, border="--"): class PrettyBorder(Slotted): # bottom, cell, header, header-cell, mid, padding, top - __slots__ = ["b", "c", "h", "hc", "m", "pad", "t"] + __slots__ = ("b", "c", "h", "hc", "m", "pad", "t") def __repr__(self): return self.represented_values(delimiter=",", operator=":") @@ -160,6 +161,36 @@ def _set_field(self, name, value): super()._set_field(name, value) +class _AdaptedProperty: + """ + This decorator allows to define properties with regular get/set behavior, + but the body of the decorated function can act as a validator, and can auto-convert given values + """ + + def __init__(self, key: str, caster=None, type=None): + """ + Args: + caster (callable): Optional caster called for non-None values only (applies to anonymous properties only) + type (type): Optional type, must have initializer with one argument if provided + """ + self.caster = caster + self.type = type + self.key = "_%s" % key + + def __get__(self, instance, owner): + return getattr(instance, self.key, None) + + def __set__(self, obj, value): + if self.type is not None: + if not isinstance(value, self.type): + value = self.type(value) + + elif value is not None and self.caster is not None: + value = self.caster(value) + + setattr(obj, self.key, value) + + class PrettyCustomizable: """ Ancestor to customizable points, in reverse order of priority @@ -168,9 +199,9 @@ class PrettyCustomizable: - header.column: applies to all cells within a column (including header cells) """ - align = AdaptedProperty("align", caster=Align.cast, doc="Horizontal alignment to use (left, center or right)") - style = AdaptedProperty("style", caster=ColorManager.cast_style, doc="Style") - width = AdaptedProperty("width", caster=int, doc="Desired width") + align = _AdaptedProperty("align", caster=Align.cast) + style = _AdaptedProperty("style", caster=ColorManager.cast_style) + width = _AdaptedProperty("width", caster=int) def to_dict(self): result = {} @@ -181,7 +212,7 @@ def to_dict(self): return result - def formatted(self, text): + def formatted(self, text: str) -> str: style = self.style if style is not None: text = style(text) @@ -292,8 +323,8 @@ def add_columns(self, *columns): class PrettyTable(PrettyCustomizable): - border = AdaptedProperty("border", type=PrettyBorder, doc="Border to use") - header = AdaptedProperty("header", type=PrettyHeader, doc="Header") + border: PrettyBorder = _AdaptedProperty("border", type=PrettyBorder) # type: ignore[assignment] + header: PrettyHeader = _AdaptedProperty("header", type=PrettyHeader) # type: ignore[assignment] def __init__(self, header=None, align=None, border=None, missing="-", style=None, width=None): """ @@ -305,9 +336,9 @@ def __init__(self, header=None, align=None, border=None, missing="-", style=None style (str | runez.colors.Renderable | None): Desired default style (eg: dim, bold, etc) width (int | None): Desired width (defaults to detected terminal width) """ - self.header = header # type: PrettyHeader + self.header = header # type: ignore[assignment] # converted by _AdaptedProperty.__set__ self.align = align - self.border = border + self.border = border # type: ignore[assignment] self.missing = missing self.style = style self.width = width @@ -320,8 +351,8 @@ def __str__(self): def rows(self): return self._rows - def formatted(self, value): - return stringified(value, none=self.missing) + def formatted(self, text: str | None) -> str: + return stringified(text, none=self.missing) def add_row(self, *values): """Add one row with given 'values'""" @@ -334,13 +365,13 @@ def add_rows(self, *rows): for row in rows: self.add_row(row) - def get_string(self): + def get_string(self) -> str: """Table rendered as a string""" t = _PTTable(self) return t.get_string() @staticmethod - def _single_diag(sources, border, align, style, missing, columns): + def _single_diag(sources, border, align, style, missing, columns) -> str: table = PrettyTable(2, border=border) table.header[0].align = align table.header[1].style = style @@ -353,10 +384,7 @@ def _single_diag(sources, border, align, style, missing, columns): if isinstance(source, dict): source = sorted(source.items()) - elif not is_iterable(source): - source = [source] - - for row in source: + for row in _iterate(source): if not isinstance(row, (tuple, list)): row = (row, "") @@ -457,7 +485,7 @@ def render_line(container, columns, padding, pad, chars, cells=None): class _PTBorderChars(Slotted): - __slots__ = ["first", "h", "last", "mid"] + __slots__ = ("first", "h", "last", "mid") def _values_from_string(self, text): return self._values_from_object(list(text)) @@ -504,7 +532,7 @@ def new_row(self, values, header=None): return row - def get_string(self): + def get_string(self) -> str: container = [] columns = self.columns border = self.parent.border @@ -574,14 +602,25 @@ def __init__(self, column, value, header): column.update_width(self.text_width) def rendered_text(self, size, padding): - text = self.text + text = str(self.text or "") if text: size += len(text) - wcswidth(text) - text = self.custom.align(text, size) + if self.custom.align is not None: + text = self.custom.align(text, size) + return "%s%s%s" % (padding, text, padding) +def _iterate(source): + """Iterate over given `source`""" + if isinstance(source, (list, tuple, set)) or inspect.isgenerator(source): + yield from source + + else: + yield source + + def _represented_cell(text, missing_color): if text is None: return _R.colored("-missing-", missing_color) diff --git a/src/runez/schema.py b/src/runez/schema.py index cf5577d..6d62830 100644 --- a/src/runez/schema.py +++ b/src/runez/schema.py @@ -17,10 +17,14 @@ """ import inspect +from typing import TYPE_CHECKING from runez.convert import to_boolean, to_float, to_int from runez.date import to_date, to_datetime, UTC -from runez.system import _R, stringified +from runez.system import stringified + +if TYPE_CHECKING: + from runez.serialize import ClassMetaDescription class ValidationException(Exception): @@ -32,58 +36,9 @@ def __init__(self, message): def __str__(self): return self.message - -def determined_schema_type(value, required=True): - """ - Args: - value: Value given by user (as class attribute to describe their runez.Serializable schema) - required (bool): If True, raise ValidationException() is no type could be determined - - Returns: - (Any | None): Associated schema type (descendant of Any), if one is applicable - """ - schema_type = _determined_schema_type(value) - if required and schema_type is None: - raise ValidationException("Invalid schema definition '%s'" % value) - - return schema_type - - -def _determined_schema_type(value): - if value is None: - return Any() # User used None as value, no more info to be had - - if isinstance(value, Any): - return value # User specified their schema explicitly - - if inspect.isroutine(value): - return None # Routine, not a schema type - - if inspect.ismemberdescriptor(value): - return Any() # Member descriptor (such as slot) - - if isinstance(value, str): - return String(default=value) # User gave a string as value, assume they mean string type, and use value as default - - mapped = TYPE_MAP.get(value.__class__) - if mapped is not None: - return mapped(default=value) - - if not isinstance(value, type): - value = value.__class__ - - if issubclass(value, str): - return String() - - if issubclass(value, _R.serializable()): - return _MetaSerializable(value._meta) - - if issubclass(value, Any): - return value() - - mapped = TYPE_MAP.get(value) - if mapped is not None: - return mapped() + @staticmethod + def raise_with_message(message: str): # pragma: no cover, used as fallback + raise ValidationException(message) class Any: @@ -97,12 +52,13 @@ def __init__(self, default=None): self.default = default def __repr__(self): - if self.default is None: - return self.representation() + rep = self.representation() + if self.default is not None: + rep = f"{rep} (default: {self.default})" - return "%s (default: %s)" % (self.representation(), self.default) + return rep - def representation(self): + def representation(self) -> str: """ Returns: (str): Textual representation for this type constraint @@ -122,7 +78,7 @@ def problem(self, value): return self._problem(value) - def _problem(self, value): + def _problem(self, value) -> str | None: """To be re-defined by descendants, `value` is never `None`""" def converted(self, value): @@ -146,13 +102,13 @@ def _converted(self, value): class _MetaSerializable(Any): """Wraps descendants of `runez.Serializable` as schema fields (will be retired in the future)""" - def __init__(self, meta, default=None): + def __init__(self, meta: "ClassMetaDescription", default=None): """ Args: meta: A runez.Serializable object, or its ._meta attribute default: Default to use (when no value is provided) """ - self.meta = getattr(meta, "_meta", meta) + self.meta: ClassMetaDescription = getattr(meta, "_meta", meta) super().__init__(default=default) def _problem(self, value): @@ -213,8 +169,8 @@ def __init__(self, key=None, value=None, default=None): value: Optional constraint for values default: Default to use when no value was provided """ - self.key = determined_schema_type(key) # type: Any - self.value = determined_schema_type(value) # type: Any + self.key = _determined_schema_type(key) + self.value = _determined_schema_type(value) super().__init__(default=default) def representation(self): @@ -291,7 +247,7 @@ def __init__(self, subtype=None, default=None): subtype: Optional constraint for values default: Default to use when no value was provided """ - self.subtype = determined_schema_type(subtype) # type: Any + self.subtype = _determined_schema_type(subtype) super().__init__(default=default) def representation(self): @@ -324,15 +280,22 @@ def _converted(self, value): class Struct(Any): """Represents a composed object, similar to `Serializable`, but not intended to be the root of any schema""" + _meta: "ClassMetaDescription" # Class attribute set dynamically in __init__ + def __init__(self, default=None): if not hasattr(self.__class__, "_meta"): - self.__class__._meta = _R.meta_description(self) + from runez.serialize import ClassMetaDescription + + self.__class__._meta = ClassMetaDescription(self.__class__) super().__init__(default=default) def __eq__(self, other): - if other is not None and other.__class__ is self.__class__: - return not any(not hasattr(other, x) or getattr(self, x) != getattr(other, x) for x in self.meta.attributes) + return ( + other is not None + and other.__class__ is self.__class__ + and not any(not hasattr(other, x) or getattr(self, x) != getattr(other, x) for x in self.meta.attributes) + ) def __ne__(self, other): return not (self == other) @@ -383,7 +346,7 @@ def __init__(self, subtype=None): Args: subtype: Optional type constraint for this identifier (defaults to `String`) """ - self.subtype = determined_schema_type(subtype or String) # type: Any + self.subtype = _determined_schema_type(subtype or String) super().__init__(default=None) @@ -396,7 +359,48 @@ def __init__(self, subtype=None): } -def _schema_type_name(target): +def _determined_schema_type(value) -> Any: + """ + Args: + value: Value given by user (as class attribute to describe their runez.Serializable schema) + + Returns: + (Any): Associated schema type (descendant of Any), if one is applicable + """ + from runez.serialize import Serializable + + if value is None: + return Any() # User used None as value, no more info to be had + + if isinstance(value, Any): + return value # User specified their schema explicitly + + if inspect.ismemberdescriptor(value): + return Any() # Member descriptor (such as slot) + + if isinstance(value, str): + return String(default=value) # User gave a string as value, assume they mean string type, and use value as default + + mapped = TYPE_MAP.get(value.__class__) + if mapped is not None: + return mapped(default=value) + + if not isinstance(value, type): + value = value.__class__ + + if issubclass(value, str): + return String() + + if issubclass(value, Serializable): + return _MetaSerializable(value._meta) + + if issubclass(value, Any): + return value() + + raise ValidationException("Invalid schema definition '%s'" % value) + + +def _schema_type_name(target) -> str: meta = getattr(target, "meta", None) if meta is not None: return meta.name diff --git a/src/runez/serialize.py b/src/runez/serialize.py index d1a1af5..f7cf93b 100644 --- a/src/runez/serialize.py +++ b/src/runez/serialize.py @@ -4,6 +4,7 @@ import collections import datetime +import inspect import io import json from typing import ClassVar @@ -87,8 +88,8 @@ class DefaultBehavior: (global default for that does not make sense). """ - strict = False # type: callable # Original default: don't strictly enforce type compatibility - extras = False # type: callable # Original default: don't report extra fields seen in deserialized data (ie: ignore them) + strict = False # Original default: don't strictly enforce type compatibility + extras = False # Original default: don't report extra fields seen in deserialized data (ie: ignore them) def __init__(self, strict=UNSET, extras=UNSET, hook=UNSET): """ @@ -97,13 +98,15 @@ def __init__(self, strict=UNSET, extras=UNSET, hook=UNSET): extras (bool | callable | (callable, list)): See `with_behavior()` hook (callable): Called if provided at the end of ClassMetaDescription initialization """ + from runez import schema + if strict is UNSET: strict = self.strict if extras is UNSET: extras = self.extras - self.strict = _to_callable(strict, fallback=_R.lc.rm_schema.ValidationException) + self.strict = _to_callable(strict, fallback=schema.ValidationException) self.hook = _to_callable(hook) # Called if provided at the end of ClassMetaDescription initialization self.ignored_extras = None # Internal, populated if given `extras` is a `tuple(callable, list)` @@ -137,10 +140,10 @@ def __repr__(self): return "lenient" @staticmethod - def behavior_from_bases(cls): - """Determine behavior from base classes of given `cls`""" + def behavior_from_bases(klass): + """Determine behavior from base classes of given `klass`""" strict = hook = UNSET - for base in reversed(cls.__bases__): + for base in reversed(klass.__bases__): meta = getattr(base, "_meta", None) if isinstance(meta, ClassMetaDescription) and meta.behavior is not None and is_serializable_descendant(base): # Let `strict` and `hook` be inherited from parent classes (but not `Serializable` itself) @@ -156,14 +159,18 @@ def handle_mismatch(self, class_name, field_name, problem, source): if isinstance(self.strict, type) and issubclass(self.strict, Exception): raise self.strict(msg) - self.strict(msg) + from runez.schema import ValidationException + + handler = self.strict if callable(self.strict) else ValidationException.raise_with_message + handler(msg) def do_notify(self, message): if self.extras: if isinstance(self.extras, type) and issubclass(self.extras, Exception): raise self.extras(message) - self.extras(message) + notifier = self.extras if callable(self.extras) else LOG.debug + notifier(message) def handle_extra(self, class_name, field_name): self.do_notify("'%s' is not an attribute of %s" % (field_name, class_name)) @@ -345,35 +352,45 @@ class ClassMetaDescription: """Info on class attributes and properties""" def __init__(self, cls, mbehavior=None): + from runez import schema + self.name = cls.__name__ self.qualified_name = "%s.%s" % (cls.__module__, cls.__name__) self.cls = cls self.attributes = {} self.properties = [] - self.behavior = mbehavior.behavior if mbehavior is not None else DefaultBehavior.behavior_from_bases(cls) self.unique_identifier = None + if mbehavior is not None and isinstance(mbehavior.behavior, DefaultBehavior): + self.behavior = mbehavior.behavior + + else: + self.behavior = DefaultBehavior.behavior_from_bases(cls) by_type = collections.defaultdict(list) for key, value in scan_all_attributes(cls): - if not key.startswith("_"): - if value is not None and "property" in value.__class__.__name__: - self.properties.append(key) - continue - - schema_module = _R.lc.rm_schema - schema_type = schema_module.determined_schema_type(value, required=False) - if schema_type is not None: - if isinstance(schema_type, schema_module.UniqueIdentifier): - if self.unique_identifier: - raise schema_module.ValidationException( - "Multiple unique ids specified for %s: %s and %s" - % (self.qualified_name, self.unique_identifier, schema_type) - ) - self.unique_identifier = key - schema_type = schema_type.subtype - - self.attributes[key] = schema_type - by_type[schema_type.__class__].append(key) + if key.startswith("_") or (value is not None and inspect.isroutine(value)): + continue + + if value is not None and "property" in value.__class__.__name__: + self.properties.append(key) + continue + + try: + schema_type = schema._determined_schema_type(value) + + except schema.ValidationException: + continue + + if isinstance(schema_type, schema.UniqueIdentifier): + if self.unique_identifier: + msg = f"Multiple unique ids specified for {self.qualified_name}: {self.unique_identifier} and {schema_type}" + raise schema.ValidationException(msg) + + self.unique_identifier = key + schema_type = schema_type.subtype + + self.attributes[key] = schema_type + by_type[schema_type.__class__].append(key) self._by_type = {k: sorted(v) for k, v in by_type.items()} # Sorted to make things deterministic if self.attributes: @@ -538,7 +555,7 @@ def __init__(cls, name, bases, dct): class Serializable: """Serializable object""" - _meta = None # type: ClassMetaDescription # This describes fields and properties of descendant classes, populated via metaclass + _meta: ClassMetaDescription # Describes fields and properties of descendant classes, populated via metaclass def __new__(cls, *_, **__): obj = super(Serializable, cls).__new__(cls) @@ -546,8 +563,11 @@ def __new__(cls, *_, **__): return obj def __eq__(self, other): - if other is not None and other.__class__ is self.__class__: - return not any(not hasattr(other, x) or getattr(self, x) != getattr(other, x) for x in self._meta.attributes) + return ( + other is not None + and other.__class__ is self.__class__ + and not any(not hasattr(other, x) or getattr(self, x) != getattr(other, x) for x in self._meta.attributes) + ) def __ne__(self, other): return not (self == other) @@ -568,7 +588,7 @@ def from_json(cls, path, default=None, fatal=False, logger=False): (cls): Deserialized object """ result = cls() - data = read_json(path, default=default, fatal=fatal or (default is None and cls._meta.behavior.strict), logger=logger) + data = read_json(path, default=default, fatal=fatal or bool(default is None and cls._meta.behavior.strict), logger=logger) result.set_from_dict(data, source=short(path)) return result @@ -606,7 +626,7 @@ def reset(self): for name, schema_type in self._meta.attributes.items(): setattr(self, name, schema_type.default) - def to_dict(self, stringify=stringified, dt=str, none=False): + def to_dict(self, stringify=stringified, dt=str, none=False) -> dict: """ Args: stringify (callable | None): Function to use to stringify non-builtin types @@ -620,7 +640,9 @@ def to_dict(self, stringify=stringified, dt=str, none=False): (dict): This object serialized to a dict """ raw = {name: getattr(self, name) for name in self._meta.attributes} - return json_sanitized(raw, stringify=stringify, dt=dt, none=none) + value = json_sanitized(raw, stringify=stringify, dt=dt, none=none) + assert isinstance(value, dict) + return value def from_json(value, default=None, fatal=False, logger=False): diff --git a/src/runez/system.py b/src/runez/system.py index 1c3fbb5..2420393 100644 --- a/src/runez/system.py +++ b/src/runez/system.py @@ -5,6 +5,7 @@ """ import contextlib +import functools import importlib.metadata import inspect import logging @@ -15,7 +16,9 @@ import threading import unicodedata from io import StringIO -from typing import ClassVar +from typing import Any, Callable, ClassVar, TypeVar + +_T = TypeVar("_T") ABORT_LOGGER = logging.error LOG = logging.getLogger("runez") @@ -40,10 +43,11 @@ def __len__(self): # Internal marker for values that are NOT set -UNSET = Undefined() # type: Undefined +# Typed as Any so that pyright doesn't constrain parameter types when UNSET is used as a default value +UNSET: Any = Undefined() -def abort(message, code=1, exc_info=None, return_value=None, fatal=True, logger=UNSET): +def abort(message, code=1, exc_info=None, return_value: _T = None, fatal=True, logger=UNSET) -> _T: """General wrapper for optionally fatal calls >>> from runez import abort @@ -73,7 +77,7 @@ def abort(message, code=1, exc_info=None, return_value=None, fatal=True, logger= Args: message (str): Message explaining why we're aborting code (int): Exit code used when runez.system.AbortException is set to SystemExit - exc_info (Exception): Exception info to pass on to logger + exc_info (BaseException): Exception info to pass on to logger return_value (Any): Value to return when `fatal` is not True fatal (type | bool | None): True: abort execution on failure, False: don't abort but log, None: don't abort, don't log logger (callable | bool | None): Logger to use, True to print(), None to disable log chatter @@ -118,11 +122,36 @@ def abort_if(condition, message=None, code=1, exc_info=None, logger=UNSET): abort(_R.actual_message(message or condition), code=code, exc_info=exc_info, logger=logger) +class _PyMimicked: + """ + Base class for descriptor objects that use ``py_mimic()`` to copy identity attributes from a wrapped function. + + ``py_mimic()`` dynamically sets ``__name__``, ``__doc__``, ``__module__``, and ``__annotations__`` + on the target object at runtime. Since pyright cannot infer attributes set via side effects, + this mixin declares them upfront so that static type checkers understand they exist. + + Inherit from this class in any descriptor that calls ``py_mimic(self, func)`` in its ``__init__``. + """ + + __name__: str + __doc__: str | None + __module__: str + __annotations__: dict + + def py_mimic(target, source): - """Make 'target' mimic python definition of 'source' - Args: - target: Object to decorate - source: Object to mimic + """ + Make ``target`` mimic the Python identity of ``source``. + + Copies ``__annotations__``, ``__doc__``, ``__module__``, and ``__name__`` from ``source`` to ``target``. + This is similar to ``functools.update_wrapper``, but works on arbitrary objects (not just callables). + + Parameters + ---------- + target : _PyMimicked | object + Object to decorate (typically ``self`` in a descriptor's ``__init__``) + source : object + Object whose identity to copy (typically the wrapped function) """ if target is not None and source is not None: target.__annotations__ = source.__annotations__ @@ -131,26 +160,27 @@ def py_mimic(target, source): target.__name__ = source.__name__ -class cached_property: +class cached_property(functools.cached_property): """ - A property that is only computed once per instance and then replaces itself with an ordinary attribute. - Same as https://pypi.org/project/cached-property/ (without having to add another dependency). - Deleting the attribute resets the property. + A ``cached_property`` that extends ``functools.cached_property`` with batch introspection utilities. - Threads/async is not supported on purpose (to keep things simple) - See docs/async-cached-property.md for how that can be added if/when needed. - """ + The core caching behavior is identical to the stdlib version: on first access, the decorated method + is called and its return value is stored in the instance's ``__dict__``. Deleting the attribute + resets the property so it will be recomputed on next access. - def __init__(self, func): - self.__func__ = func - py_mimic(self, self.__func__) + This subclass adds static helper methods for working with cached properties in bulk: - def __get__(self, instance, owner): - if instance is None: - return self + - ``reset(target)``: clear all cached property values on an object (useful in tests/conftest) + - ``properties(target)``: yield names of all (cached) properties on an object + - ``to_dict(target)``: return a dict of property name/value pairs + """ - value = instance.__dict__[self.__name__] = self.__func__(instance) - return value + def __init__(self, func): + super().__init__(func) + self.__annotations__ = func.__annotations__ + self.__doc__ = func.__doc__ + self.__module__ = func.__module__ + self.__name__ = func.__name__ @staticmethod def _walk_properties(target, cached_only=True): @@ -245,24 +275,24 @@ def capped(value, minimum=None, maximum=None, key=None, none_ok=False): return value -def decode(value, strip=None): +def decode(value: str | bytes, strip: str | bool | None = None) -> str: """Python 2/3 friendly decoding of output. Args: - value (str | bytes | None): The value to decode. + value (str | bytes): The value to decode. strip (str | bool | None): If provided, `strip()` the returned string. Returns: str: Decoded value, if applicable. """ - if value is None: - return None - if isinstance(value, bytes): value = value.decode("utf-8", errors="replace") - if strip: - value = value.strip(strip if isinstance(strip, str) else None) + if strip is True: + value = value.strip() + + elif isinstance(strip, str): + value = value.strip(strip) return value @@ -288,11 +318,12 @@ def find_caller(depth=2, maximum=1000, need_file=True, need_package=False, regex package = f.f_globals.get("__package__") if package or not need_package: name = f.f_globals.get("__name__") - top_level = package and package.partition(".")[0] - if name.endswith("__main__") or not top_level or (not top_level.startswith("_") and top_level not in ignored): - filepath = f.f_globals.get("__file__") - if (filepath or not need_file) and (regex is None or regex.match(name)): - return _CallerInfo(f, depth, package, top_level, name, filepath) + if name != "functools": + top_level = package and package.partition(".")[0] + if name.endswith("__main__") or not top_level or (not top_level.startswith("_") and top_level not in ignored): + filepath = f.f_globals.get("__file__") + if (filepath or not need_file) and (regex is None or regex.match(name)): + return _CallerInfo(f, depth, package, top_level, name, filepath) depth = depth + 1 @@ -328,7 +359,7 @@ def first_line(text, keep_empty=False, default=None): return default -def flattened(*value, keep_empty=False, split=None, shellify=False, strip=None, transform=None, unique=False): +def flattened(*value, keep_empty: str | bool | None = False, split=None, shellify=False, strip=None, transform=None, unique=False): """ Args: value: Possibly nested arguments (sequence of lists, nested lists, ...) @@ -366,7 +397,7 @@ def flattened(*value, keep_empty=False, split=None, shellify=False, strip=None, return result -def get_version(mod, default="0.0.0", fatal=False, logger=False): +def get_version(mod, default="0.0.0", fatal=False, logger: bool | Callable | None = False): """ Args: mod (module | str): Module, or module name to find version for (pass either calling module, or its .__name__) @@ -417,7 +448,7 @@ def is_iterable(value): return isinstance(value, (list, tuple, set)) or inspect.isgenerator(value) -def stringified(value, converter=None, none="None"): +def stringified(value, converter=None, none: str | bool | None = "None") -> str: """ Args: value: Any object to turn into a string @@ -431,7 +462,7 @@ def stringified(value, converter=None, none="None"): return value if isinstance(value, bytes): - return value.decode("utf-8", errors="replace") + return decode(value) if converter is not None: converted = converter(value) @@ -456,7 +487,7 @@ def stringified(value, converter=None, none="None"): return "{}".format(value) -def joined(*args, delimiter=" ", keep_empty=False, strip=None, stringify=stringified, unique=False): +def joined(*args, delimiter=" ", keep_empty: str | bool | None = False, strip=None, stringify=stringified, unique=False): """ >>> joined(1, " foo ", None, 2) '1 foo 2' @@ -656,85 +687,6 @@ class AbortException(Exception): """ -class AdaptedProperty: - """ - This decorator allows to define properties with regular get/set behavior, - but the body of the decorated function can act as a validator, and can auto-convert given values - - Example usage: - >>> from runez import AdaptedProperty - >>> class MyObject: - ... age = AdaptedProperty(default=5) # Anonymous property - ... - ... @AdaptedProperty # Simple adapted property - ... def width(self, value): - ... if value is not None: # Implementation of this function acts as validator and adapter - ... return int(value) # Here we turn value into an int (will raise exception if not possible) - ... - >>> my_object = MyObject() - >>> assert my_object.age == 5 # Default value - >>> my_object.width = "10" # Implementation of decorated function turns this into an int - >>> assert my_object.width == 10 - """ - - __counter: ClassVar = [0] # Simple counter for anonymous properties - - def __init__(self, validator=None, default=None, doc=None, caster=None, type=None): - """ - Args: - validator (callable | str | None): Function to use to validate/adapt passed values, or name of property - default: Default value - doc (str): Docstring (applies to anonymous properties only) - caster (callable): Optional caster called for non-None values only (applies to anonymous properties only) - type (type): Optional type, must have initializer with one argument if provided - """ - self.default = default - self.caster = caster - self.type = type - if callable(validator): - # 'validator' is available when used as decorator of the form: @AdaptedProperty - self.validator = validator - self.key = "__%s" % validator.__name__ - py_mimic(self, validator) - - else: - # 'validator' is NOT available when decorator of this form is used: @AdaptedProperty(default=...) - # or as an anonymous property form: my_prop = AdaptedProperty() - self.validator = None - self.__doc__ = doc - if validator is None: - i = self.__counter[0] = self.__counter[0] + 1 - validator = "anon_prop_%s" % i - - self.key = "__%s" % validator - - def __call__(self, validator): - """Called when used as decorator of the form: @AdaptedProperty(default=...)""" - self.validator = validator - self.key = "__%s" % validator.__name__ - py_mimic(self, validator) - return self - - def __get__(self, instance, owner): - if instance is None: - return self # We're being called by class - - return getattr(instance, self.key, self.default) - - def __set__(self, obj, value): - if self.validator is not None: - value = self.validator(obj, value) - - elif self.type is not None: - if not isinstance(value, self.type): - value = self.type(value) - - elif value is not None and self.caster is not None: - value = self.caster(value) - - setattr(obj, self.key, value) - - class Anchored: """ An "anchor" is a known path that we don't wish to show in full when printing/logging @@ -1029,6 +981,8 @@ def clear(self): class Slotted: """This class allows to easily initialize/set a descendant using named arguments""" + __slots__: tuple + def __init__(self, *positionals, **named): """ Args: @@ -1116,8 +1070,8 @@ def pop(self, settings): for name in self.__slots__: self._set(name, settings.pop(name, UNSET)) - def to_dict(self): - """dict: Key/value pairs of defined fields""" + def to_dict(self) -> dict: + """Key/value pairs of defined fields""" result = {} for name in self.__slots__: val = getattr(self, name, UNSET) @@ -1150,8 +1104,7 @@ def __iter__(self): yield val def __eq__(self, other): - if isinstance(other, self.__class__): - return all(getattr(self, x, None) == getattr(other, x, None) for x in self.__slots__) + return isinstance(other, self.__class__) and all(getattr(self, x, None) == getattr(other, x, None) for x in self.__slots__) def _seed(self): """Seed initial fields""" @@ -1166,8 +1119,8 @@ def _seed(self): def _set_field(self, name, value): setattr(self, name, value) - def _get_defaults(self): - """dict|Undefined|None: Optional defaults""" + def _get_defaults(self) -> dict | None: + """Optional defaults""" def _set(self, name, value): """ @@ -1195,8 +1148,8 @@ def _set(self, name, value): else: self._set_field(name, value) - def _values_from_positional(self, positional): - """dict: Key/value pairs from a given position to set()""" + def _values_from_positional(self, positional) -> dict | None: + """Key/value pairs from a given position to set()""" if isinstance(positional, str): return self._values_from_string(positional) @@ -1208,11 +1161,11 @@ def _values_from_positional(self, positional): return self._values_from_object(positional) - def _values_from_string(self, text): - """dict: Optional hook to allow descendants to extract key/value pairs from a string""" + def _values_from_string(self, text) -> dict | None: + """Optional hook to allow descendants to extract key/value pairs from a string""" - def _values_from_object(self, obj): - """dict: Optional hook to allow descendants to extract key/value pairs from an object""" + def _values_from_object(self, obj) -> dict | None: + """Optional hook to allow descendants to extract key/value pairs from an object""" if obj is not None: return {k: getattr(obj, k, UNSET) for k in self.__slots__} @@ -1252,7 +1205,7 @@ def project_folder(self) -> str | None: return path @cached_property - def tests_folder(self) -> str: + def tests_folder(self) -> str | None: """Path to current development project's tests/ folder, if we're running from a source compilation""" if SYS_INFO.venv_bin_folder: ct = self.current_test() @@ -1260,7 +1213,7 @@ def tests_folder(self) -> str: return _R.find_parent_folder(ct.folder, {"tests", "test"}) @cached_property - def venv_folder(self) -> str: + def venv_folder(self) -> str | None: """Path to current development venv, if we're running from one""" if SYS_INFO.venv_bin_folder: return _R.find_parent_folder(sys.prefix, {"venv", ".venv", ".virtualenvs", ".tox", "build"}) @@ -1297,9 +1250,9 @@ class PlatformId: - Additionally, the binary also should be able to run from whatever folder it has been unpacked in (no global system settings needed) """ - arch: str = None # Example: arm64, x86_64 - platform: str = None # Example: linux, macos - subsystem: str = None # Example: libc, musl (empty for macos/windows for now) + arch: str = "" # Example: arm64, x86_64 (populated in __init__) + platform: str = "" # Example: linux, macos (populated in __init__) + subsystem: str | None = None # Example: libc, musl (empty for macos/windows) default_subsystem = None # Can this be auto-detected? (currently: users can optionally provide this, by setting this class field) platform_archive_type: ClassVar = {"linux": "tar.gz", "macos": "tar.gz", "windows": "zip"} @@ -1325,7 +1278,7 @@ def __init__(self, given=None, arch=None, platform=None, subsystem=None): if subsystem is None and len(given) > 2: subsystem = given[2] - self.platform = platform or self.determine_current_platform() + self.platform = str(platform) if platform else self.determine_current_platform() self.arch = arch or self.determine_current_architecture() if subsystem is None: subsystem = self.determine_current_subsystem() @@ -1356,9 +1309,6 @@ def __init__(self, given=None, arch=None, platform=None, subsystem=None): base_paths.append("@(rpath|executable_path|loader_path)/.+") # Count relative libs as base base_paths.append(r"/usr/lib/libSystem\.B\.dylib") - elif self.is_windows: - base_paths.append(r"C:/Windows/System32/.*") - self.rx_base_path = re.compile(r"^(%s)$" % joined(base_paths, delimiter="|")) def __repr__(self): @@ -1421,7 +1371,7 @@ def get_identifier(self, delimiter="-"): def is_base_lib(self, *paths): """Does one of the given 'paths' match a base library? (present on any declination of this system)""" - return any(p and self.rx_base_path.match(str(p)) for p in paths) + return self.rx_base_path is not None and any(p and self.rx_base_path.match(str(p)) for p in paths) def is_system_lib(self, *paths): """Does one of the given 'paths' match a system library? (ie: installed in a system folder, not /usr/local and such)""" @@ -1441,10 +1391,6 @@ def is_linux(self): def is_macos(self): return self.platform == "macos" - @property - def is_windows(self): - return self.platform == "windows" - def determine_current_architecture(self): import platform @@ -1479,7 +1425,7 @@ class PlatformInfo: def __init__(self, text=None): if text is None: - text = "Windows" if SYS_INFO.platform_id.is_windows else _R.lc.rm.shell("uname", "-msrp") + text = _R.lc.rm.shell("uname", "-msrp") self.os_name = text or "unknown-os" self.os_version = None @@ -1506,7 +1452,7 @@ class SystemInfo: @cached_property def current_process(self): """Info on currently running process""" - return _R.lc.rm.PsInfo() + return _R.lc.rm.program.PsInfo() def diagnostics(self, argv=UNSET, exe=True, platform=True, term=UNSET, userid=UNSET, version=UNSET, via=" ⚡ "): """Usable by runez.render.PrettyTable.two_column_diagnostics()""" @@ -1730,8 +1676,8 @@ def get_size(default_columns=160, default_lines=25): class TerminalProgram: """Info on terminal program being currently used, if any""" - name = None # type: str # Terminal program name - extra_info = None # type: str # Extra info, if available + name: str | None = None # Terminal program name + extra_info: str | None = None # Extra info, if available def __init__(self, ps=None): for k in ("LC_TERMINAL", "TERM_PROGRAM"): @@ -1780,8 +1726,8 @@ def __init__(self, filter_type): """ self._filter_type = filter_type self._lock = threading.RLock() - self._tpayload = None - self._gpayload = None + self._tpayload: threading.local | None = None + self._gpayload: dict | None = None self.filter = None def reset(self): @@ -1811,14 +1757,14 @@ def has_global(self): def set_threadlocal(self, **values): """Set current thread's logging context to specified `values`""" with self._lock: - self._ensure_threadlocal() - self._tpayload.log_context = values + tp = self._ensure_threadlocal() + tp.log_context = values def add_threadlocal(self, **values): """Add `values` to current thread's logging context""" with self._lock: - self._ensure_threadlocal() - self._tpayload.log_context.update(**values) + tp = self._ensure_threadlocal() + tp.log_context.update(**values) def remove_threadlocal(self, name): """ @@ -1846,8 +1792,8 @@ def set_global(self, **values): def add_global(self, **values): """Add `values` to global logging context""" with self._lock: - self._ensure_global() - self._gpayload.update(**values) + gp = self._ensure_global() + gp.update(**values) def remove_global(self, name): """ @@ -1881,14 +1827,16 @@ def to_dict(self): return result - def _ensure_threadlocal(self): + def _ensure_threadlocal(self) -> threading.local: if self._tpayload is None: self._tpayload = threading.local() if not hasattr(self._tpayload, "log_context"): self._tpayload.log_context = {} - def _ensure_global(self, values=None): + return self._tpayload + + def _ensure_global(self, values=None) -> dict: """ Args: values (dict): Ensure internal global tracking dict is created, seed it with `values` when provided (Default value = None) @@ -1896,6 +1844,8 @@ def _ensure_global(self, values=None): if self._gpayload is None: self._gpayload = values or {} + return self._gpayload + class UnitRepresentation: def __init__(self, base=1000, prefixes="KMGTP"): @@ -1971,12 +1921,6 @@ def rm(self): return runez - @cached_property - def rm_schema(self): - import runez.schema - - return runez.schema - @cached_property def rx_ansi_escape(self): return re.compile(r"\x1b(\[[;\d]*[A-Za-z]?)?") @@ -2167,22 +2111,6 @@ def resolved_dryrun(cls, dryrun): return dryrun - @classmethod - def serializable(cls): - """Late-imported Serializable class""" - return cls.lc.rm.Serializable - - @classmethod - def meta_description(cls, struct): - """ - Args: - struct (runez.schema.Struct): Associated Struct - - Returns: - (runez.serialize.ClassMetaDescription): Meta object describing given 'struct' - """ - return cls.lc.rm.serialize.ClassMetaDescription(struct.__class__) - @classmethod def set_dryrun(cls, dryrun): """Set runez.DRYRUN, and return its previous value (useful for context managers) diff --git a/src/runez/thread.py b/src/runez/thread.py index 1a42ff1..cd254d3 100644 --- a/src/runez/thread.py +++ b/src/runez/thread.py @@ -1,11 +1,11 @@ import threading -from runez.system import py_mimic +from runez.system import _PyMimicked, py_mimic THREAD_LOCAL = threading.local() -class thread_local_property: +class thread_local_property(_PyMimicked): """ A property that is computed once per thread Use this in rare cases where you need just a property (or 2) to be thread local in a given object diff --git a/tests/conftest.py b/tests/conftest.py index b6f8443..e9ba2b4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,22 +4,20 @@ import runez from runez.__main__ import main -from runez.conftest import cli, IsolatedLogSetup, logged, temp_folder -from runez.file import readlines +from runez.conftest import cli, ClickRunner, IsolatedLogSetup, logged, temp_folder from runez.http import GlobalHttpCalls from runez.logsetup import LogManager -from runez.system import CaptureOutput, LOG, short, stringified +from runez.system import CaptureOutput, short -cli.default_main = main +# Re-export fixtures so pytest discovers them, and ruff knows they're intentional +__all__ = ["cli", "logged", "temp_folder"] + +ClickRunner.default_main = main GlobalHttpCalls.forbid() runez.date.DEFAULT_TIMEZONE = runez.date.UTC runez.serialize.set_default_behavior(strict=False, extras=True) -# This is here only to satisfy flake8, mentioning the imported fixtures so they're not declared "unused" -assert all(s for s in [logged, temp_folder]) - - class TempLog: """Extra test-oriented convenience on top of runez.TrackedOutput""" @@ -55,21 +53,11 @@ def expect_logged(self, *expected): found = [msg for msg in remaining if msg in line] remaining.difference_update(found) - if remaining: - LOG.info("File contents:") - LOG.info("\n".join(readlines(LogManager.file_handler.baseFilename))) - assert not remaining def clear(self): self.tracked.clear() - def __repr__(self): - return stringified(self.tracked) - - def __str__(self): - return self.folder - def __contains__(self, item): return item in self.tracked diff --git a/tests/test_click.py b/tests/test_click.py index 3cf569e..ecdc1cc 100644 --- a/tests/test_click.py +++ b/tests/test_click.py @@ -11,6 +11,7 @@ import pytest import runez +import runez.config from .conftest import exception_raiser diff --git a/tests/test_config.py b/tests/test_config.py index ad312e0..1895888 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -3,6 +3,7 @@ import pytest import runez +import runez.config SAMPLES = runez.DEV.tests_path("sample") diff --git a/tests/test_convert.py b/tests/test_convert.py index cb6ab1c..bf0eb12 100644 --- a/tests/test_convert.py +++ b/tests/test_convert.py @@ -122,9 +122,6 @@ def tabulated_samples(): lines.append(line) - if lines: - yield len(lines) - 1, "\n".join(lines) - def test_tabulated_parsing(): assert runez.parsed_tabular(" \nfoo") == [] # First line must have a header... diff --git a/tests/test_file.py b/tests/test_file.py index 5863617..0fec0be 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -53,8 +53,6 @@ def test_checksum(): sample = runez.DEV.project_path("LICENSE") assert runez.checksum(sample) == "0f7ae07a0fc3fccaf4e7e7888df52473abcbc2b29b47b7c2cfe8c125528e1536" assert runez.checksum(sample, hash=hashlib.sha1) == "ea553d4e5a18aa83ba90b575ee63a37fc9a7bc07" - assert runez.checksum(sample, hash=hashlib.sha1()) == "ea553d4e5a18aa83ba90b575ee63a37fc9a7bc07" - assert runez.checksum(sample, hash=hashlib.md5()) == "cbc91d983eaeb4ce4724ea3f420c5ce4" def dir_contents(path=None): @@ -238,9 +236,8 @@ def test_failure(temp_folder, monkeypatch): assert runez.write("bar", "some content", fatal=False) assert "Can't write to bar:" in logged.pop() - if not runez.SYS_INFO.platform_id.is_windows: - assert runez.make_executable("some-file", fatal=False) == -1 - assert "Can't chmod some-file:" in logged.pop() + assert runez.make_executable("some-file", fatal=False) == -1 + assert "Can't chmod some-file:" in logged.pop() def test_file_inspection(temp_folder, logged): @@ -417,8 +414,7 @@ def test_parent_folder(): assert runez.parent_folder(None) is None assert runez.parent_folder("././some-file") == cwd - if not runez.SYS_INFO.platform_id.is_windows: - parent = runez.parent_folder("/logs/foo") - assert parent == "/logs" - assert runez.parent_folder(parent) == "/" - assert runez.parent_folder("/") == "/" + parent = runez.parent_folder("/logs/foo") + assert parent == "/logs" + assert runez.parent_folder(parent) == "/" + assert runez.parent_folder("/") == "/" diff --git a/tests/test_http.py b/tests/test_http.py index 99e0396..f401bc0 100644 --- a/tests/test_http.py +++ b/tests/test_http.py @@ -153,7 +153,13 @@ def test_download(temp_folder, logged): assert list(runez.readlines("README.txt")) == ["Hello"] # With checksum validation - assert client.download("README.txt#sha256=a123", "README.txt", fatal=False) is None + assert not logged + r = client.download("README.txt#sha256=a123", "README.txt", fatal=False) + assert r.status_code == 400 + assert "sha256 differs for README.txt: expecting a123, got" in logged.pop() + with pytest.raises(runez.system.AbortException, match="sha256 differs"): + client.download("README.txt#sha256=a123", "README.txt") + assert "Deleted README.txt" in logged assert "sha256 differs for README.txt: expecting a123, got " in logged.pop() diff --git a/tests/test_inspector.py b/tests/test_inspector.py index 385a045..b7b4cbf 100644 --- a/tests/test_inspector.py +++ b/tests/test_inspector.py @@ -1,11 +1,10 @@ import os import sys -from unittest.mock import MagicMock, patch import pytest import runez -from runez.inspector import auto_import_siblings, AutoInstall, ImportTime +from runez.inspector import auto_import_siblings def importable_test_py_files(folder): @@ -61,53 +60,6 @@ def test_auto_import_siblings(): assert "tests.test_serialize" not in imported -class SomeClass: - @AutoInstall("bar") - def needs_bar(self, msg): - return "OK: %s" % msg - - -@AutoInstall("foo") -def needs_foo(msg): - import foo # noqa: F401 - - return "OK: %s" % msg - - -def test_auto_install(logged, monkeypatch): - # Verify that an already present req is a no-op - AutoInstall("runez").ensure_installed() - assert not logged - - # Verify failure to install raises abort exception - with patch("runez.inspector.run", return_value=runez.program.RunResult("failed")): - with pytest.raises(runez.system.AbortException): - needs_foo("hello") - assert "Can't auto-install 'foo': failed" in logged.pop() - - # Verify successful install exercises function call - with patch("runez.inspector.run", return_value=runez.program.RunResult("OK", code=0)): - with pytest.raises(ImportError): # 2nd import attempt raises ImportError (in this case, because we're trying a mocked 'foo') - needs_foo("hello") - assert not logged - - # Full successful call - with patch("runez.inspector.run", return_value=runez.program.RunResult("OK", code=0)): - assert SomeClass().needs_bar("hello") == "OK: hello" - assert not logged - - # Mocked successful import - with patch.dict("sys.modules", foo=MagicMock()), patch("runez.inspector.run", return_value=runez.program.RunResult("OK", code=0)): - assert needs_foo("hello") == "OK: hello" - assert not logged - - # Ensure auto-installation is refused unless we have a venv - monkeypatch.setattr(runez.SYS_INFO, "venv_bin_folder", None) - with pytest.raises(runez.system.AbortException): - needs_foo("hello") - assert "Can't auto-install 'foo' outside of a virtual environment" in logged.pop() - - def test_diagnostics_command(cli): cli.run("--no-color", "diagnostics") assert cli.succeeded @@ -115,21 +67,6 @@ def test_diagnostics_command(cli): assert "sys.executable : %s" % runez.short(sys.executable) in cli.logged -def test_importtime_command(cli): - trunez = ImportTime("runez") - assert str(trunez).startswith("runez ") - - cli.run("import-speed") - assert cli.failed - assert "Please specify module names, or use --all" in cli.logged - - cli.run("import-speed -i1 --all runez foo_no_such_module runez") - assert cli.succeeded - lines = cli.logged.stdout.contents().splitlines() - assert len([s for s in lines if "runez" in s]) == 1 - assert len([s for s in lines if "foo_no_such_module" in s]) == 1 - - def test_passthrough(cli): cli.run("passthrough") assert cli.failed diff --git a/tests/test_logsetup.py b/tests/test_logsetup.py index 5ccfd63..de5ce03 100644 --- a/tests/test_logsetup.py +++ b/tests/test_logsetup.py @@ -154,6 +154,14 @@ def test_context(temp_log): logging.info("hello") assert temp_log.pop() == "UTC INFO - hello" + # Verify that filters get cleaned up if `%(context)s` is removed + runez.log.spec.console_format = "%(timezone)s %(levelname)s - %(message)s" + assert runez.log.context.filter + runez.log.setup() + assert runez.log.context.filter is None + logging.info("hello") + assert temp_log.pop() == "UTC INFO - hello" + assert not runez.log.context.has_global() assert not runez.log.context.has_threadlocal() @@ -227,7 +235,6 @@ def test_deprecated(): assert runez.log.tests_path() == runez.DEV.tests_path() # deprecated -@pytest.mark.skipif(runez.SYS_INFO.platform_id.is_windows, reason="No /dev/null on Windows") def test_file_location_not_writable(temp_log): runez.log.setup( greetings="Logging to: {location}", @@ -429,13 +436,12 @@ def test_setup(temp_log, monkeypatch): assert runez.log.is_using_format("%(context) %(lineno)", fmt) is True assert runez.log.is_using_format("%(context)", "") is False - if not runez.SYS_INFO.platform_id.is_windows: - # signum=None is equivalent to disabling faulthandler - runez.log.enable_faulthandler(signum=None) - assert runez.log.faulthandler_signum is None - # We didn't call setup, so enabling faulthandler will do nothing - runez.log.enable_faulthandler() - assert runez.log.faulthandler_signum is None + # signum=None is equivalent to disabling faulthandler + runez.log.enable_faulthandler(signum=None) + assert runez.log.faulthandler_signum is None + # We didn't call setup, so enabling faulthandler will do nothing + runez.log.enable_faulthandler() + assert runez.log.faulthandler_signum is None cwd = os.getcwd() assert not runez.DRYRUN @@ -495,10 +501,8 @@ def test_setup(temp_log, monkeypatch): assert "DEBUG - hello" in temp_log.stdout.pop() assert not temp_log.stderr - if not runez.SYS_INFO.platform_id.is_windows and runez.logsetup.faulthandler: - # Available only in python3 - runez.log.enable_faulthandler() - assert runez.log.faulthandler_signum + runez.log.enable_faulthandler() + assert runez.log.faulthandler_signum assert runez.log.debug is True assert runez.DRYRUN is True @@ -512,8 +516,8 @@ def test_setup(temp_log, monkeypatch): def test_progress_bar(): p = runez.ProgressBar(range(2)) assert list(p) == [0, 1] - assert str(p) == "None/2" - assert p.rendered() is None + assert str(p) == "0/2" + assert p.rendered() == " 0%" with runez.ProgressBar(total=3, columns=4) as pb: assert pb.n == 0 @@ -526,15 +530,15 @@ def test_progress_bar(): pb.update() assert pb.rendered() == "▉▉▉▉100%" - assert pb.n is None - assert pb.rendered() is None + assert pb.n == 0 + assert pb.rendered() == " 0%" def test_progress_command(cli, monkeypatch): - cli.run("progress-bar", "-i10", "-d1", "--sleep", "0.01") + cli.run("progress-bar", "-v", "-i50", "-d1", "--sleep", "0.01") assert cli.succeeded assert "done" in cli.logged.stdout - assert "CPU usage" in cli.logged.stdout + assert "% CPU usage" in cli.logged.stdout monkeypatch.setitem(sys.modules, "psutil", None) cli.run("progress-bar", "-i10", "-d1", "--sleep", "0.01") diff --git a/tests/test_program.py b/tests/test_program.py index b4374d9..3890715 100644 --- a/tests/test_program.py +++ b/tests/test_program.py @@ -8,7 +8,7 @@ import pytest import runez -from runez.program import RunAudit, RunResult +from runez.program import PsInfo, RunAudit, RunResult from .conftest import exception_raiser @@ -49,12 +49,11 @@ def write(self, message): raise RuntimeError("oops, failed to write %s" % message) -@pytest.mark.skipif(runez.SYS_INFO.platform_id.is_windows, reason="Not supported on windows") def test_capture(monkeypatch): with runez.CurrentFolder(os.path.dirname(CHATTER)): # Check which finds programs in current folder assert runez.which("chatter") == CHATTER - assert runez.shell("chatter hello") == "hello" + assert runez.program.shell("chatter hello") == "hello" with runez.CaptureOutput(dryrun=True) as logged: # Dryrun mode doesn't fail (since it doesn't actually run the program) @@ -75,7 +74,7 @@ def test_capture(monkeypatch): assert runez.run(CHATTER, "hello", fatal=False) == RunResult("hello", "", 0) assert runez.run(CHATTER, "hello", fatal=True) == RunResult("hello", "", 0) assert "chatter hello" in logged.pop() - assert runez.run(CHATTER, stdout=None) == RunResult(None, "", 0) + assert runez.run(CHATTER, stdout=None) == RunResult("", "", 0) assert "Running:" in logged.pop() r = runez.run(CHATTER, "hello", fatal=True, passthrough=True) @@ -83,7 +82,7 @@ def test_capture(monkeypatch): crasher = CrashingWrite() r = runez.run(CHATTER, "hello", fatal=True, passthrough=crasher) - assert r == RunResult(None, None, 0) + assert r == RunResult("", "", 0) assert crasher.crash_counter assert "hello" in logged.pop() @@ -96,9 +95,9 @@ def test_capture(monkeypatch): assert r assert str(r) == "RunResult(exit_code=0)" assert r.succeeded - assert r.output is None - assert r.error is None - assert r.full_output is None + assert r.output == "" + assert r.error == "" + assert r.full_output == "" r = runez.run(CHATTER, "hello", path_env={"PATH": ":.", "CPPFLAGS": " -I/usr/local/opt/openssl/include"}) assert str(r) == "RunResult(exit_code=0)" @@ -203,7 +202,6 @@ def test_daemonize(*_): assert runez.program.daemonize() is None -@pytest.mark.skipif(runez.SYS_INFO.platform_id.is_windows, reason="Not supported on windows") def test_executable(temp_folder): with runez.CaptureOutput(dryrun=True) as logged: assert runez.make_executable("some-file") == 1 @@ -256,15 +254,15 @@ def check_process_tree(pinfo, max_depth=10): def test_ps(): - assert runez.PsInfo.from_pid(None) is None - assert runez.PsInfo.from_pid(0) is None + assert PsInfo.from_pid(None) is None + assert PsInfo.from_pid(0) is None - p = runez.PsInfo() + p = PsInfo() check_process_tree(p) - assert p == runez.PsInfo(0) - assert p == runez.PsInfo("0") - assert p == runez.PsInfo(os.getpid()) - assert p == runez.PsInfo("%s" % os.getpid()) + assert p == PsInfo(0) + assert p == PsInfo("0") + assert p == PsInfo(os.getpid()) + assert p == PsInfo("%s" % os.getpid()) info = p.info assert info["PID"] in str(p) @@ -276,10 +274,10 @@ def test_ps(): parent = p.parent assert parent.pid == p.ppid - # Verify that both variants (user name or uid number) for UID work + # Verify that both variants (username or uid number) for UID work uid = p.uid userid = p.userid - p = runez.PsInfo() + p = PsInfo() if runez.to_int(info["UID"]) is None: p.info["UID"] = uid @@ -291,7 +289,7 @@ def test_ps(): # Edge case: verify __eq__ based on pid p.pid = 0 - assert p != runez.PsInfo(0) + assert p != PsInfo(0) def simulated_ps_output(pid, ppid, cmd): @@ -303,11 +301,8 @@ def simulated_tmux(program, *args, **_): if program == "tmux": return RunResult(output="3", code=0) - if program == "id": - if args[0] == "-un": - return RunResult(output="root", code=0) - - return RunResult(output="0", code=0) + if program == "id" and args[0] == "-un": + return RunResult(output="root", code=0) assert program == "ps" pid = args[1] @@ -328,8 +323,8 @@ def simulated_tmux(program, *args, **_): def test_ps_follow(): with patch("runez.program.run", side_effect=simulated_tmux): - assert runez.PsInfo.from_pid(-1) is None - bad_pid = runez.PsInfo(-1) + assert PsInfo.from_pid(-1) is None + bad_pid = PsInfo(-1) assert str(bad_pid) == "-1 None None" assert bad_pid.cmd is None assert bad_pid.cmd_basename is None @@ -343,7 +338,7 @@ def test_ps_follow(): assert bad_pid.parent_list(follow=True) == [] assert bad_pid.parent_list(follow=False) == [] - p = runez.PsInfo() + p = PsInfo() assert p.cmd == "/dev/null/some-test foo bar" assert p.cmd_basename == "/dev/null/some-test" # Falls back to using 1st sequence with space as basename assert p.uid == 0 @@ -358,7 +353,7 @@ def test_ps_follow(): with patch("runez.program.is_executable", side_effect=lambda x: x == "/dev/null/some-test foo"): # Edge case: verify that `ps` lack of quoting is properly detected - p = runez.PsInfo() + p = PsInfo() assert p.cmd == "/dev/null/some-test foo bar" assert p.cmd_basename == "some-test foo" @@ -438,7 +433,6 @@ def test_which(): assert pp == ps -@pytest.mark.skipif(runez.SYS_INFO.platform_id.is_windows, reason="Not supported on windows") def test_wrapped_run(monkeypatch): original = ["python", "-mvenv", "foo"] monkeypatch.delenv("PYCHARM_HOSTED", raising=False) diff --git a/tests/test_pyenv.py b/tests/test_pyenv.py index 7d29865..c6d277f 100644 --- a/tests/test_pyenv.py +++ b/tests/test_pyenv.py @@ -189,6 +189,13 @@ def test_depot_path(): assert depot.find_python(None) is depot.invoker +def test_edge_cases(): + invalid = PythonSpec("cpython", "invalid") + assert str(invalid) == "cpython:invalid" + assert invalid.abi_suffix == "" + assert invalid.version is None + + def test_empty_depot(): depot = PythonDepot() assert depot.representation() == "No PythonDepot locations configured" diff --git a/tests/test_schema.py b/tests/test_schema.py index 19dfbf3..8aac49b 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -1,8 +1,24 @@ import datetime import logging +import pytest + import runez -from runez.schema import Any, Boolean, Date, Datetime, Dict, Enum, Float, Integer, List, String, Struct, UniqueIdentifier +from runez.schema import ( + Any, + Boolean, + Date, + Datetime, + Dict, + Enum, + Float, + Integer, + List, + String, + Struct, + UniqueIdentifier, + ValidationException, +) from runez.serialize import Serializable, SerializableDescendants, with_behavior @@ -130,6 +146,21 @@ def test_list(): assert sorted(ll.converted({1, "2"})) == [1, 2] +def declare_bogus_class(): + # Declare a bogus class (with strict=False, default), to verify that unknown types are ignored + class Bogus(Serializable): + a = object + b = object() + + return Bogus + + +def test_invalid(): + bogus = declare_bogus_class() + m = bogus._meta + assert str(m) == "Bogus (0 attributes, 0 properties)" + + def test_number(): ff = Float() assert str(ff) == "Float" @@ -144,8 +175,8 @@ def test_number(): assert ff.converted("0o10") == 8.0 -class Car(Serializable, with_behavior(extras=(logging.info, "foo bar"))): - make = String +class Car(Serializable, with_behavior(strict=True, extras=(logging.info, "foo bar"))): + make = str serial = UniqueIdentifier year = Integer @@ -191,8 +222,9 @@ def test_serializable(logged): assert str(Serializable._meta.behavior) == "lenient" - assert str(Car._meta.behavior) == "extras: function 'info', ignored extras: [foo, bar]" - assert str(SpecializedCar._meta.behavior) == "extras: function 'debug'" # extras are NOT inherited + assert str(Car._meta.behavior) == "strict: class runez.schema.ValidationException, extras: function 'info', ignored extras: [foo, bar]" + # Verify that extras are NOT inherited + assert str(SpecializedCar._meta.behavior) == "strict: class runez.schema.ValidationException, extras: function 'debug'" assert Car._meta.attributes_by_type(String) == ["make", "serial"] assert Car._meta.attributes_by_type(Integer) == ["year"] assert SpecializedCar._meta.attributes_by_type(Integer) == ["year"] @@ -202,6 +234,7 @@ def test_serializable(logged): assert str(Person._meta.behavior) == "strict: function 'error', extras: function 'debug', hook: function 'info'" # `hook` is inherited assert str(GPerson._meta.behavior) == "strict: function 'error', extras: function 'debug', hook: function 'info'" + assert str(Person._meta.attributes["name"]) == "String (default: joe)" # Verify that most specific type wins (GPerson -> age) assert Person._meta.attributes_by_type(Integer) == ["fingerprint"] @@ -218,6 +251,9 @@ def test_serializable(logged): assert "Extra content given for Car: baz" in logged.pop() assert car.to_dict() == {"serial": "bar"} + with pytest.raises(ValidationException, match=r"Can't deserialize Car.year: expecting int, got 'foo'"): + Car.from_dict({"year": "foo"}) + pp = Person() assert pp.age is None assert pp.fingerprint is None diff --git a/tests/test_serialize.py b/tests/test_serialize.py index 70e5abf..e480eb8 100644 --- a/tests/test_serialize.py +++ b/tests/test_serialize.py @@ -7,7 +7,7 @@ import runez import runez.conftest -from runez.schema import determined_schema_type, Dict, Integer, List, String, Struct, UniqueIdentifier, ValidationException +from runez.schema import Integer, List, String, Struct, UniqueIdentifier, ValidationException from runez.serialize import add_meta, ClassMetaDescription, same_type, SerializableDescendants, type_name, with_behavior from .conftest import exception_raiser @@ -46,10 +46,11 @@ class SomeSerializable(runez.Serializable, with_behavior(strict=True)): @classmethod def do_something_on_class(cls, value): + """Simulating class function""" cls._called = value - def do_something_on_instance(cls, value): - cls._called = value + def do_something_on_instance(self, value): + self._called = value @property def int_prod(self): @@ -68,32 +69,12 @@ def test_bogus_class(): with pytest.raises(ValidationException): class Bogus(runez.Serializable): - """This class shouldn't have to unique identifiers""" + """This class shouldn't have 2 unique identifiers""" id1 = UniqueIdentifier id2 = UniqueIdentifier -def test_determined_schema_type(): - assert str(determined_schema_type("a")) == "String (default: a)" - assert str(determined_schema_type(5)) == "Integer (default: 5)" - - assert str(determined_schema_type(str)) == "String" - assert str(determined_schema_type(int)) == "Integer" - assert str(determined_schema_type(dict)) == "Dict[Any, Any]" - assert str(determined_schema_type(list)) == "List[Any]" - assert str(determined_schema_type(set)) == "List[Any]" - assert str(determined_schema_type(tuple)) == "List[Any]" - - assert str(determined_schema_type(List)) == "List[Any]" - assert str(determined_schema_type(List(Integer))) == "List[Integer]" - assert str(determined_schema_type(Dict(String, List(Integer)))) == "Dict[String, List[Integer]]" - - with pytest.raises(ValidationException) as e: - determined_schema_type(object()) - assert "Invalid schema definition" in str(e.value) - - def test_from_json(): assert runez.from_json(None) is None assert runez.from_json("") is None diff --git a/tests/test_slotted.py b/tests/test_slotted.py index b425ead..7ad006d 100644 --- a/tests/test_slotted.py +++ b/tests/test_slotted.py @@ -1,163 +1,43 @@ import pytest -import runez +from runez.system import Slotted, UNSET -class Slotted1(runez.Slotted): - __slots__ = ["a1", "b1"] +class Sample(Slotted): + __slots__ = ("a", "b") - prop1a = runez.AdaptedProperty(default="1a") # No validation, accepts anything as-is - prop1b = runez.AdaptedProperty(default="1b", doc="p1b") # Allows to verify multiple anonymous properties work - prop1c = runez.AdaptedProperty(caster=int) # Allows to verify caster approach - prop1d = runez.AdaptedProperty(type=int, default=123) # Allows to verify type approach - - @runez.AdaptedProperty(default="p2") - def prop2(self, value): - """No validation, accepts anything as-is""" - return value - - @runez.AdaptedProperty - def prop3(self, value): - """Requires something that can be turned into an int""" - return int(value) - - @runez.AdaptedProperty(default=4) - def prop4(self, value): - """Requires something that can be turned into an int""" - return int(value) - - def _get_defaults(self): - return runez.UNSET - - -class Slotted2(runez.Slotted): - __slots__ = ["name", "other"] - - -def test_adapted_properties(): - s1a = Slotted1() - - # Check class-level properties - s1ac = s1a.__class__ - assert isinstance(s1ac.prop1a, runez.AdaptedProperty) - assert s1ac.prop1a.__doc__ is None - assert s1ac.prop1b.__doc__ == "p1b" - assert s1ac.prop1c.__doc__ is None - assert s1ac.prop1d.__doc__ is None - assert s1ac.prop2.__doc__ == "No validation, accepts anything as-is" - assert s1ac.prop3.__doc__ == "Requires something that can be turned into an int" - - assert s1a.prop1a == "1a" - assert s1a.prop1b == "1b" - assert s1a.prop1c is None - assert s1a.prop1d == 123 - assert s1a.prop2 == "p2" - assert s1a.prop3 is None - assert s1a.prop4 == 4 - - # prop1a/b and prop2 have no validators - s1a.prop2 = 2 - assert s1a.prop2 == 2 - - s1a.prop1a = "foo" - assert s1a.prop1a == "foo" - assert s1a.prop1b == "1b" # Verify other anonymous props remain unchanged - assert s1a.prop1c is None - assert s1a.prop1d == 123 - - s1a.prop1b = 0 - s1a.prop1d = 234 - assert s1a.prop1a == "foo" - assert s1a.prop1b == 0 - assert s1a.prop1c is None - assert s1a.prop1d == 234 - - s1a.prop1c = "100" # prop1c has a caster - assert s1a.prop1a == "foo" - assert s1a.prop1b == 0 - assert s1a.prop1c == 100 - - s1a.prop1c = None # caster should accept None - assert s1a.prop1c is None - with pytest.raises(ValueError, match="invalid literal for int"): # but anything other than None must be an int - s1a.prop1c = "foo" - assert s1a.prop1c is None - - with pytest.raises(TypeError, match="not 'NoneType'"): - s1a.prop1d = None # prop1d uses type=int, and int() does not accept None - - # prop3 and prop4 insist on ints - s1a.prop3 = "30" - s1a.prop4 = 40 - assert s1a.prop3 == 30 - assert s1a.prop4 == 40 - - with pytest.raises(ValueError, match="invalid literal for int"): - s1a.prop3 = "foo" - - # Verify properties stay bound to their object - s1b = Slotted1(s1a) - assert s1b.prop1a == "1a" - assert s1b.prop1b == "1b" - assert s1b.prop2 == "p2" - assert s1b.prop3 is None - assert s1b.prop4 == 4 - - s1b.set({"prop1a": "foo"}) - assert s1b.prop1a == "foo" +def test_slotted(): + sample1 = Sample() + sample2 = Sample() + assert str(sample1) == "Sample()" + sample1.a = 10 + assert str(sample1) == "Sample(a=10)" + assert sample1 != sample2 + + # Exercise setting + sample2.set(a=sample1) + assert sample1 == sample2.a + sample2.set(a=sample1) # 2nd set to exercise replacing value + + sample3 = Sample() + sample3.set({"a": sample1}) + assert sample2 == sample3 -def test_insights(): class Foo: name = "testing" age = 10 with pytest.raises(TypeError, match="should be instance"): - runez.Slotted.fill_attributes(Foo, {}) + Slotted.fill_attributes(Foo, {}) foo = Foo() assert foo.name == "testing" assert foo.age == 10 - runez.Slotted.fill_attributes(foo, {"name": "my-name"}) + Slotted.fill_attributes(foo, {"name": "my-name"}) assert foo.name == "my-name" - runez.Slotted.fill_attributes(foo, {"name": runez.UNSET}) + Slotted.fill_attributes(foo, {"name": UNSET}) assert foo.name == "testing" # back to class default with pytest.raises(AttributeError, match="Unknown Foo key 'bar'"): - runez.Slotted.fill_attributes(foo, {"bar": 5}) - - -def test_slotted(): - s1a = Slotted1() - assert s1a.a1 is runez.UNSET - - s1a = Slotted1(a1="a1", b1="b1") - s1b = Slotted1(s1a) - assert str(s1b) == "Slotted1(a1=a1, b1=b1)" - assert s1a.a1 == "a1" - assert s1a.b1 == "b1" - assert s1a == s1b - - # Check properties - assert s1a.prop4 == 4 - assert s1b.prop4 == 4 - s1a.prop4 = 40 - assert s1a.prop4 == 40 - assert s1b.prop4 == 4 # Clone's property did not get modified - - s2 = Slotted2(other=s1a) - assert s2.name is None - assert s2.other.a1 == "a1" - s1a.a1 = "changed-a1" - assert s2.other.a1 == "a1" - - s2.set(other=s1a) - assert s2.other is not s1a - assert s2.other == s1a - assert s2.other.a1 == "changed-a1" - - s2.set(other="other") - assert s2.other == "other" - - s2.set(other=s1a) - assert s2.other == s1a + Slotted.fill_attributes(foo, {"bar": 5}) diff --git a/tests/test_system.py b/tests/test_system.py index 0b3c9f5..f269c47 100644 --- a/tests/test_system.py +++ b/tests/test_system.py @@ -175,7 +175,6 @@ def test_current_folder(temp_folder): def test_decode(): - assert runez.decode(None) is None assert runez.decode(" something ") == " something " assert runez.decode(" something ", strip=True) == "something" @@ -478,15 +477,6 @@ def test_platform_identification(): assert m1.is_system_lib("/usr/lib/foo.so") assert m1.is_system_lib("/System/Library/foo.so") - win = PlatformId("windows-x86_64") - assert str(win) == "windows-x86_64" - assert win.canonical_compress_extension() == "zip" - assert win.is_windows - assert win.composed_basename("cpython", "1.2.3") == "cpython-1.2.3-windows-x86_64.zip" - assert win.composed_basename("foo", extension="gz") == "foo-windows-x86_64.tar.gz" - assert win.composed_basename("foo", extension="tar.gz") == "foo-windows-x86_64.tar.gz" - assert win.composed_basename("foo", extension="zip") == "foo-windows-x86_64.zip" - def test_quoted(): assert runez.quoted(None) == "None" @@ -675,7 +665,7 @@ def test_terminal(monkeypatch): with patch.dict(os.environ, {"LC_TERMINAL": "", "TERM_PROGRAM": "", "TERM": ""}): # Simulate a known terminal - ps = runez.PsInfo() + ps = runez.program.PsInfo() ps.followed_parent.cmd = "/dev/null/tilix" ps.followed_parent.cmd_basename = "tilix" p = TerminalProgram(ps=ps) diff --git a/tests/test_testing.py b/tests/test_testing.py index 547844b..fabe9e9 100644 --- a/tests/test_testing.py +++ b/tests/test_testing.py @@ -43,9 +43,9 @@ def sample_main(): def test_cli_uninitialized(cli, monkeypatch): - from runez.conftest import cli as cli_fixture + from runez.conftest import ClickRunner - monkeypatch.setattr(cli_fixture, "default_main", None) + monkeypatch.setattr(ClickRunner, "default_main", None) with pytest.raises(AssertionError): # No main provided cli.run("hello no main") diff --git a/tox.ini b/tox.ini index 07bcec5..f593c38 100644 --- a/tox.ini +++ b/tox.ini @@ -1,12 +1,12 @@ [tox] # Running tests against min/max only to speed up local iterations (all versions tested in GH actions) -envlist = py{310,314}, coverage, style +envlist = py{310,314}, coverage, style, typecheck [testenv] setenv = COVERAGE_FILE={toxworkdir}/.coverage.{envname} usedevelop = True deps = -rtests/requirements.txt -commands = pytest {posargs:-vv --cov=src --cov-report=xml --junit-xml="{envdir}/junit.xml" --doctest-modules src tests} +commands = pytest {posargs:-vv --cov=src --cov-report=xml --doctest-modules src tests} [testenv:coverage] setenv = COVERAGE_FILE={toxworkdir}/.coverage @@ -35,6 +35,12 @@ deps = ruff commands = ruff check --fix ruff format +[testenv:typecheck] +usedevelop = True +deps = pyright + -rtests/requirements.txt +commands = pyright + [coverage:run] concurrency =