
feat: add throttling and backpressure to background job processing (#…#1096

Open
Bhavy12-cell wants to merge 1 commit into imDarshanGK:main from Bhavy12-cell:feat/job-throttling-632


@Bhavy12-cell


Summary

Fixes #632

Problem

The weekly digest scheduler sent emails to all subscribers in a simple
sequential loop with no concurrency control. Under volume spikes this
could overwhelm the email provider and the database connection pool.

Solution

New File — backend/app/services/job_throttle.py

Implements three components:

JobThrottle — semaphore-based concurrency limiter with:

  • max_concurrent_workers cap via threading.Semaphore
  • Rate limiting via min_interval_seconds between job starts
  • Adaptive backpressure: when error rate exceeds threshold,
    the interval is multiplied by backoff_multiplier
  • Queue backpressure: raises RuntimeError when max_queue_size
    is exceeded so callers can shed load explicitly
  • Thread-safe stats() for observability
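
The bullets above can be sketched roughly as follows. This is a minimal illustration, not the code in job_throttle.py: the `slot()` context manager, the `error_threshold` parameter name, and the keys returned by `stats()` are assumptions made for the example.

```python
import threading
import time
from contextlib import contextmanager


class JobThrottle:
    """Illustrative throttle; method names and defaults are assumptions."""

    def __init__(self, max_concurrent_workers=3, max_queue_size=50,
                 min_interval_seconds=0.5, error_threshold=0.3,
                 backoff_multiplier=2.0):
        self._slots = threading.Semaphore(max_concurrent_workers)
        self._lock = threading.Lock()
        self._max_queue_size = max_queue_size
        self._min_interval = min_interval_seconds
        self._error_threshold = error_threshold
        self._backoff_multiplier = backoff_multiplier
        self._waiting = 0       # jobs waiting for a worker slot
        self._active = 0        # jobs currently holding a slot
        self._succeeded = 0
        self._failed = 0
        self._next_start = 0.0  # monotonic time of the next allowed start

    def _error_rate(self):
        total = self._succeeded + self._failed
        return self._failed / total if total else 0.0

    def _current_interval(self):
        # Adaptive backpressure: stretch the interval while errors are high.
        if self._error_rate() > self._error_threshold:
            return self._min_interval * self._backoff_multiplier
        return self._min_interval

    @contextmanager
    def slot(self):
        with self._lock:
            if self._waiting >= self._max_queue_size:
                # Queue backpressure: let the caller shed load explicitly.
                raise RuntimeError("job queue is full")
            self._waiting += 1
        self._slots.acquire()
        with self._lock:
            self._waiting -= 1
            self._active += 1
            # Rate limiting: reserve the next start time under the lock.
            start_at = max(time.monotonic(), self._next_start)
            self._next_start = start_at + self._current_interval()
        ok = False
        try:
            delay = start_at - time.monotonic()
            if delay > 0:
                time.sleep(delay)
            yield
            ok = True
        finally:
            with self._lock:
                self._active -= 1
                if ok:
                    self._succeeded += 1
                else:
                    self._failed += 1
            self._slots.release()

    def stats(self):
        with self._lock:
            return {
                "waiting": self._waiting,
                "active": self._active,
                "succeeded": self._succeeded,
                "failed": self._failed,
                "error_rate": self._error_rate(),
                "current_interval": self._current_interval(),
            }
```

In this sketch a job counts as failed when the body of `with throttle.slot():` raises, and the exception still propagates to the caller.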

ThrottleConfig — dataclass with tunable defaults:

  • All values overridable via environment variables
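
One way such a config could look is sketched below, using the DIGEST_* variable names and defaults listed in this PR's Environment Variables section. The field name `error_threshold` and the `from_env()` classmethod are hypothetical and may differ from the actual module.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class ThrottleConfig:
    # Defaults mirror the values listed in the PR description.
    max_concurrent_workers: int = 3
    max_queue_size: int = 50
    min_interval_seconds: float = 0.5
    error_threshold: float = 0.3
    backoff_multiplier: float = 2.0

    @classmethod
    def from_env(cls, env=None):
        """Build a config from a mapping (os.environ by default).

        Hypothetical helper: unset variables fall back to the defaults.
        """
        env = os.environ if env is None else env
        return cls(
            max_concurrent_workers=int(
                env.get("DIGEST_MAX_WORKERS", cls.max_concurrent_workers)),
            max_queue_size=int(
                env.get("DIGEST_MAX_QUEUE", cls.max_queue_size)),
            min_interval_seconds=float(
                env.get("DIGEST_MIN_INTERVAL_S", cls.min_interval_seconds)),
            error_threshold=float(
                env.get("DIGEST_ERROR_THRESHOLD", cls.error_threshold)),
            backoff_multiplier=float(
                env.get("DIGEST_BACKOFF_MULTIPLIER", cls.backoff_multiplier)),
        )
```

Passing a plain dict instead of `os.environ` keeps the parsing easy to test, for example `ThrottleConfig.from_env({"DIGEST_MAX_WORKERS": "8"})`.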

throttled_batch() — helper that processes any iterable with
a ThreadPoolExecutor bounded by the throttle config
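
A simplified sketch of a helper in this spirit follows. The signature and the `(results, errors)` return shape are assumptions for illustration, and only the worker cap and minimum start interval are modelled here, not the full throttle.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def throttled_batch(items, handler, max_workers=3, min_interval_seconds=0.0):
    """Run handler(item) for each item; return (results, errors).

    Each entry is an (item, value) or (item, exception) pair, in
    completion order. Hypothetical signature, not the PR's actual API.
    """
    items = list(items)
    if not items:
        return [], []

    lock = threading.Lock()
    next_start = [time.monotonic()]  # shared, guarded by lock

    def run(item):
        # Space out job starts by at least min_interval_seconds.
        with lock:
            start_at = max(time.monotonic(), next_start[0])
            next_start[0] = start_at + min_interval_seconds
        delay = start_at - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        return handler(item)

    results, errors = [], []
    # The pool size caps how many handlers run at once.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(run, item): item for item in items}
        for future in as_completed(futures):
            item = futures[future]
            try:
                results.append((item, future.result()))
            except Exception as exc:
                # One failing item does not abort the rest of the batch.
                errors.append((item, exc))
    return results, errors
```

Collecting failures instead of raising on the first one lets the caller decide whether to retry, log, or skip the affected items.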

Modified File — backend/app/services/scheduler.py

  • Replaced sequential loop with throttled_batch()
  • Serializes SQLAlchemy models to dicts before spawning threads
    (prevents cross-thread session sharing bugs)
  • Each worker opens its own DB session and closes it on completion
  • Throttle config readable from env vars (no code change needed to tune)
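
The session-handling pattern described above might look roughly like this. `Subscriber`, `FakeSession`, `send_digest`, and `run_digest` are stand-ins invented for the sketch. In the real scheduler they would be the SQLAlchemy model, the project's session factory, and the digest-sending code.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass


@dataclass
class Subscriber:
    """Stand-in for an ORM model loaded in the scheduler's own session."""
    id: int
    email: str


class FakeSession:
    """Stand-in for a DB session; records which thread created it."""

    def __init__(self):
        self.owner = threading.get_ident()
        self.closed = False

    def close(self):
        self.closed = True


def send_digest(payload, session_factory=FakeSession):
    # Each worker opens its own session and always closes it.
    session = session_factory()
    try:
        # Real code would build and send the email here.
        return {"subscriber_id": payload["id"], "thread": session.owner}
    finally:
        session.close()


def run_digest(subscribers, max_workers=3):
    # Serialize models to plain dicts before any thread is spawned, so no
    # ORM object (or its session) is shared across threads.
    payloads = [{"id": s.id, "email": s.email} for s in subscribers]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(send_digest, payloads))
```

The plain `ThreadPoolExecutor` here only stands in for the throttled helper; the point of the sketch is the dict serialization and the per-worker session lifetime.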

New File — backend/tests/test_job_throttle.py

9 tests covering:

  • Concurrency cap enforcement
  • Queue full backpressure
  • Error rate calculation
  • Context manager success and failure paths
  • throttled_batch all-success, partial-failure, empty input
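
As an illustration of how a concurrency-cap test can be structured, the sketch below measures peak concurrency under a plain `threading.Semaphore`. It does not reproduce the tests in test_job_throttle.py; `measure_peak_concurrency` is a hypothetical helper.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def measure_peak_concurrency(cap, jobs=8, pool_size=8):
    """Run `jobs` short tasks behind a semaphore and return the peak
    number that were inside the guarded section at the same time."""
    slots = threading.Semaphore(cap)
    lock = threading.Lock()
    active = 0
    peak = 0

    def job(_):
        nonlocal active, peak
        with slots:
            with lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.01)  # hold the slot long enough for overlap
            with lock:
                active -= 1

    # The pool is deliberately larger than the cap, so the semaphore
    # (not the pool size) is what limits concurrency.
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        list(pool.map(job, range(jobs)))
    return peak


def test_concurrency_cap():
    assert measure_peak_concurrency(cap=2) <= 2
```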

Environment Variables (tunable without code change)

| Variable | Default | Description |
| --- | --- | --- |
| DIGEST_MAX_WORKERS | 3 | Max concurrent digest workers |
| DIGEST_MAX_QUEUE | 50 | Max queued jobs before backpressure |
| DIGEST_MIN_INTERVAL_S | 0.5 | Min seconds between job starts |
| DIGEST_ERROR_THRESHOLD | 0.3 | Error rate to trigger backoff |
| DIGEST_BACKOFF_MULTIPLIER | 2.0 | Interval multiplier during backoff |

Files Changed

  • backend/app/services/job_throttle.py — new throttling module
  • backend/app/services/scheduler.py — updated to use throttled_batch
  • backend/tests/test_job_throttle.py — 9 tests for the throttling module



Development

Successfully merging this pull request may close these issues.

Add throttling to background job processing to avoid spikes
