Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions routers/market_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,37 @@ async def get_available_candle_connectors():
return list(CandlesFactory._candles_map.keys())


@router.get("/volumes/{connector_name}")
async def get_24h_volumes(
connector_name: str,
market_data_manager: MarketDataService = Depends(get_market_data_service),
):
"""
Get 24h quote-denominated volume per trading pair for a connector, keyed by Hummingbot
trading pair (BASE-QUOTE). Intended for ranking/curating a trade-pair selector.

A value of 0.0 marks a listed-but-untraded market (e.g. Hyperliquid's permissionless
tokenized-equity spot pairs like AAPL-USDC), so clients can rank by volume and hide it.

Supported connectors: hyperliquid, hyperliquid_perpetual, binance, binance_perpetual,
okx, okx_perpetual.

Returns:
{"connector": str, "volumes": {trading_pair: quote_volume_24h}}

Raises:
HTTPException: 400 if the connector has no public 24h-volume source, 502 on fetch error.
"""
try:
volumes = await market_data_manager.get_24h_volumes(connector_name)
return {"connector": connector_name, "volumes": volumes}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error fetching 24h volumes for {connector_name}: {e}", exc_info=True)
raise HTTPException(status_code=502, detail=f"Error fetching 24h volumes: {str(e)}")


# Enhanced Market Data Endpoints

@router.post("/prices", response_model=PricesResponse)
Expand Down
145 changes: 145 additions & 0 deletions services/market_data_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import logging
import time
from decimal import Decimal

import aiohttp
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

Expand Down Expand Up @@ -70,6 +72,10 @@ def __init__(
self._cleanup_task: Optional[asyncio.Task] = None
self._is_running = False

# 24h volume cache: connector_name -> (fetched_at, {trading_pair: quote_volume})
self._volume_cache: Dict[str, Tuple[float, Dict[str, float]]] = {}
self._volume_cache_ttl = 60.0

logger.info("MarketDataService initialized")

# ==================== Lifecycle ====================
Expand Down Expand Up @@ -590,6 +596,145 @@ async def get_trading_rules(
logger.error(f"Error getting trading rules for {connector_name}: {e}")
return {"error": str(e)}

# ==================== 24h Volumes ====================

async def get_24h_volumes(
self,
connector_name: str,
account_name: Optional[str] = None,
) -> Dict[str, float]:
"""
24h quote-denominated volume per trading pair, keyed by Hummingbot trading pair
(BASE-QUOTE) so it joins cleanly with trading rules.

Used to rank/curate the trade-pair selector: high-volume pairs first, and a value of
0.0 marks a listed-but-untraded market (e.g. Hyperliquid's permissionless tokenized
equities like AAPL-USDC) so the UI can hide it.

Supported: hyperliquid, hyperliquid_perpetual, binance, binance_perpetual, okx,
okx_perpetual. Raises for connectors without a public 24h-volume source.
"""
cached = self._volume_cache.get(connector_name)
if cached and (time.time() - cached[0]) < self._volume_cache_ttl:
return cached[1]

connector = self._connector_service.get_best_connector_for_market(connector_name, account_name)
if not connector:
raise ValueError(f"No connector available for {connector_name}")

raw = await self._fetch_raw_24h_volumes(connector_name)

# Map each exchange-native symbol to its Hummingbot trading pair via the connector's
# symbol map (lazily initialized). Symbols not in the map (delisted / filtered) are skipped.
volumes: Dict[str, float] = {}
for exchange_symbol, volume in raw.items():
try:
trading_pair = await connector.trading_pair_associated_to_exchange_symbol(exchange_symbol)
except Exception:
continue
volumes[trading_pair] = volume

self._volume_cache[connector_name] = (time.time(), volumes)
return volumes

async def _fetch_raw_24h_volumes(self, connector_name: str) -> Dict[str, float]:
"""Fetch {exchange_symbol: quote_volume} from each exchange's public ticker endpoint."""
timeout = aiohttp.ClientTimeout(total=15)
async with aiohttp.ClientSession(timeout=timeout) as session:
if connector_name == "hyperliquid":
return await self._fetch_hyperliquid_spot_volumes(session)
if connector_name == "hyperliquid_perpetual":
return await self._fetch_hyperliquid_perp_volumes(session)
if connector_name == "binance":
return await self._fetch_binance_volumes(session, "https://api.binance.com/api/v3/ticker/24hr")
if connector_name == "binance_perpetual":
return await self._fetch_binance_volumes(session, "https://fapi.binance.com/fapi/v1/ticker/24hr")
if connector_name == "okx":
return await self._fetch_okx_volumes(session, "SPOT")
if connector_name == "okx_perpetual":
return await self._fetch_okx_volumes(session, "SWAP")
raise ValueError(f"24h volume ranking is not supported for connector '{connector_name}'")

@staticmethod
async def _fetch_hyperliquid_spot_volumes(session: aiohttp.ClientSession) -> Dict[str, float]:
# spotMetaAndAssetCtxs returns [meta, ctxs]. The ctx list is NOT index-aligned with
# meta.universe (it carries more entries), but each ctx names its own market in `coin`
# ("@107", "PURR/USDC"), so key by that. dayNtlVlm is 24h USDC notional (quote) volume.
async with session.post("https://api.hyperliquid.xyz/info", json={"type": "spotMetaAndAssetCtxs"}) as resp:
resp.raise_for_status()
_, ctxs = await resp.json()
out: Dict[str, float] = {}
for ctx in ctxs:
coin = ctx.get("coin")
day_vlm = ctx.get("dayNtlVlm")
if coin is not None and day_vlm is not None:
out[coin] = float(day_vlm)
return out

@staticmethod
async def _fetch_hyperliquid_perp_volumes(session: aiohttp.ClientSession) -> Dict[str, float]:
# metaAndAssetCtxs returns [meta, ctxs] index-aligned with meta.universe (perp ctxs have no
# `coin` field). dayNtlVlm is 24h USDC notional (quote) volume. The base call covers only the
# main perp dex; HIP-3 builder-deployed dexes (xyz, flx, …) each need a dex-scoped call, with
# market names like "xyz:CL" that the connector maps to trading pairs like XYZ:CL-USD.
url = "https://api.hyperliquid.xyz/info"

async def fetch_dex(dex: Optional[str]) -> Dict[str, float]:
payload: Dict[str, Any] = {"type": "metaAndAssetCtxs"}
if dex:
payload["dex"] = dex
async with session.post(url, json=payload) as resp:
resp.raise_for_status()
meta, ctxs = await resp.json()
return {
market["name"]: float(ctx["dayNtlVlm"])
for market, ctx in zip(meta["universe"], ctxs)
if ctx.get("dayNtlVlm") is not None
}

# perpDexs lists the builder-deployed dexes (its first entry, the main dex, is null).
async with session.post(url, json={"type": "perpDexs"}) as resp:
resp.raise_for_status()
perp_dexs = await resp.json()
dex_names = [d["name"] for d in perp_dexs if d and d.get("name")]

results = await asyncio.gather(fetch_dex(None), *[fetch_dex(name) for name in dex_names])
out: Dict[str, float] = {}
for partial in results:
out.update(partial)
return out

@staticmethod
async def _fetch_binance_volumes(session: aiohttp.ClientSession, url: str) -> Dict[str, float]:
# Binance `quoteVolume` is already the 24h quote-asset volume.
async with session.get(url) as resp:
resp.raise_for_status()
data = await resp.json()
return {
row["symbol"]: float(row["quoteVolume"])
for row in data
if row.get("quoteVolume") is not None
}

@staticmethod
async def _fetch_okx_volumes(session: aiohttp.ClientSession, inst_type: str) -> Dict[str, float]:
# OKX SPOT: volCcy24h is the quote-currency volume. SWAP: volCcy24h is in the base
# currency, so multiply by last price for a quote-notional estimate.
url = f"https://www.okx.com/api/v5/market/tickers?instType={inst_type}"
async with session.get(url) as resp:
resp.raise_for_status()
payload = await resp.json()
out: Dict[str, float] = {}
for row in payload.get("data", []):
inst_id = row.get("instId")
if not inst_id:
continue
vol_ccy = float(row.get("volCcy24h") or 0.0)
if inst_type == "SWAP":
vol_ccy *= float(row.get("last") or 0.0)
out[inst_id] = vol_ccy
return out

# ==================== Funding Info ====================

async def get_funding_info(
Expand Down