Skip to content
Merged
59 changes: 46 additions & 13 deletions django-backend/soroscan/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@
Health check endpoints for Kubernetes liveness/readiness probes.
"""
import time
import requests

from django.core.cache import cache
from django.db import connection
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from django.conf import settings

from celery.exceptions import TimeoutError

from soroscan.celery import app

WORKER_HEALTH_TIMEOUT_SECONDS = 2


PROCESS_START_TIME = time.monotonic()


Expand Down Expand Up @@ -53,26 +52,60 @@ def health_view(request):
@api_view(["GET"])
@permission_classes([AllowAny])
def readiness_view(request):
"""Readiness probe - DB and Redis are connected."""
errors = []
"""Readiness probe - DB, Redis, and Soroban RPC are connected."""
components = {
"database": "healthy",
"redis": "healthy",
"soroban_rpc": "healthy"
}
overall_status = "healthy"

# 1. Database Check
try:
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
except Exception as e:
errors.append(f"db: {str(e)}")
components["database"] = f"degraded: {str(e)}"
overall_status = "degraded"

# 2. Redis Check
try:
cache.set("health_check", "1", timeout=10)
cache.set("health_check", "1", timeout=5)
if cache.get("health_check") != "1":
errors.append("redis: failed to read value")
components["redis"] = "degraded: failed to read/write cache"
overall_status = "degraded"
except Exception as e:
errors.append(f"redis: {str(e)}")
components["redis"] = f"degraded: {str(e)}"
overall_status = "degraded"

if errors:
return Response({"status": "not_ready", "errors": errors}, status=503)
# 3. Soroban RPC Check
try:
rpc_url = getattr(settings, "SOROBAN_RPC_URL", "")
if rpc_url:
# Send a lightweight getHealth JSON-RPC ping to Soroban
res = requests.post(
rpc_url,
json={"jsonrpc": "2.0", "id": 1, "method": "getHealth"},
timeout=3
)
res.raise_for_status()
data = res.json()
if "error" in data:
components["soroban_rpc"] = f"degraded: {data['error']}"
overall_status = "degraded"
else:
components["soroban_rpc"] = "degraded: SOROBAN_RPC_URL not configured"
overall_status = "degraded"
except Exception as e:
components["soroban_rpc"] = f"degraded: {str(e)}"
overall_status = "degraded"

return Response({"status": "ready"})
status_code = 200 if overall_status == "healthy" else 503

return Response({
"status": overall_status,
"components": components
}, status=status_code)


@api_view(["GET"])
Expand All @@ -96,4 +129,4 @@ def worker_health_view(request):
return Response(
{"status": "unhealthy", "error": str(exc)},
status=503,
)
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Generated by Django 5.2.10 on 2026-06-02 11:25

import django.core.validators
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("ingest", "0042_blacklistedcontract"),
]

operations = [
migrations.AlterField(
model_name="webhooksubscription",
name="retry_backoff_seconds",
field=models.PositiveIntegerField(
default=2,
help_text="Base seconds for backoff calculation (e.g. 2s, 4s, 8s...) (1-3600, default: 2)",
validators=[
django.core.validators.MinValueValidator(1),
django.core.validators.MaxValueValidator(3600),
],
),
),
migrations.AddConstraint(
model_name="webhooksubscription",
constraint=models.UniqueConstraint(
fields=("target_url", "contract"),
name="unique_url_contract_subscription",
),
),
]
7 changes: 7 additions & 0 deletions django-backend/soroscan/ingest/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,13 @@ class WebhookSubscription(models.Model):
class Meta:
ordering = ["-created_at"]

constraints = [
models.UniqueConstraint(
fields=["target_url", "contract"],
name="unique_url_contract_subscription",
)
]

def __str__(self):
return f"Webhook -> {self.target_url} ({self.contract.name})"

Expand Down
74 changes: 29 additions & 45 deletions django-backend/soroscan/ingest/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,37 +337,35 @@ def validate_filter_condition(self, value):
if not isinstance(value, dict):
raise serializers.ValidationError("filter_condition must be an object.")

allowed_ops = {"and", "or", "not", "eq", "neq", "gt", "gte", "lt", "lte", "in", "contains", "startswith", "regex"}

def _validate(node: dict):
if not isinstance(node, dict):
raise serializers.ValidationError("Each condition node must be an object.")
op = str(node.get("op", "")).lower()
if op not in allowed_ops:
raise serializers.ValidationError(f"Unsupported operator: {op}")

if op in {"and", "or"}:
conditions = node.get("conditions")
if not isinstance(conditions, list) or not conditions:
raise serializers.ValidationError(f"'{op}' requires a non-empty conditions array.")
for sub in conditions:
_validate(sub)
return

if op == "not":
condition = node.get("condition")
if not isinstance(condition, dict):
raise serializers.ValidationError("'not' requires a condition object.")
_validate(condition)
return

if "field" not in node:
raise serializers.ValidationError(f"'{op}' requires a field.")
if "value" not in node:
raise serializers.ValidationError(f"'{op}' requires a value.")

_validate(value)
return value

def validate(self, attrs):
contract = attrs.get("contract")
if not contract and self.instance:
contract = self.instance.contract

target_url = attrs.get("target_url")
if not target_url and self.instance:
target_url = self.instance.target_url

# Check for duplicates (Issue #474)
if contract and target_url:
qs = WebhookSubscription.objects.filter(contract=contract, target_url=target_url)
if self.instance:
qs = qs.exclude(pk=self.instance.pk)
if qs.exists():
raise serializers.ValidationError({
"target_url": "A webhook subscription for this URL and contract already exists."
})

if contract:
estimated_size = contract.metadata.get("estimated_payload_size", 0)
if estimated_size > 1048576: # 1MB
raise serializers.ValidationError({"contract": "Estimated payload exceeds 1MB limit."})

if contract.metadata.get("is_massive", False):
raise serializers.ValidationError({"contract": "Contract events are known to be massive."})

return attrs

def validate_escalation_policy(self, value):
if value in (None, []):
Expand Down Expand Up @@ -401,20 +399,6 @@ def validate_escalation_policy(self, value):
)
return value

def validate(self, attrs):
contract = attrs.get("contract")
if not contract and self.instance:
contract = self.instance.contract

if contract:
estimated_size = contract.metadata.get("estimated_payload_size", 0)
if estimated_size > 1048576: # 1MB
raise serializers.ValidationError({"contract": "Estimated payload exceeds 1MB limit."})

if contract.metadata.get("is_massive", False):
raise serializers.ValidationError({"contract": "Contract events are known to be massive."})

return attrs

class RecordEventRequestSerializer(serializers.Serializer):
"""
Expand Down
27 changes: 9 additions & 18 deletions django-backend/soroscan/ingest/tests/test_health.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
import time

import pytest
from django.conf import settings
from django.core.cache import cache
from django.urls import reverse
from rest_framework import status
from rest_framework.test import APIClient

from soroscan.health import format_uptime


@pytest.fixture
def api_client():
return APIClient()


@pytest.mark.django_db
class TestHealthView:
def test_health_returns_ok_with_uptime(self, api_client):
Expand Down Expand Up @@ -56,37 +52,32 @@ def test_ready_when_db_and_cache_connected(self, api_client):
response = api_client.get(url)

assert response.status_code == status.HTTP_200_OK
assert response.data == {"status": "ready"}
assert response.data["status"] == "healthy"
assert "components" in response.data
assert response["X-SoroScan-Version"] == settings.SOFTWARE_VERSION

def test_not_ready_when_db_fails(self, api_client, monkeypatch):
from django.db import connection

def mocked_cursor(*args, **kwargs):
raise Exception("DB connection failed")

monkeypatch.setattr(connection, "cursor", lambda: mocked_cursor())
monkeypatch.setattr(connection, "cursor", lambda: (_ for _ in ()).throw(Exception("DB fail")))

url = reverse("readiness")
response = api_client.get(url)

assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
assert response.data["status"] == "not_ready"
assert any("db" in e for e in response.data["errors"])
assert response.data["status"] == "degraded"
assert "database" in response.data["components"]
assert response["X-SoroScan-Version"] == settings.SOFTWARE_VERSION

def test_not_ready_when_cache_fails(self, api_client, monkeypatch):
def mocked_get(*args, **kwargs):
raise Exception("Cache connection failed")

monkeypatch.setattr(cache, "get", mocked_get)
from django.core.cache import cache
monkeypatch.setattr(cache, "get", lambda *args, **kwargs: (_ for _ in ()).throw(Exception("Redis fail")))

url = reverse("readiness")
response = api_client.get(url)

assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
assert response.data["status"] == "not_ready"
assert any("redis" in e for e in response.data["errors"])
assert response.data["status"] == "degraded"
assert "redis" in response.data["components"]
assert response["X-SoroScan-Version"] == settings.SOFTWARE_VERSION


Expand Down
9 changes: 3 additions & 6 deletions django-backend/soroscan/ingest/tests/test_migration_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ def enable_db_access_for_all_tests():
def test_single_leaf_node():
"""
Assert the ingest migration graph has exactly one leaf node.

The current leaf is '0042_blacklistedcontract'
"""
loader = MigrationLoader(None, ignore_no_migrations=True)

Expand All @@ -31,10 +29,9 @@ def test_single_leaf_node():
assert len(leaf_nodes) == 1, (
f"Expected 1 leaf node for 'ingest', found {len(leaf_nodes)}: {leaf_nodes}"
)
# After adding BlacklistedContract the expected single leaf is 0042
assert leaf_nodes[0][1] == "0042_blacklistedcontract", (
"Expected leaf node '0042_blacklistedcontract', "
f"got '{leaf_nodes[0][1]}'"
# Updated to reflect the new migration generated by the unique constraint
assert leaf_nodes[0][1].startswith("0043_"), (
f"Expected leaf node starting with '0043_', got '{leaf_nodes[0][1]}'"
)


Expand Down
9 changes: 5 additions & 4 deletions django-backend/soroscan/ingest/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,15 +657,16 @@ def test_process_event_dispatches_to_matching_webhooks(self, contract):
event = ContractEventFactory(
contract=contract, event_type="swap", ledger=6000, event_index=0
)

# Add unique target_urls here!
webhook_swap = WebhookSubscriptionFactory(
contract=contract, event_type="swap", is_active=True,
contract=contract, event_type="swap", is_active=True, target_url="https://example.com/swap"
)
webhook_all = WebhookSubscriptionFactory(
contract=contract, event_type="", is_active=True,
contract=contract, event_type="", is_active=True, target_url="https://example.com/all"
)
# non-matching event type — must NOT be dispatched
WebhookSubscriptionFactory(
contract=contract, event_type="transfer", is_active=True,
contract=contract, event_type="transfer", is_active=True, target_url="https://example.com/transfer"
)

responses.add(
Expand Down
14 changes: 8 additions & 6 deletions django-backend/soroscan/ingest/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,14 @@ def test_filter_events_by_decoding_status(self, authenticated_client, contract):
@pytest.mark.django_db
class TestWebhookSubscriptionViewSet:
def test_list_webhooks(self, authenticated_client, contract):
WebhookSubscriptionFactory.create_batch(2, contract=contract)
url = reverse("webhook-list")
response = authenticated_client.get(url)

assert response.status_code == status.HTTP_200_OK
assert len(response.data["results"]) == 2
WebhookSubscriptionFactory(contract=contract, target_url="https://example.com/webhook-1")
WebhookSubscriptionFactory(contract=contract, target_url="https://example.com/webhook-2")

url = reverse("webhook-list")
response = authenticated_client.get(url)

assert response.status_code == status.HTTP_200_OK
assert len(response.data["results"]) == 2

def test_create_webhook(self, authenticated_client, contract):
url = reverse("webhook-list")
Expand Down
11 changes: 10 additions & 1 deletion django-backend/soroscan/ingest/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,16 @@ class ContractEventViewSet(viewsets.ReadOnlyModelViewSet):
}

def get_queryset(self):
return ContractEvent.objects.select_related("contract").all()
qs = ContractEvent.objects.select_related("contract").all()

# Support comma-separated 'type' filter (Issue #476)
event_types = self.request.query_params.get("type")
if event_types:
types_list = [t.strip() for t in event_types.split(",") if t.strip()]
if types_list:
qs = qs.filter(event_type__in=types_list)

return qs

@extend_schema(
parameters=[
Expand Down
Loading
Loading