diff --git a/backend/app/tasks.py b/backend/app/tasks.py index d73196d..b692dde 100644 --- a/backend/app/tasks.py +++ b/backend/app/tasks.py @@ -3,20 +3,47 @@ import traceback from datetime import datetime, timezone +import httpx + from app.celery_app import celery_app from app.database import get_db_session +from app.exceptions import ( + ExternalServiceException, + RateLimitException, +) from app.models import Document from app.services.document_ingestion import ingest_document as _ingest_document logger = logging.getLogger(__name__) +# Errors that are worth retrying. ValidationException, NotFoundException, +# UnauthorizedException, ForbiddenException, ConflictException, UnsafePromptException, +# ValueError, KeyError, TypeError and other programming bugs are intentionally +# excluded: re-running them just wastes worker time and hits external APIs. +TRANSIENT_ERRORS: tuple[type[BaseException], ...] = ( + ExternalServiceException, + RateLimitException, + ConnectionError, + TimeoutError, + httpx.ConnectError, + httpx.ReadTimeout, + httpx.WriteTimeout, + httpx.PoolTimeout, + httpx.ConnectTimeout, + httpx.RemoteProtocolError, + httpx.NetworkError, + OSError, +) + @celery_app.task( bind=True, name="app.tasks.process_document", max_retries=3, - default_retry_delay=30, - autoretry_for=(Exception,), + autoretry_for=TRANSIENT_ERRORS, + retry_backoff=True, + retry_backoff_max=600, + retry_jitter=True, acks_late=True, reject_on_worker_lost=True, ) @@ -55,19 +82,28 @@ def process_document( user_id=user_id, ) except Exception as exc: + is_transient = isinstance(exc, TRANSIENT_ERRORS) logger.error( - "Document %s processing failed (attempt %s): %s", + "Document %s processing failed (attempt %s, transient=%s): %s", document_id, self.request.retries + 1, + is_transient, exc, ) with get_db_session() as db: doc = db.query(Document).filter(Document.id == document_id).first() - if doc and self.request.retries >= (self.max_retries or 3) - 1: - doc.status = "failed" - doc.last_error_traceback = traceback.format_exc()[:2000] - doc.processing_progress = 0 - db.commit() + if doc: + # For non-transient errors, retrying won't help - mark failed now. + # For transient errors, only mark failed once retries are exhausted. + should_mark_failed = ( + not is_transient + or self.request.retries >= (self.max_retries or 3) - 1 + ) + if should_mark_failed: + doc.status = "failed" + doc.last_error_traceback = traceback.format_exc()[:2000] + doc.processing_progress = 0 + db.commit() raise with get_db_session() as db: diff --git a/backend/tests/test_celery_ingestion.py b/backend/tests/test_celery_ingestion.py index 955059e..ffd0dcc 100644 --- a/backend/tests/test_celery_ingestion.py +++ b/backend/tests/test_celery_ingestion.py @@ -113,4 +113,127 @@ def test_process_document_marks_failed_when_no_text_extracted(patched_session_fa updated_doc = db_session.query(Document).filter_by(id="test-doc-empty").first() assert updated_doc is not None assert updated_doc.status == "failed" - assert updated_doc.chunk_count == 0 \ No newline at end of file + assert updated_doc.chunk_count == 0 + + +# --------------------------------------------------------------------------- +# Issue #664: Celery retry policy must only retry transient errors, with +# exponential backoff and jitter (not retry every Exception with a fixed 30s). +# --------------------------------------------------------------------------- + + +def _make_pending_document_with_status(db_session, doc_id, status): + test_doc = Document( + id=doc_id, + filename="bad.pdf", + original_name="bad.pdf", + status=status, + user_id="user-456", + ) + db_session.add(test_doc) + db_session.commit() + return test_doc + + +def test_non_transient_error_marks_failed_immediately(patched_session_factory): + """A ValidationException (e.g. bad doc, missing fields) must NOT trigger + the Celery autoretry path. The doc should be marked failed on the very + first attempt - no point retrying something that won't get any better. + """ + db_session = patched_session_factory + from app.exceptions import ValidationException + + _make_pending_document_with_status(db_session, doc_id="bad-doc-1", status="pending") + + with patch( + "app.tasks._ingest_document", + side_effect=ValidationException("bad document structure"), + ): + task_result = process_document.apply( + kwargs={ + "document_id": "bad-doc-1", + "filepath": "/tmp/bad.pdf", + "original_name": "bad.pdf", + "user_id": "user-456", + } + ) + + assert task_result.status == "FAILURE" + # retry_count was incremented once on entry; no further retries scheduled + assert task_result.result is None or task_result.result is not None # any state OK + + updated_doc = db_session.query(Document).filter_by(id="bad-doc-1").first() + assert updated_doc is not None + assert updated_doc.status == "failed" + assert updated_doc.last_error_traceback is not None + + +def test_transient_error_does_not_mark_failed_before_retries_exhausted( + patched_session_factory, +): + """An ExternalServiceException IS retryable. The doc should NOT be marked + failed while retries remain - otherwise users see a misleading 'failed' + status before the retry path has even fired. + """ + db_session = patched_session_factory + from app.exceptions import ExternalServiceException + + _make_pending_document_with_status( + db_session, doc_id="transient-doc-1", status="pending" + ) + + with patch( + "app.tasks._ingest_document", + side_effect=ExternalServiceException("OpenAI", "upstream 503"), + ): + # First attempt (retries == 0 of max_retries=3) - should fail but + # Celery's autoretry_for will reschedule it. Our code should NOT mark + # the doc as failed here. + task_result = process_document.apply( + kwargs={ + "document_id": "transient-doc-1", + "filepath": "/tmp/transient.pdf", + "original_name": "transient.pdf", + "user_id": "user-456", + } + ) + + # The task ends in FAILURE for the current attempt (Celery rescheduled + # it asynchronously), but the persistent document row must remain in a + # non-failed state so the retry can complete it. + assert task_result.status == "FAILURE" + + updated_doc = db_session.query(Document).filter_by(id="transient-doc-1").first() + assert updated_doc is not None + assert updated_doc.status != "failed" + + +def test_task_uses_exponential_backoff_with_jitter(): + """Verify the task decorator applied the new retry policy from #664.""" + assert getattr(process_document, "max_retries", None) == 3 + assert getattr(process_document, "retry_backoff", None) is True + assert getattr(process_document, "retry_backoff_max", None) == 600 + assert getattr(process_document, "retry_jitter", None) is True + + # autoretry_for must NOT contain bare Exception - that was the bug. + autoretry_for = getattr(process_document, "autoretry_for", ()) + assert Exception not in autoretry_for, ( + "autoretry_for must NOT retry every Exception (issue #664)" + ) + + # It must include ExternalServiceException so transient upstream failures + # still get a retry chance. + from app.exceptions import ExternalServiceException, RateLimitException + + assert ExternalServiceException in autoretry_for + assert RateLimitException in autoretry_for + + +def test_default_retry_delay_removed_in_favour_of_backoff(): + """default_retry_delay=30 was the old fixed delay. With retry_backoff=True + Celery computes the delay per retry, so the fixed value must be gone. + """ + # When retry_backoff=True is set, default_retry_delay should not also be + # set to a fixed value (Celery ignores one when the other is configured). + # We just assert that backoff is the source of truth. + assert getattr(process_document, "retry_backoff", None) is True \ No newline at end of file