Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 40 additions & 16 deletions backend/app/services/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,24 @@

All cleanup functions use batched pagination and transaction-safe patterns
to avoid race conditions, double-deletes, and database lock contention.

Each batch is committed independently rather than being accumulated inside
one long-lived transaction spanning the whole job. This means:

* No manually advancing ``offset`` is tracked across iterations. Each
iteration re-runs the *same* filtered, ``ORDER BY``-ed query with only
``LIMIT`` applied. Once a batch is committed, the rows in it no longer
match the filter (they were just marked "failed", soft-deleted, or
hard-deleted), so they naturally fall out of the next query's results
and the next "page" is simply whatever rows are still eligible --
without needing an offset to skip past already-handled rows.
* A failure partway through a large job (e.g. on row 220 of 250) cannot
roll back the rows already committed in earlier batches. Without this,
a single unhandled error anywhere in the run would discard every prior
batch's work when the surrounding transaction rolled back, including
already-performed irreversible side effects like deleted vector-store
entries or removed files in ``_hard_delete_document`` -- leaving
orphaned DB rows pointing at data that no longer exists.
"""
import logging
from datetime import datetime, timedelta, timezone
Expand Down Expand Up @@ -51,9 +69,9 @@ def cleanup_stale_documents():
timeout_minutes = settings.DOC_PROCESSING_TIMEOUT_MINUTES
cutoff = datetime.now(timezone.utc) - timedelta(minutes=timeout_minutes)

with get_db_session() as db:
offset = 0
while True:
total = 0
while True:
with get_db_session() as db:
batch = (
db.query(Document)
.filter(
Expand All @@ -62,8 +80,8 @@ def cleanup_stale_documents():
Document.processing_started_at < cutoff,
Document.is_deleted.is_(False),
)
.order_by(Document.id)
.limit(_CLEANUP_BATCH_SIZE)
.offset(offset)
.all()
)
if not batch:
Expand All @@ -80,26 +98,30 @@ def cleanup_stale_documents():
doc.error_message = f"Processing timed out after {timeout_minutes} minutes"
doc.last_error_traceback = "Timed out: no progress update received within the configured timeout window."
logger.info("Marked stale document %s as failed", doc.id)
offset += _CLEANUP_BATCH_SIZE
total += len(batch)
# `with get_db_session()` has committed this batch; loop and
# re-query from the top so the next batch picks up whatever rows
# are still eligible (this batch's rows no longer match the filter).
return total


def cleanup_old_deleted_documents():
"""Permanently delete documents soft-deleted beyond the max age."""
max_age_days = settings.DOC_CLEANUP_MAX_AGE_DAYS
cutoff = datetime.now(timezone.utc) - timedelta(days=max_age_days)

with get_db_session() as db:
offset = 0
while True:
total = 0
while True:
with get_db_session() as db:
batch = (
db.query(Document)
.filter(
Document.is_deleted.is_(True),
Document.deleted_at.isnot(None),
Document.deleted_at < cutoff,
)
.order_by(Document.id)
.limit(_CLEANUP_BATCH_SIZE)
.offset(offset)
.all()
)
if not batch:
Expand All @@ -117,7 +139,8 @@ def cleanup_old_deleted_documents():
doc.id,
doc.original_name,
)
offset += _CLEANUP_BATCH_SIZE
total += len(batch)
return total


def cleanup_inactive_active_documents():
Expand All @@ -130,22 +153,22 @@ def cleanup_inactive_active_documents():
"""
if not settings.DOC_CLEANUP_ENABLED:
logger.info("Inactive document cleanup is disabled via DOC_CLEANUP_ENABLED")
return
return 0

inactive_days = settings.DOC_CLEANUP_INACTIVE_DAYS
cutoff = datetime.now(timezone.utc) - timedelta(days=inactive_days)

with get_db_session() as db:
offset = 0
while True:
total = 0
while True:
with get_db_session() as db:
batch = (
db.query(Document)
.filter(
Document.is_deleted.is_(False),
Document.last_accessed_at < cutoff,
)
.order_by(Document.id)
.limit(_CLEANUP_BATCH_SIZE)
.offset(offset)
.all()
)
if not batch:
Expand All @@ -163,4 +186,5 @@ def cleanup_inactive_active_documents():
doc.id,
doc.original_name,
)
offset += _CLEANUP_BATCH_SIZE
total += len(batch)
return total
189 changes: 189 additions & 0 deletions backend/tests/test_cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
"""Regression tests for backend/app/services/cleanup.py.

Covers GH issue: cleanup jobs silently skip eligible documents because the
old implementation tracked a manually advancing ``offset`` across batches
inside one long-lived transaction. These tests seed more than one batch's
worth of eligible rows (``_CLEANUP_BATCH_SIZE`` is 100) and assert that a
single call to each cleanup function processes *all* of them, in a stable
order, and that partial failures don't roll back already-committed batches.
"""
from datetime import datetime, timedelta, timezone

import pytest

from app.models import Document
from app.services import cleanup as cleanup_mod
from app.services.cleanup import _CLEANUP_BATCH_SIZE


SEEDED_ROWS = _CLEANUP_BATCH_SIZE * 2 + 50 # 250 with the current batch size of 100


def _seed_stale_processing_documents(db_session, user, count, cutoff):
docs = []
for i in range(count):
doc = Document(
user_id=user.id,
filename=f"stale_{i}.txt",
original_name=f"stale_{i}.txt",
status="processing",
processing_started_at=cutoff - timedelta(minutes=1),
is_deleted=False,
)
db_session.add(doc)
docs.append(doc)
db_session.commit()
return docs


def _seed_old_deleted_documents(db_session, user, count, cutoff):
docs = []
for i in range(count):
doc = Document(
user_id=user.id,
filename=f"deleted_{i}.txt",
original_name=f"deleted_{i}.txt",
status="ready",
is_deleted=True,
deleted_at=cutoff - timedelta(days=1),
)
db_session.add(doc)
docs.append(doc)
db_session.commit()
return docs


def _seed_inactive_documents(db_session, user, count, cutoff):
docs = []
for i in range(count):
doc = Document(
user_id=user.id,
filename=f"inactive_{i}.txt",
original_name=f"inactive_{i}.txt",
status="ready",
is_deleted=False,
last_accessed_at=cutoff - timedelta(days=1),
)
db_session.add(doc)
docs.append(doc)
db_session.commit()
return docs


@pytest.fixture()
def patch_session_local(monkeypatch, db_session):
"""Route app.database.SessionLocal() (used inside get_db_session) to the
test's single in-memory db_session, matching the pattern already used by
the `client` fixture in conftest.py.
"""
monkeypatch.setattr("app.database.SessionLocal", lambda: db_session)


@pytest.fixture()
def stub_vectorstore(monkeypatch):
"""Stub out the vectorstore delete used by _hard_delete_document so hard
deletes don't depend on a real Chroma instance.
"""
monkeypatch.setattr(
"app.rag.vectorstore.delete_document_chunks",
lambda document_id, user_id: None,
)


def test_cleanup_stale_documents_processes_all_rows_across_multiple_batches(
db_session, user, patch_session_local
):
"""A single call must mark *every* eligible stale document as failed,
not just the first batch's worth. With the old offset-tracking loop on
a single uncommitted transaction this assertion still happened to pass
on SQLite/autoflush=False, but the (now removed) offset arithmetic was
still wrong for any backend where each batch is visible to the next
query (e.g. once batches are committed independently, as they are now).
"""
cutoff = datetime.now(timezone.utc) - timedelta(
minutes=30
) # default DOC_PROCESSING_TIMEOUT_MINUTES
_seed_stale_processing_documents(db_session, user, SEEDED_ROWS, cutoff)

total = cleanup_mod.cleanup_stale_documents()

assert total == SEEDED_ROWS
remaining_processing = (
db_session.query(Document).filter(Document.status == "processing").count()
)
failed_count = db_session.query(Document).filter(Document.status == "failed").count()
assert remaining_processing == 0
assert failed_count == SEEDED_ROWS


def test_cleanup_old_deleted_documents_processes_all_rows_across_multiple_batches(
db_session, user, patch_session_local, stub_vectorstore
):
cutoff = datetime.now(timezone.utc) - timedelta(days=90) # default DOC_CLEANUP_MAX_AGE_DAYS
_seed_old_deleted_documents(db_session, user, SEEDED_ROWS, cutoff)

total = cleanup_mod.cleanup_old_deleted_documents()

assert total == SEEDED_ROWS
assert db_session.query(Document).count() == 0


def test_cleanup_inactive_active_documents_processes_all_rows_across_multiple_batches(
db_session, user, patch_session_local, stub_vectorstore
):
cutoff = datetime.now(timezone.utc) - timedelta(days=30) # default DOC_CLEANUP_INACTIVE_DAYS
_seed_inactive_documents(db_session, user, SEEDED_ROWS, cutoff)

total = cleanup_mod.cleanup_inactive_active_documents()

assert total == SEEDED_ROWS
assert db_session.query(Document).count() == 0


def test_cleanup_old_deleted_documents_preserves_earlier_committed_batches_on_failure(
db_session, user, patch_session_local, stub_vectorstore, monkeypatch
):
"""If processing fails partway through a large job, batches that were
already committed must stay committed -- a failure on document #220
(well inside the third batch) must not roll back the 200 documents
purged in the first two batches. This is the corruption scenario the
old single-long-transaction-per-job implementation was vulnerable to:
a single unhandled error anywhere in the loop discarded every prior
batch's work in that call, even though irreversible side effects (the
vector-store deletes already performed for those rows) had already
happened.
"""
cutoff = datetime.now(timezone.utc) - timedelta(days=90)
_seed_old_deleted_documents(db_session, user, SEEDED_ROWS, cutoff)

call_count = {"n": 0}
original = cleanup_mod._hard_delete_document

def flaky_hard_delete(db, doc):
call_count["n"] += 1
if call_count["n"] == 220:
raise RuntimeError("simulated unhandled failure mid-job")
return original(db, doc)

monkeypatch.setattr(cleanup_mod, "_hard_delete_document", flaky_hard_delete)

with pytest.raises(RuntimeError):
cleanup_mod.cleanup_old_deleted_documents()

# Batches 1 and 2 (200 rows) were committed before the failure in batch 3;
# only the still-uncommitted remainder (50 rows) should survive.
remaining = db_session.query(Document).count()
assert remaining == SEEDED_ROWS - 200


def test_cleanup_inactive_documents_disabled_returns_without_querying(
db_session, user, patch_session_local, monkeypatch
):
monkeypatch.setattr(cleanup_mod.settings, "DOC_CLEANUP_ENABLED", False)
cutoff = datetime.now(timezone.utc) - timedelta(days=30)
_seed_inactive_documents(db_session, user, 5, cutoff)

total = cleanup_mod.cleanup_inactive_active_documents()

assert total == 0
assert db_session.query(Document).count() == 5
Loading