From c25213591b140bcd6b2a9f42276b0f31a3d371ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B8rgen=20Andresen=20Osberg?= Date: Thu, 11 Jun 2026 14:21:54 +0200 Subject: [PATCH] feat(indexing): add canonical core index manifest --- docstra/core/__init__.py | 17 +- docstra/core/cli.py | 33 +- docstra/core/document_processing/document.py | 4 + docstra/core/indexing/code_index.py | 771 ++++++++++-------- docstra/core/indexing/model.py | 477 +++++++++++ docstra/core/indexing/repo_map.py | 655 +++++---------- docstra/core/ingestion/embeddings.py | 18 +- docstra/core/ingestion/storage.py | 34 +- docstra/core/retrieval/chroma.py | 21 +- docstra/core/retrieval/context_aware.py | 8 +- .../core/services/documentation_service.py | 35 +- docstra/core/services/ingestion_service.py | 56 +- docstra/core/services/query_service.py | 43 +- .../services/repository_explorer_service.py | 51 +- tests/test_core_index.py | 481 +++++++++++ tests/test_index_loading.py | 129 +++ 16 files changed, 1922 insertions(+), 911 deletions(-) create mode 100644 docstra/core/indexing/model.py create mode 100644 tests/test_core_index.py create mode 100644 tests/test_index_loading.py diff --git a/docstra/core/__init__.py b/docstra/core/__init__.py index 6b77494..db9ac88 100644 --- a/docstra/core/__init__.py +++ b/docstra/core/__init__.py @@ -5,6 +5,7 @@ """ from collections.abc import Generator +from pathlib import Path from docstra import __version__ as __version__ @@ -86,16 +87,28 @@ def setup_components(self): ) # Document indexer - self.document_indexer = DocumentIndexer(self.storage, self.embedding_generator) + self.document_indexer = DocumentIndexer( + self.storage, + self.embedding_generator, + codebase_root=str(Path.cwd()), + ) # Code indexer self.code_indexer = CodebaseIndexer( index_directory=f"{storage_dir}/index", exclude_patterns=self.config.processing.exclude_patterns, + codebase_root=str(Path.cwd()), + embedding_backend="chroma", + embedding_model=self.config.embedding.model_name, + source_kinds=["tree-sitter"], ) # Retriever - self.retriever = ChromaRetriever(self.storage, self.embedding_generator) + self.retriever = ChromaRetriever( + self.storage, + self.embedding_generator, + codebase_root=str(Path.cwd()), + ) # Hybrid retriever self.hybrid_retriever = HybridRetriever( diff --git a/docstra/core/cli.py b/docstra/core/cli.py index 25787d2..7ab8cbf 100644 --- a/docstra/core/cli.py +++ b/docstra/core/cli.py @@ -36,7 +36,8 @@ ) from docstra.core.document_processing.extractor import DocumentProcessor from docstra.core.documentation.generator import DocumentationGenerator -from docstra.core.indexing.code_index import CodebaseIndexer +from docstra.core.indexing.code_index import CodebaseIndex, CodebaseIndexer +from docstra.core.indexing.model import CORE_INDEX_FILENAME from docstra.core.ingestion.embeddings import EmbeddingFactory from docstra.core.ingestion.storage import ChromaDBStorage from docstra.core.llm.anthropic import AnthropicClient @@ -1689,16 +1690,25 @@ def _create_retrieval_eval_runner( user_config: UserConfig, abs_codebase_path: Path ) -> Callable[[str, int], List[Dict[str, Any]]]: _, chroma_path, index_path = _get_persist_paths(user_config, abs_codebase_path) + core_index_path = index_path / CORE_INDEX_FILENAME chroma_check_file = chroma_path / "chroma.sqlite3" - - if not index_path.exists() or not chroma_check_file.exists(): + legacy_index_artifacts = CodebaseIndex.legacy_artifacts_in(index_path) + legacy_repo_map = index_path.parent / "repo_map.json" + + if not core_index_path.exists() or not chroma_check_file.exists(): + migration_hint = "" + if legacy_index_artifacts or legacy_repo_map.exists(): + migration_hint = ( + " Legacy index artifacts were found. Rerun 'docstra ingest' " + "to rebuild the index in the new format." + ) raise FileNotFoundError( f"Codebase at {abs_codebase_path} is not fully initialized for " f"retrieval evaluation. ChromaDB path: {chroma_path} " f"(check file: {chroma_check_file}, exists: " - f"{chroma_check_file.exists()}), index path: {index_path} " - f"(exists: {index_path.exists()}). Run 'docstra init' and " - "'docstra ingest' first." + f"{chroma_check_file.exists()}), core index path: {core_index_path} " + f"(exists: {core_index_path.exists()}). Run 'docstra init' and " + f"'docstra ingest' first.{migration_hint}" ) embedding_generator = EmbeddingFactory.create_embedding_generator( @@ -1708,8 +1718,15 @@ def _create_retrieval_eval_runner( api_base=user_config.model.api_base, ) storage = ChromaDBStorage(persist_directory=str(chroma_path)) - base_retriever = ChromaRetriever(storage, embedding_generator) - code_indexer = CodebaseIndexer(index_directory=str(index_path)) + base_retriever = ChromaRetriever( + storage, + embedding_generator, + codebase_root=str(abs_codebase_path), + ) + code_indexer = CodebaseIndexer( + index_directory=str(index_path), + codebase_root=str(abs_codebase_path), + ) code_index = code_indexer.get_index() if code_index: diff --git a/docstra/core/document_processing/document.py b/docstra/core/document_processing/document.py index 1753830..dc34838 100644 --- a/docstra/core/document_processing/document.py +++ b/docstra/core/document_processing/document.py @@ -33,6 +33,10 @@ class DocumentType(str, Enum): TEXT = "text" OTHER = "other" + def __str__(self) -> str: + """Return the enum value instead of the enum representation.""" + return self.value + class DocumentMetadata(BaseModel): """Metadata for a document.""" diff --git a/docstra/core/indexing/code_index.py b/docstra/core/indexing/code_index.py index c2c4d98..0eaf768 100644 --- a/docstra/core/indexing/code_index.py +++ b/docstra/core/indexing/code_index.py @@ -1,354 +1,469 @@ -# File: ./docstra/core/indexing/code_index.py """ -Codebase indexing for efficient search and retrieval of code elements. +Codebase indexing facade backed by the canonical core index manifest. """ from __future__ import annotations -import json -import os from collections import defaultdict +import os from pathlib import Path -from typing import Any, Dict, List, Optional, Union +from typing import Any, Callable, Dict, Iterable, List, Optional, TypeVar, Union from docstra.core.document_processing.document import Document, DocumentType +from docstra.core.indexing.model import ( + CORE_INDEX_FILENAME, + CoreIndexBuilder, + CoreIndexManifest, + EmbeddingRef, + IndexedFile, + IndexedSymbol, + ImportRecord, + normalize_file_id, + resolve_file_path, +) + +LEGACY_INDEX_FILENAMES = [ + "symbol_index.json", + "file_index.json", + "import_index.json", + "function_index.json", + "class_index.json", +] + +RecordT = TypeVar("RecordT") class CodebaseIndex: """Index for efficient search and retrieval of code elements.""" - def __init__(self, index_directory: str = ".docstra/index"): - """Initialize the codebase index. - - Args: - index_directory: Directory to store the index - """ + def __init__( + self, + index_directory: str = ".docstra/index", + codebase_root: Optional[str] = None, + ): self.index_directory = index_directory + self.codebase_root = ( + str(Path(codebase_root).resolve()) if codebase_root else None + ) + self.manifest_path = Path(index_directory) / CORE_INDEX_FILENAME - # Ensure the directory exists os.makedirs(index_directory, exist_ok=True) - # Initialize index structures - self.symbol_index: Dict[str, List[Dict[str, Any]]] = defaultdict(list) - self.file_index: Dict[str, Dict[str, Any]] = {} - self.import_index: Dict[str, List[str]] = defaultdict(list) - self.function_index: Dict[str, List[Dict[str, Any]]] = defaultdict(list) - self.class_index: Dict[str, List[Dict[str, Any]]] = defaultdict(list) - - # Load existing indexes if they exist - self._load_indexes() - - def _load_indexes(self) -> None: - """Load existing indexes from disk.""" - symbol_index_path = Path(self.index_directory) / "symbol_index.json" - file_index_path = Path(self.index_directory) / "file_index.json" - import_index_path = Path(self.index_directory) / "import_index.json" - function_index_path = Path(self.index_directory) / "function_index.json" - class_index_path = Path(self.index_directory) / "class_index.json" - - if symbol_index_path.exists(): - with open(symbol_index_path, "r") as f: - self.symbol_index = defaultdict(list, json.load(f)) - - if file_index_path.exists(): - with open(file_index_path, "r") as f: - self.file_index = json.load(f) - - if import_index_path.exists(): - with open(import_index_path, "r") as f: - self.import_index = defaultdict(list, json.load(f)) - - if function_index_path.exists(): - with open(function_index_path, "r") as f: - self.function_index = defaultdict(list, json.load(f)) - - if class_index_path.exists(): - with open(class_index_path, "r") as f: - self.class_index = defaultdict(list, json.load(f)) - - def _save_indexes(self) -> None: - """Save indexes to disk.""" - symbol_index_path = Path(self.index_directory) / "symbol_index.json" - file_index_path = Path(self.index_directory) / "file_index.json" - import_index_path = Path(self.index_directory) / "import_index.json" - function_index_path = Path(self.index_directory) / "function_index.json" - class_index_path = Path(self.index_directory) / "class_index.json" - - with open(symbol_index_path, "w") as f: - json.dump(dict(self.symbol_index), f) - - with open(file_index_path, "w") as f: - json.dump(self.file_index, f) - - with open(import_index_path, "w") as f: - json.dump(dict(self.import_index), f) + self._manifest = CoreIndexManifest.empty() + self._files_by_id: Dict[str, IndexedFile] = {} + self._symbols_by_name: Dict[str, List[IndexedSymbol]] = defaultdict(list) + self._functions_by_name: Dict[str, List[IndexedSymbol]] = defaultdict(list) + self._classes_by_name: Dict[str, List[IndexedSymbol]] = defaultdict(list) + self._symbols_by_file: Dict[str, List[IndexedSymbol]] = defaultdict(list) + self._imports_by_source: Dict[str, List[ImportRecord]] = defaultdict(list) + self._imports_by_text: Dict[str, List[str]] = defaultdict(list) + self._dependencies_by_source: Dict[str, List[str]] = defaultdict(list) + self._dependents_by_target: Dict[str, List[str]] = defaultdict(list) + + self._load_manifest() + + @property + def manifest(self) -> CoreIndexManifest: + """Expose the loaded manifest.""" + return self._manifest + + @property + def has_manifest(self) -> bool: + """Return whether a persisted manifest is present.""" + return self.manifest_path.exists() + + @staticmethod + def legacy_artifacts_in(index_directory: str | Path) -> List[Path]: + """Return legacy sidecar index files that still exist in an index directory.""" + base = Path(index_directory) + return [ + base / name for name in LEGACY_INDEX_FILENAMES if (base / name).exists() + ] - with open(function_index_path, "w") as f: - json.dump(dict(self.function_index), f) + def _load_manifest(self) -> None: + """Load the persisted manifest or detect legacy artifacts.""" + if self.manifest_path.exists(): + self._manifest = CoreIndexManifest.model_validate_json( + self.manifest_path.read_text(encoding="utf-8") + ) + self._rebuild_lookups() + return + + legacy_paths = self.legacy_artifacts_in(self.index_directory) + if legacy_paths: + legacy_names = ", ".join(path.name for path in legacy_paths) + raise FileNotFoundError( + "Legacy Docstra index artifacts were found without a core index " + f"manifest ({legacy_names}). Rerun 'docstra ingest' to rebuild the " + "index in the new format." + ) - with open(class_index_path, "w") as f: - json.dump(dict(self.class_index), f) + self._manifest = CoreIndexManifest.empty() + self._rebuild_lookups() - def index_document(self, document: Document) -> None: - """Index a document. - - Args: - document: Document to index - """ - # Extract document path and normalize it - filepath = document.metadata.filepath - filepath = os.path.normpath(filepath) - - # Index file metadata - self.file_index[filepath] = { - "filepath": filepath, - "language": str(document.metadata.language), - "size_bytes": document.metadata.size_bytes, - "line_count": document.metadata.line_count, - "last_modified": document.metadata.last_modified, - "classes": document.metadata.classes, - "functions": document.metadata.functions, - "imports": document.metadata.imports, + def _rebuild_lookups(self) -> None: + """Rebuild in-memory lookup tables from the manifest.""" + self._files_by_id = { + indexed_file.id: indexed_file for indexed_file in self._manifest.files } - - # Index symbols - for symbol, lines in document.metadata.symbols.items(): - for line in lines: - self.symbol_index[symbol].append( - { - "filepath": filepath, - "line": line, - "language": str(document.metadata.language), - } - ) - - # Index imports - for import_stmt in document.metadata.imports: - self.import_index[import_stmt].append(filepath) - - # Index functions - for function_name in document.metadata.functions: - self.function_index[function_name].append( - { - "filepath": filepath, - "language": str(document.metadata.language), - } + self._symbols_by_name = defaultdict(list) + self._functions_by_name = defaultdict(list) + self._classes_by_name = defaultdict(list) + self._symbols_by_file = defaultdict(list) + self._imports_by_source = defaultdict(list) + self._imports_by_text = defaultdict(list) + self._dependencies_by_source = defaultdict(list) + self._dependents_by_target = defaultdict(list) + + for symbol in self._manifest.symbols: + self._symbols_by_name[symbol.name].append(symbol) + self._symbols_by_file[symbol.file_id].append(symbol) + if symbol.kind == "function": + self._functions_by_name[symbol.name].append(symbol) + elif symbol.kind == "class": + self._classes_by_name[symbol.name].append(symbol) + + for import_record in self._manifest.imports: + self._imports_by_source[import_record.source_file_id].append(import_record) + self._imports_by_text[import_record.raw_text].append( + import_record.source_file_id ) - # Index classes - for class_name in document.metadata.classes: - self.class_index[class_name].append( - { - "filepath": filepath, - "language": str(document.metadata.language), - } - ) + for edge in self._manifest.edges: + if edge.edge_type != "imports": + continue + self._dependencies_by_source[edge.source_id].append(edge.target_id) + self._dependents_by_target[edge.target_id].append(edge.source_id) + + def replace_manifest( + self, manifest: CoreIndexManifest, *, codebase_root: Optional[str] = None + ) -> None: + """Replace the in-memory manifest and rebuild lookup tables.""" + self._manifest = manifest + if codebase_root is not None: + self.codebase_root = str(Path(codebase_root).resolve()) + self._rebuild_lookups() + + def save(self) -> None: + """Persist the current manifest.""" + self.manifest_path.write_text( + self._manifest.model_dump_json(indent=2), encoding="utf-8" + ) + + def normalize_file_id(self, filepath: str) -> str: + """Normalize a path or id to the canonical file id shape.""" + return normalize_file_id(filepath, self.codebase_root) + + def resolve_file_path(self, filepath: str) -> Optional[Path]: + """Resolve a canonical file id to an absolute path when possible.""" + normalized = self.normalize_file_id(filepath) + return resolve_file_path(normalized, self.codebase_root) + + def iter_files(self) -> List[IndexedFile]: + """Return all indexed files.""" + return list(self._manifest.files) + + def iter_file_ids(self) -> List[str]: + """Return all indexed file ids.""" + return [indexed_file.id for indexed_file in self._manifest.files] - # Save the updated indexes - self._save_indexes() + def index_document(self, document: Document) -> None: + """Merge a single indexed document into the persisted manifest.""" + self.upsert_documents([document]) def index_documents(self, documents: List[Document]) -> None: - """Index multiple documents. + """Index multiple documents into a canonical manifest.""" + if documents and self.codebase_root is None: + absolute_paths = [ + str(Path(document.metadata.filepath).resolve()) + for document in documents + if Path(document.metadata.filepath).is_absolute() + ] + if absolute_paths: + self.codebase_root = os.path.commonpath(absolute_paths) + + manifest = CoreIndexBuilder.from_documents( + documents, + codebase_root=self.codebase_root or Path.cwd(), + embedding_backend=self._manifest.embedding_backend, + embedding_model=self._manifest.embedding_model, + source_kinds=self._manifest.source_kinds, + ) + self.replace_manifest(manifest) + self.save() + + def upsert_documents(self, documents: List[Document]) -> None: + """Merge one or more indexed documents into the existing manifest.""" + if not documents: + return + + if self.codebase_root is None: + absolute_paths = [ + str(Path(document.metadata.filepath).resolve()) + for document in documents + if Path(document.metadata.filepath).is_absolute() + ] + if absolute_paths: + self.codebase_root = os.path.commonpath(absolute_paths) + + updated_manifest = CoreIndexBuilder.from_documents( + documents, + codebase_root=self.codebase_root or Path.cwd(), + embedding_backend=self._manifest.embedding_backend, + embedding_model=self._manifest.embedding_model, + source_kinds=self._manifest.source_kinds, + known_files=self._manifest.files, + ) + merged_manifest = self._merge_manifest(updated_manifest) + self.replace_manifest(merged_manifest) + self.save() + + def _merge_manifest(self, updated_manifest: CoreIndexManifest) -> CoreIndexManifest: + """Replace manifest records for indexed files while preserving other files.""" + updated_file_ids = {indexed_file.id for indexed_file in updated_manifest.files} + if not updated_file_ids: + return self._manifest + + return CoreIndexManifest( + schema_version=updated_manifest.schema_version, + created_at=updated_manifest.created_at, + embedding_backend=updated_manifest.embedding_backend, + embedding_model=updated_manifest.embedding_model, + source_kinds=updated_manifest.source_kinds, + files=self._merge_records( + self._manifest.files, + updated_manifest.files, + lambda item: item.id in updated_file_ids, + ), + chunks=self._merge_records( + self._manifest.chunks, + updated_manifest.chunks, + lambda item: item.file_id in updated_file_ids, + ), + symbols=self._merge_records( + self._manifest.symbols, + updated_manifest.symbols, + lambda item: item.file_id in updated_file_ids, + ), + occurrences=self._merge_records( + self._manifest.occurrences, + updated_manifest.occurrences, + lambda item: item.file_id in updated_file_ids, + ), + imports=self._merge_records( + self._manifest.imports, + updated_manifest.imports, + lambda item: item.source_file_id in updated_file_ids, + ), + edges=self._merge_records( + self._manifest.edges, + updated_manifest.edges, + lambda item: item.source_id in updated_file_ids, + ), + embeddings=self._merge_records( + self._manifest.embeddings, + updated_manifest.embeddings, + lambda item: self._embedding_targets_file(item, updated_file_ids), + ), + docs=self._merge_records( + self._manifest.docs, + updated_manifest.docs, + lambda item: bool(updated_file_ids.intersection(item.source_file_ids)), + ), + ) + + @staticmethod + def _merge_records( + existing_records: List[RecordT], + updated_records: List[RecordT], + should_replace: Callable[[RecordT], bool], + ) -> List[RecordT]: + return [ + *[item for item in existing_records if not should_replace(item)], + *updated_records, + ] - Args: - documents: Documents to index - """ - for document in documents: - self.index_document(document) + @staticmethod + def _embedding_targets_file( + embedding: EmbeddingRef, updated_file_ids: set[str] + ) -> bool: + if embedding.target_id in updated_file_ids: + return True + for file_id in updated_file_ids: + if embedding.target_id.startswith(f"{file_id}#"): + return True + if embedding.target_id.startswith(f"{file_id}::"): + return True + return False def search_symbol(self, symbol: str) -> List[Dict[str, Any]]: - """Search for a symbol in the codebase. - - Args: - symbol: Symbol to search for - - Returns: - List of locations where the symbol is defined - """ - return self.symbol_index.get(symbol, []) + """Search for symbol definitions in the codebase.""" + return [ + self._symbol_location_payload(item) + for item in self._symbols_by_name.get(symbol, []) + ] def search_function(self, function_name: str) -> List[Dict[str, Any]]: - """Search for a function in the codebase. - - Args: - function_name: Function name to search for - - Returns: - List of locations where the function is defined - """ - return self.function_index.get(function_name, []) + """Search for function definitions in the codebase.""" + return [ + self._symbol_location_payload(item) + for item in self._functions_by_name.get(function_name, []) + ] def search_class(self, class_name: str) -> List[Dict[str, Any]]: - """Search for a class in the codebase. - - Args: - class_name: Class name to search for + """Search for class definitions in the codebase.""" + return [ + self._symbol_location_payload(item) + for item in self._classes_by_name.get(class_name, []) + ] - Returns: - List of locations where the class is defined - """ - return self.class_index.get(class_name, []) + def _symbol_location_payload(self, symbol: IndexedSymbol) -> Dict[str, Any]: + return { + "filepath": symbol.file_id, + "line": symbol.line, + "language": symbol.language, + "kind": symbol.kind, + "symbol_id": symbol.id, + } def get_files_by_language(self, language: Union[DocumentType, str]) -> List[str]: - """Get all files of a specific language. - - Args: - language: Language to filter by - - Returns: - List of file paths - """ + """Get all indexed files for a language.""" language_str = str(language) return [ - filepath - for filepath, metadata in self.file_index.items() - if metadata["language"] == language_str + indexed_file.id + for indexed_file in self._manifest.files + if indexed_file.language == language_str ] def get_file_metadata(self, filepath: str) -> Optional[Dict[str, Any]]: - """Get metadata for a specific file. - - Args: - filepath: Path to the file - - Returns: - File metadata if found, None otherwise - """ - filepath = os.path.normpath(filepath) - return self.file_index.get(filepath) + """Get derived metadata for an indexed file.""" + file_id = self.normalize_file_id(filepath) + indexed_file = self._files_by_id.get(file_id) + if indexed_file is None: + return None + + symbols = self._symbols_by_file.get(file_id, []) + classes = [symbol.name for symbol in symbols if symbol.kind == "class"] + functions = [symbol.name for symbol in symbols if symbol.kind == "function"] + imports = list( + dict.fromkeys( + record.raw_text for record in self._imports_by_source.get(file_id, []) + ) + ) + dependencies = self.get_file_dependencies(file_id) + dependents = self.get_dependents(file_id) + + return { + "filepath": file_id, + "language": indexed_file.language, + "size_bytes": indexed_file.size_bytes, + "line_count": indexed_file.line_count, + "last_modified": indexed_file.last_modified, + "classes": classes, + "functions": functions, + "imports": imports, + "module_docstring": indexed_file.module_docstring, + "dependencies": dependencies, + "dependents": dependents, + "complexity": len(dependencies) + len(symbols), + "complexity_metrics": {}, + "code_quality": {}, + "documentation_coverage": None, + "test_coverage": None, + "category": None, + "contributors": [], + "tags": [], + } def search_files_by_import(self, import_stmt: str) -> List[str]: - """Find files that use a specific import. + """Find files that contain a matching import statement.""" + if import_stmt in self._imports_by_text: + return list(dict.fromkeys(self._imports_by_text[import_stmt])) - Args: - import_stmt: Import statement to search for - - Returns: - List of file paths that use the import - """ - # Try exact match first - if import_stmt in self.import_index: - return self.import_index[import_stmt] - - # Try partial matching if exact match not found - results = [] - for idx, files in self.import_index.items(): - if import_stmt in idx: - results.extend(files) - - return list(set(results)) # Remove duplicates + results: List[str] = [] + for raw_text, file_ids in self._imports_by_text.items(): + if import_stmt in raw_text: + results.extend(file_ids) + return list(dict.fromkeys(results)) def full_text_search(self, query: str) -> List[Dict[str, Any]]: - """Perform a simple full-text search across the codebase. - - This is a basic implementation. For more sophisticated full-text search, - a dedicated search engine like Elasticsearch would be better. - - Args: - query: Text to search for - - Returns: - List of matches with file and context information - """ + """Perform a simple full-text search across indexed files.""" results = [] - - for filepath, metadata in self.file_index.items(): + for file_id, metadata in ( + (indexed_file.id, self.get_file_metadata(indexed_file.id)) + for indexed_file in self._manifest.files + ): + if metadata is None: + continue + absolute_path = self.resolve_file_path(file_id) + if absolute_path is None: + continue try: - # Read the file content - with open(filepath, "r", encoding="utf-8", errors="ignore") as f: - content = f.read() - - # Check if query exists in content - if query.lower() in content.lower(): - # Find line numbers with matches - lines = content.splitlines() - matches = [] - - for i, line in enumerate(lines): - if query.lower() in line.lower(): - matches.append( - { - "line_number": i + 1, - "line_content": line.strip(), - } - ) - - if matches: - results.append( - { - "filepath": filepath, - "language": metadata["language"], - "matches": matches, - } - ) + content = absolute_path.read_text(encoding="utf-8", errors="ignore") except Exception: - # Skip files that can't be read continue - return results + if query.lower() not in content.lower(): + continue - def get_related_files(self, filepath: str) -> List[str]: - """Find files that are related to a given file. + matches = [] + for line_number, line in enumerate(content.splitlines(), start=1): + if query.lower() in line.lower(): + matches.append( + {"line_number": line_number, "line_content": line.strip()} + ) - Files are considered related if they: - - Share imports - - Import each other - - Define or use the same symbols + if matches: + results.append( + { + "filepath": file_id, + "language": metadata["language"], + "matches": matches, + } + ) + return results - Args: - filepath: Path to the file + def get_file_dependencies(self, filepath: str) -> List[str]: + """Return resolved file dependencies for a file.""" + file_id = self.normalize_file_id(filepath) + return list(dict.fromkeys(self._dependencies_by_source.get(file_id, []))) - Returns: - List of related file paths - """ - filepath = os.path.normpath(filepath) - related_files = set() + def get_dependents(self, filepath: str) -> List[str]: + """Return files that depend on a given file.""" + file_id = self.normalize_file_id(filepath) + return list(dict.fromkeys(self._dependents_by_target.get(file_id, []))) - # Get file metadata - metadata = self.get_file_metadata(filepath) - if not metadata: + def get_related_files(self, filepath: str) -> List[str]: + """Find files related to a given file.""" + file_id = self.normalize_file_id(filepath) + metadata = self.get_file_metadata(file_id) + if metadata is None: return [] - # Find files with shared imports + related_files: set[str] = set() + related_files.update(self.get_file_dependencies(file_id)) + related_files.update(self.get_dependents(file_id)) + for import_stmt in metadata["imports"]: related_files.update(self.search_files_by_import(import_stmt)) - # Find files defining symbols used in this file - for symbol in self.symbol_index: - # Check if this symbol is used in our file - for location in self.symbol_index[symbol]: - if location["filepath"] == filepath: - # Find other files defining this symbol - for other_location in self.symbol_index[symbol]: - if other_location["filepath"] != filepath: - related_files.add(other_location["filepath"]) - - # Find files using functions defined in this file - for function_name in metadata["functions"]: - for location in self.function_index.get(function_name, []): - if location["filepath"] != filepath: - related_files.add(location["filepath"]) - - # Find files using classes defined in this file - for class_name in metadata["classes"]: - for location in self.class_index.get(class_name, []): - if location["filepath"] != filepath: - related_files.add(location["filepath"]) - - # Remove the original file from the results - if filepath in related_files: - related_files.remove(filepath) - - return list(related_files) + for symbol in self._symbols_by_file.get(file_id, []): + for match in self._symbols_by_name.get(symbol.name, []): + if match.file_id != file_id: + related_files.add(match.file_id) + + related_files.discard(file_id) + return sorted(related_files) def clear(self) -> None: - """Clear all indexes.""" - self.symbol_index.clear() - self.file_index.clear() - self.import_index.clear() - self.function_index.clear() - self.class_index.clear() - self._save_indexes() + """Clear the persisted manifest and in-memory lookups.""" + self._manifest = CoreIndexManifest.empty( + embedding_backend=self._manifest.embedding_backend, + embedding_model=self._manifest.embedding_model, + source_kinds=self._manifest.source_kinds, + ) + self._rebuild_lookups() + self.save() class CodebaseIndexer: @@ -358,14 +473,14 @@ def __init__( self, index_directory: str = ".docstra/index", exclude_patterns: Optional[List[str]] = None, + codebase_root: Optional[str] = None, + embedding_backend: str = "chroma", + embedding_model: str = "", + source_kinds: Optional[Iterable[str]] = None, ): - """Initialize the codebase indexer. - - Args: - index_directory: Directory to store the index - exclude_patterns: Patterns to exclude from indexing - """ - self.index = CodebaseIndex(index_directory=index_directory) + self.index = CodebaseIndex( + index_directory=index_directory, codebase_root=codebase_root + ) self.exclude_patterns = exclude_patterns or [ ".git", "__pycache__", @@ -385,49 +500,55 @@ def __init__( "build", "dist", ] + self.embedding_backend = embedding_backend + self.embedding_model = embedding_model + self.source_kinds = list(source_kinds or ["tree-sitter"]) def should_exclude(self, path: str) -> bool: - """Check if a path should be excluded from indexing. - - Args: - path: Path to check - - Returns: - True if the path should be excluded, False otherwise - """ + """Check if a path should be excluded from indexing.""" path_norm = os.path.normpath(path) - + path_parts = set(Path(path_norm).parts) + basename = os.path.basename(path_norm) for pattern in self.exclude_patterns: - if pattern in path_norm: + if pattern in path_parts or basename == pattern: return True - return False def index_document(self, document: Document) -> None: - """Index a document. - - Args: - document: Document to index - """ - if not self.should_exclude(document.metadata.filepath): - self.index.index_document(document) + """Merge a single document into the manifest.""" + filtered_documents = [ + document + for document in [document] + if not self.should_exclude(document.metadata.filepath) + ] + self.index.upsert_documents(filtered_documents) def index_documents(self, documents: List[Document]) -> None: - """Index multiple documents. - - Args: - documents: Documents to index - """ + """Index multiple documents into the manifest.""" filtered_documents = [ - doc for doc in documents if not self.should_exclude(doc.metadata.filepath) + document + for document in documents + if not self.should_exclude(document.metadata.filepath) ] - - self.index.index_documents(filtered_documents) + if filtered_documents and self.index.codebase_root is None: + absolute_paths = [ + str(Path(document.metadata.filepath).resolve()) + for document in filtered_documents + if Path(document.metadata.filepath).is_absolute() + ] + if absolute_paths: + self.index.codebase_root = os.path.commonpath(absolute_paths) + + manifest = CoreIndexBuilder.from_documents( + filtered_documents, + codebase_root=self.index.codebase_root or Path.cwd(), + embedding_backend=self.embedding_backend, + embedding_model=self.embedding_model, + source_kinds=self.source_kinds, + ) + self.index.replace_manifest(manifest) + self.index.save() def get_index(self) -> CodebaseIndex: - """Get the underlying codebase index. - - Returns: - The codebase index - """ + """Get the underlying codebase index.""" return self.index diff --git a/docstra/core/indexing/model.py b/docstra/core/indexing/model.py new file mode 100644 index 0000000..43788a7 --- /dev/null +++ b/docstra/core/indexing/model.py @@ -0,0 +1,477 @@ +""" +Typed core index models and builders. +""" + +from __future__ import annotations + +from collections import defaultdict +from datetime import datetime, timezone +import posixpath +from pathlib import Path, PurePosixPath +import re +from typing import Dict, Iterable, List, Literal, Optional, cast + +from pydantic import BaseModel, Field + +from docstra.core.document_processing.document import Document + +CORE_INDEX_FILENAME = "core_index.json" +CORE_INDEX_SCHEMA_VERSION = 1 + + +def normalize_file_id(path: str | Path, codebase_root: str | Path | None = None) -> str: + """Normalize a source path to a repo-relative POSIX file id when possible.""" + path_str = str(path) + candidate = Path(path_str).expanduser() + + if codebase_root is not None: + root = Path(codebase_root).expanduser().resolve() + try: + if candidate.is_absolute(): + relative = candidate.resolve().relative_to(root) + else: + relative = PurePosixPath(path_str) + normalized = PurePosixPath(str(relative)).as_posix() + return _strip_relative_prefix(posixpath.normpath(normalized)) + except ValueError: + pass + + if candidate.is_absolute(): + return PurePosixPath(candidate.as_posix()).as_posix() + + normalized = PurePosixPath(path_str).as_posix() + return _strip_relative_prefix(posixpath.normpath(normalized)) + + +def _strip_relative_prefix(path: str) -> str: + """Drop a leading ./ while preserving ../ segments.""" + if path == ".": + return "" + if path.startswith("./"): + return path[2:] + return path + + +def resolve_file_path( + file_id: str, codebase_root: str | Path | None = None +) -> Optional[Path]: + """Resolve a file id to an absolute path when a codebase root is available.""" + candidate = Path(file_id).expanduser() + if candidate.is_absolute(): + return candidate.resolve() + if codebase_root is None: + return None + return (Path(codebase_root).expanduser().resolve() / file_id).resolve() + + +def make_chunk_id(file_id: str, start_line: int, end_line: int) -> str: + """Build a stable chunk id from a file id and line span.""" + return f"{file_id}#L{start_line}-L{end_line}" + + +def make_symbol_id(file_id: str, kind: str, name: str, line: int) -> str: + """Build a stable symbol id from a file id and symbol definition.""" + return f"{file_id}::{kind}::{name}::L{line}" + + +class IndexedFile(BaseModel): + """Canonical file record.""" + + id: str + language: str + size_bytes: int + last_modified: float + line_count: int + module_docstring: Optional[str] = None + + +class IndexedChunk(BaseModel): + """Canonical chunk record.""" + + id: str + file_id: str + language: str + start_line: int + end_line: int + chunk_type: str + symbols: List[str] = Field(default_factory=list) + parent_symbols: List[str] = Field(default_factory=list) + + +class IndexedSymbol(BaseModel): + """Canonical symbol definition record.""" + + id: str + file_id: str + name: str + kind: Literal["class", "function", "symbol"] + language: str + line: int + parent_symbols: List[str] = Field(default_factory=list) + + +class SymbolOccurrence(BaseModel): + """Observed symbol location.""" + + id: str + symbol_id: str + file_id: str + start_line: int + end_line: int + occurrence_type: Literal["definition"] = "definition" + + +class ImportRecord(BaseModel): + """Raw import statement with optional resolution.""" + + id: str + source_file_id: str + raw_text: str + target_file_id: Optional[str] = None + + +class CodeEdge(BaseModel): + """Relationship between indexed entities.""" + + id: str + source_id: str + target_id: str + edge_type: Literal["imports"] = "imports" + + +class EmbeddingRef(BaseModel): + """Reference to a stored vector in the embedding backend.""" + + target_id: str + target_kind: Literal["file", "chunk", "symbol"] + backend: str + collection_name: str + vector_id: str + + +class GeneratedDoc(BaseModel): + """Generated documentation artifact metadata.""" + + id: str + source_file_ids: List[str] = Field(default_factory=list) + output_path: Optional[str] = None + generated_at: Optional[datetime] = None + + +class CoreIndexManifest(BaseModel): + """Canonical persisted code index manifest.""" + + schema_version: int = CORE_INDEX_SCHEMA_VERSION + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + embedding_backend: str = "chroma" + embedding_model: str = "" + source_kinds: List[str] = Field(default_factory=lambda: ["tree-sitter"]) + files: List[IndexedFile] = Field(default_factory=list) + chunks: List[IndexedChunk] = Field(default_factory=list) + symbols: List[IndexedSymbol] = Field(default_factory=list) + occurrences: List[SymbolOccurrence] = Field(default_factory=list) + imports: List[ImportRecord] = Field(default_factory=list) + edges: List[CodeEdge] = Field(default_factory=list) + embeddings: List[EmbeddingRef] = Field(default_factory=list) + docs: List[GeneratedDoc] = Field(default_factory=list) + + @classmethod + def empty( + cls, + *, + embedding_backend: str = "chroma", + embedding_model: str = "", + source_kinds: Optional[Iterable[str]] = None, + ) -> CoreIndexManifest: + """Create an empty manifest.""" + return cls( + embedding_backend=embedding_backend, + embedding_model=embedding_model, + source_kinds=list(source_kinds or ["tree-sitter"]), + ) + + +class CoreIndexBuilder: + """Build a core manifest from processed documents.""" + + @classmethod + def from_documents( + cls, + documents: List[Document], + codebase_root: str | Path, + *, + embedding_backend: str = "chroma", + embedding_model: str = "", + source_kinds: Optional[Iterable[str]] = None, + known_files: Optional[Iterable[IndexedFile]] = None, + ) -> CoreIndexManifest: + """Build a canonical manifest from processed documents.""" + root = Path(codebase_root).expanduser().resolve() + manifest = CoreIndexManifest.empty( + embedding_backend=embedding_backend, + embedding_model=embedding_model, + source_kinds=source_kinds, + ) + + file_symbol_parents: Dict[str, Dict[str, List[str]]] = defaultdict(dict) + + for document in documents: + file_id = normalize_file_id(document.metadata.filepath, root) + manifest.files.append( + IndexedFile( + id=file_id, + language=str(document.metadata.language), + size_bytes=document.metadata.size_bytes, + last_modified=document.metadata.last_modified, + line_count=document.metadata.line_count, + module_docstring=document.metadata.module_docstring, + ) + ) + + manifest.embeddings.append( + EmbeddingRef( + target_id=file_id, + target_kind="file", + backend=embedding_backend, + collection_name="documents", + vector_id=file_id, + ) + ) + + for chunk in document.chunks: + chunk_id = make_chunk_id(file_id, chunk.start_line, chunk.end_line) + manifest.chunks.append( + IndexedChunk( + id=chunk_id, + file_id=file_id, + language=str(document.metadata.language), + start_line=chunk.start_line, + end_line=chunk.end_line, + chunk_type=chunk.chunk_type, + symbols=list(chunk.symbols), + parent_symbols=list(chunk.parent_symbols), + ) + ) + manifest.embeddings.append( + EmbeddingRef( + target_id=chunk_id, + target_kind="chunk", + backend=embedding_backend, + collection_name="chunks", + vector_id=chunk_id, + ) + ) + for symbol_name in chunk.symbols: + if symbol_name: + file_symbol_parents[file_id][symbol_name] = list( + chunk.parent_symbols + ) + + available_files = cls._merge_available_files( + known_files or [], + manifest.files, + ) + module_map = cls._build_python_module_map(available_files) + file_id_set = {file.id for file in available_files} + + for document in documents: + file_id = normalize_file_id(document.metadata.filepath, root) + language = str(document.metadata.language) + symbol_kind_map = cls._build_symbol_kind_map(document) + + for symbol_name, lines in document.metadata.symbols.items(): + for line in lines: + kind = symbol_kind_map.get(symbol_name, "symbol") + symbol_id = make_symbol_id(file_id, kind, symbol_name, line) + manifest.symbols.append( + IndexedSymbol( + id=symbol_id, + file_id=file_id, + name=symbol_name, + kind=cast(Literal["class", "function", "symbol"], kind), + language=language, + line=line, + parent_symbols=file_symbol_parents[file_id].get( + symbol_name, [] + ), + ) + ) + manifest.occurrences.append( + SymbolOccurrence( + id=f"{symbol_id}::definition", + symbol_id=symbol_id, + file_id=file_id, + start_line=line, + end_line=line, + ) + ) + + for index, raw_import in enumerate(document.metadata.imports): + target_file_ids = cls._resolve_import_targets( + source_file_id=file_id, + raw_import=raw_import, + language=language, + module_map=module_map, + file_id_set=file_id_set, + ) + if not target_file_ids: + manifest.imports.append( + ImportRecord( + id=f"{file_id}::import::{index}", + source_file_id=file_id, + raw_text=raw_import, + target_file_id=None, + ) + ) + continue + + multiple_targets = len(target_file_ids) > 1 + for target_index, target_file_id in enumerate(target_file_ids): + import_record_id = f"{file_id}::import::{index}" + if multiple_targets: + import_record_id = f"{import_record_id}::{target_index}" + import_record = ImportRecord( + id=import_record_id, + source_file_id=file_id, + raw_text=raw_import, + target_file_id=target_file_id, + ) + manifest.imports.append(import_record) + manifest.edges.append( + CodeEdge( + id=f"{import_record.id}::imports::{target_file_id}", + source_id=file_id, + target_id=target_file_id, + ) + ) + + return manifest + + @staticmethod + def _build_symbol_kind_map(document: Document) -> Dict[str, str]: + symbol_kind_map: Dict[str, str] = {} + for class_name in document.metadata.classes: + symbol_kind_map[class_name] = "class" + for function_name in document.metadata.functions: + symbol_kind_map[function_name] = "function" + return symbol_kind_map + + @staticmethod + def _build_python_module_map(files: List[IndexedFile]) -> Dict[str, str]: + module_map: Dict[str, str] = {} + for indexed_file in files: + pure_path = PurePosixPath(indexed_file.id) + if pure_path.suffix != ".py": + continue + stem_parts = list(pure_path.with_suffix("").parts) + if stem_parts and stem_parts[-1] == "__init__": + stem_parts = stem_parts[:-1] + if not stem_parts: + continue + module_map[".".join(stem_parts)] = indexed_file.id + return module_map + + @staticmethod + def _merge_available_files( + known_files: Iterable[IndexedFile], + current_files: Iterable[IndexedFile], + ) -> List[IndexedFile]: + merged_files: Dict[str, IndexedFile] = { + indexed_file.id: indexed_file for indexed_file in known_files + } + for indexed_file in current_files: + merged_files[indexed_file.id] = indexed_file + return list(merged_files.values()) + + @classmethod + def _resolve_import_targets( + cls, + *, + source_file_id: str, + raw_import: str, + language: str, + module_map: Dict[str, str], + file_id_set: set[str], + ) -> List[str]: + if language == "python": + return cls._resolve_python_import(raw_import, module_map) + if language in {"javascript", "typescript"}: + target_file_id = cls._resolve_js_import( + source_file_id, raw_import, file_id_set + ) + return [target_file_id] if target_file_id is not None else [] + return [] + + @staticmethod + def _resolve_python_import( + raw_import: str, module_map: Dict[str, str] + ) -> List[str]: + import_match = re.match(r"^import\s+(.+)$", raw_import.strip()) + if import_match: + resolved_targets: List[str] = [] + for module_spec in import_match.group(1).split(","): + module_name = module_spec.strip().split(" as ")[0].strip() + target = module_map.get(module_name) + if target: + resolved_targets.append(target) + return _unique_preserving_order(resolved_targets) + + from_match = re.match( + r"^from\s+([A-Za-z0-9_\.]+)\s+import\s+(.+)$", raw_import.strip() + ) + if not from_match: + return [] + + module_name = from_match.group(1).strip() + imported_names = [ + part.strip().split(" as ")[0].strip() + for part in from_match.group(2).split(",") + ] + + resolved_targets: List[str] = [] + for candidate in [f"{module_name}.{name}" for name in imported_names]: + target = module_map.get(candidate) + if target: + resolved_targets.append(target) + + if resolved_targets: + return _unique_preserving_order(resolved_targets) + + target = module_map.get(module_name) + if target: + return [target] + return [] + + @staticmethod + def _resolve_js_import( + source_file_id: str, raw_import: str, file_id_set: set[str] + ) -> Optional[str]: + match = re.search(r"""(?:from|require\()\s*['"]([^'"]+)['"]""", raw_import) + if not match: + return None + + specifier = match.group(1) + if not specifier.startswith("."): + return None + + source_dir = PurePosixPath(source_file_id).parent + base_candidate = posixpath.normpath(str(source_dir / specifier)) + candidates = [ + base_candidate, + f"{base_candidate}.js", + f"{base_candidate}.jsx", + f"{base_candidate}.ts", + f"{base_candidate}.tsx", + f"{base_candidate}/index.js", + f"{base_candidate}/index.ts", + ] + for candidate in candidates: + normalized = _strip_relative_prefix(candidate) + if normalized in file_id_set: + return normalized + return None + + +def _unique_preserving_order(values: Iterable[str]) -> List[str]: + """Return unique strings while preserving input order.""" + return list(dict.fromkeys(values)) diff --git a/docstra/core/indexing/repo_map.py b/docstra/core/indexing/repo_map.py index 50d9b56..1786ab9 100644 --- a/docstra/core/indexing/repo_map.py +++ b/docstra/core/indexing/repo_map.py @@ -1,4 +1,3 @@ -# File: ./docstra/core/indexing/repo_map.py """ Repository mapping for understanding codebase structure. """ @@ -6,31 +5,24 @@ from __future__ import annotations import os +from pathlib import Path, PurePosixPath from typing import Any, Dict, List, Optional, Union, cast from docstra.core.document_processing.document import Document from docstra.core.indexing.code_index import CodebaseIndex +from docstra.core.indexing.model import normalize_file_id class FileNode: """Node representing a file in the repository structure.""" def __init__(self, name: str, path: str, language: Optional[str] = None): - """Initialize a file node. - - Args: - name: File name - path: Full path to the file - language: Programming language of the file - """ self.name = name self.path = path self.language = language self.size: Optional[int] = None self.symbols: List[str] = [] self.imports: List[str] = [] - - # Enhanced metadata self.line_count: Optional[int] = None self.complexity: Optional[int] = None self.dependencies: List[str] = [] @@ -39,8 +31,6 @@ def __init__(self, name: str, path: str, language: Optional[str] = None): self.last_modified: Optional[float] = None self.contributors: List[str] = [] self.tags: List[str] = [] - - # Analysis results with explicit types self.analysis: Dict[str, Any] = { "complexity_metrics": {}, "code_quality": {}, @@ -49,43 +39,33 @@ def __init__(self, name: str, path: str, language: Optional[str] = None): } def analyze(self, index: Optional[CodebaseIndex] = None) -> None: - """Analyze the file for additional metadata. - - Args: - index: Optional codebase index for enhanced analysis - """ + """Analyze the file for additional metadata.""" if not index: return - # Get enhanced metadata from index metadata = index.get_file_metadata(self.path) - if metadata: - # Update basic metadata - self.line_count = metadata.get("line_count") - self.complexity = metadata.get("complexity") - self.dependencies = metadata.get("dependencies", []) - self.dependents = metadata.get("dependents", []) - self.category = metadata.get("category") - self.last_modified = metadata.get("last_modified") - self.contributors = metadata.get("contributors", []) - self.tags = metadata.get("tags", []) - - # Update analysis results - self.analysis.update( - { - "complexity_metrics": metadata.get("complexity_metrics", {}), - "code_quality": metadata.get("code_quality", {}), - "documentation_coverage": metadata.get("documentation_coverage"), - "test_coverage": metadata.get("test_coverage"), - } - ) + if metadata is None: + return - def to_dict(self) -> Dict[str, Any]: - """Convert to dictionary representation. + self.line_count = metadata.get("line_count") + self.complexity = metadata.get("complexity") + self.dependencies = metadata.get("dependencies", []) + self.dependents = metadata.get("dependents", []) + self.category = metadata.get("category") + self.last_modified = metadata.get("last_modified") + self.contributors = metadata.get("contributors", []) + self.tags = metadata.get("tags", []) + self.analysis.update( + { + "complexity_metrics": metadata.get("complexity_metrics", {}), + "code_quality": metadata.get("code_quality", {}), + "documentation_coverage": metadata.get("documentation_coverage"), + "test_coverage": metadata.get("test_coverage"), + } + ) - Returns: - Dictionary representation of the node - """ + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary representation.""" return { "type": "file", "name": self.name, @@ -110,70 +90,31 @@ class DirectoryNode: """Node representing a directory in the repository structure.""" def __init__(self, name: str, path: str): - """Initialize a directory node. - - Args: - name: Directory name - path: Full path to the directory - """ self.name = name self.path = path self.children: Dict[str, Union[FileNode, DirectoryNode]] = {} def add_file(self, file_path: str, language: Optional[str] = None) -> FileNode: - """Add a file to this directory. - - Args: - file_path: Path to the file - language: Programming language of the file - - Returns: - The created file node - """ file_name = os.path.basename(file_path) node = FileNode(file_name, file_path, language) self.children[file_name] = node return node def add_directory(self, dir_path: str) -> DirectoryNode: - """Add a subdirectory to this directory. - - Args: - dir_path: Path to the directory - - Returns: - The created directory node - """ dir_name = os.path.basename(dir_path) node = DirectoryNode(dir_name, dir_path) self.children[dir_name] = node return node def get_or_create_directory(self, dir_path: str) -> DirectoryNode: - """Get a directory node, creating it if it doesn't exist. - - Args: - dir_path: Path to the directory - - Returns: - The directory node - """ dir_name = os.path.basename(dir_path) - if dir_name in self.children and isinstance( self.children[dir_name], DirectoryNode ): - # Type is guaranteed by isinstance check - cast to ensure type checker knows return cast(DirectoryNode, self.children[dir_name]) - return self.add_directory(dir_path) def to_dict(self) -> Dict[str, Any]: - """Convert to dictionary representation. - - Returns: - Dictionary representation of the node - """ return { "type": "directory", "name": self.name, @@ -188,14 +129,8 @@ class RepositoryMap: """Map representing the structure of a code repository.""" def __init__(self, root_path: str, index: Optional[CodebaseIndex] = None): - """Initialize the repository map. - - Args: - root_path: Root path of the repository - index: Optional codebase index for enhanced metadata - """ self.root_path = os.path.normpath(root_path) - self.root = DirectoryNode(os.path.basename(root_path), self.root_path) + self.root = DirectoryNode(os.path.basename(self.root_path), self.root_path) self.index = index self.exclude_patterns: List[str] = [ ".git", @@ -216,8 +151,6 @@ def __init__(self, root_path: str, index: Optional[CodebaseIndex] = None): "build", "dist", ] - - # Enhanced metadata self.module_categories: Dict[str, List[str]] = { "core": ["core", "src", "lib", "main"], "api": ["api", "rest", "graphql", "endpoints"], @@ -228,8 +161,6 @@ def __init__(self, root_path: str, index: Optional[CodebaseIndex] = None): "config": ["config", "settings", "conf"], "docs": ["docs", "documentation"], } - - # Codebase statistics with explicit types self.stats: Dict[str, Any] = { "total_files": 0, "total_lines": 0, @@ -240,176 +171,106 @@ def __init__(self, root_path: str, index: Optional[CodebaseIndex] = None): } def should_exclude(self, path: str) -> bool: - """Check if a path should be excluded based on exclude patterns. - - Args: - path: Path to check - - Returns: - True if the path should be excluded, False otherwise - """ + """Check if a path should be excluded based on exclude patterns.""" + path_norm = os.path.normpath(path) + path_parts = set(Path(path_norm).parts) + basename = os.path.basename(path_norm) for pattern in self.exclude_patterns: - if pattern in path: + if pattern in path_parts or basename == pattern: return True return False def _categorize_module(self, path: str) -> str: - """Categorize a module based on its path and contents. - - Args: - path: Path to the module - - Returns: - Category name - """ + """Categorize a module based on its path and contents.""" path_lower = path.lower() - - # Check path against known categories for category, patterns in self.module_categories.items(): if any(pattern in path_lower for pattern in patterns): return category - # Check file contents for categorization if self.index: metadata = self.index.get_file_metadata(path) if metadata: - # Check for test files if any( test in path_lower for test in ["test_", "_test", "spec_", "_spec"] ): return "tests" - # Check for configuration files if any( conf in path_lower for conf in [".conf", ".config", ".yaml", ".yml", ".json"] ): return "config" - # Check for documentation if path_lower.endswith((".md", ".rst", ".txt")): return "docs" - return "other" - def _analyze_dependencies(self) -> None: - """Analyze dependencies between modules and files.""" - if not self.index: - return - - def analyze_node(node: Union[FileNode, DirectoryNode]) -> None: - if isinstance(node, FileNode): - # Track file dependencies - deps = self.get_file_dependencies(node.path) - if deps: - # Use type cast to ensure proper typing - dependencies_dict = cast( - Dict[str, List[str]], self.stats["dependencies"] - ) - dependencies_dict[node.path] = deps - - # Calculate complexity based on dependencies and symbols - complexity = len(deps) + len(node.symbols) - complexity_dict = cast(Dict[str, int], self.stats["complexity"]) - complexity_dict[node.path] = complexity - - elif isinstance(node, DirectoryNode): - # Recursively analyze child nodes - for child in node.children.values(): - analyze_node(child) - - analyze_node(self.root) - - def _calculate_statistics(self) -> None: - """Calculate codebase statistics.""" - - def analyze_node(node: Union[FileNode, DirectoryNode]) -> None: - if isinstance(node, FileNode): - # Update file statistics - self.stats["total_files"] = cast(int, self.stats["total_files"]) + 1 + def _reset(self) -> None: + self.root = DirectoryNode(os.path.basename(self.root_path), self.root_path) + self.stats = { + "total_files": 0, + "total_lines": 0, + "languages": {}, + "module_sizes": {}, + "dependencies": {}, + "complexity": {}, + } - # Track language statistics - if node.language: - languages_dict = cast(Dict[str, int], self.stats["languages"]) - languages_dict[node.language] = ( - languages_dict.get(node.language, 0) + 1 - ) + def build(self) -> None: + """Build the repository map from the index when available.""" + self._reset() - # Track module sizes - module_category = self._categorize_module(node.path) - module_sizes_dict = cast(Dict[str, int], self.stats["module_sizes"]) - module_sizes_dict[module_category] = ( - module_sizes_dict.get(module_category, 0) + 1 - ) + if self.index and self.index.iter_files(): + self._build_from_index() + else: + self._traverse_directory(self.root_path, self.root) + if self.index: + self._enhance_with_index() - # Count lines if available - if node.line_count is not None: - self.stats["total_lines"] = ( - cast(int, self.stats["total_lines"]) + node.line_count - ) + self._calculate_statistics() + self._analyze_dependencies() - elif isinstance(node, DirectoryNode): - # Recursively analyze child nodes - for child in node.children.values(): - analyze_node(child) + def _build_from_index(self) -> None: + if not self.index: + return - analyze_node(self.root) + for indexed_file in self.index.iter_files(): + file_id = indexed_file.id + current = self.root + parts = list(PurePosixPath(file_id).parts) + for segment_count in range(1, len(parts)): + dir_path = "/".join(parts[:segment_count]) + current = current.get_or_create_directory(dir_path) - def build(self) -> None: - """Build the repository map by traversing the filesystem.""" - self._traverse_directory(self.root_path, self.root) + file_node = current.add_file(file_id, indexed_file.language) + file_node.size = indexed_file.size_bytes + metadata = self.index.get_file_metadata(file_id) or {} + file_node.symbols = metadata.get("classes", []) + metadata.get( + "functions", [] + ) + file_node.imports = metadata.get("imports", []) - # Enhance with metadata from the index if available - if self.index: - self._enhance_with_index() - - # Calculate statistics and analyze dependencies - self._calculate_statistics() - self._analyze_dependencies() + self._enhance_with_index() def _traverse_directory(self, dir_path: str, node: DirectoryNode) -> None: - """Recursively traverse a directory and build the map. - - Args: - dir_path: Path to the directory - node: Directory node representing the directory - """ try: for entry in os.scandir(dir_path): if self.should_exclude(entry.path): continue if entry.is_file(): - # Add file to the current directory node file_node = node.add_file(entry.path) - - # Determine language from file extension _, ext = os.path.splitext(entry.name) language = self._get_language_from_extension(ext) if language: file_node.language = language - - # Set file size file_node.size = entry.stat().st_size - elif entry.is_dir(): - # Add directory and recursively traverse it dir_node = node.add_directory(entry.path) self._traverse_directory(entry.path, dir_node) - - except Exception as e: - # Handle permission errors and other issues - print(f"Error traversing {dir_path}: {str(e)}") + except Exception as error: + print(f"Error traversing {dir_path}: {error}") def _get_language_from_extension(self, ext: str) -> Optional[str]: - """Determine programming language from file extension. - - Args: - ext: File extension - - Returns: - Language name if recognized, None otherwise - """ ext = ext.lower() - language_map = { ".py": "python", ".js": "javascript", @@ -439,177 +300,103 @@ def _get_language_from_extension(self, ext: str) -> Optional[str]: ".yml": "yaml", ".toml": "toml", } - return language_map.get(ext) def _enhance_with_index(self) -> None: - """Enhance the map with metadata from the codebase index.""" if not self.index: return - def _enhance_node(node: Union[FileNode, DirectoryNode]) -> None: - """Recursively enhance nodes with index metadata.""" + def enhance(node: Union[FileNode, DirectoryNode]) -> None: if isinstance(node, FileNode): - # Analyze file node node.analyze(self.index) + else: + for child in node.children.values(): + enhance(child) + + enhance(self.root) - # Update repository statistics + def _calculate_statistics(self) -> None: + def analyze(node: Union[FileNode, DirectoryNode]) -> None: + if isinstance(node, FileNode): + self.stats["total_files"] = cast(int, self.stats["total_files"]) + 1 + if node.language: + languages = cast(Dict[str, int], self.stats["languages"]) + languages[node.language] = languages.get(node.language, 0) + 1 + module_category = self._categorize_module(node.path) + module_sizes = cast(Dict[str, int], self.stats["module_sizes"]) + module_sizes[module_category] = module_sizes.get(module_category, 0) + 1 if node.line_count is not None: self.stats["total_lines"] = ( cast(int, self.stats["total_lines"]) + node.line_count ) + else: + for child in node.children.values(): + analyze(child) - if node.language: - languages_dict = cast(Dict[str, int], self.stats["languages"]) - languages_dict[node.language] = ( - languages_dict.get(node.language, 0) + 1 - ) - - if node.category: - module_sizes_dict = cast(Dict[str, int], self.stats["module_sizes"]) - module_sizes_dict[node.category] = ( - module_sizes_dict.get(node.category, 0) + 1 - ) - - # Update complexity metrics - if node.complexity is not None: - complexity_dict = cast(Dict[str, int], self.stats["complexity"]) - complexity_dict[node.path] = node.complexity + analyze(self.root) - # Update dependency information - if node.dependencies: - dependencies_dict = cast( + def _analyze_dependencies(self) -> None: + def analyze(node: Union[FileNode, DirectoryNode]) -> None: + if isinstance(node, FileNode): + deps = self.get_file_dependencies(node.path) + if deps: + dependencies = cast( Dict[str, List[str]], self.stats["dependencies"] ) - dependencies_dict[node.path] = node.dependencies - - elif isinstance(node, DirectoryNode): - # Recursively enhance child nodes + dependencies[node.path] = deps + complexity = len(deps) + len(node.symbols) + complexity_dict = cast(Dict[str, int], self.stats["complexity"]) + complexity_dict[node.path] = complexity + else: for child in node.children.values(): - _enhance_node(child) - - # Start enhancement from the root - _enhance_node(self.root) + analyze(child) - def find_file(self, file_path: str) -> Optional[FileNode]: - """Find a file node by path. - - Args: - file_path: Path to the file - - Returns: - File node if found, None otherwise - """ - file_path = os.path.normpath(file_path) + analyze(self.root) - # Find relative path from root - rel_path = os.path.relpath(file_path, self.root_path) - if rel_path.startswith(".."): - # File is outside the repository - return None + def _normalize_lookup_path(self, path: str) -> str: + if self.index: + return self.index.normalize_file_id(path) + return normalize_file_id(path, self.root_path) - parts = rel_path.split(os.sep) + def find_file(self, file_path: str) -> Optional[FileNode]: + normalized = self._normalize_lookup_path(file_path) + parts = normalized.split("/") current = self.root - - # Navigate to parent directory - for _i, part in enumerate(parts[:-1]): - if part in current.children and isinstance( - current.children[part], DirectoryNode - ): - current = cast(DirectoryNode, current.children[part]) - else: + for part in parts[:-1]: + child = current.children.get(part) + if not isinstance(child, DirectoryNode): return None - - # Check if file exists in the directory - file_name = parts[-1] - if file_name in current.children and isinstance( - current.children[file_name], FileNode - ): - return cast(FileNode, current.children[file_name]) - + current = child + leaf = current.children.get(parts[-1]) + if isinstance(leaf, FileNode): + return leaf return None def find_directory(self, dir_path: str) -> Optional[DirectoryNode]: - """Find a directory node by path. - - Args: - dir_path: Path to the directory - - Returns: - Directory node if found, None otherwise - """ - dir_path = os.path.normpath(dir_path) - - # Find relative path from root - rel_path = os.path.relpath(dir_path, self.root_path) - if rel_path.startswith(".."): - # Directory is outside the repository - return None - - parts = rel_path.split(os.sep) - if parts == ["."]: - # Root directory + normalized = self._normalize_lookup_path(dir_path) + if normalized in {"", "."}: return self.root + parts = normalized.split("/") current = self.root - - # Navigate to the directory for part in parts: - if part in current.children and isinstance( - current.children[part], DirectoryNode - ): - current = cast(DirectoryNode, current.children[part]) - else: + child = current.children.get(part) + if not isinstance(child, DirectoryNode): return None - + current = child return current def get_file_dependencies(self, file_path: str) -> List[str]: - """Get dependencies of a file based on imports. - - Args: - file_path: Path to the file - - Returns: - List of file paths that are imported by the file - """ if not self.index: return [] - - file_node = self.find_file(file_path) - if not file_node: - return [] - - # Use index to find imported files - imported_files = [] - for import_stmt in file_node.imports: - # This is a simplified approach. A more sophisticated implementation - # would resolve import statements to actual files. - files = self.index.search_files_by_import(import_stmt) - imported_files.extend(files) - - return imported_files + return self.index.get_file_dependencies(file_path) def get_related_files(self, file_path: str) -> List[str]: - """Get files related to a given file. - - Args: - file_path: Path to the file - - Returns: - List of related file paths - """ if not self.index: return [] - return self.index.get_related_files(file_path) def get_module_overview(self) -> Dict[str, Any]: - """Get a comprehensive overview of the codebase modules. - - Returns: - Dictionary containing module overview information - """ overview = { "statistics": self.stats, "modules": {}, @@ -617,133 +404,92 @@ def get_module_overview(self) -> Dict[str, Any]: "complexity": {}, } - def analyze_node(node: Union[FileNode, DirectoryNode], path: str = "") -> None: + def analyze(node: Union[FileNode, DirectoryNode]) -> None: if isinstance(node, FileNode): - # Add file information module_category = self._categorize_module(node.path) - if module_category not in overview["modules"]: - overview["modules"][module_category] = [] - - file_info = { - "path": node.path, - "language": node.language, - "symbols": node.symbols, - "imports": node.imports, - } - overview["modules"][module_category].append(file_info) - - # Add dependency information + overview["modules"].setdefault(module_category, []).append( + { + "path": node.path, + "language": node.language, + "symbols": node.symbols, + "imports": node.imports, + } + ) if node.path in self.stats["dependencies"]: overview["dependencies"][node.path] = self.stats["dependencies"][ node.path ] - - # Add complexity information if node.path in self.stats["complexity"]: overview["complexity"][node.path] = self.stats["complexity"][ node.path ] + else: + for child in node.children.values(): + analyze(child) - elif isinstance(node, DirectoryNode): - # Recursively analyze child nodes - for name, child in node.children.items(): - child_path = os.path.join(path, name) - analyze_node(child, child_path) - - analyze_node(self.root) + analyze(self.root) return overview def get_cross_references(self, file_path: str) -> List[Dict[str, str]]: - """Get cross-references for a file (imports, usage, etc.).""" + file_id = self._normalize_lookup_path(file_path) cross_refs: List[Dict[str, str]] = [] - node = self.find_file(file_path) - if not node: - return cross_refs - - # Add imports as cross-references - for import_path in node.dependencies: + for dependency in self.get_file_dependencies(file_id): cross_refs.append( { - "file": import_path, + "file": dependency, "type": "import", - "description": f"Imports from {os.path.basename(import_path)}", + "description": f"Imports from {os.path.basename(dependency)}", } ) - # Add files that depend on this one - for dependent_path in node.dependents: - cross_refs.append( - { - "file": dependent_path, - "type": "imported_by", - "description": f"Used by {os.path.basename(dependent_path)}", - } - ) - - # Add related files (same module/package) - related_files = self.get_related_files(file_path) - for related_path in related_files: - if related_path != file_path and related_path not in [ - ref["file"] for ref in cross_refs - ]: + if self.index: + for dependent in self.index.get_dependents(file_id): cross_refs.append( { - "file": related_path, - "type": "related", - "description": f"Related file in same module: {os.path.basename(related_path)}", + "file": dependent, + "type": "imported_by", + "description": f"Used by {os.path.basename(dependent)}", } ) + seen = {reference["file"] for reference in cross_refs} + for related in self.get_related_files(file_id): + if related == file_id or related in seen: + continue + cross_refs.append( + { + "file": related, + "type": "related", + "description": f"Related file in same module: {os.path.basename(related)}", + } + ) return cross_refs def get_change_impact_analysis( self, changed_files: List[str] ) -> Dict[str, List[str]]: - """Analyze the impact of changes to specific files.""" - impact_map = {} - + impact_map: Dict[str, List[str]] = {} for file_path in changed_files: - impacted_files = set() - - # Direct dependents (files that import this one) - node = self.find_file(file_path) - if node: - impacted_files.update(node.dependents) - - # Indirect impact through dependency chain - for dependent in node.dependents: - dependent_node = self.find_file(dependent) - if dependent_node: - impacted_files.update(dependent_node.dependents) - - # If no node found, try to find impact through symbol usage - if not node and self.index: - file_metadata = self.index.get_file_metadata(file_path) - if file_metadata: - # Find files that use symbols from this file - for symbol in file_metadata.get( - "functions", [] - ) + file_metadata.get("classes", []): - symbol_usages = self.index.search_symbol(symbol) - for usage in symbol_usages: - if usage["filepath"] != file_path: - impacted_files.add(usage["filepath"]) - - impact_map[file_path] = list(impacted_files) - + normalized = self._normalize_lookup_path(file_path) + impacted_files = set(self.get_related_files(normalized)) + if self.index: + impacted_files.update(self.index.get_dependents(normalized)) + for dependent in list(self.index.get_dependents(normalized)): + impacted_files.update(self.index.get_dependents(dependent)) + impact_map[normalized] = sorted(impacted_files) return impact_map def get_documentation_context_for_file(self, file_path: str) -> Dict[str, Any]: - """Get comprehensive context for documentation generation.""" - node = self.find_file(file_path) + normalized = self._normalize_lookup_path(file_path) + node = self.find_file(normalized) if not node: return {} - context = { + return { "file_info": { - "path": file_path, - "module_type": self._categorize_module(file_path), + "path": normalized, + "module_type": self._categorize_module(normalized), "complexity": node.complexity, "size_kb": node.size / 1024 if node.size else 0, }, @@ -756,78 +502,49 @@ def get_documentation_context_for_file(self, file_path: str) -> Dict[str, Any]: "dependent_count": len(node.dependents), }, "relationships": { - "related_files": self.get_related_files(file_path), - "cross_references": self.get_cross_references(file_path), - "module_category": self._categorize_module(file_path), + "related_files": self.get_related_files(normalized), + "cross_references": self.get_cross_references(normalized), + "module_category": self._categorize_module(normalized), }, "architectural_info": { - "is_core_module": len(node.dependents) > 3, # Many files depend on it - "is_leaf_module": len(node.dependencies) == 0, # No dependencies + "is_core_module": len(node.dependents) > 3, + "is_leaf_module": len(node.dependencies) == 0, "centrality_score": len(node.dependents) + len(node.dependencies), }, } - return context - def to_dict(self) -> Dict[str, Any]: - """Convert the repository map to a dictionary. - - Returns: - Dictionary representation of the map - """ base_dict = self.root.to_dict() - - # Add enhanced metadata base_dict.update( { "statistics": self.stats, "module_overview": self.get_module_overview(), } ) - return base_dict @staticmethod def from_documents( documents: List[Document], root_path: str, index: Optional[CodebaseIndex] = None ) -> RepositoryMap: - """Create a repository map from a list of documents. - - Args: - documents: List of documents - root_path: Root path of the repository - index: Optional codebase index for enhanced metadata - - Returns: - Repository map - """ repo_map = RepositoryMap(root_path, index) - # Build directory structure for document in documents: - file_path = document.metadata.filepath - - # Skip if outside root path - if not os.path.commonpath([root_path, file_path]).startswith(root_path): - continue - - # Get relative path from root - rel_path = os.path.relpath(file_path, root_path) - parts = rel_path.split(os.sep) - + file_id = normalize_file_id(document.metadata.filepath, root_path) current = repo_map.root - - # Create directories - for i, _part in enumerate(parts[:-1]): - dir_path = os.path.join(root_path, *parts[: i + 1]) + parts = list(PurePosixPath(file_id).parts) + for segment_count in range(1, len(parts)): + dir_path = "/".join(parts[:segment_count]) current = current.get_or_create_directory(dir_path) - # Add file - file_node = current.add_file(file_path, str(document.metadata.language)) - - # Add metadata + file_node = current.add_file(file_id, str(document.metadata.language)) file_node.size = document.metadata.size_bytes file_node.symbols = document.metadata.classes + document.metadata.functions file_node.imports = document.metadata.imports + if index: + repo_map._enhance_with_index() + repo_map._calculate_statistics() + repo_map._analyze_dependencies() + return repo_map diff --git a/docstra/core/ingestion/embeddings.py b/docstra/core/ingestion/embeddings.py index 8aeb510..4a768cc 100644 --- a/docstra/core/ingestion/embeddings.py +++ b/docstra/core/ingestion/embeddings.py @@ -9,12 +9,13 @@ import os import time from abc import ABC, abstractmethod -from typing import Any, Dict, List, Sequence +from typing import Any, Dict, List, Optional, Sequence import requests import tiktoken from docstra.core.document_processing.document import Document +from docstra.core.indexing.model import make_chunk_id, normalize_file_id def _vector_to_list(vector: Sequence[float]) -> List[float]: @@ -270,12 +271,18 @@ def create_embedding_generator(embedding_type: str, **kwargs) -> EmbeddingGenera class DocumentEmbedder: """Generate embeddings for documents and their chunks.""" - def __init__(self, embedding_generator: EmbeddingGenerator) -> None: + def __init__( + self, + embedding_generator: EmbeddingGenerator, + codebase_root: Optional[str] = None, + ) -> None: """Initialize the document embedder.""" self.embedding_generator = embedding_generator + self.codebase_root = codebase_root def embed_document(self, document: Document) -> Dict[str, List[float]]: """Generate embeddings for a document and its chunks.""" + doc_id = normalize_file_id(document.metadata.filepath, self.codebase_root) doc_embedding = self.embedding_generator.generate_embedding(document.content) chunk_embeddings: Dict[str, List[float]] = {} @@ -286,12 +293,9 @@ def embed_document(self, document: Document) -> Dict[str, List[float]]: ) for i, chunk in enumerate(document.chunks): - chunk_id = ( - f"{document.metadata.filepath}#{chunk.start_line}-{chunk.end_line}" - ) + chunk_id = make_chunk_id(doc_id, chunk.start_line, chunk.end_line) chunk_embeddings[chunk_id] = chunk_embedding_vectors[i] - doc_id = document.metadata.filepath chunk_embeddings[doc_id] = doc_embedding return chunk_embeddings @@ -303,7 +307,7 @@ def embed_documents( embeddings: Dict[str, Dict[str, List[float]]] = {} for document in documents: - doc_id = document.metadata.filepath + doc_id = normalize_file_id(document.metadata.filepath, self.codebase_root) embeddings[doc_id] = self.embed_document(document) return embeddings diff --git a/docstra/core/ingestion/storage.py b/docstra/core/ingestion/storage.py index 3706265..ad8ec4e 100644 --- a/docstra/core/ingestion/storage.py +++ b/docstra/core/ingestion/storage.py @@ -14,6 +14,7 @@ from chromadb.types import Metadata from docstra.core.document_processing.document import Document +from docstra.core.indexing.model import make_chunk_id, normalize_file_id ChromaScalar = str | int | float | bool ChromaMetadata = Metadata @@ -393,7 +394,12 @@ def clear(self) -> None: class DocumentIndexer: """Index documents in ChromaDB.""" - def __init__(self, storage: ChromaDBStorage, embedding_generator: Any): + def __init__( + self, + storage: ChromaDBStorage, + embedding_generator: Any, + codebase_root: Optional[str] = None, + ): """Initialize the document indexer. Args: @@ -402,6 +408,7 @@ def __init__(self, storage: ChromaDBStorage, embedding_generator: Any): """ self.storage = storage self.embedding_generator = embedding_generator + self.codebase_root = codebase_root def _prepare_metadata_for_chroma(self, metadata) -> dict: """Convert document metadata to ChromaDB-compatible format. @@ -416,7 +423,13 @@ def _prepare_metadata_for_chroma(self, metadata) -> dict: chroma_metadata = {} # Convert metadata to dictionary - metadata_dict = metadata.dict() if hasattr(metadata, "dict") else metadata + metadata_dict = ( + metadata.model_dump() + if hasattr(metadata, "model_dump") + else metadata.dict() + if hasattr(metadata, "dict") + else metadata + ) # Process each metadata field for key, value in metadata_dict.items(): @@ -460,13 +473,16 @@ def index_document(self, document: Document) -> str: """ # Generate embeddings for the document doc_embedding = self.embedding_generator.generate_embedding(document.content) + doc_id = normalize_file_id(document.metadata.filepath, self.codebase_root) # Convert document metadata to ChromaDB-compatible format doc_metadata = self._prepare_metadata_for_chroma(document.metadata) + doc_metadata["document_id"] = doc_id + doc_metadata["filepath"] = doc_id # Add document to storage - doc_id = self.storage.add_document( - document_id=document.metadata.filepath, + persisted_doc_id = self.storage.add_document( + document_id=doc_id, content=document.content, metadata=doc_metadata, embedding=doc_embedding, @@ -480,9 +496,9 @@ def index_document(self, document: Document) -> str: chunk_embeddings = [] # Process each chunk - for i, chunk in enumerate(document.chunks): + for chunk in document.chunks: # Generate chunk ID - chunk_id = f"{doc_id}#{i}" + chunk_id = make_chunk_id(doc_id, chunk.start_line, chunk.end_line) # Generate chunk embedding chunk_embedding = self.embedding_generator.generate_embedding( @@ -491,14 +507,14 @@ def index_document(self, document: Document) -> str: # Create chunk metadata chunk_metadata = { - "document_id": document.metadata.filepath, - "chunk_index": i, + "document_id": doc_id, "start_line": chunk.start_line, "end_line": chunk.end_line, "chunk_type": chunk.chunk_type, "symbols": chunk.symbols, "parent_symbols": chunk.parent_symbols, "language": str(document.metadata.language), + "filepath": doc_id, } # Convert chunk metadata to ChromaDB-compatible format @@ -521,7 +537,7 @@ def index_document(self, document: Document) -> str: embeddings=chunk_embeddings, ) - return doc_id + return persisted_doc_id def index_documents(self, documents: List[Document]) -> List[str]: """Index multiple documents. diff --git a/docstra/core/retrieval/chroma.py b/docstra/core/retrieval/chroma.py index 2247c6b..097a21d 100644 --- a/docstra/core/retrieval/chroma.py +++ b/docstra/core/retrieval/chroma.py @@ -10,13 +10,17 @@ from docstra.core.ingestion.embeddings import EmbeddingGenerator from docstra.core.ingestion.storage import ChromaDBStorage +from docstra.core.indexing.model import normalize_file_id class ChromaRetriever: """Retriever for documents and chunks using ChromaDB.""" def __init__( - self, storage: ChromaDBStorage, embedding_generator: EmbeddingGenerator + self, + storage: ChromaDBStorage, + embedding_generator: EmbeddingGenerator, + codebase_root: Optional[str] = None, ): """Initialize the ChromaDB retriever. @@ -26,6 +30,7 @@ def __init__( """ self.storage = storage self.embedding_generator = embedding_generator + self.codebase_root = codebase_root def retrieve_documents( self, query: str, n_results: int = 10, **filters @@ -105,10 +110,11 @@ def retrieve_by_filepath( Returns: List of matching chunks """ + file_id = normalize_file_id(filepath, self.codebase_root) return self.retrieve_by_context( query=query, context_type="document_id", - context_value=filepath, + context_value=file_id, n_results=n_results, ) @@ -141,8 +147,9 @@ def get_context_for_document(self, document_id: str) -> Dict[str, Any]: Returns: Document and its chunks """ - document = self.storage.get_document(document_id) - chunks = self.storage.get_chunks_for_document(document_id) + normalized_id = normalize_file_id(document_id, self.codebase_root) + document = self.storage.get_document(normalized_id) + chunks = self.storage.get_chunks_for_document(normalized_id) return {"document": document, "chunks": chunks} @@ -155,7 +162,8 @@ def get_document_by_id(self, document_id: str) -> Optional[Dict[str, Any]]: Returns: The document if found, None otherwise """ - return self.storage.get_document(document_id) + normalized_id = normalize_file_id(document_id, self.codebase_root) + return self.storage.get_document(normalized_id) def get_chunks_for_document(self, document_id: str) -> List[Dict[str, Any]]: """Get all chunks for a document. @@ -166,4 +174,5 @@ def get_chunks_for_document(self, document_id: str) -> List[Dict[str, Any]]: Returns: List of chunks for the document """ - return self.storage.get_chunks_for_document(document_id) + normalized_id = normalize_file_id(document_id, self.codebase_root) + return self.storage.get_chunks_for_document(normalized_id) diff --git a/docstra/core/retrieval/context_aware.py b/docstra/core/retrieval/context_aware.py index e58bc87..1b59817 100644 --- a/docstra/core/retrieval/context_aware.py +++ b/docstra/core/retrieval/context_aware.py @@ -469,7 +469,9 @@ def _get_relevant_modules_context( # Search for files with relevant names for architectural queries concept_keywords = ["cli", "main", "command", "app", "interface", "entry"] - for file_path, metadata in self.code_index.file_index.items(): + for indexed_file in self.code_index.iter_files(): + file_path = indexed_file.id + metadata = self.code_index.get_file_metadata(file_path) or {} file_name = file_path.lower() # Check if filename contains relevant concepts for keyword in concept_keywords: @@ -844,7 +846,7 @@ def _get_detailed_repo_overview(self) -> Optional[str]: # Add directory structure if self.code_index: directories = {} - for file_path in self.code_index.file_index.keys(): + for file_path in self.code_index.iter_file_ids(): dir_name = "/".join(file_path.split("/")[:-1]) if "core" in dir_name: directories[dir_name] = directories.get(dir_name, 0) + 1 @@ -868,7 +870,7 @@ def _get_key_file_contents( key_files = [] # Look for CLI-related files - for file_path in self.code_index.file_index.keys(): + for file_path in self.code_index.iter_file_ids(): if any( keyword in file_path.lower() for keyword in ["cli.py", "main.py", "app.py"] diff --git a/docstra/core/services/documentation_service.py b/docstra/core/services/documentation_service.py index 5ec9017..a94cbd9 100644 --- a/docstra/core/services/documentation_service.py +++ b/docstra/core/services/documentation_service.py @@ -25,6 +25,8 @@ from docstra.core.indexing.code_index import ( CodebaseIndexer, ) # For loading index for repo_map +from docstra.core.indexing.code_index import CodebaseIndex +from docstra.core.indexing.model import CORE_INDEX_FILENAME from docstra.core.indexing.repo_map import RepositoryMap from docstra.core.ingestion.embeddings import EmbeddingFactory from docstra.core.ingestion.storage import ChromaDBStorage @@ -231,23 +233,30 @@ def generate_documentation( repo_map: Optional[RepositoryMap] = None chroma_retriever: Optional[ChromaRetriever] = None - map_path = abs_persist_directory / "repo_map.json" code_indexer_path = abs_persist_directory / "index" + core_index_path = code_indexer_path / CORE_INDEX_FILENAME chroma_storage_path = abs_persist_directory / "chroma" + legacy_index_artifacts = CodebaseIndex.legacy_artifacts_in(code_indexer_path) + legacy_repo_map = abs_persist_directory / "repo_map.json" + + if not core_index_path.exists() and ( + legacy_index_artifacts or legacy_repo_map.exists() + ): + raise ValueError( + "Legacy Docstra index artifacts were found. Run 'docstra ingest' " + "to rebuild the index in the new format." + ) - if map_path.exists() and code_indexer_path.exists(): + if core_index_path.exists(): try: temp_code_index = CodebaseIndexer( - index_directory=str(code_indexer_path) + index_directory=str(code_indexer_path), + codebase_root=str(input_path_abs), ).get_index() if temp_code_index: - # Create repo map from documents instead of loading from dict - repo_map = RepositoryMap.from_documents( - documents_for_generation, str(input_path_abs), temp_code_index - ) - self.console.print( - "[dim]Repo map created from documents and index.[/dim]" - ) + repo_map = RepositoryMap(str(input_path_abs), temp_code_index) + repo_map.build() + self.console.print("[dim]Repo map created from core index.[/dim]") except Exception as e_map: self.console.print( f"[yellow]Warning: Could not load repository map: {e_map}[/yellow]" @@ -266,6 +275,7 @@ def generate_documentation( chroma_retriever = ChromaRetriever( chroma_db, embedding_gen, + codebase_root=str(input_path_abs), ) self.console.print( f"[dim]ChromaRetriever initialized from {chroma_storage_path}.[/dim]" @@ -277,10 +287,11 @@ def generate_documentation( # Get code index if available code_index = None - if abs_persist_directory and (abs_persist_directory / "index").exists(): + if core_index_path.exists(): try: indexer = CodebaseIndexer( - index_directory=str(abs_persist_directory / "index") + index_directory=str(abs_persist_directory / "index"), + codebase_root=str(input_path_abs), ) code_index = indexer.get_index() except Exception as e: diff --git a/docstra/core/services/ingestion_service.py b/docstra/core/services/ingestion_service.py index e6100c9..313cfc3 100644 --- a/docstra/core/services/ingestion_service.py +++ b/docstra/core/services/ingestion_service.py @@ -30,8 +30,8 @@ ) from docstra.core.ingestion.embeddings import EmbeddingFactory from docstra.core.ingestion.storage import ChromaDBStorage, DocumentIndexer -from docstra.core.indexing.code_index import CodebaseIndexer -from docstra.core.indexing.repo_map import RepositoryMap +from docstra.core.indexing.code_index import CodebaseIndex, CodebaseIndexer +from docstra.core.indexing.model import CORE_INDEX_FILENAME from docstra.core.utils.file_collector import collect_files, FileCollector @@ -87,10 +87,30 @@ def ingest_codebase( index_dir = persist_directory / "index" if index_dir.exists() and index_dir.is_dir(): shutil.rmtree(index_dir) + legacy_repo_map = persist_directory / "repo_map.json" + if legacy_repo_map.exists(): + legacy_repo_map.unlink() - # Check if already indexed and not forcing index_path = persist_directory / "index" - if index_path.exists() and not force: + core_index_path = index_path / CORE_INDEX_FILENAME + legacy_index_artifacts = CodebaseIndex.legacy_artifacts_in(index_path) + legacy_repo_map = persist_directory / "repo_map.json" + has_legacy_state = bool(legacy_index_artifacts) or legacy_repo_map.exists() + + if has_legacy_state and not force: + self.console.print( + "[yellow]Legacy index artifacts detected. Rebuilding the index in the new core manifest format.[/]" + ) + chroma_dir = persist_directory / "chroma" + if chroma_dir.exists() and chroma_dir.is_dir(): + shutil.rmtree(chroma_dir) + if index_path.exists() and index_path.is_dir(): + shutil.rmtree(index_path) + if legacy_repo_map.exists(): + legacy_repo_map.unlink() + + # Check if already indexed and not forcing + if core_index_path.exists() and not force: self.console.print( "[yellow]Codebase already indexed. Use --force to reindex.[/]" ) @@ -128,11 +148,19 @@ def ingest_codebase( storage = ChromaDBStorage(persist_directory=str(persist_directory / "chroma")) - doc_indexer = DocumentIndexer(storage, embedding_generator) + doc_indexer = DocumentIndexer( + storage, + embedding_generator, + codebase_root=str(codebase_path_abs), + ) code_indexer = CodebaseIndexer( index_directory=str(persist_directory / "index"), exclude_patterns=exclude_patterns or [], + codebase_root=str(codebase_path_abs), + embedding_backend="chroma", + embedding_model=user_config.embedding.model_name, + source_kinds=["tree-sitter"], ) # Collect files with suppressed logging @@ -248,24 +276,6 @@ def ingest_codebase( task_index, completed=True, description="[green]Indexed all documents" ) - # Create repository map - task_map = progress.add_task("[cyan]Creating repository map...", total=None) - - repo_map = RepositoryMap.from_documents( - documents, str(codebase_path_abs), code_indexer.index - ) - - # Save repository map - map_path = persist_directory / "repo_map.json" - with open(map_path, "w") as f: - import json - - json.dump(repo_map.to_dict(), f) - - progress.update( - task_map, completed=True, description="[green]Created repository map" - ) - # Show completion summary with embedding usage self._show_completion_summary( len(documents), diff --git a/docstra/core/services/query_service.py b/docstra/core/services/query_service.py index 1d643cf..7b72551 100644 --- a/docstra/core/services/query_service.py +++ b/docstra/core/services/query_service.py @@ -18,7 +18,8 @@ from docstra.core.ingestion.embeddings import EmbeddingFactory from docstra.core.ingestion.storage import ChromaDBStorage from docstra.core.retrieval.chroma import ChromaRetriever -from docstra.core.indexing.code_index import CodebaseIndexer +from docstra.core.indexing.code_index import CodebaseIndex, CodebaseIndexer +from docstra.core.indexing.model import CORE_INDEX_FILENAME from docstra.core.retrieval.hybrid import HybridRetriever from docstra.core.retrieval.context_aware import ContextAwareRetriever from docstra.core.utils.token_counter import get_token_counter, ContextBudgetManager @@ -129,46 +130,54 @@ def _ensure_retrieval_components_initialized(self, abs_codebase_path: Path): chroma_path = effective_persist_dir / "chroma" index_path = effective_persist_dir / "index" + core_index_path = index_path / CORE_INDEX_FILENAME chroma_check_file = chroma_path / "chroma.sqlite3" - - if not index_path.exists() or not chroma_check_file.exists(): + legacy_index_artifacts = CodebaseIndex.legacy_artifacts_in(index_path) + legacy_repo_map = effective_persist_dir / "repo_map.json" + + if not core_index_path.exists() or not chroma_check_file.exists(): + migration_hint = "" + if legacy_index_artifacts or legacy_repo_map.exists(): + migration_hint = ( + " Legacy index artifacts were found. Rerun 'docstra ingest' " + "to rebuild the index in the new format." + ) error_msg = ( f"Codebase at {abs_codebase_path} not fully initialized for querying. " f"ChromaDB path: {chroma_path} (check file: {chroma_check_file}, exists: {chroma_check_file.exists()}), " - f"Index path: {index_path} (exists: {index_path.exists()}). " + f"Core index path: {core_index_path} (exists: {core_index_path.exists()}). " "Run 'docstra init' and 'docstra ingest' first." + f"{migration_hint}" ) self.console.print(f"[bold red]Error:[/] {error_msg}") raise FileNotFoundError(error_msg) try: self.storage = ChromaDBStorage(persist_directory=str(chroma_path)) - self.retriever = ChromaRetriever(self.storage, self.embedding_generator) + self.retriever = ChromaRetriever( + self.storage, + self.embedding_generator, + codebase_root=str(abs_codebase_path), + ) self.code_indexer = CodebaseIndexer( - index_directory=str(index_path) - ) # Callbacks not typically passed here + index_directory=str(index_path), + codebase_root=str(abs_codebase_path), + ) code_index_instance = self.code_indexer.get_index() if code_index_instance is None: raise ValueError(f"Failed to load code index from {index_path}") self.hybrid_retriever = HybridRetriever(self.retriever, code_index_instance) # Initialize context-aware retriever - # Load repository map repo_map = None try: from docstra.core.indexing.repo_map import RepositoryMap - repo_map_path = effective_persist_dir / "repo_map.json" - if repo_map_path.exists(): - # Create a new repository map and rebuild it with current index - repo_map = RepositoryMap( - str(abs_codebase_path), code_index_instance - ) - if code_index_instance: - repo_map.build() # Rebuild with current index + repo_map = RepositoryMap(str(abs_codebase_path), code_index_instance) + repo_map.build() except Exception as e: self.console.print( - f"[yellow]Warning: Could not load repository map: {e}[/yellow]" + f"[yellow]Warning: Could not build repository map: {e}[/yellow]" ) self.context_aware_retriever = ContextAwareRetriever( diff --git a/docstra/core/services/repository_explorer_service.py b/docstra/core/services/repository_explorer_service.py index 4a3dbb6..b42c5bf 100644 --- a/docstra/core/services/repository_explorer_service.py +++ b/docstra/core/services/repository_explorer_service.py @@ -14,6 +14,7 @@ from docstra.core.config.settings import UserConfig from docstra.core.indexing.code_index import CodebaseIndex, CodebaseIndexer +from docstra.core.indexing.model import CORE_INDEX_FILENAME from docstra.core.indexing.repo_map import RepositoryMap from docstra.core.utils.colors import Colors @@ -61,23 +62,28 @@ def _load_components(self, codebase_path: str) -> None: # Load code index index_path = persist_dir / "index" - if index_path.exists(): - indexer = CodebaseIndexer(index_directory=str(index_path)) - self.code_index = indexer.get_index() - - # Load repository map - map_path = persist_dir / "repo_map.json" - if map_path.exists(): - # Create a new repository map and load from the saved data - self.repo_map = RepositoryMap(str(abs_path), self.code_index) - if self.code_index: - self.repo_map.build() # Rebuild with current index - - if not self.repo_map or not self.code_index: + core_index_path = index_path / CORE_INDEX_FILENAME + legacy_index_artifacts = CodebaseIndex.legacy_artifacts_in(index_path) + legacy_repo_map = persist_dir / "repo_map.json" + + if not core_index_path.exists(): + if legacy_index_artifacts or legacy_repo_map.exists(): + raise ValueError( + "Legacy Docstra index artifacts were found. Run 'docstra ingest' " + "to rebuild the index in the new format." + ) raise ValueError( "Repository not fully indexed. Run 'docstra ingest' first." ) + indexer = CodebaseIndexer( + index_directory=str(index_path), + codebase_root=str(abs_path), + ) + self.code_index = indexer.get_index() + self.repo_map = RepositoryMap(str(abs_path), self.code_index) + self.repo_map.build() + def get_file_relationships(self, file_path: str) -> Dict[str, Any]: """Get comprehensive file relationship information. @@ -100,7 +106,7 @@ def get_file_relationships(self, file_path: str) -> Dict[str, Any]: dependencies = self.repo_map.get_file_dependencies(file_path) related_files = self.repo_map.get_related_files(file_path) - # Get dependents by finding files that import this one + # Get dependents by following resolved import edges dependents = self._get_file_dependents(file_path) # Get symbols from code index @@ -138,22 +144,7 @@ def _get_file_dependents(self, file_path: str) -> List[str]: """ if not self.code_index: return [] - - dependents: List[str] = [] - file_metadata = self.code_index.get_file_metadata(file_path) - if not file_metadata: - return dependents - - # Find files that import symbols from this file - for symbol in file_metadata.get("functions", []) + file_metadata.get( - "classes", [] - ): - symbol_usages = self.code_index.search_symbol(symbol) - for usage in symbol_usages: - if usage["filepath"] != file_path: - dependents.append(usage["filepath"]) - - return list(set(dependents)) # Remove duplicates + return self.code_index.get_dependents(file_path) def explore_structure( self, path: str, depth: int = 3, show_tree: bool = False diff --git a/tests/test_core_index.py b/tests/test_core_index.py new file mode 100644 index 0000000..5c90a20 --- /dev/null +++ b/tests/test_core_index.py @@ -0,0 +1,481 @@ +from __future__ import annotations + +from pathlib import Path + +import pytest + +from docstra.core.document_processing.document import ( + CodeChunk, + Document, + DocumentMetadata, + DocumentType, +) +from docstra.core.indexing.code_index import CodebaseIndex, CodebaseIndexer +from docstra.core.indexing.model import ( + CORE_INDEX_FILENAME, + CoreIndexBuilder, + CoreIndexManifest, + IndexedChunk, + IndexedFile, + IndexedSymbol, + ImportRecord, +) +from docstra.core.ingestion.storage import ChromaDBStorage, DocumentIndexer + + +class DummyEmbeddingGenerator: + def generate_embedding(self, text: str) -> list[float]: + del text + return [1.0, 0.0, 0.0] + + def generate_embeddings(self, texts: list[str]) -> list[list[float]]: + return [[1.0, 0.0, 0.0] for _ in texts] + + +def _make_document( + path: Path, + *, + content: str, + imports: list[str], + functions: list[str], + classes: list[str], + symbols: dict[str, list[int]], + chunks: list[CodeChunk], +) -> Document: + return Document( + content=content, + metadata=DocumentMetadata( + filepath=str(path), + language=DocumentType.PYTHON, + size_bytes=len(content.encode("utf-8")), + last_modified=1.0, + line_count=len(content.splitlines()), + imports=imports, + classes=classes, + functions=functions, + symbols=symbols, + ), + chunks=chunks, + ) + + +def test_core_index_manifest_round_trip() -> None: + manifest = CoreIndexManifest( + embedding_backend="chroma", + embedding_model="test-model", + source_kinds=["tree-sitter"], + files=[ + IndexedFile( + id="docstra/core/cli.py", + language="python", + size_bytes=100, + last_modified=1.0, + line_count=10, + ) + ], + chunks=[ + IndexedChunk( + id="docstra/core/cli.py#L1-L4", + file_id="docstra/core/cli.py", + language="python", + start_line=1, + end_line=4, + chunk_type="function", + symbols=["main"], + ) + ], + symbols=[ + IndexedSymbol( + id="docstra/core/cli.py::function::main::L1", + file_id="docstra/core/cli.py", + name="main", + kind="function", + language="python", + line=1, + ) + ], + imports=[ + ImportRecord( + id="docstra/core/cli.py::import::0", + source_file_id="docstra/core/cli.py", + raw_text="from docstra.core.app import app", + target_file_id="docstra/core/app.py", + ) + ], + ) + + payload = manifest.model_dump_json(indent=2) + restored = CoreIndexManifest.model_validate_json(payload) + + assert restored.embedding_model == "test-model" + assert restored.files[0].id == "docstra/core/cli.py" + assert restored.chunks[0].id == "docstra/core/cli.py#L1-L4" + assert restored.symbols[0].id == "docstra/core/cli.py::function::main::L1" + assert restored.imports[0].target_file_id == "docstra/core/app.py" + + +def test_core_index_builder_creates_stable_ids_and_edges(tmp_path: Path) -> None: + codebase_root = tmp_path / "repo" + codebase_root.mkdir() + helper_dir = codebase_root / "pkg" + helper_dir.mkdir() + + consumer_path = codebase_root / "consumer.py" + helper_path = helper_dir / "helper.py" + consumer_content = "def run():\n return util()\n" + helper_content = "def util():\n return 1\n" + consumer_path.write_text(consumer_content, encoding="utf-8") + helper_path.write_text(helper_content, encoding="utf-8") + + consumer = _make_document( + consumer_path, + content=consumer_content, + imports=["from pkg.helper import util"], + functions=["run"], + classes=[], + symbols={"run": [1]}, + chunks=[ + CodeChunk( + content=consumer_content, + start_line=1, + end_line=2, + symbols=["run"], + chunk_type="function", + parent_symbols=[], + ) + ], + ) + helper = _make_document( + helper_path, + content=helper_content, + imports=[], + functions=["util"], + classes=[], + symbols={"util": [1]}, + chunks=[ + CodeChunk( + content=helper_content, + start_line=1, + end_line=2, + symbols=["util"], + chunk_type="function", + parent_symbols=[], + ) + ], + ) + + manifest = CoreIndexBuilder.from_documents( + [consumer, helper], + codebase_root, + embedding_backend="chroma", + embedding_model="test-embed", + ) + + assert sorted(indexed_file.id for indexed_file in manifest.files) == [ + "consumer.py", + "pkg/helper.py", + ] + assert {chunk.id for chunk in manifest.chunks} == { + "consumer.py#L1-L2", + "pkg/helper.py#L1-L2", + } + assert {symbol.id for symbol in manifest.symbols} == { + "consumer.py::function::run::L1", + "pkg/helper.py::function::util::L1", + } + assert manifest.imports[0].target_file_id == "pkg/helper.py" + assert manifest.edges[0].source_id == "consumer.py" + assert manifest.edges[0].target_id == "pkg/helper.py" + assert {embedding.vector_id for embedding in manifest.embeddings} >= { + "consumer.py", + "consumer.py#L1-L2", + "pkg/helper.py", + "pkg/helper.py#L1-L2", + } + + index_dir = codebase_root / ".docstra" / "index" + indexer = CodebaseIndexer( + index_directory=str(index_dir), + codebase_root=str(codebase_root), + embedding_model="test-embed", + ) + indexer.index_documents([consumer, helper]) + code_index = CodebaseIndex( + index_directory=str(index_dir), codebase_root=str(codebase_root) + ) + + assert (index_dir / CORE_INDEX_FILENAME).exists() + assert code_index.get_file_dependencies("consumer.py") == ["pkg/helper.py"] + assert code_index.get_related_files("pkg/helper.py") == ["consumer.py"] + assert code_index.search_function("util")[0]["filepath"] == "pkg/helper.py" + assert code_index.get_file_metadata(str(helper_path))["filepath"] == "pkg/helper.py" + + +def test_codebase_indexer_index_document_preserves_existing_files( + tmp_path: Path, +) -> None: + codebase_root = tmp_path / "repo" + codebase_root.mkdir() + helper_dir = codebase_root / "pkg" + helper_dir.mkdir() + + helper_path = helper_dir / "helper.py" + consumer_path = codebase_root / "consumer.py" + + helper_content = "def util():\n return 1\n" + helper_updated_content = "def util():\n value = 1\n return value\n" + consumer_content = "from pkg.helper import util\n\ndef run():\n return util()\n" + + helper_path.write_text(helper_updated_content, encoding="utf-8") + consumer_path.write_text(consumer_content, encoding="utf-8") + + helper = _make_document( + helper_path, + content=helper_content, + imports=[], + functions=["util"], + classes=[], + symbols={"util": [1]}, + chunks=[ + CodeChunk( + content=helper_content, + start_line=1, + end_line=2, + symbols=["util"], + chunk_type="function", + parent_symbols=[], + ) + ], + ) + helper_updated = _make_document( + helper_path, + content=helper_updated_content, + imports=[], + functions=["util"], + classes=[], + symbols={"util": [1]}, + chunks=[ + CodeChunk( + content=helper_updated_content, + start_line=1, + end_line=3, + symbols=["util"], + chunk_type="function", + parent_symbols=[], + ) + ], + ) + consumer = _make_document( + consumer_path, + content=consumer_content, + imports=["from pkg.helper import util"], + functions=["run"], + classes=[], + symbols={"run": [3]}, + chunks=[ + CodeChunk( + content=consumer_content, + start_line=3, + end_line=4, + symbols=["run"], + chunk_type="function", + parent_symbols=[], + ) + ], + ) + + index_dir = codebase_root / ".docstra" / "index" + indexer = CodebaseIndexer( + index_directory=str(index_dir), + codebase_root=str(codebase_root), + embedding_model="test-embed", + ) + + indexer.index_document(helper) + indexer.index_document(consumer) + indexer.index_document(helper_updated) + + code_index = CodebaseIndex( + index_directory=str(index_dir), codebase_root=str(codebase_root) + ) + + assert sorted(code_index.iter_file_ids()) == ["consumer.py", "pkg/helper.py"] + assert code_index.get_file_dependencies("consumer.py") == ["pkg/helper.py"] + assert code_index.get_related_files("pkg/helper.py") == ["consumer.py"] + assert code_index.search_function("util")[0]["filepath"] == "pkg/helper.py" + assert code_index.get_file_metadata("pkg/helper.py")["line_count"] == 3 + + +def test_core_index_builder_resolves_all_python_multi_import_targets( + tmp_path: Path, +) -> None: + codebase_root = tmp_path / "repo" + codebase_root.mkdir() + pkg_dir = codebase_root / "pkg" + pkg_dir.mkdir() + + init_path = pkg_dir / "__init__.py" + module_a_path = pkg_dir / "a.py" + module_b_path = pkg_dir / "b.py" + consumer_path = codebase_root / "consumer.py" + + init_content = "from .a import alpha\nfrom .b import beta\n" + module_a_content = "def alpha():\n return 'a'\n" + module_b_content = "def beta():\n return 'b'\n" + consumer_content = "from pkg import a, b\nimport pkg.a, pkg.b\n" + + for path, content in [ + (init_path, init_content), + (module_a_path, module_a_content), + (module_b_path, module_b_content), + (consumer_path, consumer_content), + ]: + path.write_text(content, encoding="utf-8") + + package_init = _make_document( + init_path, + content=init_content, + imports=["from .a import alpha", "from .b import beta"], + functions=[], + classes=[], + symbols={}, + chunks=[], + ) + module_a = _make_document( + module_a_path, + content=module_a_content, + imports=[], + functions=["alpha"], + classes=[], + symbols={"alpha": [1]}, + chunks=[ + CodeChunk( + content=module_a_content, + start_line=1, + end_line=2, + symbols=["alpha"], + chunk_type="function", + parent_symbols=[], + ) + ], + ) + module_b = _make_document( + module_b_path, + content=module_b_content, + imports=[], + functions=["beta"], + classes=[], + symbols={"beta": [1]}, + chunks=[ + CodeChunk( + content=module_b_content, + start_line=1, + end_line=2, + symbols=["beta"], + chunk_type="function", + parent_symbols=[], + ) + ], + ) + consumer = _make_document( + consumer_path, + content=consumer_content, + imports=["from pkg import a, b", "import pkg.a, pkg.b"], + functions=[], + classes=[], + symbols={}, + chunks=[], + ) + + manifest = CoreIndexBuilder.from_documents( + [package_init, module_a, module_b, consumer], + codebase_root, + ) + + from_import_records = [ + record + for record in manifest.imports + if record.raw_text == "from pkg import a, b" + ] + import_records = [ + record + for record in manifest.imports + if record.raw_text == "import pkg.a, pkg.b" + ] + + assert {record.target_file_id for record in from_import_records} == { + "pkg/a.py", + "pkg/b.py", + } + assert {record.target_file_id for record in import_records} == { + "pkg/a.py", + "pkg/b.py", + } + assert { + (edge.source_id, edge.target_id) + for edge in manifest.edges + if edge.source_id == "consumer.py" + } == { + ("consumer.py", "pkg/a.py"), + ("consumer.py", "pkg/b.py"), + } + assert "pkg/__init__.py" not in { + record.target_file_id for record in from_import_records + } + + +def test_document_indexer_stores_repo_relative_document_ids(tmp_path: Path) -> None: + codebase_root = tmp_path / "repo" + codebase_root.mkdir() + source_path = codebase_root / "app.py" + source_content = "def main():\n return 1\n" + source_path.write_text(source_content, encoding="utf-8") + + document = _make_document( + source_path, + content=source_content, + imports=[], + functions=["main"], + classes=[], + symbols={"main": [1]}, + chunks=[ + CodeChunk( + content=source_content, + start_line=1, + end_line=2, + symbols=["main"], + chunk_type="function", + parent_symbols=[], + ) + ], + ) + + storage = ChromaDBStorage(persist_directory=str(tmp_path / "chroma")) + indexer = DocumentIndexer( + storage, + DummyEmbeddingGenerator(), + codebase_root=str(codebase_root), + ) + + doc_id = indexer.index_document(document) + doc_record = storage.get_document("app.py") + chunk_records = storage.get_chunks_for_document("app.py") + + assert doc_id == "app.py" + assert doc_record is not None + assert doc_record["id"] == "app.py" + assert doc_record["metadata"]["document_id"] == "app.py" + assert doc_record["metadata"]["filepath"] == "app.py" + assert chunk_records[0]["id"] == "app.py#L1-L2" + assert chunk_records[0]["metadata"]["document_id"] == "app.py" + + +def test_codebase_index_rejects_legacy_sidecars_without_core_manifest( + tmp_path: Path, +) -> None: + index_dir = tmp_path / "index" + index_dir.mkdir() + (index_dir / "file_index.json").write_text("{}", encoding="utf-8") + + with pytest.raises(FileNotFoundError, match="Rerun 'docstra ingest'"): + CodebaseIndex(index_directory=str(index_dir), codebase_root=str(tmp_path)) diff --git a/tests/test_index_loading.py b/tests/test_index_loading.py new file mode 100644 index 0000000..082ae71 --- /dev/null +++ b/tests/test_index_loading.py @@ -0,0 +1,129 @@ +from __future__ import annotations + +from pathlib import Path + +from docstra.core.config.settings import UserConfig +from docstra.core.document_processing.document import ( + CodeChunk, + Document, + DocumentMetadata, + DocumentType, +) +from docstra.core.indexing.model import CORE_INDEX_FILENAME, CoreIndexBuilder +from docstra.core.ingestion.storage import ChromaDBStorage +from docstra.core.services.query_service import QueryService +from docstra.core.services.repository_explorer_service import RepositoryExplorerService + + +class DummyEmbeddingGenerator: + def generate_embedding(self, text: str) -> list[float]: + del text + return [1.0, 0.0, 0.0] + + def generate_embeddings(self, texts: list[str]) -> list[list[float]]: + return [[1.0, 0.0, 0.0] for _ in texts] + + +def _write_core_index(codebase_root: Path) -> None: + source_path = codebase_root / "app.py" + source_content = "def main():\n return 1\n" + source_path.write_text(source_content, encoding="utf-8") + + document = Document( + content=source_content, + metadata=DocumentMetadata( + filepath=str(source_path), + language=DocumentType.PYTHON, + size_bytes=len(source_content.encode("utf-8")), + last_modified=1.0, + line_count=2, + imports=[], + classes=[], + functions=["main"], + symbols={"main": [1]}, + ), + chunks=[ + CodeChunk( + content=source_content, + start_line=1, + end_line=2, + symbols=["main"], + chunk_type="function", + parent_symbols=[], + ) + ], + ) + + persist_dir = codebase_root / ".docstra" + index_dir = persist_dir / "index" + index_dir.mkdir(parents=True, exist_ok=True) + manifest = CoreIndexBuilder.from_documents([document], codebase_root) + (index_dir / CORE_INDEX_FILENAME).write_text( + manifest.model_dump_json(indent=2), encoding="utf-8" + ) + ChromaDBStorage(persist_directory=str(persist_dir / "chroma")) + + +def test_query_service_initializes_from_core_index_without_repo_map( + tmp_path: Path, monkeypatch +) -> None: + codebase_root = tmp_path / "repo" + codebase_root.mkdir() + _write_core_index(codebase_root) + + monkeypatch.setattr( + "docstra.core.services.query_service._get_llm_client_for_service", + lambda config, callbacks=None: object(), + ) + monkeypatch.setattr( + "docstra.core.services.query_service.EmbeddingFactory.create_embedding_generator", + lambda embedding_type, **kwargs: DummyEmbeddingGenerator(), + ) + + config = UserConfig() + config.storage.persist_directory = ".docstra" + service = QueryService(config) + service._ensure_retrieval_components_initialized(codebase_root.resolve()) + + assert service.code_indexer is not None + assert service.context_aware_retriever is not None + assert service.context_aware_retriever.repo_map is not None + assert not (codebase_root / ".docstra" / "repo_map.json").exists() + + +def test_repository_explorer_service_loads_core_index_without_repo_map( + tmp_path: Path, +) -> None: + codebase_root = tmp_path / "repo" + codebase_root.mkdir() + _write_core_index(codebase_root) + + config = UserConfig() + config.storage.persist_directory = ".docstra" + service = RepositoryExplorerService(config) + service._load_components(str(codebase_root)) + + assert service.code_index is not None + assert service.repo_map is not None + assert service.code_index.get_file_metadata(str(codebase_root / "app.py")) == { + "filepath": "app.py", + "language": "python", + "size_bytes": len("def main():\n return 1\n".encode("utf-8")), + "line_count": 2, + "last_modified": 1.0, + "classes": [], + "functions": ["main"], + "imports": [], + "module_docstring": None, + "dependencies": [], + "dependents": [], + "complexity": 1, + "complexity_metrics": {}, + "code_quality": {}, + "documentation_coverage": None, + "test_coverage": None, + "category": None, + "contributors": [], + "tags": [], + } + assert service.repo_map.find_file("app.py") is not None