diff --git a/routers/market_data.py b/routers/market_data.py index 1d358a28..ecacac69 100644 --- a/routers/market_data.py +++ b/routers/market_data.py @@ -234,6 +234,37 @@ async def get_available_candle_connectors(): return list(CandlesFactory._candles_map.keys()) +@router.get("/volumes/{connector_name}") +async def get_24h_volumes( + connector_name: str, + market_data_manager: MarketDataService = Depends(get_market_data_service), +): + """ + Get 24h quote-denominated volume per trading pair for a connector, keyed by Hummingbot + trading pair (BASE-QUOTE). Intended for ranking/curating a trade-pair selector. + + A value of 0.0 marks a listed-but-untraded market (e.g. Hyperliquid's permissionless + tokenized-equity spot pairs like AAPL-USDC), so clients can rank by volume and hide it. + + Supported connectors: hyperliquid, hyperliquid_perpetual, binance, binance_perpetual, + okx, okx_perpetual. + + Returns: + {"connector": str, "volumes": {trading_pair: quote_volume_24h}} + + Raises: + HTTPException: 400 if the connector has no public 24h-volume source, 502 on fetch error. + """ + try: + volumes = await market_data_manager.get_24h_volumes(connector_name) + return {"connector": connector_name, "volumes": volumes} + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error fetching 24h volumes for {connector_name}: {e}", exc_info=True) + raise HTTPException(status_code=502, detail=f"Error fetching 24h volumes: {str(e)}") + + # Enhanced Market Data Endpoints @router.post("/prices", response_model=PricesResponse) diff --git a/services/market_data_service.py b/services/market_data_service.py index ab002857..51f58f4b 100644 --- a/services/market_data_service.py +++ b/services/market_data_service.py @@ -8,6 +8,8 @@ import logging import time from decimal import Decimal + +import aiohttp from enum import Enum from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple @@ -70,6 +72,10 @@ def __init__( self._cleanup_task: Optional[asyncio.Task] = None self._is_running = False + # 24h volume cache: connector_name -> (fetched_at, {trading_pair: quote_volume}) + self._volume_cache: Dict[str, Tuple[float, Dict[str, float]]] = {} + self._volume_cache_ttl = 60.0 + logger.info("MarketDataService initialized") # ==================== Lifecycle ==================== @@ -590,6 +596,145 @@ async def get_trading_rules( logger.error(f"Error getting trading rules for {connector_name}: {e}") return {"error": str(e)} + # ==================== 24h Volumes ==================== + + async def get_24h_volumes( + self, + connector_name: str, + account_name: Optional[str] = None, + ) -> Dict[str, float]: + """ + 24h quote-denominated volume per trading pair, keyed by Hummingbot trading pair + (BASE-QUOTE) so it joins cleanly with trading rules. + + Used to rank/curate the trade-pair selector: high-volume pairs first, and a value of + 0.0 marks a listed-but-untraded market (e.g. Hyperliquid's permissionless tokenized + equities like AAPL-USDC) so the UI can hide it. + + Supported: hyperliquid, hyperliquid_perpetual, binance, binance_perpetual, okx, + okx_perpetual. Raises for connectors without a public 24h-volume source. + """ + cached = self._volume_cache.get(connector_name) + if cached and (time.time() - cached[0]) < self._volume_cache_ttl: + return cached[1] + + connector = self._connector_service.get_best_connector_for_market(connector_name, account_name) + if not connector: + raise ValueError(f"No connector available for {connector_name}") + + raw = await self._fetch_raw_24h_volumes(connector_name) + + # Map each exchange-native symbol to its Hummingbot trading pair via the connector's + # symbol map (lazily initialized). Symbols not in the map (delisted / filtered) are skipped. + volumes: Dict[str, float] = {} + for exchange_symbol, volume in raw.items(): + try: + trading_pair = await connector.trading_pair_associated_to_exchange_symbol(exchange_symbol) + except Exception: + continue + volumes[trading_pair] = volume + + self._volume_cache[connector_name] = (time.time(), volumes) + return volumes + + async def _fetch_raw_24h_volumes(self, connector_name: str) -> Dict[str, float]: + """Fetch {exchange_symbol: quote_volume} from each exchange's public ticker endpoint.""" + timeout = aiohttp.ClientTimeout(total=15) + async with aiohttp.ClientSession(timeout=timeout) as session: + if connector_name == "hyperliquid": + return await self._fetch_hyperliquid_spot_volumes(session) + if connector_name == "hyperliquid_perpetual": + return await self._fetch_hyperliquid_perp_volumes(session) + if connector_name == "binance": + return await self._fetch_binance_volumes(session, "https://api.binance.com/api/v3/ticker/24hr") + if connector_name == "binance_perpetual": + return await self._fetch_binance_volumes(session, "https://fapi.binance.com/fapi/v1/ticker/24hr") + if connector_name == "okx": + return await self._fetch_okx_volumes(session, "SPOT") + if connector_name == "okx_perpetual": + return await self._fetch_okx_volumes(session, "SWAP") + raise ValueError(f"24h volume ranking is not supported for connector '{connector_name}'") + + @staticmethod + async def _fetch_hyperliquid_spot_volumes(session: aiohttp.ClientSession) -> Dict[str, float]: + # spotMetaAndAssetCtxs returns [meta, ctxs]. The ctx list is NOT index-aligned with + # meta.universe (it carries more entries), but each ctx names its own market in `coin` + # ("@107", "PURR/USDC"), so key by that. dayNtlVlm is 24h USDC notional (quote) volume. + async with session.post("https://api.hyperliquid.xyz/info", json={"type": "spotMetaAndAssetCtxs"}) as resp: + resp.raise_for_status() + _, ctxs = await resp.json() + out: Dict[str, float] = {} + for ctx in ctxs: + coin = ctx.get("coin") + day_vlm = ctx.get("dayNtlVlm") + if coin is not None and day_vlm is not None: + out[coin] = float(day_vlm) + return out + + @staticmethod + async def _fetch_hyperliquid_perp_volumes(session: aiohttp.ClientSession) -> Dict[str, float]: + # metaAndAssetCtxs returns [meta, ctxs] index-aligned with meta.universe (perp ctxs have no + # `coin` field). dayNtlVlm is 24h USDC notional (quote) volume. The base call covers only the + # main perp dex; HIP-3 builder-deployed dexes (xyz, flx, …) each need a dex-scoped call, with + # market names like "xyz:CL" that the connector maps to trading pairs like XYZ:CL-USD. + url = "https://api.hyperliquid.xyz/info" + + async def fetch_dex(dex: Optional[str]) -> Dict[str, float]: + payload: Dict[str, Any] = {"type": "metaAndAssetCtxs"} + if dex: + payload["dex"] = dex + async with session.post(url, json=payload) as resp: + resp.raise_for_status() + meta, ctxs = await resp.json() + return { + market["name"]: float(ctx["dayNtlVlm"]) + for market, ctx in zip(meta["universe"], ctxs) + if ctx.get("dayNtlVlm") is not None + } + + # perpDexs lists the builder-deployed dexes (its first entry, the main dex, is null). + async with session.post(url, json={"type": "perpDexs"}) as resp: + resp.raise_for_status() + perp_dexs = await resp.json() + dex_names = [d["name"] for d in perp_dexs if d and d.get("name")] + + results = await asyncio.gather(fetch_dex(None), *[fetch_dex(name) for name in dex_names]) + out: Dict[str, float] = {} + for partial in results: + out.update(partial) + return out + + @staticmethod + async def _fetch_binance_volumes(session: aiohttp.ClientSession, url: str) -> Dict[str, float]: + # Binance `quoteVolume` is already the 24h quote-asset volume. + async with session.get(url) as resp: + resp.raise_for_status() + data = await resp.json() + return { + row["symbol"]: float(row["quoteVolume"]) + for row in data + if row.get("quoteVolume") is not None + } + + @staticmethod + async def _fetch_okx_volumes(session: aiohttp.ClientSession, inst_type: str) -> Dict[str, float]: + # OKX SPOT: volCcy24h is the quote-currency volume. SWAP: volCcy24h is in the base + # currency, so multiply by last price for a quote-notional estimate. + url = f"https://www.okx.com/api/v5/market/tickers?instType={inst_type}" + async with session.get(url) as resp: + resp.raise_for_status() + payload = await resp.json() + out: Dict[str, float] = {} + for row in payload.get("data", []): + inst_id = row.get("instId") + if not inst_id: + continue + vol_ccy = float(row.get("volCcy24h") or 0.0) + if inst_type == "SWAP": + vol_ccy *= float(row.get("last") or 0.0) + out[inst_id] = vol_ccy + return out + # ==================== Funding Info ==================== async def get_funding_info(