diff --git a/apps/data-processing/scripts/build_project_materialized_views.py b/apps/data-processing/scripts/build_project_materialized_views.py
new file mode 100644
--- /dev/null
+++ b/apps/data-processing/scripts/build_project_materialized_views.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python3
+"""Build project contribution materialized views from backfilled on-chain events."""
+
+import argparse
+import json
+import logging
+import sys
+from pathlib import Path
+
+PROJECT_ROOT = Path(__file__).resolve().parents[1]
+if str(PROJECT_ROOT) not in sys.path:
+    sys.path.insert(0, str(PROJECT_ROOT))
+
+from dotenv import load_dotenv
+
+from src.db import PostgresService
+from src.ingestion.project_materialized_views import refresh_project_materialized_views
+
+logger = logging.getLogger(__name__)
+
+
+def load_events(path: Path) -> list:
+    """Return the event dicts stored in the JSON file at ``path``.
+
+    The payload may be a bare JSON array or an object carrying the array under
+    an ``events`` key. Any other shape yields an empty list, and entries that
+    are not JSON objects are dropped because the rollup code reads events with
+    ``dict.get``. Malformed JSON is not caught: skipping a file silently would
+    leave a gap in the backfill.
+    """
+    with path.open(encoding="utf-8") as handle:
+        payload = json.load(handle)
+
+    events = payload.get("events") if isinstance(payload, dict) else payload
+    if not isinstance(events, list):
+        return []
+    return [event for event in events if isinstance(event, dict)]
+
+
+def _load_existing_rows(db_service: PostgresService) -> dict:
+    """Snapshot the persisted rollups as plain dicts keyed by project id."""
+    return {
+        row.project_id: {
+            "project_id": row.project_id,
+            "total_contributed": row.total_contributed,
+            "contributor_count": row.contributor_count,
+            "milestone_approved": bool(row.milestone_approved),
+            "contributors": row.contributors or [],
+            "last_processed_ledger": row.last_processed_ledger,
+        }
+        for row in db_service.get_project_contribution_rollups()
+    }
+
+
+def main() -> int:
+    """Replay every ``*.json`` file in ``--input-dir`` into the rollup table.
+
+    Returns 0 on success and 1 when at least one rollup could not be persisted.
+    """
+    parser = argparse.ArgumentParser(description="Refresh project contribution materialized views")
+    parser.add_argument("--input-dir", type=str, required=True, help="Directory containing backfilled event JSON files")
+    parser.add_argument("--verbose", action="store_true")
+    args = parser.parse_args()
+
+    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
+
+    input_dir = Path(args.input_dir)
+    if not input_dir.is_dir():
+        parser.error(f"--input-dir is not a directory: {input_dir}")
+
+    load_dotenv(PROJECT_ROOT / ".env")
+
+    db_service = PostgresService()
+
+    total_projects = 0
+    total_checks = 0
+    failed_upserts = 0
+
+    for file_path in sorted(input_dir.glob("*.json")):
+        events = load_events(file_path)
+        if not events:
+            continue
+
+        # Re-read per file: the previous iteration may have advanced checkpoints.
+        existing_rows = _load_existing_rows(db_service)
+        rows, checks = refresh_project_materialized_views(events, existing_rows=existing_rows)
+
+        for row in rows:
+            if row == existing_rows.get(row["project_id"]):
+                continue  # Untouched by this file; nothing to write.
+
+            saved = db_service.upsert_project_contribution_rollup(
+                project_id=row["project_id"],
+                total_contributed=row["total_contributed"],
+                contributor_count=row["contributor_count"],
+                milestone_approved=bool(row["milestone_approved"]),
+                contributors=row["contributors"],
+                last_processed_ledger=row["last_processed_ledger"],
+            )
+            if saved is None:
+                # The service logs the database error and returns None.
+                failed_upserts += 1
+                logger.error("Rollup for project %s from %s was not persisted", row["project_id"], file_path.name)
+                continue
+            total_projects += 1
+
+        for check_id, check in checks.items():
+            db_service.save_analytics_record(
+                record_type="project_materialized_view_quality",
+                metric_name=check_id,
+                value=float(int(check["passed"])),
+                asset="project_totals",
+                extra_data=check["details"],
+            )
+            total_checks += 1
+
+    print({
+        "projects_materialized": total_projects,
+        "quality_checks_emitted": total_checks,
+    })
+
+    return 1 if failed_upserts else 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/apps/data-processing/src/db/__init__.py b/apps/data-processing/src/db/__init__.py
index cf3b7564..2930075a 100644
--- a/apps/data-processing/src/db/__init__.py
+++ b/apps/data-processing/src/db/__init__.py
@@ -2,7 +2,15 @@
 Database package for analytics data persistence
 """
 
-from .models import Base, Article, SocialPost, AnalyticsRecord, NewsInsight, AssetTrend
+from .models import (
+    Base,
+    Article,
+    SocialPost,
+    AnalyticsRecord,
+    ProjectContributionMaterializedView,
+    NewsInsight,
+    AssetTrend,
+)
 from .postgres_service import PostgresService
 
 __all__ = [
@@ -10,6 +18,7 @@
     "Article",
     "SocialPost",
     "AnalyticsRecord",
+    "ProjectContributionMaterializedView",
     "NewsInsight",
     "AssetTrend",
     "PostgresService",
diff --git a/apps/data-processing/src/db/models.py b/apps/data-processing/src/db/models.py
--- a/apps/data-processing/src/db/models.py
+++ b/apps/data-processing/src/db/models.py
@@ -166,6 +166,39 @@ def __repr__(self):
         return f""
 
 
+class ProjectContributionMaterializedView(Base):
+    """
+    Stores incremental rollups for project totals, contributors, and milestone status.
+    """
+
+    __tablename__ = "project_contribution_materialized_views"
+
+    # The primary key already gets an index; no extra index on project_id is declared.
+    project_id = Column(Integer, primary_key=True)
+    total_contributed = Column(BigInteger, nullable=False, default=0)
+    contributor_count = Column(Integer, nullable=False, default=0)
+    # Stored as 0/1 (the service writes int(bool(...))).
+    milestone_approved = Column(Integer, nullable=False, default=0)
+    contributors = Column(JSON, nullable=True)
+    last_processed_ledger = Column(BigInteger, nullable=False, default=0)
+    updated_at = Column(
+        DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False
+    )
+    created_at = Column(
+        DateTime(timezone=True), server_default=func.now(), nullable=False
+    )
+
+    __table_args__ = (
+        Index("idx_project_contribution_rollup_ledger", "last_processed_ledger"),
+    )
+
+    def __repr__(self):
+        return (
+            f"<ProjectContributionMaterializedView(project_id={self.project_id}, "
+            f"total_contributed={self.total_contributed})>"
+        )
+
+
 class NewsInsight(Base):
     """
     Stores sentiment analysis results for news articles (legacy table, kept for backward compatibility)
diff --git a/apps/data-processing/src/db/postgres_service.py b/apps/data-processing/src/db/postgres_service.py
index 54d9896b..5800e44a 100644
--- a/apps/data-processing/src/db/postgres_service.py
+++ b/apps/data-processing/src/db/postgres_service.py
@@ -13,7 +13,15 @@
 from sqlalchemy.orm import sessionmaker, Session
 from sqlalchemy.exc import SQLAlchemyError, OperationalError
 
-from .models import Base, Article, SocialPost, AnalyticsRecord, NewsInsight, AssetTrend
+from .models import (
+    Base,
+    Article,
+    SocialPost,
+    AnalyticsRecord,
+    ProjectContributionMaterializedView,
+    NewsInsight,
+    AssetTrend,
+)
 from src.analytics.ner_service import NERService
 
 logger = logging.getLogger(__name__)
@@ -684,6 +692,71 @@ def save_analytics_records_batch(
 
         return saved_count
 
+    def upsert_project_contribution_rollup(
+        self,
+        project_id: int,
+        total_contributed: int,
+        contributor_count: int,
+        milestone_approved: bool,
+        contributors: Optional[List[str]] = None,
+        last_processed_ledger: Optional[int] = None,
+    ) -> Optional[ProjectContributionMaterializedView]:
+        """
+        Persist incremental project contribution rollups.
+        """
+
+        def _upsert():
+            with self.get_session() as session:
+                existing = session.execute(
+                    select(ProjectContributionMaterializedView).where(
+                        ProjectContributionMaterializedView.project_id == project_id
+                    )
+                ).scalar_one_or_none()
+
+                if existing is None:
+                    record = ProjectContributionMaterializedView(
+                        project_id=project_id,
+                        total_contributed=total_contributed,
+                        contributor_count=contributor_count,
+                        milestone_approved=int(bool(milestone_approved)),
+                        contributors=contributors or [],
+                        last_processed_ledger=last_processed_ledger or 0,
+                    )
+                    session.add(record)
+                    session.flush()
+                    logger.debug(f"Saved project contribution rollup for project {project_id}")
+                    return record
+
+                existing.total_contributed = total_contributed
+                existing.contributor_count = contributor_count
+                existing.milestone_approved = int(bool(milestone_approved))
+                existing.contributors = contributors or []
+                if last_processed_ledger is not None:
+                    existing.last_processed_ledger = last_processed_ledger
+                session.flush()
+                logger.debug(f"Updated project contribution rollup for project {project_id}")
+                return existing
+
+        try:
+            return self._retry_operation(_upsert)
+        except SQLAlchemyError as e:
+            logger.error(f"Failed to upsert project contribution rollup: {e}")
+            return None
+
+    def get_project_contribution_rollups(self) -> List[ProjectContributionMaterializedView]:
+        """Fetch persisted project contribution materialized views."""
+        try:
+            with self.get_session() as session:
+                stmt = select(ProjectContributionMaterializedView).order_by(
+                    ProjectContributionMaterializedView.project_id
+                )
+                results = session.execute(stmt).scalars().all()
+                logger.debug(f"Retrieved {len(results)} project contribution rollups")
+                return results
+        except SQLAlchemyError as e:
+            logger.error(f"Failed to retrieve project contribution rollups: {e}")
+            return []
+
     def get_analytics_records(
         self,
         record_type: Optional[str] = None,
diff --git a/apps/data-processing/src/ingestion/project_materialized_views.py b/apps/data-processing/src/ingestion/project_materialized_views.py
new file mode 100644
--- /dev/null
+++ b/apps/data-processing/src/ingestion/project_materialized_views.py
@@ -0,0 +1,328 @@
+"""Incremental rollups of on-chain project events (totals, contributors, milestones).
+
+Events are loosely typed dicts: a field may sit at the top level or under
+``data`` / ``value`` / ``topics``, so each extractor probes several locations.
+"""
+
+from __future__ import annotations
+
+import math
+from dataclasses import dataclass, field
+from typing import Dict, List, Optional
+
+_DEPOSIT = "DepositEvent"
+_REFUND = "ContributionRefundedEvent"
+_PROJECT_CREATED = "ProjectCreatedEvent"
+_MILESTONE_APPROVED = "MilestoneApprovedEvent"
+_ACCOUNT_KEYS = ("contributor", "user", "owner", "admin", "caller")
+
+
+@dataclass
+class ProjectMaterializedView:
+    """In-memory rollup state for a single project."""
+
+    project_id: int
+    total_contributed: int = 0
+    contributor_count: int = 0
+    milestone_approved: bool = False
+    contributors: set[str] = field(default_factory=set)
+    last_processed_ledger: int = 0
+    last_processed_timestamp: Optional[str] = None
+
+
+def _coerce_int(value) -> int:
+    """Convert a loosely typed event field to ``int``, falling back to 0.
+
+    Integer strings are parsed exactly rather than through ``float``, which
+    would round values above 2**53. ``None``, blank or non-numeric strings,
+    non-finite floats and unsupported types yield 0 instead of raising.
+    """
+    if value is None:
+        return 0
+    if isinstance(value, int):  # Covers bool too: True -> 1, False -> 0.
+        return int(value)
+    if isinstance(value, str):
+        cleaned = value.strip()
+        if not cleaned:
+            return 0
+        try:
+            return int(cleaned)
+        except ValueError:
+            pass
+        try:
+            value = float(cleaned)
+        except ValueError:
+            return 0
+    if isinstance(value, float):
+        # int() raises ValueError on nan and OverflowError on inf.
+        return int(value) if math.isfinite(value) else 0
+    return 0
+
+
+def _event_type(event: Dict) -> str:
+    """Return the stripped ``event_type`` (or ``type``) name, '' when absent."""
+    raw = event.get("event_type") or event.get("type") or ""
+    return raw.strip() if isinstance(raw, str) else ""
+
+
+def _extract_project_id(event: Dict) -> Optional[int]:
+    """Return the first non-negative project id found on the event, else ``None``.
+
+    Probes the top level, then ``data``, ``value`` and finally ``topics``.
+    """
+    candidates = [
+        event.get("project_id"),
+        event.get("projectId"),
+        event.get("project"),
+    ]
+
+    for container_key in ("data", "value"):
+        container = event.get(container_key)
+        if isinstance(container, dict):
+            candidates.append(container.get("project_id"))
+            candidates.append(container.get("projectId"))
+
+    topics = event.get("topics")
+    if isinstance(topics, list):
+        for topic in topics:
+            if isinstance(topic, dict):
+                candidates.append(topic.get("project_id"))
+                candidates.append(topic.get("projectId"))
+            else:
+                candidates.append(topic)
+
+    for candidate in candidates:
+        try:
+            project_id = int(candidate)
+        except (TypeError, ValueError, OverflowError):
+            # None, non-numeric text, nan and inf are not usable ids.
+            continue
+        if project_id >= 0:
+            return project_id
+    return None
+
+
+def _extract_amount(event: Dict) -> int:
+    """Return the first strictly positive amount found on the event, else 0."""
+    candidates = [
+        event.get("amount"),
+        event.get("deposit_amount"),
+        event.get("value"),
+    ]
+
+    for container_key in ("data", "value"):
+        container = event.get(container_key)
+        if isinstance(container, dict):
+            candidates.append(container.get("amount"))
+            candidates.append(container.get("deposit_amount"))
+
+    topics = event.get("topics")
+    if isinstance(topics, list):
+        for topic in topics:
+            if isinstance(topic, dict):
+                candidates.append(topic.get("amount"))
+            elif isinstance(topic, (int, float)) and not isinstance(topic, bool):
+                # NOTE(review): a bare numeric topic is the amount of last resort,
+                # but it could equally be a project id - confirm the event schema.
+                candidates.append(topic)
+
+    for candidate in candidates:
+        amount = _coerce_int(candidate)
+        if amount > 0:
+            return amount
+    return 0
+
+
+def _extract_contributor(event: Dict) -> Optional[str]:
+    """Return the first non-blank account string on the event, else ``None``."""
+    candidates = [event.get(key) for key in _ACCOUNT_KEYS]
+
+    for container_key in ("data", "value"):
+        container = event.get(container_key)
+        if isinstance(container, dict):
+            candidates.extend(container.get(key) for key in _ACCOUNT_KEYS)
+
+    for candidate in candidates:
+        if isinstance(candidate, str) and candidate.strip():
+            return candidate.strip()
+    return None
+
+
+def _extract_ledger(event: Dict) -> int:
+    """Return the event's ledger sequence, or 0 when none is present."""
+    ledger = event.get("ledger")
+    if ledger is None:
+        ledger = event.get("ledger_sequence")
+    if ledger is None and isinstance(event.get("data"), dict):
+        ledger = event["data"].get("ledger")
+    if ledger is None and isinstance(event.get("value"), dict):
+        ledger = event["value"].get("ledger")
+    return _coerce_int(ledger)
+
+
+def _build_row(project_id: int, materialized: ProjectMaterializedView) -> Dict:
+    """Serialise a view into the plain-dict row shape used for persistence."""
+    return {
+        "project_id": project_id,
+        "total_contributed": materialized.total_contributed,
+        "contributor_count": materialized.contributor_count,
+        "milestone_approved": materialized.milestone_approved,
+        "contributors": sorted(materialized.contributors),
+        "last_processed_ledger": materialized.last_processed_ledger,
+    }
+
+
+def _calculate_raw_totals(
+    events: List[Dict],
+    baseline_totals: Optional[Dict[int, int]] = None,
+    checkpoints: Optional[Dict[int, int]] = None,
+) -> Dict[int, int]:
+    """Recompute the expected total per project straight from the raw events.
+
+    Starts from ``baseline_totals`` (the totals already persisted) and applies
+    every deposit (+) and refund (-) whose ledger is past the project's
+    checkpoint, so the result is comparable with the cumulative rollup. The
+    sum is deliberately not clamped at zero: refunds exceeding contributions
+    then surface as a failed cross-check instead of being hidden.
+    """
+    totals: Dict[int, int] = dict(baseline_totals or {})
+    checkpoints = checkpoints or {}
+    signs = {_DEPOSIT: 1, _REFUND: -1}
+
+    for event in events:
+        sign = signs.get(_event_type(event))
+        if sign is None:
+            continue
+        project_id = _extract_project_id(event)
+        if project_id is None:
+            continue
+        if _extract_ledger(event) <= checkpoints.get(project_id, 0):
+            continue
+        totals[project_id] = totals.get(project_id, 0) + sign * _extract_amount(event)
+    return totals
+
+
+def _apply_event(materialized: ProjectMaterializedView, event: Dict, ledger: int) -> None:
+    """Fold one not-yet-processed event into ``materialized`` in place."""
+    event_type = _event_type(event)
+
+    if event_type == _PROJECT_CREATED:
+        materialized.milestone_approved = False
+    elif event_type == _MILESTONE_APPROVED:
+        materialized.milestone_approved = True
+    elif event_type == _DEPOSIT:
+        amount = _extract_amount(event)
+        if amount > 0:
+            materialized.total_contributed += amount
+        contributor = _extract_contributor(event)
+        if contributor:
+            materialized.contributors.add(contributor)
+            materialized.contributor_count = len(materialized.contributors)
+    elif event_type == _REFUND:
+        amount = _extract_amount(event)
+        if amount > 0:
+            materialized.total_contributed = max(materialized.total_contributed - amount, 0)
+
+    # Every event, including unknown types, advances the project's watermark.
+    materialized.last_processed_ledger = max(materialized.last_processed_ledger, ledger)
+    materialized.last_processed_timestamp = event.get("timestamp")
+
+
+def refresh_project_materialized_views(
+    events: List[Dict],
+    existing_rows: Optional[Dict[int, Dict]] = None,
+) -> tuple[List[Dict], Dict]:
+    """Fold ``events`` into the persisted rollups and cross-check the totals.
+
+    Args:
+        events: Raw event dicts in any order; non-dict entries are ignored.
+        existing_rows: Previously persisted rows keyed by project id.
+
+    Returns:
+        ``(rows, checks)``: one row dict per known project, ordered by project
+        id, plus the data-quality checks computed for those rows.
+
+    An event is skipped when its ledger is not past the ledger its project had
+    reached before this call. Comparing against that fixed checkpoint, rather
+    than the running watermark, keeps several events of one project in the
+    same ledger from shadowing each other while replays stay idempotent.
+    """
+    project_rows: Dict[int, ProjectMaterializedView] = {}
+
+    for project_id, existing in (existing_rows or {}).items():
+        project_rows[project_id] = ProjectMaterializedView(
+            project_id=project_id,
+            total_contributed=_coerce_int(existing.get("total_contributed")),
+            contributor_count=_coerce_int(existing.get("contributor_count")),
+            milestone_approved=bool(existing.get("milestone_approved")),
+            contributors=set(existing.get("contributors") or []),
+            last_processed_ledger=_coerce_int(existing.get("last_processed_ledger")),
+        )
+
+    checkpoints = {pid: view.last_processed_ledger for pid, view in project_rows.items()}
+    baseline_totals = {pid: view.total_contributed for pid, view in project_rows.items()}
+
+    # Stable sort on the ledger alone: events sharing a ledger keep input order.
+    events_sorted = sorted(
+        (event for event in events if isinstance(event, dict)),
+        key=_extract_ledger,
+    )
+    raw_totals = _calculate_raw_totals(events_sorted, baseline_totals, checkpoints)
+
+    for event in events_sorted:
+        project_id = _extract_project_id(event)
+        if project_id is None:
+            continue
+
+        materialized = project_rows.setdefault(
+            project_id,
+            ProjectMaterializedView(project_id=project_id),
+        )
+
+        ledger = _extract_ledger(event)
+        if ledger <= checkpoints.get(project_id, 0):
+            continue  # Already folded in by an earlier run (or has no ledger).
+
+        _apply_event(materialized, event, ledger)
+
+    updated_rows = [
+        _build_row(project_id, materialized)
+        for project_id, materialized in sorted(project_rows.items())
+    ]
+
+    checks = build_data_quality_checks(
+        rows_by_project={row["project_id"]: row for row in updated_rows},
+        raw_totals=raw_totals,
+    )
+
+    return updated_rows, checks
+
+
+def build_data_quality_checks(rows_by_project: Dict[int, Dict], raw_totals: Optional[Dict[int, int]] = None) -> Dict:
+    """Compare materialized totals with independently computed raw totals.
+
+    With no ``raw_totals`` there is nothing to compare against, so the check
+    passes trivially. A project missing on either side is treated as total 0.
+    """
+    raw_totals = raw_totals or {}
+    materialized_totals = {
+        project_id: row.get("total_contributed", 0)
+        for project_id, row in rows_by_project.items()
+    }
+
+    expected_totals = raw_totals or materialized_totals
+    passed = all(
+        materialized_totals.get(project_id, 0) == expected_totals.get(project_id, 0)
+        for project_id in set(materialized_totals) | set(expected_totals)
+    )
+
+    return {
+        "project_total_crosscheck": {
+            "passed": passed,
+            "details": {
+                "project_totals": materialized_totals,
+                "raw_total": raw_totals,
+                "materialized_total": materialized_totals,
+            },
+        }
+    }
diff --git a/apps/data-processing/tests/test_project_materialized_views.py b/apps/data-processing/tests/test_project_materialized_views.py
new file mode 100644
--- /dev/null
+++ b/apps/data-processing/tests/test_project_materialized_views.py
@@ -0,0 +1,128 @@
+from src.ingestion.project_materialized_views import (
+    build_data_quality_checks,
+    refresh_project_materialized_views,
+)
+
+
+SAMPLE_EVENTS = [
+    {
+        "ledger": 100,
+        "event_type": "ProjectCreatedEvent",
+        "project_id": 42,
+        "owner": "GOWNER",
+    },
+    {
+        "ledger": 101,
+        "event_type": "DepositEvent",
+        "project_id": 42,
+        "contributor": "GCONTRIB_A",
+        "amount": 100,
+    },
+    {
+        "ledger": 102,
+        "event_type": "DepositEvent",
+        "project_id": 42,
+        "contributor": "GCONTRIB_B",
+        "amount": 50,
+    },
+    {
+        "ledger": 103,
+        "event_type": "DepositEvent",
+        "project_id": 42,
+        "contributor": "GCONTRIB_A",
+        "amount": 25,
+    },
+    {
+        "ledger": 104,
+        "event_type": "MilestoneApprovedEvent",
+        "project_id": 42,
+        "admin": "GADMIN",
+    },
+]
+
+
+def test_refresh_project_materialized_views_tracks_totals_and_contributors() -> None:
+    rows, checks = refresh_project_materialized_views(SAMPLE_EVENTS, existing_rows={})
+
+    assert len(rows) == 1
+    row = rows[0]
+    assert row["project_id"] == 42
+    assert row["total_contributed"] == 175
+    assert row["contributor_count"] == 2
+    assert row["milestone_approved"] is True
+    assert row["last_processed_ledger"] == 104
+    assert set(row["contributors"]) == {"GCONTRIB_A", "GCONTRIB_B"}
+
+    assert checks["project_total_crosscheck"]["passed"] is True
+    assert checks["project_total_crosscheck"]["details"]["project_totals"] == {42: 175}
+
+
+def test_refresh_project_materialized_views_skips_processed_ledger_and_reports_drift() -> None:
+    existing_rows = {
+        42: {
+            "project_id": 42,
+            "total_contributed": 175,
+            "contributor_count": 2,
+            "milestone_approved": True,
+            "contributors": {"GCONTRIB_A", "GCONTRIB_B"},
+            "last_processed_ledger": 104,
+        }
+    }
+
+    duplicate_event = {
+        "ledger": 104,
+        "event_type": "DepositEvent",
+        "project_id": 42,
+        "contributor": "GCONTRIB_A",
+        "amount": 25,
+    }
+
+    rows, checks = refresh_project_materialized_views([duplicate_event], existing_rows=existing_rows)
+
+    assert len(rows) == 1
+    assert rows[0]["project_id"] == 42
+    assert rows[0]["total_contributed"] == 175
+    assert rows[0]["contributor_count"] == 2
+    assert rows[0]["last_processed_ledger"] == 104
+    assert rows[0]["contributors"] == ["GCONTRIB_A", "GCONTRIB_B"]
+    assert checks["project_total_crosscheck"]["passed"] is True
+
+    drift_checks = build_data_quality_checks(
+        rows_by_project={42: {"total_contributed": 100}},
+        raw_totals={42: 175},
+    )
+
+    assert drift_checks["project_total_crosscheck"]["passed"] is False
+    assert drift_checks["project_total_crosscheck"]["details"]["raw_total"] == {42: 175}
+    assert drift_checks["project_total_crosscheck"]["details"]["materialized_total"] == {42: 100}
+
+
+def test_refresh_project_materialized_views_keeps_all_events_of_one_ledger() -> None:
+    same_ledger_events = [
+        {"ledger": 200, "event_type": "DepositEvent", "project_id": 7, "contributor": "GA", "amount": 10},
+        {"ledger": 200, "event_type": "DepositEvent", "project_id": 7, "contributor": "GB", "amount": 5},
+    ]
+
+    rows, checks = refresh_project_materialized_views(same_ledger_events, existing_rows={})
+
+    assert rows[0]["total_contributed"] == 15
+    assert rows[0]["contributor_count"] == 2
+    assert rows[0]["last_processed_ledger"] == 200
+    assert checks["project_total_crosscheck"]["passed"] is True
+
+
+def test_refresh_project_materialized_views_crosscheck_is_cumulative() -> None:
+    existing_rows = {
+        42: {"total_contributed": 175, "contributors": ["GCONTRIB_A"], "last_processed_ledger": 104},
+        43: {"total_contributed": 500, "last_processed_ledger": 90},
+    }
+    new_events = [
+        {"ledger": 105, "event_type": "DepositEvent", "project_id": 42, "contributor": "GCONTRIB_C", "amount": 25},
+        {"ledger": 106, "event_type": "ContributionRefundedEvent", "project_id": 42, "amount": 50},
+    ]
+
+    rows, checks = refresh_project_materialized_views(new_events, existing_rows=existing_rows)
+
+    assert [row["total_contributed"] for row in rows] == [150, 500]
+    assert checks["project_total_crosscheck"]["passed"] is True
+    assert checks["project_total_crosscheck"]["details"]["raw_total"] == {42: 150, 43: 500}