diff --git a/docs/TECH_DEBT.md b/docs/TECH_DEBT.md index 0d9c913..a66a2eb 100644 --- a/docs/TECH_DEBT.md +++ b/docs/TECH_DEBT.md @@ -258,6 +258,63 @@ Interpretation: The hotspots above were checked against current code and targeted tests. +## Quick Wins (next 2-4 weeks) + +This section prioritizes low-risk, high-impact improvements for the three +highest-impact active debts. + +### A) Mutation saga monolítica + +1. **Extract phase helpers without changing orchestration order** + - Create private functions for: preflight/profile checks, SQL apply, vector + apply, rollback/recovery. + - **Why:** Reduces cognitive load and blast radius while preserving current + write semantics. +2. **Introduce a small typed runtime config object** + - Resolve toggles from `Settings` once and pass a typed object through the + mutation path. + - **Why:** Removes repeated dynamic branching and makes behavior easier to + reason about and test. +3. **Add phase-scoped structured telemetry** + - Emit consistent phase markers (`phase`, `doc_count`, `strategy`) around saga + steps. + - **Why:** Makes production debugging faster before deeper refactors. + +### B) Evaluación con sesgos metodológicos + +1. **Preserve retrieval scores in the callback contract** + - Move from `Sequence[str]` to a scored shape (`external_id`, `score`). + - **Why:** Avoids lossy rank-only evaluation and unlocks better diagnostics. +2. **Stop silently dropping unknown IDs** + - Treat unknown IDs as explicit anomalies (counter + optional report) or as + non-relevant retrieved docs. + - **Why:** Prevents false confidence by surfacing corpus/index drift. +3. **Emit per-query breakdown in compare mode** + - Add query-level metrics output alongside aggregate deltas. + - **Why:** Quick visibility into regressions hidden by averages. + +### C) Fragilidad de contratos CLI/HTTP + +1. **Single DTO validation path for CLI + HTTP + MCP** + - Reuse the same typed input models in all transports. + - **Why:** Eliminates semantic drift and duplicated parsing logic. +2. **Unify visible defaults and error shape** + - Align default values and error payload contract across entrypoints. + - **Why:** Reduces user confusion and makes integration tests more stable. +3. **Add transport parity tests for critical commands** + - Golden tests asserting same payload => same behavior in CLI and HTTP. + - **Why:** Catches drift early with low implementation cost. + +## Suggested execution order + +1. **Week 1-2:** transport parity + shared DTO validation (fast risk reduction). +2. **Week 2-3:** evaluator unknown-ID handling + scored callback contract. +3. **Week 3-4:** mutation saga phase extraction (behavior-preserving slice). + +This order is recommended because it first reduces outward-facing contract +fragility, then improves quality signals, and finally tackles internal +orchestration complexity with lower operational risk. + Targeted test command used during this review: ```bash diff --git a/src/local_rag_backend/cli_commands/docs/docs_mutate.py b/src/local_rag_backend/cli_commands/docs/docs_mutate.py index ab1c270..72dd0bf 100644 --- a/src/local_rag_backend/cli_commands/docs/docs_mutate.py +++ b/src/local_rag_backend/cli_commands/docs/docs_mutate.py @@ -5,17 +5,17 @@ from typing import Any, cast import click +from pydantic import ValidationError from local_rag_backend.cli_commands.runtime import ( build_dense_embedder, get_cli_container, run_cli_mutation, ) -from local_rag_backend.core.use_cases.docs_mutation import ( - MutationCoordinator, - MutationIntent, - MutationUpsertInput, +from local_rag_backend.core.services.docs_mutation_transport import ( + build_docs_mutation_intent_from_raw, ) +from local_rag_backend.core.use_cases.docs_mutation import MutationCoordinator, MutationIntent from local_rag_backend.core.use_cases.results import MutationSummary from local_rag_backend.settings import settings @@ -26,46 +26,11 @@ def _read_payload(payload_json: Path) -> dict[str, Any]: raise ValueError("--json must contain a JSON object payload") return cast("dict[str, Any]", payload) - def _build_intent(payload: dict[str, Any]) -> MutationIntent: - upserts_raw = payload.get("upserts") or [] - delete_ids_raw = payload.get("delete_ids") or [] - delete_external_ids_raw = payload.get("delete_external_ids") or [] - - if not isinstance(upserts_raw, list): - raise ValueError("payload.upserts must be a list") - if not isinstance(delete_ids_raw, list): - raise ValueError("payload.delete_ids must be a list") - if not isinstance(delete_external_ids_raw, list): - raise ValueError("payload.delete_external_ids must be a list") - - upserts: list[MutationUpsertInput] = [] - for item in upserts_raw: - if not isinstance(item, dict): - raise ValueError("each payload.upserts item must be an object") - md = item.get("metadata") - upserts.append( - MutationUpsertInput( - external_id=str(item.get("external_id") or "").strip(), - content=str(item.get("content") or "").strip(), - source_id=( - str(item.get("source_id")) if item.get("source_id") is not None else None - ), - scope=(str(item.get("scope")) if item.get("scope") is not None else None), - snapshot_id=( - str(item.get("snapshot_id")) if item.get("snapshot_id") is not None else None - ), - metadata=(cast("dict[str, Any]", md) if isinstance(md, dict) else None), - ) - ) - - return MutationIntent( - op_id=str(payload.get("op_id") or "").strip(), - upserts=tuple(upserts), - delete_ids=tuple(str(v) for v in delete_ids_raw), - delete_external_ids=tuple(str(v) for v in delete_external_ids_raw), - source="cli:docs:mutate", - ) + try: + return build_docs_mutation_intent_from_raw(payload, source="cli:docs:mutate") + except (ValidationError, ValueError) as exc: + raise ValueError(str(exc)) from exc @click.command("mutate-docs") diff --git a/src/local_rag_backend/core/services/docs_mutation_transport.py b/src/local_rag_backend/core/services/docs_mutation_transport.py new file mode 100644 index 0000000..c967c9d --- /dev/null +++ b/src/local_rag_backend/core/services/docs_mutation_transport.py @@ -0,0 +1,127 @@ +"""Shared transport validation/assembly for docs mutation payloads.""" + +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any + +from pydantic import BaseModel, Field, field_validator, model_validator + +from local_rag_backend.core.use_cases.docs_mutation import MutationIntent, MutationUpsertInput + + +class MutationUpsertPayload(BaseModel): + external_id: str = Field(..., min_length=1, max_length=512) + content: str = Field(..., min_length=1, max_length=20000) + source_id: str | None = Field(default=None, max_length=1024) + scope: str | None = Field(default=None, min_length=1, max_length=512) + snapshot_id: str | None = Field(default=None, min_length=1, max_length=512) + metadata: dict[str, Any] | None = None + + @field_validator("external_id") + @classmethod + def _external_id_not_blank(cls, value: str) -> str: + value2 = value.strip() + if not value2: + raise ValueError("external_id must not be blank") + return value2 + + @field_validator("content") + @classmethod + def _content_not_blank(cls, value: str) -> str: + value2 = value.strip() + if not value2: + raise ValueError("content must not be blank") + return value2 + + +class DocsMutationPayload(BaseModel): + op_id: str | None = Field(default=None, min_length=1, max_length=128) + upserts: list[MutationUpsertPayload] = Field(default_factory=list, max_length=256) + delete_ids: list[str] = Field(default_factory=list, max_length=2048) + delete_external_ids: list[str] = Field(default_factory=list, max_length=2048) + + @field_validator("delete_ids") + @classmethod + def _normalize_delete_ids(cls, values: list[str]) -> list[str]: + normalized: list[str] = [] + seen: set[str] = set() + for doc_id in values: + doc_id_s = str(doc_id).strip() + if not doc_id_s or doc_id_s in seen: + continue + seen.add(doc_id_s) + normalized.append(doc_id_s) + return normalized + + @field_validator("delete_external_ids") + @classmethod + def _normalize_delete_external_ids(cls, values: list[str]) -> list[str]: + normalized: list[str] = [] + seen: set[str] = set() + for external_id in values: + ext_s = str(external_id).strip() + if not ext_s or ext_s in seen: + continue + seen.add(ext_s) + normalized.append(ext_s) + return normalized + + @model_validator(mode="after") + def _validate_payload(self) -> DocsMutationPayload: + if not self.upserts and not self.delete_ids and not self.delete_external_ids: + raise ValueError( + "docs/mutate requires at least one operation: upserts, delete_ids, or delete_external_ids" + ) + upsert_ext_ids = {str(item.external_id).strip() for item in self.upserts} + conflict = upsert_ext_ids & set(self.delete_external_ids) + if conflict: + raise ValueError( + "upserts and delete_external_ids cannot target the same external_id values" + ) + return self + + +def validate_docs_mutation_payload(payload: Mapping[str, Any]) -> DocsMutationPayload: + return DocsMutationPayload.model_validate(payload) + + +def build_docs_mutation_intent( + payload: DocsMutationPayload, + *, + source: str, +) -> MutationIntent: + return MutationIntent( + op_id=str(payload.op_id or "").strip(), + upserts=tuple( + MutationUpsertInput( + external_id=item.external_id, + content=item.content, + source_id=item.source_id, + scope=item.scope, + snapshot_id=item.snapshot_id, + metadata=item.metadata, + ) + for item in payload.upserts + ), + delete_ids=tuple(payload.delete_ids), + delete_external_ids=tuple(payload.delete_external_ids), + source=source, + ) + + +def build_docs_mutation_intent_from_raw( + payload: Mapping[str, Any], + *, + source: str, +) -> MutationIntent: + return build_docs_mutation_intent(validate_docs_mutation_payload(payload), source=source) + + +__all__ = [ + "DocsMutationPayload", + "MutationUpsertPayload", + "build_docs_mutation_intent", + "build_docs_mutation_intent_from_raw", + "validate_docs_mutation_payload", +] diff --git a/src/local_rag_backend/core/services/maintenance.py b/src/local_rag_backend/core/services/maintenance.py index aa3f5e9..09b7a96 100644 --- a/src/local_rag_backend/core/services/maintenance.py +++ b/src/local_rag_backend/core/services/maintenance.py @@ -52,6 +52,30 @@ class MultiStoreExternalIdDeleteResult: ) +def _preflight_vector_store_or_resolve_embedder( + *, + vec_repo: VectorRepoPort | None, + rebuild_on_index_failure: bool, + embedder: EmbedderPort | None, + embedder_factory: Callable[[], EmbedderPort] | None, +) -> EmbedderPort | None: + """Ensure vector preflight is safe before SQL mutation; resolve fallback embedder when needed.""" + resolved_embedder = embedder + if vec_repo is None or not rebuild_on_index_failure or resolved_embedder is not None: + return resolved_embedder + + try: + _ = vec_repo.ntotal + except Exception as preflight_err: + resolved_embedder = _resolve_embedder_or_raise( + current=resolved_embedder, + factory=embedder_factory, + cause=preflight_err, + message=_PREFLIGHT_MSG, + ) + return resolved_embedder + + def _resolve_embedder_or_raise( *, current: EmbedderPort | None, @@ -68,6 +92,35 @@ def _resolve_embedder_or_raise( return current +def _delete_vector_rows_or_rebuild( + *, + ids: Sequence[DocId], + doc_repo: DocumentRepoPort, + vec_repo: VectorRepoPort, + embedder: EmbedderPort | None, + embedder_factory: Callable[[], EmbedderPort] | None, + rebuild_on_index_failure: bool, +) -> tuple[int | None, bool]: + """Delete vectors by id; optionally rebuild from SQL state when deletion fails.""" + try: + deleted_index = vec_repo.delete(ids) + return deleted_index, False + except Exception as delete_err: + if not rebuild_on_index_failure: + raise + resolved_embedder = _resolve_embedder_or_raise( + current=embedder, + factory=embedder_factory, + cause=delete_err, + message=_CONSISTENCY_MSG, + ) + try: + rebuild_index_from_db(doc_repo=doc_repo, vec_repo=vec_repo, embedder=resolved_embedder) + return None, True + except Exception as rebuild_err: + raise RuntimeError(_REBUILD_FAIL_MSG) from rebuild_err + + def rebuild_index_from_db( *, doc_repo: DocumentRepoPort, @@ -111,18 +164,12 @@ def delete_documents_multi_store( deleted_sql=0, deleted_index=0 if vec_repo else None, rebuilt=False ) - resolved_embedder = embedder - - if vec_repo is not None and rebuild_on_index_failure and resolved_embedder is None: - try: - _ = vec_repo.ntotal - except Exception as preflight_err: - resolved_embedder = _resolve_embedder_or_raise( - current=resolved_embedder, - factory=embedder_factory, - cause=preflight_err, - message=_PREFLIGHT_MSG, - ) + resolved_embedder = _preflight_vector_store_or_resolve_embedder( + vec_repo=vec_repo, + rebuild_on_index_failure=rebuild_on_index_failure, + embedder=embedder, + embedder_factory=embedder_factory, + ) before = len(list(doc_repo.get(ids_list))) doc_repo.delete_documents(ids_list) @@ -131,29 +178,19 @@ def delete_documents_multi_store( if vec_repo is None: return MultiStoreDeleteResult(deleted_sql=deleted_sql, deleted_index=None, rebuilt=False) - try: - deleted_index = vec_repo.delete(ids_list) - return MultiStoreDeleteResult( - deleted_sql=deleted_sql, deleted_index=deleted_index, rebuilt=False - ) - except Exception as delete_err: - if not rebuild_on_index_failure: - raise - resolved_embedder = _resolve_embedder_or_raise( - current=resolved_embedder, - factory=embedder_factory, - cause=delete_err, - message=_CONSISTENCY_MSG, - ) - try: - rebuilt = rebuild_index_from_db( - doc_repo=doc_repo, vec_repo=vec_repo, embedder=resolved_embedder - ) - return MultiStoreDeleteResult( - deleted_sql=deleted_sql, deleted_index=None, rebuilt=rebuilt >= 0 - ) - except Exception as rebuild_err: - raise RuntimeError(_REBUILD_FAIL_MSG) from rebuild_err + deleted_index, rebuilt = _delete_vector_rows_or_rebuild( + ids=ids_list, + doc_repo=doc_repo, + vec_repo=vec_repo, + embedder=resolved_embedder, + embedder_factory=embedder_factory, + rebuild_on_index_failure=rebuild_on_index_failure, + ) + return MultiStoreDeleteResult( + deleted_sql=deleted_sql, + deleted_index=deleted_index, + rebuilt=rebuilt, + ) def delete_external_ids_multi_store( @@ -176,18 +213,12 @@ def delete_external_ids_multi_store( rebuilt=False, ) - resolved_embedder = embedder - - if vec_repo is not None and rebuild_on_index_failure and resolved_embedder is None: - try: - _ = vec_repo.ntotal - except Exception as preflight_err: - resolved_embedder = _resolve_embedder_or_raise( - current=resolved_embedder, - factory=embedder_factory, - cause=preflight_err, - message=_PREFLIGHT_MSG, - ) + resolved_embedder = _preflight_vector_store_or_resolve_embedder( + vec_repo=vec_repo, + rebuild_on_index_failure=rebuild_on_index_failure, + embedder=embedder, + embedder_factory=embedder_factory, + ) deleted_sql, deleted_ids, missing_external_ids, tombstoned = doc_repo.delete_by_external_ids( ext_ids @@ -202,34 +233,18 @@ def delete_external_ids_multi_store( rebuilt=False, ) - try: - deleted_index = vec_repo.delete(deleted_ids) - return MultiStoreExternalIdDeleteResult( - deleted_sql=deleted_sql, - deleted_index=deleted_index, - missing_external_ids=missing_external_ids, - tombstoned=tombstoned, - rebuilt=False, - ) - except Exception as delete_err: - if not rebuild_on_index_failure: - raise - resolved_embedder = _resolve_embedder_or_raise( - current=resolved_embedder, - factory=embedder_factory, - cause=delete_err, - message=_CONSISTENCY_MSG, - ) - try: - rebuilt = rebuild_index_from_db( - doc_repo=doc_repo, vec_repo=vec_repo, embedder=resolved_embedder - ) - return MultiStoreExternalIdDeleteResult( - deleted_sql=deleted_sql, - deleted_index=None, - missing_external_ids=missing_external_ids, - tombstoned=tombstoned, - rebuilt=rebuilt >= 0, - ) - except Exception as rebuild_err: - raise RuntimeError(_REBUILD_FAIL_MSG) from rebuild_err + deleted_index, rebuilt = _delete_vector_rows_or_rebuild( + ids=deleted_ids, + doc_repo=doc_repo, + vec_repo=vec_repo, + embedder=resolved_embedder, + embedder_factory=embedder_factory, + rebuild_on_index_failure=rebuild_on_index_failure, + ) + return MultiStoreExternalIdDeleteResult( + deleted_sql=deleted_sql, + deleted_index=deleted_index, + missing_external_ids=missing_external_ids, + tombstoned=tombstoned, + rebuilt=rebuilt, + ) diff --git a/src/local_rag_backend/http/routers/docs.py b/src/local_rag_backend/http/routers/docs.py index dec5882..1bba3d7 100644 --- a/src/local_rag_backend/http/routers/docs.py +++ b/src/local_rag_backend/http/routers/docs.py @@ -16,6 +16,7 @@ build_canonical_import_request_input, validate_canonical_import_payload, ) +from local_rag_backend.core.services.docs_mutation_transport import build_docs_mutation_intent from local_rag_backend.core.use_cases.docs_import import ( DEFAULT_IMPORT_MAX_BYTES, ImportDocsOutcome, @@ -29,11 +30,7 @@ execute_import_canonical_sync, ) from local_rag_backend.core.use_cases.docs_ingest import ingest_docs_sync -from local_rag_backend.core.use_cases.docs_mutation import ( - MutationCoordinator, - MutationIntent, - MutationUpsertInput, -) +from local_rag_backend.core.use_cases.docs_mutation import MutationCoordinator from local_rag_backend.core.use_cases.errors import ( BadRequestError, PayloadTooLargeError, @@ -171,21 +168,8 @@ async def mutate_docs( def _mutate_operation() -> MutationSummary: coordinator = MutationCoordinator(settings_obj=settings_obj, ports=mutation_bundle.ports) return coordinator.execute( - MutationIntent( - op_id=str(payload.op_id or "").strip(), - upserts=tuple( - MutationUpsertInput( - external_id=item.external_id, - content=item.content, - source_id=item.source_id, - scope=item.scope, - snapshot_id=item.snapshot_id, - metadata=item.metadata, - ) - for item in payload.upserts - ), - delete_ids=tuple(payload.delete_ids), - delete_external_ids=tuple(payload.delete_external_ids), + build_docs_mutation_intent( + payload, source="api:/docs/mutate", ) ) diff --git a/src/local_rag_backend/http/schemas/docs.py b/src/local_rag_backend/http/schemas/docs.py index 28c7a13..1dc1c21 100644 --- a/src/local_rag_backend/http/schemas/docs.py +++ b/src/local_rag_backend/http/schemas/docs.py @@ -2,10 +2,14 @@ from __future__ import annotations -from typing import Annotated, Any +from typing import Annotated -from pydantic import BaseModel, Field, field_validator, model_validator +from pydantic import BaseModel, Field +from local_rag_backend.core.services.docs_mutation_transport import ( + DocsMutationPayload, + MutationUpsertPayload, +) from local_rag_backend.http.schemas.rag_api_models import RetrievalFilterModel @@ -23,29 +27,8 @@ class IngestResponse(BaseModel): ids: list[str] -class UpsertDocItem(BaseModel): - external_id: str = Field(..., min_length=1, max_length=512) - content: str = Field(..., min_length=1, max_length=20000) - source_id: str | None = Field(default=None, max_length=1024) - scope: str | None = Field(default=None, min_length=1, max_length=512) - snapshot_id: str | None = Field(default=None, min_length=1, max_length=512) - metadata: dict[str, Any] | None = None - - @field_validator("external_id") - @classmethod - def _external_id_not_blank(cls, v: str) -> str: - v2 = v.strip() - if not v2: - raise ValueError("external_id must not be blank") - return v2 - - @field_validator("content") - @classmethod - def _content_not_blank(cls, v: str) -> str: - v2 = v.strip() - if not v2: - raise ValueError("content must not be blank") - return v2 +class UpsertDocItem(MutationUpsertPayload): + """Transport alias for docs mutation upsert items.""" class UpsertDocResult(BaseModel): @@ -84,62 +67,8 @@ class ImportResponse(BaseModel): ) -class DocsMutateRequest(BaseModel): - op_id: str | None = Field( - default=None, - min_length=1, - max_length=128, - description="Optional idempotency key for mutation replay safety.", - ) - upserts: list[UpsertDocItem] = Field(default_factory=list, max_length=256) - delete_ids: list[Annotated[str, Field(min_length=1, max_length=256)]] = Field( - default_factory=list, - max_length=2048, - ) - delete_external_ids: list[Annotated[str, Field(min_length=1, max_length=512)]] = Field( - default_factory=list, - max_length=2048, - ) - - @field_validator("delete_ids") - @classmethod - def _normalize_delete_ids(cls, v: list[str]) -> list[str]: - normalized: list[str] = [] - seen: set[str] = set() - for doc_id in v: - doc_id_s = doc_id.strip() - if not doc_id_s or doc_id_s in seen: - continue - seen.add(doc_id_s) - normalized.append(doc_id_s) - return normalized - - @field_validator("delete_external_ids") - @classmethod - def _normalize_delete_external_ids(cls, v: list[str]) -> list[str]: - normalized: list[str] = [] - seen: set[str] = set() - for ext in v: - ext_s = ext.strip() - if not ext_s or ext_s in seen: - continue - seen.add(ext_s) - normalized.append(ext_s) - return normalized - - @model_validator(mode="after") - def _validate_payload(self) -> DocsMutateRequest: - if not self.upserts and not self.delete_ids and not self.delete_external_ids: - raise ValueError( - "docs/mutate requires at least one operation: upserts, delete_ids, or delete_external_ids" - ) - upsert_ext_ids = {str(doc.external_id).strip() for doc in self.upserts} - conflict = upsert_ext_ids & set(self.delete_external_ids) - if conflict: - raise ValueError( - "upserts and delete_external_ids cannot target the same external_id values" - ) - return self +class DocsMutateRequest(DocsMutationPayload): + """HTTP request alias using the shared transport validation contract.""" class DocsMutateResponse(BaseModel): diff --git a/tests/unit/core/services/test_docs_mutation_transport.py b/tests/unit/core/services/test_docs_mutation_transport.py new file mode 100644 index 0000000..41123a6 --- /dev/null +++ b/tests/unit/core/services/test_docs_mutation_transport.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import pytest + +from local_rag_backend.core.services.docs_mutation_transport import ( + build_docs_mutation_intent_from_raw, + validate_docs_mutation_payload, +) + + +def test_validate_docs_mutation_payload_normalizes_delete_lists() -> None: + payload = validate_docs_mutation_payload( + { + "upserts": [{"external_id": "doc-1", "content": "hello"}], + "delete_ids": [" id-1 ", "id-1", "", "id-2"], + "delete_external_ids": [" ext-1 ", "ext-1", " ", "ext-2"], + } + ) + assert payload.delete_ids == ["id-1", "id-2"] + assert payload.delete_external_ids == ["ext-1", "ext-2"] + + +def test_validate_docs_mutation_payload_rejects_empty_operation() -> None: + with pytest.raises(ValueError, match="requires at least one operation"): + validate_docs_mutation_payload({}) + + +def test_validate_docs_mutation_payload_rejects_conflicting_external_ids() -> None: + with pytest.raises(ValueError, match="cannot target the same external_id"): + validate_docs_mutation_payload( + { + "upserts": [{"external_id": "doc-1", "content": "hello"}], + "delete_external_ids": ["doc-1"], + } + ) + + +def test_build_docs_mutation_intent_from_raw_preserves_source() -> None: + intent = build_docs_mutation_intent_from_raw( + { + "op_id": " op-1 ", + "upserts": [{"external_id": "doc-1", "content": "hello"}], + }, + source="cli:docs:mutate", + ) + assert intent.source == "cli:docs:mutate" + assert intent.op_id == "op-1" + assert len(intent.upserts) == 1