Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
6e67c45
feat(kb): complete phase 1A storage decoupling and tests
sqhyz55 Mar 16, 2026
810316b
fix(tests): prevent LanceDB default-dir pollution
sqhyz55 Mar 17, 2026
7834437
feat(kb): unify embedding model identity on hub id
sqhyz55 Mar 19, 2026
0b43fc9
feat(kb): complete Phase 1A storage decoupling with abstraction layer
sqhyz55 Mar 24, 2026
5b63a5d
WIP local
sqhyz55 Mar 31, 2026
240215f
feat(storage): implement IngestionStatusStore and unified StorageFactory
sqhyz55 Apr 1, 2026
786522b
feat(kb): Phase 2.2 - index management abstraction and cleanup
sqhyz55 Apr 1, 2026
95b2388
feat(kb): Phase 2.3 - PromptTemplateStore and MainPointerStore with u…
sqhyz55 Apr 1, 2026
7224079
feat(kb): complete Phase 2.4 - full PromptTemplateStore/MainPointerSt…
sqhyz55 Apr 1, 2026
41c19b4
fix(linters): resolve all Python linter errors and warnings
sqhyz55 Apr 1, 2026
f1b4826
test(rag): fix RAG flow tests for Phase 1A storage decoupling
sqhyz55 Apr 1, 2026
9e38e82
test(storage): fix schema mock errors in test_lancedb_stores.py
sqhyz55 Apr 1, 2026
dcbd9e4
test(storage): add comprehensive async/upsert tests and structured lo…
sqhyz55 Apr 2, 2026
7953bfc
fix(storage): complete Phase 1A Issue #14 and add comprehensive tests
sqhyz55 Apr 2, 2026
f346c7a
fix(storage): resolve remaining PR #158 review comments and test fail…
sqhyz55 Apr 6, 2026
697944c
fix(storage): implement IndexResult for create_index and fix async FT…
sqhyz55 Apr 6, 2026
1b8110a
refactor(storage): complete Phase 1A storage decoupling optimizations
sqhyz55 Apr 7, 2026
61ac911
fix(retrieval): complete Phase 1A test adaptation and fix sparse sear…
sqhyz55 Apr 7, 2026
b325783
fix(retrieval): unify exception handling and improve test mock levels
sqhyz55 Apr 7, 2026
c8d6bd6
fix(schema): fix concurrent table creation test by using separate con…
sqhyz55 Apr 7, 2026
d50adcf
fix(schema): fix field name inconsistency in collection_metadata schema
sqhyz55 Apr 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

### Changed

- **Knowledge Base embedding model binding (breaking / migration)**
The Knowledge Base now treats the **Model Hub ID** as the single source of truth for embedding model identity:
- `collection_metadata.embedding_model_id` stores the Hub ID (trimmed; no other normalization).
- Embeddings tables are named by Hub ID: `embeddings_{to_model_tag(hub_id)}`.
- The `model` field stored alongside each embedding vector is the Hub ID.

**Migration / backward compatibility:** Older deployments may have created embeddings tables using the provider `model_name`
(e.g. `embeddings_text-embedding-v4`). During search and embedding reads, the system will **try the new Hub-ID table first**
and automatically **fall back to the legacy table name** derived from the resolved `model_name` when the new table is missing.
Rebuild/inference helpers were updated to prefer Hub IDs when they can be resolved from Model Hub metadata.

- **Knowledge Base upload: default parse method (breaking)**
The default parse method on the KB detail upload form is now `"default"` instead of `"pypdf"`. The backend chooses the parser by file type (e.g. .docx, .pdf). If you rely on the previous default (always use PyPDF), select `"pypdf"` explicitly in the parse method dropdown when uploading.

Expand Down
11 changes: 3 additions & 8 deletions src/xagent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,7 @@ def get_lancedb_path() -> Path:

Priority:
1. LANCEDB_PATH environment variable
2. Default to ./data/lancedb (relative to cwd)

.. warning::
Default to ``./data/lancedb``, which is **relative** to cwd, **NOT**
relative to ``storage_root``. This behavior is kept for backward
compatibility but may change in the future (see proposal #246).
2. Default to STORAGE_ROOT/data/lancedb

Returns:
Path object for LanceDB directory
Expand All @@ -203,8 +198,8 @@ def get_lancedb_path() -> Path:
if env_path:
return Path(env_path)

# Default: ./data/lancedb
return Path("data/lancedb")
# Default: storage_root/data/lancedb
return get_storage_root() / "data" / "lancedb"


def get_default_sqlite_db_path() -> str:
Expand Down
112 changes: 49 additions & 63 deletions src/xagent/core/tools/core/RAG_tools/chunk/chunk_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import pandas as pd

from ......providers.vector_store.lancedb import get_connection_from_env
Comment thread
rogercloud marked this conversation as resolved.
from ..core.config import (
DEFAULT_IMAGE_CONTEXT_SIZE,
DEFAULT_TABLE_CONTEXT_SIZE,
Expand All @@ -23,12 +22,9 @@
DocumentValidationError,
)
from ..core.schemas import ChunkStrategy
from ..LanceDB.schema_manager import ensure_chunks_table
from ..storage.factory import get_vector_index_store
from ..utils.hash_utils import compute_chunk_hash
from ..utils.lancedb_query_utils import query_to_list
from ..utils.metadata_utils import deserialize_metadata, serialize_metadata
from ..utils.string_utils import build_lancedb_filter_expression
from ..utils.user_permissions import UserPermissions
from .chunk_strategies import (
apply_fixed_size_strategy,
apply_markdown_strategy,
Expand Down Expand Up @@ -109,14 +105,6 @@ def chunk_document(
f"Starting document chunking: doc_id={doc_id}, strategy={chunk_strategy}"
)

# Get database connection
try:
conn = get_connection_from_env()
ensure_chunks_table(conn)
except Exception as e:
logger.error(f"Database connection failed: {e}")
raise DatabaseOperationError(f"Failed to connect to database: {e}") from e

# Validate chunk parameters
_validate_chunk_params(chunk_strategy, params)

Expand Down Expand Up @@ -251,8 +239,7 @@ def _chunks_exist(
) -> bool:
"""Check if chunk records already exist."""
try:
conn = get_connection_from_env()
table = conn.open_table("chunks")
vector_store = get_vector_index_store()

# Build safe filter expression using utility function
query_filters = {
Expand All @@ -261,8 +248,7 @@ def _chunks_exist(
"parse_hash": parse_hash,
"config_hash": config_hash,
}
filter_expr = build_lancedb_filter_expression(query_filters)
return bool(table.count_rows(filter_expr) > 0)
return vector_store.count_rows_or_zero("chunks", filters=query_filters) > 0
except Exception as e:
logger.error(f"Failed to check chunk existence: {e}")
raise DatabaseOperationError(f"Database query failed: {e}") from e
Expand Down Expand Up @@ -293,8 +279,7 @@ def _get_existing_chunks(
List of existing chunks accessible to the user
"""
try:
conn = get_connection_from_env()
table = conn.open_table("chunks")
vector_store = get_vector_index_store()

# Build safe filter expression using utility function
query_filters = {
Expand All @@ -303,25 +288,28 @@ def _get_existing_chunks(
"parse_hash": parse_hash,
"config_hash": config_hash,
}
base_filter_expr = build_lancedb_filter_expression(query_filters)

# Add user permission filter for multi-tenancy
user_filter_expr = UserPermissions.get_user_filter(user_id, is_admin)

# Combine filters
if user_filter_expr and base_filter_expr:
filter_expr = f"({base_filter_expr}) and ({user_filter_expr})"
elif user_filter_expr:
filter_expr = user_filter_expr
else:
filter_expr = base_filter_expr

# OPTIMIZATION: Use count_rows() for memory-efficient existence check
if table.count_rows(filter_expr) == 0:
# OPTIMIZATION: Use count_rows_or_zero() for memory-efficient existence check
if (
vector_store.count_rows_or_zero(
"chunks", filters=query_filters, user_id=user_id, is_admin=is_admin
)
== 0
):
return []

# OPTIMIZATION: Use unified query_to_list() with three-tier fallback
chunks_data = query_to_list(table.search().where(filter_expr))
# Use iter_batches to load chunks
chunks_data = []
for batch in vector_store.iter_batches(
table_name="chunks",
filters=query_filters,
user_id=user_id,
is_admin=is_admin,
):
# Convert batch to pandas for easier row-by-row processing
batch_df = batch.to_pandas()
for _, row in batch_df.iterrows():
chunks_data.append(row.to_dict())

# Convert to expected format with metadata deserialization
# Arrow/to_list() returns None instead of NaN, so direct None check is sufficient
Expand Down Expand Up @@ -372,35 +360,37 @@ def _load_paragraphs(
) -> List[Dict[str, Any]]:
"""Load parsed content from parses table."""
try:
conn = get_connection_from_env()
table = conn.open_table("parses")
vector_store = get_vector_index_store()

# Build safe filter expression using utility function
query_filters = {
"collection": collection,
"doc_id": doc_id,
"parse_hash": parse_hash,
}
base_filter_expr = build_lancedb_filter_expression(query_filters)

# Add user permission filter for multi-tenancy
user_filter_expr = UserPermissions.get_user_filter(user_id, is_admin)

# Combine filters
if user_filter_expr and base_filter_expr:
filter_expr = f"({base_filter_expr}) and ({user_filter_expr})"
elif user_filter_expr:
filter_expr = user_filter_expr
else:
filter_expr = base_filter_expr

# First check if any parse exists using efficient count_rows
if table.count_rows(filter_expr) == 0:
# First check if any parse exists using efficient count_rows_or_zero
if (
vector_store.count_rows_or_zero(
"parses", filters=query_filters, user_id=user_id, is_admin=is_admin
)
== 0
):
return []

# Only load data if parse exists
# OPTIMIZATION: Use unified query_to_list() with three-tier fallback
records = query_to_list(table.search().where(filter_expr))
# Load data using iter_batches
records = []
for batch in vector_store.iter_batches(
table_name="parses",
filters=query_filters,
user_id=user_id,
is_admin=is_admin,
):
# Convert batch to pandas for easier row-by-row processing
batch_df = batch.to_pandas()
for _, row in batch_df.iterrows():
records.append(row.to_dict())

if not records:
return []
record = records[0]
Expand Down Expand Up @@ -445,11 +435,8 @@ def _write_chunks_to_db(
user_id: Optional[int] = None,
is_admin: bool = False,
) -> bool:
"""Write chunk records to database."""
"""Write chunk records to database using abstraction layer."""
try:
conn = get_connection_from_env()
table = conn.open_table("chunks")

rows = []
for chunk in chunks:
text = chunk["text"]
Expand Down Expand Up @@ -477,11 +464,10 @@ def _write_chunks_to_db(
if not rows:
return False

# Use merge_insert for efficient upsert operation
# This handles cases where chunks might already exist (idempotent operation)
table.merge_insert(
["collection", "doc_id", "parse_hash", "chunk_id"]
).when_matched_update_all().when_not_matched_insert_all().execute(rows)
# Use abstraction layer for upsert
vector_store = get_vector_index_store()
vector_store.upsert_chunks(rows)

logger.info(
f"Chunk records written to database: doc_id={doc_id}, parse_hash={parse_hash}, config_hash={config_hash}"
)
Expand Down
30 changes: 30 additions & 0 deletions src/xagent/core/tools/core/RAG_tools/core/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Final, Mapping, Sequence
Expand Down Expand Up @@ -55,13 +56,42 @@
Set to 0 to disable any artificial throttling.
"""

DEFAULT_LANCEDB_BATCH_SIZE: Final[int] = 1000
"""Default batch size for embedding writes to LanceDB (env: LANCEDB_BATCH_SIZE)."""

DEFAULT_VECTOR_STORE_SCAN_LIMIT: Final[int] = 10_000
"""Default max rows scanned in vector-store document listing operations."""

DEFAULT_VECTOR_STORE_EXTENDED_SCAN_LIMIT: Final[int] = 1_000_000
"""Higher limit for operations like listing all documents in a collection or deleting a collection."""

# Reserved int64 lower bound for internal system sentinel values.
MIN_INT64: Final[int] = -(2**63)
"""Minimum 64-bit integer, used as internal sentinel value."""

# Stable expression that always matches no rows for unauthenticated reads.
UNAUTHENTICATED_NO_ACCESS_FILTER: Final[str] = (
"(user_id IS NULL and user_id IS NOT NULL)"
)
"""A stable LanceDB filter expression that always matches no rows."""

ENABLE_AUTO_EMBEDDINGS_MIGRATION: Final[bool] = (
os.getenv("ENABLE_AUTO_EMBEDDINGS_MIGRATION", "false").lower() == "true"
)
"""
Enable automatic forward migration of legacy embeddings tables.

When disabled (default), the system will not automatically migrate data from
legacy table names (embeddings_{model_name}) to new Hub ID-based names
(embeddings_{hub_id}). This prevents unexpected data movement and performance
impact during normal operations.

To enable automatic migration, set the environment variable:
ENABLE_AUTO_EMBEDDINGS_MIGRATION=true

Automatic migration should only be enabled during controlled maintenance windows
or when explicitly executing migration tools.
"""

# Parameters that affect parse hash
PARSE_PARAM_WHITELIST: Final[Sequence[str]] = (
Expand Down
39 changes: 34 additions & 5 deletions src/xagent/core/tools/core/RAG_tools/core/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,29 @@ class HybridSearchResponse(BaseModel):
)


class IndexResult(BaseModel):
"""Structured result from index creation operations.

This model replaces the previous string-based return format for create_index,
providing type-safe access to index status, advice, and FTS enabled state.

Attributes:
status: Index creation status (e.g., "index_ready", "readonly", "failed")
advice: Optional advice message for further actions
fts_enabled: Whether FTS index is actually enabled (separate from vector index)
"""

model_config = ConfigDict(frozen=True)

status: str = Field(..., description="Index creation status")
advice: Optional[str] = Field(
default=None, description="Human-readable index advice"
)
fts_enabled: bool = Field(
default=False, description="Whether FTS index is enabled on text column"
)


class SearchConfig(BaseModel):
"""Configuration for the unified document search pipeline."""

Expand Down Expand Up @@ -1319,7 +1342,15 @@ def is_initialized(self) -> bool:

@classmethod
def from_storage(cls, data: dict) -> "CollectionInfo":
"""Factory method to load from LanceDB, handling migration automatically."""
"""Load from storage dict with in-memory schema normalization.

Legacy rows (e.g. ``schema_version`` missing / ``0.0.0``) are upgraded
**in memory only** via :func:`~.migration_utils.migrate_collection_metadata`
with ``infer_embedding=False`` so this path does **not** open LanceDB or
scan embedding tables (read-side-effect-free). For full migration with
embedding inference, call ``migrate_collection_metadata(data)`` explicitly
(e.g. admin repair or write pipeline).
"""
import json
import math

Expand All @@ -1342,14 +1373,12 @@ def from_storage(cls, data: dict) -> "CollectionInfo":
if isinstance(value, float) and math.isnan(value):
data[key] = None

# 3. Check version and migrate if needed
# 3. Check version and migrate if needed (no DB access on read path)
current_version = "1.0.0"
data_version = data.get("schema_version", "0.0.0")

if data_version < current_version:
data = migrate_collection_metadata(data)
# Note: In LanceDB, we don't auto-save migrated data here
# It will be saved when the collection is next updated
data = migrate_collection_metadata(data, infer_embedding=False)

return cls(**data)

Expand Down
Loading
Loading