diff --git a/backend/backend/settings.py b/backend/backend/settings.py index 7ecc6b0..855a3a3 100644 --- a/backend/backend/settings.py +++ b/backend/backend/settings.py @@ -151,11 +151,30 @@ def _normalize_google_redirect_uri(raw_uri: str, backend_base_url: str) -> str: ).lower() in ('true', '1', 'yes') CELERY_TASK_EAGER_PROPAGATES = True +if DEBUG: + CACHES = { + 'default': { + 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', + 'LOCATION': 'leadorbit-local-cache', + } + } +else: + CACHES = { + 'default': { + 'BACKEND': 'django.core.cache.backends.redis.RedisCache', + 'LOCATION': os.getenv('CACHE_URL', os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')), + } + } + CELERY_BEAT_SCHEDULE = { 'process-campaign-leads-every-minute': { 'task': 'campaigns.tasks.process_active_leads', 'schedule': 60.0, }, + 'celery-beat-heartbeat-every-minute': { + 'task': 'campaigns.tasks.celery_heartbeat', + 'schedule': 60.0, + }, 'poll-gmail-replies-every-5-minutes': { 'task': 'campaigns.tasks.poll_gmail_for_replies', 'schedule': 300.0, diff --git a/backend/backend/urls.py b/backend/backend/urls.py index eafcbb2..4d42fe5 100644 --- a/backend/backend/urls.py +++ b/backend/backend/urls.py @@ -17,6 +17,7 @@ class CustomTokenObtainPairView(BaseTokenObtainPairView): CampaignViewSet, SequenceStepViewSet, EmailTemplateViewSet, + CeleryBeatHealthView, WebhookView, DashboardAnalyticsView, AIGenerateView, @@ -51,6 +52,7 @@ def api_root(_request): path('api/v1/token/refresh/', TokenRefreshView.as_view(), name='token_refresh'), path('api/v1/webhooks/email/', WebhookView.as_view(), name='email_webhook'), path('api/v1/analytics/dashboard/', DashboardAnalyticsView.as_view(), name='dashboard_analytics'), + path('api/v1/health/celery-beat/', CeleryBeatHealthView.as_view(), name='celery_beat_health'), path('api/v1/campaigns/ai-generate/', AIGenerateView.as_view(), name='ai_generate'), @@ -65,4 +67,4 @@ def api_root(_request): path('api/v1/connected-accounts/', ConnectedAccountsListView.as_view(), name='connected_accounts'), path('api/v1/unsubscribe///', unsubscribe_view, name='unsubscribe'), path('api/v1/', include(router.urls)), -] \ No newline at end of file +] diff --git a/backend/campaigns/migrations/0010_merge_0009_campaign_cached_counters_0009_campaignlead_bounce_metadata.py b/backend/campaigns/migrations/0010_merge_0009_campaign_cached_counters_0009_campaignlead_bounce_metadata.py new file mode 100644 index 0000000..317a41b --- /dev/null +++ b/backend/campaigns/migrations/0010_merge_0009_campaign_cached_counters_0009_campaignlead_bounce_metadata.py @@ -0,0 +1,11 @@ +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('campaigns', '0009_campaign_cached_counters'), + ('campaigns', '0009_campaignlead_bounce_metadata'), + ] + + operations = [] diff --git a/backend/campaigns/tasks.py b/backend/campaigns/tasks.py index de16c24..e2633f0 100644 --- a/backend/campaigns/tasks.py +++ b/backend/campaigns/tasks.py @@ -1,22 +1,26 @@ -import logging from datetime -import timedelta from celery -import shared_task from django.conf -import settings as django_settings from django.utils -import timezone from .ai -import _apply_merge_tags, personalize_email from .gmail_service -import build_unsubscribe_url, check_for_replies, send_gmail from .sms_service -import send_sms, initiate_call from .models -import CampaignLead, SequenceStep from leads.models -import BlockedDomain, normalize_domain - - +import logging import urllib.parse +from datetime import timedelta + from bs4 import BeautifulSoup +from celery import shared_task +from django.conf import settings as django_settings +from django.core.cache import cache from django.core.signing import Signer +from django.utils import timezone + +from .ai import _apply_merge_tags, personalize_email +from .gmail_service import build_unsubscribe_url, check_for_replies, send_gmail +from .models import CampaignLead, SequenceStep +from .sms_service import send_sms, initiate_call +from leads.models import BlockedDomain, normalize_domain logger = logging.getLogger(__name__) +CELERY_BEAT_HEARTBEAT_KEY = 'celery_beat_heartbeat' +CELERY_BEAT_HEARTBEAT_MAX_AGE_SECONDS = 180 + def _get_campaign_steps(campaign): """ Returns ordered steps for a campaign. @@ -117,6 +121,23 @@ def _maybe_mark_campaign_completed(campaign): logger.info(f"Campaign marked COMPLETED: {campaign.id}") +@shared_task +def celery_heartbeat(): + """ + Periodic heartbeat used by the health endpoint to confirm Celery Beat is alive. + """ + now = timezone.now().timestamp() + cache.set( + CELERY_BEAT_HEARTBEAT_KEY, + now, + timeout=CELERY_BEAT_HEARTBEAT_MAX_AGE_SECONDS * 2, + ) + return { + 'heartbeat_key': CELERY_BEAT_HEARTBEAT_KEY, + 'timestamp': now, + } + + def _advance_to_next_step(clead, completed_step, now=None): now = now or timezone.now() raw_steps = _get_campaign_raw_steps(clead.campaign) @@ -692,4 +713,4 @@ def poll_gmail_for_replies(): logger.info(f"Reply detected for {clead.lead.email} in campaign {clead.campaign.name}") _maybe_mark_campaign_completed(clead.campaign) - return f"Detected {total_replies} new replies." \ No newline at end of file + return f"Detected {total_replies} new replies." diff --git a/backend/campaigns/tests.py b/backend/campaigns/tests.py index 1d96075..a1b60ae 100644 --- a/backend/campaigns/tests.py +++ b/backend/campaigns/tests.py @@ -13,11 +13,13 @@ from campaigns.tasks import ( _execute_condition_event_step, _get_campaign_steps, + celery_heartbeat, poll_gmail_for_replies, process_active_leads, process_active_leads_once, send_email_step, ) +from django.core.cache import cache from campaigns.utils import generate_unsubscribe_token from leads.models import BlockedDomain, Lead from tenants.models import Organization @@ -1304,6 +1306,39 @@ def test_dashboard_analytics_requires_authentication(self): response = self.client.get('/api/v1/analytics/dashboard/') self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + def test_celery_beat_health_reports_503_until_heartbeat_updates(self): + cache.clear() + response = self.client.get('/api/v1/health/celery-beat/') + + self.assertEqual(response.status_code, status.HTTP_503_SERVICE_UNAVAILABLE) + self.assertFalse(response.data['healthy']) + self.assertFalse(response.data['heartbeat_present']) + + def test_celery_beat_health_reports_healthy_after_recent_heartbeat(self): + cache.clear() + celery_heartbeat() + + response = self.client.get('/api/v1/health/celery-beat/') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertTrue(response.data['healthy']) + self.assertTrue(response.data['heartbeat_present']) + self.assertLessEqual(response.data['heartbeat_age_seconds'], 180) + + def test_celery_beat_health_marks_stale_heartbeat_unhealthy(self): + cache.set( + 'celery_beat_heartbeat', + timezone.now().timestamp() - 240, + timeout=600, + ) + + response = self.client.get('/api/v1/health/celery-beat/') + + self.assertEqual(response.status_code, status.HTTP_503_SERVICE_UNAVAILABLE) + self.assertFalse(response.data['healthy']) + self.assertTrue(response.data['heartbeat_present']) + self.assertGreater(response.data['heartbeat_age_seconds'], 180) + def test_unsubscribe_get_shows_confirmation_without_updating_lead(self): lead = Lead.objects.create( organization=self.organization, diff --git a/backend/campaigns/views.py b/backend/campaigns/views.py index 2a11bf0..c3a4396 100644 --- a/backend/campaigns/views.py +++ b/backend/campaigns/views.py @@ -1,21 +1,25 @@ import logging - - import urllib.parse +from datetime import datetime, timezone as dt_timezone + +from django.core.cache import cache from django.core.signing import Signer, BadSignature from django.http import HttpResponseRedirect, HttpResponseBadRequest +from django.utils import timezone # ------------------------------------------ from rest_framework import status, viewsets from rest_framework.decorators import action from rest_framework.permissions import AllowAny, IsAuthenticated from rest_framework.response import Response +from rest_framework.views import APIView from leads.models import Lead from users.permissions import IsOrgManager from .models import Campaign, CampaignLead, SequenceStep, EmailTemplate from .serializers import CampaignSerializer, SequenceStepSerializer, EmailTemplateSerializer +from .tasks import CELERY_BEAT_HEARTBEAT_KEY, CELERY_BEAT_HEARTBEAT_MAX_AGE_SECONDS logger = logging.getLogger(__name__) @@ -711,6 +715,54 @@ def _build_fallback_content(self, request): "Your Name" ) return f"SUBJECT: {subject}\nBODY: {body}" + + +class CeleryBeatHealthView(APIView): + """ + Read-only health endpoint that reports whether Celery Beat is updating the + heartbeat cache key frequently enough. + """ + permission_classes = [IsAuthenticated] + + def get(self, request, *args, **kwargs): + raw_heartbeat = cache.get(CELERY_BEAT_HEARTBEAT_KEY) + if raw_heartbeat is None: + return Response( + { + 'healthy': False, + 'heartbeat_present': False, + 'detail': 'Celery Beat heartbeat is missing.', + }, + status=status.HTTP_503_SERVICE_UNAVAILABLE, + ) + + try: + heartbeat_ts = float(raw_heartbeat) + except (TypeError, ValueError): + return Response( + { + 'healthy': False, + 'heartbeat_present': True, + 'detail': 'Celery Beat heartbeat is malformed.', + }, + status=status.HTTP_503_SERVICE_UNAVAILABLE, + ) + + now_ts = timezone.now().timestamp() + age_seconds = now_ts - heartbeat_ts + healthy = age_seconds <= CELERY_BEAT_HEARTBEAT_MAX_AGE_SECONDS + response_status = status.HTTP_200_OK if healthy else status.HTTP_503_SERVICE_UNAVAILABLE + + return Response( + { + 'healthy': healthy, + 'heartbeat_present': True, + 'heartbeat_age_seconds': round(age_seconds, 1), + 'last_heartbeat': datetime.fromtimestamp(heartbeat_ts, tz=dt_timezone.utc).isoformat(), + 'max_age_seconds': CELERY_BEAT_HEARTBEAT_MAX_AGE_SECONDS, + }, + status=response_status, + ) from django.http import HttpResponse from django.middleware.csrf import get_token @@ -820,4 +872,4 @@ def get(self, request, *args, **kwargs): # Original Destination par redirect karna decoded_dest = urllib.parse.unquote(dest_url) return HttpResponseRedirect(decoded_dest) -# ------------------------------------------ \ No newline at end of file +# ------------------------------------------