diff --git a/frontend/src/components/kb/knowledge-base-creation-dialog.tsx b/frontend/src/components/kb/knowledge-base-creation-dialog.tsx index 2f67d384f..57fb66332 100644 --- a/frontend/src/components/kb/knowledge-base-creation-dialog.tsx +++ b/frontend/src/components/kb/knowledge-base-creation-dialog.tsx @@ -38,9 +38,12 @@ import { CloudConnectDialog, CloudFile } from "./cloud-connect-dialog" interface IngestionResult { collection: string document_count: number - chunks_count: number + chunk_count: number + embedding_count?: number + vector_count?: number status: string message: string + warnings?: string[] failed_step?: string } @@ -357,16 +360,29 @@ export function KnowledgeBaseCreationDialog({ open, onOpenChange, onSuccess }: K })) } - const result = isJsonRecord(parsed.data) ? parsed.data as unknown as IngestionResult : null + const result = isJsonRecord(parsed.data) + ? (parsed.data as unknown as IngestionResult) + : null if (!result) { throw new Error(t("kb.errors.uploadFailedFile", { name: file.name })) } - setIngestionResults(prev => [...prev, result]) - if (result.status === "partial" && result.failed_step) { - throw new Error(result.message || t("kb.errors.failedAtStep", { step: result.failed_step })) + if (result.status === "error") { + throw new Error(result.message || t("kb.errors.uploadFailedFile", { name: file.name })) + } + if (result.status === "partial") { + toast.warning( + result.message + (result.warnings?.length ? ` Warnings: ${result.warnings.join(", ")}` : "") + ) + } + if (result.status === "success" && (result.embedding_count ?? 0) === 0 && (result.chunk_count ?? 0) > 0) { + toast.error( + "文档上传成功,但 embedding 生成失败(" + result.chunk_count + " 个 chunks 未生成 embedding)。文档无法被搜索。请检查 embedding 模型配置和 API 状态。" + + (result.warnings?.length ? ` 警告: ${result.warnings.join(", ")}` : "") + ) } + setIngestionResults(prev => [...prev, result]) successfulCollections.push(collectionName) setCompletedUploadCount(i + 1) setUploadProgress(((i + 1) / selectedFiles.length) * 100) @@ -457,6 +473,16 @@ export function KnowledgeBaseCreationDialog({ open, onOpenChange, onSuccess }: K if (!result) { throw new Error(t("kb.errors.webIngestFailed")) } + + if (result.status === "error") { + throw new Error(result.message || t("kb.errors.webIngestFailed")) + } + if (result.status === "partial") { + toast.warning( + result.message + (result.warnings?.length ? ` Warnings: ${result.warnings.join(", ")}` : "") + ) + } + setWebIngestionResult(result) setWebIngestionProgress(100) @@ -1011,7 +1037,7 @@ export function KnowledgeBaseCreationDialog({ open, onOpenChange, onSuccess }: K {result.document_count} {t("kb.dialog.fileUpload.processResult.createDocuments")} - {result.chunks_count} {t("kb.dialog.fileUpload.processResult.textChunks")} + {result.chunk_count} {t("kb.dialog.fileUpload.processResult.textChunks")} )} diff --git a/src/xagent/core/tools/core/RAG_tools/pipelines/document_search.py b/src/xagent/core/tools/core/RAG_tools/pipelines/document_search.py index 3800d54d8..0edf03fee 100644 --- a/src/xagent/core/tools/core/RAG_tools/pipelines/document_search.py +++ b/src/xagent/core/tools/core/RAG_tools/pipelines/document_search.py @@ -5,6 +5,7 @@ import logging import numbers import os +import time from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union, cast import requests @@ -515,6 +516,8 @@ def search_documents( user_id = scope.user_id is_admin = scope.is_admin + search_start = time.time() + cfg = ( config if isinstance(config, SearchConfig) @@ -583,12 +586,28 @@ def search_documents( message = "Search completed successfully" if requested_type == SearchType.SPARSE: + logger.info( + "Executing SPARSE search: UserID=%s, Collection=%s, TopK=%d, ModelTag=%s", + user_id, + collection, + fetch_top_k, + embedding_model_id, + ) + sparse_start = time.time() with progress_tracker.track_step("sparse_search"): pass current_step = "search_sparse" results, status, sparse_warnings, message = _execute_sparse_search( collection, query_text, cfg, embedding_model_id, user_id, is_admin ) + sparse_elapsed = int((time.time() - sparse_start) * 1000) + logger.info( + "SPARSE search completed: Found %d results, Status=%s, Elapsed=%dms, UserID=%s", + len(results), + status, + sparse_elapsed, + user_id, + ) warnings.extend(sparse_warnings) else: # Use embedding adapter for dense/hybrid paths @@ -596,7 +615,15 @@ def search_documents( with progress_tracker.track_step("encode_query"): pass current_step = "encode_query_vector" + encode_start = time.time() query_vector = _encode_query_vector(embedding_adapter, query_text) + encode_elapsed = int((time.time() - encode_start) * 1000) + logger.info( + "Query vector encoded: Dimension=%d, Elapsed=%dms, UserID=%s", + len(query_vector), + encode_elapsed, + user_id, + ) except VectorValidationError: if requested_type == SearchType.HYBRID and cfg.fallback_to_sparse: current_step = "search_sparse_fallback" @@ -620,6 +647,17 @@ def search_documents( raise else: if requested_type == SearchType.DENSE: + logger.info( + "Executing DENSE search: UserID=%s, Collection=%s, TopK=%d, ModelTag=%s, " + "NProbes=%s, RefineFactor=%s", + user_id, + collection, + fetch_top_k, + embedding_model_id, + cfg.nprobes, + cfg.refine_factor, + ) + dense_start = time.time() with progress_tracker.track_step("dense_search"): pass dense_response: DenseSearchResponse = search_dense( @@ -641,7 +679,28 @@ def search_documents( message = ( advice if advice else "Dense search completed successfully" ) + dense_elapsed = int((time.time() - dense_start) * 1000) + logger.info( + "DENSE search completed: Found %d results, Status=%s, Elapsed=%dms, " + "IndexAdvice=%s, UserID=%s", + len(results), + status, + dense_elapsed, + advice or "None", + user_id, + ) else: # HYBRID + logger.info( + "Executing HYBRID search: UserID=%s, Collection=%s, TopK=%d, ModelTag=%s, " + "FusionConfig=%s, FallbackToSparse=%s", + user_id, + collection, + fetch_top_k, + embedding_model_id, + cfg.fusion_config, + cfg.fallback_to_sparse, + ) + hybrid_start = time.time() try: with progress_tracker.track_step("hybrid_search"): pass @@ -684,6 +743,7 @@ def search_documents( current_step = "search_hybrid" raise else: + hybrid_elapsed = int((time.time() - hybrid_start) * 1000) warnings.extend(_serialize_warnings(hybrid_response.warnings)) results = list(hybrid_response.results) status = hybrid_response.status or "success" @@ -692,14 +752,66 @@ def search_documents( if status == "success" else "Hybrid search completed with warnings" ) + logger.info( + "HYBRID search completed: Found %d results, Status=%s, Elapsed=%dms, UserID=%s", + len(results), + status, + hybrid_elapsed, + user_id, + ) # Apply optional rerank current_step = "apply_rerank" + rerank_start = time.time() results, used_rerank, rerank_warnings = _apply_rerank_if_needed( results, query_text, cfg ) + rerank_elapsed = int((time.time() - rerank_start) * 1000) warnings.extend(rerank_warnings) + if used_rerank: + logger.info( + "Rerank applied: Input results=%d, Output results=%d, Elapsed=%dms", + len(results) + len(rerank_warnings), + len(results), + rerank_elapsed, + ) + else: + logger.debug("Rerank skipped: Not configured or no results to rerank") + + total_elapsed = int((time.time() - search_start) * 1000) + logger.info( + "Document search COMPLETED: UserID=%s, Collection=%s, Status=%s, SearchType=%s, " + "Results=%d, TopK=%d, TotalElapsed=%dms, UsedRerank=%s, Warnings=%d, " + "Model=%s", + user_id, + collection, + status.upper(), + actual_type.value, + len(results), + cfg.top_k, + total_elapsed, + used_rerank, + len(warnings), + embedding_model_id, + ) + if status == "success": + logger.info( + "Search SUCCESS: UserID=%s, Found %d results in %dms using %s search with model %s", + user_id, + len(results), + total_elapsed, + actual_type.value, + embedding_model_id, + ) + elif status != "success": + logger.warning( + "Search completed with status '%s': Found %d results, Warnings: %d", + status, + len(results), + len(warnings), + ) + return _build_pipeline_result( status=status, search_type=actual_type, diff --git a/src/xagent/core/tools/core/RAG_tools/storage/contracts.py b/src/xagent/core/tools/core/RAG_tools/storage/contracts.py index 1231466f1..de06f080a 100644 --- a/src/xagent/core/tools/core/RAG_tools/storage/contracts.py +++ b/src/xagent/core/tools/core/RAG_tools/storage/contracts.py @@ -334,16 +334,36 @@ async def delete_collection_metadata( """Delete persisted metadata/config rows for a collection.""" @abstractmethod - async def rename_collection(self, old_name: str, new_name: str) -> None: - """Rename persisted control-plane keys after a data-plane collection rename. + async def count_users_with_collection_config(self, collection_name: str) -> int: + """Count distinct users with a ``collection_config`` row for ``collection_name``. - Updates rows that gate :meth:`list_collections` visibility (for example - per-tenant config rows and aggregate metadata) so they stay aligned with - vector tables when the ``collection`` / ``name`` fields change. + Args: + collection_name: Collection name (sanitized by the caller). + + Returns: + Number of distinct ``user_id`` values occupying the name. + """ + + @abstractmethod + async def rename_collection( + self, + old_name: str, + new_name: str, + *, + user_id: int, + is_admin: bool = False, + ) -> None: + """Rename persisted control-plane keys for one user's collection scope. + + Updates the caller's ``collection_config`` row. Global ``collection_metadata`` + is renamed only when this user is the sole config occupant of ``old_name``. Args: old_name: Previous collection name (sanitized by the caller). new_name: Target collection name (sanitized by the caller). + user_id: User whose config (and optionally metadata) should be renamed. + is_admin: Whether the operation is performed by an admin on behalf of + ``user_id`` (does not broaden vector/config scope beyond ``user_id``). """ @abstractmethod @@ -445,9 +465,23 @@ def rename_collection_data( self, collection_name: str, new_name: str, + user_id: Optional[int], + is_admin: bool, ) -> List[str]: """Rename collection key across vector-side tables. + Applies the same multi-tenancy filter semantics as + :meth:`delete_collection_data`: non-admin callers only rename rows they + can see (``user_id`` match; legacy NULL ``user_id`` is admin-only). + Admins omit the user predicate and rename all rows for the old collection + name. + + Args: + collection_name: Current collection name stored in vector tables. + new_name: Target collection name after rename. + user_id: User ID for multi-tenancy filtering. + is_admin: Whether the caller has admin privileges. + Returns: Warning messages generated during best-effort updates. """ diff --git a/src/xagent/core/tools/core/RAG_tools/storage/lancedb_stores.py b/src/xagent/core/tools/core/RAG_tools/storage/lancedb_stores.py index 7f71a225c..34431fa8f 100644 --- a/src/xagent/core/tools/core/RAG_tools/storage/lancedb_stores.py +++ b/src/xagent/core/tools/core/RAG_tools/storage/lancedb_stores.py @@ -80,42 +80,94 @@ async def delete_collection(self, collection_name: str) -> None: except Exception as exc: logger.debug("Failed to delete collection metadata: %s", exc) - async def rename_collection(self, old_name: str, new_name: str) -> None: - """Rename ``collection_config`` and ``collection_metadata`` keys. + async def count_users_with_collection_config(self, collection_name: str) -> int: + """Count distinct ``user_id`` rows in ``collection_config`` for a collection name.""" + from ..LanceDB.schema_manager import ( + _safe_close_table, + ensure_collection_config_table, + ) + + conn = await self._get_connection() + ensure_collection_config_table(conn) + + safe_collection = escape_lancedb_string(collection_name) + config_table = None + try: + config_table = conn.open_table("collection_config") + rows = ( + config_table.search() + .where(f"collection = '{safe_collection}'") + .to_arrow() + .to_pylist() + ) + except Exception as exc: # noqa: BLE001 + logger.debug( + "Failed to count collection config occupants for %s: %s", + collection_name, + exc, + ) + return 0 + finally: + _safe_close_table(config_table) + + occupant_ids = { + int(row["user_id"]) for row in rows if row.get("user_id") is not None + } + return len(occupant_ids) + + async def rename_collection( + self, + old_name: str, + new_name: str, + *, + user_id: int, + is_admin: bool = False, + ) -> None: + """Rename control-plane keys for a single user's collection scope. See :meth:`MetadataStore.rename_collection`. """ + from ..core.schemas import CollectionInfo from ..LanceDB.schema_manager import ( _safe_close_table, ensure_collection_config_table, ) + del ( + is_admin + ) # Scope is always limited to ``user_id``; flag is for callers only. + conn = await self._get_connection() await self.ensure_collection_metadata_table() ensure_collection_config_table(conn) safe_old = escape_lancedb_string(old_name) now = datetime.now(timezone.utc).replace(tzinfo=None) + occupant_count = await self.count_users_with_collection_config(old_name) config_table = None try: config_table = conn.open_table("collection_config") config_table.update( - f"collection = '{safe_old}'", + f"collection = '{safe_old}' AND user_id = {int(user_id)}", {"collection": new_name, "updated_at": now}, ) finally: _safe_close_table(config_table) - meta_table = None - try: - meta_table = conn.open_table("collection_metadata") - meta_table.update( - f"name = '{safe_old}'", - {"name": new_name, "updated_at": now}, - ) - finally: - _safe_close_table(meta_table) + if occupant_count <= 1: + meta_table = None + try: + meta_table = conn.open_table("collection_metadata") + meta_table.update( + f"name = '{safe_old}'", + {"name": new_name, "updated_at": now}, + ) + finally: + _safe_close_table(meta_table) + return + + await self.save_collection(CollectionInfo(name=new_name)) async def save_collection(self, collection: CollectionInfo) -> None: from ..LanceDB.schema_manager import _safe_close_table @@ -612,11 +664,20 @@ def rename_collection_data( self, collection_name: str, new_name: str, + user_id: Optional[int], + is_admin: bool, ) -> List[str]: from ..LanceDB.schema_manager import _safe_close_table warnings: List[str] = [] - safe_old_name = escape_lancedb_string(collection_name) + filter_expr = self.build_filter_expression( + build_filter_from_dict({"collection": collection_name}), + user_id=user_id, + is_admin=is_admin, + ) + if not filter_expr: + return warnings + conn = self._get_connection() for table_name in self.list_table_names(): if table_name not in { @@ -629,7 +690,7 @@ def rename_collection_data( try: table = conn.open_table(table_name) table.update( - f"collection = '{safe_old_name}'", + filter_expr, {"collection": new_name}, ) except Exception as exc: # noqa: BLE001 diff --git a/src/xagent/core/tools/core/RAG_tools/vector_storage/vector_manager.py b/src/xagent/core/tools/core/RAG_tools/vector_storage/vector_manager.py index 19ca421a3..af64dc36e 100644 --- a/src/xagent/core/tools/core/RAG_tools/vector_storage/vector_manager.py +++ b/src/xagent/core/tools/core/RAG_tools/vector_storage/vector_manager.py @@ -39,6 +39,7 @@ ) from ..LanceDB.model_tag_utils import to_model_tag from ..LanceDB.schema_manager import _safe_close_table, ensure_embeddings_table +from ..storage.contracts import VectorIndexStore from ..storage.factory import get_vector_index_store from ..utils.lancedb_query_utils import list_table_names from ..utils.metadata_utils import deserialize_metadata, serialize_metadata @@ -184,6 +185,53 @@ def _safe_int_conversion(value: Any, default: int = 0) -> int: return default +def _maybe_log_chunks_hidden_by_user_filter( + vector_store: VectorIndexStore, + query_filters: Dict[str, Any], + *, + user_id: Optional[int], + is_admin: bool, +) -> None: + """If DEBUG logging is enabled, detect chunks matching scope filters but excluded by user_id filter. + + Non-admin callers filter rows by ``user_id``; admins effectively omit that predicate. + When the scoped count is zero but an admin-equivalent count is positive, rows likely + exist under another ``user_id`` or legacy NULL ``user_id`` — typical symptom of a + permission or ingestion ownership mismatch. + + Args: + vector_store: Vector index abstraction bound to the active backend. + query_filters: Column equality filters (collection, doc_id, parse_hash, ...). + user_id: Authenticated user id passed into the read path. + is_admin: Whether the caller is treated as admin for tenancy filtering. + """ + if is_admin or user_id is None: + return + if not logger.isEnabledFor(logging.DEBUG): + return + try: + count_without_user_scope = vector_store.count_rows_or_zero( + table_name="chunks", + filters=query_filters, + user_id=user_id, + is_admin=True, + ) + except Exception as exc: # noqa: BLE001 + logger.debug("Skipped chunks user-filter diagnostic count: %s", exc) + return + + if count_without_user_scope > 0: + logger.error( + "Chunks exist for doc/collection scope but user filter excluded them " + "(possible user_id mismatch, legacy NULL user_id row, or permission bug). " + "caller_user_id=%s is_admin=%s unscoped_count=%s filters=%s", + user_id, + is_admin, + count_without_user_scope, + query_filters, + ) + + def _safe_str_value(value: Any) -> Optional[str]: """Extract string value, returning None for NaN/None values. @@ -251,6 +299,12 @@ def read_chunks_for_embedding( is_admin=is_admin, ) if total_count == 0: + _maybe_log_chunks_hidden_by_user_filter( + vector_store, + query_filters, + user_id=user_id, + is_admin=is_admin, + ) logger.info("No chunks found for the given criteria") return EmbeddingReadResponse(chunks=[], total_count=0, pending_count=0) diff --git a/src/xagent/core/tools/core/document_search.py b/src/xagent/core/tools/core/document_search.py index 4cc4ebcd4..8e36462ef 100644 --- a/src/xagent/core/tools/core/document_search.py +++ b/src/xagent/core/tools/core/document_search.py @@ -152,9 +152,23 @@ async def search_knowledge_base( RuntimeError: If search fails """ try: + logger.debug( + "Listing collections for search: UserID=%s, IsAdmin=%s, Query='%s'", + user_id, + is_admin, + tool_args.query[:100] + ("..." if len(tool_args.query) > 100 else ""), + ) # List all collections collections_result = await list_collections(user_id=user_id, is_admin=is_admin) + logger.debug( + "Found %d collections: %s", + len(collections_result.collections), + [ + (c.name, c.documents, c.chunks, c.embeddings) + for c in collections_result.collections + ], + ) if not collections_result.collections: return KnowledgeSearchResult( results=[], @@ -165,13 +179,13 @@ async def search_knowledge_base( available_names = {c.name for c in collections_result.collections} # Debug: Log available collections for troubleshooting - logger.info( + logger.debug( f"📚 Available knowledge base collections: {sorted(available_names)}" ) if tool_args.collections: - logger.info(f" - Requested collections: {tool_args.collections}") + logger.debug(f" - Requested collections: {tool_args.collections}") if tool_args.allowed_collections: - logger.info(f" - Allowed collections: {tool_args.allowed_collections}") + logger.debug(f" - Allowed collections: {tool_args.allowed_collections}") if tool_args.collections: # User specified collections - validate against allowed_collections @@ -205,7 +219,7 @@ async def search_knowledge_base( collections_to_iterate = [ c for c in collections_result.collections if c.name in collections_set ] - logger.info(f"Searching specific collections: {sorted(collections_set)}") + logger.debug(f"Searching specific collections: {sorted(collections_set)}") elif tool_args.allowed_collections is not None: # Use allowed_collections as default allowed_set = set(tool_args.allowed_collections) @@ -228,10 +242,10 @@ async def search_knowledge_base( collections_to_iterate = [ c for c in collections_result.collections if c.name in valid_collections ] - logger.info(f"Searching allowed collections: {sorted(valid_collections)}") + logger.debug(f"Searching allowed collections: {sorted(valid_collections)}") else: collections_to_iterate = collections_result.collections - logger.info("Searching all collections") + logger.debug("Searching all collections") # Build search config search_config = { @@ -250,6 +264,11 @@ async def search_knowledge_base( collection_warnings: list[str] = [] total_searched = 0 + logger.debug( + "Starting search across %d collections: %s", + len(collections_to_iterate), + [c.name for c in collections_to_iterate], + ) for collection_info in collections_to_iterate: collection_name = collection_info.name @@ -261,7 +280,7 @@ async def search_knowledge_base( continue try: - logger.info( + logger.debug( f"Searching collection '{collection_name}' for: {tool_args.query}" ) diff --git a/src/xagent/web/api/kb.py b/src/xagent/web/api/kb.py index 3cc04eda9..4251bb8fa 100644 --- a/src/xagent/web/api/kb.py +++ b/src/xagent/web/api/kb.py @@ -1,6 +1,8 @@ """Knowledge base API route handlers""" import asyncio +import concurrent.futures +import contextvars import functools import hashlib import json @@ -75,7 +77,10 @@ ) from ...core.tools.core.RAG_tools.progress import get_progress_manager from ...core.tools.core.RAG_tools.storage.contracts import DocumentRecord -from ...core.tools.core.RAG_tools.storage.factory import get_vector_index_store +from ...core.tools.core.RAG_tools.storage.factory import ( + get_metadata_store, + get_vector_index_store, +) from ...core.tools.core.RAG_tools.utils.string_utils import ( generate_deterministic_doc_id, ) @@ -891,6 +896,47 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any: # Create router kb_router = APIRouter(prefix="/api/kb", tags=["kb"]) +# Shared executor for ingestion tasks to prevent global thread pool exhaustion +_ingest_executor = concurrent.futures.ThreadPoolExecutor( + max_workers=10, thread_name_prefix="ingest_worker_" +) + + +def shutdown_ingest_executor() -> None: + """Shutdown the shared ingestion executor gracefully. + + Called during application shutdown to ensure pending tasks complete + and resources are properly released. Uses a timeout to prevent blocking + application shutdown indefinitely. + """ + logger.info("Shutting down ingestion executor...") + try: + import threading + + shutdown_complete = threading.Event() + + def wait_for_shutdown() -> None: + _ingest_executor.shutdown(wait=True) + shutdown_complete.set() + + shutdown_thread = threading.Thread(target=wait_for_shutdown, daemon=True) + shutdown_thread.start() + + if shutdown_complete.wait(timeout=30): + logger.info("Ingestion executor shutdown complete") + else: + logger.warning( + "Executor shutdown timed out after 30s; forcing shutdown. " + "Some ingestion tasks may be incomplete." + ) + _ingest_executor.shutdown(wait=False) + except Exception as e: + logger.error("Error during executor shutdown: %s", e) + try: + _ingest_executor.shutdown(wait=False) + except Exception: + pass + class CloudFile(BaseModel): provider: str @@ -1408,8 +1454,12 @@ def _run_ingestion() -> IngestionResult: file_id=str(file_record.file_id), ) + ingest_ctx = contextvars.copy_context() loop = asyncio.get_running_loop() - result: IngestionResult = await loop.run_in_executor(None, _run_ingestion) + result: IngestionResult = await loop.run_in_executor( + _ingest_executor, + lambda: ingest_ctx.run(_run_ingestion), + ) if result.status in {"error", "partial"}: await _rollback_failed_ingestion( @@ -1675,15 +1725,22 @@ def _download_file() -> None: file_config = config.model_copy( update={"parse_method": normalized_parse_method} ) - result = await asyncio.to_thread( - run_document_ingestion, - collection=safe_collection, - source_path=str(file_path), - ingestion_config=file_config, - progress_manager=progress_manager, - user_id=int(_user.id), - is_admin=bool(_user.is_admin), - file_id=str(file_record.file_id), + + def _run_cloud_ingestion() -> IngestionResult: + return run_document_ingestion( + collection=safe_collection, + source_path=str(file_path), + ingestion_config=file_config, + progress_manager=progress_manager, + user_id=int(_user.id), + is_admin=bool(_user.is_admin), + file_id=str(file_record.file_id), + ) + + cloud_ctx = contextvars.copy_context() + result = await asyncio.get_running_loop().run_in_executor( + _ingest_executor, + lambda: cloud_ctx.run(_run_cloud_ingestion), ) if result.status in {"error", "partial"}: await _rollback_failed_cloud_ingestion( @@ -2632,9 +2689,8 @@ def _file_handler_with_db( finally: db_session.close() - result = await asyncio.get_event_loop().run_in_executor( - None, - lambda: asyncio.run( + def _run_web_ingestion_blocking() -> WebIngestionResult: + return asyncio.run( run_web_ingestion( collection=safe_collection, crawl_config=crawl_config, @@ -2643,7 +2699,12 @@ def _file_handler_with_db( is_admin=bool(_user.is_admin), file_handler=_file_handler_with_db, ) - ), + ) + + web_ctx = contextvars.copy_context() + result = await asyncio.get_running_loop().run_in_executor( + _ingest_executor, + lambda: web_ctx.run(_run_web_ingestion_blocking), ) if result.status == "error": @@ -3858,6 +3919,10 @@ def _resolve_list_documents_match() -> Optional[ResolvedDocumentMatch]: async def rename_collection_api( collection_name: str, new_name: str = Form(..., description="New collection name"), + target_user_id: Optional[int] = Form( + None, + description="Admin only: user whose collection scope to rename", + ), _user: User = Depends(get_current_user), db: Session = Depends(get_db), ) -> dict: @@ -3866,6 +3931,7 @@ async def rename_collection_api( Args: collection_name: Current collection name new_name: New collection name + target_user_id: Required for admins; scopes rename to that user's data only Returns: Success message @@ -3875,13 +3941,20 @@ async def rename_collection_api( load_ingestion_status, write_ingestion_status, ) - from ...core.tools.core.RAG_tools.storage.factory import ( - get_metadata_store, - get_vector_index_store, - ) + metadata_store = get_metadata_store() vector_store = get_vector_index_store() + if bool(_user.is_admin): + if target_user_id is None: + raise HTTPException( + status_code=422, + detail="Admin rename requires target_user_id form field", + ) + scope_user_id = int(target_user_id) + else: + scope_user_id = int(_user.id) + if not new_name or not new_name.strip(): raise HTTPException( status_code=422, @@ -3903,12 +3976,40 @@ async def rename_collection_api( if safe_new_collection == safe_old_collection: return {"status": "success", "message": "Collection name unchanged"} + is_admin_rename = bool(_user.is_admin) + if not is_admin_rename: + occupant_count = await metadata_store.count_users_with_collection_config( + safe_old_collection + ) + if occupant_count > 1: + raise HTTPException( + status_code=409, + detail=( + "Collection name is shared by multiple users; " + "rename is not allowed. Contact an administrator." + ), + ) + # Access control check - await _ensure_collection_access(safe_old_collection, _user, hide_missing=False) + if is_admin_rename: + visible_for_scope = await _list_collections_with_retry( + user_id=scope_user_id, + is_admin=False, + stage="rename_list_visible_collections_for_target_user", + ) + if not any( + c.name == safe_old_collection for c in visible_for_scope.collections + ): + raise HTTPException( + status_code=404, + detail=f"Collection not found for user: {safe_old_collection}", + ) + else: + await _ensure_collection_access(safe_old_collection, _user, hide_missing=False) # Validate that target collection doesn't exist or user has access visible_for_user = await _list_collections_with_retry( - user_id=int(_user.id), + user_id=scope_user_id, is_admin=False, stage="rename_list_visible_collections", ) @@ -3935,8 +4036,8 @@ async def rename_collection_api( new_collection_dir: Optional[Path] = None collection_records = vector_store.list_document_records( collection_name=safe_old_collection, - user_id=int(_user.id), - is_admin=bool(_user.is_admin), + user_id=scope_user_id, + is_admin=False, ) collection_file_ids = { file_id @@ -3948,7 +4049,7 @@ async def rename_collection_api( physical_rename = rename_collection_storage( db, - user_id=int(_user.id), + user_id=scope_user_id, old_collection_name=safe_old_collection, new_collection_name=safe_new_collection, collection_file_ids=collection_file_ids, @@ -3973,20 +4074,21 @@ async def rename_collection_api( ) # Step 2: Update collection name in all tables (documents, parses, chunks, embeddings) - # Use storage abstraction layer which handles all tables including embeddings - vector_store = get_vector_index_store() warnings.extend( vector_store.rename_collection_data( collection_name=safe_old_collection, new_name=safe_new_collection, + user_id=scope_user_id, + is_admin=False, ) ) try: - metadata_store = get_metadata_store() await metadata_store.rename_collection( old_name=safe_old_collection, new_name=safe_new_collection, + user_id=scope_user_id, + is_admin=is_admin_rename, ) except Exception as e: logger.warning("Failed to rename metadata store keys: %s", e) @@ -3994,7 +4096,11 @@ async def rename_collection_api( # Migrate ingestion status from old collection name to new try: - status_entries = load_ingestion_status(collection=safe_old_collection) + status_entries = load_ingestion_status( + collection=safe_old_collection, + user_id=scope_user_id, + is_admin=False, + ) for entry in status_entries: doc_id = entry.get("doc_id") if doc_id: @@ -4004,8 +4110,14 @@ async def rename_collection_api( status=entry.get("status", "pending"), message=entry.get("message", ""), parse_hash=entry.get("parse_hash", ""), + user_id=scope_user_id, + ) + clear_ingestion_status( + safe_old_collection, + doc_id, + user_id=scope_user_id, + is_admin=False, ) - clear_ingestion_status(safe_old_collection, doc_id) except Exception as e: logger.warning("Failed to update ingestion status: %s", e) warnings.append(f"Failed to update ingestion status: {e}") diff --git a/src/xagent/web/app.py b/src/xagent/web/app.py index d1dd9d850..d0f277b53 100644 --- a/src/xagent/web/app.py +++ b/src/xagent/web/app.py @@ -718,6 +718,12 @@ async def shutdown_event() -> None: if sandbox_mgr: await sandbox_mgr.cleanup() + # Shutdown shared ingestion executor + from .api.kb import shutdown_ingest_executor + + shutdown_ingest_executor() + logger.info("Ingestion executor shutdown completed") + # Frontend is now served by Next.js at http://localhost:3000 # This backend only provides API endpoints diff --git a/tests/core/tools/core/RAG_tools/management/test_collections.py b/tests/core/tools/core/RAG_tools/management/test_collections.py index ba28af4f9..d54b91420 100644 --- a/tests/core/tools/core/RAG_tools/management/test_collections.py +++ b/tests/core/tools/core/RAG_tools/management/test_collections.py @@ -1067,3 +1067,57 @@ def test_delete_collection_removes_metadata_table_entry(temp_lancedb_dir: str) - after_table = conn.open_table("collection_metadata") after = after_table.search().where(f"name = '{collection}'").to_list() assert after == [] + + +@pytest.mark.asyncio +async def test_metadata_rename_collection_preserves_other_users_config_when_shared( + temp_lancedb_dir: str, +) -> None: + """Tenant-scoped metadata rename must not retarget another user's config row.""" + from src.xagent.core.tools.core.RAG_tools.core.schemas import CollectionInfo + + collection = "shared_rename" + store = get_metadata_store() + await store.save_collection(CollectionInfo(name=collection)) + await store.save_collection_config(collection, "{}", user_id=1) + await store.save_collection_config(collection, "{}", user_id=2) + + await store.rename_collection( + collection, + "user1_new", + user_id=1, + is_admin=False, + ) + + assert await store.get_collection_config("user1_new", user_id=1) == "{}" + assert await store.get_collection_config(collection, user_id=2) == "{}" + assert await store.get_collection_config("user1_new", user_id=2) is None + + listed = await list_collections(user_id=None, is_admin=True) + names = {info.name for info in listed.collections} + assert collection in names + assert "user1_new" in names + + +@pytest.mark.asyncio +async def test_metadata_rename_collection_admin_scopes_to_target_user_only( + temp_lancedb_dir: str, +) -> None: + """Admin rename for one user must leave other occupants on the old name.""" + from src.xagent.core.tools.core.RAG_tools.core.schemas import CollectionInfo + + collection = "shared_admin_rename" + store = get_metadata_store() + await store.save_collection(CollectionInfo(name=collection)) + await store.save_collection_config(collection, "{}", user_id=10) + await store.save_collection_config(collection, "{}", user_id=20) + + await store.rename_collection( + collection, + "user10_new", + user_id=10, + is_admin=True, + ) + + assert await store.get_collection_config("user10_new", user_id=10) == "{}" + assert await store.get_collection_config(collection, user_id=20) == "{}" diff --git a/tests/core/tools/core/RAG_tools/storage/test_lancedb_stores.py b/tests/core/tools/core/RAG_tools/storage/test_lancedb_stores.py index abb57b02d..d80c2e267 100644 --- a/tests/core/tools/core/RAG_tools/storage/test_lancedb_stores.py +++ b/tests/core/tools/core/RAG_tools/storage/test_lancedb_stores.py @@ -45,19 +45,22 @@ def mock_schema_manager_user_id_migration() -> None: @patch( "xagent.core.tools.core.RAG_tools.storage.lancedb_stores.get_connection_from_env" ) -def test_metadata_store_rename_collection_updates_tables( +def test_metadata_store_rename_collection_scopes_config_and_metadata_for_sole_owner( mock_get_connection: Mock, _mock_ensure_config: Mock, _mock_ensure_meta: AsyncMock, ) -> None: - """rename_collection should update collection_config and collection_metadata.""" + """Sole config occupant: rename only that user's config row and global metadata.""" from types import SimpleNamespace mock_conn = Mock() mock_get_connection.return_value = mock_conn mock_config = Mock() - mock_config.schema = [SimpleNamespace(name="collection")] + mock_config.schema = [ + SimpleNamespace(name="collection"), + SimpleNamespace(name="user_id"), + ] mock_meta = Mock() mock_meta.schema = [SimpleNamespace(name="name")] @@ -71,11 +74,24 @@ def _open(name: str) -> Mock: mock_conn.open_table.side_effect = _open store = LanceDBMetadataStore() - asyncio.run(store.rename_collection("old_col", "new_col")) + with patch.object( + store, + "count_users_with_collection_config", + new=AsyncMock(return_value=1), + ): + asyncio.run( + store.rename_collection( + "old_col", + "new_col", + user_id=7, + is_admin=False, + ) + ) mock_config.update.assert_called_once() cfg_where, cfg_updates = mock_config.update.call_args[0] assert "old_col" in cfg_where + assert "user_id = 7" in cfg_where assert cfg_updates["collection"] == "new_col" mock_meta.update.assert_called_once() @@ -84,6 +100,104 @@ def _open(name: str) -> Mock: assert meta_updates["name"] == "new_col" +@patch( + "xagent.core.tools.core.RAG_tools.storage.lancedb_stores.LanceDBMetadataStore.ensure_collection_metadata_table", + new_callable=AsyncMock, +) +@patch( + "xagent.core.tools.core.RAG_tools.LanceDB.schema_manager.ensure_collection_config_table" +) +@patch( + "xagent.core.tools.core.RAG_tools.storage.lancedb_stores.get_connection_from_env" +) +def test_metadata_store_rename_collection_skips_global_metadata_when_name_shared( + mock_get_connection: Mock, + _mock_ensure_config: Mock, + _mock_ensure_meta: AsyncMock, +) -> None: + """Multiple config occupants: only the caller's config row is renamed.""" + from types import SimpleNamespace + + mock_conn = Mock() + mock_get_connection.return_value = mock_conn + + mock_config = Mock() + mock_config.schema = [ + SimpleNamespace(name="collection"), + SimpleNamespace(name="user_id"), + ] + mock_meta = Mock() + mock_meta.schema = [SimpleNamespace(name="name")] + + def _open(name: str) -> Mock: + if name == "collection_config": + return mock_config + if name == "collection_metadata": + return mock_meta + raise AssertionError(name) + + mock_conn.open_table.side_effect = _open + + store = LanceDBMetadataStore() + with ( + patch.object( + store, + "count_users_with_collection_config", + new=AsyncMock(return_value=2), + ), + patch.object(store, "save_collection", new=AsyncMock()), + ): + asyncio.run( + store.rename_collection( + "old_col", + "new_col", + user_id=7, + is_admin=False, + ) + ) + + mock_config.update.assert_called_once() + cfg_where, _ = mock_config.update.call_args[0] + assert "user_id = 7" in cfg_where + mock_meta.update.assert_not_called() + + +@patch( + "xagent.core.tools.core.RAG_tools.LanceDB.schema_manager.ensure_collection_config_table" +) +@patch( + "xagent.core.tools.core.RAG_tools.storage.lancedb_stores.get_connection_from_env" +) +def test_metadata_store_count_users_with_collection_config( + mock_get_connection: Mock, + _mock_ensure_config: Mock, +) -> None: + """Occupant count reflects distinct user_id rows for a collection name.""" + from types import SimpleNamespace + + mock_conn = Mock() + mock_get_connection.return_value = mock_conn + + mock_table = Mock() + mock_table.schema = [ + SimpleNamespace(name="collection"), + SimpleNamespace(name="user_id"), + ] + mock_conn.open_table.return_value = mock_table + mock_table.search.return_value.where.return_value.to_arrow.return_value.to_pylist.return_value = [ + {"collection": "shared", "user_id": 1}, + {"collection": "shared", "user_id": 2}, + ] + + store = LanceDBMetadataStore() + count = asyncio.run(store.count_users_with_collection_config("shared")) + + assert count == 2 + mock_table.search.return_value.where.assert_called_once() + where_clause = mock_table.search.return_value.where.call_args[0][0] + assert "shared" in where_clause + + @patch( "xagent.core.tools.core.RAG_tools.storage.lancedb_stores.get_connection_from_env" ) @@ -337,7 +451,12 @@ def test_vector_store_rename_collection_data_updates_expected_tables( mock_conn.open_table.return_value = mock_table store = LanceDBVectorIndexStore() - warnings = store.rename_collection_data("old_name", "new_name") + warnings = store.rename_collection_data( + "old_name", + "new_name", + user_id=1, + is_admin=False, + ) assert warnings == [] # 4 target tables should be updated; control-plane table excluded. diff --git a/tests/web/api/test_kb_dir.py b/tests/web/api/test_kb_dir.py index 4537776ef..1af243566 100644 --- a/tests/web/api/test_kb_dir.py +++ b/tests/web/api/test_kb_dir.py @@ -1288,14 +1288,12 @@ def test_kb_delete_returns_physical_cleanup_status(test_env, temp_uploads): data = response.json() # Should include physical cleanup information in warnings - assert "warnings" in data or "message" in data - if "warnings" in data: - # Check that warnings include physical cleanup status - warnings_text = " ".join(data["warnings"]).lower() - assert any( - keyword in warnings_text - for keyword in ["physical", "directory", "cleanup", "removed"] - ) + assert "warnings" in data + warnings_text = " ".join(data["warnings"]).lower() + assert any( + keyword in warnings_text + for keyword in ["physical", "directory", "cleanup", "removed"] + ) def test_kb_delete_skips_uploaded_file_cleanup_when_logical_delete_fails( diff --git a/tests/web/api/test_kb_rename_tenancy.py b/tests/web/api/test_kb_rename_tenancy.py new file mode 100644 index 000000000..45a884d08 --- /dev/null +++ b/tests/web/api/test_kb_rename_tenancy.py @@ -0,0 +1,419 @@ +"""Tenant-scoped collection rename behavior (TDD). + +Non-admin rename is rejected when multiple users occupy the same collection name. +Admin rename always scopes to an explicit target user. +""" + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import jwt +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from xagent.core.tools.core.RAG_tools.core.schemas import ( + CollectionInfo, + ListCollectionsResult, +) +from xagent.web.api.auth import hash_password +from xagent.web.api.kb import kb_router +from xagent.web.auth_config import JWT_ALGORITHM, JWT_SECRET_KEY +from xagent.web.models.database import Base, get_db +from xagent.web.models.user import User + + +def _auth_headers(user: User) -> dict[str, str]: + payload = { + "sub": user.username, + "user_id": user.id, + "type": "access", + "exp": datetime.now(timezone.utc) + timedelta(hours=1), + "iat": datetime.now(timezone.utc), + } + token = jwt.encode(payload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM) + return {"Authorization": f"Bearer {token}"} + + +@pytest.fixture() +def rename_users_env(tmp_path): + """App with two regular users and one admin user.""" + import os + import shutil + + db_path = tmp_path / "test.db" + temp_lancedb_dir = tmp_path / "lancedb" + temp_lancedb_dir.mkdir(parents=True, exist_ok=True) + + from xagent.core.tools.core.RAG_tools.storage.factory import StorageFactory + + previous_lancedb_dir = os.environ.get("LANCEDB_DIR") + os.environ["LANCEDB_DIR"] = str(temp_lancedb_dir) + StorageFactory.get_factory().reset_all() + + test_engine = create_engine(f"sqlite:///{db_path}") + TestingSessionLocal = sessionmaker(bind=test_engine) + + def override_get_db(): + db = TestingSessionLocal() + try: + yield db + finally: + db.close() + + app = FastAPI() + app.include_router(kb_router) + app.dependency_overrides[get_db] = override_get_db + Base.metadata.create_all(bind=test_engine) + + session = TestingSessionLocal() + regular = User( + username="regular", + password_hash=hash_password("test"), + is_admin=False, + ) + other = User( + username="other", + password_hash=hash_password("test"), + is_admin=False, + ) + admin = User( + username="admin", + password_hash=hash_password("test"), + is_admin=True, + ) + session.add_all([regular, other, admin]) + session.commit() + session.refresh(regular) + session.refresh(other) + session.refresh(admin) + + yield ( + app, + _auth_headers(regular), + regular, + _auth_headers(admin), + admin, + other, + TestingSessionLocal, + ) + + session.close() + test_engine.dispose() + StorageFactory.get_factory().reset_all() + if previous_lancedb_dir is None: + os.environ.pop("LANCEDB_DIR", None) + else: + os.environ["LANCEDB_DIR"] = previous_lancedb_dir + shutil.rmtree(temp_lancedb_dir, ignore_errors=True) + + +def _visible_only_old(old_name: str) -> ListCollectionsResult: + return ListCollectionsResult( + status="success", + collections=[ + CollectionInfo(name=old_name, documents=1, document_names=[]), + ], + total_count=1, + message="ok", + warnings=[], + ) + + +def test_kb_rename_non_admin_rejects_shared_collection_name(rename_users_env) -> None: + """Non-admin must not rename when multiple users hold the same collection name.""" + app, headers, user, _, _, _, _ = rename_users_env + client = TestClient(app) + old_name = "shared_project" + new_name = "shared_project_new" + + mock_metadata_store = MagicMock() + mock_metadata_store.count_users_with_collection_config = AsyncMock(return_value=2) + + with ( + patch("xagent.web.api.kb._ensure_collection_access", new_callable=AsyncMock), + patch( + "xagent.web.api.kb._list_collections_with_retry", + new_callable=AsyncMock, + return_value=_visible_only_old(old_name), + ), + patch( + "xagent.web.api.kb.get_metadata_store", + return_value=mock_metadata_store, + ), + patch("xagent.web.api.kb.rename_collection_storage") as mock_physical, + patch("xagent.web.api.kb.get_vector_index_store") as mock_vector_factory, + ): + response = client.put( + f"/api/kb/collections/{old_name}", + data={"new_name": new_name}, + headers=headers, + ) + + assert response.status_code == 409, response.text + detail = response.json()["detail"].lower() + assert "shared" in detail or "multiple" in detail + mock_physical.assert_not_called() + mock_vector_factory.return_value.rename_collection_data.assert_not_called() + mock_metadata_store.rename_collection.assert_not_called() + + +def test_kb_rename_non_admin_succeeds_when_sole_collection_occupant( + rename_users_env, +) -> None: + """Non-admin may rename when they are the only config occupant for the old name.""" + app, headers, user, _, _, _, _ = rename_users_env + client = TestClient(app) + old_name = "solo_project" + new_name = "solo_project_new" + + mock_metadata_store = MagicMock() + mock_metadata_store.count_users_with_collection_config = AsyncMock(return_value=1) + mock_metadata_store.rename_collection = AsyncMock() + + mock_vector_store = MagicMock() + mock_vector_store.list_document_records.return_value = [] + mock_vector_store.rename_collection_data.return_value = [] + + mock_rename_result = MagicMock() + mock_rename_result.status = "not_found" + mock_rename_result.error = None + mock_rename_result.old_collection_dir = None + mock_rename_result.new_collection_dir = None + + with ( + patch("xagent.web.api.kb._ensure_collection_access", new_callable=AsyncMock), + patch( + "xagent.web.api.kb._list_collections_with_retry", + new_callable=AsyncMock, + return_value=_visible_only_old(old_name), + ), + patch( + "xagent.web.api.kb.get_metadata_store", + return_value=mock_metadata_store, + ), + patch( + "xagent.web.api.kb.get_vector_index_store", + return_value=mock_vector_store, + ), + patch( + "xagent.web.api.kb.rename_collection_storage", + return_value=mock_rename_result, + ), + patch( + "xagent.core.tools.core.RAG_tools.management.status.load_ingestion_status", + return_value=[], + ), + patch( + "xagent.core.tools.core.RAG_tools.management.status.write_ingestion_status", + ), + patch( + "xagent.core.tools.core.RAG_tools.management.status.clear_ingestion_status", + ), + ): + response = client.put( + f"/api/kb/collections/{old_name}", + data={"new_name": new_name}, + headers=headers, + ) + + assert response.status_code == 200, response.text + mock_vector_store.rename_collection_data.assert_called_once_with( + collection_name=old_name, + new_name=new_name, + user_id=int(user.id), + is_admin=False, + ) + mock_metadata_store.rename_collection.assert_awaited_once_with( + old_name=old_name, + new_name=new_name, + user_id=int(user.id), + is_admin=False, + ) + + +def test_kb_rename_admin_requires_target_user_id(rename_users_env) -> None: + """Admin rename must specify which user's collection scope to update.""" + app, _, _, admin_headers, _, _, _ = rename_users_env + client = TestClient(app) + + with patch("xagent.web.api.kb._ensure_collection_access", new_callable=AsyncMock): + response = client.put( + "/api/kb/collections/admin_old", + data={"new_name": "admin_new"}, + headers=admin_headers, + ) + + assert response.status_code == 422, response.text + assert "target_user_id" in response.json()["detail"].lower() + + +def test_kb_rename_admin_scopes_physical_vector_and_metadata_to_target_user( + rename_users_env, +) -> None: + """Admin passes target_user_id; all rename side effects use that user only.""" + app, _, _, admin_headers, _, target_user, _ = rename_users_env + client = TestClient(app) + old_name = "admin_scope_old" + new_name = "admin_scope_new" + target_user_id = int(target_user.id) + + mock_metadata_store = MagicMock() + mock_metadata_store.count_users_with_collection_config = AsyncMock(return_value=2) + mock_metadata_store.rename_collection = AsyncMock() + + mock_vector_store = MagicMock() + mock_vector_store.list_document_records.return_value = [] + mock_vector_store.rename_collection_data.return_value = [] + + mock_rename_result = MagicMock() + mock_rename_result.status = "not_found" + mock_rename_result.error = None + mock_rename_result.old_collection_dir = None + mock_rename_result.new_collection_dir = None + + with ( + patch("xagent.web.api.kb._ensure_collection_access", new_callable=AsyncMock), + patch( + "xagent.web.api.kb._list_collections_with_retry", + new_callable=AsyncMock, + return_value=_visible_only_old(old_name), + ), + patch( + "xagent.web.api.kb.get_metadata_store", + return_value=mock_metadata_store, + ), + patch( + "xagent.web.api.kb.get_vector_index_store", + return_value=mock_vector_store, + ), + patch( + "xagent.web.api.kb.rename_collection_storage", + return_value=mock_rename_result, + ) as mock_physical, + patch( + "xagent.core.tools.core.RAG_tools.management.status.load_ingestion_status", + return_value=[], + ) as mock_load_status, + patch( + "xagent.core.tools.core.RAG_tools.management.status.write_ingestion_status", + ), + patch( + "xagent.core.tools.core.RAG_tools.management.status.clear_ingestion_status", + ), + ): + response = client.put( + f"/api/kb/collections/{old_name}", + data={"new_name": new_name, "target_user_id": str(target_user_id)}, + headers=admin_headers, + ) + + assert response.status_code == 200, response.text + mock_physical.assert_called_once() + assert mock_physical.call_args.kwargs["user_id"] == target_user_id + mock_vector_store.rename_collection_data.assert_called_once_with( + collection_name=old_name, + new_name=new_name, + user_id=target_user_id, + is_admin=False, + ) + mock_metadata_store.rename_collection.assert_awaited_once_with( + old_name=old_name, + new_name=new_name, + user_id=target_user_id, + is_admin=True, + ) + mock_load_status.assert_called_once_with( + collection=old_name, + user_id=target_user_id, + is_admin=False, + ) + + +def test_kb_rename_migrates_ingestion_status_only_for_scoped_user( + rename_users_env, +) -> None: + """Ingestion status migration must not pull other users' rows for the old name.""" + app, headers, user, _, _, _, _ = rename_users_env + client = TestClient(app) + old_name = "status_scope_old" + new_name = "status_scope_new" + + mock_metadata_store = MagicMock() + mock_metadata_store.count_users_with_collection_config = AsyncMock(return_value=1) + mock_metadata_store.rename_collection = AsyncMock() + + mock_vector_store = MagicMock() + mock_vector_store.list_document_records.return_value = [] + mock_vector_store.rename_collection_data.return_value = [] + + mock_rename_result = MagicMock() + mock_rename_result.status = "not_found" + mock_rename_result.error = None + mock_rename_result.old_collection_dir = None + mock_rename_result.new_collection_dir = None + + status_rows = [ + { + "collection": old_name, + "doc_id": "doc-a", + "status": "success", + "message": "", + "parse_hash": "h1", + "user_id": int(user.id), + } + ] + + with ( + patch("xagent.web.api.kb._ensure_collection_access", new_callable=AsyncMock), + patch( + "xagent.web.api.kb._list_collections_with_retry", + new_callable=AsyncMock, + return_value=_visible_only_old(old_name), + ), + patch( + "xagent.web.api.kb.get_metadata_store", + return_value=mock_metadata_store, + ), + patch( + "xagent.web.api.kb.get_vector_index_store", + return_value=mock_vector_store, + ), + patch( + "xagent.web.api.kb.rename_collection_storage", + return_value=mock_rename_result, + ), + patch( + "xagent.core.tools.core.RAG_tools.management.status.load_ingestion_status", + return_value=status_rows, + ) as mock_load_status, + patch( + "xagent.core.tools.core.RAG_tools.management.status.write_ingestion_status", + ) as mock_write_status, + patch( + "xagent.core.tools.core.RAG_tools.management.status.clear_ingestion_status", + ) as mock_clear_status, + ): + response = client.put( + f"/api/kb/collections/{old_name}", + data={"new_name": new_name}, + headers=headers, + ) + + assert response.status_code == 200, response.text + mock_load_status.assert_called_once_with( + collection=old_name, + user_id=int(user.id), + is_admin=False, + ) + mock_write_status.assert_called_once() + mock_clear_status.assert_called_once_with( + old_name, + "doc-a", + user_id=int(user.id), + is_admin=False, + )