Skip to content

ref(kb): split web rollback callbacks into per-boundary compensation steps#613

Open
rogercloud wants to merge 7 commits into
xorbitsai:mainfrom
rogercloud:ref/kb-structured-rollback-compensation
Open

ref(kb): split web rollback callbacks into per-boundary compensation steps#613
rogercloud wants to merge 7 commits into
xorbitsai:mainfrom
rogercloud:ref/kb-structured-rollback-compensation

Conversation

@rogercloud

Copy link
Copy Markdown
Collaborator

Closes #608.

Split the four monolithic production web rollback callbacks into per-boundary compensation callbacks registered via record_side_effect(compensation=...).

Changes

  • Add per-boundary compensation factory functions: FILE (delete/restore), DOCUMENT, STATUS, SNAPSHOT
  • Refactor four web handler functions to return per-boundary callbacks instead of monolithic closures
  • Rewrite _run_file_handler_compensation to execute per-boundary callbacks independently and mark compensated planes
  • Remove mark_web_page_compensation_coverage, _compensated_side_effect_planes, WEB_ROLLBACK_COMPENSATED_PLANES_KEY, and _WEB_FAILED_INGEST_CLEANUP_COMPENSATED_PLANES
  • Custom rollback_on_failure callbacks remain supported via legacy path without auto-marking other planes

Tests

  • Per-boundary compensation execution (all succeed + partial failure)
  • Custom callback conservative behavior (no auto-coverage)
  • Cleanup decision derived from operation outcome (no opaque metadata)

…steps

Split the four monolithic production web rollback callbacks
(_rollback_existing, _rollback_new_web_file, _rollback_refresh,
_rollback_recreate) into per-boundary compensation callbacks
registered via record_side_effect(compensation=...).

- Add per-boundary compensation factory functions in web/api/kb.py:
  FILE (delete/restore), DOCUMENT, STATUS, SNAPSHOT
- Refactor four web handler functions to return per-boundary
  callbacks instead of monolithic closures
- Rewrite _run_file_handler_compensation to execute per-boundary
  callbacks independently and mark compensated planes
- Remove mark_web_page_compensation_coverage,
  _compensated_side_effect_planes,
  WEB_ROLLBACK_COMPENSATED_PLANES_KEY, and
  _WEB_FAILED_INGEST_CLEANUP_COMPENSATED_PLANES
- Custom rollback_on_failure callbacks remain supported via
  legacy path without auto-marking other planes
- Add tests for per-boundary compensation, partial failure,
  and cleanup decision from operation outcome

Closes xorbitsai#608
Move cleanup_decision test from test_pipeline_compatibility.py
to test_api_compatibility.py where it belongs. Parameterize
two per-boundary compensation tests into one using pytest
parametrize (all_succeed / document_fails cases). Remove
duplicate assertions and unused imports.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the web ingestion rollback mechanism by replacing the monolithic rollback callback with a structured, per-boundary compensation system across FILE, DOCUMENT, STATUS, and SNAPSHOT boundaries. Feedback on these changes highlights a copy-paste error in the SNAPSHOT boundary block that incorrectly marks the FILE plane as compensated, as well as a bug in the file restoration compensation when handling recreated files that did not previously exist. Additionally, minor type annotation corrections are suggested for the document and status compensation factories.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread src/xagent/core/tools/core/RAG_tools/pipelines/web_ingestion.py Outdated
Comment thread src/xagent/web/api/kb.py
Comment thread src/xagent/web/api/kb.py Outdated
Comment thread src/xagent/web/api/kb.py
Comment thread src/xagent/web/api/kb.py Outdated
Comment thread src/xagent/web/api/kb.py Outdated

@qinxuye qinxuye left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found two non-duplicate blockers in addition to the existing Gemini threads.

Comment thread src/xagent/core/tools/core/RAG_tools/pipelines/web_ingestion.py
Comment thread src/xagent/web/api/kb.py
Add SideEffectPlane.SNAPSHOT to track backup file cleanup
as a structured compensation boundary. Fix copy-paste error
where SNAPSHOT block incorrectly marked FILE plane.

Fix _create_file_compensation_restore to accept optional
backup_path and had_existing_file parameter, handling the
recreate scenario where the file did not exist before.

Fix return type annotations on _create_document_compensation
and _create_status_compensation factory functions.

Register SNAPSHOT side effect via record_side_effect and
execute via execute_compensations for proper tracking.
…guard

Add docstring explaining the three behaviors of
_restore_ingest_file_backup (restore / error / cleanup)
to prevent confusion about the function's purpose.

Restore guard on sync_existing call in
_create_file_compensation_restore: skip durable sync
when backup_path is None (file did not exist before).
…ollback_on_failure compat

Mark PARSE/CHUNK/EMBEDDING planes as compensated alongside
DOCUMENT, since delete_document() cascades to those tables.
This matches the old coverage behavior from
_WEB_FAILED_INGEST_CLEANUP_COMPENSATED_PLANES.

Restore rollback_on_failure compat wrappers in all four web
file handler functions. Each wrapper delegates to the
per-boundary callbacks internally. The pipeline now
prioritizes per-boundary compensation when present, falling
back to legacy rollback_on_failure for custom callbacks only.

Add tests for cascaded plane marking and compat wrapper
delegation.
rogercloud

This comment was marked as abuse.

@rogercloud

Copy link
Copy Markdown
Collaborator Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the web ingestion rollback mechanism by introducing a per-boundary compensation system (FILE, DOCUMENT, STATUS, and SNAPSHOT boundaries) that takes priority over the legacy monolithic rollback callback. It updates the file handler results, pipeline compatibility facade, and API endpoints to construct and execute these granular compensation callbacks, supported by new unit tests. The review feedback suggests defensive guards to prevent potential AttributeErrors when page_operation is None or when ingestion_result is a dictionary, and recommends consolidating redundant inline imports of SideEffectPlane within the compensation runner.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +295 to +323
# SNAPSHOT boundary
snapshot_compensation = cast(
Optional[FileHandlerCallback], file_info.get("snapshot_compensation")
)
if snapshot_compensation is not None:
from ..kb.operation_compatibility import SideEffectPlane

page_operation.record_side_effect(
name="cleanup_backup_file",
plane=SideEffectPlane.SNAPSHOT,
payload={
"collection": collection,
"url": url,
"file_id": cast(Optional[str], file_info.get("file_id")),
},
idempotency_key=f"snapshot:{collection}:{cast(Optional[str], file_info.get('file_id')) or url}",
compensation=snapshot_compensation,
)
errors = page_operation.execute_compensations(planes={SideEffectPlane.SNAPSHOT})
if not errors:
succeeded.add(SideEffectPlane.SNAPSHOT)
else:
_handle_boundary_failure(
warnings,
url,
"SNAPSHOT",
errors[0],
rollback_context,
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

If page_operation is None (which can happen if the operation compatibility facade is disabled or inactive), calling page_operation.record_side_effect and page_operation.execute_compensations will raise an AttributeError. Guard this block defensively by checking if page_operation is not None. If it is None, execute snapshot_compensation() directly.

    # SNAPSHOT boundary
    snapshot_compensation = cast(
        Optional[FileHandlerCallback], file_info.get("snapshot_compensation")
    )
    if snapshot_compensation is not None:
        from ..kb.operation_compatibility import SideEffectPlane

        if page_operation is not None:
            page_operation.record_side_effect(
                name="cleanup_backup_file",
                plane=SideEffectPlane.SNAPSHOT,
                payload={
                    "collection": collection,
                    "url": url,
                    "file_id": cast(Optional[str], file_info.get("file_id")),
                },
                idempotency_key=f"snapshot:{collection}:{cast(Optional[str], file_info.get('file_id')) or url}",
                compensation=snapshot_compensation,
            )
            errors = page_operation.execute_compensations(planes={SideEffectPlane.SNAPSHOT})
            if not errors:
                succeeded.add(SideEffectPlane.SNAPSHOT)
            else:
                _handle_boundary_failure(
                    warnings,
                    url,
                    "SNAPSHOT",
                    errors[0],
                    rollback_context,
                )
        else:
            try:
                snapshot_compensation()
            except Exception as exc:
                _handle_boundary_failure(
                    warnings,
                    url,
                    "SNAPSHOT",
                    exc,
                    rollback_context,
                )

Comment on lines +226 to +237
def _run_per_boundary_compensation(
*,
pipeline_facade: "KBPipelineCompatibilityFacade",
page_operation: Any,
file_info: FileHandlerResult,
collection: str,
url: str,
warnings: list[str],
ingestion_result: Optional[IngestionResult] = None,
) -> Optional[str]:
rollback_context = _rollback_context_payload(file_info)
succeeded: set[Any] = set()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The SideEffectPlane class is imported inline multiple times inside this function (lines 249, 267, 291, and 300). To improve maintainability and avoid redundant imports, import it once at the top of the function.

Suggested change
def _run_per_boundary_compensation(
*,
pipeline_facade: "KBPipelineCompatibilityFacade",
page_operation: Any,
file_info: FileHandlerResult,
collection: str,
url: str,
warnings: list[str],
ingestion_result: Optional[IngestionResult] = None,
) -> Optional[str]:
rollback_context = _rollback_context_payload(file_info)
succeeded: set[Any] = set()
def _run_per_boundary_compensation(
*,
pipeline_facade: "KBPipelineCompatibilityFacade",
page_operation: Any,
file_info: FileHandlerResult,
collection: str,
url: str,
warnings: list[str],
ingestion_result: Optional[IngestionResult] = None,
) -> Optional[str]:
from ..kb.operation_compatibility import SideEffectPlane
rollback_context = _rollback_context_payload(file_info)
succeeded: set[Any] = set()

Comment thread src/xagent/web/api/kb.py
Comment on lines +305 to +312
elif ingestion_result is not None:
doc_id = (
ingestion_result.doc_id
if isinstance(ingestion_result.doc_id, str)
and ingestion_result.doc_id
else None
)
if doc_id:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If ingestion_result is passed as a dictionary (e.g., in legacy pipelines or test mocks), accessing ingestion_result.doc_id directly will raise an AttributeError. Use getattr or check if it is a dictionary to safely retrieve doc_id.

Suggested change
elif ingestion_result is not None:
doc_id = (
ingestion_result.doc_id
if isinstance(ingestion_result.doc_id, str)
and ingestion_result.doc_id
else None
)
if doc_id:
elif ingestion_result is not None:
if isinstance(ingestion_result, dict):
raw_doc_id = ingestion_result.get("doc_id")
else:
raw_doc_id = getattr(ingestion_result, "doc_id", None)
doc_id = raw_doc_id if isinstance(raw_doc_id, str) and raw_doc_id else None
if doc_id:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

ref(kb): split web rollback callbacks into structured compensation steps

3 participants