From 5048918a47ee868fccc512639931466bf4db4cd4 Mon Sep 17 00:00:00 2001 From: dman Date: Mon, 16 Mar 2026 12:51:01 -0300 Subject: [PATCH 1/3] (feat) add controller id to executors --- database/connection.py | 31 +++++- database/models.py | 1 + database/repositories/executor_repository.py | 14 ++- models/executors.py | 7 ++ routers/executors.py | 27 ++++- services/executor_service.py | 52 +++++++--- services/orders_recorder.py | 102 ++++++++++++++++--- 7 files changed, 195 insertions(+), 39 deletions(-) diff --git a/database/connection.py b/database/connection.py index 6cca9bb8..6e3118c3 100644 --- a/database/connection.py +++ b/database/connection.py @@ -3,7 +3,7 @@ from typing import AsyncGenerator from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from .models import Base @@ -44,15 +44,40 @@ async def create_tables(self): try: async with self.engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) - + + # Run lightweight migrations for existing tables + await self._run_migrations(conn) + # Drop Hummingbot's native tables since we use our custom orders/trades tables await self._drop_hummingbot_tables(conn) - + logger.info("Database tables created successfully") except Exception as e: logger.error(f"Failed to create database tables: {e}") raise + async def _run_migrations(self, conn): + """Run lightweight schema migrations for existing tables.""" + migrations = [ + # Add controller_id to executors table (default "main" for existing rows) + ( + "executors", "controller_id", + "ALTER TABLE executors ADD COLUMN controller_id TEXT NOT NULL DEFAULT 'main'" + ), + ] + for table, column, sql in migrations: + try: + # Check if column already exists + result = await conn.execute(text( + "SELECT column_name FROM information_schema.columns " + f"WHERE table_name = '{table}' AND column_name = '{column}'" + )) + if result.fetchone() is None: + await conn.execute(text(sql)) + logger.info(f"Migration: added {column} to {table}") + except Exception as e: + logger.debug(f"Migration check for {table}.{column}: {e}") + async def _drop_hummingbot_tables(self, conn): """Drop Hummingbot's native database tables since we use custom ones.""" hummingbot_tables = [ diff --git a/database/models.py b/database/models.py index 830e40d9..524a6a0b 100644 --- a/database/models.py +++ b/database/models.py @@ -358,6 +358,7 @@ class ExecutorRecord(Base): account_name = Column(String, nullable=False, index=True) connector_name = Column(String, nullable=False, index=True) trading_pair = Column(String, nullable=False, index=True) + controller_id = Column(String, nullable=False, default="main", index=True) # Timestamps created_at = Column(TIMESTAMP(timezone=True), server_default=func.now(), nullable=False, index=True) diff --git a/database/repositories/executor_repository.py b/database/repositories/executor_repository.py index 0041c561..10c07461 100644 --- a/database/repositories/executor_repository.py +++ b/database/repositories/executor_repository.py @@ -29,7 +29,8 @@ async def create_executor( connector_name: str, trading_pair: str, config: Optional[str] = None, - status: str = "RUNNING" + status: str = "RUNNING", + controller_id: str = "main" ) -> ExecutorRecord: """Create a new executor record.""" executor = ExecutorRecord( @@ -38,6 +39,7 @@ async def create_executor( account_name=account_name, connector_name=connector_name, trading_pair=trading_pair, + controller_id=controller_id, config=config, status=status ) @@ -98,6 +100,7 @@ async def get_executors( trading_pair: Optional[str] = None, executor_type: Optional[str] = None, status: Optional[str] = None, + controller_id: Optional[str] = None, limit: int = 100, offset: int = 0 ) -> List[ExecutorRecord]: @@ -115,6 +118,8 @@ async def get_executors( conditions.append(ExecutorRecord.executor_type == executor_type) if status: conditions.append(ExecutorRecord.status == status) + if controller_id: + conditions.append(ExecutorRecord.controller_id == controller_id) if conditions: stmt = stmt.where(and_(*conditions)) @@ -146,7 +151,8 @@ async def get_position_hold_executors( self, account_name: Optional[str] = None, connector_name: Optional[str] = None, - trading_pair: Optional[str] = None + trading_pair: Optional[str] = None, + controller_id: Optional[str] = None ) -> List[ExecutorRecord]: """Get executors that closed with POSITION_HOLD (keep_position=True).""" stmt = select(ExecutorRecord).where(ExecutorRecord.close_type == "POSITION_HOLD") @@ -158,6 +164,8 @@ async def get_position_hold_executors( conditions.append(ExecutorRecord.connector_name == connector_name) if trading_pair: conditions.append(ExecutorRecord.trading_pair == trading_pair) + if controller_id: + conditions.append(ExecutorRecord.controller_id == controller_id) if conditions: stmt = stmt.where(and_(*conditions)) @@ -323,7 +331,7 @@ async def cleanup_orphaned_executors( Number of executors cleaned up """ from sqlalchemy import update - + # Find executors that are RUNNING but not in the active list conditions = [ExecutorRecord.status == "RUNNING"] diff --git a/models/executors.py b/models/executors.py index dda1b455..c6444af8 100644 --- a/models/executors.py +++ b/models/executors.py @@ -31,6 +31,7 @@ class PositionHold(BaseModel): trading_pair: str = Field(description="Trading pair (e.g., 'BTC-USDT')") connector_name: str = Field(description="Connector name") account_name: str = Field(description="Account name") + controller_id: str = Field(default="main", description="Controller that owns this position") # Buy side tracking buy_amount_base: Decimal = Field(default=Decimal("0"), description="Total bought amount in base currency") @@ -172,6 +173,7 @@ class PositionHoldResponse(BaseModel): trading_pair: str connector_name: str account_name: str + controller_id: str = Field(default="main", description="Controller that owns this position") buy_amount_base: float buy_amount_quote: float sell_amount_base: float @@ -310,6 +312,10 @@ class ExecutorFilterRequest(PaginationParams): None, description="Filter by status (RUNNING, TERMINATED, etc.)" ) + controller_ids: Optional[List[str]] = Field( + None, + description="Filter by controller IDs" + ) # ======================================== @@ -383,6 +389,7 @@ class CreateExecutorResponse(BaseModel): executor_type: str = Field(description="Type of executor created") connector_name: str = Field(description="Connector name") trading_pair: str = Field(description="Trading pair") + controller_id: str = Field(default="main", description="Controller that owns this executor") status: str = Field(description="Initial status") created_at: str = Field(description="Creation timestamp (ISO format)") diff --git a/routers/executors.py b/routers/executors.py index 8cb8545c..09dfe034 100644 --- a/routers/executors.py +++ b/routers/executors.py @@ -97,7 +97,8 @@ async def list_executors( connector_name=filter_request.connector_names[0] if filter_request.connector_names else None, trading_pair=filter_request.trading_pairs[0] if filter_request.trading_pairs else None, executor_type=filter_request.executor_types[0] if filter_request.executor_types else None, - status=filter_request.status + status=filter_request.status, + controller_id=filter_request.controller_ids[0] if filter_request.controller_ids else None ) # Apply additional multi-value filters @@ -109,6 +110,8 @@ async def list_executors( executors = [e for e in executors if e.get("trading_pair") in filter_request.trading_pairs] if filter_request.executor_types and len(filter_request.executor_types) > 1: executors = [e for e in executors if e.get("executor_type") in filter_request.executor_types] + if filter_request.controller_ids and len(filter_request.controller_ids) > 1: + executors = [e for e in executors if e.get("controller_id") in filter_request.controller_ids] # Apply cursor-based pagination start_idx = 0 @@ -309,6 +312,7 @@ async def stop_executor( @router.get("/positions/summary", response_model=PositionsSummaryResponse) async def get_positions_summary( + controller_id: Optional[str] = None, executor_service: ExecutorService = Depends(get_executor_service), market_data_service: MarketDataService = Depends(get_market_data_service) ): @@ -320,9 +324,12 @@ async def get_positions_summary( - Total realized PnL across all positions - Total unrealized PnL (when market rates are available) - List of all positions with breakeven prices and PnL + + Query parameters: + - **controller_id**: Filter positions by controller ID """ try: - positions = executor_service.get_positions_held() + positions = executor_service.get_positions_held(controller_id=controller_id) total_realized_pnl = sum(float(p.realized_pnl_quote) for p in positions) total_unrealized_pnl = None position_responses = [] @@ -343,6 +350,7 @@ async def get_positions_summary( trading_pair=p.trading_pair, connector_name=p.connector_name, account_name=p.account_name, + controller_id=p.controller_id, buy_amount_base=float(p.buy_amount_base), buy_amount_quote=float(p.buy_amount_quote), sell_amount_base=float(p.sell_amount_base), @@ -378,6 +386,7 @@ async def get_position_held( connector_name: str, trading_pair: str, account_name: str = "master_account", + controller_id: str = "main", executor_service: ExecutorService = Depends(get_executor_service), market_data_service: MarketDataService = Depends(get_market_data_service) ): @@ -386,12 +395,16 @@ async def get_position_held( Returns the aggregated position from executors stopped with keep_position=True, including breakeven prices, matched/unmatched volume, realized PnL, and unrealized PnL. + + Query parameters: + - **controller_id**: Controller ID (default "main") """ try: position = executor_service.get_position_held( account_name=account_name, connector_name=connector_name, - trading_pair=trading_pair + trading_pair=trading_pair, + controller_id=controller_id ) if not position: @@ -412,6 +425,7 @@ async def get_position_held( trading_pair=position.trading_pair, connector_name=position.connector_name, account_name=position.account_name, + controller_id=position.controller_id, buy_amount_base=float(position.buy_amount_base), buy_amount_quote=float(position.buy_amount_quote), sell_amount_base=float(position.sell_amount_base), @@ -440,6 +454,7 @@ async def clear_position_held( connector_name: str, trading_pair: str, account_name: str = "master_account", + controller_id: str = "main", executor_service: ExecutorService = Depends(get_executor_service) ): """ @@ -447,12 +462,16 @@ async def clear_position_held( This removes the position from tracking but preserves historical data in completed executors. + + Query parameters: + - **controller_id**: Controller ID (default "main") """ try: cleared = executor_service.clear_position_held( account_name=account_name, connector_name=connector_name, - trading_pair=trading_pair + trading_pair=trading_pair, + controller_id=controller_id ) if not cleared: diff --git a/services/executor_service.py b/services/executor_service.py index 13b24dc1..95d22a2c 100644 --- a/services/executor_service.py +++ b/services/executor_service.py @@ -155,10 +155,12 @@ async def recover_positions_from_db(self): for executor_record in position_hold_executors: # Build position key + controller_id = getattr(executor_record, "controller_id", "main") or "main" position_key = self._get_position_key( executor_record.account_name, executor_record.connector_name, - executor_record.trading_pair + executor_record.trading_pair, + controller_id ) # Initialize position if needed @@ -167,6 +169,7 @@ async def recover_positions_from_db(self): trading_pair=executor_record.trading_pair, connector_name=executor_record.connector_name, account_name=executor_record.account_name, + controller_id=controller_id, ) position = self._positions_held[position_key] @@ -386,12 +389,14 @@ async def create_executor( # Store executor and metadata executor_id = typed_config.id + controller_id = getattr(typed_config, "controller_id", "main") or "main" self._active_executors[executor_id] = executor self._executor_metadata[executor_id] = { "account_name": account, "connector_name": connector_name, "trading_pair": trading_pair, "executor_type": executor_type, + "controller_id": controller_id, "created_at": datetime.now(timezone.utc), "config": executor_config } @@ -419,6 +424,7 @@ async def create_executor( "executor_type": executor_type, "connector_name": connector_name, "trading_pair": trading_pair, + "controller_id": controller_id, "status": executor.status.name, "created_at": created_at } @@ -429,7 +435,8 @@ async def get_executors( connector_name: Optional[str] = None, trading_pair: Optional[str] = None, executor_type: Optional[str] = None, - status: Optional[str] = None + status: Optional[str] = None, + controller_id: Optional[str] = None ) -> List[Dict[str, Any]]: """ Get list of executors with optional filtering. @@ -442,6 +449,7 @@ async def get_executors( trading_pair: Filter by trading pair executor_type: Filter by executor type status: Filter by status + controller_id: Filter by controller ID Returns: List of executor information dictionaries @@ -463,6 +471,8 @@ async def get_executors( continue if status and executor.status.name != status: continue + if controller_id and metadata.get("controller_id", "main") != controller_id: + continue result.append(self._format_executor_info(executor_id, executor)) @@ -478,7 +488,8 @@ async def get_executors( connector_name=connector_name, trading_pair=trading_pair, executor_type=executor_type, - status=status + status=status, + controller_id=controller_id ) for record in db_executors: @@ -629,6 +640,7 @@ def _format_executor_info( result["connector_name"] = metadata.get("connector_name") if metadata.get("trading_pair"): result["trading_pair"] = metadata.get("trading_pair") + result["controller_id"] = metadata.get("controller_id", "main") # Read status/close_type directly from executor result["status"] = executor.status.name @@ -663,7 +675,7 @@ def _format_db_record(self, record) -> Dict[str, Any]: "created_at": record.created_at.isoformat() if record.created_at else None, "close_timestamp": record.closed_at.timestamp() if record.closed_at else None, "closed_at": record.closed_at.isoformat() if record.closed_at else None, - "controller_id": None, + "controller_id": record.controller_id or "main", "net_pnl_quote": float(record.net_pnl_quote) if record.net_pnl_quote else 0.0, "net_pnl_pct": float(record.net_pnl_pct) if record.net_pnl_pct else 0.0, "cum_fees_quote": float(record.cum_fees_quote) if record.cum_fees_quote else 0.0, @@ -730,7 +742,8 @@ async def _persist_executor_created(self, executor_id: str, executor: ExecutorBa connector_name=metadata.get("connector_name"), trading_pair=metadata.get("trading_pair"), config=json.dumps(metadata.get("config", {}), default=_json_default), - status=executor.status.name + status=executor.status.name, + controller_id=metadata.get("controller_id", "main") ) logger.debug(f"Persisted executor {executor_id} creation to database") @@ -818,10 +831,11 @@ def _get_position_key( self, account_name: str, connector_name: str, - trading_pair: str + trading_pair: str, + controller_id: str = "main" ) -> str: """Generate a unique key for position tracking.""" - return f"{account_name}|{connector_name}|{trading_pair}" + return f"{account_name}|{connector_name}|{trading_pair}|{controller_id}" async def _aggregate_position_hold( self, @@ -838,19 +852,21 @@ async def _aggregate_position_hold( account_name = metadata.get("account_name", self.default_account) connector_name = metadata.get("connector_name", "") trading_pair = metadata.get("trading_pair", "") + controller_id = metadata.get("controller_id", "main") if not connector_name or not trading_pair: logger.warning(f"Cannot aggregate position for executor {executor_id}: missing connector/pair info") return - position_key = self._get_position_key(account_name, connector_name, trading_pair) + position_key = self._get_position_key(account_name, connector_name, trading_pair, controller_id) # Get or create position hold if position_key not in self._positions_held: self._positions_held[position_key] = PositionHold( trading_pair=trading_pair, connector_name=connector_name, - account_name=account_name + account_name=account_name, + controller_id=controller_id ) position = self._positions_held[position_key] @@ -934,7 +950,8 @@ def get_positions_held( self, account_name: Optional[str] = None, connector_name: Optional[str] = None, - trading_pair: Optional[str] = None + trading_pair: Optional[str] = None, + controller_id: Optional[str] = None ) -> List[PositionHold]: """ Get held positions with optional filtering. @@ -943,6 +960,7 @@ def get_positions_held( account_name: Filter by account name connector_name: Filter by connector name trading_pair: Filter by trading pair + controller_id: Filter by controller ID Returns: List of PositionHold objects matching the filters @@ -957,6 +975,8 @@ def get_positions_held( continue if trading_pair and position.trading_pair != trading_pair: continue + if controller_id and position.controller_id != controller_id: + continue # Only include positions with actual volume if position.buy_amount_base > 0 or position.sell_amount_base > 0: @@ -968,7 +988,8 @@ def get_position_held( self, account_name: str, connector_name: str, - trading_pair: str + trading_pair: str, + controller_id: str = "main" ) -> Optional[PositionHold]: """ Get a specific held position. @@ -977,18 +998,20 @@ def get_position_held( account_name: Account name connector_name: Connector name trading_pair: Trading pair + controller_id: Controller ID Returns: PositionHold or None if not found """ - position_key = self._get_position_key(account_name, connector_name, trading_pair) + position_key = self._get_position_key(account_name, connector_name, trading_pair, controller_id) return self._positions_held.get(position_key) def clear_position_held( self, account_name: str, connector_name: str, - trading_pair: str + trading_pair: str, + controller_id: str = "main" ) -> bool: """ Clear a specific held position (after manual close or full exit). @@ -997,11 +1020,12 @@ def clear_position_held( account_name: Account name connector_name: Connector name trading_pair: Trading pair + controller_id: Controller ID Returns: True if cleared, False if not found """ - position_key = self._get_position_key(account_name, connector_name, trading_pair) + position_key = self._get_position_key(account_name, connector_name, trading_pair, controller_id) if position_key in self._positions_held: del self._positions_held[position_key] logger.info(f"Cleared position hold for {position_key}") diff --git a/services/orders_recorder.py b/services/orders_recorder.py index 90ca8119..04b41bce 100644 --- a/services/orders_recorder.py +++ b/services/orders_recorder.py @@ -2,20 +2,14 @@ import logging import math import time - -from typing import Any, Optional, Union from datetime import datetime from decimal import Decimal, InvalidOperation +from typing import Any, Optional, Union -from hummingbot.core.event.event_forwarder import SourceInfoEventForwarder -from hummingbot.core.event.events import ( - TradeType, - BuyOrderCreatedEvent, - SellOrderCreatedEvent, - OrderFilledEvent, - MarketEvent -) from hummingbot.connector.connector_base import ConnectorBase +from hummingbot.core.event.event_forwarder import SourceInfoEventForwarder +from hummingbot.core.event.events import BuyOrderCreatedEvent, MarketEvent, OrderFilledEvent, SellOrderCreatedEvent, TradeType + from database import AsyncDatabaseManager, OrderRepository, TradeRepository # Initialize logger @@ -215,10 +209,28 @@ async def _handle_order_filled(self, event: OrderFilledEvent): trade_fee_paid = float(fee_in_quote) trade_fee_currency = quote_asset except Exception as e: - logger.error(f"Error calculating trade fee: {e}") - trade_fee_paid = 0 - trade_fee_currency = None - + logger.warning(f"Primary fee calculation failed: {e}. Attempting fallback...") + try: + base_asset, quote_asset = event.trading_pair.split("-") + fallback_fee = await self._calculate_fee_fallback( + trade_fee=event.trade_fee, + base_asset=base_asset, + quote_asset=quote_asset, + fill_price=event.price, + order_amount=event.amount, + ) + if fallback_fee is not None: + trade_fee_paid = float(fallback_fee) + trade_fee_currency = quote_asset + logger.info(f"Fallback fee calculation succeeded: {trade_fee_paid} {trade_fee_currency}") + else: + logger.error(f"Fallback fee calculation returned None for {event.order_id}") + trade_fee_paid = 0 + trade_fee_currency = None + except Exception as fallback_err: + logger.error(f"Fallback fee calculation also failed: {fallback_err}") + trade_fee_paid = 0 + trade_fee_currency = None # Update order with fill information (handle potential NaN values like Hummingbot does) try: filled_amount = Decimal(str(event.amount)) @@ -303,7 +315,67 @@ def _get_order_details_from_connector(self, order_id: str) -> Optional[dict]: except Exception as e: logger.error(f"Error getting order details from connector: {e}") return None - + + async def _fetch_conversion_rate(self, from_token: str, to_token: str) -> Optional[Decimal]: + """Fetch the conversion rate between two tokens using the connector's REST API. + Tries direct pair first, then inverse pair.""" + if not self._connector: + return None + try: + direct_pair = f"{from_token}-{to_token}" + price = await asyncio.wait_for( + self._connector._get_last_traded_price(trading_pair=direct_pair), + timeout=5.0, + ) + if price and price > 0: + return Decimal(str(price)) + except Exception: + pass + try: + inverse_pair = f"{to_token}-{from_token}" + price = await asyncio.wait_for( + self._connector._get_last_traded_price(trading_pair=inverse_pair), + timeout=5.0, + ) + if price and price > 0: + return Decimal(1) / Decimal(str(price)) + except Exception: + pass + return None + + async def _calculate_fee_fallback( + self, + trade_fee, + base_asset: str, + quote_asset: str, + fill_price: Decimal, + order_amount: Decimal, + ) -> Optional[Decimal]: + """Manually compute the trade fee in quote asset when the primary method fails.""" + fee_amount = Decimal(0) + + # Handle percent component + if trade_fee.percent and trade_fee.percent != Decimal(0): + fee_amount += (fill_price * order_amount) * trade_fee.percent + + # Handle flat_fees component + for flat_fee in trade_fee.flat_fees: + if flat_fee.token == quote_asset: + fee_amount += flat_fee.amount + elif flat_fee.token == base_asset: + fee_amount += flat_fee.amount * fill_price + else: + rate = await self._fetch_conversion_rate(flat_fee.token, quote_asset) + if rate is not None: + fee_amount += flat_fee.amount * rate + else: + logger.error( + f"Could not fetch conversion rate for {flat_fee.token} -> {quote_asset}" + ) + return None + + return fee_amount + async def _handle_order_failed(self, event: Any): """Handle order failure events""" try: From d49b71bace9532b35465992b8d0d4517bb7c73a7 Mon Sep 17 00:00:00 2001 From: dman Date: Sat, 21 Mar 2026 12:29:23 -0300 Subject: [PATCH 2/3] feat: add controller ID support to executors Co-Authored-By: Claude Opus 4.6 --- database/connection.py | 11 +- database/repositories/executor_repository.py | 108 ++++++++++++++++++- models/executors.py | 28 +++++ routers/executors.py | 35 ++++++ services/executor_service.py | 94 ++++++++++++++++ 5 files changed, 271 insertions(+), 5 deletions(-) diff --git a/database/connection.py b/database/connection.py index 6e3118c3..44f1fecd 100644 --- a/database/connection.py +++ b/database/connection.py @@ -68,10 +68,13 @@ async def _run_migrations(self, conn): for table, column, sql in migrations: try: # Check if column already exists - result = await conn.execute(text( - "SELECT column_name FROM information_schema.columns " - f"WHERE table_name = '{table}' AND column_name = '{column}'" - )) + result = await conn.execute( + text( + "SELECT column_name FROM information_schema.columns " + "WHERE table_name = :table AND column_name = :column" + ), + {"table": table, "column": column} + ) if result.fetchone() is None: await conn.execute(text(sql)) logger.info(f"Migration: added {column} to {table}") diff --git a/database/repositories/executor_repository.py b/database/repositories/executor_repository.py index 10c07461..14f3147c 100644 --- a/database/repositories/executor_repository.py +++ b/database/repositories/executor_repository.py @@ -5,7 +5,7 @@ from decimal import Decimal from typing import Any, Dict, List, Optional -from sqlalchemy import and_, desc, func, select +from sqlalchemy import and_, case, desc, func, select from sqlalchemy.ext.asyncio import AsyncSession from database.models import ExecutorOrder, ExecutorRecord @@ -233,6 +233,112 @@ async def get_executor_stats(self) -> Dict[str, Any]: "connector_counts": connector_counts } + async def get_performance_report( + self, + controller_id: Optional[str] = None + ) -> Dict[str, Any]: + """Get a performance report, optionally filtered by controller_id. + + Returns aggregate metrics: total executors, PnL, fees, volume, + win rate, per-executor PnL list (for Sharpe), and breakdown by type. + """ + base_filter = [] + if controller_id: + base_filter.append(ExecutorRecord.controller_id == controller_id) + + def _where(stmt): + return stmt.where(and_(*base_filter)) if base_filter else stmt + + # --- Status counts --- + status_stmt = _where( + select( + ExecutorRecord.status, + func.count(ExecutorRecord.id).label("cnt"), + ).group_by(ExecutorRecord.status) + ) + status_rows = await self.session.execute(status_stmt) + status_counts = {r.status: r.cnt for r in status_rows} + + total_executors = sum(status_counts.values()) + + # --- Aggregate PnL / fees / volume (completed only, excluding POSITION_HOLD to avoid double-counting) --- + completed_filter = base_filter + [ + ExecutorRecord.status != "RUNNING", + ExecutorRecord.close_type != "POSITION_HOLD", + ] + agg_stmt = select( + func.coalesce(func.sum(ExecutorRecord.net_pnl_quote), Decimal(0)).label("pnl"), + func.coalesce(func.sum(ExecutorRecord.cum_fees_quote), Decimal(0)).label("fees"), + func.coalesce(func.sum(ExecutorRecord.filled_amount_quote), Decimal(0)).label("vol"), + func.coalesce(func.avg(ExecutorRecord.net_pnl_pct), Decimal(0)).label("pnl_pct_avg"), + func.count(ExecutorRecord.id).label("completed_count"), + func.sum(case( + (ExecutorRecord.net_pnl_quote > 0, 1), + else_=0, + )).label("wins"), + ) + if completed_filter: + agg_stmt = agg_stmt.where(and_(*completed_filter)) + agg_row = (await self.session.execute(agg_stmt)).one() + + completed_count = agg_row.completed_count or 0 + wins = agg_row.wins or 0 + win_rate = (wins / completed_count) if completed_count > 0 else 0.0 + + # --- Per-executor PnL list for Sharpe (excluding POSITION_HOLD) --- + pnl_list_stmt = _where( + select(ExecutorRecord.net_pnl_quote).where( + ExecutorRecord.status != "RUNNING", + ExecutorRecord.close_type != "POSITION_HOLD", + ) + ) + pnl_rows = await self.session.execute(pnl_list_stmt) + pnl_values = [float(r[0] or 0) for r in pnl_rows] + + # --- Breakdown by executor type --- + type_stmt = _where( + select( + ExecutorRecord.executor_type, + func.count(ExecutorRecord.id).label("total"), + func.sum(case( + (ExecutorRecord.status != "RUNNING", 1), + else_=0, + )).label("completed"), + func.sum(case( + (ExecutorRecord.status == "RUNNING", 1), + else_=0, + )).label("running"), + func.coalesce(func.sum(ExecutorRecord.net_pnl_quote), Decimal(0)).label("pnl"), + func.coalesce(func.sum(ExecutorRecord.filled_amount_quote), Decimal(0)).label("vol"), + func.coalesce(func.sum(ExecutorRecord.cum_fees_quote), Decimal(0)).label("fees"), + ).group_by(ExecutorRecord.executor_type) + ) + type_rows = await self.session.execute(type_stmt) + by_type = [ + { + "executor_type": r.executor_type, + "total": r.total, + "completed": r.completed or 0, + "running": r.running or 0, + "pnl_quote": float(r.pnl), + "volume_quote": float(r.vol), + "fees_quote": float(r.fees), + } + for r in type_rows + ] + + return { + "total_executors": total_executors, + "status_counts": status_counts, + "pnl_total_quote": float(agg_row.pnl), + "pnl_pct_avg": float(agg_row.pnl_pct_avg), + "fees_total_quote": float(agg_row.fees), + "volume_total_quote": float(agg_row.vol), + "win_rate": win_rate, + "pnl_values": pnl_values, + "by_type": by_type, + } + # ======================================== # ExecutorOrder Operations # ======================================== diff --git a/models/executors.py b/models/executors.py index c6444af8..13f3a8e6 100644 --- a/models/executors.py +++ b/models/executors.py @@ -424,6 +424,34 @@ class ExecutorsSummaryResponse(BaseModel): by_status: Dict[str, int] = Field(description="Executor count by status") +class ExecutorTypeBreakdown(BaseModel): + """Performance breakdown for a single executor type.""" + executor_type: str = Field(description="Executor type name") + total: int = Field(description="Total executors of this type") + completed: int = Field(description="Completed executors") + running: int = Field(description="Currently running executors") + pnl_quote: float = Field(description="Net PnL in quote currency") + volume_quote: float = Field(description="Total filled volume in quote currency") + fees_quote: float = Field(description="Cumulative fees in quote currency") + + +class PerformanceReportResponse(BaseModel): + """Performance report for executors, optionally filtered by controller_id.""" + controller_id: Optional[str] = Field(None, description="Controller ID filter (None = all)") + total_executors: int = Field(description="Total executor count") + by_status: Dict[str, int] = Field(description="Executor count by status") + pnl_total_quote: float = Field(description="Realized PnL from completed executors in quote currency") + unrealized_pnl_quote: float = Field(description="Unrealized PnL from active executors and position holds") + global_pnl_quote: float = Field(description="Global PnL (realized + unrealized)") + pnl_pct_avg: float = Field(description="Average PnL percentage across completed executors") + fees_total_quote: float = Field(description="Total cumulative fees in quote currency") + volume_total_quote: float = Field(description="Total filled volume in quote currency") + win_rate: float = Field(description="Win rate: fraction of completed executors with positive PnL") + sharpe_ratio: Optional[float] = Field(None, description="Sharpe ratio of PnL returns (null if <2 executors)") + by_type: List[ExecutorTypeBreakdown] = Field(description="Performance breakdown by executor type") + active_positions: int = Field(description="Number of active position holds") + + class ExecutorLogEntry(BaseModel): """A single log entry from an executor.""" timestamp: str = Field(description="ISO-format timestamp") diff --git a/routers/executors.py b/routers/executors.py index 09dfe034..7e0aebdf 100644 --- a/routers/executors.py +++ b/routers/executors.py @@ -18,6 +18,7 @@ ExecutorFilterRequest, ExecutorLogsResponse, ExecutorsSummaryResponse, + PerformanceReportResponse, PositionHoldResponse, PositionsSummaryResponse, StopExecutorRequest, @@ -164,6 +165,40 @@ async def get_executors_summary( raise HTTPException(status_code=500, detail=f"Error getting summary: {str(e)}") +@router.get("/performance", response_model=PerformanceReportResponse) +async def get_performance_report( + controller_id: Optional[str] = None, + executor_service: ExecutorService = Depends(get_executor_service), + market_data_service: MarketDataService = Depends(get_market_data_service) +): + """ + Get a performance report for executors. + + Aggregates metrics from all completed executors (optionally filtered by controller_id): + - Realized PnL (from completed executors, excluding POSITION_HOLD close type) + - Unrealized PnL (from active executors + position holds) + - Global PnL (realized + unrealized) + - Fees and volume totals + - Win rate and Sharpe ratio + - Breakdown by executor type + - Active position count + + Query parameters: + - **controller_id**: Filter by controller ID (omit for all controllers) + """ + try: + report = await executor_service.get_performance_report( + controller_id=controller_id, + market_data_service=market_data_service + ) + return PerformanceReportResponse(**report) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error generating performance report: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error generating performance report: {str(e)}") + + @router.get("/{executor_id}/logs", response_model=ExecutorLogsResponse) async def get_executor_logs( executor_id: str, diff --git a/services/executor_service.py b/services/executor_service.py index 95d22a2c..7e852bbd 100644 --- a/services/executor_service.py +++ b/services/executor_service.py @@ -723,6 +723,100 @@ def get_summary(self) -> Dict[str, Any]: "by_status": by_status } + async def get_performance_report( + self, + controller_id: Optional[str] = None, + market_data_service=None + ) -> Dict[str, Any]: + """ + Generate a performance report aggregating executor metrics. + + Combines database aggregations (completed executors) with in-memory + active executor and position hold unrealized PnL. + Excludes POSITION_HOLD close_type from realized PnL to avoid double-counting. + + Args: + controller_id: Filter by controller ID (None = all) + market_data_service: MarketDataService for position hold unrealized PnL + + Returns: + Dictionary with performance metrics ready for PerformanceReportResponse. + """ + import math + + report: Dict[str, Any] = { + "controller_id": controller_id, + "total_executors": 0, + "by_status": {}, + "pnl_total_quote": 0.0, + "unrealized_pnl_quote": 0.0, + "global_pnl_quote": 0.0, + "pnl_pct_avg": 0.0, + "fees_total_quote": 0.0, + "volume_total_quote": 0.0, + "win_rate": 0.0, + "sharpe_ratio": None, + "by_type": [], + "active_positions": 0, + } + + if self.db_manager: + try: + async with self.db_manager.get_session_context() as session: + from database.repositories.executor_repository import ExecutorRepository + repo = ExecutorRepository(session) + db_data = await repo.get_performance_report(controller_id=controller_id) + + report["total_executors"] = db_data["total_executors"] + report["by_status"] = db_data["status_counts"] + report["pnl_total_quote"] = db_data["pnl_total_quote"] + report["pnl_pct_avg"] = db_data["pnl_pct_avg"] + report["fees_total_quote"] = db_data["fees_total_quote"] + report["volume_total_quote"] = db_data["volume_total_quote"] + report["win_rate"] = db_data["win_rate"] + report["by_type"] = db_data["by_type"] + + # Sharpe ratio: mean(pnl) / std(pnl), requires >= 2 values + pnl_values = db_data.get("pnl_values", []) + if len(pnl_values) >= 2: + mean_pnl = sum(pnl_values) / len(pnl_values) + variance = sum((v - mean_pnl) ** 2 for v in pnl_values) / (len(pnl_values) - 1) + std_pnl = math.sqrt(variance) + if std_pnl > 0: + report["sharpe_ratio"] = round(mean_pnl / std_pnl, 4) + + except Exception as e: + logger.error(f"Error generating performance report: {e}", exc_info=True) + + # --- Unrealized PnL from active executors --- + unrealized_pnl = 0.0 + for executor_id, executor in self._active_executors.items(): + metadata = self._executor_metadata.get(executor_id, {}) + if controller_id and metadata.get("controller_id", "main") != controller_id: + continue + try: + unrealized_pnl += float(executor.executor_info.net_pnl_quote) + except Exception: + pass + + # --- Unrealized PnL from position holds --- + positions = self.get_positions_held(controller_id=controller_id) + report["active_positions"] = len(positions) + + if market_data_service: + for p in positions: + parts = p.trading_pair.split("-") + if len(parts) == 2: + base, quote = parts + rate = market_data_service.get_rate(base, quote) + if rate is not None: + unrealized_pnl += float(p.get_unrealized_pnl(rate)) + + report["unrealized_pnl_quote"] = round(unrealized_pnl, 8) + report["global_pnl_quote"] = round(report["pnl_total_quote"] + unrealized_pnl, 8) + + return report + async def _persist_executor_created(self, executor_id: str, executor: ExecutorBase): """Persist executor creation to database.""" if not self.db_manager: From 432604f9260f9f139014e7a537fc5eac9c33c138 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Mon, 23 Mar 2026 18:07:46 -0300 Subject: [PATCH 3/3] (feat) update method usage --- database/connection.py | 33 ++-- database/repositories/executor_repository.py | 190 +++++++++---------- models/executors.py | 4 + routers/executors.py | 3 +- services/executor_service.py | 5 +- 5 files changed, 117 insertions(+), 118 deletions(-) diff --git a/database/connection.py b/database/connection.py index 44f1fecd..adcaf82d 100644 --- a/database/connection.py +++ b/database/connection.py @@ -15,7 +15,7 @@ def __init__(self, database_url: str): # Convert postgresql:// to postgresql+asyncpg:// for async support if database_url.startswith("postgresql://"): database_url = database_url.replace("postgresql://", "postgresql+asyncpg://") - + self.engine = create_async_engine( database_url, # Connection pool settings for async @@ -34,11 +34,11 @@ def __init__(self, database_url: str): } ) self.async_session = async_sessionmaker( - self.engine, - class_=AsyncSession, + self.engine, + class_=AsyncSession, expire_on_commit=False ) - + async def create_tables(self): """Create all tables defined in the models.""" try: @@ -55,7 +55,7 @@ async def create_tables(self): except Exception as e: logger.error(f"Failed to create database tables: {e}") raise - + async def _run_migrations(self, conn): """Run lightweight schema migrations for existing tables.""" migrations = [ @@ -79,37 +79,41 @@ async def _run_migrations(self, conn): await conn.execute(text(sql)) logger.info(f"Migration: added {column} to {table}") except Exception as e: - logger.debug(f"Migration check for {table}.{column}: {e}") + # Column-already-exists is expected on repeat startups + err_msg = str(e).lower() + if "already exists" in err_msg or "duplicate column" in err_msg: + logger.debug(f"Migration check for {table}.{column}: {e}") + else: + logger.warning(f"Unexpected migration error for {table}.{column}: {e}") async def _drop_hummingbot_tables(self, conn): """Drop Hummingbot's native database tables since we use custom ones.""" hummingbot_tables = [ "hummingbot_orders", - "hummingbot_trade_fills", + "hummingbot_trade_fills", "hummingbot_order_status" ] - + for table_name in hummingbot_tables: try: await conn.execute(text(f"DROP TABLE IF EXISTS {table_name}")) logger.info(f"Dropped Hummingbot table: {table_name}") except Exception as e: logger.debug(f"Could not drop table {table_name}: {e}") # Use debug since table might not exist - + async def close(self): """Close all database connections.""" await self.engine.dispose() logger.info("Database connections closed") - + def get_session(self) -> AsyncSession: """Get a new database session.""" return self.async_session() - + @asynccontextmanager async def get_session_context(self) -> AsyncGenerator[AsyncSession, None]: """ Get a database session with automatic error handling and cleanup. - Usage: async with db_manager.get_session_context() as session: # Use session here @@ -123,11 +127,10 @@ async def get_session_context(self) -> AsyncGenerator[AsyncSession, None]: raise finally: await session.close() - + async def health_check(self) -> bool: """ Check if the database connection is healthy. - Returns: bool: True if connection is healthy, False otherwise. """ @@ -137,4 +140,4 @@ async def health_check(self) -> bool: return True except Exception as e: logger.error(f"Database health check failed: {e}") - return False \ No newline at end of file + return False diff --git a/database/repositories/executor_repository.py b/database/repositories/executor_repository.py index 14f3147c..760ca39d 100644 --- a/database/repositories/executor_repository.py +++ b/database/repositories/executor_repository.py @@ -22,15 +22,15 @@ def __init__(self, session: AsyncSession): # ======================================== async def create_executor( - self, - executor_id: str, - executor_type: str, - account_name: str, - connector_name: str, - trading_pair: str, - config: Optional[str] = None, - status: str = "RUNNING", - controller_id: str = "main" + self, + executor_id: str, + executor_type: str, + account_name: str, + connector_name: str, + trading_pair: str, + config: Optional[str] = None, + status: str = "RUNNING", + controller_id: str = "main" ) -> ExecutorRecord: """Create a new executor record.""" executor = ExecutorRecord( @@ -50,15 +50,15 @@ async def create_executor( return executor async def update_executor( - self, - executor_id: str, - status: Optional[str] = None, - close_type: Optional[str] = None, - net_pnl_quote: Optional[Decimal] = None, - net_pnl_pct: Optional[Decimal] = None, - cum_fees_quote: Optional[Decimal] = None, - filled_amount_quote: Optional[Decimal] = None, - final_state: Optional[str] = None + self, + executor_id: str, + status: Optional[str] = None, + close_type: Optional[str] = None, + net_pnl_quote: Optional[Decimal] = None, + net_pnl_pct: Optional[Decimal] = None, + cum_fees_quote: Optional[Decimal] = None, + filled_amount_quote: Optional[Decimal] = None, + final_state: Optional[str] = None ) -> Optional[ExecutorRecord]: """Update an executor record.""" stmt = select(ExecutorRecord).where(ExecutorRecord.executor_id == executor_id) @@ -94,15 +94,15 @@ async def get_executor_by_id(self, executor_id: str) -> Optional[ExecutorRecord] return result.scalar_one_or_none() async def get_executors( - self, - account_name: Optional[str] = None, - connector_name: Optional[str] = None, - trading_pair: Optional[str] = None, - executor_type: Optional[str] = None, - status: Optional[str] = None, - controller_id: Optional[str] = None, - limit: int = 100, - offset: int = 0 + self, + account_name: Optional[str] = None, + connector_name: Optional[str] = None, + trading_pair: Optional[str] = None, + executor_type: Optional[str] = None, + status: Optional[str] = None, + controller_id: Optional[str] = None, + limit: int = 100, + offset: int = 0 ) -> List[ExecutorRecord]: """Get executors with optional filters.""" stmt = select(ExecutorRecord) @@ -130,9 +130,9 @@ async def get_executors( return list(result.scalars().all()) async def get_active_executors( - self, - account_name: Optional[str] = None, - connector_name: Optional[str] = None + self, + account_name: Optional[str] = None, + connector_name: Optional[str] = None ) -> List[ExecutorRecord]: """Get all active (running) executors.""" stmt = select(ExecutorRecord).where(ExecutorRecord.status == "RUNNING") @@ -148,11 +148,11 @@ async def get_active_executors( return list(result.scalars().all()) async def get_position_hold_executors( - self, - account_name: Optional[str] = None, - connector_name: Optional[str] = None, - trading_pair: Optional[str] = None, - controller_id: Optional[str] = None + self, + account_name: Optional[str] = None, + connector_name: Optional[str] = None, + trading_pair: Optional[str] = None, + controller_id: Optional[str] = None ) -> List[ExecutorRecord]: """Get executors that closed with POSITION_HOLD (keep_position=True).""" stmt = select(ExecutorRecord).where(ExecutorRecord.close_type == "POSITION_HOLD") @@ -234,8 +234,8 @@ async def get_executor_stats(self) -> Dict[str, Any]: } async def get_performance_report( - self, - controller_id: Optional[str] = None + self, + controller_id: Optional[str] = None ) -> Dict[str, Any]: """Get a performance report, optionally filtered by controller_id. @@ -246,16 +246,13 @@ async def get_performance_report( if controller_id: base_filter.append(ExecutorRecord.controller_id == controller_id) - def _where(stmt): - return stmt.where(and_(*base_filter)) if base_filter else stmt - # --- Status counts --- - status_stmt = _where( - select( - ExecutorRecord.status, - func.count(ExecutorRecord.id).label("cnt"), - ).group_by(ExecutorRecord.status) - ) + status_stmt = select( + ExecutorRecord.status, + func.count(ExecutorRecord.id).label("cnt"), + ).group_by(ExecutorRecord.status) + if base_filter: + status_stmt = status_stmt.where(and_(*base_filter)) status_rows = await self.session.execute(status_stmt) status_counts = {r.status: r.cnt for r in status_rows} @@ -276,9 +273,7 @@ def _where(stmt): (ExecutorRecord.net_pnl_quote > 0, 1), else_=0, )).label("wins"), - ) - if completed_filter: - agg_stmt = agg_stmt.where(and_(*completed_filter)) + ).where(and_(*completed_filter)) agg_row = (await self.session.execute(agg_stmt)).one() completed_count = agg_row.completed_count or 0 @@ -286,33 +281,30 @@ def _where(stmt): win_rate = (wins / completed_count) if completed_count > 0 else 0.0 # --- Per-executor PnL list for Sharpe (excluding POSITION_HOLD) --- - pnl_list_stmt = _where( - select(ExecutorRecord.net_pnl_quote).where( - ExecutorRecord.status != "RUNNING", - ExecutorRecord.close_type != "POSITION_HOLD", - ) + pnl_list_stmt = select(ExecutorRecord.net_pnl_quote).where( + and_(*completed_filter) ) pnl_rows = await self.session.execute(pnl_list_stmt) pnl_values = [float(r[0] or 0) for r in pnl_rows] - # --- Breakdown by executor type --- - type_stmt = _where( - select( - ExecutorRecord.executor_type, - func.count(ExecutorRecord.id).label("total"), - func.sum(case( - (ExecutorRecord.status != "RUNNING", 1), - else_=0, - )).label("completed"), - func.sum(case( - (ExecutorRecord.status == "RUNNING", 1), - else_=0, - )).label("running"), - func.coalesce(func.sum(ExecutorRecord.net_pnl_quote), Decimal(0)).label("pnl"), - func.coalesce(func.sum(ExecutorRecord.filled_amount_quote), Decimal(0)).label("vol"), - func.coalesce(func.sum(ExecutorRecord.cum_fees_quote), Decimal(0)).label("fees"), - ).group_by(ExecutorRecord.executor_type) - ) + # --- Breakdown by executor type (also excluding POSITION_HOLD to match aggregate totals) --- + type_stmt = select( + ExecutorRecord.executor_type, + func.count(ExecutorRecord.id).label("total"), + func.sum(case( + (ExecutorRecord.status != "RUNNING", 1), + else_=0, + )).label("completed"), + func.sum(case( + (ExecutorRecord.status == "RUNNING", 1), + else_=0, + )).label("running"), + func.coalesce(func.sum(ExecutorRecord.net_pnl_quote), Decimal(0)).label("pnl"), + func.coalesce(func.sum(ExecutorRecord.filled_amount_quote), Decimal(0)).label("vol"), + func.coalesce(func.sum(ExecutorRecord.cum_fees_quote), Decimal(0)).label("fees"), + ).where( + and_(*completed_filter) + ).group_by(ExecutorRecord.executor_type) type_rows = await self.session.execute(type_stmt) by_type = [ { @@ -344,15 +336,15 @@ def _where(stmt): # ======================================== async def create_executor_order( - self, - executor_id: str, - client_order_id: str, - order_type: str, - trade_type: str, - amount: Decimal, - price: Optional[Decimal] = None, - exchange_order_id: Optional[str] = None, - status: str = "SUBMITTED" + self, + executor_id: str, + client_order_id: str, + order_type: str, + trade_type: str, + amount: Decimal, + price: Optional[Decimal] = None, + exchange_order_id: Optional[str] = None, + status: str = "SUBMITTED" ) -> ExecutorOrder: """Create a new executor order record.""" order = ExecutorOrder( @@ -372,12 +364,12 @@ async def create_executor_order( return order async def update_executor_order( - self, - client_order_id: str, - status: Optional[str] = None, - filled_amount: Optional[Decimal] = None, - average_fill_price: Optional[Decimal] = None, - exchange_order_id: Optional[str] = None + self, + client_order_id: str, + status: Optional[str] = None, + filled_amount: Optional[Decimal] = None, + average_fill_price: Optional[Decimal] = None, + exchange_order_id: Optional[str] = None ) -> Optional[ExecutorOrder]: """Update an executor order record.""" stmt = select(ExecutorOrder).where(ExecutorOrder.client_order_id == client_order_id) @@ -400,9 +392,9 @@ async def update_executor_order( return order async def get_executor_orders( - self, - executor_id: str, - status: Optional[str] = None + self, + executor_id: str, + status: Optional[str] = None ) -> List[ExecutorOrder]: """Get orders for an executor.""" stmt = select(ExecutorOrder).where(ExecutorOrder.executor_id == executor_id) @@ -422,17 +414,15 @@ async def get_order_by_client_id(self, client_order_id: str) -> Optional[Executo return result.scalar_one_or_none() async def cleanup_orphaned_executors( - self, - active_executor_ids: List[str], - close_type: str = "SYSTEM_CLEANUP" + self, + active_executor_ids: List[str], + close_type: str = "SYSTEM_CLEANUP" ) -> int: """ Clean up orphaned executors - those marked as RUNNING but not in active memory. - Args: active_executor_ids: List of executor IDs currently active in memory close_type: Close type to set for cleaned up executors - Returns: Number of executors cleaned up """ @@ -440,15 +430,15 @@ async def cleanup_orphaned_executors( # Find executors that are RUNNING but not in the active list conditions = [ExecutorRecord.status == "RUNNING"] - + if active_executor_ids: conditions.append(~ExecutorRecord.executor_id.in_(active_executor_ids)) - + # First, get the count of orphaned executors for logging count_stmt = select(func.count(ExecutorRecord.id)).where(and_(*conditions)) count_result = await self.session.execute(count_stmt) orphaned_count = count_result.scalar() or 0 - + if orphaned_count > 0: # Update orphaned executors to TERMINATED status update_stmt = ( @@ -460,8 +450,8 @@ async def cleanup_orphaned_executors( closed_at=datetime.now(timezone.utc) ) ) - + await self.session.execute(update_stmt) await self.session.flush() - + return orphaned_count diff --git a/models/executors.py b/models/executors.py index 13f3a8e6..1c1f1941 100644 --- a/models/executors.py +++ b/models/executors.py @@ -276,6 +276,10 @@ class CreateExecutorRequest(BaseModel): None, description="Account name to use (defaults to master_account)" ) + controller_id: str = Field( + default="main", + description="Controller ID that owns this executor (for per-agent isolation)" + ) executor_config: Dict[str, Any] = Field( ..., description="Executor configuration. Must include 'type' field and executor-specific parameters." diff --git a/routers/executors.py b/routers/executors.py index 7e0aebdf..11ff01d7 100644 --- a/routers/executors.py +++ b/routers/executors.py @@ -62,7 +62,8 @@ async def create_executor( try: result = await executor_service.create_executor( executor_config=request.executor_config, - account_name=request.account_name + account_name=request.account_name, + controller_id=request.controller_id ) return CreateExecutorResponse(**result) except HTTPException: diff --git a/services/executor_service.py b/services/executor_service.py index 7e852bbd..62bd7ddb 100644 --- a/services/executor_service.py +++ b/services/executor_service.py @@ -316,7 +316,8 @@ def _get_trading_interface(self, account_name: str) -> AccountTradingInterface: async def create_executor( self, executor_config: Dict[str, Any], - account_name: Optional[str] = None + account_name: Optional[str] = None, + controller_id: Optional[str] = None ) -> Dict[str, Any]: """ Create and start a new executor. @@ -389,7 +390,7 @@ async def create_executor( # Store executor and metadata executor_id = typed_config.id - controller_id = getattr(typed_config, "controller_id", "main") or "main" + controller_id = controller_id or getattr(typed_config, "controller_id", "main") or "main" self._active_executors[executor_id] = executor self._executor_metadata[executor_id] = { "account_name": account,