diff --git a/bots/controllers/generic/lp_rebalancer/lp_rebalancer.py b/bots/controllers/generic/lp_rebalancer/lp_rebalancer.py index 6c400629..5ff49fbd 100644 --- a/bots/controllers/generic/lp_rebalancer/lp_rebalancer.py +++ b/bots/controllers/generic/lp_rebalancer/lp_rebalancer.py @@ -2,14 +2,17 @@ from decimal import Decimal from typing import List, Optional -from hummingbot.core.data_type.common import MarketDict +from hummingbot.core.data_type.common import MarketDict, TradeType from hummingbot.core.utils.async_utils import safe_ensure_future from hummingbot.data_feed.candles_feed.data_types import CandlesConfig from hummingbot.logger import HummingbotLogger from hummingbot.strategy_v2.controllers import ControllerBase, ControllerConfigBase from hummingbot.strategy_v2.executors.data_types import ConnectorPair +from hummingbot.strategy_v2.executors.gateway_utils import parse_provider from hummingbot.strategy_v2.executors.lp_executor.data_types import LPExecutorConfig, LPExecutorStates +from hummingbot.strategy_v2.executors.order_executor.data_types import ExecutionStrategy, OrderExecutorConfig from hummingbot.strategy_v2.models.executor_actions import CreateExecutorAction, ExecutorAction, StopExecutorAction +from hummingbot.strategy_v2.models.executors import CloseType from hummingbot.strategy_v2.models.executors_info import ExecutorInfo from pydantic import Field, field_validator, model_validator @@ -20,14 +23,24 @@ class LPRebalancerConfig(ControllerConfigBase): Uses total_amount_quote and side for position sizing. Implements KEEP vs REBALANCE logic based on price limits. + + Provider Architecture: + - connector_name: The network identifier (e.g., "solana-mainnet-beta") + - lp_provider: LP provider in format "dex/trading_type" (e.g., "meteora/clmm") + - autoswap: Uses OrderExecutor with network's configured swapProvider """ controller_type: str = "generic" controller_name: str = "lp_rebalancer" candles_config: List[CandlesConfig] = [] - # Pool configuration (required) - connector_name: str = "meteora/clmm" - network: str = "solana-mainnet-beta" + # Network connector - e.g., "solana-mainnet-beta" + connector_name: str = "solana-mainnet-beta" + + # LP provider (required) - format: "dex/trading_type" + # Examples: "meteora/clmm", "orca/clmm", "raydium/clmm" + lp_provider: str = "orca/clmm" + + # Pool configuration trading_pair: str = "" pool_address: str = "" @@ -38,7 +51,8 @@ class LPRebalancerConfig(ControllerConfigBase): position_offset_pct: Decimal = Field( default=Decimal("0.01"), json_schema_extra={"is_updatable": True}, - description="Offset from current price to ensure single-sided positions start out-of-range (e.g., 0.1 = 0.1%)" + description="Offset from current price. Positive = out-of-range (single-sided). " + "Negative = in-range (needs both tokens, autoswap will convert |offset|%)" ) # Rebalancing @@ -60,6 +74,18 @@ class LPRebalancerConfig(ControllerConfigBase): # Connector-specific params (optional) strategy_type: Optional[int] = Field(default=None, json_schema_extra={"is_updatable": True}) + # Auto-swap feature: swap tokens if balance insufficient for position + autoswap: bool = Field( + default=False, + json_schema_extra={"is_updatable": True}, + description="Automatically swap tokens if balance is insufficient for position." + ) + swap_buffer_pct: Decimal = Field( + default=Decimal("0.01"), + json_schema_extra={"is_updatable": True}, + description="Extra % to swap beyond deficit to account for slippage (e.g., 0.01 = 0.01%)" + ) + @field_validator("sell_price_min", "sell_price_max", "buy_price_min", "buy_price_max", mode="before") @classmethod def validate_price_limits(cls, v): @@ -86,11 +112,19 @@ def validate_price_limit_ranges(self): if self.sell_price_max is not None and self.sell_price_min is not None: if self.sell_price_max < self.sell_price_min: raise ValueError("sell_price_max must be >= sell_price_min") + # For negative offset (in-range), offset magnitude must not exceed width + if self.position_offset_pct < 0: + if abs(self.position_offset_pct) > self.position_width_pct: + raise ValueError( + f"For in-range positions, |position_offset_pct| ({abs(self.position_offset_pct)}) " + f"must not exceed position_width_pct ({self.position_width_pct})" + ) return self def update_markets(self, markets: MarketDict) -> MarketDict: """Register the LP connector with trading pair""" - return markets.add_or_update(self.connector_name, self.trading_pair) + markets = markets.add_or_update(self.connector_name, self.trading_pair) + return markets class LPRebalancer(ControllerBase): @@ -116,6 +150,11 @@ def __init__(self, config: LPRebalancerConfig, *args, **kwargs): super().__init__(config, *args, **kwargs) self.config: LPRebalancerConfig = config + # Parse lp_provider into dex_name and trading_type for gateway calls + self.lp_dex_name, self.lp_trading_type = parse_provider( + config.lp_provider, default_trading_type="clmm" + ) + # Parse token symbols from trading pair parts = config.trading_pair.split("-") self._base_token: str = parts[0] if len(parts) >= 2 else "" @@ -144,6 +183,13 @@ def __init__(self, config: LPRebalancerConfig, *args, **kwargs): # Cached pool price (updated in update_processed_data) self._pool_price: Optional[Decimal] = None + # Swap executor tracking (for autoswap feature) + self._swap_executor_id: Optional[str] = None + self._pending_swap_side: Optional[int] = None # LP side to create after swap completes + + # Track if initial position has been created (after that, always use side 1 or 2) + self._initial_position_created: bool = False + # Initialize rate sources self.market_data_provider.initialize_rate_sources([ ConnectorPair( @@ -176,6 +222,154 @@ def is_tracked_executor_terminated(self) -> bool: return True return executor.status == RunnableStatus.TERMINATED + def get_swap_executor(self) -> Optional[ExecutorInfo]: + """Get the swap executor we're tracking""" + if not self._swap_executor_id: + return None + for e in self.executors_info: + if e.id == self._swap_executor_id: + return e + return None + + def is_swap_executor_done(self) -> bool: + """Check if swap executor has completed (success or failure)""" + if not self._swap_executor_id: + return True + swap_executor = self.get_swap_executor() + if swap_executor is None: + return True + return swap_executor.is_done + + def _check_autoswap_needed(self, side: int, current_price: Decimal) -> Optional[OrderExecutorConfig]: + """ + Check if autoswap is needed and return swap config if so. + + Returns OrderExecutorConfig if swap is needed, None otherwise. + + Simply checks balance vs required amounts and swaps deficit + buffer if insufficient. + Works for both positive offset (out-of-range) and negative offset (in-range) positions. + + For rebalances, includes tokens from just-closed position in available balance + since wallet balance may not be updated yet. + """ + if not self.config.autoswap: + return None + + # Capture closed position amounts BEFORE creating LP position + # (they get cleared after position creation in determine_executor_actions) + closed_base = self._last_closed_base_amount or Decimal("0") + closed_quote = self._last_closed_quote_amount or Decimal("0") + closed_base_fee = self._last_closed_base_fee or Decimal("0") + closed_quote_fee = self._last_closed_quote_fee or Decimal("0") + + # Calculate required amounts (handles negative offset internally) + base_amt, quote_amt = self._calculate_amounts(side, current_price) + + # Get current wallet balances + try: + base_balance = self.market_data_provider.get_balance( + self.config.connector_name, self._base_token + ) + quote_balance = self.market_data_provider.get_balance( + self.config.connector_name, self._quote_token + ) + except Exception as e: + self.logger().warning(f"Could not fetch balances for autoswap check: {e}") + return None + + # For rebalances, add closed position amounts to available balance + # (wallet balance may not be updated yet after position close) + if closed_base > 0 or closed_quote > 0: + base_balance += closed_base + closed_base_fee + quote_balance += closed_quote + closed_quote_fee + self.logger().info( + f"Autoswap: including closed position amounts in balance: " + f"+{closed_base + closed_base_fee:.6f} {self._base_token}, " + f"+{closed_quote + closed_quote_fee:.6f} {self._quote_token}" + ) + + # Calculate deficit from raw amounts + base_deficit = base_amt - base_balance + quote_deficit = quote_amt - quote_balance + + # Add 0.1 SOL buffer for rent and transaction fees when SOL is involved + sol_buffer = Decimal("0.1") + if self._base_token.upper() == "SOL": + base_deficit += sol_buffer + if self._quote_token.upper() == "SOL": + quote_deficit += sol_buffer + + self.logger().info( + f"Autoswap check: need base={base_amt:.6f}, have={base_balance:.6f}, deficit={base_deficit:.6f} | " + f"need quote={quote_amt:.6f}, have={quote_balance:.6f}, deficit={quote_deficit:.6f}" + ) + + # Buffer multiplier only applied to swap amount + buffer_multiplier = Decimal("1") + (self.config.swap_buffer_pct / Decimal("100")) + + # If any deficit, swap + if base_deficit > 0 and quote_deficit <= 0: + # Need more base, have enough quote - BUY base with quote + swap_amount = base_deficit * buffer_multiplier + # Check if we have enough quote to buy this much base + required_quote = swap_amount * current_price * Decimal("1.02") # 2% extra for price movement + if quote_balance >= required_quote: + self.logger().info( + f"Autoswap: BUY {swap_amount:.6f} {self._base_token} " + f"(deficit={base_deficit:.6f} + {self.config.swap_buffer_pct}% buffer, " + f"have {quote_balance:.6f} {self._quote_token})" + ) + return OrderExecutorConfig( + timestamp=self.market_data_provider.time(), + connector_name=self.config.connector_name, + trading_pair=self.config.trading_pair, + side=TradeType.BUY, + amount=swap_amount, + execution_strategy=ExecutionStrategy.MARKET, + ) + else: + self.logger().warning( + f"Autoswap: insufficient quote ({quote_balance:.6f}) to buy {swap_amount:.6f} base " + f"(need ~{required_quote:.6f} {self._quote_token})" + ) + return None + + elif quote_deficit > 0 and base_deficit <= 0: + # Need more quote, have enough base - SELL base for quote + swap_amount = (quote_deficit / current_price) * buffer_multiplier + # Check if we have enough base to sell + if base_balance >= swap_amount * Decimal("1.02"): # 2% extra for price movement + self.logger().info( + f"Autoswap: SELL {swap_amount:.6f} {self._base_token} for ~{quote_deficit:.6f} {self._quote_token} " + f"(deficit + {self.config.swap_buffer_pct}% buffer, have {base_balance:.6f} {self._base_token})" + ) + return OrderExecutorConfig( + timestamp=self.market_data_provider.time(), + connector_name=self.config.connector_name, + trading_pair=self.config.trading_pair, + side=TradeType.SELL, + amount=swap_amount, + execution_strategy=ExecutionStrategy.MARKET, + ) + else: + self.logger().warning( + f"Autoswap: insufficient base ({base_balance:.6f}) to sell for {quote_deficit:.6f} quote" + ) + return None + + elif base_deficit > 0 and quote_deficit > 0: + # Both tokens in deficit - user is underfunded for side=0 (BOTH) + total_deficit_quote = base_deficit * current_price + quote_deficit + self.logger().warning( + f"Autoswap: cannot swap - both tokens in deficit (side=0). " + f"Need {base_deficit:.6f} more {self._base_token} AND {quote_deficit:.6f} more {self._quote_token} " + f"(total deficit: {total_deficit_quote:.2f} {self._quote_token})" + ) + return None + + # No swap needed + return None + def _trigger_balance_update(self): """Trigger a balance update on the connector after position changes.""" try: @@ -201,6 +395,66 @@ def determine_executor_actions(self) -> List[ExecutorAction]: self.logger().debug(f"Could not capture initial balances: {e}") actions = [] + + # Check if swap executor is running (autoswap in progress) + if self._pending_swap_side is not None: + # Find and track the swap executor if not already tracked + if not self._swap_executor_id: + for e in self.executors_info: + if e.config.type == "order_executor" and e.is_active: + self._swap_executor_id = e.id + self.logger().info(f"Tracking swap executor: {e.id}") + break + + # If swap is pending but executor not found yet, wait for it to appear + if not self._swap_executor_id: + self.logger().debug("Waiting for swap executor to appear in executors_info") + return actions + + if self._swap_executor_id: + if not self.is_swap_executor_done(): + swap_executor = self.get_swap_executor() + state = swap_executor.custom_info.get("state") if swap_executor else "unknown" + self.logger().debug(f"Waiting for swap executor to complete (state: {state})") + return actions + + # Swap executor completed - check result and proceed + swap_executor = self.get_swap_executor() + pending_side = self._pending_swap_side + + # Clear swap tracking + self._swap_executor_id = None + self._pending_swap_side = None + + # Check if swap succeeded (not failed) + swap_succeeded = swap_executor and swap_executor.close_type != CloseType.FAILED + if swap_succeeded: + self.logger().info("Autoswap completed successfully, proceeding to LP position") + # Trigger balance update after successful swap + self._trigger_balance_update() + + # Create LP position with the side that was pending + if pending_side is not None: + executor_config = self._create_executor_config(pending_side) + if executor_config: + actions.append(CreateExecutorAction( + controller_id=self.config.id, + executor_config=executor_config + )) + self._initial_position_created = True + self._pending_balance_update = True + else: + # Swap failed - log error and skip LP position creation this cycle + close_type = swap_executor.close_type if swap_executor else "unknown" + self.logger().error( + f"Autoswap FAILED (close_type: {close_type}). " + f"Will retry autoswap check on next cycle for side={pending_side}" + ) + # Don't create LP position - let the next cycle re-check balances + # and potentially retry the swap + + return actions + executor = self.active_executor() # Track the active executor's ID if we don't have one yet @@ -236,11 +490,38 @@ def determine_executor_actions(self) -> List[ExecutorAction]: # Determine side for new position if self._pending_rebalance and self._pending_rebalance_side is not None: + # Rebalance: use the side determined by price direction side = self._pending_rebalance_side self._pending_rebalance = False self._pending_rebalance_side = None - else: + elif not self._initial_position_created: + # Initial position: use configured side (can be 0=BOTH, 1=BUY, 2=SELL) side = self.config.side + else: + # After initial position but no pending rebalance (e.g., position failed/closed) + # Determine side from current price vs price limits + if not self._pool_price: + self.logger().info("Waiting for pool price to determine side") + return actions + side = self._determine_side_from_price(self._pool_price) + + # Check if autoswap is needed before creating LP position + if self.config.autoswap: + if not self._pool_price: + self.logger().info("Autoswap: waiting for pool price") + return actions + swap_config = self._check_autoswap_needed(side, self._pool_price) + if swap_config: + # Create swap executor and wait for it to complete + self._pending_swap_side = side + actions.append(CreateExecutorAction( + controller_id=self.config.id, + executor_config=swap_config + )) + # Track the swap executor ID on next tick + return actions + else: + self.logger().info("Autoswap: no swap needed, balances sufficient") # Create executor config with calculated bounds executor_config = self._create_executor_config(side) @@ -252,14 +533,23 @@ def determine_executor_actions(self) -> List[ExecutorAction]: controller_id=self.config.id, executor_config=executor_config )) + # Note: _initial_position_created is set below when position is confirmed active self._pending_balance_update = True + + # Clear closed position amounts after LP position is created + self._last_closed_base_amount = None + self._last_closed_quote_amount = None + self._last_closed_base_fee = None + self._last_closed_quote_fee = None + return actions - # Trigger balance update after position is created + # Mark initial position created and trigger balance update when position is active if self._pending_balance_update: state = executor.custom_info.get("state") if state in ("IN_RANGE", "OUT_OF_RANGE"): self._pending_balance_update = False + self._initial_position_created = True # Only mark created when actually active self._trigger_balance_update() # Check executor state @@ -313,7 +603,7 @@ def _handle_rebalance(self, executor: ExecutorInfo) -> Optional[StopExecutorActi # Don't log repeatedly - this is checked every tick return None - # Step 4: Initiate rebalance + # Step 3: Initiate rebalance self._pending_rebalance = True self._pending_rebalance_side = new_side self.logger().info( @@ -324,7 +614,6 @@ def _handle_rebalance(self, executor: ExecutorInfo) -> Optional[StopExecutorActi return StopExecutorAction( controller_id=self.config.id, executor_id=executor.id, - keep_position=False, ) def _is_beyond_rebalance_threshold(self, executor: ExecutorInfo) -> bool: @@ -404,6 +693,7 @@ def _create_executor_config(self, side: int) -> Optional[LPExecutorConfig]: return LPExecutorConfig( timestamp=self.market_data_provider.time(), connector_name=self.config.connector_name, + lp_provider=self.config.lp_provider, trading_pair=self.config.trading_pair, pool_address=self.config.pool_address, lower_price=lower_price, @@ -411,72 +701,70 @@ def _create_executor_config(self, side: int) -> Optional[LPExecutorConfig]: base_amount=base_amt, quote_amount=quote_amt, side=side, - position_offset_pct=self.config.position_offset_pct, extra_params=extra_params if extra_params else None, - keep_position=False, ) def _calculate_amounts(self, side: int, current_price: Decimal) -> tuple: """ - Calculate base and quote amounts based on side and total_amount_quote. + Calculate base and quote amounts based on side, offset, and total_amount_quote. - For rebalances, clamps to the actual amounts returned from the closed position - to avoid order failures when balance is less than configured total (due to - impermanent loss, fees, or price movement). + Allocation logic: + - Side 0 (BOTH): split 50/50 + - Side 1/2 with offset >= 0 (out-of-range): 100% single-sided + - Side 1/2 with offset < 0 (in-range): proportional split based on price position - Side 0 (BOTH): split 50/50 - Side 1 (BUY): all quote - clamp to closed position's quote + quote_fee - Side 2 (SELL): all base - clamp to closed position's base + base_fee + For in-range positions, the split is calculated based on where current price + sits in the range. This mirrors CLMM behavior where both tokens are needed + when price is within bounds. + + Note: No clamping is done here - autoswap handles any token deficits. """ total = self.config.total_amount_quote - - # For rebalances, clamp to actual amounts from closed position - # Check if we have captured amounts (indicates this is a rebalance) - has_closed_amounts = (self._last_closed_base_amount is not None or - self._last_closed_quote_amount is not None) - if has_closed_amounts: - if side == 1: # BUY - needs quote token - if self._last_closed_quote_amount is not None: - # Total available = position amount + fees earned - available_quote = self._last_closed_quote_amount - if self._last_closed_quote_fee: - available_quote += self._last_closed_quote_fee - if available_quote < total: - self.logger().info( - f"Clamping quote amount from {total} to {available_quote} {self._quote_token} " - f"(closed position returned {self._last_closed_quote_amount} + {self._last_closed_quote_fee} fees)" - ) - total = available_quote - elif side == 2: # SELL - needs base token - if self._last_closed_base_amount is not None: - # Total available = position amount + fees earned - available_base = self._last_closed_base_amount - if self._last_closed_base_fee: - available_base += self._last_closed_base_fee - available_as_quote = available_base * current_price - if available_as_quote < total: - self.logger().info( - f"Clamping total from {total} to {available_as_quote:.4f} " - f"{self._quote_token} (closed: {self._last_closed_base_amount} + " - f"{self._last_closed_base_fee} fees {self._base_token})" - ) - total = available_as_quote - - # Clear the cached amounts after use - self._last_closed_base_amount = None - self._last_closed_quote_amount = None - self._last_closed_base_fee = None - self._last_closed_quote_fee = None + offset = self.config.position_offset_pct if side == 0: # BOTH quote_amt = total / Decimal("2") base_amt = quote_amt / current_price - elif side == 1: # BUY - base_amt = Decimal("0") - quote_amt = total - else: # SELL - base_amt = total / current_price - quote_amt = Decimal("0") + elif offset >= 0: + # Out-of-range: single-sided allocation + if side == 1: # BUY - all quote + base_amt = Decimal("0") + quote_amt = total + else: # SELL - all base + base_amt = total / current_price + quote_amt = Decimal("0") + else: + # In-range (offset < 0): proportional split based on price position in range + # Calculate bounds to determine where price sits + lower_price, upper_price = self._calculate_price_bounds(side, current_price) + price_range = upper_price - lower_price + + if price_range <= 0 or current_price <= lower_price: + # At or below lower bound - all quote for BUY, all base for SELL + if side == 1: + base_amt = Decimal("0") + quote_amt = total + else: + base_amt = total / current_price + quote_amt = Decimal("0") + elif current_price >= upper_price: + # At or above upper bound - all base for SELL, all quote for BUY + if side == 2: + base_amt = total / current_price + quote_amt = Decimal("0") + else: + base_amt = Decimal("0") + quote_amt = total + else: + # Price is in range - calculate proportional split + # price_ratio: 0 at lower_price, 1 at upper_price + price_ratio = (current_price - lower_price) / price_range + # As price goes up, more of the position is in quote, less in base + quote_pct = price_ratio + base_pct = Decimal("1") - price_ratio + + quote_amt = total * quote_pct + base_amt = (total * base_pct) / current_price return base_amt, quote_amt @@ -566,12 +854,52 @@ def _is_price_within_limits(self, price: Decimal, side: int) -> bool: return False return True + def _determine_side_from_price(self, current_price: Decimal) -> int: + """ + Determine side (1=BUY or 2=SELL) based on current price vs price limits. + + Used after initial position to ensure we never use side=0 (BOTH) for rebalances. + - If price is closer to buy range, use BUY (1) + - If price is closer to sell range, use SELL (2) + """ + # Get midpoints of buy and sell ranges + buy_mid = None + sell_mid = None + + if self.config.buy_price_min and self.config.buy_price_max: + buy_mid = (self.config.buy_price_min + self.config.buy_price_max) / 2 + if self.config.sell_price_min and self.config.sell_price_max: + sell_mid = (self.config.sell_price_min + self.config.sell_price_max) / 2 + + # If both ranges defined, use the one price is closer to + if buy_mid and sell_mid: + if current_price <= buy_mid: + return 1 # BUY - price in lower range + elif current_price >= sell_mid: + return 2 # SELL - price in upper range + else: + # Price between buy_mid and sell_mid - use BUY if closer to buy_mid + return 1 if (current_price - buy_mid) < (sell_mid - current_price) else 2 + + # If only one range defined, use that side + if buy_mid: + return 1 + if sell_mid: + return 2 + + # No price limits defined - default to BUY + return 1 + async def update_processed_data(self): - """Called every tick - always fetch fresh pool price for accurate position creation.""" + """Called every tick - fetch pool price.""" try: connector = self.market_data_provider.get_connector(self.config.connector_name) if hasattr(connector, 'get_pool_info_by_address'): - pool_info = await connector.get_pool_info_by_address(self.config.pool_address) + pool_info = await connector.get_pool_info_by_address( + self.config.pool_address, + dex_name=self.lp_dex_name, + trading_type=self.lp_trading_type, + ) if pool_info and pool_info.price: self._pool_price = Decimal(str(pool_info.price)) except Exception as e: @@ -589,80 +917,87 @@ def to_format_status(self) -> List[str]: status.append(header + " " * (box_width - len(header) + 1) + "|") status.append("+" + "-" * box_width + "+") - # Network, connector, pool - line = f"| Network: {self.config.network}" + # === CONFIG SECTION === + line = f"| Network: {self.config.connector_name} | LP: {self.config.lp_provider}" status.append(line + " " * (box_width - len(line) + 1) + "|") line = f"| Pool: {self.config.pool_address}" status.append(line + " " * (box_width - len(line) + 1) + "|") - # Position info from current executor (active or transitioning) - executor = self.active_executor() or self.get_tracked_executor() - if executor and not executor.is_done: - position_address = executor.custom_info.get("position_address", "N/A") - line = f"| Position: {position_address}" - status.append(line + " " * (box_width - len(line) + 1) + "|") - # Config summary side_names = {0: "BOTH", 1: "BUY", 2: "SELL"} side_str = side_names.get(self.config.side, '?') amt = self.config.total_amount_quote width = self.config.position_width_pct + offset = self.config.position_offset_pct rebal = self.config.rebalance_seconds - line = f"| Config: side={side_str}, amount={amt} {self._quote_token}, width={width}%, rebal={rebal}s" + line = f"| Config: side={side_str}, amount={amt} {self._quote_token}, width={width}%, offset={offset}%, rebal={rebal}s" status.append(line + " " * (box_width - len(line) + 1) + "|") - # Position fees and assets + # Spacer before Position section + status.append("|" + " " * box_width + "|") + + # === POSITION SECTION === + executor = self.active_executor() or self.get_tracked_executor() + + # Get position amounts for balance calculations + pos_base_amount = Decimal("0") + pos_quote_amount = Decimal("0") + if executor and not executor.is_done: custom = executor.custom_info + # Position Address + position_address = custom.get("position_address", "N/A") + line = f"| Position: {position_address}" + status.append(line + " " * (box_width - len(line) + 1) + "|") + + # Assets row: base_amount + quote_amount = total value + pos_base_amount = Decimal(str(custom.get("base_amount", 0))) + pos_quote_amount = Decimal(str(custom.get("quote_amount", 0))) + total_value_quote = Decimal(str(custom.get("total_value_quote", 0))) + line = ( + f"| Assets: {float(pos_base_amount):.6f} {self._base_token} + " + f"{float(pos_quote_amount):.6f} {self._quote_token} = {float(total_value_quote):.4f} {self._quote_token}" + ) + status.append(line + " " * (box_width - len(line) + 1) + "|") + # Fees row: base_fee + quote_fee = total base_fee = Decimal(str(custom.get("base_fee", 0))) quote_fee = Decimal(str(custom.get("quote_fee", 0))) fees_earned_quote = Decimal(str(custom.get("fees_earned_quote", 0))) line = ( f"| Fees: {float(base_fee):.6f} {self._base_token} + " - f"{float(quote_fee):.6f} {self._quote_token} = {float(fees_earned_quote):.6f}" + f"{float(quote_fee):.6f} {self._quote_token} = {float(fees_earned_quote):.6f} {self._quote_token}" ) status.append(line + " " * (box_width - len(line) + 1) + "|") - # Value row: base_amount + quote_amount = total value - base_amount = Decimal(str(custom.get("base_amount", 0))) - quote_amount = Decimal(str(custom.get("quote_amount", 0))) - total_value_quote = Decimal(str(custom.get("total_value_quote", 0))) - line = ( - f"| Value: {float(base_amount):.6f} {self._base_token} + " - f"{float(quote_amount):.6f} {self._quote_token} = {float(total_value_quote):.4f}" - ) - status.append(line + " " * (box_width - len(line) + 1) + "|") + # Price and rebalance thresholds + lower_price = custom.get("lower_price") + upper_price = custom.get("upper_price") - # Position range visualization - lower_price = executor.custom_info.get("lower_price") - upper_price = executor.custom_info.get("upper_price") - - if lower_price and upper_price and self._pool_price: - # Show rebalance thresholds (convert % to decimal) - # Takes into account price limits - rebalance only happens within limits + if lower_price is not None and upper_price is not None and self._pool_price: threshold = self.config.rebalance_threshold_pct / Decimal("100") lower_threshold = Decimal(str(lower_price)) * (Decimal("1") - threshold) upper_threshold = Decimal(str(upper_price)) * (Decimal("1") + threshold) # Lower threshold triggers SELL - check sell_price_min if self.config.sell_price_min and lower_threshold < self.config.sell_price_min: - lower_str = "N/A" # Below sell limit, no rebalance possible + lower_str = "N/A" else: lower_str = f"{float(lower_threshold):.{price_decimals}f}" # Upper threshold triggers BUY - check buy_price_max if self.config.buy_price_max and upper_threshold > self.config.buy_price_max: - upper_str = "N/A" # Above buy limit, no rebalance possible + upper_str = "N/A" else: upper_str = f"{float(upper_threshold):.{price_decimals}f}" line = f"| Price: {float(self._pool_price):.{price_decimals}f} | Rebalance if: <{lower_str} or >{upper_str}" status.append(line + " " * (box_width - len(line) + 1) + "|") - state = executor.custom_info.get("state", "UNKNOWN") + # Status with icon + state = custom.get("state", "UNKNOWN") state_icons = { "IN_RANGE": "●", "OUT_OF_RANGE": "○", @@ -677,6 +1012,7 @@ def to_format_status(self) -> List[str]: line = f"| Position Status: [{state_icon} {state}]" status.append(line + " " * (box_width - len(line) + 1) + "|") + # Range visualization range_viz = self._create_price_range_visualization( Decimal(str(lower_price)), self._pool_price, @@ -686,27 +1022,28 @@ def to_format_status(self) -> List[str]: line = f"| {viz_line}" status.append(line + " " * (box_width - len(line) + 1) + "|") - # Show rebalance timer if out of range - out_of_range_seconds = executor.custom_info.get("out_of_range_seconds") + # Rebalance timer if out of range + out_of_range_seconds = custom.get("out_of_range_seconds") if out_of_range_seconds is not None: - # Check if beyond threshold beyond_threshold = self._is_beyond_rebalance_threshold(executor) if beyond_threshold: line = f"| Rebalance: {out_of_range_seconds}s / {self.config.rebalance_seconds}s" else: line = f"| Rebalance: waiting (below {float(self.config.rebalance_threshold_pct):.2f}% threshold)" status.append(line + " " * (box_width - len(line) + 1) + "|") + else: + line = "| Position: None" + status.append(line + " " * (box_width - len(line) + 1) + "|") - # Price limits visualization + # === PRICE LIMITS VISUALIZATION === has_limits = any([ self.config.sell_price_min, self.config.sell_price_max, self.config.buy_price_min, self.config.buy_price_max ]) if has_limits and self._pool_price: - # Get position bounds if available pos_lower = None pos_upper = None - if executor: + if executor and not executor.is_done: pos_lower = executor.custom_info.get("lower_price") pos_upper = executor.custom_info.get("upper_price") if pos_lower: @@ -723,77 +1060,96 @@ def to_format_status(self) -> List[str]: line = f"| {viz_line}" status.append(line + " " * (box_width - len(line) + 1) + "|") - # Balance comparison table (formatted like main balance table) + # === BALANCES === status.append("|" + " " * box_width + "|") try: - current_base = self.market_data_provider.get_balance( + wallet_base = self.market_data_provider.get_balance( self.config.connector_name, self._base_token ) - current_quote = self.market_data_provider.get_balance( + wallet_quote = self.market_data_provider.get_balance( self.config.connector_name, self._quote_token ) line = "| Balances:" status.append(line + " " * (box_width - len(line) + 1) + "|") - # Table header - header = f"| {'Asset':<12} {'Initial':>14} {'Current':>14} {'Change':>16}" + # Table header: Asset | Initial | Current (wallet) | Position | Change + header = f"| {'Asset':<8} {'Initial':>12} {'Current':>12} {'Position':>12} {'Change':>14}" status.append(header + " " * (box_width - len(header) + 1) + "|") # Base token row + # Change = (wallet + position) - initial if self._initial_base_balance is not None: - base_change = current_base - self._initial_base_balance + total_base = wallet_base + pos_base_amount + base_change = total_base - self._initial_base_balance init_b = float(self._initial_base_balance) - curr_b = float(current_base) + wall_b = float(wallet_base) + pos_b = float(pos_base_amount) chg_b = float(base_change) - line = f"| {self._base_token:<12} {init_b:>14.6f} {curr_b:>14.6f} {chg_b:>+16.6f}" + line = f"| {self._base_token:<8} {init_b:>12.6f} {wall_b:>12.6f} {pos_b:>12.6f} {chg_b:>+14.6f}" else: - curr_b = float(current_base) - line = f"| {self._base_token:<12} {'N/A':>14} {curr_b:>14.6f} {'N/A':>16}" + wall_b = float(wallet_base) + pos_b = float(pos_base_amount) + line = f"| {self._base_token:<8} {'N/A':>12} {wall_b:>12.6f} {pos_b:>12.6f} {'N/A':>14}" status.append(line + " " * (box_width - len(line) + 1) + "|") # Quote token row if self._initial_quote_balance is not None: - quote_change = current_quote - self._initial_quote_balance + total_quote = wallet_quote + pos_quote_amount + quote_change = total_quote - self._initial_quote_balance init_q = float(self._initial_quote_balance) - curr_q = float(current_quote) + wall_q = float(wallet_quote) + pos_q = float(pos_quote_amount) chg_q = float(quote_change) - line = f"| {self._quote_token:<12} {init_q:>14.6f} {curr_q:>14.6f} {chg_q:>+16.6f}" + line = f"| {self._quote_token:<8} {init_q:>12.6f} {wall_q:>12.6f} {pos_q:>12.6f} {chg_q:>+14.6f}" else: - curr_q = float(current_quote) - line = f"| {self._quote_token:<12} {'N/A':>14} {curr_q:>14.6f} {'N/A':>16}" + wall_q = float(wallet_quote) + pos_q = float(pos_quote_amount) + line = f"| {self._quote_token:<8} {'N/A':>12} {wall_q:>12.6f} {pos_q:>12.6f} {'N/A':>14}" status.append(line + " " * (box_width - len(line) + 1) + "|") except Exception as e: line = f"| Balances: Error fetching ({e})" status.append(line + " " * (box_width - len(line) + 1) + "|") - # Closed positions summary + # === CLOSED POSITIONS SUMMARY === status.append("|" + " " * box_width + "|") closed = [e for e in self.executors_info if e.is_done] - # Count closed by side (config.side: 0=both, 1=buy, 2=sell) - both_count = len([e for e in closed if getattr(e.config, "side", None) == 0]) - buy_count = len([e for e in closed if getattr(e.config, "side", None) == 1]) - sell_count = len([e for e in closed if getattr(e.config, "side", None) == 2]) + # Separate LP positions from swaps + closed_lp = [e for e in closed if getattr(e.config, "type", None) == "lp_executor"] + closed_swaps = [e for e in closed if getattr(e.config, "type", None) == "order_executor"] - # Calculate fees from closed positions + # Count LP positions by side + both_count = len([e for e in closed_lp if getattr(e.config, "side", None) == 0]) + buy_count = len([e for e in closed_lp if getattr(e.config, "side", None) == 1]) + sell_count = len([e for e in closed_lp if getattr(e.config, "side", None) == 2]) + + # Calculate fees from closed LP positions total_fees_base = Decimal("0") total_fees_quote = Decimal("0") - for e in closed: + for e in closed_lp: total_fees_base += Decimal(str(e.custom_info.get("base_fee", 0))) total_fees_quote += Decimal(str(e.custom_info.get("quote_fee", 0))) pool_price = self._pool_price or Decimal("0") total_fees_value = total_fees_base * pool_price + total_fees_quote - line = f"| Closed: {len(closed)} (both:{both_count} buy:{buy_count} sell:{sell_count})" + line = f"| Closed Positions: {len(closed_lp)} (both:{both_count} buy:{buy_count} sell:{sell_count})" status.append(line + " " * (box_width - len(line) + 1) + "|") + + # Show swaps count if any + if closed_swaps: + swap_buy = len([e for e in closed_swaps if e.custom_info.get("side") == "BUY"]) + swap_sell = len([e for e in closed_swaps if e.custom_info.get("side") == "SELL"]) + line = f"| Swaps Executed: {len(closed_swaps)} (buy:{swap_buy} sell:{swap_sell})" + status.append(line + " " * (box_width - len(line) + 1) + "|") + fb = float(total_fees_base) fq = float(total_fees_quote) fv = float(total_fees_value) - line = f"| Fees Collected: {fb:.6f} {self._base_token} + {fq:.6f} {self._quote_token} = {fv:.6f}" + line = f"| Fees Collected: {fb:.6f} {self._base_token} + {fq:.6f} {self._quote_token} = {fv:.6f} {self._quote_token}" status.append(line + " " * (box_width - len(line) + 1) + "|") status.append("+" + "-" * box_width + "+") diff --git a/main.py b/main.py index 0f2d996b..eba36fca 100644 --- a/main.py +++ b/main.py @@ -297,6 +297,7 @@ async def lifespan(app: FastAPI): description="API for managing Hummingbot trading instances", version=VERSION, lifespan=lifespan, + redirect_slashes=False, ) # Add CORS middleware diff --git a/models/executors.py b/models/executors.py index dda1b455..eadc0015 100644 --- a/models/executors.py +++ b/models/executors.py @@ -211,7 +211,7 @@ class PositionsSummaryResponse(BaseModel): "twap_executor", "xemm_executor", "order_executor", - "lp_executor" + "lp_executor", ] @@ -246,21 +246,19 @@ class CreateExecutorRequest(BaseModel): }, { "summary": "LP Executor", - "description": "Create an LP position on a CLMM DEX (Meteora, Raydium)", + "description": "Create an LP position on a CLMM DEX", "value": { "account_name": "master_account", "executor_config": { "type": "lp_executor", - "connector_name": "meteora/clmm", - "trading_pair": "SOL-USDC", + "connector_name": "solana-mainnet-beta", + "lp_provider": "meteora/clmm", "pool_address": "HTvjzsfX3yU6BUodCjZ5vZkUrAxMDTrBs3CJaq43ashR", "lower_price": "80", "upper_price": "100", "base_amount": "0", "quote_amount": "10.0", "side": 1, - "auto_close_above_range_seconds": None, - "auto_close_below_range_seconds": 300, "extra_params": {"strategyType": 0}, "keep_position": False } diff --git a/routers/accounts.py b/routers/accounts.py index a5e4106a..ff126207 100644 --- a/routers/accounts.py +++ b/routers/accounts.py @@ -10,7 +10,7 @@ router = APIRouter(tags=["Accounts"], prefix="/accounts") -@router.get("/", response_model=List[str]) +@router.get("", response_model=List[str]) async def list_accounts(accounts_service: AccountsService = Depends(get_accounts_service)): """ Get a list of all account names in the system. diff --git a/routers/archived_bots.py b/routers/archived_bots.py index 38b42d3b..454145ee 100644 --- a/routers/archived_bots.py +++ b/routers/archived_bots.py @@ -1,4 +1,5 @@ from typing import List, Optional + from fastapi import APIRouter, HTTPException, Query from utils.file_system import fs_util @@ -7,7 +8,7 @@ router = APIRouter(tags=["Archived Bots"], prefix="/archived-bots") -@router.get("/", response_model=List[str]) +@router.get("", response_model=List[str]) async def list_databases(): """ List all available database files in the system. diff --git a/routers/connectors.py b/routers/connectors.py index 85d7af86..8a1f906b 100644 --- a/routers/connectors.py +++ b/routers/connectors.py @@ -10,15 +10,17 @@ router = APIRouter(tags=["Connectors"], prefix="/connectors") -@router.get("/", response_model=List[str]) +@router.get("", response_model=List[str]) async def available_connectors(): """ Get a list of all available connectors. Returns: - List of connector names supported by the system + List of connector names supported by the system (excludes DEX providers which use Gateway networks) """ - return list(AllConnectorSettings.get_connector_settings().keys()) + all_connectors = AllConnectorSettings.get_connector_settings().keys() + # Filter out DEX providers (contain '/') - these are accessed via Gateway networks + return [c for c in all_connectors if '/' not in c] @router.get("/{connector_name}/config-map", response_model=Dict[str, dict]) diff --git a/routers/controllers.py b/routers/controllers.py index feec75f6..44d0cec5 100644 --- a/routers/controllers.py +++ b/routers/controllers.py @@ -11,7 +11,7 @@ router = APIRouter(tags=["Controllers"], prefix="/controllers") -@router.get("/", response_model=Dict[str, List[str]]) +@router.get("", response_model=Dict[str, List[str]]) async def list_controllers(): """ List all controllers organized by type. @@ -55,7 +55,7 @@ async def list_controllers(): # Controller Configuration endpoints (must come before controller type routes) -@router.get("/configs/", response_model=List[Dict]) +@router.get("/configs", response_model=List[Dict]) async def list_controller_configs(): """ List all controller configurations with metadata. diff --git a/routers/docker.py b/routers/docker.py index 7b0f8287..7a38e453 100644 --- a/routers/docker.py +++ b/routers/docker.py @@ -1,11 +1,11 @@ import os -from fastapi import APIRouter, HTTPException, Depends +from fastapi import APIRouter, Depends, HTTPException +from deps import get_bot_archiver, get_docker_service from models import DockerImage -from utils.bot_archiver import BotArchiver from services.docker_service import DockerService -from deps import get_docker_service, get_bot_archiver +from utils.bot_archiver import BotArchiver router = APIRouter(tags=["Docker"], prefix="/docker") @@ -24,7 +24,7 @@ async def is_docker_running(docker_service: DockerService = Depends(get_docker_s return docker_service.is_docker_running() -@router.get("/available-images/") +@router.get("/available-images") async def available_images(image_name: str = None, docker_service: DockerService = Depends(get_docker_service)): """ Get available Docker images matching the specified name. @@ -39,7 +39,7 @@ async def available_images(image_name: str = None, docker_service: DockerService available_images = docker_service.get_available_images() if image_name: return [tag for image in available_images["images"] for tag in image.tags if image_name in tag] - return [tag for tag in available_images["images"]] + return [tag for image in available_images["images"] for tag in image.tags] @router.get("/active-containers") @@ -161,7 +161,7 @@ async def start_container(container_name: str, docker_service: DockerService = D return docker_service.start_container(container_name) -@router.post("/pull-image/") +@router.post("/pull-image") async def pull_image(image: DockerImage, docker_service: DockerService = Depends(get_docker_service)): """ Initiate Docker image pull as background task. @@ -178,7 +178,7 @@ async def pull_image(image: DockerImage, docker_service: DockerService = Depends return result -@router.get("/pull-status/") +@router.get("/pull-status") async def get_pull_status(docker_service: DockerService = Depends(get_docker_service)): """ Get status of all pull operations. diff --git a/routers/gateway.py b/routers/gateway.py index 7bd7b428..6c599b87 100644 --- a/routers/gateway.py +++ b/routers/gateway.py @@ -606,9 +606,7 @@ async def add_network_token( return { "success": True, - "message": f"Token {token_request.symbol} added to {network_id}. Restart Gateway for changes to take effect.", - "restart_required": True, - "restart_endpoint": "POST /gateway/restart", + "message": f"Token {token_request.symbol} added to {network_id}.", "token": { "symbol": token_request.symbol, "address": token_request.address, @@ -661,9 +659,7 @@ async def delete_network_token( return { "success": True, - "message": f"Token {token_address} deleted from {network_id}. Restart Gateway for changes to take effect.", - "restart_required": True, - "restart_endpoint": "POST /gateway/restart", + "message": f"Token {token_address} deleted from {network_id}.", "token_address": token_address, "network_id": network_id } diff --git a/routers/scripts.py b/routers/scripts.py index c4c18409..bea8628e 100644 --- a/routers/scripts.py +++ b/routers/scripts.py @@ -1,7 +1,7 @@ import json -import yaml from typing import Dict, List +import yaml from fastapi import APIRouter, HTTPException from starlette import status @@ -11,7 +11,7 @@ router = APIRouter(tags=["Scripts"], prefix="/scripts") -@router.get("/", response_model=List[str]) +@router.get("", response_model=List[str]) async def list_scripts(): """ List all available scripts. @@ -23,7 +23,7 @@ async def list_scripts(): # Script Configuration endpoints (must come before script name routes) -@router.get("/configs/", response_model=List[Dict]) +@router.get("/configs", response_model=List[Dict]) async def list_script_configs(): """ List all script configurations with metadata. diff --git a/services/accounts_service.py b/services/accounts_service.py index 4726dc19..15125fee 100644 --- a/services/accounts_service.py +++ b/services/accounts_service.py @@ -2190,6 +2190,13 @@ async def _fetch_gateway_prices_immediate(self, chain: str, network: str, logger.warning(f"No pricing connector configured for chain '{chain}', skipping immediate price fetch") return prices + # Parse pricing connector into dex and trading_type (e.g., "jupiter/router" -> "jupiter", "router") + if "/" in pricing_connector: + dex_name, trading_type = pricing_connector.split("/", 1) + else: + dex_name = pricing_connector + trading_type = "router" + # Create tasks for all tokens in parallel tasks = [] task_tokens = [] @@ -2225,7 +2232,8 @@ async def _fetch_gateway_prices_immediate(self, chain: str, network: str, task = gateway_client.get_price( chain=chain, network=network, - connector=pricing_connector, + dex=dex_name, + trading_type=trading_type, base_asset=token, quote_asset=quote_asset, amount=Decimal("1"), diff --git a/services/executor_service.py b/services/executor_service.py index 13b24dc1..3691a858 100644 --- a/services/executor_service.py +++ b/services/executor_service.py @@ -348,13 +348,13 @@ async def create_executor( # Extract connector and trading pair from config connector_name = executor_config.get("connector_name") trading_pair = executor_config.get("trading_pair") - if not connector_name: - raise HTTPException(status_code=400, detail="connector_name is required in executor_config") - if not trading_pair: - raise HTTPException(status_code=400, detail="trading_pair is required in executor_config") # Ensure connector and market are ready - await trading_interface.add_market(connector_name, trading_pair) + if connector_name: + if trading_pair: + await trading_interface.add_market(connector_name, trading_pair) + else: + await trading_interface.ensure_connector(connector_name) # Set timestamp if not provided (required for time-based features like time_limit) if "timestamp" not in executor_config or executor_config["timestamp"] is None: diff --git a/services/gateway_client.py b/services/gateway_client.py index b62aec07..9a92a19b 100644 --- a/services/gateway_client.py +++ b/services/gateway_client.py @@ -1,5 +1,4 @@ import logging -from decimal import Decimal from typing import Dict, List, Optional import aiohttp @@ -582,7 +581,6 @@ async def poll_transaction( self, network_id: str, tx_hash: str, - wallet_address: Optional[str] = None ) -> Optional[Dict]: """ Poll transaction status on blockchain. @@ -590,12 +588,12 @@ async def poll_transaction( Args: network_id: Network ID in format 'chain-network' (e.g., 'solana-mainnet-beta', 'ethereum-mainnet') tx_hash: Transaction hash/signature - wallet_address: Optional wallet address for verification Returns: Transaction status dict with fields: - - txStatus: 1 for confirmed, 0 for failed/pending + - txStatus: 1 for confirmed, 0 for pending, -1 for failed - fee: Transaction fee amount + - error: Parsed error message if transaction failed (e.g., "SLIPPAGE_EXCEEDED (0x1771): ...") - txData: Full transaction data including meta.err Returns None if Gateway is unavailable or request fails. """ @@ -612,11 +610,8 @@ async def poll_transaction( "network": network, "signature": tx_hash } - if wallet_address: - payload["walletAddress"] = wallet_address return await self._request("POST", f"chains/{chain}/poll", json=payload) except Exception as e: logger.error(f"Error polling transaction {tx_hash}: {e}") return None - diff --git a/services/gateway_transaction_poller.py b/services/gateway_transaction_poller.py index 991fa44d..33d810f4 100644 --- a/services/gateway_transaction_poller.py +++ b/services/gateway_transaction_poller.py @@ -8,16 +8,16 @@ """ import asyncio import logging -from typing import Optional, Dict, List from datetime import datetime, timedelta, timezone from decimal import Decimal +from typing import Dict, List, Optional from sqlalchemy import select from sqlalchemy.orm import selectinload from database import AsyncDatabaseManager -from database.repositories import GatewaySwapRepository, GatewayCLMMRepository from database.models import GatewayCLMMEvent, GatewayCLMMPosition +from database.repositories import GatewayCLMMRepository, GatewaySwapRepository from services.gateway_client import GatewayClient logger = logging.getLogger(__name__) @@ -312,9 +312,6 @@ async def _check_transaction_status( # Parse the response with defensive checks tx_status = result.get("txStatus") - tx_data = result.get("txData") or {} - meta = tx_data.get("meta") if isinstance(tx_data, dict) else {} - error = meta.get("err") if isinstance(meta, dict) else None # Determine gas token based on chain gas_token = { @@ -326,8 +323,8 @@ async def _check_transaction_status( "avalanche": "AVAX" }.get(chain, "UNKNOWN") - # Transaction is confirmed if txStatus == 1 and no error - if tx_status == 1 and error is None: + # Transaction is confirmed if txStatus == 1 + if tx_status == 1: return { "status": "CONFIRMED", "gas_fee": result.get("fee", 0), @@ -335,9 +332,16 @@ async def _check_transaction_status( "error_message": None } - # Transaction failed if there's an error - if error is not None: - error_msg = str(error) if error else "Transaction failed on-chain" + # Transaction failed if txStatus == -1 or there's an error field + # Gateway now returns parsed error messages like "SLIPPAGE_EXCEEDED (0x1771): ..." + error_msg = result.get("error") + if tx_status == -1 or error_msg: + if not error_msg: + # Fallback to meta.err if no parsed error + tx_data = result.get("txData") or {} + meta = tx_data.get("meta") if isinstance(tx_data, dict) else {} + raw_error = meta.get("err") if isinstance(meta, dict) else None + error_msg = str(raw_error) if raw_error else "Transaction failed on-chain" return { "status": "FAILED", "gas_fee": result.get("fee", 0), @@ -352,14 +356,13 @@ async def _check_transaction_status( logger.error(f"Error checking transaction status for {tx_hash}: {e}") return None - async def poll_transaction_once(self, tx_hash: str, network_id: str, wallet_address: Optional[str] = None) -> Optional[Dict]: + async def poll_transaction_once(self, tx_hash: str, network_id: str) -> Optional[Dict]: """ Poll a specific transaction once (useful for immediate status checks). Args: tx_hash: Transaction hash network_id: Network ID in format 'chain-network' (e.g., 'solana-mainnet-beta') - wallet_address: Optional wallet address for verification Returns: Transaction status dict or None if pending diff --git a/services/orders_recorder.py b/services/orders_recorder.py index 90ca8119..04b41bce 100644 --- a/services/orders_recorder.py +++ b/services/orders_recorder.py @@ -2,20 +2,14 @@ import logging import math import time - -from typing import Any, Optional, Union from datetime import datetime from decimal import Decimal, InvalidOperation +from typing import Any, Optional, Union -from hummingbot.core.event.event_forwarder import SourceInfoEventForwarder -from hummingbot.core.event.events import ( - TradeType, - BuyOrderCreatedEvent, - SellOrderCreatedEvent, - OrderFilledEvent, - MarketEvent -) from hummingbot.connector.connector_base import ConnectorBase +from hummingbot.core.event.event_forwarder import SourceInfoEventForwarder +from hummingbot.core.event.events import BuyOrderCreatedEvent, MarketEvent, OrderFilledEvent, SellOrderCreatedEvent, TradeType + from database import AsyncDatabaseManager, OrderRepository, TradeRepository # Initialize logger @@ -215,10 +209,28 @@ async def _handle_order_filled(self, event: OrderFilledEvent): trade_fee_paid = float(fee_in_quote) trade_fee_currency = quote_asset except Exception as e: - logger.error(f"Error calculating trade fee: {e}") - trade_fee_paid = 0 - trade_fee_currency = None - + logger.warning(f"Primary fee calculation failed: {e}. Attempting fallback...") + try: + base_asset, quote_asset = event.trading_pair.split("-") + fallback_fee = await self._calculate_fee_fallback( + trade_fee=event.trade_fee, + base_asset=base_asset, + quote_asset=quote_asset, + fill_price=event.price, + order_amount=event.amount, + ) + if fallback_fee is not None: + trade_fee_paid = float(fallback_fee) + trade_fee_currency = quote_asset + logger.info(f"Fallback fee calculation succeeded: {trade_fee_paid} {trade_fee_currency}") + else: + logger.error(f"Fallback fee calculation returned None for {event.order_id}") + trade_fee_paid = 0 + trade_fee_currency = None + except Exception as fallback_err: + logger.error(f"Fallback fee calculation also failed: {fallback_err}") + trade_fee_paid = 0 + trade_fee_currency = None # Update order with fill information (handle potential NaN values like Hummingbot does) try: filled_amount = Decimal(str(event.amount)) @@ -303,7 +315,67 @@ def _get_order_details_from_connector(self, order_id: str) -> Optional[dict]: except Exception as e: logger.error(f"Error getting order details from connector: {e}") return None - + + async def _fetch_conversion_rate(self, from_token: str, to_token: str) -> Optional[Decimal]: + """Fetch the conversion rate between two tokens using the connector's REST API. + Tries direct pair first, then inverse pair.""" + if not self._connector: + return None + try: + direct_pair = f"{from_token}-{to_token}" + price = await asyncio.wait_for( + self._connector._get_last_traded_price(trading_pair=direct_pair), + timeout=5.0, + ) + if price and price > 0: + return Decimal(str(price)) + except Exception: + pass + try: + inverse_pair = f"{to_token}-{from_token}" + price = await asyncio.wait_for( + self._connector._get_last_traded_price(trading_pair=inverse_pair), + timeout=5.0, + ) + if price and price > 0: + return Decimal(1) / Decimal(str(price)) + except Exception: + pass + return None + + async def _calculate_fee_fallback( + self, + trade_fee, + base_asset: str, + quote_asset: str, + fill_price: Decimal, + order_amount: Decimal, + ) -> Optional[Decimal]: + """Manually compute the trade fee in quote asset when the primary method fails.""" + fee_amount = Decimal(0) + + # Handle percent component + if trade_fee.percent and trade_fee.percent != Decimal(0): + fee_amount += (fill_price * order_amount) * trade_fee.percent + + # Handle flat_fees component + for flat_fee in trade_fee.flat_fees: + if flat_fee.token == quote_asset: + fee_amount += flat_fee.amount + elif flat_fee.token == base_asset: + fee_amount += flat_fee.amount * fill_price + else: + rate = await self._fetch_conversion_rate(flat_fee.token, quote_asset) + if rate is not None: + fee_amount += flat_fee.amount * rate + else: + logger.error( + f"Could not fetch conversion rate for {flat_fee.token} -> {quote_asset}" + ) + return None + + return fee_amount + async def _handle_order_failed(self, event: Any): """Handle order failure events""" try: diff --git a/services/unified_connector_service.py b/services/unified_connector_service.py index 9d697f0a..830a29ed 100644 --- a/services/unified_connector_service.py +++ b/services/unified_connector_service.py @@ -22,7 +22,7 @@ from hummingbot.connector.connector_base import ConnectorBase from hummingbot.connector.connector_metrics_collector import TradeVolumeMetricCollector from hummingbot.connector.exchange_py_base import ExchangePyBase -from hummingbot.connector.gateway.gateway_lp import GatewayLp +from hummingbot.connector.gateway.gateway import Gateway from hummingbot.connector.perpetual_derivative_py_base import PerpetualDerivativePyBase from hummingbot.core.data_type.common import OrderType, PositionAction, PositionMode, TradeType from hummingbot.core.data_type.in_flight_order import InFlightOrder, OrderState @@ -635,21 +635,21 @@ def _create_trading_connector( ) -> ConnectorBase: """Create a trading connector with API keys. - For gateway connectors (containing '/'), creates a GatewayLp connector - which auto-detects chain/network and uses the default wallet. + For Gateway network connectors (e.g., 'solana-mainnet-beta'), creates a unified + Gateway connector which auto-detects chain/network and uses the default wallet. + The dex_name and trading_type are passed to methods, not to the connector. """ BackendAPISecurity.login_account( account_name=account_name, secrets_manager=self.secrets_manager ) - # Gateway connectors (e.g., 'meteora/clmm', 'raydium/clmm') are not in AllConnectorSettings - # They use GatewayLp which auto-detects chain/network from gateway config - if '/' in connector_name: - logger.info(f"Creating gateway connector: {connector_name}") - # GatewayLp handles chain/network auto-detection and default wallet lookup - # via start_network() call - return GatewayLp( + # Check if this is a Gateway network connector + # Gateway connectors are NOT in AllConnectorSettings (those are exchange connectors) + # Network format: "chain-network" (e.g., "solana-mainnet-beta", "ethereum-mainnet") + if connector_name not in self._conn_settings: + logger.info(f"Creating Gateway connector for network: {connector_name}") + return Gateway( connector_name=connector_name, trading_pairs=[], trading_required=True,