Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions apps/data-processing/alembic/versions/003_add_onchain_entity_links.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""Add on-chain entity links for news articles

Revision ID: 003
Revises: 002
Create Date: 2026-06-01 23:10:00.000000

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = "003"
down_revision: Union[str, None] = "002"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Apply revision 003.

    Adds a nullable JSON column ``onchain_entity_links`` to ``articles`` and
    creates the ``article_onchain_entity_links`` table together with its
    single-column indexes and a unique index over
    ``(article_id, stable_entity_id)``.
    """
    links_table = "article_onchain_entity_links"

    def timestamp_column(column_name):
        # Timezone-aware, non-null timestamp whose default is evaluated by
        # the database server.
        return sa.Column(
            column_name,
            sa.DateTime(timezone=True),
            server_default=sa.func.now(),
            nullable=False,
        )

    # Nullable JSON column on the existing articles table.
    json_links_column = sa.Column("onchain_entity_links", sa.JSON(), nullable=True)
    op.add_column("articles", json_links_column)

    op.create_table(
        links_table,
        sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
        sa.Column("article_id", sa.String(length=255), nullable=False),
        sa.Column("stable_entity_id", sa.String(length=255), nullable=False),
        sa.Column("entity_type", sa.String(length=50), nullable=False),
        sa.Column("display_name", sa.String(length=255), nullable=False),
        sa.Column("matched_text", sa.String(length=255), nullable=False),
        sa.Column("confidence", sa.Float(), nullable=False),
        sa.Column("source", sa.String(length=100), nullable=False),
        sa.Column("asset_code", sa.String(length=20), nullable=True),
        sa.Column("project_id", sa.BigInteger(), nullable=True),
        sa.Column("contract_id", sa.String(length=255), nullable=True),
        timestamp_column("created_at"),
        timestamp_column("updated_at"),
        sa.PrimaryKeyConstraint("id"),
    )

    # (index name, indexed column) pairs, created in this order. Names are
    # kept as full literals so they stay greppable and match downgrade().
    single_column_indexes = (
        ("ix_article_onchain_entity_links_article_id", "article_id"),
        ("ix_article_onchain_entity_links_asset_code", "asset_code"),
        ("ix_article_onchain_entity_links_contract_id", "contract_id"),
        ("ix_article_onchain_entity_links_entity_type", "entity_type"),
        ("ix_article_onchain_entity_links_project_id", "project_id"),
        ("ix_article_onchain_entity_links_stable_entity_id", "stable_entity_id"),
    )
    for index_name, column_name in single_column_indexes:
        op.create_index(index_name, links_table, [column_name])

    # Unique index over the (article_id, stable_entity_id) pair.
    op.create_index(
        "ux_article_onchain_links_article_entity",
        links_table,
        ["article_id", "stable_entity_id"],
        unique=True,
    )


def downgrade() -> None:
    """Revert revision 003.

    Drops every index on ``article_onchain_entity_links`` (the unique index
    first, then the single-column ones), drops the table itself, and finally
    removes the ``onchain_entity_links`` column from ``articles``.
    """
    links_table = "article_onchain_entity_links"

    # Dropped in exactly this order: the reverse of creation in upgrade().
    indexes_to_drop = (
        "ux_article_onchain_links_article_entity",
        "ix_article_onchain_entity_links_stable_entity_id",
        "ix_article_onchain_entity_links_project_id",
        "ix_article_onchain_entity_links_entity_type",
        "ix_article_onchain_entity_links_contract_id",
        "ix_article_onchain_entity_links_asset_code",
        "ix_article_onchain_entity_links_article_id",
    )
    for index_name in indexes_to_drop:
        op.drop_index(index_name, table_name=links_table)

    op.drop_table(links_table)
    op.drop_column("articles", "onchain_entity_links")
47 changes: 39 additions & 8 deletions apps/data-processing/src/analytics/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
"""
Analytics module for market analysis and trend detection.
"""

from .market_analyzer import MarketAnalyzer, Trend, MarketData, get_explanation
from .forecaster import SentimentForecaster, ForecastResult
from .correlation_engine import CorrelationEngine, CorrelationResult, DataPoint
from .ner_service import NERService
"""Analytics module for market analysis and trend detection."""

__all__ = [
"MarketAnalyzer",
Expand All @@ -19,3 +12,41 @@
"DataPoint",
"NERService",
]


def __getattr__(name: str):
    """Import and return a public analytics export when it is first requested.

    Module-level attribute hook: the submodule that owns ``name`` is imported
    only when that name is looked up on this package, so importing the
    package alone does not pull in every analytics dependency.

    Args:
        name: Attribute being looked up on the package.

    Returns:
        The object exported under ``name`` by the owning submodule.

    Raises:
        AttributeError: If ``name`` is not one of the lazily provided exports.
    """
    if name in {"MarketAnalyzer", "Trend", "MarketData", "get_explanation"}:
        from .market_analyzer import MarketAnalyzer, MarketData, Trend, get_explanation

        exports = {
            "MarketAnalyzer": MarketAnalyzer,
            "Trend": Trend,
            "MarketData": MarketData,
            "get_explanation": get_explanation,
        }
    elif name in {"SentimentForecaster", "ForecastResult"}:
        from .forecaster import ForecastResult, SentimentForecaster

        exports = {
            "SentimentForecaster": SentimentForecaster,
            "ForecastResult": ForecastResult,
        }
    elif name in {"CorrelationEngine", "CorrelationResult", "DataPoint"}:
        from .correlation_engine import CorrelationEngine, CorrelationResult, DataPoint

        exports = {
            "CorrelationEngine": CorrelationEngine,
            "CorrelationResult": CorrelationResult,
            "DataPoint": DataPoint,
        }
    elif name == "NERService":
        from .ner_service import NERService

        exports = {"NERService": NERService}
    else:
        # Unknown name: mirror the interpreter's standard message format.
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

    return exports[name]
69 changes: 48 additions & 21 deletions apps/data-processing/src/analytics/ner_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import logging
import re
from functools import lru_cache
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional

import spacy
from spacy.language import Language
try:
import spacy
except ImportError: # pragma: no cover - exercised in minimal test envs
spacy = None

from .keywords import CRYPTO_PROJECT_MAP, KNOWN_TICKERS

Expand All @@ -28,6 +30,7 @@ class NERService:
r"\b([A-Z][a-zA-Z]+(?:\s+[A-Z][a-zA-Z]+)+)\b"
)
_TICKER_PATTERN = re.compile(r"(?:\$)?\b([A-Z]{2,6})\b")
_PERSON_PREFIX_EXCLUSIONS = {"The", "This", "That", "New"}

def __init__(self) -> None:
self._canonical_names = self._build_canonical_name_map()
Expand All @@ -48,8 +51,14 @@ def _build_canonical_name_map(self) -> Dict[str, str]:

return canonical_names

def _initialize_pipeline(self) -> Language:
nlp: Optional[Language] = None
def _initialize_pipeline(self) -> Optional[Any]:
if spacy is None:
logger.warning(
"spaCy is not installed; using regex-only entity extraction fallback"
)
return None

nlp: Optional[Any] = None

for model_name in self._MODEL_CANDIDATES:
try:
Expand All @@ -62,7 +71,8 @@ def _initialize_pipeline(self) -> Language:
if nlp is None:
nlp = spacy.blank("en")
logger.warning(
"spaCy pretrained model not found; using blank English pipeline with custom entity rules"
"spaCy pretrained model not found; using blank English "
"pipeline with custom entity rules"
)

if "entity_ruler" in nlp.pipe_names:
Expand Down Expand Up @@ -119,23 +129,36 @@ def extract_entities(self, text: str) -> List[str]:
text = text[:20000]

candidates: List[str] = []
doc = self._nlp(text)

for ent in doc.ents:
if ent.label_ in {
"PERSON",
"ORG",
"PRODUCT",
"NORP",
"GPE",
"EVENT",
"PROJECT",
"ASSET",
}:
candidates.append(ent.text)
doc = self._nlp(text) if self._nlp is not None else None

if doc is not None:
for ent in doc.ents:
if ent.label_ in {
"PERSON",
"ORG",
"PRODUCT",
"NORP",
"GPE",
"EVENT",
"PROJECT",
"ASSET",
}:
candidates.append(ent.text)

for alias in sorted(self._canonical_names, key=len, reverse=True):
if len(alias) < 3:
continue
pattern = r"(?<![\w$])" + re.escape(alias) + r"(?![\w-])"
if re.search(pattern, text, flags=re.IGNORECASE):
candidates.append(self._canonical_names[alias])

# Heuristic for names when running without a pretrained NER model.
for match in self._PERSON_PATTERN.findall(text):
first_word = match.split()[0]
if first_word in self._PERSON_PREFIX_EXCLUSIONS:
continue
if any(part.isupper() for part in match.split()):
continue
candidates.append(match)

# Explicit ticker extraction catches tokens that may not be tagged as entities.
Expand Down Expand Up @@ -165,7 +188,11 @@ def extract_entities_from_article(
content: Optional[str] = None,
) -> List[str]:
"""Extract entities from combined article fields."""
chunks = [value.strip() for value in [title or "", summary or "", content or ""] if value and value.strip()]
chunks = [
value.strip()
for value in [title or "", summary or "", content or ""]
if value and value.strip()
]
if not chunks:
return []
return self.extract_entities("\n".join(chunks))
Loading
Loading