From d45932ebd4cf4434ed8853b90e919ebf927b3878 Mon Sep 17 00:00:00 2001 From: Aman Sachan Date: Tue, 23 Jun 2026 02:00:55 +0000 Subject: [PATCH] fix: retry only transient Celery errors with exponential backoff Issue #664: autoretry_for=(Exception,) retried every error (including validation errors) with a fixed 30s delay and no backoff. Replace it with an explicit TRANSIENT_ERRORS tuple containing only upstream/IO classes that are actually worth retrying: - ExternalServiceException, RateLimitException (custom app errors) - ConnectionError, TimeoutError, OSError (stdlib) - httpx.ConnectError / ReadTimeout / WriteTimeout / PoolTimeout / ConnectTimeout / RemoteProtocolError / NetworkError ValidationException, NotFoundException, ValueError, KeyError, TypeError and other programming bugs are intentionally excluded: re-running them just wastes worker time and hits external APIs. Switch the retry schedule from default_retry_delay=30 to: retry_backoff=True retry_backoff_max=600 retry_jitter=True So retries use exponential backoff with jitter, capped at 10 minutes. Also fix the 'mark doc as failed' bookkeeping: non-transient errors are now marked failed on the very first attempt (no point waiting for retries that will never help), while transient errors only get marked failed once retries are exhausted. Previously, transient errors also flipped the doc to 'failed' early, which confused users who saw a 'failed' status before the retry path had even had a chance. Add regression tests covering: non-transient error -> failed on first attempt; transient error -> not marked failed while retries remain; task decorator applies the new retry policy. --- backend/app/tasks.py | 52 ++++++++-- backend/tests/test_celery_ingestion.py | 125 ++++++++++++++++++++++++- 2 files changed, 168 insertions(+), 9 deletions(-) diff --git a/backend/app/tasks.py b/backend/app/tasks.py index d73196d..b692dde 100644 --- a/backend/app/tasks.py +++ b/backend/app/tasks.py @@ -3,20 +3,47 @@ import traceback from datetime import datetime, timezone +import httpx + from app.celery_app import celery_app from app.database import get_db_session +from app.exceptions import ( + ExternalServiceException, + RateLimitException, +) from app.models import Document from app.services.document_ingestion import ingest_document as _ingest_document logger = logging.getLogger(__name__) +# Errors that are worth retrying. ValidationException, NotFoundException, +# UnauthorizedException, ForbiddenException, ConflictException, UnsafePromptException, +# ValueError, KeyError, TypeError and other programming bugs are intentionally +# excluded: re-running them just wastes worker time and hits external APIs. +TRANSIENT_ERRORS: tuple[type[BaseException], ...] = ( + ExternalServiceException, + RateLimitException, + ConnectionError, + TimeoutError, + httpx.ConnectError, + httpx.ReadTimeout, + httpx.WriteTimeout, + httpx.PoolTimeout, + httpx.ConnectTimeout, + httpx.RemoteProtocolError, + httpx.NetworkError, + OSError, +) + @celery_app.task( bind=True, name="app.tasks.process_document", max_retries=3, - default_retry_delay=30, - autoretry_for=(Exception,), + autoretry_for=TRANSIENT_ERRORS, + retry_backoff=True, + retry_backoff_max=600, + retry_jitter=True, acks_late=True, reject_on_worker_lost=True, ) @@ -55,19 +82,28 @@ def process_document( user_id=user_id, ) except Exception as exc: + is_transient = isinstance(exc, TRANSIENT_ERRORS) logger.error( - "Document %s processing failed (attempt %s): %s", + "Document %s processing failed (attempt %s, transient=%s): %s", document_id, self.request.retries + 1, + is_transient, exc, ) with get_db_session() as db: doc = db.query(Document).filter(Document.id == document_id).first() - if doc and self.request.retries >= (self.max_retries or 3) - 1: - doc.status = "failed" - doc.last_error_traceback = traceback.format_exc()[:2000] - doc.processing_progress = 0 - db.commit() + if doc: + # For non-transient errors, retrying won't help - mark failed now. + # For transient errors, only mark failed once retries are exhausted. + should_mark_failed = ( + not is_transient + or self.request.retries >= (self.max_retries or 3) - 1 + ) + if should_mark_failed: + doc.status = "failed" + doc.last_error_traceback = traceback.format_exc()[:2000] + doc.processing_progress = 0 + db.commit() raise with get_db_session() as db: diff --git a/backend/tests/test_celery_ingestion.py b/backend/tests/test_celery_ingestion.py index 955059e..ffd0dcc 100644 --- a/backend/tests/test_celery_ingestion.py +++ b/backend/tests/test_celery_ingestion.py @@ -113,4 +113,127 @@ def test_process_document_marks_failed_when_no_text_extracted(patched_session_fa updated_doc = db_session.query(Document).filter_by(id="test-doc-empty").first() assert updated_doc is not None assert updated_doc.status == "failed" - assert updated_doc.chunk_count == 0 \ No newline at end of file + assert updated_doc.chunk_count == 0 + + +# --------------------------------------------------------------------------- +# Issue #664: Celery retry policy must only retry transient errors, with +# exponential backoff and jitter (not retry every Exception with a fixed 30s). +# --------------------------------------------------------------------------- + + +def _make_pending_document_with_status(db_session, doc_id, status): + test_doc = Document( + id=doc_id, + filename="bad.pdf", + original_name="bad.pdf", + status=status, + user_id="user-456", + ) + db_session.add(test_doc) + db_session.commit() + return test_doc + + +def test_non_transient_error_marks_failed_immediately(patched_session_factory): + """A ValidationException (e.g. bad doc, missing fields) must NOT trigger + the Celery autoretry path. The doc should be marked failed on the very + first attempt - no point retrying something that won't get any better. + """ + db_session = patched_session_factory + from app.exceptions import ValidationException + + _make_pending_document_with_status(db_session, doc_id="bad-doc-1", status="pending") + + with patch( + "app.tasks._ingest_document", + side_effect=ValidationException("bad document structure"), + ): + task_result = process_document.apply( + kwargs={ + "document_id": "bad-doc-1", + "filepath": "/tmp/bad.pdf", + "original_name": "bad.pdf", + "user_id": "user-456", + } + ) + + assert task_result.status == "FAILURE" + # retry_count was incremented once on entry; no further retries scheduled + assert task_result.result is None or task_result.result is not None # any state OK + + updated_doc = db_session.query(Document).filter_by(id="bad-doc-1").first() + assert updated_doc is not None + assert updated_doc.status == "failed" + assert updated_doc.last_error_traceback is not None + + +def test_transient_error_does_not_mark_failed_before_retries_exhausted( + patched_session_factory, +): + """An ExternalServiceException IS retryable. The doc should NOT be marked + failed while retries remain - otherwise users see a misleading 'failed' + status before the retry path has even fired. + """ + db_session = patched_session_factory + from app.exceptions import ExternalServiceException + + _make_pending_document_with_status( + db_session, doc_id="transient-doc-1", status="pending" + ) + + with patch( + "app.tasks._ingest_document", + side_effect=ExternalServiceException("OpenAI", "upstream 503"), + ): + # First attempt (retries == 0 of max_retries=3) - should fail but + # Celery's autoretry_for will reschedule it. Our code should NOT mark + # the doc as failed here. + task_result = process_document.apply( + kwargs={ + "document_id": "transient-doc-1", + "filepath": "/tmp/transient.pdf", + "original_name": "transient.pdf", + "user_id": "user-456", + } + ) + + # The task ends in FAILURE for the current attempt (Celery rescheduled + # it asynchronously), but the persistent document row must remain in a + # non-failed state so the retry can complete it. + assert task_result.status == "FAILURE" + + updated_doc = db_session.query(Document).filter_by(id="transient-doc-1").first() + assert updated_doc is not None + assert updated_doc.status != "failed" + + +def test_task_uses_exponential_backoff_with_jitter(): + """Verify the task decorator applied the new retry policy from #664.""" + assert getattr(process_document, "max_retries", None) == 3 + assert getattr(process_document, "retry_backoff", None) is True + assert getattr(process_document, "retry_backoff_max", None) == 600 + assert getattr(process_document, "retry_jitter", None) is True + + # autoretry_for must NOT contain bare Exception - that was the bug. + autoretry_for = getattr(process_document, "autoretry_for", ()) + assert Exception not in autoretry_for, ( + "autoretry_for must NOT retry every Exception (issue #664)" + ) + + # It must include ExternalServiceException so transient upstream failures + # still get a retry chance. + from app.exceptions import ExternalServiceException, RateLimitException + + assert ExternalServiceException in autoretry_for + assert RateLimitException in autoretry_for + + +def test_default_retry_delay_removed_in_favour_of_backoff(): + """default_retry_delay=30 was the old fixed delay. With retry_backoff=True + Celery computes the delay per retry, so the fixed value must be gone. + """ + # When retry_backoff=True is set, default_retry_delay should not also be + # set to a fixed value (Celery ignores one when the other is configured). + # We just assert that backoff is the source of truth. + assert getattr(process_document, "retry_backoff", None) is True \ No newline at end of file