feat(rag): add active generation schema and store (#438 PR1)#452
feat(rag): add active generation schema and store (#438 PR1)#452sqhyz55 wants to merge 4 commits into
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces the ActiveGenerationStore and its LanceDB implementation to track published searchable generations, updating the schemas for chunks and embeddings tables to include generation_id and config_hash while providing migration utilities for existing data. Feedback recommends optimizing performance by caching table existence checks and improving the atomicity of the publication process to reduce database round-trips.
| def _ensure_table(self) -> None: | ||
| from ..LanceDB.schema_manager import ensure_active_generations_table | ||
|
|
||
| ensure_active_generations_table(self._get_sync_connection()) |
There was a problem hiding this comment.
The _ensure_table method is called on every database operation (publish, get, list). Since it performs a directory listing via list_table_names to check for table existence, this can become a performance bottleneck during high-volume ingestion. Caching the result of this check after the first successful execution would significantly reduce overhead.
| def _ensure_table(self) -> None: | |
| from ..LanceDB.schema_manager import ensure_active_generations_table | |
| ensure_active_generations_table(self._get_sync_connection()) | |
| def _ensure_table(self) -> None: | |
| if getattr(self, "_table_ensured", False): | |
| return | |
| from ..LanceDB.schema_manager import ensure_active_generations_table | |
| ensure_active_generations_table(self._get_sync_connection()) | |
| self._table_ensured = True |
| now = datetime.now(timezone.utc).replace(tzinfo=None) | ||
| existing = self.get_active_generation( | ||
| collection=collection, | ||
| doc_id=doc_id, | ||
| parse_hash=parse_hash, | ||
| user_id=user_id, | ||
| model_tag=model_tag, | ||
| ) | ||
| created_at = existing["created_at"] if existing else now | ||
|
|
||
| record = { | ||
| "collection": collection, | ||
| "doc_id": doc_id, | ||
| "parse_hash": parse_hash, | ||
| "user_id": user_id, | ||
| "model_tag": normalized_tag, | ||
| "generation_id": generation_id, | ||
| "config_hash": config_hash, | ||
| "created_at": created_at, | ||
| "updated_at": now, | ||
| "published_at": now, | ||
| "operator": operator or "unknown", | ||
| } | ||
|
|
||
| ( | ||
| table.merge_insert( | ||
| on=[ | ||
| "collection", | ||
| "doc_id", | ||
| "parse_hash", | ||
| "user_id", | ||
| "model_tag", | ||
| ] | ||
| ) | ||
| .when_matched_update_all() | ||
| .when_not_matched_insert_all() | ||
| .execute([record]) | ||
| ) |
There was a problem hiding this comment.
The current implementation performs an extra round-trip to the database to fetch the existing record just to preserve the created_at timestamp. This can be achieved more efficiently and atomically using merge_insert with a specific when_matched_update clause that excludes created_at. This avoids the overhead of a separate search operation.
try:
now = datetime.now(timezone.utc).replace(tzinfo=None)
record = {
"collection": collection,
"doc_id": doc_id,
"parse_hash": parse_hash,
"user_id": user_id,
"model_tag": normalized_tag,
"generation_id": generation_id,
"config_hash": config_hash,
"created_at": now,
"updated_at": now,
"published_at": now,
"operator": operator or "unknown",
}
(
table.merge_insert(
on=[
"collection",
"doc_id",
"parse_hash",
"user_id",
"model_tag",
]
)
.when_matched_update(
updates={
"generation_id": "source.generation_id",
"config_hash": "source.config_hash",
"updated_at": "source.updated_at",
"published_at": "source.published_at",
"operator": "source.operator",
}
)
.when_not_matched_insert_all()
.execute([record])
)
finally:
_safe_close_table(table)There was a problem hiding this comment.
当前依赖的 LanceDB 0.30.2 的 merge_insert 只有:
when_matched_update_all()
when_not_matched_insert_all()
when_not_matched_by_source_delete()
没有 when_matched_update(updates={...})
rogercloud
left a comment
There was a problem hiding this comment.
I found one issue in the active generation pointer store around nullable legacy user scope handling.
| } | ||
|
|
||
| ( | ||
| table.merge_insert(on=list(self._MERGE_KEYS)) |
There was a problem hiding this comment.
This breaks legacy scopes where user_id is None. active_generations.user_id is nullable and the RAG model still allows Optional[int]; when callers publish with user_id=None, the pre-read builds user_id == NULL (which does not match NULL rows), and this merge_insert also cannot match an existing NULL-key row because the on columns are joined by equality. Re-publishing the same legacy scope therefore inserts duplicate active pointers, while get_active_generation(..., None, ...) returns no row.
A no-schema-change fix would be to normalize the active-generation store's legacy scope to a non-null int sentinel before building filters and records, e.g. LEGACY_USER_ID = -1, and use that normalized value consistently in publish_active_generation, get_active_generation, and list_active_generations. Please add a regression test that publishes the same (collection, doc_id, parse_hash, None, model_tag) twice and asserts there is only one row and the second generation is returned.
Introduce nullable generation_id columns, active_generations table, and ActiveGenerationStore for crash-safe re-chunking. Keep legacy chunk/embedding merge keys until ingestion publishes generations in a follow-up PR.
…tsai#438 PR1) Address PR review on LanceDBActiveGenerationStore: - Cache _table_ensured so publish/get/list skip list_table_names after the first call. - Replace the extra get_active_generation() round-trip in publish_active_generation with an inline filter on the already-opened table while keeping created_at preserved across republishes. - Add regression tests for republish created_at semantics and ensure-table caching, plus apply ruff format to the generation schema test file.
Ensure nullable legacy user scopes use a non-null LanceDB merge key so active generation pointers remain queryable and idempotent.
Keep the active generation store accessor compatible with the coordinator-owned storage shim after rebasing onto main.
332b2b7 to
e3555fc
Compare
Introduce nullable generation_id columns, active_generations table, and ActiveGenerationStore for crash-safe re-chunking. Keep legacy chunk/embedding merge keys until ingestion publishes generations in a follow-up PR.