Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions apps/data-processing/scripts/build_project_materialized_views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#!/usr/bin/env python3
"""Build project contribution materialized views from backfilled on-chain events."""

import argparse
import json
import sys
from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))

from dotenv import load_dotenv

from src.db import PostgresService
from src.ingestion.project_materialized_views import refresh_project_materialized_views


def load_events(path: Path) -> list:
with path.open() as handle:
payload = json.load(handle)

if isinstance(payload, dict):
events = payload.get("events")
if isinstance(events, list):
return events
return []
if isinstance(payload, list):
return payload
return []


def main() -> int:
parser = argparse.ArgumentParser(description="Refresh project contribution materialized views")
parser.add_argument("--input-dir", type=str, required=True, help="Directory containing backfilled event JSON files")
parser.add_argument("--verbose", action="store_true")
args = parser.parse_args()

load_dotenv(PROJECT_ROOT / ".env")

db_service = PostgresService()
input_dir = Path(args.input_dir)

total_projects = 0
total_checks = 0

for file_path in sorted(input_dir.glob("*.json")):
events = load_events(file_path)
if not events:
continue

existing_rows = {
row.project_id: {
"project_id": row.project_id,
"total_contributed": row.total_contributed,
"contributor_count": row.contributor_count,
"milestone_approved": bool(row.milestone_approved),
"contributors": row.contributors or [],
"last_processed_ledger": row.last_processed_ledger,
}
for row in db_service.get_project_contribution_rollups()
}

rows, checks = refresh_project_materialized_views(events, existing_rows=existing_rows)

for row in rows:
db_service.upsert_project_contribution_rollup(
project_id=row["project_id"],
total_contributed=row["total_contributed"],
contributor_count=row["contributor_count"],
milestone_approved=bool(row["milestone_approved"]),
contributors=row["contributors"],
last_processed_ledger=row["last_processed_ledger"],
)
total_projects += 1

for check_id, check in checks.items():
db_service.save_analytics_record(
record_type="project_materialized_view_quality",
metric_name=check_id,
value=float(int(check["passed"])),
asset="project_totals",
extra_data=check["details"],
)
total_checks += 1

print({
"projects_materialized": total_projects,
"quality_checks_emitted": total_checks,
})

return 0


if __name__ == "__main__":
raise SystemExit(main())
11 changes: 10 additions & 1 deletion apps/data-processing/src/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,23 @@
Database package for analytics data persistence
"""

from .models import Base, Article, SocialPost, AnalyticsRecord, NewsInsight, AssetTrend
from .models import (
Base,
Article,
SocialPost,
AnalyticsRecord,
ProjectContributionMaterializedView,
NewsInsight,
AssetTrend,
)
from .postgres_service import PostgresService

__all__ = [
"Base",
"Article",
"SocialPost",
"AnalyticsRecord",
"ProjectContributionMaterializedView",
"NewsInsight",
"AssetTrend",
"PostgresService",
Expand Down
32 changes: 32 additions & 0 deletions apps/data-processing/src/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,38 @@ def __repr__(self):
return f"<AnalyticsRecord(type={self.record_type}, asset={self.asset}, metric={self.metric_name}, value={self.value})>"


class ProjectContributionMaterializedView(Base):
"""
Stores incremental rollups for project totals, contributors, and milestone status.
"""

__tablename__ = "project_contribution_materialized_views"

project_id = Column(Integer, primary_key=True, index=True)
total_contributed = Column(BigInteger, nullable=False, default=0)
contributor_count = Column(Integer, nullable=False, default=0)
milestone_approved = Column(Integer, nullable=False, default=0)
contributors = Column(JSON, nullable=True)
last_processed_ledger = Column(BigInteger, nullable=False, default=0)
updated_at = Column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False
)
created_at = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)

__table_args__ = (
Index("idx_project_contribution_rollup_project_id", "project_id"),
Index("idx_project_contribution_rollup_ledger", "last_processed_ledger"),
)

def __repr__(self):
return (
f"<ProjectContributionMaterializedView(project_id={self.project_id}, "
f"total_contributed={self.total_contributed}, contributor_count={self.contributor_count})>"
)


class NewsInsight(Base):
"""
Stores sentiment analysis results for news articles (legacy table, kept for backward compatibility)
Expand Down
75 changes: 74 additions & 1 deletion apps/data-processing/src/db/postgres_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.exc import SQLAlchemyError, OperationalError

from .models import Base, Article, SocialPost, AnalyticsRecord, NewsInsight, AssetTrend
from .models import (
Base,
Article,
SocialPost,
AnalyticsRecord,
ProjectContributionMaterializedView,
NewsInsight,
AssetTrend,
)
from src.analytics.ner_service import NERService

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -684,6 +692,71 @@ def save_analytics_records_batch(

return saved_count

def upsert_project_contribution_rollup(
self,
project_id: int,
total_contributed: int,
contributor_count: int,
milestone_approved: bool,
contributors: Optional[List[str]] = None,
last_processed_ledger: Optional[int] = None,
) -> Optional[ProjectContributionMaterializedView]:
"""
Persist incremental project contribution rollups.
"""

def _upsert():
with self.get_session() as session:
existing = session.execute(
select(ProjectContributionMaterializedView).where(
ProjectContributionMaterializedView.project_id == project_id
)
).scalar_one_or_none()

if existing is None:
record = ProjectContributionMaterializedView(
project_id=project_id,
total_contributed=total_contributed,
contributor_count=contributor_count,
milestone_approved=int(bool(milestone_approved)),
contributors=contributors or [],
last_processed_ledger=last_processed_ledger or 0,
)
session.add(record)
session.flush()
logger.debug(f"Saved project contribution rollup for project {project_id}")
return record

existing.total_contributed = total_contributed
existing.contributor_count = contributor_count
existing.milestone_approved = int(bool(milestone_approved))
existing.contributors = contributors or []
if last_processed_ledger is not None:
existing.last_processed_ledger = last_processed_ledger
session.flush()
logger.debug(f"Updated project contribution rollup for project {project_id}")
return existing

try:
return self._retry_operation(_upsert)
except SQLAlchemyError as e:
logger.error(f"Failed to upsert project contribution rollup: {e}")
return None

def get_project_contribution_rollups(self) -> List[ProjectContributionMaterializedView]:
"""Fetch persisted project contribution materialized views."""
try:
with self.get_session() as session:
stmt = select(ProjectContributionMaterializedView).order_by(
ProjectContributionMaterializedView.project_id
)
results = session.execute(stmt).scalars().all()
logger.debug(f"Retrieved {len(results)} project contribution rollups")
return results
except SQLAlchemyError as e:
logger.error(f"Failed to retrieve project contribution rollups: {e}")
return []

def get_analytics_records(
self,
record_type: Optional[str] = None,
Expand Down
Loading