diff --git a/bots/controllers/directional_trading/ai_livestream.py b/bots/controllers/directional_trading/ai_livestream.py deleted file mode 100644 index 28a1157a..00000000 --- a/bots/controllers/directional_trading/ai_livestream.py +++ /dev/null @@ -1,84 +0,0 @@ -from decimal import Decimal -from typing import List - -import pandas_ta as ta # noqa: F401 -from pydantic import Field - -from hummingbot.core.data_type.common import TradeType -from hummingbot.remote_iface.mqtt import ExternalTopicFactory -from hummingbot.strategy_v2.controllers.directional_trading_controller_base import ( - DirectionalTradingControllerBase, - DirectionalTradingControllerConfigBase, -) -from hummingbot.strategy_v2.executors.position_executor.data_types import PositionExecutorConfig - - -class AILivestreamControllerConfig(DirectionalTradingControllerConfigBase): - controller_name: str = "ai_livestream" - long_threshold: float = Field(default=0.5, json_schema_extra={"is_updatable": True}) - short_threshold: float = Field(default=0.5, json_schema_extra={"is_updatable": True}) - topic: str = "hbot/predictions" - - -class AILivestreamController(DirectionalTradingControllerBase): - def __init__(self, config: AILivestreamControllerConfig, *args, **kwargs): - self.config = config - super().__init__(config, *args, **kwargs) - # Start ML signal listener - self._init_ml_signal_listener() - - def _init_ml_signal_listener(self): - """Initialize a listener for ML signals from the MQTT broker""" - try: - normalized_pair = self.config.trading_pair.replace("-", "_").lower() - topic = f"{self.config.topic}/{normalized_pair}/ML_SIGNALS" - self._ml_signal_listener = ExternalTopicFactory.create_async( - topic=topic, - callback=self._handle_ml_signal, - use_bot_prefix=False, - ) - self.logger().info("ML signal listener initialized successfully") - except Exception as e: - self.logger().error(f"Failed to initialize ML signal listener: {str(e)}") - self._ml_signal_listener = None - - def _handle_ml_signal(self, signal: dict, topic: str): - """Handle incoming ML signal""" - # self.logger().info(f"Received ML signal: {signal}") - short, neutral, long = signal["probabilities"] - if short > self.config.short_threshold: - self.processed_data["signal"] = -1 - elif long > self.config.long_threshold: - self.processed_data["signal"] = 1 - else: - self.processed_data["signal"] = 0 - self.processed_data["features"] = signal - - async def update_processed_data(self): - pass - - def get_executor_config(self, trade_type: TradeType, price: Decimal, amount: Decimal): - """ - Get the executor config based on the trade_type, price and amount. This method can be overridden by the - subclasses if required. - """ - return PositionExecutorConfig( - timestamp=self.market_data_provider.time(), - connector_name=self.config.connector_name, - trading_pair=self.config.trading_pair, - side=trade_type, - entry_price=price, - amount=amount, - triple_barrier_config=self.config.triple_barrier_config.new_instance_with_adjusted_volatility( - volatility_factor=self.processed_data["features"].get("target_pct", 0.01)), - leverage=self.config.leverage, - ) - - def to_format_status(self) -> List[str]: - lines = [] - features = self.processed_data.get("features", {}) - lines.append(f"Signal: {self.processed_data.get('signal', 'N/A')}") - lines.append(f"Timestamp: {features.get('timestamp', 'N/A')}") - lines.append(f"Probabilities: {features.get('probabilities', 'N/A')}") - lines.append(f"Target Pct: {features.get('target_pct', 'N/A')}") - return lines diff --git a/bots/controllers/directional_trading/dman_v3.py b/bots/controllers/directional_trading/dman_v3.py index 7562af50..b763acf2 100644 --- a/bots/controllers/directional_trading/dman_v3.py +++ b/bots/controllers/directional_trading/dman_v3.py @@ -48,6 +48,7 @@ class DManV3ControllerConfig(DirectionalTradingControllerConfigBase): ) dca_spreads: List[Decimal] = Field( default="0.001,0.018,0.15,0.25", + validate_default=True, json_schema_extra={ "prompt": "Enter the spreads for each DCA level (comma-separated) if dynamic_spread=True this value " "will multiply the Bollinger Bands width, e.g. if the Bollinger Bands width is 0.1 (10%)" @@ -56,6 +57,7 @@ class DManV3ControllerConfig(DirectionalTradingControllerConfigBase): ) dca_amounts_pct: List[Decimal] = Field( default=None, + validate_default=True, json_schema_extra={ "prompt": "Enter the amounts for each DCA level (as a percentage of the total balance, " "comma-separated). Don't worry about the final sum, it will be normalized. ", diff --git a/bots/controllers/generic/pmm.py b/bots/controllers/generic/pmm.py index 97e55135..0934b5d4 100644 --- a/bots/controllers/generic/pmm.py +++ b/bots/controllers/generic/pmm.py @@ -65,6 +65,7 @@ class PMMConfig(ControllerConfigBase): ) buy_spreads: List[float] = Field( default="0.01,0.02", + validate_default=True, json_schema_extra={ "prompt_on_new": True, "is_updatable": True, "prompt": "Enter a comma-separated list of buy spreads (e.g., '0.01, 0.02'):", @@ -72,6 +73,7 @@ class PMMConfig(ControllerConfigBase): ) sell_spreads: List[float] = Field( default="0.01,0.02", + validate_default=True, json_schema_extra={ "prompt_on_new": True, "is_updatable": True, "prompt": "Enter a comma-separated list of sell spreads (e.g., '0.01, 0.02'):", @@ -79,6 +81,7 @@ class PMMConfig(ControllerConfigBase): ) buy_amounts_pct: Union[List[Decimal], None] = Field( default=None, + validate_default=True, json_schema_extra={ "prompt_on_new": True, "is_updatable": True, "prompt": "Enter a comma-separated list of buy amounts as percentages (e.g., '50, 50'), or leave blank to distribute equally:", @@ -86,6 +89,7 @@ class PMMConfig(ControllerConfigBase): ) sell_amounts_pct: Union[List[Decimal], None] = Field( default=None, + validate_default=True, json_schema_extra={ "prompt_on_new": True, "is_updatable": True, "prompt": "Enter a comma-separated list of sell amounts as percentages (e.g., '50, 50'), or leave blank to distribute equally:", @@ -180,13 +184,17 @@ def parse_and_validate_amounts(cls, v, validation_info: ValidationInfo): field_name = validation_info.field_name if v is None or v == "": spread_field = field_name.replace('amounts_pct', 'spreads') - return [1 for _ in validation_info.data[spread_field]] + return [Decimal("1") for _ in validation_info.data[spread_field]] if isinstance(v, str): - return [float(x.strip()) for x in v.split(',')] - elif isinstance(v, list) and len(v) != len(validation_info.data[field_name.replace('amounts_pct', 'spreads')]): + parsed = [Decimal(x.strip()) for x in v.split(',')] + elif isinstance(v, list): + parsed = [Decimal(str(x)) for x in v] + else: + parsed = [Decimal(str(v))] + if len(parsed) != len(validation_info.data[field_name.replace('amounts_pct', 'spreads')]): raise ValueError( f"The number of {field_name} must match the number of {field_name.replace('amounts_pct', 'spreads')}.") - return v + return parsed @field_validator('position_mode', mode="before") @classmethod diff --git a/bots/controllers/generic/pmm_adjusted.py b/bots/controllers/generic/pmm_adjusted.py index e9bc2667..9ea69604 100644 --- a/bots/controllers/generic/pmm_adjusted.py +++ b/bots/controllers/generic/pmm_adjusted.py @@ -69,6 +69,7 @@ class PMMAdjustedConfig(ControllerConfigBase): ) buy_spreads: List[float] = Field( default="0.01,0.02", + validate_default=True, json_schema_extra={ "prompt_on_new": True, "is_updatable": True, "prompt": "Enter a comma-separated list of buy spreads (e.g., '0.01, 0.02'):", @@ -76,6 +77,7 @@ class PMMAdjustedConfig(ControllerConfigBase): ) sell_spreads: List[float] = Field( default="0.01,0.02", + validate_default=True, json_schema_extra={ "prompt_on_new": True, "is_updatable": True, "prompt": "Enter a comma-separated list of sell spreads (e.g., '0.01, 0.02'):", @@ -83,6 +85,7 @@ class PMMAdjustedConfig(ControllerConfigBase): ) buy_amounts_pct: Union[List[Decimal], None] = Field( default=None, + validate_default=True, json_schema_extra={ "prompt_on_new": True, "is_updatable": True, "prompt": "Enter a comma-separated list of buy amounts as percentages (e.g., '50, 50'), or leave blank to distribute equally:", @@ -90,6 +93,7 @@ class PMMAdjustedConfig(ControllerConfigBase): ) sell_amounts_pct: Union[List[Decimal], None] = Field( default=None, + validate_default=True, json_schema_extra={ "prompt_on_new": True, "is_updatable": True, "prompt": "Enter a comma-separated list of sell amounts as percentages (e.g., '50, 50'), or leave blank to distribute equally:", @@ -184,13 +188,17 @@ def parse_and_validate_amounts(cls, v, validation_info: ValidationInfo): field_name = validation_info.field_name if v is None or v == "": spread_field = field_name.replace('amounts_pct', 'spreads') - return [1 for _ in validation_info.data[spread_field]] + return [Decimal("1") for _ in validation_info.data[spread_field]] if isinstance(v, str): - return [float(x.strip()) for x in v.split(',')] - elif isinstance(v, list) and len(v) != len(validation_info.data[field_name.replace('amounts_pct', 'spreads')]): + parsed = [Decimal(x.strip()) for x in v.split(',')] + elif isinstance(v, list): + parsed = [Decimal(str(x)) for x in v] + else: + parsed = [Decimal(str(v))] + if len(parsed) != len(validation_info.data[field_name.replace('amounts_pct', 'spreads')]): raise ValueError( f"The number of {field_name} must match the number of {field_name.replace('amounts_pct', 'spreads')}.") - return v + return parsed @field_validator('position_mode', mode="before") @classmethod diff --git a/bots/controllers/generic/pmm_mister.py b/bots/controllers/generic/pmm_mister.py index 353beee0..a4d1c780 100644 --- a/bots/controllers/generic/pmm_mister.py +++ b/bots/controllers/generic/pmm_mister.py @@ -25,10 +25,10 @@ class PMMisterConfig(ControllerConfigBase): target_base_pct: Decimal = Field(default=Decimal("0.5"), json_schema_extra={"is_updatable": True}) min_base_pct: Decimal = Field(default=Decimal("0.3"), json_schema_extra={"is_updatable": True}) max_base_pct: Decimal = Field(default=Decimal("0.7"), json_schema_extra={"is_updatable": True}) - buy_spreads: List[float] = Field(default="0.0005", json_schema_extra={"is_updatable": True}) - sell_spreads: List[float] = Field(default="0.0005", json_schema_extra={"is_updatable": True}) - buy_amounts_pct: Union[List[Decimal], None] = Field(default="1", json_schema_extra={"is_updatable": True}) - sell_amounts_pct: Union[List[Decimal], None] = Field(default="1", json_schema_extra={"is_updatable": True}) + buy_spreads: List[float] = Field(default="0.0005", validate_default=True, json_schema_extra={"is_updatable": True}) + sell_spreads: List[float] = Field(default="0.0005", validate_default=True, json_schema_extra={"is_updatable": True}) + buy_amounts_pct: Union[List[Decimal], None] = Field(default="1", validate_default=True, json_schema_extra={"is_updatable": True}) + sell_amounts_pct: Union[List[Decimal], None] = Field(default="1", validate_default=True, json_schema_extra={"is_updatable": True}) executor_refresh_time: int = Field(default=30, json_schema_extra={"is_updatable": True}) # Enhanced timing parameters @@ -89,9 +89,10 @@ def parse_and_validate_amounts(cls, v, validation_info: ValidationInfo): field_name = validation_info.field_name if v is None or v == "": spread_field = field_name.replace('amounts_pct', 'spreads') - return [1 for _ in validation_info.data[spread_field]] + return [Decimal("1") for _ in validation_info.data[spread_field]] parsed = parse_comma_separated_list(v) - if isinstance(parsed, list) and len(parsed) != len( + parsed = [Decimal(str(x)) for x in parsed] + if len(parsed) != len( validation_info.data[field_name.replace('amounts_pct', 'spreads')]): raise ValueError( f"The number of {field_name} must match the number of {field_name.replace('amounts_pct', 'spreads')}.") diff --git a/bots/controllers/market_making/dman_maker_v2.py b/bots/controllers/market_making/dman_maker_v2.py index 3ead968c..2aeeb8ad 100644 --- a/bots/controllers/market_making/dman_maker_v2.py +++ b/bots/controllers/market_making/dman_maker_v2.py @@ -22,9 +22,11 @@ class DManMakerV2Config(MarketMakingControllerConfigBase): # DCA configuration dca_spreads: List[Decimal] = Field( default="0.01,0.02,0.04,0.08", + validate_default=True, json_schema_extra={"prompt": "Enter a comma-separated list of spreads for each DCA level: ", "prompt_on_new": True}) dca_amounts: List[Decimal] = Field( default="0.1,0.2,0.4,0.8", + validate_default=True, json_schema_extra={"prompt": "Enter a comma-separated list of amounts for each DCA level: ", "prompt_on_new": True}) top_executor_refresh_time: Optional[float] = Field(default=None, json_schema_extra={"is_updatable": True}) executor_activation_bounds: Optional[List[Decimal]] = Field(default=None, json_schema_extra={"is_updatable": True}) @@ -55,13 +57,17 @@ def parse_dca_spreads(cls, v): @classmethod def parse_and_validate_dca_amounts(cls, v, validation_info): if v is None or v == "": - return [1 for _ in validation_info.data['dca_spreads']] + return [Decimal("1") for _ in validation_info.data['dca_spreads']] if isinstance(v, str): - return [float(x.strip()) for x in v.split(',')] - elif isinstance(v, list) and len(v) != len(validation_info.data['dca_spreads']): + parsed = [Decimal(x.strip()) for x in v.split(',')] + elif isinstance(v, list): + parsed = [Decimal(str(x)) for x in v] + else: + parsed = [Decimal(str(v))] + if len(parsed) != len(validation_info.data['dca_spreads']): raise ValueError( f"The number of dca amounts must match the number of {validation_info.data['dca_spreads']}.") - return v + return parsed class DManMakerV2(MarketMakingControllerBase): diff --git a/database/connection.py b/database/connection.py index f3c61ee1..a9ae68f7 100644 --- a/database/connection.py +++ b/database/connection.py @@ -69,6 +69,11 @@ async def _run_migrations(self, conn): "executors", "error_log", "ALTER TABLE executors ADD COLUMN error_log TEXT" ), + # Add cum_fees_quote to position_holds table for tracking fees + ( + "position_holds", "cum_fees_quote", + "ALTER TABLE position_holds ADD COLUMN cum_fees_quote NUMERIC(30,18) NOT NULL DEFAULT 0" + ), ] for table, column, sql in migrations: try: diff --git a/database/models.py b/database/models.py index 8ec348e9..61b5d243 100644 --- a/database/models.py +++ b/database/models.py @@ -414,6 +414,7 @@ class PositionHoldRecord(Base): sell_amount_base = Column(Numeric(precision=30, scale=18), nullable=False, default=0) sell_amount_quote = Column(Numeric(precision=30, scale=18), nullable=False, default=0) realized_pnl_quote = Column(Numeric(precision=30, scale=18), nullable=False, default=0) + cum_fees_quote = Column(Numeric(precision=30, scale=18), nullable=False, default=0) # Tracking executor_ids = Column(Text, nullable=True) # JSON array of executor IDs diff --git a/database/repositories/executor_repository.py b/database/repositories/executor_repository.py index 274e3024..c6c6c0b4 100644 --- a/database/repositories/executor_repository.py +++ b/database/repositories/executor_repository.py @@ -195,10 +195,13 @@ async def upsert_position_hold( sell_amount_base: Decimal, sell_amount_quote: Decimal, realized_pnl_quote: Decimal, - executor_ids: List[str] + cum_fees_quote: Decimal = Decimal("0"), + executor_ids: List[str] = None ) -> PositionHoldRecord: """Create or update a position hold record.""" import json as _json + if executor_ids is None: + executor_ids = [] stmt = select(PositionHoldRecord).where(and_( PositionHoldRecord.account_name == account_name, @@ -216,6 +219,7 @@ async def upsert_position_hold( record.sell_amount_base = sell_amount_base record.sell_amount_quote = sell_amount_quote record.realized_pnl_quote = realized_pnl_quote + record.cum_fees_quote = cum_fees_quote record.executor_ids = _json.dumps(executor_ids) else: record = PositionHoldRecord( @@ -228,6 +232,7 @@ async def upsert_position_hold( sell_amount_base=sell_amount_base, sell_amount_quote=sell_amount_quote, realized_pnl_quote=realized_pnl_quote, + cum_fees_quote=cum_fees_quote, executor_ids=_json.dumps(executor_ids), status="ACTIVE", ) diff --git a/models/executors.py b/models/executors.py index 1c1f1941..ece6c201 100644 --- a/models/executors.py +++ b/models/executors.py @@ -44,6 +44,9 @@ class PositionHold(BaseModel): # Realized PnL from matched positions realized_pnl_quote: Decimal = Field(default=Decimal("0"), description="Realized PnL from matched buy/sell pairs") + # Cumulative fees + cum_fees_quote: Decimal = Field(default=Decimal("0"), description="Cumulative fees paid in quote currency") + # Tracking executor_ids: List[str] = Field(default_factory=list, description="IDs of executors contributing to this position") last_updated: Optional[datetime] = Field(default=None, description="Last update timestamp") @@ -97,7 +100,8 @@ def add_fill( side: str, amount_base: Decimal, amount_quote: Decimal, - executor_id: Optional[str] = None + executor_id: Optional[str] = None, + fees_quote: Decimal = Decimal("0") ): """ Add a fill to the position tracking. @@ -107,6 +111,7 @@ def add_fill( amount_base: Amount in base currency amount_quote: Amount in quote currency executor_id: Optional executor ID to track + fees_quote: Fees paid for this fill in quote currency """ if side.upper() == "BUY": self.buy_amount_base += amount_base @@ -115,6 +120,8 @@ def add_fill( self.sell_amount_base += amount_base self.sell_amount_quote += amount_quote + self.cum_fees_quote += fees_quote + # Calculate realized PnL when we have matched volume self._calculate_realized_pnl() @@ -124,24 +131,36 @@ def add_fill( self.last_updated = datetime.utcnow() def _calculate_realized_pnl(self): - """Calculate realized PnL from matched buy/sell pairs using FIFO.""" - matched = self.matched_amount_base + """Calculate realized PnL from matched buy/sell pairs and settle matched volume. + + After settling, only the unmatched (open) position remains, so breakeven + prices always reflect the current position, not historical closed trades. + """ + matched = min(self.buy_amount_base, self.sell_amount_base) if matched > 0 and self.buy_amount_base > 0 and self.sell_amount_base > 0: - # Average prices + # Average prices before settlement avg_buy = self.buy_amount_quote / self.buy_amount_base avg_sell = self.sell_amount_quote / self.sell_amount_base # Realized PnL = matched_amount * (avg_sell - avg_buy) - self.realized_pnl_quote = matched * (avg_sell - avg_buy) + self.realized_pnl_quote += matched * (avg_sell - avg_buy) + + # Settle matched volume: remove it from both sides + self.buy_amount_base -= matched + self.buy_amount_quote -= matched * avg_buy + self.sell_amount_base -= matched + self.sell_amount_quote -= matched * avg_sell def get_unrealized_pnl(self, current_price: Decimal) -> Decimal: """ - Calculate unrealized PnL for unmatched position. + Calculate unrealized PnL for unmatched position (raw price movement). + + Fees are tracked separately in cum_fees_quote. Args: current_price: Current market price Returns: - Unrealized PnL in quote currency + Unrealized PnL in quote currency (before fees) """ if self.net_amount_base > 0: # Long position: profit if price goes up @@ -159,6 +178,7 @@ def merge(self, other: "PositionHold"): self.buy_amount_quote += other.buy_amount_quote self.sell_amount_base += other.sell_amount_base self.sell_amount_quote += other.sell_amount_quote + self.cum_fees_quote += other.cum_fees_quote for eid in other.executor_ids: if eid not in self.executor_ids: @@ -185,6 +205,7 @@ class PositionHoldResponse(BaseModel): unmatched_amount_base: float position_side: Optional[str] realized_pnl_quote: float + cum_fees_quote: float = 0.0 unrealized_pnl_quote: Optional[float] = None executor_count: int executor_ids: List[str] diff --git a/routers/executors.py b/routers/executors.py index dc17780b..1110fd43 100644 --- a/routers/executors.py +++ b/routers/executors.py @@ -409,6 +409,7 @@ async def get_positions_summary( unmatched_amount_base=float(p.unmatched_amount_base), position_side=p.position_side, realized_pnl_quote=float(p.realized_pnl_quote), + cum_fees_quote=float(p.cum_fees_quote), unrealized_pnl_quote=unrealized_pnl, executor_count=len(p.executor_ids), executor_ids=p.executor_ids, @@ -484,6 +485,7 @@ async def get_position_held( unmatched_amount_base=float(position.unmatched_amount_base), position_side=position.position_side, realized_pnl_quote=float(position.realized_pnl_quote), + cum_fees_quote=float(position.cum_fees_quote), unrealized_pnl_quote=unrealized_pnl, executor_count=len(position.executor_ids), executor_ids=position.executor_ids, diff --git a/services/backtesting_service.py b/services/backtesting_service.py index 24e7b31f..897a0515 100644 --- a/services/backtesting_service.py +++ b/services/backtesting_service.py @@ -108,7 +108,7 @@ async def _run_task(self, task: BacktestTask): except Exception as e: task.status = BacktestTaskStatus.FAILED task.error = str(e) - logger.error(f"Backtesting task {task.task_id} failed: {e}") + logger.error(f"Backtesting task {task.task_id} failed: {e}", exc_info=True) finally: task.completed_at = datetime.now(timezone.utc) diff --git a/services/executor_service.py b/services/executor_service.py index 1becb5ce..ae3f231e 100644 --- a/services/executor_service.py +++ b/services/executor_service.py @@ -166,7 +166,7 @@ async def recover_positions_from_db(self): except (json.JSONDecodeError, TypeError): pass - self._positions_held[position_key] = PositionHold( + position = PositionHold( trading_pair=record.trading_pair, connector_name=record.connector_name, account_name=record.account_name, @@ -176,9 +176,13 @@ async def recover_positions_from_db(self): sell_amount_base=Decimal(str(record.sell_amount_base or 0)), sell_amount_quote=Decimal(str(record.sell_amount_quote or 0)), realized_pnl_quote=Decimal(str(record.realized_pnl_quote or 0)), + cum_fees_quote=Decimal(str(record.cum_fees_quote or 0)), executor_ids=executor_ids, last_updated=record.last_updated, ) + # Settle any matched volume from legacy unsettled data + position._calculate_realized_pnl() + self._positions_held[position_key] = position if self._positions_held: logger.info(f"Recovered {len(self._positions_held)} position holds from database") @@ -786,16 +790,48 @@ async def get_performance_report( positions = self.get_positions_held(controller_id=controller_id) report["active_positions"] = len(positions) + # Accumulate fees from position holds (already paid, reduce PnL) + position_hold_fees = sum(float(p.cum_fees_quote) for p in positions) + if market_data_service: + # First pass: try oracle for each position, collect misses grouped by connector + missing_by_connector: Dict[str, List[tuple]] = {} # connector_key -> [(position, trading_pair)] for p in positions: parts = p.trading_pair.split("-") - if len(parts) == 2: - base, quote = parts - rate = market_data_service.get_rate(base, quote) - if rate is not None: - unrealized_pnl += float(p.get_unrealized_pnl(rate)) + if len(parts) != 2: + continue + base, quote = parts + rate = market_data_service.get_rate(base, quote) + if rate is not None: + unrealized_pnl += float(p.get_unrealized_pnl(rate)) + else: + # Group by connector+account for batch fallback + connector_key = f"{p.connector_name}|{p.account_name}" + missing_by_connector.setdefault(connector_key, []).append((p, p.trading_pair)) + + # Second pass: batch-fetch missing prices from the actual connectors + for connector_key, items in missing_by_connector.items(): + connector_name, account_name = connector_key.split("|", 1) + trading_pairs = [tp for _, tp in items] + try: + prices = await market_data_service.get_prices( + connector_name=connector_name, + trading_pairs=trading_pairs, + account_name=account_name, + ) + if isinstance(prices, dict) and "error" not in prices: + for pos, tp in items: + price = prices.get(tp) + if price is not None and price > 0: + unrealized_pnl += float(pos.get_unrealized_pnl(Decimal(str(price)))) + except Exception as e: + logger.warning(f"Fallback price fetch failed for {connector_name}: {e}") + + # Subtract position hold fees from unrealized PnL + unrealized_pnl -= position_hold_fees report["unrealized_pnl_quote"] = round(unrealized_pnl, 8) + report["position_hold_fees_quote"] = round(position_hold_fees, 8) report["global_pnl_quote"] = round(report["pnl_total_quote"] + unrealized_pnl, 8) return report @@ -998,17 +1034,26 @@ async def _aggregate_position_hold( # Check for held_position_orders (used by grid_executor, position_executor, etc.) held_orders = custom_info.get("held_position_orders", []) if custom_info else [] + # Extract cumulative fees from the executor + executor_fees = Decimal("0") + try: + executor_fees = Decimal(str(executor.cum_fees_quote or 0)) + except Exception: + pass + if held_orders: buy_filled_base = Decimal("0") buy_filled_quote = Decimal("0") sell_filled_base = Decimal("0") sell_filled_quote = Decimal("0") + orders_fees = Decimal("0") for order in held_orders: if isinstance(order, dict): trade_type = order.get("trade_type", "BUY") exec_base = Decimal(str(order.get("executed_amount_base", 0))) exec_quote = Decimal(str(order.get("executed_amount_quote", 0))) + orders_fees += Decimal(str(order.get("cumulative_fee_paid_quote", 0))) if trade_type == "BUY": buy_filled_base += exec_base @@ -1017,20 +1062,28 @@ async def _aggregate_position_hold( sell_filled_base += exec_base sell_filled_quote += exec_quote + # Use order-level fees if available, otherwise fall back to executor-level + fees = orders_fees if orders_fees > 0 else executor_fees + # Add buy and sell fills separately if buy_filled_base > 0: - position.add_fill("BUY", buy_filled_base, buy_filled_quote, executor_id) + # Split fees proportionally between buy and sell by quote volume + total_quote = buy_filled_quote + sell_filled_quote + buy_fee_share = fees * (buy_filled_quote / total_quote) if total_quote > 0 else fees + position.add_fill("BUY", buy_filled_base, buy_filled_quote, executor_id, fees_quote=buy_fee_share) if sell_filled_base > 0: - position.add_fill("SELL", sell_filled_base, sell_filled_quote, executor_id) + total_quote = buy_filled_quote + sell_filled_quote + sell_fee_share = fees * (sell_filled_quote / total_quote) if total_quote > 0 else fees + position.add_fill("SELL", sell_filled_base, sell_filled_quote, executor_id, fees_quote=sell_fee_share) logger.info( f"Aggregated executor {executor_id} to position {position_key}: " - f"buy={buy_filled_base} base, sell={sell_filled_base} base" + f"buy={buy_filled_base} base, sell={sell_filled_base} base, fees={fees} quote" ) elif filled_amount_base > 0: # For non-grid executors with a single side - position.add_fill(side, filled_amount_base, filled_amount_quote, executor_id) + position.add_fill(side, filled_amount_base, filled_amount_quote, executor_id, fees_quote=executor_fees) logger.info( f"Aggregated executor {executor_id} to position {position_key}: " f"{side} {filled_amount_base} base @ {filled_amount_quote} quote" @@ -1062,6 +1115,7 @@ async def _persist_position_hold(self, position: PositionHold): sell_amount_base=position.sell_amount_base, sell_amount_quote=position.sell_amount_quote, realized_pnl_quote=position.realized_pnl_quote, + cum_fees_quote=position.cum_fees_quote, executor_ids=position.executor_ids, ) except Exception as e: @@ -1197,6 +1251,7 @@ def get_positions_summary(self) -> Dict[str, Any]: "unmatched_amount_base": float(p.unmatched_amount_base), "position_side": p.position_side, "realized_pnl_quote": float(p.realized_pnl_quote), + "cum_fees_quote": float(p.cum_fees_quote), "executor_count": len(p.executor_ids), "executor_ids": p.executor_ids, "last_updated": p.last_updated.isoformat() if p.last_updated else None