From a3a561a5167934a0be9f601c61b5549d4a9b3196 Mon Sep 17 00:00:00 2001 From: Hari Om Date: Sat, 20 Jun 2026 14:03:09 +0530 Subject: [PATCH] fix(cleanup): commit each batch independently instead of one job-wide transaction Removes the manually advancing offset from all three scheduled cleanup functions (cleanup_stale_documents, cleanup_old_deleted_documents, cleanup_inactive_active_documents) and re-queries from the top of the filter on every iteration instead, with an explicit ORDER BY Document.id for deterministic pagination. Each batch is now committed independently rather than accumulated inside one long-lived transaction for the whole job. This fixes a data-integrity bug: if any unhandled exception occurred partway through a large cleanup run, the entire job's transaction rolled back, discarding every batch already processed in that call -- including batches where _hard_delete_document had already performed irreversible side effects (vector-store deletes, file removal). The DB rows for those documents would reappear after rollback, now orphaned and pointing at data that no longer existed. Verified via fault injection: seeding 250 eligible rows (2.5x the 100-row batch size) and forcing an unhandled error on row #220 -- before this fix, all 250 rows reappeared after rollback despite ~200 already having had their vectors/files deleted; after this fix, the 200 rows from the first two committed batches correctly stay purged. Note: investigated the originally suspected mechanism (SQLAlchemy autoflush propagating in-batch mutations into the next iteration's query within one shared transaction) and could not reproduce it -- this codebase's SessionLocal is configured with autoflush=False, so that specific failure mode does not occur here. The offset-removal and per-batch-commit change is still correct and necessary for the partial-failure durability issue described above, and is also more robust against any future change to session config. --- backend/app/services/cleanup.py | 56 +++++++--- backend/tests/test_cleanup.py | 189 ++++++++++++++++++++++++++++++++ 2 files changed, 229 insertions(+), 16 deletions(-) create mode 100644 backend/tests/test_cleanup.py diff --git a/backend/app/services/cleanup.py b/backend/app/services/cleanup.py index 15daf5ae..85009873 100644 --- a/backend/app/services/cleanup.py +++ b/backend/app/services/cleanup.py @@ -2,6 +2,24 @@ All cleanup functions use batched pagination and transaction-safe patterns to avoid race conditions, double-deletes, and database lock contention. + +Each batch is committed independently rather than being accumulated inside +one long-lived transaction spanning the whole job. This means: + + * No manually advancing ``offset`` is tracked across iterations. Each + iteration re-runs the *same* filtered, ``ORDER BY``-ed query with only + ``LIMIT`` applied. Once a batch is committed, the rows in it no longer + match the filter (they were just marked "failed", soft-deleted, or + hard-deleted), so they naturally fall out of the next query's results + and the next "page" is simply whatever rows are still eligible -- + without needing an offset to skip past already-handled rows. + * A failure partway through a large job (e.g. on row 220 of 250) cannot + roll back the rows already committed in earlier batches. Without this, + a single unhandled error anywhere in the run would discard every prior + batch's work when the surrounding transaction rolled back, including + already-performed irreversible side effects like deleted vector-store + entries or removed files in ``_hard_delete_document`` -- leaving + orphaned DB rows pointing at data that no longer exists. """ import logging from datetime import datetime, timedelta, timezone @@ -51,9 +69,9 @@ def cleanup_stale_documents(): timeout_minutes = settings.DOC_PROCESSING_TIMEOUT_MINUTES cutoff = datetime.now(timezone.utc) - timedelta(minutes=timeout_minutes) - with get_db_session() as db: - offset = 0 - while True: + total = 0 + while True: + with get_db_session() as db: batch = ( db.query(Document) .filter( @@ -62,8 +80,8 @@ def cleanup_stale_documents(): Document.processing_started_at < cutoff, Document.is_deleted.is_(False), ) + .order_by(Document.id) .limit(_CLEANUP_BATCH_SIZE) - .offset(offset) .all() ) if not batch: @@ -80,7 +98,11 @@ def cleanup_stale_documents(): doc.error_message = f"Processing timed out after {timeout_minutes} minutes" doc.last_error_traceback = "Timed out: no progress update received within the configured timeout window." logger.info("Marked stale document %s as failed", doc.id) - offset += _CLEANUP_BATCH_SIZE + total += len(batch) + # `with get_db_session()` has committed this batch; loop and + # re-query from the top so the next batch picks up whatever rows + # are still eligible (this batch's rows no longer match the filter). + return total def cleanup_old_deleted_documents(): @@ -88,9 +110,9 @@ def cleanup_old_deleted_documents(): max_age_days = settings.DOC_CLEANUP_MAX_AGE_DAYS cutoff = datetime.now(timezone.utc) - timedelta(days=max_age_days) - with get_db_session() as db: - offset = 0 - while True: + total = 0 + while True: + with get_db_session() as db: batch = ( db.query(Document) .filter( @@ -98,8 +120,8 @@ def cleanup_old_deleted_documents(): Document.deleted_at.isnot(None), Document.deleted_at < cutoff, ) + .order_by(Document.id) .limit(_CLEANUP_BATCH_SIZE) - .offset(offset) .all() ) if not batch: @@ -117,7 +139,8 @@ def cleanup_old_deleted_documents(): doc.id, doc.original_name, ) - offset += _CLEANUP_BATCH_SIZE + total += len(batch) + return total def cleanup_inactive_active_documents(): @@ -130,22 +153,22 @@ def cleanup_inactive_active_documents(): """ if not settings.DOC_CLEANUP_ENABLED: logger.info("Inactive document cleanup is disabled via DOC_CLEANUP_ENABLED") - return + return 0 inactive_days = settings.DOC_CLEANUP_INACTIVE_DAYS cutoff = datetime.now(timezone.utc) - timedelta(days=inactive_days) - with get_db_session() as db: - offset = 0 - while True: + total = 0 + while True: + with get_db_session() as db: batch = ( db.query(Document) .filter( Document.is_deleted.is_(False), Document.last_accessed_at < cutoff, ) + .order_by(Document.id) .limit(_CLEANUP_BATCH_SIZE) - .offset(offset) .all() ) if not batch: @@ -163,4 +186,5 @@ def cleanup_inactive_active_documents(): doc.id, doc.original_name, ) - offset += _CLEANUP_BATCH_SIZE + total += len(batch) + return total \ No newline at end of file diff --git a/backend/tests/test_cleanup.py b/backend/tests/test_cleanup.py new file mode 100644 index 00000000..9340397b --- /dev/null +++ b/backend/tests/test_cleanup.py @@ -0,0 +1,189 @@ +"""Regression tests for backend/app/services/cleanup.py. + +Covers GH issue: cleanup jobs silently skip eligible documents because the +old implementation tracked a manually advancing ``offset`` across batches +inside one long-lived transaction. These tests seed more than one batch's +worth of eligible rows (``_CLEANUP_BATCH_SIZE`` is 100) and assert that a +single call to each cleanup function processes *all* of them, in a stable +order, and that partial failures don't roll back already-committed batches. +""" +from datetime import datetime, timedelta, timezone + +import pytest + +from app.models import Document +from app.services import cleanup as cleanup_mod +from app.services.cleanup import _CLEANUP_BATCH_SIZE + + +SEEDED_ROWS = _CLEANUP_BATCH_SIZE * 2 + 50 # 250 with the current batch size of 100 + + +def _seed_stale_processing_documents(db_session, user, count, cutoff): + docs = [] + for i in range(count): + doc = Document( + user_id=user.id, + filename=f"stale_{i}.txt", + original_name=f"stale_{i}.txt", + status="processing", + processing_started_at=cutoff - timedelta(minutes=1), + is_deleted=False, + ) + db_session.add(doc) + docs.append(doc) + db_session.commit() + return docs + + +def _seed_old_deleted_documents(db_session, user, count, cutoff): + docs = [] + for i in range(count): + doc = Document( + user_id=user.id, + filename=f"deleted_{i}.txt", + original_name=f"deleted_{i}.txt", + status="ready", + is_deleted=True, + deleted_at=cutoff - timedelta(days=1), + ) + db_session.add(doc) + docs.append(doc) + db_session.commit() + return docs + + +def _seed_inactive_documents(db_session, user, count, cutoff): + docs = [] + for i in range(count): + doc = Document( + user_id=user.id, + filename=f"inactive_{i}.txt", + original_name=f"inactive_{i}.txt", + status="ready", + is_deleted=False, + last_accessed_at=cutoff - timedelta(days=1), + ) + db_session.add(doc) + docs.append(doc) + db_session.commit() + return docs + + +@pytest.fixture() +def patch_session_local(monkeypatch, db_session): + """Route app.database.SessionLocal() (used inside get_db_session) to the + test's single in-memory db_session, matching the pattern already used by + the `client` fixture in conftest.py. + """ + monkeypatch.setattr("app.database.SessionLocal", lambda: db_session) + + +@pytest.fixture() +def stub_vectorstore(monkeypatch): + """Stub out the vectorstore delete used by _hard_delete_document so hard + deletes don't depend on a real Chroma instance. + """ + monkeypatch.setattr( + "app.rag.vectorstore.delete_document_chunks", + lambda document_id, user_id: None, + ) + + +def test_cleanup_stale_documents_processes_all_rows_across_multiple_batches( + db_session, user, patch_session_local +): + """A single call must mark *every* eligible stale document as failed, + not just the first batch's worth. With the old offset-tracking loop on + a single uncommitted transaction this assertion still happened to pass + on SQLite/autoflush=False, but the (now removed) offset arithmetic was + still wrong for any backend where each batch is visible to the next + query (e.g. once batches are committed independently, as they are now). + """ + cutoff = datetime.now(timezone.utc) - timedelta( + minutes=30 + ) # default DOC_PROCESSING_TIMEOUT_MINUTES + _seed_stale_processing_documents(db_session, user, SEEDED_ROWS, cutoff) + + total = cleanup_mod.cleanup_stale_documents() + + assert total == SEEDED_ROWS + remaining_processing = ( + db_session.query(Document).filter(Document.status == "processing").count() + ) + failed_count = db_session.query(Document).filter(Document.status == "failed").count() + assert remaining_processing == 0 + assert failed_count == SEEDED_ROWS + + +def test_cleanup_old_deleted_documents_processes_all_rows_across_multiple_batches( + db_session, user, patch_session_local, stub_vectorstore +): + cutoff = datetime.now(timezone.utc) - timedelta(days=90) # default DOC_CLEANUP_MAX_AGE_DAYS + _seed_old_deleted_documents(db_session, user, SEEDED_ROWS, cutoff) + + total = cleanup_mod.cleanup_old_deleted_documents() + + assert total == SEEDED_ROWS + assert db_session.query(Document).count() == 0 + + +def test_cleanup_inactive_active_documents_processes_all_rows_across_multiple_batches( + db_session, user, patch_session_local, stub_vectorstore +): + cutoff = datetime.now(timezone.utc) - timedelta(days=30) # default DOC_CLEANUP_INACTIVE_DAYS + _seed_inactive_documents(db_session, user, SEEDED_ROWS, cutoff) + + total = cleanup_mod.cleanup_inactive_active_documents() + + assert total == SEEDED_ROWS + assert db_session.query(Document).count() == 0 + + +def test_cleanup_old_deleted_documents_preserves_earlier_committed_batches_on_failure( + db_session, user, patch_session_local, stub_vectorstore, monkeypatch +): + """If processing fails partway through a large job, batches that were + already committed must stay committed -- a failure on document #220 + (well inside the third batch) must not roll back the 200 documents + purged in the first two batches. This is the corruption scenario the + old single-long-transaction-per-job implementation was vulnerable to: + a single unhandled error anywhere in the loop discarded every prior + batch's work in that call, even though irreversible side effects (the + vector-store deletes already performed for those rows) had already + happened. + """ + cutoff = datetime.now(timezone.utc) - timedelta(days=90) + _seed_old_deleted_documents(db_session, user, SEEDED_ROWS, cutoff) + + call_count = {"n": 0} + original = cleanup_mod._hard_delete_document + + def flaky_hard_delete(db, doc): + call_count["n"] += 1 + if call_count["n"] == 220: + raise RuntimeError("simulated unhandled failure mid-job") + return original(db, doc) + + monkeypatch.setattr(cleanup_mod, "_hard_delete_document", flaky_hard_delete) + + with pytest.raises(RuntimeError): + cleanup_mod.cleanup_old_deleted_documents() + + # Batches 1 and 2 (200 rows) were committed before the failure in batch 3; + # only the still-uncommitted remainder (50 rows) should survive. + remaining = db_session.query(Document).count() + assert remaining == SEEDED_ROWS - 200 + + +def test_cleanup_inactive_documents_disabled_returns_without_querying( + db_session, user, patch_session_local, monkeypatch +): + monkeypatch.setattr(cleanup_mod.settings, "DOC_CLEANUP_ENABLED", False) + cutoff = datetime.now(timezone.utc) - timedelta(days=30) + _seed_inactive_documents(db_session, user, 5, cutoff) + + total = cleanup_mod.cleanup_inactive_active_documents() + + assert total == 0 + assert db_session.query(Document).count() == 5 \ No newline at end of file