Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions frontend/src/components/kb/knowledge-base-creation-dialog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ import { CloudConnectDialog, CloudFile } from "./cloud-connect-dialog"
interface IngestionResult {
collection: string
document_count: number
chunks_count: number
chunk_count: number
embedding_count?: number
vector_count?: number
status: string
message: string
warnings?: string[]
failed_step?: string
}

Expand Down Expand Up @@ -357,16 +360,29 @@ export function KnowledgeBaseCreationDialog({ open, onOpenChange, onSuccess }: K
}))
}

const result = isJsonRecord(parsed.data) ? parsed.data as unknown as IngestionResult : null
const result = isJsonRecord(parsed.data)
? (parsed.data as unknown as IngestionResult)
: null
if (!result) {
throw new Error(t("kb.errors.uploadFailedFile", { name: file.name }))
}
setIngestionResults(prev => [...prev, result])

if (result.status === "partial" && result.failed_step) {
throw new Error(result.message || t("kb.errors.failedAtStep", { step: result.failed_step }))
if (result.status === "error") {
throw new Error(result.message || t("kb.errors.uploadFailedFile", { name: file.name }))
}
if (result.status === "partial") {
toast.warning(
result.message + (result.warnings?.length ? ` Warnings: ${result.warnings.join(", ")}` : "")
)
}
if (result.status === "success" && (result.embedding_count ?? 0) === 0 && (result.chunk_count ?? 0) > 0) {
toast.error(
"文档上传成功,但 embedding 生成失败(" + result.chunk_count + " 个 chunks 未生成 embedding)。文档无法被搜索。请检查 embedding 模型配置和 API 状态。" +
(result.warnings?.length ? ` 警告: ${result.warnings.join(", ")}` : "")
)
}

setIngestionResults(prev => [...prev, result])
successfulCollections.push(collectionName)
setCompletedUploadCount(i + 1)
setUploadProgress(((i + 1) / selectedFiles.length) * 100)
Expand Down Expand Up @@ -457,6 +473,16 @@ export function KnowledgeBaseCreationDialog({ open, onOpenChange, onSuccess }: K
if (!result) {
throw new Error(t("kb.errors.webIngestFailed"))
}

if (result.status === "error") {
throw new Error(result.message || t("kb.errors.webIngestFailed"))
}
if (result.status === "partial") {
toast.warning(
result.message + (result.warnings?.length ? ` Warnings: ${result.warnings.join(", ")}` : "")
)
}

setWebIngestionResult(result)
setWebIngestionProgress(100)

Expand Down Expand Up @@ -1011,7 +1037,7 @@ export function KnowledgeBaseCreationDialog({ open, onOpenChange, onSuccess }: K
{result.document_count} {t("kb.dialog.fileUpload.processResult.createDocuments")}
</Badge>
<Badge variant="secondary" className="text-xs font-normal">
{result.chunks_count} {t("kb.dialog.fileUpload.processResult.textChunks")}
{result.chunk_count} {t("kb.dialog.fileUpload.processResult.textChunks")}
</Badge>
</>
)}
Expand Down
112 changes: 112 additions & 0 deletions src/xagent/core/tools/core/RAG_tools/pipelines/document_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import numbers
import os
import time
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union, cast

import requests
Expand Down Expand Up @@ -515,6 +516,8 @@ def search_documents(
user_id = scope.user_id
is_admin = scope.is_admin

search_start = time.time()

cfg = (
config
if isinstance(config, SearchConfig)
Expand Down Expand Up @@ -583,20 +586,44 @@ def search_documents(
message = "Search completed successfully"

if requested_type == SearchType.SPARSE:
logger.info(
"Executing SPARSE search: UserID=%s, Collection=%s, TopK=%d, ModelTag=%s",
user_id,
collection,
fetch_top_k,
embedding_model_id,
)
sparse_start = time.time()
with progress_tracker.track_step("sparse_search"):
pass
current_step = "search_sparse"
results, status, sparse_warnings, message = _execute_sparse_search(
collection, query_text, cfg, embedding_model_id, user_id, is_admin
)
sparse_elapsed = int((time.time() - sparse_start) * 1000)
logger.info(
"SPARSE search completed: Found %d results, Status=%s, Elapsed=%dms, UserID=%s",
len(results),
status,
sparse_elapsed,
user_id,
)
warnings.extend(sparse_warnings)
else:
# Use embedding adapter for dense/hybrid paths
try:
with progress_tracker.track_step("encode_query"):
pass
current_step = "encode_query_vector"
encode_start = time.time()
query_vector = _encode_query_vector(embedding_adapter, query_text)
encode_elapsed = int((time.time() - encode_start) * 1000)
logger.info(
"Query vector encoded: Dimension=%d, Elapsed=%dms, UserID=%s",
len(query_vector),
encode_elapsed,
user_id,
)
except VectorValidationError:
if requested_type == SearchType.HYBRID and cfg.fallback_to_sparse:
current_step = "search_sparse_fallback"
Expand All @@ -620,6 +647,17 @@ def search_documents(
raise
else:
if requested_type == SearchType.DENSE:
logger.info(
"Executing DENSE search: UserID=%s, Collection=%s, TopK=%d, ModelTag=%s, "
"NProbes=%s, RefineFactor=%s",
user_id,
collection,
fetch_top_k,
embedding_model_id,
cfg.nprobes,
cfg.refine_factor,
)
dense_start = time.time()
with progress_tracker.track_step("dense_search"):
pass
dense_response: DenseSearchResponse = search_dense(
Expand All @@ -641,7 +679,28 @@ def search_documents(
message = (
advice if advice else "Dense search completed successfully"
)
dense_elapsed = int((time.time() - dense_start) * 1000)
logger.info(
"DENSE search completed: Found %d results, Status=%s, Elapsed=%dms, "
"IndexAdvice=%s, UserID=%s",
len(results),
status,
dense_elapsed,
advice or "None",
user_id,
)
else: # HYBRID
logger.info(
"Executing HYBRID search: UserID=%s, Collection=%s, TopK=%d, ModelTag=%s, "
"FusionConfig=%s, FallbackToSparse=%s",
user_id,
collection,
fetch_top_k,
embedding_model_id,
cfg.fusion_config,
cfg.fallback_to_sparse,
)
hybrid_start = time.time()
try:
with progress_tracker.track_step("hybrid_search"):
pass
Expand Down Expand Up @@ -684,6 +743,7 @@ def search_documents(
current_step = "search_hybrid"
raise
else:
hybrid_elapsed = int((time.time() - hybrid_start) * 1000)
warnings.extend(_serialize_warnings(hybrid_response.warnings))
results = list(hybrid_response.results)
status = hybrid_response.status or "success"
Expand All @@ -692,14 +752,66 @@ def search_documents(
if status == "success"
else "Hybrid search completed with warnings"
)
logger.info(
"HYBRID search completed: Found %d results, Status=%s, Elapsed=%dms, UserID=%s",
len(results),
status,
hybrid_elapsed,
user_id,
)

# Apply optional rerank
current_step = "apply_rerank"
rerank_start = time.time()
results, used_rerank, rerank_warnings = _apply_rerank_if_needed(
results, query_text, cfg
)
rerank_elapsed = int((time.time() - rerank_start) * 1000)
warnings.extend(rerank_warnings)

if used_rerank:
logger.info(
"Rerank applied: Input results=%d, Output results=%d, Elapsed=%dms",
len(results) + len(rerank_warnings),
len(results),
rerank_elapsed,
)
else:
logger.debug("Rerank skipped: Not configured or no results to rerank")

total_elapsed = int((time.time() - search_start) * 1000)
logger.info(
"Document search COMPLETED: UserID=%s, Collection=%s, Status=%s, SearchType=%s, "
"Results=%d, TopK=%d, TotalElapsed=%dms, UsedRerank=%s, Warnings=%d, "
"Model=%s",
user_id,
collection,
status.upper(),
actual_type.value,
len(results),
cfg.top_k,
total_elapsed,
used_rerank,
len(warnings),
embedding_model_id,
)
if status == "success":
logger.info(
"Search SUCCESS: UserID=%s, Found %d results in %dms using %s search with model %s",
user_id,
len(results),
total_elapsed,
actual_type.value,
embedding_model_id,
)
elif status != "success":
logger.warning(
"Search completed with status '%s': Found %d results, Warnings: %d",
status,
len(results),
len(warnings),
)

return _build_pipeline_result(
status=status,
search_type=actual_type,
Expand Down
44 changes: 39 additions & 5 deletions src/xagent/core/tools/core/RAG_tools/storage/contracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,16 +334,36 @@ async def delete_collection_metadata(
"""Delete persisted metadata/config rows for a collection."""

@abstractmethod
async def rename_collection(self, old_name: str, new_name: str) -> None:
"""Rename persisted control-plane keys after a data-plane collection rename.
async def count_users_with_collection_config(self, collection_name: str) -> int:
"""Count distinct users with a ``collection_config`` row for ``collection_name``.

Updates rows that gate :meth:`list_collections` visibility (for example
per-tenant config rows and aggregate metadata) so they stay aligned with
vector tables when the ``collection`` / ``name`` fields change.
Args:
collection_name: Collection name (sanitized by the caller).

Returns:
Number of distinct ``user_id`` values occupying the name.
"""

@abstractmethod
async def rename_collection(
self,
old_name: str,
new_name: str,
*,
user_id: int,
is_admin: bool = False,
) -> None:
"""Rename persisted control-plane keys for one user's collection scope.

Updates the caller's ``collection_config`` row. Global ``collection_metadata``
is renamed only when this user is the sole config occupant of ``old_name``.

Args:
old_name: Previous collection name (sanitized by the caller).
new_name: Target collection name (sanitized by the caller).
user_id: User whose config (and optionally metadata) should be renamed.
is_admin: Whether the operation is performed by an admin on behalf of
``user_id`` (does not broaden vector/config scope beyond ``user_id``).
"""

@abstractmethod
Expand Down Expand Up @@ -445,9 +465,23 @@ def rename_collection_data(
self,
collection_name: str,
new_name: str,
user_id: Optional[int],
is_admin: bool,
) -> List[str]:
"""Rename collection key across vector-side tables.

Applies the same multi-tenancy filter semantics as
:meth:`delete_collection_data`: non-admin callers only rename rows they
can see (``user_id`` match; legacy NULL ``user_id`` is admin-only).
Admins omit the user predicate and rename all rows for the old collection
name.

Args:
collection_name: Current collection name stored in vector tables.
new_name: Target collection name after rename.
user_id: User ID for multi-tenancy filtering.
is_admin: Whether the caller has admin privileges.

Returns:
Warning messages generated during best-effort updates.
"""
Expand Down
Loading
Loading