diff --git a/condor/trading_agent/prompts.py b/condor/trading_agent/prompts.py index a0431b15..76fcd520 100644 --- a/condor/trading_agent/prompts.py +++ b/condor/trading_agent/prompts.py @@ -34,7 +34,7 @@ AVAILABLE MCP TOOLS: - mcp-hummingbot: configure_server, get_market_data, get_portfolio_overview, \ -manage_executors, manage_bots, search_history +manage_executors, manage_bots, search_history, explore_dex_pools, manage_gateway_config - condor: trading_agent_journal_write, trading_agent_journal_read, \ send_notification """ @@ -46,6 +46,8 @@ "mcp__mcp-hummingbot__get_portfolio_overview," "mcp__mcp-hummingbot__manage_executors," "mcp__mcp-hummingbot__search_history," + "mcp__mcp-hummingbot__explore_dex_pools," + "mcp__mcp-hummingbot__manage_gateway_config," "mcp__condor__trading_agent_journal_write," "mcp__condor__trading_agent_journal_read," 'mcp__condor__send_notification")\n' diff --git a/condor/trading_agent/skills/__init__.py b/condor/trading_agent/skills/__init__.py index c0eb5e6e..ab614d2a 100644 --- a/condor/trading_agent/skills/__init__.py +++ b/condor/trading_agent/skills/__init__.py @@ -23,6 +23,8 @@ def _auto_register() -> None: """Import built-in Python skill modules and discover markdown templates.""" # Core Python skills from . import executors # noqa: F401 + from . import trending_pools # noqa: F401 + from . import pool_candles # noqa: F401 # Discover markdown skill templates for md_path in _SKILLS_DIR.glob("*.md"): diff --git a/condor/trading_agent/skills/pool_candles.py b/condor/trading_agent/skills/pool_candles.py new file mode 100644 index 00000000..7adaf777 --- /dev/null +++ b/condor/trading_agent/skills/pool_candles.py @@ -0,0 +1,117 @@ +"""Core skill: fetch OHLCV candles for a Solana pool from GeckoTerminal.""" + +from __future__ import annotations + +import logging +from typing import Any + +from . import register_skill +from .base import BaseSkill, SkillResult + +log = logging.getLogger(__name__) + + +class PoolCandlesSkill(BaseSkill): + name = "pool_candles" + is_core = True + + async def execute(self, client: Any, config: dict, agent_id: str = "") -> SkillResult: + pool_address = config.get("pool_address", "") + if not pool_address: + return SkillResult( + name=self.name, + data={"error": "pool_address required in config"}, + summary="Pool Candles: no pool_address provided", + ) + + network = config.get("network", "solana") + timeframe = config.get("timeframe", "hour") + limit = config.get("candle_limit", 168) # 7 days of hourly candles + + try: + from geckoterminal_py import GeckoTerminalAsyncClient + + gecko = GeckoTerminalAsyncClient() + result = await gecko.get_ohlcv( + network_id=network, + pool_address=pool_address, + timeframe=timeframe, + limit=limit, + ) + + # Parse result (DataFrame or raw dict) + candles = [] + try: + import pandas as pd + if isinstance(result, pd.DataFrame) and not result.empty: + candles = result.to_dict("records") + except ImportError: + pass + + if not candles: + if isinstance(result, list): + candles = result + elif isinstance(result, dict): + ohlcv_list = result.get("data", {}).get("attributes", {}).get("ohlcv_list", []) + candles = [ + {"timestamp": c[0], "open": c[1], "high": c[2], "low": c[3], "close": c[4], "volume": c[5]} + for c in ohlcv_list + ] + + if not candles: + return SkillResult( + name=self.name, + data={"candles": [], "pool_address": pool_address}, + summary=f"Pool Candles ({pool_address[:8]}...): no data from GeckoTerminal", + ) + + # Extract high/low/current from candle data + highs = [] + lows = [] + for c in candles: + h = c.get("high") + l = c.get("low") + if h is not None: + highs.append(float(h)) + if l is not None: + lows.append(float(l)) + + current_price = float(candles[-1]["close"]) if candles[-1].get("close") else None + period_high = max(highs) if highs else None + period_low = min(lows) if lows else None + + summary_data = { + "candles": candles, + "pool_address": pool_address, + "current_price": current_price, + "period_high": period_high, + "period_low": period_low, + "candle_count": len(candles), + "timeframe": timeframe, + } + + lines = [f"Pool Candles ({pool_address[:8]}..., {timeframe}, {len(candles)} candles):"] + if current_price is not None: + lines.append(f" Current: ${current_price:,.6g}") + if period_high is not None and period_low is not None: + lines.append(f" High: ${period_high:,.6g} | Low: ${period_low:,.6g}") + if period_low > 0: + range_pct = ((period_high - period_low) / period_low) * 100 + lines.append(f" Range: {range_pct:,.1f}%") + + return SkillResult( + name=self.name, + data=summary_data, + summary="\n".join(lines), + ) + + except Exception as e: + log.exception("PoolCandlesSkill failed") + return SkillResult( + name=self.name, + data={"error": str(e), "pool_address": pool_address}, + summary=f"Pool Candles ({pool_address[:8]}...): failed ({e})", + ) + + +register_skill(PoolCandlesSkill()) diff --git a/condor/trading_agent/skills/trending_pools.py b/condor/trading_agent/skills/trending_pools.py new file mode 100644 index 00000000..0b4122ce --- /dev/null +++ b/condor/trading_agent/skills/trending_pools.py @@ -0,0 +1,226 @@ +"""Core skill: fetch trending Solana tokens from GeckoTerminal. + +Fetches top 20 trending pools on first tick, extracts unique base token +addresses (any DEX), then caches the result. The agent uses these tokens +with explore_dex_pools to find the best Meteora pools by fee/TVL ratio. +""" + +from __future__ import annotations + +import logging +from typing import Any + +from . import register_skill +from .base import BaseSkill, SkillResult + +log = logging.getLogger(__name__) + +# Wrapped SOL address on Solana +WRAPPED_SOL = "So11111111111111111111111111111111111111112" + + +def _safe_float(val: Any) -> float: + """Safely convert a value to float, returning 0.0 on failure.""" + try: + return float(val) if val else 0.0 + except (ValueError, TypeError): + return 0.0 + + +def _extract_pool_summary(pool: dict) -> dict: + """Extract key fields from a GeckoTerminal pool record. + + Handles both nested API response format and flattened DataFrame rows. + """ + attrs = pool.get("attributes", pool) + relationships = pool.get("relationships", {}) + + name = attrs.get("name", "") + base_symbol = attrs.get("base_token_symbol", "") + quote_symbol = attrs.get("quote_token_symbol", "") + + # Parse symbols from name if missing (e.g. "PENGU / SOL") + if (not base_symbol or not quote_symbol) and "/" in str(name): + parts = str(name).split("/") + if len(parts) == 2: + base_symbol = base_symbol or parts[0].strip() + quote_symbol = quote_symbol or parts[1].strip() + + # Token addresses from relationships or direct fields + base_token_id = attrs.get("base_token_id", "") + quote_token_id = attrs.get("quote_token_id", "") + if not base_token_id: + try: + base_token_id = relationships.get("base_token", {}).get("data", {}).get("id", "") + except (AttributeError, TypeError): + pass + if not quote_token_id: + try: + quote_token_id = relationships.get("quote_token", {}).get("data", {}).get("id", "") + except (AttributeError, TypeError): + pass + + def _parse_addr(token_id: str) -> str: + if not token_id: + return "" + # Format: "solana_
" -> extract address + parts = token_id.split("_", 1) + return parts[1] if len(parts) > 1 else token_id + + base_token_address = attrs.get("base_token_address", "") or _parse_addr(base_token_id) + quote_token_address = attrs.get("quote_token_address", "") or _parse_addr(quote_token_id) + + # DEX name from relationships or direct field + dex_id = attrs.get("dex_id", "") + if not dex_id: + try: + dex_id = relationships.get("dex", {}).get("data", {}).get("id", "") + except (AttributeError, TypeError): + pass + + # Pool address + pool_address = attrs.get("address", pool.get("id", "")) + if "solana_" in str(pool_address): + pool_address = str(pool_address).split("_", 1)[-1] + elif "solana_" in str(pool.get("id", "")): + pool_address = str(pool.get("id", "")).split("_", 1)[-1] + + return { + "name": name or f"{base_symbol}/{quote_symbol}", + "base_symbol": base_symbol, + "quote_symbol": quote_symbol, + "base_token_address": base_token_address, + "quote_token_address": quote_token_address, + "pool_address": pool_address, + "dex": dex_id, + "price_usd": attrs.get("base_token_price_usd", ""), + "volume_24h": attrs.get("volume_usd", {}).get("h24", "") if isinstance(attrs.get("volume_usd"), dict) else attrs.get("volume_usd_h24", ""), + "tvl": attrs.get("reserve_in_usd", attrs.get("tvl", "")), + "price_change_24h": attrs.get("price_change_percentage", {}).get("h24", "") if isinstance(attrs.get("price_change_percentage"), dict) else attrs.get("price_change_percentage_h24", ""), + } + + +class TrendingPoolsSkill(BaseSkill): + name = "trending_pools" + is_core = True + + # Cache: fetch trending tokens once at startup, reuse on subsequent ticks + _cached_result: SkillResult | None = None + + async def execute(self, client: Any, config: dict, agent_id: str = "") -> SkillResult: + if self._cached_result is not None: + return self._cached_result + + try: + from geckoterminal_py import GeckoTerminalAsyncClient + + gecko = GeckoTerminalAsyncClient() + result = await gecko.get_trending_pools_by_network("solana") + + # Extract pools from response (handles DataFrame and dict formats) + pools: list = [] + try: + import pandas as pd + if isinstance(result, pd.DataFrame): + pools = result.to_dict("records") + except ImportError: + pass + + if not pools: + if isinstance(result, list): + pools = result + elif isinstance(result, dict): + pools = result.get("data", []) + elif hasattr(result, "data"): + data = result.data + pools = data if isinstance(data, list) else data.get("data", []) if isinstance(data, dict) else [] + + pools = pools[:20] + + if not pools: + skill_result = SkillResult( + name=self.name, + data={"tokens": [], "pools": []}, + summary="Trending Solana Tokens: no data from GeckoTerminal", + ) + self._cached_result = skill_result + return skill_result + + summaries = [_extract_pool_summary(p) for p in pools] + + # Extract unique tokens (dedupe by address, exclude SOL/stablecoins) + seen_addresses: set[str] = set() + tokens: list[dict] = [] + stablecoins = {"USDC", "USDT", "BUSD", "DAI", "UST", "USDH", "UXD"} + + for p in summaries: + # Get the non-SOL token from the pair + if p["quote_symbol"].upper() == "SOL" or p["quote_token_address"] == WRAPPED_SOL: + token_symbol = p["base_symbol"] + token_address = p["base_token_address"] + elif p["base_symbol"].upper() == "SOL" or p["base_token_address"] == WRAPPED_SOL: + token_symbol = p["quote_symbol"] + token_address = p["quote_token_address"] + else: + # Neither is SOL, take base token + token_symbol = p["base_symbol"] + token_address = p["base_token_address"] + + # Skip if already seen, is SOL, or is a stablecoin + if not token_address or token_address in seen_addresses: + continue + if token_address == WRAPPED_SOL: + continue + if token_symbol.upper() in stablecoins: + continue + + seen_addresses.add(token_address) + tokens.append({ + "symbol": token_symbol, + "address": token_address, + "source_pool": p["name"], + "source_dex": p["dex"], + "tvl": p["tvl"], + "volume_24h": p["volume_24h"], + "price_change_24h": p["price_change_24h"], + }) + + # Build summary + lines = [f"Trending Solana Tokens ({len(tokens)} unique from top 20 pools):"] + for i, t in enumerate(tokens, 1): + vol = t["volume_24h"] + tvl = t["tvl"] + change = t["price_change_24h"] + vol_str = f"${_safe_float(vol):,.0f}" if vol else "N/A" + tvl_str = f"${_safe_float(tvl):,.0f}" if tvl else "N/A" + change_str = f"{_safe_float(change):+.1f}%" if change else "N/A" + lines.append( + f" {i}. {t['symbol']} — from {t['source_dex']}, " + f"Vol: {vol_str}, TVL: {tvl_str}, 24h: {change_str}" + ) + lines.append(f" Address: {t['address']}") + + if not tokens: + lines.append(" (no trending tokens found)") + + lines.append("") + lines.append("Use explore_dex_pools(connector='meteora', search_term='{SYMBOL}', sort_key='feetvlratio') to find best Meteora pools.") + + skill_result = SkillResult( + name=self.name, + data={"tokens": tokens, "pools": summaries}, + summary="\n".join(lines), + ) + self._cached_result = skill_result + return skill_result + + except Exception as e: + log.exception("TrendingPoolsSkill failed") + return SkillResult( + name=self.name, + data={"error": str(e)}, + summary=f"Trending Solana Tokens: failed ({e})", + ) + + +register_skill(TrendingPoolsSkill()) diff --git a/handlers/config/gateway/pools.py b/handlers/config/gateway/pools.py index 5859be38..d3def248 100644 --- a/handlers/config/gateway/pools.py +++ b/handlers/config/gateway/pools.py @@ -503,8 +503,7 @@ async def prompt_remove_pool( f"➖ *Remove Pool from {connector_escaped}*\n" f"Network: `{network_escaped}`\n\n" "*Select a pool to remove:*\n\n" - f"{pools_text}\n\n" - "⚠️ _Restart Gateway after removing for changes to take effect\\._" + f"{pools_text}" ) keyboard.append( [InlineKeyboardButton("« Cancel", callback_data="gateway_pool_view")] @@ -548,8 +547,7 @@ async def show_delete_pool_confirmation( f"Network: *{network_escaped}*\n" f"Type: *{pool_type_escaped}*\n" f"Address: `{addr_escaped}`\n\n" - f"⚠️ This will remove the pool from *{connector_escaped}* on *{network_escaped}*\\.\n" - "You will need to restart the Gateway for changes to take effect\\.\n\n" + f"⚠️ This will remove the pool from *{connector_escaped}* on *{network_escaped}*\\.\n\n" "Are you sure you want to delete this pool?" ) @@ -623,8 +621,7 @@ async def remove_pool( success_text = ( f"✅ *Pool Removed*\n\n" f"`{addr_escaped}`\n\n" - f"Removed from {connector_escaped} on {network_escaped}\n\n" - "⚠️ _Restart Gateway for changes to take effect\\._" + f"Removed from {connector_escaped} on {network_escaped}" ) keyboard = [ diff --git a/handlers/config/gateway/tokens.py b/handlers/config/gateway/tokens.py index 1ec544fc..06faefdd 100644 --- a/handlers/config/gateway/tokens.py +++ b/handlers/config/gateway/tokens.py @@ -364,8 +364,7 @@ async def prompt_add_token( "*Option 2:* Full format\n" "`address,symbol,decimals,name`\n\n" "*Example:*\n" - "`9QFfgxdSqH5zT7j6rZb1y6SZhw2aFtcQu2r6BuYpump`\n\n" - "⚠️ _Restart Gateway after adding for changes to take effect\\._" + "`9QFfgxdSqH5zT7j6rZb1y6SZhw2aFtcQu2r6BuYpump`" ) keyboard = [ @@ -576,8 +575,7 @@ async def prompt_edit_token( "`symbol,decimals,name`\n\n" "*Example:*\n" "`GOLD,9,Goldcoin`\n\n" - "_Leave name empty if not needed \\(e\\.g\\. `GOLD,9`\\)_\n\n" - "⚠️ _Restart Gateway after editing for changes to take effect\\._" + "_Leave name empty if not needed \\(e\\.g\\. `GOLD,9`\\)_" ) keyboard = [ @@ -650,8 +648,7 @@ async def show_delete_token_confirmation( message_text += ( f"Address: `{addr_escaped}`\n\n" - f"⚠️ This will remove the token from *{network_escaped}*\\.\n" - "You will need to restart the Gateway for changes to take effect\\.\n\n" + f"⚠️ This will remove the token from *{network_escaped}*\\.\n\n" "Are you sure you want to delete this token?" ) @@ -713,8 +710,7 @@ async def remove_token( success_text = ( f"✅ *Token Removed*\n\n" f"`{addr_escaped}`\n\n" - f"Removed from {network_escaped}\n\n" - "⚠️ _Restart Gateway for changes to take effect\\._" + f"Removed from {network_escaped}" ) keyboard = [ @@ -876,8 +872,7 @@ async def handle_token_input( success_text = ( f"✅ *Token Added Successfully*\n\n" - f"*{symbol_escaped}* added to {network_escaped}\n\n" - "⚠️ _Restart Gateway for changes to take effect\\._" + f"*{symbol_escaped}* added to {network_escaped}" ) if message_id and chat_id: @@ -1014,8 +1009,7 @@ async def mock_answer(text=""): success_text = ( f"✅ *Token Updated Successfully*\n\n" - f"*{symbol_escaped}* updated on {network_escaped}\n\n" - "⚠️ _Restart Gateway for changes to take effect\\._" + f"*{symbol_escaped}* updated on {network_escaped}" ) if message_id and chat_id: diff --git a/handlers/dex/geckoterminal.py b/handlers/dex/geckoterminal.py index 928a1c24..1b634b92 100644 --- a/handlers/dex/geckoterminal.py +++ b/handlers/dex/geckoterminal.py @@ -3167,8 +3167,7 @@ async def handle_gecko_token_add( success_text = ( f"✅ *Token Added*\n\n" - f"*{escape_markdown_v2(symbol)}* added to {escape_markdown_v2(gateway_network)}\n\n" - "⚠️ _Restart Gateway for changes to take effect\\._" + f"*{escape_markdown_v2(symbol)}* added to {escape_markdown_v2(gateway_network)}" ) keyboard = [ @@ -3741,22 +3740,14 @@ async def add_token_to_gateway(token_address: str) -> str: # Build result message if added_tokens: - result_msg = f"✅ *Added:* {escape_markdown_v2(', '.join(added_tokens))}\n\n" + result_msg = f"✅ *Added:* {escape_markdown_v2(', '.join(added_tokens))}" else: - result_msg = "ℹ️ _Tokens already in Gateway_\n\n" + result_msg = "ℹ️ _Tokens already in Gateway_" if errors: - result_msg += f"⚠️ Failed: {escape_markdown_v2(', '.join(errors))}\n\n" + result_msg += f"\n\n⚠️ Failed: {escape_markdown_v2(', '.join(errors))}" - result_msg += r"⚠️ _Restart Gateway for changes to take effect_" - - # Add restart button keyboard = [ - [ - InlineKeyboardButton( - "🔄 Restart Gateway", callback_data="dex:gecko_restart_gateway" - ) - ], [ InlineKeyboardButton( "« Back to Pool", diff --git a/handlers/dex/pools.py b/handlers/dex/pools.py index 8bf26995..34db255c 100644 --- a/handlers/dex/pools.py +++ b/handlers/dex/pools.py @@ -1821,8 +1821,6 @@ async def add_token_to_gateway(token_address: str) -> bool: if errors: success_msg += f"\n⚠️ Failed: {', '.join(errors)}" - success_msg += "\n\n⚠️ Restart Gateway for changes to take effect" - # Go back to pool detail try: ( diff --git a/handlers/executors/_shared.py b/handlers/executors/_shared.py index c2d3ed39..d530b309 100644 --- a/handlers/executors/_shared.py +++ b/handlers/executors/_shared.py @@ -82,16 +82,30 @@ def normalize_side(side_val) -> int: "activation_bounds": -1, # -1 = disabled, float = value } +# Swap executor defaults (Gateway AMM swaps via Jupiter router) +SWAP_EXECUTOR_DEFAULTS = { + "type": "swap_executor", + "connector_name": "jupiter/router", # Gateway router connector + "trading_pair": "", + "side": SIDE_SHORT, # SELL=2, BUY=1 + "amount": 0.0, # Base token amount to swap + "slippage_pct": None, # Optional: override default slippage +} + def get_executor_type(executor: Dict[str, Any]) -> str: """Determine executor type from its data. - Returns: 'grid' or 'position' + Returns: 'grid', 'position', 'swap', or 'lp' """ config = executor.get("config", executor) for source in (config, executor): ex_type = source.get("type", "") if isinstance(ex_type, str): + if "swap" in ex_type.lower(): + return "swap" + if "lp" in ex_type.lower(): + return "lp" if "position" in ex_type.lower(): return "position" if "grid" in ex_type.lower(): @@ -164,6 +178,8 @@ def init_new_executor_config(context, executor_type: str = "grid") -> Dict[str, config = GRID_EXECUTOR_DEFAULTS.copy() elif executor_type == "position": config = POSITION_EXECUTOR_DEFAULTS.copy() + elif executor_type == "swap": + config = SWAP_EXECUTOR_DEFAULTS.copy() else: config = GRID_EXECUTOR_DEFAULTS.copy()