diff --git a/apps/data-processing/alembic/versions/003_add_onchain_entity_links.py b/apps/data-processing/alembic/versions/003_add_onchain_entity_links.py new file mode 100644 index 00000000..52fc752c --- /dev/null +++ b/apps/data-processing/alembic/versions/003_add_onchain_entity_links.py @@ -0,0 +1,123 @@ +"""Add on-chain entity links for news articles + +Revision ID: 003 +Revises: 002 +Create Date: 2026-06-01 23:10:00.000000 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "003" +down_revision: Union[str, None] = "002" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "articles", + sa.Column("onchain_entity_links", sa.JSON(), nullable=True), + ) + + op.create_table( + "article_onchain_entity_links", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("article_id", sa.String(length=255), nullable=False), + sa.Column("stable_entity_id", sa.String(length=255), nullable=False), + sa.Column("entity_type", sa.String(length=50), nullable=False), + sa.Column("display_name", sa.String(length=255), nullable=False), + sa.Column("matched_text", sa.String(length=255), nullable=False), + sa.Column("confidence", sa.Float(), nullable=False), + sa.Column("source", sa.String(length=100), nullable=False), + sa.Column("asset_code", sa.String(length=20), nullable=True), + sa.Column("project_id", sa.BigInteger(), nullable=True), + sa.Column("contract_id", sa.String(length=255), nullable=True), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + server_default=sa.func.now(), + nullable=False, + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + server_default=sa.func.now(), + nullable=False, + ), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + "ix_article_onchain_entity_links_article_id", + "article_onchain_entity_links", + ["article_id"], + ) + op.create_index( + "ix_article_onchain_entity_links_asset_code", + "article_onchain_entity_links", + ["asset_code"], + ) + op.create_index( + "ix_article_onchain_entity_links_contract_id", + "article_onchain_entity_links", + ["contract_id"], + ) + op.create_index( + "ix_article_onchain_entity_links_entity_type", + "article_onchain_entity_links", + ["entity_type"], + ) + op.create_index( + "ix_article_onchain_entity_links_project_id", + "article_onchain_entity_links", + ["project_id"], + ) + op.create_index( + "ix_article_onchain_entity_links_stable_entity_id", + "article_onchain_entity_links", + ["stable_entity_id"], + ) + op.create_index( + "ux_article_onchain_links_article_entity", + "article_onchain_entity_links", + ["article_id", "stable_entity_id"], + unique=True, + ) + + +def downgrade() -> None: + op.drop_index( + "ux_article_onchain_links_article_entity", + table_name="article_onchain_entity_links", + ) + op.drop_index( + "ix_article_onchain_entity_links_stable_entity_id", + table_name="article_onchain_entity_links", + ) + op.drop_index( + "ix_article_onchain_entity_links_project_id", + table_name="article_onchain_entity_links", + ) + op.drop_index( + "ix_article_onchain_entity_links_entity_type", + table_name="article_onchain_entity_links", + ) + op.drop_index( + "ix_article_onchain_entity_links_contract_id", + table_name="article_onchain_entity_links", + ) + op.drop_index( + "ix_article_onchain_entity_links_asset_code", + table_name="article_onchain_entity_links", + ) + op.drop_index( + "ix_article_onchain_entity_links_article_id", + table_name="article_onchain_entity_links", + ) + op.drop_table("article_onchain_entity_links") + op.drop_column("articles", "onchain_entity_links") diff --git a/apps/data-processing/src/analytics/__init__.py b/apps/data-processing/src/analytics/__init__.py index 1a4f5060..69927aa0 100644 --- a/apps/data-processing/src/analytics/__init__.py +++ b/apps/data-processing/src/analytics/__init__.py @@ -1,11 +1,4 @@ -""" -Analytics module for market analysis and trend detection. -""" - -from .market_analyzer import MarketAnalyzer, Trend, MarketData, get_explanation -from .forecaster import SentimentForecaster, ForecastResult -from .correlation_engine import CorrelationEngine, CorrelationResult, DataPoint -from .ner_service import NERService +"""Analytics module for market analysis and trend detection.""" __all__ = [ "MarketAnalyzer", @@ -19,3 +12,41 @@ "DataPoint", "NERService", ] + + +def __getattr__(name: str): + """Lazy-load analytics exports so lightweight NLP imports stay cheap.""" + if name in {"MarketAnalyzer", "Trend", "MarketData", "get_explanation"}: + from .market_analyzer import MarketAnalyzer, MarketData, Trend, get_explanation + + values = { + "MarketAnalyzer": MarketAnalyzer, + "Trend": Trend, + "MarketData": MarketData, + "get_explanation": get_explanation, + } + return values[name] + + if name in {"SentimentForecaster", "ForecastResult"}: + from .forecaster import ForecastResult, SentimentForecaster + + return { + "SentimentForecaster": SentimentForecaster, + "ForecastResult": ForecastResult, + }[name] + + if name in {"CorrelationEngine", "CorrelationResult", "DataPoint"}: + from .correlation_engine import CorrelationEngine, CorrelationResult, DataPoint + + return { + "CorrelationEngine": CorrelationEngine, + "CorrelationResult": CorrelationResult, + "DataPoint": DataPoint, + }[name] + + if name == "NERService": + from .ner_service import NERService + + return NERService + + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/apps/data-processing/src/analytics/ner_service.py b/apps/data-processing/src/analytics/ner_service.py index bb94d06c..b6153e41 100644 --- a/apps/data-processing/src/analytics/ner_service.py +++ b/apps/data-processing/src/analytics/ner_service.py @@ -10,10 +10,12 @@ import logging import re from functools import lru_cache -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional -import spacy -from spacy.language import Language +try: + import spacy +except ImportError: # pragma: no cover - exercised in minimal test envs + spacy = None from .keywords import CRYPTO_PROJECT_MAP, KNOWN_TICKERS @@ -28,6 +30,7 @@ class NERService: r"\b([A-Z][a-zA-Z]+(?:\s+[A-Z][a-zA-Z]+)+)\b" ) _TICKER_PATTERN = re.compile(r"(?:\$)?\b([A-Z]{2,6})\b") + _PERSON_PREFIX_EXCLUSIONS = {"The", "This", "That", "New"} def __init__(self) -> None: self._canonical_names = self._build_canonical_name_map() @@ -48,8 +51,14 @@ def _build_canonical_name_map(self) -> Dict[str, str]: return canonical_names - def _initialize_pipeline(self) -> Language: - nlp: Optional[Language] = None + def _initialize_pipeline(self) -> Optional[Any]: + if spacy is None: + logger.warning( + "spaCy is not installed; using regex-only entity extraction fallback" + ) + return None + + nlp: Optional[Any] = None for model_name in self._MODEL_CANDIDATES: try: @@ -62,7 +71,8 @@ def _initialize_pipeline(self) -> Language: if nlp is None: nlp = spacy.blank("en") logger.warning( - "spaCy pretrained model not found; using blank English pipeline with custom entity rules" + "spaCy pretrained model not found; using blank English " + "pipeline with custom entity rules" ) if "entity_ruler" in nlp.pipe_names: @@ -119,23 +129,36 @@ def extract_entities(self, text: str) -> List[str]: text = text[:20000] candidates: List[str] = [] - doc = self._nlp(text) - - for ent in doc.ents: - if ent.label_ in { - "PERSON", - "ORG", - "PRODUCT", - "NORP", - "GPE", - "EVENT", - "PROJECT", - "ASSET", - }: - candidates.append(ent.text) + doc = self._nlp(text) if self._nlp is not None else None + + if doc is not None: + for ent in doc.ents: + if ent.label_ in { + "PERSON", + "ORG", + "PRODUCT", + "NORP", + "GPE", + "EVENT", + "PROJECT", + "ASSET", + }: + candidates.append(ent.text) + + for alias in sorted(self._canonical_names, key=len, reverse=True): + if len(alias) < 3: + continue + pattern = r"(? List[str]: """Extract entities from combined article fields.""" - chunks = [value.strip() for value in [title or "", summary or "", content or ""] if value and value.strip()] + chunks = [ + value.strip() + for value in [title or "", summary or "", content or ""] + if value and value.strip() + ] if not chunks: return [] return self.extract_entities("\n".join(chunks)) diff --git a/apps/data-processing/src/analytics/onchain_entity_linker.py b/apps/data-processing/src/analytics/onchain_entity_linker.py new file mode 100644 index 00000000..59a7ee2d --- /dev/null +++ b/apps/data-processing/src/analytics/onchain_entity_linker.py @@ -0,0 +1,186 @@ +"""Link article text to on-chain project and asset entities.""" + +from __future__ import annotations + +import re +from dataclasses import dataclass +from typing import Any, Dict, Iterable, List, Optional, Sequence, Set + +from .keywords import TICKER_TO_PROJECT + + +@dataclass(frozen=True) +class OnchainEntityCandidate: + """A project or asset that can be linked from article text.""" + + stable_id: str + entity_type: str + display_name: str + aliases: Sequence[str] + asset_code: Optional[str] = None + project_id: Optional[int] = None + contract_id: Optional[str] = None + + +@dataclass(frozen=True) +class OnchainEntityLink: + """A stable article-to-entity link produced by the linker.""" + + stable_id: str + entity_type: str + display_name: str + matched_text: str + confidence: float + source: str + asset_code: Optional[str] = None + project_id: Optional[int] = None + contract_id: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + """Serialize for DB JSON fields and API responses.""" + return { + "stable_id": self.stable_id, + "entity_type": self.entity_type, + "display_name": self.display_name, + "matched_text": self.matched_text, + "confidence": self.confidence, + "source": self.source, + "asset_code": self.asset_code, + "project_id": self.project_id, + "contract_id": self.contract_id, + } + + +class OnchainEntityLinker: + """Deterministic linker for testnet projects/assets mentioned in news.""" + + DEFAULT_ASSETS: Sequence[OnchainEntityCandidate] = tuple( + OnchainEntityCandidate( + stable_id=f"asset:{asset_code}", + entity_type="asset", + display_name=project_names[0], + aliases=tuple({asset_code, *project_names}), + asset_code=asset_code, + ) + for asset_code, project_names in sorted(TICKER_TO_PROJECT.items()) + ) + + def __init__( + self, + candidates: Optional[Sequence[OnchainEntityCandidate]] = None, + ) -> None: + self.candidates = self._dedupe_candidates( + list(candidates or []) + list(self.DEFAULT_ASSETS) + ) + + def link_text( + self, + text: str, + detected_entities: Optional[Sequence[str]] = None, + ) -> List[OnchainEntityLink]: + """Return deterministic links found in text and existing NER entities.""" + if not text and not detected_entities: + return [] + + haystack = "\n".join( + value for value in [text or "", " ".join(detected_entities or [])] if value + ) + links: Dict[str, OnchainEntityLink] = {} + + for candidate in self.candidates: + match = self._first_alias_match(haystack, candidate.aliases) + if not match: + continue + + confidence = 0.95 if match.lower() == candidate.display_name.lower() else 0.85 + if candidate.entity_type == "asset" and match.upper() == candidate.asset_code: + confidence = 0.9 + + links[candidate.stable_id] = OnchainEntityLink( + stable_id=candidate.stable_id, + entity_type=candidate.entity_type, + display_name=candidate.display_name, + matched_text=match, + confidence=confidence, + source="onchain_entity_linker_v1", + asset_code=candidate.asset_code, + project_id=candidate.project_id, + contract_id=candidate.contract_id, + ) + + return sorted(links.values(), key=lambda link: link.stable_id) + + def link_article(self, article: Dict[str, Any]) -> List[OnchainEntityLink]: + """Link an article dictionary using title, summary, content, and tags.""" + parts = [ + article.get("title"), + article.get("summary"), + article.get("content"), + " ".join(article.get("keywords") or []), + " ".join(article.get("categories") or []), + ] + text = "\n".join(str(part) for part in parts if part) + return self.link_text(text, article.get("detected_entities") or []) + + def evaluate_precision( + self, + labeled_articles: Iterable[Dict[str, Any]], + ) -> Dict[str, Any]: + """Measure precision against a small labeled set of expected stable IDs.""" + true_positive = 0 + predicted = 0 + per_article: List[Dict[str, Any]] = [] + + for item in labeled_articles: + expected: Set[str] = set(item.get("expected_stable_ids") or []) + links = self.link_article(item) + actual = {link.stable_id for link in links} + matches = expected & actual + + true_positive += len(matches) + predicted += len(actual) + per_article.append( + { + "article_id": item.get("id"), + "expected_stable_ids": sorted(expected), + "predicted_stable_ids": sorted(actual), + "true_positive": len(matches), + "false_positive": len(actual - expected), + } + ) + + precision = true_positive / predicted if predicted else 0.0 + return { + "precision": precision, + "true_positive": true_positive, + "predicted": predicted, + "article_count": len(per_article), + "per_article": per_article, + } + + def _first_alias_match( + self, + text: str, + aliases: Sequence[str], + ) -> Optional[str]: + for alias in sorted(set(aliases), key=len, reverse=True): + if not alias or len(alias.strip()) < 2: + continue + pattern = r"(? List[OnchainEntityCandidate]: + seen: Set[str] = set() + deduped: List[OnchainEntityCandidate] = [] + for candidate in candidates: + if candidate.stable_id in seen: + continue + seen.add(candidate.stable_id) + deduped.append(candidate) + return deduped diff --git a/apps/data-processing/src/api/server.py b/apps/data-processing/src/api/server.py index e2bcbb07..5dc7df5b 100644 --- a/apps/data-processing/src/api/server.py +++ b/apps/data-processing/src/api/server.py @@ -153,6 +153,7 @@ class NewsArticleResponse(BaseModel): categories: List[str] = [] keywords: List[str] = [] detected_entities: List[str] = [] + onchain_entity_links: List[Dict[str, Any]] = [] sentiment_score: Optional[float] = None # Raw compound score stored in DB sentiment_label: Optional[str] = None # positive / negative / neutral indicator: Optional[SentimentIndicatorResponse] = None # Visual colour indicator @@ -252,6 +253,7 @@ def _build_indicator( categories=article.categories or [], keywords=article.keywords or [], detected_entities=article.detected_entities or [], + onchain_entity_links=article.onchain_entity_links or [], sentiment_score=article.sentiment_score, sentiment_label=article.sentiment_label, indicator=_build_indicator(article.sentiment_score), diff --git a/apps/data-processing/src/db/__init__.py b/apps/data-processing/src/db/__init__.py index b9180e8a..af7f30ab 100644 --- a/apps/data-processing/src/db/__init__.py +++ b/apps/data-processing/src/db/__init__.py @@ -5,6 +5,7 @@ from .models import ( Base, Article, + ArticleOnchainEntityLink, SocialPost, AnalyticsRecord, ContractEvent, @@ -19,6 +20,7 @@ __all__ = [ "Base", "Article", + "ArticleOnchainEntityLink", "SocialPost", "AnalyticsRecord", "ContractEvent", diff --git a/apps/data-processing/src/db/models.py b/apps/data-processing/src/db/models.py index 61e2e3fc..9f700f61 100644 --- a/apps/data-processing/src/db/models.py +++ b/apps/data-processing/src/db/models.py @@ -41,6 +41,7 @@ class Article(Base): # Keywords and metadata keywords = Column(JSON, nullable=True) # Array of keywords detected_entities = Column(JSON, nullable=True) # NER entities detected in article text + onchain_entity_links = Column(JSON, nullable=True) # Stable project/asset links language = Column(String(10), nullable=True) # Timestamps @@ -68,6 +69,42 @@ def __repr__(self): return f"" +class ArticleOnchainEntityLink(Base): + """ + Normalized article-to-on-chain entity links for backend consumption. + """ + + __tablename__ = "article_onchain_entity_links" + + id = Column(Integer, primary_key=True, autoincrement=True) + article_id = Column(String(255), nullable=False, index=True) + stable_entity_id = Column(String(255), nullable=False, index=True) + entity_type = Column(String(50), nullable=False, index=True) + display_name = Column(String(255), nullable=False) + matched_text = Column(String(255), nullable=False) + confidence = Column(Float, nullable=False) + source = Column(String(100), nullable=False) + asset_code = Column(String(20), nullable=True, index=True) + project_id = Column(BigInteger, nullable=True, index=True) + contract_id = Column(String(255), nullable=True, index=True) + created_at = Column( + DateTime(timezone=True), server_default=func.now(), nullable=False + ) + updated_at = Column( + DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False + ) + + __table_args__ = ( + Index( + "ux_article_onchain_links_article_entity", + "article_id", + "stable_entity_id", + unique=True, + ), + Index("idx_article_onchain_links_type", "entity_type"), + ) + + class SocialPost(Base): """ Stores social media posts (Twitter, Reddit, etc.) diff --git a/apps/data-processing/src/db/postgres_service.py b/apps/data-processing/src/db/postgres_service.py index 8c0993bd..badcf6e7 100644 --- a/apps/data-processing/src/db/postgres_service.py +++ b/apps/data-processing/src/db/postgres_service.py @@ -9,13 +9,14 @@ from datetime import datetime, timedelta from contextlib import contextmanager -from sqlalchemy import create_engine, select, and_, desc, func +from sqlalchemy import create_engine, select, and_, desc, func, delete from sqlalchemy.orm import sessionmaker, Session from sqlalchemy.exc import SQLAlchemyError, OperationalError from .models import ( Base, Article, + ArticleOnchainEntityLink, SocialPost, AnalyticsRecord, ContractEvent, @@ -26,6 +27,11 @@ AssetTrend, ) from src.analytics.ner_service import NERService +from src.analytics.onchain_entity_linker import ( + OnchainEntityCandidate, + OnchainEntityLink, + OnchainEntityLinker, +) logger = logging.getLogger(__name__) @@ -61,6 +67,7 @@ def __init__(self, database_url: Optional[str] = None): bind=self.engine, ) self.ner_service = NERService() + self.onchain_linker = OnchainEntityLinker() logger.info("PostgreSQL service initialized successfully") except Exception as e: logger.error(f"Failed to initialize PostgreSQL service: {e}") @@ -80,6 +87,102 @@ def _ensure_detected_entities(self, article_data: Dict[str, Any]) -> Dict[str, A ) return normalized + def _project_candidates_from_session( + self, + session: Session, + ) -> List[OnchainEntityCandidate]: + """Build project candidates from materialized on-chain project views.""" + projects = session.execute(select(ProjectView)).scalars().all() + candidates: List[OnchainEntityCandidate] = [] + + for project in projects: + extra_data = project.extra_data or {} + aliases = { + str(project.project_id), + f"project {project.project_id}", + } + + for key in ("name", "title", "slug", "symbol", "asset_code"): + value = extra_data.get(key) + if value: + aliases.add(str(value)) + + for value in extra_data.get("aliases") or []: + if value: + aliases.add(str(value)) + + display_name = ( + extra_data.get("name") + or extra_data.get("title") + or f"Project {project.project_id}" + ) + asset_code = extra_data.get("asset_code") or extra_data.get("symbol") + + candidates.append( + OnchainEntityCandidate( + stable_id=f"project:{project.project_id}", + entity_type="project", + display_name=str(display_name), + aliases=tuple(sorted(aliases)), + asset_code=str(asset_code) if asset_code else None, + project_id=int(project.project_id), + contract_id=project.contract_id, + ) + ) + + if project.contract_id: + candidates.append( + OnchainEntityCandidate( + stable_id=f"contract:{project.contract_id}", + entity_type="contract", + display_name=str(display_name), + aliases=(project.contract_id,), + project_id=int(project.project_id), + contract_id=project.contract_id, + ) + ) + + return candidates + + def _link_article_onchain_entities( + self, + session: Session, + article_data: Dict[str, Any], + ) -> List[OnchainEntityLink]: + """Link article content to default assets and current project views.""" + linker = OnchainEntityLinker(self._project_candidates_from_session(session)) + return linker.link_article(article_data) + + def _sync_article_onchain_links( + self, + session: Session, + article: Article, + links: List[OnchainEntityLink], + ) -> None: + """Replace normalized link rows for an article with the latest links.""" + article.onchain_entity_links = [link.to_dict() for link in links] + session.execute( + delete(ArticleOnchainEntityLink).where( + ArticleOnchainEntityLink.article_id == article.article_id + ) + ) + + for link in links: + session.add( + ArticleOnchainEntityLink( + article_id=article.article_id, + stable_entity_id=link.stable_id, + entity_type=link.entity_type, + display_name=link.display_name, + matched_text=link.matched_text, + confidence=link.confidence, + source=link.source, + asset_code=link.asset_code, + project_id=link.project_id, + contract_id=link.contract_id, + ) + ) + @contextmanager def get_session(self): """ @@ -185,6 +288,7 @@ def _save(): ).scalar_one_or_none() if existing: + links = self._link_article_onchain_entities(session, article_data) # Update existing article existing.title = article_data.get("title", existing.title) existing.content = article_data.get("content", existing.content) @@ -192,10 +296,17 @@ def _save(): existing.source = article_data.get("source", existing.source) existing.url = article_data.get("url", existing.url) existing.asset_codes = article_data.get("asset_codes", existing.asset_codes) - existing.primary_asset = article_data.get("primary_asset", existing.primary_asset) + existing.primary_asset = article_data.get( + "primary_asset", + existing.primary_asset, + ) existing.categories = article_data.get("categories", existing.categories) existing.keywords = article_data.get("keywords", existing.keywords) - existing.detected_entities = article_data.get("detected_entities", existing.detected_entities) + existing.detected_entities = article_data.get( + "detected_entities", + existing.detected_entities, + ) + self._sync_article_onchain_links(session, existing, links) existing.language = article_data.get("language", existing.language) existing.published_at = article_data.get("published_at", existing.published_at) existing.fetched_at = article_data.get("fetched_at", existing.fetched_at) @@ -212,6 +323,7 @@ def _save(): logger.debug(f"Updated article: {existing.article_id}") return existing else: + links = self._link_article_onchain_entities(session, article_data) # Create new article article = Article( article_id=article_data.get("id"), @@ -225,6 +337,7 @@ def _save(): categories=article_data.get("categories"), keywords=article_data.get("keywords"), detected_entities=article_data.get("detected_entities"), + onchain_entity_links=[link.to_dict() for link in links], language=article_data.get("language"), published_at=article_data.get("published_at"), fetched_at=article_data.get("fetched_at"), @@ -240,6 +353,7 @@ def _save(): session.add(article) session.flush() + self._sync_article_onchain_links(session, article, links) logger.debug(f"Saved article: {article.article_id}") return article @@ -277,6 +391,7 @@ def save_articles_batch( ).scalar_one_or_none() if existing: + links = self._link_article_onchain_entities(session, article_data) # Update existing article existing.title = article_data.get("title", existing.title) existing.content = article_data.get("content", existing.content) @@ -284,12 +399,22 @@ def save_articles_batch( existing.source = article_data.get("source", existing.source) existing.url = article_data.get("url", existing.url) existing.asset_codes = article_data.get("asset_codes", existing.asset_codes) - existing.primary_asset = article_data.get("primary_asset", existing.primary_asset) + existing.primary_asset = article_data.get( + "primary_asset", + existing.primary_asset, + ) existing.categories = article_data.get("categories", existing.categories) existing.keywords = article_data.get("keywords", existing.keywords) - existing.detected_entities = article_data.get("detected_entities", existing.detected_entities) + existing.detected_entities = article_data.get( + "detected_entities", + existing.detected_entities, + ) + self._sync_article_onchain_links(session, existing, links) existing.language = article_data.get("language", existing.language) - existing.published_at = article_data.get("published_at", existing.published_at) + existing.published_at = article_data.get( + "published_at", + existing.published_at, + ) existing.fetched_at = article_data.get("fetched_at", existing.fetched_at) if sentiment_result: @@ -300,6 +425,7 @@ def save_articles_batch( existing.sentiment_label = sentiment_result.get("sentiment_label") existing.analyzed_at = datetime.utcnow() else: + links = self._link_article_onchain_entities(session, article_data) # Create new article article = Article( article_id=article_data.get("id"), @@ -313,6 +439,7 @@ def save_articles_batch( categories=article_data.get("categories"), keywords=article_data.get("keywords"), detected_entities=article_data.get("detected_entities"), + onchain_entity_links=[link.to_dict() for link in links], language=article_data.get("language"), published_at=article_data.get("published_at"), fetched_at=article_data.get("fetched_at"), @@ -327,6 +454,8 @@ def save_articles_batch( article.analyzed_at = datetime.utcnow() session.add(article) + session.flush() + self._sync_article_onchain_links(session, article, links) saved_count += 1 @@ -385,6 +514,30 @@ def get_recent_articles( logger.error(f"Failed to retrieve articles: {e}") return [] + def get_article_onchain_links( + self, + article_id: Optional[str] = None, + stable_entity_id: Optional[str] = None, + entity_type: Optional[str] = None, + limit: int = 200, + ) -> List[ArticleOnchainEntityLink]: + """Return normalized article/entity links for backend consumers.""" + try: + with self.get_session() as session: + stmt = select(ArticleOnchainEntityLink).limit(limit) + if article_id: + stmt = stmt.where(ArticleOnchainEntityLink.article_id == article_id) + if stable_entity_id: + stmt = stmt.where( + ArticleOnchainEntityLink.stable_entity_id == stable_entity_id + ) + if entity_type: + stmt = stmt.where(ArticleOnchainEntityLink.entity_type == entity_type) + return session.execute(stmt).scalars().all() + except SQLAlchemyError as e: + logger.error(f"Failed to retrieve article on-chain links: {e}") + return [] + # Social Post Methods def save_social_post( diff --git a/apps/data-processing/tests/test_onchain_entity_linker.py b/apps/data-processing/tests/test_onchain_entity_linker.py new file mode 100644 index 00000000..e301bef6 --- /dev/null +++ b/apps/data-processing/tests/test_onchain_entity_linker.py @@ -0,0 +1,65 @@ +"""Tests for deterministic on-chain entity linking.""" + +from src.analytics.onchain_entity_linker import ( + OnchainEntityCandidate, + OnchainEntityLinker, +) + + +def test_links_assets_with_stable_ids() -> None: + linker = OnchainEntityLinker() + + links = linker.link_text("Stellar developers shipped new XLM rails.") + stable_ids = {link.stable_id for link in links} + + assert "asset:XLM" in stable_ids + assert all(link.source == "onchain_entity_linker_v1" for link in links) + + +def test_links_project_catalog_candidate() -> None: + linker = OnchainEntityLinker( + [ + OnchainEntityCandidate( + stable_id="project:42", + entity_type="project", + display_name="Solar Grants", + aliases=("Solar Grants", "solar-grants"), + project_id=42, + contract_id="CBQTESTPROJECT", + ) + ] + ) + + links = linker.link_text("Solar Grants crossed its testnet funding target.") + + assert links[0].stable_id == "project:42" + assert links[0].project_id == 42 + assert links[0].contract_id == "CBQTESTPROJECT" + + +def test_measures_precision_against_labeled_articles() -> None: + linker = OnchainEntityLinker( + [ + OnchainEntityCandidate( + stable_id="project:7", + entity_type="project", + display_name="Lumen Launch", + aliases=("Lumen Launch",), + project_id=7, + ) + ] + ) + + result = linker.evaluate_precision( + [ + { + "id": "article-1", + "title": "Lumen Launch adds Stellar rewards", + "expected_stable_ids": ["project:7", "asset:XLM"], + } + ] + ) + + assert result["precision"] == 1.0 + assert result["true_positive"] == 2 + assert result["predicted"] == 2 diff --git a/apps/data-processing/tests/test_onchain_entity_persistence.py b/apps/data-processing/tests/test_onchain_entity_persistence.py new file mode 100644 index 00000000..96ef1c89 --- /dev/null +++ b/apps/data-processing/tests/test_onchain_entity_persistence.py @@ -0,0 +1,60 @@ +"""Tests for persisted article on-chain entity links.""" + +from datetime import datetime + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from src.analytics.ner_service import NERService +from src.db.models import Base +from src.db.postgres_service import PostgresService + + +def build_sqlite_service() -> PostgresService: + engine = create_engine("sqlite:///:memory:") + Base.metadata.create_all(bind=engine) + + service = PostgresService.__new__(PostgresService) + service.database_url = "sqlite:///:memory:" + service.engine = engine + service.SessionLocal = sessionmaker( + autocommit=False, + autoflush=False, + expire_on_commit=False, + bind=engine, + ) + service.ner_service = NERService() + return service + + +def test_save_article_materializes_onchain_links() -> None: + service = build_sqlite_service() + service.save_project_view( + project_id=101, + contract_id="CBQPROJECT101", + status="active", + extra_data={ + "name": "Lumen Launch", + "aliases": ["LumenLaunch"], + "asset_code": "XLM", + }, + ) + + article = service.save_article( + { + "id": "article-link-1", + "title": "Lumen Launch expands on Stellar", + "content": "The project accepts XLM contributions on testnet.", + "source": "test-source", + "published_at": datetime.utcnow(), + } + ) + + assert article is not None + link_ids = {link["stable_id"] for link in article.onchain_entity_links} + assert "project:101" in link_ids + assert "asset:XLM" in link_ids + + normalized = service.get_article_onchain_links(article_id="article-link-1") + normalized_ids = {link.stable_entity_id for link in normalized} + assert {"project:101", "asset:XLM"}.issubset(normalized_ids)