Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions backend/backend/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,30 @@ def _normalize_google_redirect_uri(raw_uri: str, backend_base_url: str) -> str:
).lower() in ('true', '1', 'yes')
CELERY_TASK_EAGER_PROPAGATES = True

if DEBUG:
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
'LOCATION': 'leadorbit-local-cache',
}
}
else:
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': os.getenv('CACHE_URL', os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')),
}
}

CELERY_BEAT_SCHEDULE = {
'process-campaign-leads-every-minute': {
'task': 'campaigns.tasks.process_active_leads',
'schedule': 60.0,
},
'celery-beat-heartbeat-every-minute': {
'task': 'campaigns.tasks.celery_heartbeat',
'schedule': 60.0,
},
'poll-gmail-replies-every-5-minutes': {
'task': 'campaigns.tasks.poll_gmail_for_replies',
'schedule': 300.0,
Expand Down
4 changes: 3 additions & 1 deletion backend/backend/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class CustomTokenObtainPairView(BaseTokenObtainPairView):
CampaignViewSet,
SequenceStepViewSet,
EmailTemplateViewSet,
CeleryBeatHealthView,
WebhookView,
DashboardAnalyticsView,
AIGenerateView,
Expand Down Expand Up @@ -51,6 +52,7 @@ def api_root(_request):
path('api/v1/token/refresh/', TokenRefreshView.as_view(), name='token_refresh'),
path('api/v1/webhooks/email/', WebhookView.as_view(), name='email_webhook'),
path('api/v1/analytics/dashboard/', DashboardAnalyticsView.as_view(), name='dashboard_analytics'),
path('api/v1/health/celery-beat/', CeleryBeatHealthView.as_view(), name='celery_beat_health'),
path('api/v1/campaigns/ai-generate/', AIGenerateView.as_view(), name='ai_generate'),


Expand All @@ -65,4 +67,4 @@ def api_root(_request):
path('api/v1/connected-accounts/', ConnectedAccountsListView.as_view(), name='connected_accounts'),
path('api/v1/unsubscribe/<uuid:lead_id>/<str:token>/', unsubscribe_view, name='unsubscribe'),
path('api/v1/', include(router.urls)),
]
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from django.db import migrations


class Migration(migrations.Migration):

dependencies = [
('campaigns', '0009_campaign_cached_counters'),
('campaigns', '0009_campaignlead_bounce_metadata'),
]

operations = []
47 changes: 34 additions & 13 deletions backend/campaigns/tasks.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
import logging from datetime
import timedelta from celery
import shared_task from django.conf
import settings as django_settings from django.utils
import timezone from .ai
import _apply_merge_tags, personalize_email from .gmail_service
import build_unsubscribe_url, check_for_replies, send_gmail from .sms_service
import send_sms, initiate_call from .models
import CampaignLead, SequenceStep from leads.models
import BlockedDomain, normalize_domain


import logging
import urllib.parse
from datetime import timedelta

from bs4 import BeautifulSoup
from celery import shared_task
from django.conf import settings as django_settings
from django.core.cache import cache
from django.core.signing import Signer
from django.utils import timezone

from .ai import _apply_merge_tags, personalize_email
from .gmail_service import build_unsubscribe_url, check_for_replies, send_gmail
from .models import CampaignLead, SequenceStep
from .sms_service import send_sms, initiate_call
from leads.models import BlockedDomain, normalize_domain


logger = logging.getLogger(__name__)

CELERY_BEAT_HEARTBEAT_KEY = 'celery_beat_heartbeat'
CELERY_BEAT_HEARTBEAT_MAX_AGE_SECONDS = 180

def _get_campaign_steps(campaign):
"""
Returns ordered steps for a campaign.
Expand Down Expand Up @@ -117,6 +121,23 @@ def _maybe_mark_campaign_completed(campaign):
logger.info(f"Campaign marked COMPLETED: {campaign.id}")


@shared_task
def celery_heartbeat():
"""
Periodic heartbeat used by the health endpoint to confirm Celery Beat is alive.
"""
now = timezone.now().timestamp()
cache.set(
CELERY_BEAT_HEARTBEAT_KEY,
now,
timeout=CELERY_BEAT_HEARTBEAT_MAX_AGE_SECONDS * 2,
)
return {
'heartbeat_key': CELERY_BEAT_HEARTBEAT_KEY,
'timestamp': now,
}


def _advance_to_next_step(clead, completed_step, now=None):
now = now or timezone.now()
raw_steps = _get_campaign_raw_steps(clead.campaign)
Expand Down Expand Up @@ -692,4 +713,4 @@ def poll_gmail_for_replies():
logger.info(f"Reply detected for {clead.lead.email} in campaign {clead.campaign.name}")
_maybe_mark_campaign_completed(clead.campaign)

return f"Detected {total_replies} new replies."
return f"Detected {total_replies} new replies."
35 changes: 35 additions & 0 deletions backend/campaigns/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
from campaigns.tasks import (
_execute_condition_event_step,
_get_campaign_steps,
celery_heartbeat,
poll_gmail_for_replies,
process_active_leads,
process_active_leads_once,
send_email_step,
)
from django.core.cache import cache
from campaigns.utils import generate_unsubscribe_token
from leads.models import BlockedDomain, Lead
from tenants.models import Organization
Expand Down Expand Up @@ -1304,6 +1306,39 @@ def test_dashboard_analytics_requires_authentication(self):
response = self.client.get('/api/v1/analytics/dashboard/')
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

def test_celery_beat_health_reports_503_until_heartbeat_updates(self):
cache.clear()
response = self.client.get('/api/v1/health/celery-beat/')

self.assertEqual(response.status_code, status.HTTP_503_SERVICE_UNAVAILABLE)
self.assertFalse(response.data['healthy'])
self.assertFalse(response.data['heartbeat_present'])

def test_celery_beat_health_reports_healthy_after_recent_heartbeat(self):
cache.clear()
celery_heartbeat()

response = self.client.get('/api/v1/health/celery-beat/')

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertTrue(response.data['healthy'])
self.assertTrue(response.data['heartbeat_present'])
self.assertLessEqual(response.data['heartbeat_age_seconds'], 180)

def test_celery_beat_health_marks_stale_heartbeat_unhealthy(self):
cache.set(
'celery_beat_heartbeat',
timezone.now().timestamp() - 240,
timeout=600,
)

response = self.client.get('/api/v1/health/celery-beat/')

self.assertEqual(response.status_code, status.HTTP_503_SERVICE_UNAVAILABLE)
self.assertFalse(response.data['healthy'])
self.assertTrue(response.data['heartbeat_present'])
self.assertGreater(response.data['heartbeat_age_seconds'], 180)

def test_unsubscribe_get_shows_confirmation_without_updating_lead(self):
lead = Lead.objects.create(
organization=self.organization,
Expand Down
58 changes: 55 additions & 3 deletions backend/campaigns/views.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
import logging


import urllib.parse
from datetime import datetime, timezone as dt_timezone

from django.core.cache import cache
from django.core.signing import Signer, BadSignature
from django.http import HttpResponseRedirect, HttpResponseBadRequest
from django.utils import timezone
# ------------------------------------------

from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView

from leads.models import Lead
from users.permissions import IsOrgManager

from .models import Campaign, CampaignLead, SequenceStep, EmailTemplate
from .serializers import CampaignSerializer, SequenceStepSerializer, EmailTemplateSerializer
from .tasks import CELERY_BEAT_HEARTBEAT_KEY, CELERY_BEAT_HEARTBEAT_MAX_AGE_SECONDS

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -711,6 +715,54 @@ def _build_fallback_content(self, request):
"Your Name"
)
return f"SUBJECT: {subject}\nBODY: {body}"


class CeleryBeatHealthView(APIView):
"""
Read-only health endpoint that reports whether Celery Beat is updating the
heartbeat cache key frequently enough.
"""
permission_classes = [IsAuthenticated]

def get(self, request, *args, **kwargs):
raw_heartbeat = cache.get(CELERY_BEAT_HEARTBEAT_KEY)
if raw_heartbeat is None:
return Response(
{
'healthy': False,
'heartbeat_present': False,
'detail': 'Celery Beat heartbeat is missing.',
},
status=status.HTTP_503_SERVICE_UNAVAILABLE,
)

try:
heartbeat_ts = float(raw_heartbeat)
except (TypeError, ValueError):
return Response(
{
'healthy': False,
'heartbeat_present': True,
'detail': 'Celery Beat heartbeat is malformed.',
},
status=status.HTTP_503_SERVICE_UNAVAILABLE,
)

now_ts = timezone.now().timestamp()
age_seconds = now_ts - heartbeat_ts
healthy = age_seconds <= CELERY_BEAT_HEARTBEAT_MAX_AGE_SECONDS
response_status = status.HTTP_200_OK if healthy else status.HTTP_503_SERVICE_UNAVAILABLE

return Response(
{
'healthy': healthy,
'heartbeat_present': True,
'heartbeat_age_seconds': round(age_seconds, 1),
'last_heartbeat': datetime.fromtimestamp(heartbeat_ts, tz=dt_timezone.utc).isoformat(),
'max_age_seconds': CELERY_BEAT_HEARTBEAT_MAX_AGE_SECONDS,
},
status=response_status,
)

from django.http import HttpResponse
from django.middleware.csrf import get_token
Expand Down Expand Up @@ -820,4 +872,4 @@ def get(self, request, *args, **kwargs):
# Original Destination par redirect karna
decoded_dest = urllib.parse.unquote(dest_url)
return HttpResponseRedirect(decoded_dest)
# ------------------------------------------
# ------------------------------------------