
Decouple long-running digest generation from the web process (job queue + worker) #5

@nmogil


Summary

Digest generation is a multi-minute, OpenAI-heavy pipeline, but it currently runs inside the FastAPI web process as a BackgroundTasks job. This couples job execution to the lifecycle of the web server and is the root cause of a cluster of operational workarounds. We should move digest generation to a dedicated worker consuming a durable job queue, leaving the API responsible only for enqueueing work and reporting status.

Current state (verified)

  • POST /generate-digest (src/main.py:289-361) creates a digest_tasks row with status PENDING, then calls background_tasks.add_task(safe_background_task, ...) and returns a task_id immediately.
  • safe_background_task (src/main.py:245-263) wraps the real work (digest_service_enhanced.generate_digest) in asyncio.timeout(TASK_TIMEOUT); the entire pipeline runs in the web process's event loop.
  • TASK_TIMEOUT default is 600s (fly.toml [env]), with REQUEST_TIMEOUT deliberately set just under it to dodge a platform edge timeout.
  • The fact that this is really a worker, not a web request, leaks everywhere:
    • fly.toml: auto_stop_machines = false with the comment "Don't auto-stop: it was killing in-flight background digests mid-run" and min_machines_running = 1 "Keep one machine warm so the daily digest run isn't stopped mid-flight."
    • src/graceful_shutdown.py exists largely to track in-flight requests and wait up to 30s on SIGTERM so digests aren't killed; /generate-digest wraps the background task in SHUTDOWN_HANDLER.track_request(...) (src/main.py:331-343).
    • The VM is sized at 4GB / 2 shared CPUs (fly.toml [[vm]]) — sized for the worker workload, not the API.

Why this matters

  • Lost work on deploy/restart/crash. Any machine restart (deploy, OOM, platform migration) mid-digest loses the job; there is no retry/redelivery. The whole daily run for all users can be silently dropped.
  • Can't scale the two concerns independently. The API is trivial; the pipeline is heavy. Today they share a process and a machine.
  • Forced to keep a machine warm 24/7 (min_machines_running = 1) purely so background jobs survive, defeating scale-to-zero economics.
  • Concurrency is implicit. N simultaneous /generate-digest calls = N heavy pipelines on one event loop competing for CPU and the OpenAI rate limit, with no backpressure.

Proposed approach

Pick a queue mechanism and split responsibilities. Options, roughly in increasing order of effort:

  1. Lightweight, no new infra (recommended first step): Use the existing digest_tasks table as the queue. The API inserts a PENDING row (it already does). A separate worker process (new entrypoint, e.g. src/worker.py) polls for PENDING rows using SELECT ... FOR UPDATE SKIP LOCKED (via Supabase/Postgres RPC) or an atomic status flip PENDING -> RUNNING with a claimed_at timestamp, runs the pipeline, and writes COMPLETED/FAILED. Add an attempts column and a reaper that re-queues rows stuck in RUNNING past a lease timeout. Deploy the worker as a second Fly process ([processes] in fly.toml) or as a separate Fly app.
  2. Dedicated queue: Redis + RQ/Celery/arq, or a hosted queue. More moving parts; only worth it if (1) proves insufficient.
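A minimal sketch of option 1's enqueue/claim/complete/reap cycle follows. It uses Python's stdlib sqlite3 purely as a local stand-in for the Supabase/Postgres digest_tasks table, so it can run anywhere; the claimed_at and attempts columns come from the proposal, while the helper names (enqueue, claim_next, complete, reap_expired) and the LEASE_SECONDS / MAX_ATTEMPTS values are hypothetical. SQLite has no FOR UPDATE SKIP LOCKED, so this uses the atomic status-flip variant: a conditional UPDATE that only succeeds if the row is still PENDING.

```python
import sqlite3
import time
import uuid
from typing import Optional, Tuple

# Stand-in for the Supabase/Postgres digest_tasks table; values are hypothetical.
LEASE_SECONDS = 900   # a RUNNING row older than this is presumed abandoned
MAX_ATTEMPTS = 3      # after this many claims, the reaper marks the row FAILED

SCHEMA = """
CREATE TABLE IF NOT EXISTS digest_tasks (
    task_id    TEXT PRIMARY KEY,
    status     TEXT NOT NULL,   -- PENDING | RUNNING | COMPLETED | FAILED
    claimed_at REAL,            -- epoch seconds when the current lease was taken
    attempts   INTEGER NOT NULL DEFAULT 0
)
"""


def enqueue(conn: sqlite3.Connection) -> str:
    """The only thing /generate-digest would still do: insert a PENDING row."""
    task_id = str(uuid.uuid4())
    conn.execute(
        "INSERT INTO digest_tasks (task_id, status) VALUES (?, 'PENDING')", (task_id,)
    )
    conn.commit()
    return task_id


def claim_next(
    conn: sqlite3.Connection, now: Optional[float] = None
) -> Optional[Tuple[str, float]]:
    """Flip one PENDING row to RUNNING; return (task_id, claimed_at) or None."""
    now = time.time() if now is None else now
    while True:
        row = conn.execute(
            "SELECT task_id FROM digest_tasks WHERE status = 'PENDING' LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        # Compare-and-set: succeeds only if no other worker flipped the row first.
        cur = conn.execute(
            "UPDATE digest_tasks SET status = 'RUNNING', claimed_at = ?, "
            "attempts = attempts + 1 WHERE task_id = ? AND status = 'PENDING'",
            (now, row[0]),
        )
        conn.commit()
        if cur.rowcount == 1:
            return row[0], now
        # Lost the race to another worker; look for the next PENDING row.


def complete(conn: sqlite3.Connection, task_id: str, claimed_at: float, ok: bool) -> bool:
    """Write the final status, but only if this worker still holds the lease."""
    cur = conn.execute(
        "UPDATE digest_tasks SET status = ? "
        "WHERE task_id = ? AND status = 'RUNNING' AND claimed_at = ?",
        ("COMPLETED" if ok else "FAILED", task_id, claimed_at),
    )
    conn.commit()
    return cur.rowcount == 1


def reap_expired(conn: sqlite3.Connection, now: Optional[float] = None) -> int:
    """Re-queue (or give up on) RUNNING rows whose lease has expired."""
    now = time.time() if now is None else now
    cur = conn.execute(
        "UPDATE digest_tasks SET claimed_at = NULL, status = CASE "
        "WHEN attempts >= ? THEN 'FAILED' ELSE 'PENDING' END "
        "WHERE status = 'RUNNING' AND claimed_at < ?",
        (MAX_ATTEMPTS, now - LEASE_SECONDS),
    )
    conn.commit()
    return cur.rowcount
```

On Postgres the SELECT-then-UPDATE pair would collapse into a single UPDATE ... WHERE task_id = (SELECT task_id ... FOR UPDATE SKIP LOCKED LIMIT 1) RETURNING ..., exposed to the worker as a Supabase RPC. Matching claimed_at in complete() keeps a slow worker whose lease was already reaped from overwriting the result of a newer attempt.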

Regardless of mechanism:

  • The API process should no longer run the pipeline. /generate-digest becomes enqueue-only.
  • The worker owns timeouts, retries (with backoff + max attempts), and idempotency (don't double-process a task_id).
  • Re-evaluate whether graceful_shutdown.py's request-draining is still needed once jobs are durable — the worker should be safe to kill because an interrupted job is just re-leased.
  • Re-evaluate fly.toml: API can scale to zero / small VM; worker is the thing that needs to stay alive (or be a scheduled/queue-driven process).
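The worker-side policy from the first two bullets could look roughly like this sketch: a per-attempt timeout (the role TASK_TIMEOUT plays in safe_background_task today), capped exponential backoff with full jitter, a maximum attempt count, and an idempotency guard that skips a task_id already marked COMPLETED. The constants other than the 600s timeout and the names backoff_delay and run_once are hypothetical, and `pipeline` stands in for digest_service_enhanced.generate_digest, whose actual signature isn't shown in this issue. It uses asyncio.wait_for rather than asyncio.timeout so it also runs on Python versions before 3.11.

```python
import asyncio
import random
from typing import Awaitable, Callable, Optional

# Hypothetical policy values; TASK_TIMEOUT mirrors the current 600s default.
TASK_TIMEOUT = 600.0   # seconds allowed per attempt
MAX_ATTEMPTS = 3
BASE_DELAY = 30.0      # seconds before the first retry
MAX_DELAY = 600.0      # upper bound on any single backoff


def backoff_delay(attempt: int, rng: Optional[random.Random] = None) -> float:
    """Capped exponential backoff with full jitter; `attempt` is 1-based."""
    ceiling = min(MAX_DELAY, BASE_DELAY * 2 ** (attempt - 1))
    return (rng or random).uniform(0, ceiling)


async def run_once(
    task_id: str,
    attempt: int,
    already_completed: Callable[[str], bool],
    pipeline: Callable[[str], Awaitable[None]],
    timeout: float = TASK_TIMEOUT,
) -> str:
    """Run one attempt and return the status the worker should record."""
    if already_completed(task_id):
        return "SKIPPED"  # idempotency: never re-run (or re-notify) a finished digest
    try:
        await asyncio.wait_for(pipeline(task_id), timeout=timeout)
    except Exception:
        # Any failure, including asyncio.TimeoutError, counts against the budget.
        # CancelledError (a BaseException since Python 3.8) is deliberately not caught.
        return "FAILED" if attempt >= MAX_ATTEMPTS else "RETRY"
    return "COMPLETED"
```

A RETRY result would put the row back to PENDING with a not-before time of now + backoff_delay(attempt), which would need one more column (also hypothetical); a SKIPPED result should not fire the n8n callback a second time.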

Files likely involved

  • src/main.py (strip background execution from /generate-digest; keep enqueue + status endpoints)
  • src/digest_service_enhanced.py (becomes the worker's callable; should be free of web-process assumptions)
  • New src/worker.py (poll/claim/execute loop)
  • src/graceful_shutdown.py (likely simplify or remove)
  • src/state_supabase.py (add claim/lease/attempts helpers; see also the migrations issue)
  • fly.toml (add worker process or second app; revisit VM sizing + auto-stop)
  • Dockerfile / compose (a way to start the worker entrypoint)
  • Coordinate with the n8n orchestrator: workflow 2 (2_Generate_Digest, id ilWr3k5rSwCHRFXc) POSTs /generate-digest and expects an immediate task_id + an eventual callback. The enqueue/return contract must stay compatible, and the completion webhook callback must still fire from the worker (currently sent at the end of generate_digest). Verify against recent executions with the mcp__n8n-mcp__* tools before changing the callback timing.
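A possible shape for the new src/worker.py loop is sketched below, written against injected callables so it stays independent of the storage layer; worker_loop, install_stop_handlers, claim, run, finish and notify are all hypothetical names. It processes one task at a time, which also provides the backpressure the current design lacks. On SIGTERM or SIGINT it stops claiming new work instead of draining requests the way graceful_shutdown.py does; if the process is killed mid-task, the row stays RUNNING until its lease expires and the reaper from option 1 re-queues it. The n8n callback is sent only after the final status write succeeds, so it keeps firing from the worker; its URL and payload should be copied from what generate_digest sends today.

```python
import asyncio
import signal
from typing import Awaitable, Callable, Optional, Tuple

POLL_INTERVAL = 5.0  # hypothetical: seconds to wait when the queue is empty

Claim = Callable[[], Awaitable[Optional[Tuple[str, float]]]]


async def worker_loop(
    claim: Claim,
    run: Callable[[str], Awaitable[str]],
    finish: Callable[[str, float, str], Awaitable[bool]],
    notify: Callable[[str], Awaitable[None]],
    stop: asyncio.Event,
    poll_interval: float = POLL_INTERVAL,
) -> int:
    """Process tasks one at a time until `stop` is set; return how many completed."""
    completed = 0
    while not stop.is_set():
        job = await claim()
        if job is None:
            try:
                # Idle wait that wakes immediately if a stop signal arrives.
                await asyncio.wait_for(stop.wait(), timeout=poll_interval)
            except asyncio.TimeoutError:
                pass
            continue
        task_id, claimed_at = job
        status = await run(task_id)
        # finish() is expected to return False if this worker no longer holds the lease.
        if await finish(task_id, claimed_at, status) and status == "COMPLETED":
            await notify(task_id)  # completion callback to n8n, sent from the worker
            completed += 1
    return completed


def install_stop_handlers(stop: asyncio.Event) -> None:
    """Stop claiming new work on SIGTERM/SIGINT (Unix event loops only)."""
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, stop.set)
```

Wiring it up would be roughly: inside the running loop, create stop = asyncio.Event(), call install_stop_handlers(stop), then await worker_loop(...). loop.add_signal_handler is only available on Unix event loops, which matches the Fly VM.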

Acceptance criteria

  • Digest generation runs in a process separate from the HTTP server.
  • A killed/restarted worker does not permanently lose an in-flight digest — the job is retried or re-leased.
  • /generate-digest returns a task_id without doing heavy work; status is still observable via GET /digest-status/{task_id}.
  • The completion webhook callback to n8n still fires on success (workflow 3 depends on it).
  • min_machines_running / auto_stop workarounds re-evaluated and documented.

Gotchas

  • Do not break the n8n callback contract. See paperboy_all/CLAUDE.md for the three-workflow pipeline. Workflow 3 (3_Send_Digest_Email_v2, id GwegBjHAUPQO3380) emails the digest on callback — if the callback stops firing, digests silently stop being emailed.
  • Fly rebuild trap: rebuilding the image has previously broken boot (missing importlib_metadata); deps are pinned for this reason (requirements.lightweight.txt). Pin/verify deps before deploying the new worker image.
  • The paperboy-ai Fly app is on the Mogil Ventures Fly org (noah.mogil97@gmail.com), not Pennie — verify org before any Fly op.
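One cheap guard against the rebuild trap is a smoke check run inside the freshly built image, so a missing dependency fails the build instead of the boot. The sketch below actually imports each module (rather than only locating it), so an import-time failure in a transitive dependency also surfaces. The module list is hypothetical apart from importlib_metadata, the package that previously broke boot; the real list should mirror requirements.lightweight.txt and the worker's own imports.

```python
import importlib
import sys
from typing import Iterable, List

# Hypothetical list: importlib_metadata is the package that previously broke boot;
# the rest should mirror requirements.lightweight.txt and the worker's own imports.
REQUIRED_MODULES = ["importlib_metadata", "fastapi", "openai"]


def missing_modules(names: Iterable[str]) -> List[str]:
    """Try to import each module; return a description of every one that fails."""
    missing = []
    for name in names:
        try:
            importlib.import_module(name)
        except ImportError as exc:  # also catches ModuleNotFoundError
            missing.append(f"{name}: {exc}")
    return missing


def check(names: Iterable[str] = REQUIRED_MODULES) -> int:
    """Return a process exit code: 0 if everything imports, 1 otherwise."""
    missing = missing_modules(names)
    for line in missing:
        print(f"missing dependency {line}", file=sys.stderr)
    return 1 if missing else 0
```

If this lived in a module named depcheck (hypothetical), a Dockerfile step such as RUN python -c "import sys, depcheck; sys.exit(depcheck.check())" would stop the image build on a non-zero exit.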

Metadata


Assignees

No one assigned

    Labels

    architecture (Architectural / structural change), reliability (Reliability / operational robustness)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
