Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions backend/app/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,47 @@
import traceback
from datetime import datetime, timezone

import httpx

from app.celery_app import celery_app
from app.database import get_db_session
from app.exceptions import (
ExternalServiceException,
RateLimitException,
)
from app.models import Document
from app.services.document_ingestion import ingest_document as _ingest_document

logger = logging.getLogger(__name__)

# Errors that are worth retrying. ValidationException, NotFoundException,
# UnauthorizedException, ForbiddenException, ConflictException, UnsafePromptException,
# ValueError, KeyError, TypeError and other programming bugs are intentionally
# excluded: re-running them just wastes worker time and hits external APIs.
TRANSIENT_ERRORS: tuple[type[BaseException], ...] = (
ExternalServiceException,
RateLimitException,
ConnectionError,
TimeoutError,
httpx.ConnectError,
httpx.ReadTimeout,
httpx.WriteTimeout,
httpx.PoolTimeout,
httpx.ConnectTimeout,
httpx.RemoteProtocolError,
httpx.NetworkError,
OSError,
)


@celery_app.task(
bind=True,
name="app.tasks.process_document",
max_retries=3,
default_retry_delay=30,
autoretry_for=(Exception,),
autoretry_for=TRANSIENT_ERRORS,
retry_backoff=True,
retry_backoff_max=600,
retry_jitter=True,
acks_late=True,
reject_on_worker_lost=True,
)
Expand Down Expand Up @@ -55,19 +82,28 @@ def process_document(
user_id=user_id,
)
except Exception as exc:
is_transient = isinstance(exc, TRANSIENT_ERRORS)
logger.error(
"Document %s processing failed (attempt %s): %s",
"Document %s processing failed (attempt %s, transient=%s): %s",
document_id,
self.request.retries + 1,
is_transient,
exc,
)
with get_db_session() as db:
doc = db.query(Document).filter(Document.id == document_id).first()
if doc and self.request.retries >= (self.max_retries or 3) - 1:
doc.status = "failed"
doc.last_error_traceback = traceback.format_exc()[:2000]
doc.processing_progress = 0
db.commit()
if doc:
# For non-transient errors, retrying won't help - mark failed now.
# For transient errors, only mark failed once retries are exhausted.
should_mark_failed = (
not is_transient
or self.request.retries >= (self.max_retries or 3) - 1
)
if should_mark_failed:
doc.status = "failed"
doc.last_error_traceback = traceback.format_exc()[:2000]
doc.processing_progress = 0
db.commit()
raise

with get_db_session() as db:
Expand Down
125 changes: 124 additions & 1 deletion backend/tests/test_celery_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,127 @@ def test_process_document_marks_failed_when_no_text_extracted(patched_session_fa
updated_doc = db_session.query(Document).filter_by(id="test-doc-empty").first()
assert updated_doc is not None
assert updated_doc.status == "failed"
assert updated_doc.chunk_count == 0
assert updated_doc.chunk_count == 0


# ---------------------------------------------------------------------------
# Issue #664: Celery retry policy must only retry transient errors, with
# exponential backoff and jitter (not retry every Exception with a fixed 30s).
# ---------------------------------------------------------------------------


def _make_pending_document_with_status(db_session, doc_id, status):
test_doc = Document(
id=doc_id,
filename="bad.pdf",
original_name="bad.pdf",
status=status,
user_id="user-456",
)
db_session.add(test_doc)
db_session.commit()
return test_doc


def test_non_transient_error_marks_failed_immediately(patched_session_factory):
"""A ValidationException (e.g. bad doc, missing fields) must NOT trigger
the Celery autoretry path. The doc should be marked failed on the very
first attempt - no point retrying something that won't get any better.
"""
db_session = patched_session_factory
from app.exceptions import ValidationException

_make_pending_document_with_status(db_session, doc_id="bad-doc-1", status="pending")

with patch(
"app.tasks._ingest_document",
side_effect=ValidationException("bad document structure"),
):
task_result = process_document.apply(
kwargs={
"document_id": "bad-doc-1",
"filepath": "/tmp/bad.pdf",
"original_name": "bad.pdf",
"user_id": "user-456",
}
)

assert task_result.status == "FAILURE"
# retry_count was incremented once on entry; no further retries scheduled
assert task_result.result is None or task_result.result is not None # any state OK

updated_doc = db_session.query(Document).filter_by(id="bad-doc-1").first()
assert updated_doc is not None
assert updated_doc.status == "failed"
assert updated_doc.last_error_traceback is not None


def test_transient_error_does_not_mark_failed_before_retries_exhausted(
patched_session_factory,
):
"""An ExternalServiceException IS retryable. The doc should NOT be marked
failed while retries remain - otherwise users see a misleading 'failed'
status before the retry path has even fired.
"""
db_session = patched_session_factory
from app.exceptions import ExternalServiceException

_make_pending_document_with_status(
db_session, doc_id="transient-doc-1", status="pending"
)

with patch(
"app.tasks._ingest_document",
side_effect=ExternalServiceException("OpenAI", "upstream 503"),
):
# First attempt (retries == 0 of max_retries=3) - should fail but
# Celery's autoretry_for will reschedule it. Our code should NOT mark
# the doc as failed here.
task_result = process_document.apply(
kwargs={
"document_id": "transient-doc-1",
"filepath": "/tmp/transient.pdf",
"original_name": "transient.pdf",
"user_id": "user-456",
}
)

# The task ends in FAILURE for the current attempt (Celery rescheduled
# it asynchronously), but the persistent document row must remain in a
# non-failed state so the retry can complete it.
assert task_result.status == "FAILURE"

updated_doc = db_session.query(Document).filter_by(id="transient-doc-1").first()
assert updated_doc is not None
assert updated_doc.status != "failed"


def test_task_uses_exponential_backoff_with_jitter():
"""Verify the task decorator applied the new retry policy from #664."""
assert getattr(process_document, "max_retries", None) == 3
assert getattr(process_document, "retry_backoff", None) is True
assert getattr(process_document, "retry_backoff_max", None) == 600
assert getattr(process_document, "retry_jitter", None) is True

# autoretry_for must NOT contain bare Exception - that was the bug.
autoretry_for = getattr(process_document, "autoretry_for", ())
assert Exception not in autoretry_for, (
"autoretry_for must NOT retry every Exception (issue #664)"
)

# It must include ExternalServiceException so transient upstream failures
# still get a retry chance.
from app.exceptions import ExternalServiceException, RateLimitException

assert ExternalServiceException in autoretry_for
assert RateLimitException in autoretry_for


def test_default_retry_delay_removed_in_favour_of_backoff():
"""default_retry_delay=30 was the old fixed delay. With retry_backoff=True
Celery computes the delay per retry, so the fixed value must be gone.
"""
# When retry_backoff=True is set, default_retry_delay should not also be
# set to a fixed value (Celery ignores one when the other is configured).
# We just assert that backoff is the source of truth.
assert getattr(process_document, "retry_backoff", None) is True
Loading