From 9563e5f120f6b50bf53a7a67d4bf4c48f98ed45b Mon Sep 17 00:00:00 2001 From: Sagi Kimhi Date: Fri, 20 Mar 2026 00:11:06 +0200 Subject: [PATCH 1/2] feat/enhance-regression-tui (#147) * feat(socx_tui): improve regression details, support saving/loading state * fix(socx_tui): fix header and footer panel colors Signed-off-by: Sagi Kimhi * chore(.gitignore): ignore regression dumps Signed-off-by: Sagi Kimhi * chore: bump version to 0.13.3 Signed-off-by: Sagi Kimhi --------- Signed-off-by: Sagi Kimhi --- .gitignore | 3 +- pyproject.toml | 2 +- src/socx/regression/regression.py | 303 +++++++++++++++++-- src/socx/regression/test.py | 57 +++- src/socx_plugins/regression/_run.py | 1 + src/socx_tui/regression/app.py | 11 + src/socx_tui/regression/details.py | 35 ++- src/socx_tui/regression/dialog.py | 91 ++++-- src/socx_tui/regression/tree.py | 10 + src/socx_tui/regression/widget.py | 121 ++++++-- src/socx_tui/static/tcss/regression/app.tcss | 2 + tests/test_regression_runtime.py | 53 ++++ tests/test_regression_tui.py | 131 +++++++- 13 files changed, 741 insertions(+), 79 deletions(-) diff --git a/.gitignore b/.gitignore index dcdd567..7417bdd 100644 --- a/.gitignore +++ b/.gitignore @@ -40,7 +40,6 @@ !*.yaml !*.toml !*.json -!*.example !LICENSE !Makefile @@ -50,3 +49,5 @@ !CHANGELOG.md !CONTRIBUTING.md !CODE_OF_CONDUCT.md + +workrun/ diff --git a/pyproject.toml b/pyproject.toml index f9fc82d..30b962b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ socx = 'socx.__main__:main' [project] name = "socx-cli" readme = "README.md" -version = "0.13.2" +version = "0.13.3" license = "Apache-2.0" authors = [{ name = "Sagi Kimhi", email = "sagi.kim5@gmail.com" }] maintainers = [{ name = "Sagi Kimhi", email = "sagi.kim5@gmail.com" }] diff --git a/src/socx/regression/regression.py b/src/socx/regression/regression.py index 661649c..d15d916 100644 --- a/src/socx/regression/regression.py +++ b/src/socx/regression/regression.py @@ -5,9 +5,10 @@ import asyncio as aio import anyio import logging +import re import time from collections import OrderedDict -from collections.abc import AsyncGenerator, Iterable +from collections.abc import AsyncGenerator, Iterable, Mapping from pathlib import Path from threading import RLock from typing import Self, Any, Annotated @@ -36,6 +37,25 @@ semaphore = anyio.Semaphore(max(1, settings.regression.max_runs_in_parallel)) +def _safe_dir_name(name: str, node_id: UUID4) -> str: + slug = re.sub(r"[^A-Za-z0-9._-]+", "-", name).strip("-").lower() + return f"{slug or 'item'}-{node_id}" + + +def _coerce_status(value: TestStatus | int | str) -> TestStatus: + if isinstance(value, TestStatus): + return value + if isinstance(value, int): + return TestStatus(value) + return TestStatus[value.strip().lower().title()] + + +def _coerce_result(value: TestResult | str) -> TestResult: + if isinstance(value, TestResult): + return value + return TestResult(value) + + class Regression(TestBase): """Manage and execute a collection of tests with concurrency control.""" @@ -78,7 +98,28 @@ def from_file( test_cls: type[TestBase] | None = None, **kwargs: Any, ) -> Self: - return cls._from_file(path, test_cls=test_cls, **kwargs) + return cls._from_file(path, name=name, test_cls=test_cls, **kwargs) + + @classmethod + @validate_call() + def load( + cls, + path: str | Path, + name: str | None = None, + test_cls: type[TestBase] | None = None, + **kwargs: Any, + ) -> Self: + path = TypeAdapter(FilePath).validate_python(path) + data = cls._read_data(path) + + if cls._looks_like_state(data): + return cls._from_state_data( + data, + output_dir=path.parent, + test_cls=test_cls or Test, + ) + + return cls._from_file(path, name=name, test_cls=test_cls, **kwargs) @computed_field @property @@ -187,7 +228,7 @@ async def start(self) -> None: self._done = aio.Queue() self._pending = aio.Queue() self.finished_time = None - self.started_time = time.perf_counter() + self.started_time = time.time() logger.info("regression starting...") try: @@ -196,7 +237,7 @@ async def start(self) -> None: for _ in range(self.run_limit): tg.start_soon(self._runner) finally: - self.finished_time = time.perf_counter() + self.finished_time = time.time() self._pause_event.set() self._running.clear() logger.info(f"regression {self.status.name.lower()}.") @@ -294,24 +335,146 @@ async def _runner(self) -> None: self.pending.task_done() semaphore.release() - def dump_state(self, output_dir: Path) -> None: - """Write the regression command results to their respective files.""" + def assign_output_dir(self, output_dir: Path) -> Path: + self.output_dir = output_dir + output_dir.mkdir(parents=True, exist_ok=True) + + for child in self.tests: + child_output_dir = output_dir / _safe_dir_name( + child.name, child.id + ) + if isinstance(child, Regression): + child.assign_output_dir(child_output_dir) + else: + child.output_dir = child_output_dir + + return output_dir + + def dump_state(self, output_dir: Path | None = None) -> Path: + """Write the regression state and test artifacts to disk.""" + root_output_dir = self.output_dir + if output_dir is not None: + root_output_dir = self.assign_output_dir(output_dir / self.name) + + if root_output_dir is None: + msg = "Regression output directory is not configured." + raise ValueError(msg) + logger.info("saving regression state and results to disk...") - file = output_dir / self.name / "state.yaml" - state = self.model_dump( - mode="json", - round_trip=True, - serialize_as_any=True, - include={"result", "status"}, - ) + self._persist_test_outputs() + file = root_output_dir / "state.yaml" + state = self._serialize_state(root_output_dir) file.parent.mkdir(parents=True, exist_ok=True) - file.touch(exist_ok=False) box.DDBox(state).to_yaml(str(file)) - logger.info(f"state and results saved to: '{output_dir}'.") + logger.info(f"state and results saved to: '{file}'.") + return file def _active_tests(self) -> list[TestBase]: return [test for test in self.tests if test.id in self._running] + def iter_leaf_tests(self) -> Iterable[TestBase]: + for test in self.tests: + if isinstance(test, Regression): + yield from test.iter_leaf_tests() + else: + yield test + + @property + def leaf_tests(self) -> list[TestBase]: + return list(self.iter_leaf_tests()) + + @property + def total_test_count(self) -> int: + return len(self.leaf_tests) + + @property + def completed_test_count(self) -> int: + return sum( + 1 + for test in self.iter_leaf_tests() + if test.status in (TestStatus.Finished, TestStatus.Terminated) + ) + + @property + def progress_ratio(self) -> float: + total = self.total_test_count + if total == 0: + return 0.0 + return min(1.0, self.completed_test_count / total) + + @property + def estimated_remaining_time(self) -> float | None: + total = self.total_test_count + completed = self.completed_test_count + elapsed = self.elapsed_time + + if total == 0 or elapsed is None: + return None + if completed >= total: + return 0.0 + if completed == 0 or elapsed <= 0: + return None + + rate = completed / elapsed + if rate <= 0: + return None + + return max(0.0, (total - completed) / rate) + + def _persist_test_outputs(self) -> None: + for child in self.tests: + if isinstance(child, Regression): + child._persist_test_outputs() + continue + + if ( + isinstance(child, Test) + and child.started_time is not None + and child.output_dir is not None + ): + child._prepare_output_files() + child._write_output_files() + + def _serialize_state(self, root_output_dir: Path) -> dict[str, Any]: + return self._serialize_node(self, root_output_dir) + + @classmethod + def _serialize_node( + cls, node: TestBase, root_output_dir: Path + ) -> dict[str, Any]: + state = { + "kind": "regression" if isinstance(node, Regression) else "test", + "id": str(node.id), + "name": node.name, + "started_time": node.started_time, + "finished_time": node.finished_time, + "status": node.status.name.lower(), + "result": node.result.value, + } + + if node.output_dir is not None and node.output_dir != root_output_dir: + state["output_dir"] = str( + node.output_dir.relative_to(root_output_dir) + ) + + if isinstance(node, Regression): + state["tests"] = [ + cls._serialize_node(child, root_output_dir) + for child in node.tests + ] + return state + + state["exec"] = str(node.exec) if node.exec is not None else None + if node.stdout_path is not None and node.stdout_path.exists(): + state["stdout_path"] = str( + node.stdout_path.relative_to(root_output_dir) + ) + if node.stderr_path is not None and node.stderr_path.exists(): + state["stderr_path"] = str( + node.stderr_path.relative_to(root_output_dir) + ) + return state + @classmethod @validate_call() def _from_file( @@ -326,20 +489,29 @@ def _from_file( path = TypeAdapter(FilePath).validate_python(path) name = name or path.stem test_cls = test_cls or Test + data = cls._read_data(path) + + settings.update(Box({name: data}), merge=False) + return cls._from_data(name, settings[name], test_cls) + + @staticmethod + def _read_data(path: Path) -> Mapping[str, Any]: + from box import Box match path.suffix.lower(): case ".yml" | ".yaml": - data = Box.from_yaml(filename=str(path)) + return Box.from_yaml(filename=str(path)) case ".toml": - data = Box.from_toml(filename=str(path)) + return Box.from_toml(filename=str(path)) case ".json": - data = Box.from_json(filename=str(path)) + return Box.from_json(filename=str(path)) case _: msg = f"Unsupported file format: '{path.suffix}'" raise ValueError(msg) - settings.update(Box({name: data}), merge=False) - return cls._from_data(name, settings[name], test_cls) + @staticmethod + def _looks_like_state(data: Mapping[str, Any]) -> bool: + return data.get("kind") == "regression" and "tests" in data @classmethod def _from_data( @@ -366,6 +538,97 @@ def _from_data( regressions.append(regression) return cls(name=name, tests=regressions) + @classmethod + def _from_state_data( + cls, + data: Mapping[str, Any], + output_dir: Path, + test_cls: type[TestBase], + ) -> Self: + node = cls._deserialize_node( + data, + root_output_dir=output_dir, + test_cls=test_cls, + parent_output_dir=None, + ) + if not isinstance(node, Regression): + msg = "State file must contain a root regression." + raise ValueError(msg) + return node + + @classmethod + def _deserialize_node( + cls, + data: Mapping[str, Any], + root_output_dir: Path, + test_cls: type[TestBase], + parent_output_dir: Path | None, + ) -> TestBase: + kind = str(data.get("kind", "")).strip().lower() + output_dir = cls._resolve_output_dir( + data, + root_output_dir=root_output_dir, + parent_output_dir=parent_output_dir, + ) + + if kind == "regression": + regression = cls( + id=data["id"], + name=data["name"], + started_time=data.get("started_time"), + finished_time=data.get("finished_time"), + tests=[], + ) + regression.output_dir = output_dir + regression.tests = [ + cls._deserialize_node( + child, + root_output_dir=root_output_dir, + test_cls=test_cls, + parent_output_dir=regression.output_dir, + ) + for child in data.get("tests", []) + ] + return regression + + test = test_cls( + id=data["id"], + name=data["name"], + exec=data.get("exec"), + started_time=data.get("started_time"), + finished_time=data.get("finished_time"), + ) + test.output_dir = output_dir + test.status = _coerce_status(data.get("status", TestStatus.Idle)) + test.result = _coerce_result(data.get("result", TestResult.NA)) + + if isinstance(test, Test): + for attr, relpath in ( + ("stdout", data.get("stdout_path")), + ("stderr", data.get("stderr_path")), + ): + if relpath: + file = root_output_dir / str(relpath) + if file.exists(): + setattr(test, attr, file.read_text(encoding="utf-8")) + + return test + + @classmethod + def _resolve_output_dir( + cls, + data: Mapping[str, Any], + *, + root_output_dir: Path, + parent_output_dir: Path | None, + ) -> Path: + relative_output_dir = data.get("output_dir") + if relative_output_dir: + return root_output_dir / str(relative_output_dir) + if parent_output_dir is None: + return root_output_dir + return parent_output_dir / _safe_dir_name(data["name"], data["id"]) + TreeNode = Annotated[ TestBase, diff --git a/src/socx/regression/test.py b/src/socx/regression/test.py index 13d6d70..1fa9f68 100644 --- a/src/socx/regression/test.py +++ b/src/socx/regression/test.py @@ -8,6 +8,7 @@ import time import uuid from enum import StrEnum, IntEnum, auto +from pathlib import Path from pydantic import ( BaseModel, @@ -59,6 +60,7 @@ class TestBase(BaseModel): _status: TestStatus = PrivateAttr(TestStatus.Idle) _process: aio.subprocess.Process | None = PrivateAttr(default=None) _termination_requested: bool = PrivateAttr(default=False) + _output_dir: Path | None = PrivateAttr(default=None) @computed_field @property @@ -84,12 +86,41 @@ def process(self) -> Process | None: return None return Process(self._process.pid) + @property + def output_dir(self) -> Path | None: + return self._output_dir + + @output_dir.setter + def output_dir(self, value: Path | None) -> None: + self._output_dir = value + + @property + def stdout_path(self) -> Path | None: + if self.output_dir is None: + return None + return self.output_dir / "stdout.txt" + + @property + def stderr_path(self) -> Path | None: + if self.output_dir is None: + return None + return self.output_dir / "stderr.txt" + @computed_field @property def started(self) -> bool: """Return ``True`` once ``start`` has spawned the subprocess.""" return self.status > TestStatus.Pending + @property + def elapsed_time(self) -> float | None: + """Return the elapsed runtime derived from wall-clock timestamps.""" + if self.started_time is None: + return None + + end_time = self.finished_time or time.time() + return max(0.0, end_time - self.started_time) + @property def finished(self) -> bool: """Return ``True`` if the test completed and recorded a result.""" @@ -188,6 +219,15 @@ async def restart(self) -> None: self.reset() await self.start() + def _prepare_output_files(self) -> None: + if self.output_dir is None: + return + + self.output_dir.mkdir(parents=True, exist_ok=True) + for path in (self.stdout_path, self.stderr_path): + if path is not None: + path.write_text("", encoding="utf-8") + class Test(TestBase): """Concrete test model with subprocess execution support.""" @@ -237,14 +277,16 @@ async def start(self) -> None: self.result = TestResult.NA self.stdout = "" self.stderr = "" - self.started_time = time.perf_counter() + self.started_time = time.time() self.finished_time = None self.status = TestStatus.Pending + self._prepare_output_files() if not self.exec: self.status = TestStatus.Terminated self.result = TestResult.Failed - self.finished_time = time.perf_counter() + self.finished_time = time.time() + self._write_output_files() return process = await aio.create_subprocess_exec( @@ -263,9 +305,10 @@ async def start(self) -> None: try: stdout, stderr = await process.communicate() finally: - self.finished_time = time.perf_counter() + self.finished_time = time.time() self.stderr = stderr.decode() if stderr else "" self.stdout = stdout.decode() if stdout else "" + self._write_output_files() returncode = process.returncode or 0 if self._termination_requested or returncode < 0: @@ -279,3 +322,11 @@ async def start(self) -> None: self.result = TestResult.Failed self._process = None + + def _write_output_files(self) -> None: + for path, content in ( + (self.stdout_path, self.stdout), + (self.stderr_path, self.stderr), + ): + if path is not None: + path.write_text(content, encoding="utf-8") diff --git a/src/socx_plugins/regression/_run.py b/src/socx_plugins/regression/_run.py index 68d9f4e..c549608 100644 --- a/src/socx_plugins/regression/_run.py +++ b/src/socx_plugins/regression/_run.py @@ -219,6 +219,7 @@ async def run_regression( path_in = file or _get_input_path() regression = populate_regression(path_in) output_dir = _get_output_path(regression) + regression.assign_output_dir(output_dir / regression.name) names_set = _get_names_to_run() progress = RegressionProgress(regression) diff --git a/src/socx_tui/regression/app.py b/src/socx_tui/regression/app.py index 53fe118..c2cdbf4 100644 --- a/src/socx_tui/regression/app.py +++ b/src/socx_tui/regression/app.py @@ -2,6 +2,7 @@ from __future__ import annotations +from contextlib import suppress from typing import Any, ClassVar from collections import ChainMap from collections.abc import Iterable @@ -38,6 +39,16 @@ def run(self, *args: Any, **kwargs: Any) -> int | None: kwargs = dict(ChainMap(kwargs, dict(inline=True))) return super().run(*args, **kwargs) + def exit( + self, + result: int | None = None, + return_code: int = 0, + message: Any | None = None, + ) -> None: + with suppress(Exception): + self.regression.persist_loaded_regression_state() + super().exit(result=result, return_code=return_code, message=message) + def compose(self) -> ComposeResult: """Lay out the application chrome shared between all screens.""" yield Header( diff --git a/src/socx_tui/regression/details.py b/src/socx_tui/regression/details.py index a3d4fdf..6013e36 100644 --- a/src/socx_tui/regression/details.py +++ b/src/socx_tui/regression/details.py @@ -1,5 +1,6 @@ from __future__ import annotations +from datetime import datetime from textwrap import wrap from typing import ClassVar @@ -79,9 +80,11 @@ def format_details(self, model: TestBase | None = None) -> str: f"**✅ Passed:** {self.count_results(model, TestResult.Passed)}", # noqa: E501 f"**💡 Status:** {self.format_status(model.status)}", f"**🚩 Result:** {self.format_result(model.result)}", - # f"**⌛ Elapsed Time:** {self.format_timedelta(model.time_elapsed)}", # noqa: E501, W505 + f"**⌛ Elapsed Time:** {self.format_timedelta(model.elapsed_time)}", # noqa: E501 f"**⌛ Started Time:** {self.format_time(model.started_time)}", # noqa: E501 f"**⌛ Finished Time:** {self.format_time(model.finished_time)}", # noqa: E501 + f"**📊 Progress:** `{self.format_progress(model)}`", + f"**⏳ ETA:** {self.format_timedelta(model.estimated_remaining_time)}", # noqa: E501 ] ) else: @@ -89,7 +92,7 @@ def format_details(self, model: TestBase | None = None) -> str: [ f"**💡 Status:** {self.format_status(model.status)}", f"**🚩 Result:** {self.format_result(model.result)}", - # f"**⌛ Elapsed Time:** {self.format_timedelta(model.time_elapsed)}", # noqa: E501, W505 + f"**⌛ Elapsed Time:** {self.format_timedelta(model.elapsed_time)}", # noqa: E501 f"**⌛ Started Time:** {self.format_time(model.started_time)}", # noqa: E501 f"**⌛ Finished Time:** {self.format_time(model.finished_time)}", # noqa: E501 "", @@ -123,7 +126,20 @@ def format_status(self, status: int | str | TestStatus) -> str: return status.name.lower() def format_time(self, value: float | None) -> str: - return "n/a" if value is None else f"{value:.3f}" + if value is None: + return "n/a" + + try: + if value >= 946684800: + return ( + datetime.fromtimestamp(value) + .astimezone() + .strftime("%Y-%m-%d %H:%M:%S %Z") + ) + except (OverflowError, OSError, ValueError): + pass + + return f"{value:.3f}s" def format_timedelta(self, value: int | float | None) -> str: if value is None: @@ -135,4 +151,15 @@ def format_timedelta(self, value: int | float | None) -> str: return f"{hours:02}h:{minutes:02}m:{seconds:02}s" def count_results(self, regression: Regression, result: TestResult) -> int: - return sum(1 for test in regression.tests if test.result is result) + return sum( + 1 for test in regression.iter_leaf_tests() if test.result is result + ) + + def format_progress(self, regression: Regression, width: int = 24) -> str: + total = regression.total_test_count + completed = regression.completed_test_count + ratio = regression.progress_ratio + filled = min(width, round(ratio * width)) + bar = "#" * filled + "-" * (width - filled) + percent = int(ratio * 100) + return f"[{bar}] {completed}/{total} ({percent}%)" diff --git a/src/socx_tui/regression/dialog.py b/src/socx_tui/regression/dialog.py index 93fcd42..d44a5e0 100644 --- a/src/socx_tui/regression/dialog.py +++ b/src/socx_tui/regression/dialog.py @@ -1,16 +1,16 @@ from __future__ import annotations -from socx_tui.regression.details import RegressionDetails -from textual.screen import ModalScreen, ScreenResultType from typing import ClassVar import rich.repr -from socx import TestBase +from socx import Test from textual.app import ComposeResult from textual.binding import Binding, BindingType -from textual.containers import ( - Vertical, -) +from textual.containers import Vertical +from textual.screen import ModalScreen, ScreenResultType +from textual.widgets import Static, TextArea + +from socx_tui.regression.bindings.vim.mode import VimModes class Dialog(Vertical): @@ -19,33 +19,88 @@ class Dialog(Vertical): pass +class ReadOnlyOutputArea(TextArea, can_focus=True, inherit_bindings=True): + """Read-only output viewer with keyboard navigation.""" + + BINDINGS: ClassVar[list[BindingType]] = TextArea.BINDINGS + VimModes.Normal + + @rich.repr.auto -class ActionDialog(ModalScreen[ScreenResultType]): +class TestOutputDialog(ModalScreen[ScreenResultType]): BINDINGS: ClassVar[list[BindingType]] = [ Binding( key="escape", action="dismiss(None)", description="Dismiss the dialog.", show=False, - ) + ), + Binding( + key="q", + action="dismiss(None)", + description="Dismiss the dialog.", + show=False, + ), ] - def __init__(self, model: TestBase, *args, **kwargs) -> None: + DEFAULT_CSS: ClassVar[str] = """ + TestOutputDialog { + align: center middle; + } + + #regression-output-dialog { + width: 92%; + height: 88%; + border: thick $accent; + background: $surface; + } + + #regression-output-title { + padding: 0 1; + height: auto; + text-style: bold; + } + + #regression-output-area { + height: 1fr; + } + """ + + def __init__(self, model: Test, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self._model = model - self._details_view = RegressionDetails( - wrap_code=False, - id="regression-dialog-details", - name="regression-dialog-details", + self._output_view = ReadOnlyOutputArea( + self._format_output(model), + id="regression-output-area", + language="bash", + read_only=True, + soft_wrap=False, + show_cursor=True, + show_line_numbers=True, ) def compose(self) -> ComposeResult: with Dialog( - id="regression-dialog-content", - name="regression-dialog-content", + id="regression-output-dialog", + name="regression-output-dialog", ): - yield self._details_view + yield Static( + f"{self._model.name} stdout / stderr", + id="regression-output-title", + ) + yield self._output_view def on_mount(self) -> None: - self._details_view.model = self._model - self._details_view.focus() + self._output_view.focus() + + def _format_output(self, model: Test) -> str: + stdout = model.stdout or "" + stderr = model.stderr or "" + return "\n".join( + [ + "===== STDOUT =====", + stdout, + "", + "===== STDERR =====", + stderr, + ] + ) diff --git a/src/socx_tui/regression/tree.py b/src/socx_tui/regression/tree.py index 6233fb3..de422d0 100644 --- a/src/socx_tui/regression/tree.py +++ b/src/socx_tui/regression/tree.py @@ -6,6 +6,7 @@ import rich.repr from textual.binding import BindingType +from textual.message import Message from textual.widgets import Tree from socx_tui.regression.bindings.vim.mode import VimModes @@ -15,6 +16,13 @@ class VimTree(Tree[object], can_focus=True, inherit_bindings=True): """Interactive tree widget with Vim-style navigation bindings.""" + class OpenCursorNode(Message): + """Posted when Enter is pressed on the current tree node.""" + + def __init__(self, node: object) -> None: + self.node = node + super().__init__() + BINDINGS: ClassVar[list[BindingType]] = Tree.BINDINGS + VimModes.Normal ICON_NODE: ClassVar[str] = "👉 " @@ -30,4 +38,6 @@ def action_select_cursor(self) -> None: node = self.cursor_node if node is not None and node.allow_expand: node.toggle() + if node is not None: + self.post_message(self.OpenCursorNode(node)) super().action_select_cursor() diff --git a/src/socx_tui/regression/widget.py b/src/socx_tui/regression/widget.py index 6a842df..d388f4e 100644 --- a/src/socx_tui/regression/widget.py +++ b/src/socx_tui/regression/widget.py @@ -1,26 +1,25 @@ from __future__ import annotations import logging +from datetime import datetime from pathlib import Path -from typing import ClassVar, Any +from typing import Any, ClassVar import rich.repr from rich.text import Text -from socx import Regression, TestBase, TestResult, TestStatus -from textual import work, on +from socx import Regression, Test, TestBase, TestResult, TestStatus, settings +from textual import on, work from textual.app import ComposeResult from textual.binding import Binding, BindingType -from textual.containers import ( - Container, -) +from textual.containers import Container from textual.timer import Timer from textual.widget import Widget from textual.widgets import Button, Tree from textual_fspicker import FileOpen -from socx_tui.regression.tree import VimTree -from socx_tui.regression.dialog import ActionDialog from socx_tui.regression.details import RegressionDetails +from socx_tui.regression.dialog import TestOutputDialog +from socx_tui.regression.tree import VimTree logger = logging.getLogger(__name__) @@ -103,6 +102,7 @@ def __init__(self, **kwargs: Any) -> None: classes="layout", ) self._refresh_timer: Timer | None = None + self._refresh_enabled = False @property def selected_model(self) -> TestBase | None: @@ -110,6 +110,10 @@ def selected_model(self) -> TestBase | None: data = getattr(node, "data", None) return data if isinstance(data, TestBase) else None + @property + def refresh_enabled(self) -> bool: + return self._refresh_enabled + def compose(self) -> ComposeResult: yield self.content yield self.button_layout @@ -117,7 +121,7 @@ def compose(self) -> ComposeResult: async def on_mount(self) -> None: self.regression_tree.focus() self._refresh_timer = self.set_interval( - 1 / 5, self._refresh_tree_state + 1 / 5, self._refresh_tree_state, pause=True ) def show_text(self, message: str | Text) -> None: @@ -160,6 +164,7 @@ async def action_resume_selected(self) -> None: self._no_model_selected_notification() return if model.is_suspended(): + self._set_refresh_timer(True) await model.resume() self._refresh_tree_state() @@ -170,17 +175,16 @@ async def action_restart_selected(self) -> None: return self._restart_model(model) - @work(exclusive=False) - @on(Tree.NodeSelected) - async def _select_node(self, event: Tree.NodeSelected) -> None: - model = self.selected_model - - if model is None: - self._no_model_selected_notification() + @on(VimTree.OpenCursorNode) + async def on_vim_tree_open_cursor_node( + self, event: VimTree.OpenCursorNode + ) -> None: + model = getattr(event.node, "data", None) + if not isinstance(model, Test): return event.stop() - await self.app.push_screen_wait(ActionDialog(model)) + self.app.push_screen(TestOutputDialog(model)) def on_button_pressed(self, event: Button.Pressed) -> None: match event.button.id: @@ -207,6 +211,7 @@ async def load_regression_from_file(self) -> None: @work(exclusive=False) async def _start_model(self, model: TestBase) -> None: + self._set_refresh_timer(True) if model.is_suspended(): await model.resume() else: @@ -220,17 +225,32 @@ async def _pause_model(self, model: TestBase) -> None: @work(exclusive=False) async def _restart_model(self, model: TestBase) -> None: + self._set_refresh_timer(True) await model.restart() self._refresh_tree_state() async def load_regression_from_path(self, path: Path) -> Regression: - """Load regressions from ``path`` into the tree.""" - regression = Regression.from_file(path=path) + """Load regressions or saved state from ``path`` into the tree.""" + regression = Regression.load(path=path) + if regression.output_dir is None: + regression.assign_output_dir( + self._create_session_output_dir(regression) + ) + self.loaded_regression = regression self._populate_tree(regression) + self._refresh_tree_state() self.regression_tree.focus() return regression + def persist_loaded_regression_state(self) -> Path | None: + if self.loaded_regression is None: + return None + + file = self.loaded_regression.dump_state() + self.log.info(f"Saved regression state to '{file}'.") + return file + async def _load_regression_from_file(self) -> None: file = await self._open_file_dialog() @@ -268,13 +288,8 @@ def on_tree_node_highlighted( def on_tree_node_selected(self, event: Tree.NodeSelected[object]) -> None: model = event.node.data - if not isinstance(model, TestBase): - return - - if isinstance(model, Regression) and event.node.allow_expand: - event.node.toggle() - - self.show_details(model) + if isinstance(model, TestBase): + self.show_details(model) def _populate_tree(self, regression: Regression) -> None: root = self.regression_tree.root @@ -290,10 +305,12 @@ def _populate_tree(self, regression: Regression) -> None: if first_node is None: self.show_details(regression) + self._sync_refresh_timer() return self.regression_tree.move_cursor(first_node) self.show_details(first_node.data) + self._sync_refresh_timer() def _top_level_regressions( self, regression: Regression @@ -327,9 +344,16 @@ def _add_regression_node( return node def _refresh_tree_state(self) -> None: - if self.loaded_regression is None: + regression = self.loaded_regression + if regression is None: + self._set_refresh_timer(False) + return + + self._sync_refresh_timer() + if self.app.screen is not self.screen: return + labels_changed = False for node in self.node_map.values(): model = getattr(node, "data", None) if not isinstance(model, TestBase): @@ -340,15 +364,53 @@ def _refresh_tree_state(self) -> None: if isinstance(model, Regression) else self._format_test_label(model) ) - node.set_label(label) + current_label = getattr(node.label, "plain", str(node.label)) + if current_label != label.plain: + node.set_label(label) + labels_changed = True - self.regression_tree.refresh() + if labels_changed: + self.regression_tree.refresh() if self.selected_model != self.details_view.model: self.details_view.model = self.selected_model else: self.details_view.refresh_details() + def _sync_refresh_timer(self) -> None: + regression = self.loaded_regression + should_refresh = regression is not None and regression.status in ( + TestStatus.Pending, + TestStatus.Running, + ) + self._set_refresh_timer(should_refresh) + + def _set_refresh_timer(self, enabled: bool) -> None: + if self._refresh_timer is None or self._refresh_enabled == enabled: + self._refresh_enabled = enabled + return + + self._refresh_enabled = enabled + if enabled: + self._refresh_timer.resume() + self._refresh_timer.reset() + else: + self._refresh_timer.pause() + + def _create_session_output_dir(self, regression: Regression) -> Path: + now = datetime.now().astimezone() + output_root = settings.regression.run.output.directory + base_dir = ( + Path(output_root) if isinstance(output_root, str) else output_root + ) + return ( + base_dir + / regression.name + / now.strftime("%Y-%m-%d") + / now.strftime("%H-%M-%S") + / regression.name + ) + def _format_regression_label(self, regression: Regression) -> Text: return Text.assemble( f"⚗️ {regression.name} ", @@ -401,7 +463,6 @@ def _format_test_label(self, test: TestBase) -> Text: def _format_test_status_label(self, test: TestBase) -> Text: status = f"💡 {self.details_view.format_status(test.status)}" result = f"🚩 {self.details_view.format_result(test.result)}" - # elapsed = f"⌛ {self.details_view.format_timedelta(test.time_elapsed)}" # noqa: E501, W505 return Text.assemble("[", "|".join([status, result]), "]") def _no_model_selected_notification(self) -> None: diff --git a/src/socx_tui/static/tcss/regression/app.tcss b/src/socx_tui/static/tcss/regression/app.tcss index 1fa6b5c..cccd27d 100644 --- a/src/socx_tui/static/tcss/regression/app.tcss +++ b/src/socx_tui/static/tcss/regression/app.tcss @@ -91,6 +91,7 @@ Button:hover { Header { width: 100%; height: auto; + background: $panel; padding: 0; margin: 0; } @@ -98,6 +99,7 @@ Header { Footer { width: 100%; height: auto; + background: $panel; padding: 0; margin: 0; } diff --git a/tests/test_regression_runtime.py b/tests/test_regression_runtime.py index 16288a7..772e21b 100644 --- a/tests/test_regression_runtime.py +++ b/tests/test_regression_runtime.py @@ -52,6 +52,11 @@ async def run_test() -> None: await regression.start() assert regression.status is TestStatus.Finished assert marker.read_text().splitlines() == ["run"] + assert regression.elapsed_time is not None + assert regression.total_test_count == 1 + assert regression.completed_test_count == 1 + assert regression.progress_ratio == 1.0 + assert regression.estimated_remaining_time == 0.0 await regression.restart() @@ -59,3 +64,51 @@ async def run_test() -> None: assert marker.read_text().splitlines() == ["run", "run"] asyncio.run(run_test()) + + +def test_regression_state_round_trips_with_test_outputs(tmp_path) -> None: + async def run_test() -> None: + regression = Regression( + name="smoke", + tests=[ + Test( + name="alpha", + exec="printf 'alpha out'; printf 'alpha err' >&2", + ) + ], + ) + session_dir = tmp_path / "session" + regression.assign_output_dir(session_dir / regression.name) + test = regression.tests[0] + + task = asyncio.create_task(regression.start()) + await _wait_for( + lambda: ( + test.output_dir is not None + and test.output_dir.is_dir() + and test.stdout_path is not None + and test.stdout_path.exists() + and test.stderr_path is not None + and test.stderr_path.exists() + ) + ) + await task + + state_file = regression.dump_state(session_dir) + loaded = Regression.load(state_file) + loaded_test = loaded.tests[0] + + assert regression.started_time is not None + assert regression.started_time > 946684800 + assert state_file.exists() + assert isinstance(loaded_test, Test) + assert loaded_test.stdout == "alpha out" + assert loaded_test.stderr == "alpha err" + assert loaded_test.finished + assert loaded_test.output_dir is not None + assert loaded_test.stdout_path is not None + assert loaded_test.stdout_path.read_text() == "alpha out" + assert loaded_test.stderr_path is not None + assert loaded_test.stderr_path.read_text() == "alpha err" + + asyncio.run(run_test()) diff --git a/tests/test_regression_tui.py b/tests/test_regression_tui.py index 85bcc82..1549129 100644 --- a/tests/test_regression_tui.py +++ b/tests/test_regression_tui.py @@ -2,10 +2,13 @@ import re import asyncio +from contextlib import contextmanager from textwrap import dedent from time import perf_counter +from socx import settings from socx_tui.regression.app import SoCX +from socx_tui.regression.dialog import TestOutputDialog as OutputDialog from socx_tui.regression.widget import RegressionWidget @@ -14,6 +17,22 @@ def _detail_text(widget: RegressionWidget) -> str: return widget.details_view.document.source +def _output_text(app: SoCX) -> str: + dialog = app.screen + assert isinstance(dialog, OutputDialog) + return dialog.query_one("#regression-output-area").text + + +@contextmanager +def _tui_output_dir(path): + original = settings.regression.run.output.directory + settings.regression.run.output.directory = path + try: + yield + finally: + settings.regression.run.output.directory = original + + async def _wait_for(predicate, max_wait: float = 3.0) -> None: deadline = perf_counter() + max_wait while perf_counter() < deadline: @@ -69,7 +88,8 @@ async def run_test() -> None: assert "Script:" in details assert "echo alpha" in details - asyncio.run(run_test()) + with _tui_output_dir(tmp_path / "workrun"): + asyncio.run(run_test()) def test_regression_tui_can_run_pause_resume_and_restart(tmp_path) -> None: @@ -126,5 +146,112 @@ async def run_test() -> None: assert marker.read_text().splitlines() == ["run", "run"] details = _detail_text(widget) assert bool(re.search(r"Result:.*passed", details, re.S)) + assert bool( + re.search( + r"Elapsed Time:.*\d{2}h:\d{2}m:\d{2}s", details, re.S + ) + ) + + with _tui_output_dir(tmp_path / "workrun"): + asyncio.run(run_test()) + + +def test_regression_tui_persists_state_and_opens_output_modal( + tmp_path, +) -> None: + path = tmp_path / "single.yaml" + path.write_text( + dedent( + """ + smoke: + - name: alpha + exec: + - printf alpha-out + - printf alpha-err >&2 + - sleep 0.5 + """ + ) + ) + + async def run_test() -> None: + saved_state_file = None + app = SoCX() + + async with app.run_test() as pilot: + widget = app.query_one(RegressionWidget) + regression = await widget.load_regression_from_path(path) + test = regression.tests[0].tests[0] + + assert not widget.refresh_enabled + await widget.action_start_selected() + await _wait_for(lambda: widget.refresh_enabled) + await _wait_for( + lambda: ( + test.status.name.lower() == "finished" + and not widget.refresh_enabled + ) + ) + + await pilot.press("space") + await pilot.pause() + await pilot.press("down") + await pilot.pause() + await pilot.press("enter") + await pilot.pause() + + assert isinstance(app.screen, OutputDialog) + output = _output_text(app) + assert "===== STDOUT =====" in output + assert "alpha-out" in output + assert "===== STDERR =====" in output + assert "alpha-err" in output + + await pilot.press("escape") + await pilot.pause() + + saved_state_file = regression.output_dir / "state.yaml" + app.exit() + await pilot.pause() + + assert saved_state_file is not None + assert saved_state_file.exists() + + restored_app = SoCX() + async with restored_app.run_test() as pilot: + widget = restored_app.query_one(RegressionWidget) + regression = await widget.load_regression_from_path( + saved_state_file + ) + test = regression.tests[0].tests[0] + + assert test.status.name.lower() == "finished" + assert test.stdout == "alpha-out" + assert test.stderr == "alpha-err" + details = _detail_text(widget) + assert bool( + re.search(r"Started Time:.*\d{4}-\d{2}-\d{2}", details, re.S) + ) + assert bool( + re.search(r"Finished Time:.*\d{4}-\d{2}-\d{2}", details, re.S) + ) + assert bool( + re.search( + r"Elapsed Time:.*\d{2}h:\d{2}m:\d{2}s", details, re.S + ) + ) + assert bool(re.search(r"Progress:.*1/1.*100%", details, re.S)) + assert bool(re.search(r"ETA:.*00h:00m:00s", details, re.S)) + + await pilot.press("space") + await pilot.pause() + await pilot.press("down") + await pilot.pause() + await pilot.press("enter") + await pilot.pause() + + output = _output_text(restored_app) + assert "alpha-out" in output + assert "alpha-err" in output - asyncio.run(run_test()) + with _tui_output_dir(tmp_path / "workrun"): + asyncio.run(run_test()) From aabf99b4fb408e602c85a73910c96fcfa1336904 Mon Sep 17 00:00:00 2001 From: Sagi Kimhi Date: Sat, 21 Mar 2026 05:59:52 +0200 Subject: [PATCH 2/2] Replace Dynaconf-based config loading with pydantic-settings stack --- README.md | 2 +- pyproject.toml | 1 - src/socx/config/_config.py | 4 +- src/socx/config/_settings.py | 399 +++++++++++----------------- src/socx/config/converters.py | 22 +- src/socx/config/encoders.py | 6 +- src/socx/config/serializers.py | 16 +- src/socx/config/validators.py | 129 ++------- src/socx_plugins/config/__init__.py | 4 +- src/socx_plugins/git/summary.py | 25 +- src/socx_plugins/git/utils.py | 7 +- 11 files changed, 216 insertions(+), 399 deletions(-) diff --git a/README.md b/README.md index 7773460..83fe3c0 100644 --- a/README.md +++ b/README.md @@ -96,7 +96,7 @@ pipx upgrade socx-cli > License-Expression: Apache-2.0 > Location: /home/skimhi/.project/git/users/sagikimhi/python/socx-worktree/develop/.venv/lib/python3.14/site-packages > Editable project location: /home/skimhi/.project/git/users/sagikimhi/python/socx-worktree/develop -> Requires: click, copier, dynaconf, gitpython, hoptex, jinja2, paramiko, pip, +> Requires: click, copier, gitpython, hoptex, jinja2, paramiko, pip, > platformdirs, plumbum, prompt-toolkit, psutil, pydantic, pygit2, python-box, > rich, rich-click, sh, textual, textual-speedups, typer, uv, werkzeug > Required-by: diff --git a/pyproject.toml b/pyproject.toml index 30b962b..853b96f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,7 +67,6 @@ dependencies = [ "textual", "plumbum", "pydantic", - "dynaconf<3.2.12", "paramiko", "werkzeug>=3.1.4", "gitpython", diff --git a/src/socx/config/_config.py b/src/socx/config/_config.py index d1c1b69..9bc697e 100644 --- a/src/socx/config/_config.py +++ b/src/socx/config/_config.py @@ -10,10 +10,9 @@ from pathlib import Path from collections import ChainMap -from dynaconf.utils import ensure_a_list +from socx.config._settings import ensure_a_list from werkzeug.local import LocalProxy -from socx.config import converters from socx.core import ( LOCAL_CONFIG_FILENAME, LOCAL_CONFIG_FILE, @@ -151,7 +150,6 @@ def get_settings( return rv -converters.init() _tokens = [] diff --git a/src/socx/config/_settings.py b/src/socx/config/_settings.py index d72c7f9..09ebbe4 100644 --- a/src/socx/config/_settings.py +++ b/src/socx/config/_settings.py @@ -1,342 +1,245 @@ -"""Dynaconf configuration wrappers used across SoCX.""" +"""Configuration wrappers used across SoCX.""" from __future__ import annotations -import logging -from typing import Any, Literal, ParamSpec, TypeVar +import json +import tomllib +from copy import deepcopy +from types import SimpleNamespace from pathlib import Path -from collections import ChainMap +from typing import Any, Literal from collections.abc import Callable -from dynaconf import LazySettings, get_history -from dynaconf.base import SourceMetadata, ensure_a_list -from dynaconf.utils.boxing import DynaBox -from dynaconf.utils.inspect import get_debug_info, _get_data_by_key -from dynaconf.utils.parse_conf import unparse_conf_data +import box +from jinja2 import Environment +from pydantic import Field from pydantic_core import to_jsonable_python - -from socx.config.serializers import SettingsSerializer - - -logger = logging.getLogger(__name__) - -T = TypeVar("T") -P = ParamSpec("P") -KT = TypeVar("KT", bound=str) -VT = TypeVar("VT") +from pydantic_settings import BaseSettings, SettingsConfigDict SETTINGS_DEFAULTS: dict[str, Any] = dict( - env="default", envvar="SOCX_SETTINGS_PATH", encoding="utf-8", - auto_cast=True, - load_dotenv=True, - environments=False, - dotted_lookup=True, - envvar_prefix="SOCX", - merge_enabled=True, - lowercase_read=True, - sysenv_fallback=True, - dotenv_override=False, - commentjson_enabled=True, - apply_default_on_none=False, ) -"""Default options passed to Dynaconf constructor in `get_settings`.""" -class Settings(LazySettings): - """Singleton settings instance of loaded `socx` configurations.""" +def ensure_a_list(value: Any) -> list[Any]: + if value is None: + return [] + if isinstance(value, list | tuple | set): + return list(value) + return [value] + + +class SettingsEnv(BaseSettings): + model_config = SettingsConfigDict(env_prefix="SOCX_", extra="ignore") + settings_path: str | None = Field(default=None, alias="SETTINGS_PATH") + + +class Settings(box.Box): + """Mutable runtime settings loaded from yaml/toml/json files.""" + + _root_path: str - def __init__(self, wrapped=None, **kwargs: Any) -> None: - kwargs = dict(ChainMap(kwargs, SETTINGS_DEFAULTS)) - LazySettings.__init__(self, wrapped=wrapped, **kwargs) - if hasattr(self, "dynaconf_include"): - for file in self.dynaconf_include: - if file not in self.loaded_files: - self.load_file(path=file) + def __init__(self, **kwargs: Any) -> None: + super().__init__({}, box_dots=True) + self._history: list[dict[str, Any]] = [] + self._loaded_files: list[str] = [] + self.settings_file: list[Path] = [Path(p) for p in ensure_a_list(kwargs.pop("settings_file", []))] + self.preload: list[Path] = [Path(p) for p in ensure_a_list(kwargs.pop("preload", []))] + self._root_path = str(kwargs.pop("root_path", Path.cwd())) + self.update(kwargs) + self._load_all() - def __contains__(self, key): - return self.exists(key) or ( - isinstance(key, str) and hasattr(self, key) - ) + def _load_all(self) -> None: + for file in [*self.preload, *self.settings_file]: + self.load_file(file) @property - def raw(self) -> dict[str, Any]: - """Return this instance with all nested values as raw values. + def dynaconf_include(self) -> list[Path]: + return [Path(p) for p in ensure_a_list(self.get("dynaconf_include", []))] + + @dynaconf_include.setter + def dynaconf_include(self, value: list[str | Path]) -> None: + self["dynaconf_include"] = [str(Path(v)) for v in ensure_a_list(value)] + + def __contains__(self, key: object) -> bool: + return bool(isinstance(key, str) and (self.exists(key) or hasattr(self, key))) - See Also: - --------- - get_raw : Further information regarding 'raw values'. + def exists(self, key: str) -> bool: + return self.get(key, default=None) is not None - """ - return self.encode(SettingsSerializer.serialize(self)) + @property + def raw(self) -> dict[str, Any]: + return self.encode(dict(self)) @property def root(self) -> Path: - """Get the root path of the current settings instance.""" return Path(self._root_path) @root.setter def root(self, value: str | Path) -> None: - if isinstance(value, str): - value = Path(value) - - if value.is_file() and value.exists(): - self.set( - "SETTINGS_FILE_FOR_DYNACONF", - value, - loader_identifier="init_settings_module", - ) - self.reload() + self._root_path = str(Path(value)) @property def history(self) -> tuple[dict[str, Any], ...]: - """Get the history of this instance. - - See Also: - --------- - get_history : Additional options for retrieving the settings history. - - """ return self.get_history() @property - def metadata(self) -> list[SourceMetadata]: - """Return the ``SourceMetadata`` of loaded settings history.""" - return self._loaded_by_loaders.keys() + def metadata(self) -> list[str]: + return self._loaded_files @property def excludes(self) -> list[str]: - """Get a list of settings paths and glob expressions not to load.""" - return self.get("SKIP_FILES_FOR_DYNACONF", []) + return self.get("skip_files", []) @excludes.setter def excludes(self, value: str | Path | list[str | Path]) -> None: - """Set the list of settings paths and glob expressions not to load.""" - excludes = [str(v) for v in ensure_a_list(value)] - self.update( - data={"SKIP_FILES_FOR_DYNACONF": excludes}, - merge=bool("merge" in excludes), - tomlfy=True, - ) - self.reload() + self["skip_files"] = [str(v) for v in ensure_a_list(value)] @property def includes(self) -> list[str]: - """Get a list of settings paths and glob expressions to load.""" - return [ - *self.get("DYNACONF_INCLUDE", []), - *[str(f) for f in self.settings_file], - ] + return [*self.get("dynaconf_include", []), *[str(f) for f in self.settings_file]] @includes.setter def includes(self, value: str | Path | list[str | Path]) -> None: - """Set the list of settings paths and glob expressions to load.""" - self.dynaconf_include = [*self.dynaconf_include, value] - self.reload() + self["dynaconf_include"] = [*self.get("dynaconf_include", []), *[str(v) for v in ensure_a_list(value)]] @property - def loaded_files(self): - """Get a list of paths to all loaded settings files.""" + def loaded_files(self) -> list[str]: return self._loaded_files @property def debug_info(self) -> dict[str, Any]: - """Return a ``dict`` with useful debugging information of settings.""" return self.get_debug_info() - def as_box(self, key: str | None = None) -> DynaBox: - """Get the current settings as a ``DynaBox`` instance.""" + def as_box(self, key: str | None = None) -> box.Box: rv = self.get_raw(key) - return DynaBox({key: rv}) if key is not None else DynaBox(rv) + return box.Box({key: rv}) if key is not None else box.Box(rv) def to_json(self, key: str | None = None) -> str: - """Serialize the current settings to JSON.""" return self.as_box(key).to_json() or "" def to_toml(self, key: str | None = None) -> str: - """Serialize the current settings to TOML.""" return self.as_box(key).to_toml() or "" def to_yaml(self, key: str | None = None) -> str: - """Serialize the current settings to YAML.""" return self.as_box(key).to_yaml() or "" - def get_raw( - self, - key: str | None = None, - default: Any | None = None, - sep: str | None = None, - ) -> Any: - """Return the raw value for key if such exists, else default. - - A raw value is the original value string as it was originally defined - in the a configuration file, prior to casting it into a python object - type. - - This is useful when writing back configuration files in order to - revert unserializable python types into their original configuration - string form. - - Parameters: - ----------- - key: str, optional - The dictionary key of the raw value to return. - default: Any, optional - A default value to return if the key does not exist. - sep: str, optional - An optional seperator in the original key that will be replaced - with a dot before performing a dotted lookup. - - Returns: - -------- - value_or_default: Any - The value in its raw configuration format or default if key does - not exist. - - """ + def get_raw(self, key: str | None = None, default: Any | None = None, sep: str | None = None) -> Any: if key is None: return self.raw + key = key.replace(sep or "__", ".") + return self.get(key, default=default) + + def get_history(self, key: str | None = None, limit: int = 0) -> tuple[dict[str, Any], ...]: + entries = self._history + if key: + entries = [entry for entry in entries if key in entry.get("data", {})] + if limit > 0: + entries = entries[-limit:] + return tuple(entries) + + def get_debug_info(self, key: str | None = None, verbosity: Literal[0, 1, 2] = 0) -> dict[str, Any]: + return { + "key": key, + "verbosity": verbosity, + "root": self._root_path, + "loaded_files": self._loaded_files, + "settings_file": [str(f) for f in self.settings_file], + } + + def update(self, data: dict[str, Any], merge: bool = True, **kwargs: Any) -> None: # type: ignore[override] + normalized = self.transform(data, self._normalize_key, skip_values=True) + super().merge_update(box.Box(normalized)) if merge else super().clear() or super().update(normalized) + self._history.append({"data": deepcopy(normalized)}) + + def reload(self) -> None: + keep = {"_history": self._history, "_loaded_files": []} + super().clear() + self._loaded_files = [] + self._history = keep["_history"] + self._load_all() + + def load_file(self, path: str | Path | list[str | Path]) -> None: + for value in ensure_a_list(path): + file = Path(value) + if not file.exists() or not file.is_file(): + continue + data = self._parse_file(file) + self.update(data) + resolved = str(file.resolve()) + if resolved not in self._loaded_files: + self._loaded_files.append(resolved) + + for include in ensure_a_list(self.get("dynaconf_include", [])): + include_path = Path(str(include)) + if include_path.exists() and include_path.is_file() and str(include_path.resolve()) not in self._loaded_files: + self.load_file(include_path) + + def get_environ(self, name: str, default: str | None = None) -> str | None: + import os + + return os.getenv(name, default) + + def _parse_file(self, path: Path) -> dict[str, Any]: + suffix = path.suffix.lower() + if suffix in {".yaml", ".yml"}: + data = box.Box.from_yaml(filename=str(path)).to_dict() + elif suffix == ".json": + data = json.loads(path.read_text()) + elif suffix == ".toml": + data = tomllib.loads(path.read_text()) + else: + return {} + return self._resolve_templates(data) - if self.exists(key): - return _get_data_by_key( - data=self.raw, - key_dotted_path=key, - default=default, - sep=(sep or "__"), - ) - - if hasattr(self, key): - return self.encode(getattr(self, key)) - - return default - - def get_history( - self, key: str | None = None, limit: int = 0 - ) -> tuple[dict[str, Any], ...]: - """Get up to limit entries of loaded data history. - - Parameters: - ----------- - key: str | None, optional - An optional key to by which to filter history entries in order - to search for changes applied to a specific settings key. - - limit : int, optional - If limit > 0, it specifies the maximum number of history entries - to append to the returned result. - - If limit == 0, all history entries are returned in the result. - - entries are sorted from newest to oldest, with the newest entry - being at index 0. - - Returns: - -------- - entries: tuple[dict[str, Any], ...] - A tuple of history entries where each entry is a transformation - that has been previously applied to the settings instance. - - The returned entries are ordered from oldest to newest. - """ - rv = [ - DynaBox(self.encode(entry)) - for i, entry in enumerate(get_history(obj=self, key=key)) - ] - rv.reverse() - return tuple(rv[: limit or -1]) - - def get_debug_info( - self, key: str | None = None, verbosity: Literal[0, 1, 2] = 0 - ) -> dict[str, Any]: - """Get a dict of debugging information about the settings instance.""" - return get_debug_info(settings=self, verbosity=verbosity, key=key) + def _resolve_templates(self, obj: Any) -> Any: + if isinstance(obj, dict): + return {k: self._resolve_templates(v) for k, v in obj.items()} + if isinstance(obj, list): + return [self._resolve_templates(v) for v in obj] + if isinstance(obj, str) and obj.startswith("@jinja"): + template = obj.removeprefix("@jinja").strip() + env = Environment() + env.filters["realpath"] = lambda value: str(Path(value).resolve()) + text = env.from_string(template).render(this=self, settings=self) + return text + return obj @classmethod def encode(cls, obj: Any) -> Any: - """Encode an object to a python serializable value.""" rv = cls.transform(obj, cls._encode, skip_values=False) - return cls.transform( - rv, cls._lowerfy, cls._normalize_key, skip_values=True - ) + return cls.transform(rv, cls._lowerfy, cls._normalize_key, skip_values=True) @classmethod - def transform( - cls, - obj: T, - *funcs: Callable[..., Any], - skip_values: bool = True, - ) -> T: - """Apply one or more callables to obj and return the result.""" + def transform(cls, obj: Any, *funcs: Callable[..., Any], skip_values: bool = True) -> Any: rv = obj for fn in funcs: rv = cls._transform(rv, fn, skip_values=skip_values) return rv @classmethod - def _transform( - cls, - obj: T, - fn: Callable[[T], T], - skip_values: bool = True, - ): + def _transform(cls, obj: Any, fn: Callable[[Any], Any], skip_values: bool = True): if isinstance(obj, dict): - rv = {} - - for k, v in obj.items(): - k = cls._transform(k, fn, skip_values=False) - v = cls._transform(v, fn, skip_values=skip_values) - rv[k] = v - - return rv - + return {cls._transform(k, fn, skip_values=False): cls._transform(v, fn, skip_values=skip_values) for k, v in obj.items()} if isinstance(obj, list | set | tuple): rv = [cls._transform(v, fn, skip_values=skip_values) for v in obj] - - if isinstance(obj, tuple): - return tuple(rv) - - if isinstance(obj, set): - return frozenset(rv) - - return rv - - if skip_values: - return obj - - return fn(obj) + return tuple(rv) if isinstance(obj, tuple) else frozenset(rv) if isinstance(obj, set) else rv + return obj if skip_values else fn(obj) @classmethod def _encode(cls, obj: Any): - rv = unparse_conf_data(obj) - - if isinstance(rv, Path): - return str(rv) - - if isinstance(rv, str): - if rv.startswith("@none"): - rv = rv.strip() - - return rv - - return to_jsonable_python( - rv, serialize_unknown=True, fallback=unparse_conf_data - ) + if isinstance(obj, Path): + return str(obj) + return to_jsonable_python(obj, serialize_unknown=True) @classmethod def _lowerfy(cls, obj: Any): - if isinstance(obj, str): - return obj.lower() - else: - return obj + return obj.lower() if isinstance(obj, str) else obj @classmethod def _normalize_key(cls, obj: Any): if isinstance(obj, str): - return ( - obj.lower() - .replace("_for_dynaconf", "") - .replace("dynaconf", "") - ) + return obj.lower().replace("_for_dynaconf", "").replace("dynaconf", "") return obj diff --git a/src/socx/config/converters.py b/src/socx/config/converters.py index cc413c2..7178768 100644 --- a/src/socx/config/converters.py +++ b/src/socx/config/converters.py @@ -38,9 +38,19 @@ validate_call, ConfigDict, ) -from dynaconf import add_converter -from dynaconf.utils.boxing import DynaBox -from dynaconf.utils.parse_conf import Lazy +import box + +DynaBox = box.Box + +class Lazy: + def __init__(self, value=None, casting=None): + self.value=value + self.casting=casting + def set_casting(self, casting): + self.casting=casting + return self + +_CONVERTERS: dict[str, Converter] = {} from click.utils import _detect_program_name from socx.core.funcs import fill @@ -860,15 +870,13 @@ def get_name(self) -> str: def add_converters(converters: Iterable[Converter]) -> None: """Register converters with Dynaconf using their inferred names.""" for converter in converters: - add_converter(converter.name, converter) + _CONVERTERS[converter.name] = converter def get_converters() -> Iterable[tuple[str, Converter]]: """Yield converter registrations, wrapping raw callables as needed.""" - from dynaconf.utils.parse_conf import converters - rv = [] - for name, cvt in converters.items(): + for name, cvt in _CONVERTERS.items(): if not isinstance(cvt, Converter): cvt = GenericConverter(name, cvt) rv.append((name, cvt)) diff --git a/src/socx/config/encoders.py b/src/socx/config/encoders.py index 2a88627..548cfe3 100644 --- a/src/socx/config/encoders.py +++ b/src/socx/config/encoders.py @@ -5,10 +5,10 @@ import box from typing import Any, Literal, override -from dynaconf import LazySettings from socx.core.encoder import Encoder from socx.config.serializers import SettingsSerializer +from socx.config._settings import Settings FormatType = Literal["yaml", "toml", "json"] @@ -21,12 +21,12 @@ def noop_str(*args, **kwargs): return "" -class SettingsEncoder(Encoder[LazySettings]): +class SettingsEncoder(Encoder[Settings]): @classmethod @override def encode( cls, - obj: LazySettings, + obj: Settings, key: str | None = None, merge: bool = False, format_: FormatType | None = None, diff --git a/src/socx/config/serializers.py b/src/socx/config/serializers.py index 974f046..dcf969d 100644 --- a/src/socx/config/serializers.py +++ b/src/socx/config/serializers.py @@ -6,10 +6,10 @@ from typing import Any, override from pydantic_core import to_jsonable_python -from dynaconf import LazySettings -from dynaconf.utils.boxing import DynaBox +import box from socx.core.serializer import Serializer +from socx.config._settings import Settings class ModuleSerializer(Serializer[ModuleType]): @@ -27,18 +27,18 @@ def serialize( return {name: attrs} -class SettingsSerializer(Serializer[LazySettings]): +class SettingsSerializer(Serializer[Settings]): @classmethod @override def serialize( cls, - obj: LazySettings, + obj: Settings, key: str | None = None, merge: bool = False, *args: Any, **kwargs: Any, - ) -> DynaBox: - """Serialize a ``LazySettings`` obj into a python ``dict``.""" + ) -> box.Box: + """Serialize a ``Settings`` obj into a python ``dict``.""" if key is None: - return DynaBox(obj.to_dict()) - return obj.get(key, cast=False, fresh=True) + return box.Box(obj.to_dict()) + return box.Box(obj.get(key)) diff --git a/src/socx/config/validators.py b/src/socx/config/validators.py index 951d1f9..d037ff7 100644 --- a/src/socx/config/validators.py +++ b/src/socx/config/validators.py @@ -1,121 +1,34 @@ -"""Dynaconf validator helpers for SoCX configuration.""" +"""Validation helpers for SoCX configuration.""" -from itertools import chain -from typing import ClassVar -from pathlib import Path -from collections.abc import Iterable - -from dynaconf import LazySettings -from dynaconf.validator import empty - - -__all__ = ( - "Validator", - "PathValidator", - "ValidationError", -) - - -from dynaconf.validator import Validator as Validator -from dynaconf.validator import ValidationError as ValidationError - - -class PathValidator: - """Validate include/exclude path configuration for converters.""" - - src: ClassVar[Path] - target: ClassVar[Path] - - @classmethod - def source_validator(cls, src: str | Path) -> bool: - """Validate that ``src`` points to an existing directory.""" - if not isinstance(src, Path): - src = Path(src) - return src.exists() and src.is_dir() - - @classmethod - def target_validator(cls, target: str | Path) -> bool: - """Ensure target either does not exist or resolves to a directory.""" - if not isinstance(target, Path): - target = Path(target) - return target.is_dir() or not target.exists() - - @classmethod - def includes_validator( - cls, src: Path, includes: Iterable[str], excludes: Iterable[str] - ) -> bool: - """Validate include patterns resolve to files once exclusions apply.""" - if not includes: - return False - if not isinstance(src, Path): - src = Path(src) - if not isinstance(includes, list | set | tuple): - return False - paths = cls._extract_includes(src, includes, excludes) - return bool(paths) and all(path.is_file() for path in paths) - - @classmethod - def _extract_includes( - cls, src: Path, includes: Iterable[str], excludes: Iterable[str] - ) -> set[Path]: - """Resolve include/exclude patterns into a set of concrete paths.""" - paths: set[Path] = set() - globpaths: set[Path] = set() - - if not isinstance(src, Path): - src = Path(src) - - for include in includes: - if "*" not in include: - paths.add(Path(src / include)) - else: - globpaths = globpaths.union(set(src.glob(str(include)))) - - for exclude in excludes: - if "*" not in exclude: - paths.discard(Path(src / exclude)) - else: - globpaths.difference_update(set(src.glob(str(exclude)))) - - return paths.union(globpaths) +from __future__ import annotations +from dataclasses import dataclass +from collections.abc import Iterable -def _convert_validators(settings: LazySettings) -> Iterable[Validator]: - """Build validators related to converter configuration blocks.""" +from socx.config._settings import Settings - def _source_validator(lang: str) -> Validator: - yield Validator( - f"convert.{lang}.source", - condition=PathValidator.source_validator, - must_exist=True, - ne=empty, - ) - def _target_validator(lang: str) -> Validator: - yield Validator( - f"convert.{lang}.target", - condition=PathValidator.target_validator, - must_exist=True, - ne=empty, - ) +class ValidationError(ValueError): + """Raised when configuration validation fails.""" - def _source_validators(settings: LazySettings) -> Iterable[Validator]: - yield from (_source_validator(lang) for lang in settings.convert) - def _target_validators(settings: LazySettings) -> Iterable[Validator]: - yield from (_target_validator(lang) for lang in settings.convert) +@dataclass(frozen=True) +class Validator: + """Lightweight compatibility validator.""" - yield from chain(_source_validator(settings), _target_validator(settings)) + names: tuple[str, ...] + def validate(self, settings: Settings) -> None: + for name in self.names: + if settings.get(name) is None: + msg = f"Missing required setting: {name}" + raise ValidationError(msg) -def get_validators(settings: LazySettings): - """Yield all validators expected for the provided ``settings``.""" - yield from _convert_validators(settings) +def get_validators(settings: Settings) -> Iterable[Validator]: + return () -def validate_all(settings: LazySettings, register: bool = False) -> None: - """Run validation against all registered validators.""" - if register: - settings.validators.register(get_validators(settings)) - settings.validators.validate_all() +def validate_all(settings: Settings, register: bool = False) -> None: + for validator in get_validators(settings): + validator.validate(settings) diff --git a/src/socx_plugins/config/__init__.py b/src/socx_plugins/config/__init__.py index bdf2ef8..9c25872 100644 --- a/src/socx_plugins/config/__init__.py +++ b/src/socx_plugins/config/__init__.py @@ -4,7 +4,7 @@ from textwrap import dedent -from dynaconf.utils.boxing import DynaBox +import box import rich from rich.syntax import Syntax from rich.json import JSON @@ -144,7 +144,7 @@ def noop(*args, **kwargs): history = [] entries = settings.encode(settings.get_history(key=key, limit=limit)) for entry in entries: - code = getattr(DynaBox(entry), f"to_{format_}", noop)() + code = getattr(box.Box(entry), f"to_{format_}", noop)() if format_ == "json": code = JSON(code).text.plain history.append(Syntax(code=code, lexer=format_, theme="ansi_dark")) diff --git a/src/socx_plugins/git/summary.py b/src/socx_plugins/git/summary.py index 2317e3b..69ca764 100644 --- a/src/socx_plugins/git/summary.py +++ b/src/socx_plugins/git/summary.py @@ -9,15 +9,13 @@ from collections.abc import Callable, Iterable from typing import Literal -from dynaconf.base import Lazy from git import Repo from rich.json import JSON from socx import settings, get_console, console from rich.text import Text from rich.table import Table from rich.console import Console, ConsoleOptions, RenderResult -from dynaconf.utils.boxing import DynaBox -from dynaconf.utils.parse_conf import apply_converter +import box from socx_plugins.git.utils import ( get_repo_name, @@ -36,9 +34,9 @@ class Summary: """Compute and present a multi-repository summary view.""" repos: list[Repo] - style: DynaBox + style: box.Box console: Console - columns: list[DynaBox] + columns: list[box.Box] headers: list[Text] records: list[list[Text]] @@ -57,7 +55,7 @@ def __init__(self, repos: Iterable[Repo]) -> None: } @classmethod - def get_header(cls, column: DynaBox, style: DynaBox) -> Text: + def get_header(cls, column: box.Box, style: box.Box) -> Text: """Build a styled header string for a summary column.""" header = column.get("name", "") header_style = " ".join( @@ -66,16 +64,15 @@ def get_header(cls, column: DynaBox, style: DynaBox) -> Text: return Text.from_markup(text=header, style=header_style) @classmethod - def get_column_func(cls, column: DynaBox) -> Callable[[Repo], str]: + def get_column_func(cls, column: box.Box) -> Callable[[Repo], str]: func = column.func if isinstance(func, str): - func = apply_converter("@symbol", column.func, settings) - if isinstance(func, Lazy): - func = func(settings) + from socx.config.converters import SymbolConverter + func = SymbolConverter()(column.func) return func @classmethod - def get_column_value(cls, column: DynaBox, repo: Repo) -> Text: + def get_column_value(cls, column: box.Box, repo: Repo) -> Text: """Render the column content for a single repository.""" text = str(cls.get_column_func(column)(repo)) style = column.get("style") or "" @@ -83,19 +80,19 @@ def get_column_value(cls, column: DynaBox, repo: Repo) -> Text: return content @classmethod - def get_record(cls, columns: Iterable[DynaBox], repo: Repo) -> list[Text]: + def get_record(cls, columns: Iterable[box.Box], repo: Repo) -> list[Text]: """Return all column values for ``repo``.""" return [cls.get_column_value(c, repo) for c in columns] @classmethod - def get_headers(cls, columns: list[DynaBox], style: DynaBox) -> list[Text]: + def get_headers(cls, columns: list[box.Box], style: box.Box) -> list[Text]: """Return the summary headers with style applied.""" rv = [cls.get_header(c, style) for c in columns] return rv @classmethod def get_records( - cls, columns: Iterable[DynaBox], repos: Iterable[Repo] + cls, columns: Iterable[box.Box], repos: Iterable[Repo] ) -> list[list[Text]]: """Convert repository metadata into row-wise records.""" return [cls.get_record(columns, repo) for repo in repos] diff --git a/src/socx_plugins/git/utils.py b/src/socx_plugins/git/utils.py index 899a5d0..bcb0fb7 100644 --- a/src/socx_plugins/git/utils.py +++ b/src/socx_plugins/git/utils.py @@ -9,7 +9,7 @@ import git from socx import settings from rich.text import Text -from dynaconf.utils.files import glob, deduplicate +from glob import glob def get_repo(path: str | Path) -> git.Repo | None: @@ -142,7 +142,7 @@ def find_repositories( root = cast(Path, root) def deduplicate_paths(paths: Iterable[Path]) -> list[Path]: - return list(map(Path, deduplicate([str(p) for p in paths]))) + return list(dict.fromkeys(Path(p) for p in paths)) def match_directories( root: Path, @@ -163,9 +163,8 @@ def match_directories( (root / path).relative_to(Path.cwd(), walk_up=True) for path in glob( pattern, - root_dir=root, + root_dir=str(root), recursive=recursive, - include_hidden=include_hidden, ) ] rv.extend(filter(Path.is_dir, paths))