Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions backend/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import logging
from collections import deque
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, List

Expand All @@ -20,10 +21,15 @@
from services.commodity_service import get_commodity_service
from services.conflict_service import get_conflict_service
from services.rag_service import get_rag_service
from services.tracking_service import fetch_flights, get_ships, get_flights
from services.tracking_service import fetch_flights, get_ships, get_flights, stream_ships

logger = logging.getLogger(__name__)

_FRONTEND_DIR = Path(__file__).resolve().parent.parent / "frontend" / "web"
_ASSETS_DIR = Path(__file__).resolve().parent.parent / "frontend" / "assets"

latest_news: deque[Dict[str, Any]] = deque(maxlen=100)

@asynccontextmanager
async def lifespan(app):
# Start AIS ship stream in background
Expand All @@ -39,14 +45,6 @@ async def lifespan(app):
allow_headers=["*"],
)

_FRONTEND_DIR = Path(__file__).resolve().parent.parent / "frontend" / "web"
_ASSETS_DIR = Path(__file__).resolve().parent.parent / "frontend" / "assets"

latest_news: deque[Dict[str, Any]] = deque(maxlen=100)

from contextlib import asynccontextmanager
from services.tracking_service import stream_ships

# ── Health ────────────────────────────────────────────────────────────

@app.get("/health", tags=["meta"])
Expand Down Expand Up @@ -235,15 +233,25 @@ async def receive_stream(data: Dict[str, Any]):


@app.get("/api/tracking/flights", tags=["tracking"])
async def get_flight_data(military_only: bool = False):
async def get_flight_data(military_only: bool = False, limit: int = 50):
"""Get flight tracking data (limited and cached)"""
flights = await fetch_flights()
if military_only:
flights = [f for f in flights if f.get("military")]

# Limit results to prevent frontend lag
flights = flights[:limit]

return {"success": True, "count": len(flights), "flights": flights}

@app.get("/api/tracking/ships", tags=["tracking"])
async def get_ship_data(tankers_only: bool = False):
async def get_ship_data(tankers_only: bool = False, limit: int = 100):
"""Get ship tracking data (limited and cached)"""
ships = get_ships(tankers_only)

# Limit results to prevent frontend lag
ships = ships[:limit]

return {"success": True, "count": len(ships), "ships": ships}

# ── Static mount — MUST BE LAST ───────────────────────────────────────
Expand Down
2 changes: 1 addition & 1 deletion backend/config/auth_telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from dotenv import load_dotenv

# Load Telegram API credentials from environment
load_dotenv() # Load variables from .env88
load_dotenv() # Load variables from .env
api_id = os.getenv("TELEGRAM_API_ID")
api_hash = os.getenv("TELEGRAM_API_HASH")
phone = os.getenv("TELEGRAM_PHONE")
Expand Down
1 change: 0 additions & 1 deletion backend/init_infra.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ def check_postgresql():
try:
with engine.connect() as conn:
result = conn.execute(text("SELECT version();"))
conn.execute(text("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;"))
version = result.fetchone()[0]
print(f"✅ PostgreSQL: {version[:50]}...")
return True
Expand Down
Binary file added backend/models/__pycache__/__init__.cpython-310.pyc
Binary file not shown.
Binary file not shown.
Binary file modified backend/models/__pycache__/database.cpython-311.pyc
Binary file not shown.
83 changes: 64 additions & 19 deletions backend/models/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,28 +120,73 @@ def init_db():


def init_timescaledb():
"""Convert events table to TimescaleDB hypertable"""
"""Convert events table to TimescaleDB hypertable with better error handling"""
from sqlalchemy import text
try:
with engine.connect() as conn:
# Check if TimescaleDB extension exists
conn.execute("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;")

# Convert events table to hypertable
conn.execute("""
SELECT create_hypertable('events', 'timestamp',
if_not_exists => TRUE,
chunk_time_interval => INTERVAL '1 day'
print("📊 Installing TimescaleDB extension...")
conn.execute(text("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;"))
conn.commit()

# Check if events table is already a hypertable
result = conn.execute(text("""
SELECT EXISTS (
SELECT 1 FROM timescaledb_information.hypertables
WHERE hypertable_name = 'events'
);
""")

# Convert commodities table to hypertable
conn.execute("""
SELECT create_hypertable('commodities', 'timestamp',
if_not_exists => TRUE,
chunk_time_interval => INTERVAL '1 hour'
""")).scalar()

if not result:
print("📊 Converting events table to hypertable...")
# Convert events table to hypertable (migrate existing data)
conn.execute(text("""
SELECT create_hypertable('events', 'timestamp',
if_not_exists => TRUE,
migrate_data => TRUE,
chunk_time_interval => INTERVAL '1 day'
);
"""))
conn.commit()
print("✅ Events hypertable created successfully")
else:
print("ℹ️ Events table already a hypertable")

# Check if commodities table is already a hypertable
result = conn.execute(text("""
SELECT EXISTS (
SELECT 1 FROM timescaledb_information.hypertables
WHERE hypertable_name = 'commodities'
);
""")

print("✅ TimescaleDB hypertables configured")
""")).scalar()

if not result:
print("📊 Converting commodities table to hypertable...")
# Convert commodities table to hypertable (migrate existing data)
conn.execute(text("""
SELECT create_hypertable('commodities', 'timestamp',
if_not_exists => TRUE,
migrate_data => TRUE,
chunk_time_interval => INTERVAL '1 hour'
);
"""))
conn.commit()
print("✅ Commodities hypertable created successfully")
else:
print("ℹ️ Commodities table already a hypertable")

print("✅ TimescaleDB hypertables configured successfully")

except Exception as e:
print(f"⚠️ TimescaleDB setup skipped (requires extension): {e}")
error_msg = str(e)
if "table is not empty" in error_msg:
print(f"⚠️ TimescaleDB: Tables contain data. Use 'migrate_data => TRUE' to migrate.")
elif "does not exist" in error_msg:
print(f"⚠️ TimescaleDB extension not available. Install with: apt install timescaledb-postgresql")
elif "already exists" in error_msg:
print(f"ℹ️ TimescaleDB: Tables already converted to hypertables")
else:
print(f"⚠️ TimescaleDB setup error: {error_msg}")

# Don't fail the whole initialization - regular PostgreSQL still works
print("ℹ️ Continuing with regular PostgreSQL (performance may be reduced)")
Binary file modified backend/services/__pycache__/conflict_service.cpython-311.pyc
Binary file not shown.
Binary file modified backend/services/__pycache__/rag_service.cpython-311.pyc
Binary file not shown.
Binary file not shown.
8 changes: 4 additions & 4 deletions backend/services/rag_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dotenv import load_dotenv

load_dotenv()
_MODEL = "openrouter/free"
OPENROUTER_API_BASE = "https://openrouter.ai/api/v1"
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "all-MiniLM-L6-v2")
Expand Down Expand Up @@ -51,10 +51,10 @@ def _initialize(self):
embedding=self.embeddings, # note: "embedding" not "embeddings"
)

# Initialize OpenRouter LLM
# Initialize OpenRouter LLM - using less busy models
self.llm = ChatOpenAI(
model="meta-llama/llama-3.3-70b-instruct:free",
openai_api_base=_MODEL,
model="deepseek/deepseek-chat", # DeepSeek R1 - fast and reliable
openai_api_base=OPENROUTER_API_BASE,
openai_api_key=OPENROUTER_API_KEY,
temperature=0.7,
max_tokens=1000,
Expand Down
49 changes: 42 additions & 7 deletions backend/services/tracking_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Real-time tracking service
"""Real-time tracking service with caching
- Ships: aisstream.io WebSocket (free)
- Flights: OpenSky Network REST API (free, no key needed)
"""
Expand All @@ -8,8 +8,10 @@
import os
import httpx
import websockets
import time
from datetime import datetime
from typing import Dict, List
from functools import lru_cache
from dotenv import load_dotenv

load_dotenv()
Expand All @@ -18,6 +20,11 @@
OPENSKY_USER = os.getenv("OPENSKY_USERNAME", "")
OPENSKY_PASS = os.getenv("OPENSKY_PASSWORD", "")

# Caching configuration
CACHE_TTL = 120 # 2 minutes cache for tracking data
flight_cache = {"data": [], "timestamp": 0}
ship_cache = {"data": [], "timestamp": 0}

# Military callsign prefixes
MILITARY_CALLSIGN_PATTERNS = [
"RCH", # US Air Force (Reach)
Expand Down Expand Up @@ -102,7 +109,14 @@ async def stream_ships():


async def fetch_flights(region: str = "global") -> List[dict]:
"""Fetch flights from OpenSky Network — free, no key needed."""
"""Fetch flights from OpenSky Network with caching — free, no key needed."""

# Check cache first
current_time = time.time()
if (current_time - flight_cache["timestamp"]) < CACHE_TTL and flight_cache["data"]:
print(f"🛩️ Using cached flight data ({len(flight_cache['data'])} flights)")
return flight_cache["data"]

# Bounding boxes for key regions
REGIONS = {
"middle_east": (12.0, 25.0, 42.0, 65.0),
Expand All @@ -115,7 +129,7 @@ async def fetch_flights(region: str = "global") -> List[dict]:

try:
auth = (OPENSKY_USER, OPENSKY_PASS) if OPENSKY_USER else None
async with httpx.AsyncClient(timeout=15.0) as client:
async with httpx.AsyncClient(timeout=10.0) as client: # Reduced timeout
resp = await client.get(
"https://opensky-network.org/api/states/all",
params={"lamin": lamin, "lomin": lomin,
Expand All @@ -126,7 +140,7 @@ async def fetch_flights(region: str = "global") -> List[dict]:
data = resp.json()

flights = []
for state in (data.get("states") or []):
for state in (data.get("states") or [])[:200]: # Limit to 200 flights
if len(state) < 17:
continue
icao24 = state[0] or ""
Expand Down Expand Up @@ -162,20 +176,41 @@ async def fetch_flights(region: str = "global") -> List[dict]:
for f in flights:
_flights[f["icao24"]] = f

# Update cache
flight_cache["data"] = flights
flight_cache["timestamp"] = current_time

military_count = sum(1 for f in flights if f["military"])
print(f"✅ OpenSky: {len(flights)} flights, {military_count} military")
print(f"✅ OpenSky: {len(flights)} flights, {military_count} military (cached)")
return flights

except Exception as e:
print(f"⚠️ OpenSky error: {e}")
# Return cached data if available, otherwise empty list
if flight_cache["data"]:
print(f"🛩️ Using cached flight data due to error ({len(flight_cache['data'])} flights)")
return flight_cache["data"]
return list(_flights.values())


def get_ships(tankers_only: bool = False) -> List[dict]:
ships = list(_ships.values())
"""Get cached ship data with optional tanker filtering"""

# Check cache first
current_time = time.time()
if (current_time - ship_cache["timestamp"]) < CACHE_TTL and ship_cache["data"]:
ships = ship_cache["data"]
print(f"🚢 Using cached ship data ({len(ships)} ships)")
else:
ships = list(_ships.values())
# Update cache
ship_cache["data"] = ships
ship_cache["timestamp"] = current_time

if tankers_only:
ships = [s for s in ships if s.get("is_tanker")]
return ships[:500] # cap at 500 for frontend performance

return ships[:100] # Reduced from 500 to 100 for performance


def get_flights(military_only: bool = False) -> List[dict]:
Expand Down
Binary file modified backend/workers/tasks/__pycache__/news_worker.cpython-311.pyc
Binary file not shown.
Binary file modified backend/workers/tasks/__pycache__/processor.cpython-311.pyc
Binary file not shown.
Binary file modified backend/workers/tasks/__pycache__/reddit_worker.cpython-311.pyc
Binary file not shown.
Binary file modified backend/workers/tasks/__pycache__/rss_worker.cpython-311.pyc
Binary file not shown.
Binary file modified backend/workers/tasks/__pycache__/telegram_worker.cpython-311.pyc
Binary file not shown.
27 changes: 22 additions & 5 deletions backend/workers/tasks/news_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from config.celery_config import celery_app
from models.redis_client import is_duplicate, RedisPubSub
from models.database import SessionLocal, Event
from services.geo_extractor import extract_location

load_dotenv()

Expand Down Expand Up @@ -86,28 +87,44 @@ def fetch_news():
# Parse timestamp
published = article.get("publishedAt")
timestamp = datetime.fromisoformat(published.replace("Z", "+00:00")) if published else datetime.utcnow()


# Extract geo-location
geo = extract_location(full_text)
lat, lon, place = None, None, None
if geo:
lat = geo.get("lat")
lon = geo.get("lon")
place = geo.get("place")

# Create event
event = Event(
source="NewsAPI",
text=full_text,
url=article.get("url", ""),
timestamp=timestamp,
bias="Varied",
content_hash=content_hash
content_hash=content_hash,
lat=lat,
lon=lon,
place=place
)

db.add(event)
db.flush() # Get event ID
new_articles += 1

# Publish to stream
pubsub.publish({
"type": "event",
"id": event.id,
"source": "NewsAPI",
"text": full_text[:200] + "..." if len(full_text) > 200 else full_text,
"url": event.url,
"timestamp": timestamp.isoformat(),
"bias": "Varied"
"bias": "Varied",
"lat": lat,
"lon": lon,
"place": place
})

# Queue for processing
Expand Down
Loading