From 1583c634b8e357d426c2169cc912efbecacc9845 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Fri, 17 Apr 2026 11:37:19 -0300 Subject: [PATCH 1/5] (feat) update python version --- environment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/environment.yml b/environment.yml index 6100a57c..4a3606a1 100644 --- a/environment.yml +++ b/environment.yml @@ -3,7 +3,7 @@ channels: - conda-forge - defaults dependencies: - - python=3.12 + - python=3.13 - fastapi - uvicorn - boto3 From fbbd622c4a2f8945cdafd605804819bc8b716c29 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Fri, 17 Apr 2026 16:31:56 -0300 Subject: [PATCH 2/5] (feat) update executors db management --- database/connection.py | 5 + database/models.py | 115 +++++++++----- database/repositories/executor_repository.py | 121 ++++++++++++--- services/executor_service.py | 154 +++++++++++-------- services/orders_recorder.py | 126 ++++++++------- 5 files changed, 341 insertions(+), 180 deletions(-) diff --git a/database/connection.py b/database/connection.py index adcaf82d..f3c61ee1 100644 --- a/database/connection.py +++ b/database/connection.py @@ -64,6 +64,11 @@ async def _run_migrations(self, conn): "executors", "controller_id", "ALTER TABLE executors ADD COLUMN controller_id TEXT NOT NULL DEFAULT 'main'" ), + # Add error_log to executors table for storing errors on failed executors + ( + "executors", "error_log", + "ALTER TABLE executors ADD COLUMN error_log TEXT" + ), ] for table, column, sql in migrations: try: diff --git a/database/models.py b/database/models.py index 524a6a0b..8ec348e9 100644 --- a/database/models.py +++ b/database/models.py @@ -1,4 +1,4 @@ -from sqlalchemy import TIMESTAMP, Column, ForeignKey, Integer, Numeric, String, Text, func +from sqlalchemy import TIMESTAMP, Column, ForeignKey, Integer, Numeric, String, Text, UniqueConstraint, func from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship @@ -12,7 +12,7 @@ class AccountState(Base): timestamp = Column(TIMESTAMP(timezone=True), server_default=func.now(), nullable=False, index=True) account_name = Column(String, nullable=False, index=True) connector_name = Column(String, nullable=False, index=True) - + token_states = relationship("TokenState", back_populates="account_state", cascade="all, delete-orphan") @@ -26,112 +26,113 @@ class TokenState(Base): price = Column(Numeric(precision=30, scale=18), nullable=False) value = Column(Numeric(precision=30, scale=18), nullable=False) available_units = Column(Numeric(precision=30, scale=18), nullable=False) - + account_state = relationship("AccountState", back_populates="token_states") class Order(Base): __tablename__ = "orders" - + id = Column(Integer, primary_key=True, index=True) # Order identification client_order_id = Column(String, nullable=False, unique=True, index=True) exchange_order_id = Column(String, nullable=True, index=True) - + # Timestamps created_at = Column(TIMESTAMP(timezone=True), server_default=func.now(), nullable=False, index=True) updated_at = Column(TIMESTAMP(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False) - + # Account and connector info account_name = Column(String, nullable=False, index=True) connector_name = Column(String, nullable=False, index=True) - + # Order details trading_pair = Column(String, nullable=False, index=True) trade_type = Column(String, nullable=False) # BUY, SELL order_type = Column(String, nullable=False) # LIMIT, MARKET, LIMIT_MAKER amount = Column(Numeric(precision=30, scale=18), nullable=False) price = Column(Numeric(precision=30, scale=18), nullable=True) # Null for market orders - + # Order status and execution - status = Column(String, nullable=False, default="SUBMITTED", index=True) # SUBMITTED, OPEN, FILLED, CANCELLED, FAILED + status = Column(String, nullable=False, default="SUBMITTED", + index=True) # SUBMITTED, OPEN, FILLED, CANCELLED, FAILED filled_amount = Column(Numeric(precision=30, scale=18), nullable=False, default=0) average_fill_price = Column(Numeric(precision=30, scale=18), nullable=True) - + # Fee information fee_paid = Column(Numeric(precision=30, scale=18), default=0, nullable=True) fee_currency = Column(String, nullable=True) - + # Additional metadata error_message = Column(Text, nullable=True) - + # Relationships for future enhancements trades = relationship("Trade", back_populates="order", cascade="all, delete-orphan") class Trade(Base): __tablename__ = "trades" - + id = Column(Integer, primary_key=True, index=True) order_id = Column(Integer, ForeignKey("orders.id"), nullable=False) - + # Trade identification trade_id = Column(String, nullable=False, unique=True, index=True) - + # Timestamps timestamp = Column(TIMESTAMP(timezone=True), nullable=False, index=True) - + # Trade details trading_pair = Column(String, nullable=False, index=True) trade_type = Column(String, nullable=False) # BUY, SELL amount = Column(Numeric(precision=30, scale=18), nullable=False) price = Column(Numeric(precision=30, scale=18), nullable=False) - + # Fee information fee_paid = Column(Numeric(precision=30, scale=18), nullable=False, default=0) fee_currency = Column(String, nullable=True) - + # Relationship order = relationship("Order", back_populates="trades") class PositionSnapshot(Base): __tablename__ = "position_snapshots" - + id = Column(Integer, primary_key=True, index=True) - + # Position identification account_name = Column(String, nullable=False, index=True) connector_name = Column(String, nullable=False, index=True) trading_pair = Column(String, nullable=False, index=True) - + # Timestamps timestamp = Column(TIMESTAMP(timezone=True), server_default=func.now(), nullable=False, index=True) - + # Real-time exchange data (from connector.account_positions) side = Column(String, nullable=False) # LONG, SHORT exchange_size = Column(Numeric(precision=30, scale=18), nullable=False) # Size from exchange entry_price = Column(Numeric(precision=30, scale=18), nullable=True) # Average entry price mark_price = Column(Numeric(precision=30, scale=18), nullable=True) # Current mark price - + # Real-time PnL data (can't be derived from trades alone) unrealized_pnl = Column(Numeric(precision=30, scale=18), nullable=True) # From exchange percentage_pnl = Column(Numeric(precision=10, scale=6), nullable=True) # PnL percentage - + # Leverage and margin info leverage = Column(Numeric(precision=10, scale=2), nullable=True) # Position leverage initial_margin = Column(Numeric(precision=30, scale=18), nullable=True) # Initial margin maintenance_margin = Column(Numeric(precision=30, scale=18), nullable=True) # Maintenance margin - + # Fee tracking (exchange provides cumulative data) cumulative_funding_fees = Column(Numeric(precision=30, scale=18), nullable=False, default=0) # Funding fees fee_currency = Column(String, nullable=True) # Fee currency (usually USDT) - + # Reconciliation fields (calculated from our trade data) calculated_size = Column(Numeric(precision=30, scale=18), nullable=True) # Size from our trades calculated_entry_price = Column(Numeric(precision=30, scale=18), nullable=True) # Entry from our trades size_difference = Column(Numeric(precision=30, scale=18), nullable=True) # Difference for reconciliation - + # Additional metadata exchange_position_id = Column(String, nullable=True, index=True) # Exchange position ID is_reconciled = Column(String, nullable=False, default="PENDING") # RECONCILED, MISMATCH, PENDING @@ -139,29 +140,29 @@ class PositionSnapshot(Base): class FundingPayment(Base): __tablename__ = "funding_payments" - + id = Column(Integer, primary_key=True, index=True) - + # Payment identification funding_payment_id = Column(String, nullable=False, unique=True, index=True) - + # Timestamps timestamp = Column(TIMESTAMP(timezone=True), nullable=False, index=True) - + # Account and connector info account_name = Column(String, nullable=False, index=True) connector_name = Column(String, nullable=False, index=True) - + # Funding details trading_pair = Column(String, nullable=False, index=True) funding_rate = Column(Numeric(precision=20, scale=18), nullable=False) # Funding rate funding_payment = Column(Numeric(precision=30, scale=18), nullable=False) # Payment amount fee_currency = Column(String, nullable=False) # Payment currency (usually USDT) - + # Position association position_size = Column(Numeric(precision=30, scale=18), nullable=True) # Position size at time of payment position_side = Column(String, nullable=True) # LONG, SHORT - + # Additional metadata exchange_funding_id = Column(String, nullable=True, index=True) # Exchange funding ID @@ -277,7 +278,8 @@ class GatewayCLMMPosition(Base): # Price tracking for PnL calculation entry_price = Column(Numeric(precision=30, scale=18), nullable=True) # Pool price when position opened - current_price = Column(Numeric(precision=30, scale=18), nullable=True) # Latest price (becomes close price when closed) + current_price = Column(Numeric(precision=30, scale=18), + nullable=True) # Latest price (becomes close price when closed) # Initial deposit amounts (for PnL calculation) initial_base_token_amount = Column(Numeric(precision=30, scale=18), nullable=True) @@ -322,7 +324,8 @@ class GatewayCLMMEvent(Base): timestamp = Column(TIMESTAMP(timezone=True), server_default=func.now(), nullable=False, index=True) # Event type - event_type = Column(String, nullable=False, index=True) # OPEN, ADD_LIQUIDITY, REMOVE_LIQUIDITY, COLLECT_FEES, CLOSE + event_type = Column(String, nullable=False, + index=True) # OPEN, ADD_LIQUIDITY, REMOVE_LIQUIDITY, COLLECT_FEES, CLOSE # Event amounts base_token_amount = Column(Numeric(precision=30, scale=18), nullable=True) @@ -374,6 +377,9 @@ class ExecutorRecord(Base): cum_fees_quote = Column(Numeric(precision=30, scale=18), nullable=False, default=0) filled_amount_quote = Column(Numeric(precision=30, scale=18), nullable=False, default=0) + # Error tracking + error_log = Column(Text, nullable=True) # JSON: last errors captured during execution + # Configuration (JSON) config = Column(Text, nullable=True) @@ -384,6 +390,41 @@ class ExecutorRecord(Base): orders = relationship("ExecutorOrder", back_populates="executor", cascade="all, delete-orphan") +class PositionHoldRecord(Base): + """Database model for position hold tracking (separate from executor lifecycle).""" + __tablename__ = "position_holds" + __table_args__ = ( + UniqueConstraint( + "account_name", "connector_name", "trading_pair", "controller_id", + name="uq_position_hold_key" + ), + ) + + id = Column(Integer, primary_key=True, index=True) + + # Position identification + account_name = Column(String, nullable=False, index=True) + connector_name = Column(String, nullable=False, index=True) + trading_pair = Column(String, nullable=False, index=True) + controller_id = Column(String, nullable=False, default="main", index=True) + + # Aggregated amounts + buy_amount_base = Column(Numeric(precision=30, scale=18), nullable=False, default=0) + buy_amount_quote = Column(Numeric(precision=30, scale=18), nullable=False, default=0) + sell_amount_base = Column(Numeric(precision=30, scale=18), nullable=False, default=0) + sell_amount_quote = Column(Numeric(precision=30, scale=18), nullable=False, default=0) + realized_pnl_quote = Column(Numeric(precision=30, scale=18), nullable=False, default=0) + + # Tracking + executor_ids = Column(Text, nullable=True) # JSON array of executor IDs + status = Column(String, nullable=False, default="ACTIVE", index=True) # ACTIVE, CLEARED + + # Timestamps + created_at = Column(TIMESTAMP(timezone=True), server_default=func.now(), nullable=False, index=True) + last_updated = Column(TIMESTAMP(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False) + cleared_at = Column(TIMESTAMP(timezone=True), nullable=True) + + class ExecutorOrder(Base): """Database model for orders created by executors.""" __tablename__ = "executor_orders" @@ -414,5 +455,3 @@ class ExecutorOrder(Base): # Relationship executor = relationship("ExecutorRecord", back_populates="orders") - - diff --git a/database/repositories/executor_repository.py b/database/repositories/executor_repository.py index 09ee48ce..274e3024 100644 --- a/database/repositories/executor_repository.py +++ b/database/repositories/executor_repository.py @@ -8,7 +8,7 @@ from sqlalchemy import and_, case, desc, func, select from sqlalchemy.ext.asyncio import AsyncSession -from database.models import ExecutorOrder, ExecutorRecord +from database.models import ExecutorOrder, ExecutorRecord, PositionHoldRecord class ExecutorRepository: @@ -58,7 +58,8 @@ async def update_executor( net_pnl_pct: Optional[Decimal] = None, cum_fees_quote: Optional[Decimal] = None, filled_amount_quote: Optional[Decimal] = None, - final_state: Optional[str] = None + final_state: Optional[str] = None, + error_log: Optional[str] = None ) -> Optional[ExecutorRecord]: """Update an executor record.""" stmt = select(ExecutorRecord).where(ExecutorRecord.executor_id == executor_id) @@ -81,6 +82,8 @@ async def update_executor( executor.filled_amount_quote = filled_amount_quote if final_state is not None: executor.final_state = final_state + if error_log is not None: + executor.error_log = error_log await self.session.flush() await self.session.refresh(executor) @@ -177,33 +180,113 @@ async def get_position_hold_executors( result = await self.session.execute(stmt) return list(result.scalars().all()) - async def clear_position_hold_executors( + # ======================================== + # PositionHoldRecord Operations + # ======================================== + + async def upsert_position_hold( self, account_name: str, connector_name: str, trading_pair: str, - controller_id: str = "main" - ) -> int: - """Clear close_type for POSITION_HOLD executors so they won't be recovered on restart.""" - from sqlalchemy import update + controller_id: str, + buy_amount_base: Decimal, + buy_amount_quote: Decimal, + sell_amount_base: Decimal, + sell_amount_quote: Decimal, + realized_pnl_quote: Decimal, + executor_ids: List[str] + ) -> PositionHoldRecord: + """Create or update a position hold record.""" + import json as _json + + stmt = select(PositionHoldRecord).where(and_( + PositionHoldRecord.account_name == account_name, + PositionHoldRecord.connector_name == connector_name, + PositionHoldRecord.trading_pair == trading_pair, + PositionHoldRecord.controller_id == controller_id, + PositionHoldRecord.status == "ACTIVE", + )) + result = await self.session.execute(stmt) + record = result.scalar_one_or_none() + + if record: + record.buy_amount_base = buy_amount_base + record.buy_amount_quote = buy_amount_quote + record.sell_amount_base = sell_amount_base + record.sell_amount_quote = sell_amount_quote + record.realized_pnl_quote = realized_pnl_quote + record.executor_ids = _json.dumps(executor_ids) + else: + record = PositionHoldRecord( + account_name=account_name, + connector_name=connector_name, + trading_pair=trading_pair, + controller_id=controller_id, + buy_amount_base=buy_amount_base, + buy_amount_quote=buy_amount_quote, + sell_amount_base=sell_amount_base, + sell_amount_quote=sell_amount_quote, + realized_pnl_quote=realized_pnl_quote, + executor_ids=_json.dumps(executor_ids), + status="ACTIVE", + ) + self.session.add(record) - conditions = [ - ExecutorRecord.close_type == "POSITION_HOLD", - ExecutorRecord.account_name == account_name, - ExecutorRecord.connector_name == connector_name, - ExecutorRecord.trading_pair == trading_pair, - ExecutorRecord.controller_id == controller_id, - ] + await self.session.flush() + return record + + async def get_active_position_holds( + self, + account_name: Optional[str] = None, + connector_name: Optional[str] = None, + trading_pair: Optional[str] = None, + controller_id: Optional[str] = None, + ) -> List[PositionHoldRecord]: + """Get all ACTIVE position hold records.""" + stmt = select(PositionHoldRecord).where(PositionHoldRecord.status == "ACTIVE") + + conditions = [] + if account_name: + conditions.append(PositionHoldRecord.account_name == account_name) + if connector_name: + conditions.append(PositionHoldRecord.connector_name == connector_name) + if trading_pair: + conditions.append(PositionHoldRecord.trading_pair == trading_pair) + if controller_id: + conditions.append(PositionHoldRecord.controller_id == controller_id) + + if conditions: + stmt = stmt.where(and_(*conditions)) + + stmt = stmt.order_by(desc(PositionHoldRecord.last_updated)) + result = await self.session.execute(stmt) + return list(result.scalars().all()) + + async def clear_position_hold( + self, + account_name: str, + connector_name: str, + trading_pair: str, + controller_id: str = "main" + ) -> bool: + """Mark a position hold as CLEARED.""" + from sqlalchemy import update as sa_update stmt = ( - update(ExecutorRecord) - .where(and_(*conditions)) - .values(close_type="POSITION_HOLD_CLEARED") + sa_update(PositionHoldRecord) + .where(and_( + PositionHoldRecord.account_name == account_name, + PositionHoldRecord.connector_name == connector_name, + PositionHoldRecord.trading_pair == trading_pair, + PositionHoldRecord.controller_id == controller_id, + PositionHoldRecord.status == "ACTIVE", + )) + .values(status="CLEARED", cleared_at=datetime.now(timezone.utc)) ) - result = await self.session.execute(stmt) await self.session.commit() - return result.rowcount + return result.rowcount > 0 async def get_executor_stats(self) -> Dict[str, Any]: """Get statistics about executors.""" diff --git a/services/executor_service.py b/services/executor_service.py index 3955c39f..1becb5ce 100644 --- a/services/executor_service.py +++ b/services/executor_service.py @@ -138,10 +138,7 @@ def start(self): async def recover_positions_from_db(self): """ - Recover position holds from database on startup. - - This loads executors that closed with POSITION_HOLD (keep_position=True) - and reconstructs the _positions_held tracking from their final state. + Recover position holds from the dedicated position_holds table on startup. """ if not self.db_manager: return @@ -151,68 +148,37 @@ async def recover_positions_from_db(self): from database.repositories.executor_repository import ExecutorRepository repo = ExecutorRepository(session) - position_hold_executors = await repo.get_position_hold_executors() + records = await repo.get_active_position_holds() - for executor_record in position_hold_executors: - # Build position key - controller_id = getattr(executor_record, "controller_id", "main") or "main" + for record in records: + controller_id = record.controller_id or "main" position_key = self._get_position_key( - executor_record.account_name, - executor_record.connector_name, - executor_record.trading_pair, + record.account_name, + record.connector_name, + record.trading_pair, controller_id ) - # Initialize position if needed - if position_key not in self._positions_held: - self._positions_held[position_key] = PositionHold( - trading_pair=executor_record.trading_pair, - connector_name=executor_record.connector_name, - account_name=executor_record.account_name, - controller_id=controller_id, - ) - - position = self._positions_held[position_key] - - # Try to extract fill data from final_state - if executor_record.final_state: + executor_ids = [] + if record.executor_ids: try: - final_state = json.loads(executor_record.final_state) - - # Process held_position_orders (most accurate source) - held_orders = final_state.get("held_position_orders", []) - if held_orders: - buy_filled_base = Decimal("0") - buy_filled_quote = Decimal("0") - sell_filled_base = Decimal("0") - sell_filled_quote = Decimal("0") - - for order in held_orders: - if isinstance(order, dict): - trade_type = order.get("trade_type", "BUY") - exec_base = Decimal(str(order.get("executed_amount_base", 0))) - exec_quote = Decimal(str(order.get("executed_amount_quote", 0))) - - if trade_type == "BUY": - buy_filled_base += exec_base - buy_filled_quote += exec_quote - else: - sell_filled_base += exec_base - sell_filled_quote += exec_quote - - # Add fills using proper method - if buy_filled_base > 0: - position.add_fill("BUY", buy_filled_base, buy_filled_quote, executor_record.executor_id) - if sell_filled_base > 0: - position.add_fill("SELL", sell_filled_base, sell_filled_quote, executor_record.executor_id) - - logger.debug( - f"Recovered position from {executor_record.executor_id}: " - f"buy={buy_filled_base} base, sell={sell_filled_base} base" - ) - - except (json.JSONDecodeError, TypeError) as e: - logger.debug(f"Could not parse final_state for {executor_record.executor_id}: {e}") + executor_ids = json.loads(record.executor_ids) + except (json.JSONDecodeError, TypeError): + pass + + self._positions_held[position_key] = PositionHold( + trading_pair=record.trading_pair, + connector_name=record.connector_name, + account_name=record.account_name, + controller_id=controller_id, + buy_amount_base=Decimal(str(record.buy_amount_base or 0)), + buy_amount_quote=Decimal(str(record.buy_amount_quote or 0)), + sell_amount_base=Decimal(str(record.sell_amount_base or 0)), + sell_amount_quote=Decimal(str(record.sell_amount_quote or 0)), + realized_pnl_quote=Decimal(str(record.realized_pnl_quote or 0)), + executor_ids=executor_ids, + last_updated=record.last_updated, + ) if self._positions_held: logger.info(f"Recovered {len(self._positions_held)} position holds from database") @@ -663,6 +629,18 @@ def _format_executor_info( def _format_db_record(self, record) -> Dict[str, Any]: """Format a database ExecutorRecord for API response.""" + # Parse error_log from DB for completed executors + error_count = 0 + last_error = None + if record.error_log: + try: + errors = json.loads(record.error_log) + error_count = len(errors) + if errors: + last_error = errors[-1].get("message") + except (json.JSONDecodeError, TypeError): + pass + return { "executor_id": record.executor_id, "executor_type": record.executor_type, @@ -685,6 +663,8 @@ def _format_db_record(self, record) -> Dict[str, Any]: "filled_amount_quote": float(record.filled_amount_quote) if record.filled_amount_quote else 0.0, "config": json.loads(record.config) if record.config else None, "custom_info": json.loads(record.final_state) if record.final_state else None, + "error_count": error_count, + "last_error": last_error, } def get_summary(self) -> Dict[str, Any]: @@ -900,6 +880,23 @@ async def _persist_executor_completed(self, executor_id: str, executor: Executor except Exception: final_state_json = None + # Capture error logs before persisting + error_log_json = None + error_count = self._log_capture.get_error_count(executor_id) + if error_count > 0: + try: + error_entries = self._log_capture.get_logs(executor_id, level="ERROR") + error_log_json = json.dumps([ + { + "timestamp": entry.get("timestamp"), + "message": entry.get("message"), + "exc_info": entry.get("exc_info"), + } + for entry in error_entries + ]) + except Exception as e: + logger.debug(f"Failed to serialize error logs for {executor_id}: {e}") + async with self.db_manager.get_session_context() as session: from database.repositories.executor_repository import ExecutorRepository repo = ExecutorRepository(session) @@ -912,7 +909,8 @@ async def _persist_executor_completed(self, executor_id: str, executor: Executor net_pnl_pct=net_pnl_pct, cum_fees_quote=cum_fees_quote, filled_amount_quote=filled_amount_quote, - final_state=final_state_json + final_state=final_state_json, + error_log=error_log_json ) logger.debug(f"Persisted executor {executor_id} completion to database") @@ -1040,9 +1038,35 @@ async def _aggregate_position_hold( else: logger.debug(f"Executor {executor_id} has no filled amounts to aggregate") + # Persist position hold to the dedicated table + await self._persist_position_hold(position) + except Exception as e: logger.error(f"Error aggregating position for executor {executor_id}: {e}", exc_info=True) + async def _persist_position_hold(self, position: PositionHold): + """Persist a position hold to the dedicated position_holds table.""" + if not self.db_manager: + return + try: + async with self.db_manager.get_session_context() as session: + from database.repositories.executor_repository import ExecutorRepository + repo = ExecutorRepository(session) + await repo.upsert_position_hold( + account_name=position.account_name, + connector_name=position.connector_name, + trading_pair=position.trading_pair, + controller_id=position.controller_id, + buy_amount_base=position.buy_amount_base, + buy_amount_quote=position.buy_amount_quote, + sell_amount_base=position.sell_amount_base, + sell_amount_quote=position.sell_amount_quote, + realized_pnl_quote=position.realized_pnl_quote, + executor_ids=position.executor_ids, + ) + except Exception as e: + logger.error(f"Error persisting position hold: {e}", exc_info=True) + def get_positions_held( self, account_name: Optional[str] = None, @@ -1125,21 +1149,21 @@ async def clear_position_held( position_key = self._get_position_key(account_name, connector_name, trading_pair, controller_id) if position_key in self._positions_held: del self._positions_held[position_key] - # Also clear from database so they don't get reloaded on restart + # Mark position hold as CLEARED in the dedicated table if self.db_manager: try: async with self.db_manager.get_session_context() as session: from database.repositories.executor_repository import ExecutorRepository repo = ExecutorRepository(session) - count = await repo.clear_position_hold_executors( + cleared = await repo.clear_position_hold( account_name=account_name, connector_name=connector_name, trading_pair=trading_pair, controller_id=controller_id ) - logger.info(f"Cleared {count} position hold records from database for {position_key}") + logger.info(f"Cleared position hold record from database for {position_key}: {cleared}") except Exception as e: - logger.error(f"Failed to clear position holds from database: {e}", exc_info=True) + logger.error(f"Failed to clear position hold from database: {e}", exc_info=True) logger.info(f"Cleared position hold for {position_key}") return True return False diff --git a/services/orders_recorder.py b/services/orders_recorder.py index 04b41bce..ad563617 100644 --- a/services/orders_recorder.py +++ b/services/orders_recorder.py @@ -21,20 +21,20 @@ class OrdersRecorder: Custom orders recorder that mimics Hummingbot's MarketsRecorder functionality but uses our AsyncDatabaseManager for storage. """ - + def __init__(self, db_manager: AsyncDatabaseManager, account_name: str, connector_name: str): self.db_manager = db_manager self.account_name = account_name self.connector_name = connector_name self._connector: Optional[ConnectorBase] = None - + # Create event forwarders similar to MarketsRecorder self._create_order_forwarder = SourceInfoEventForwarder(self._did_create_order) self._fill_order_forwarder = SourceInfoEventForwarder(self._did_fill_order) self._cancel_order_forwarder = SourceInfoEventForwarder(self._did_cancel_order) self._fail_order_forwarder = SourceInfoEventForwarder(self._did_fail_order) self._complete_order_forwarder = SourceInfoEventForwarder(self._did_complete_order) - + # Event pairs mapping events to forwarders self._event_pairs = [ (MarketEvent.BuyOrderCreated, self._create_order_forwarder), @@ -45,12 +45,13 @@ def __init__(self, db_manager: AsyncDatabaseManager, account_name: str, connecto (MarketEvent.BuyOrderCompleted, self._complete_order_forwarder), (MarketEvent.SellOrderCompleted, self._complete_order_forwarder), ] - + def start(self, connector: ConnectorBase): """Start recording orders for the given connector""" # Idempotency guard: prevent double-registration of listeners if self._connector is not None: - logger.warning(f"OrdersRecorder already started for {self.account_name}/{self.connector_name}, ignoring duplicate start") + logger.warning( + f"OrdersRecorder already started for {self.account_name}/{self.connector_name}, ignoring duplicate start") return self._connector = connector @@ -59,37 +60,38 @@ def start(self, connector: ConnectorBase): for event, forwarder in self._event_pairs: connector.add_listener(event, forwarder) logger.info(f"OrdersRecorder: Added listener for {event} with forwarder {forwarder}") - + # Debug: Check if listeners were actually added if hasattr(connector, '_event_listeners'): listeners = connector._event_listeners.get(event, []) logger.info(f"OrdersRecorder: Event {event} now has {len(listeners)} listeners") for i, listener in enumerate(listeners): logger.info(f"OrdersRecorder: Listener {i}: {listener}") - - logger.info(f"OrdersRecorder started for {self.account_name}/{self.connector_name} with {len(self._event_pairs)} event listeners") - + + logger.info( + f"OrdersRecorder started for {self.account_name}/{self.connector_name} with {len(self._event_pairs)} event listeners") + # Debug: Print connector info logger.info(f"OrdersRecorder: Connector type: {type(connector)}") logger.info(f"OrdersRecorder: Connector name: {getattr(connector, 'name', 'unknown')}") logger.info(f"OrdersRecorder: Connector ready: {getattr(connector, 'ready', 'unknown')}") - + # Test if forwarders are callable for event, forwarder in self._event_pairs: if callable(forwarder): logger.info(f"OrdersRecorder: Forwarder for {event} is callable") else: logger.error(f"OrdersRecorder: Forwarder for {event} is NOT callable: {type(forwarder)}") - + async def stop(self): """Stop recording orders""" if self._connector: # Remove all event listeners for event, forwarder in self._event_pairs: self._connector.remove_listener(event, forwarder) - + logger.info(f"OrdersRecorder stopped for {self.account_name}/{self.connector_name}") - + def _extract_error_message(self, event) -> str: """Extract error message from various possible event attributes.""" # Try different possible attribute names for error messages @@ -98,11 +100,12 @@ def _extract_error_message(self, event) -> str: error_value = getattr(event, attr_name) if error_value: return str(error_value) - + # If no error message found, create a descriptive one return f"Order failed: {event.__class__.__name__}" - - def _did_create_order(self, event_tag: int, market: ConnectorBase, event: Union[BuyOrderCreatedEvent, SellOrderCreatedEvent]): + + def _did_create_order(self, event_tag: int, market: ConnectorBase, + event: Union[BuyOrderCreatedEvent, SellOrderCreatedEvent]): """Handle order creation events - called by SourceInfoEventForwarder""" logger.info(f"OrdersRecorder: _did_create_order called for order {getattr(event, 'order_id', 'unknown')}") try: @@ -112,61 +115,64 @@ def _did_create_order(self, event_tag: int, market: ConnectorBase, event: Union[ asyncio.create_task(self._handle_order_created(event, trade_type)) except Exception as e: logger.error(f"Error in _did_create_order: {e}") - + def _did_fill_order(self, event_tag: int, market: ConnectorBase, event: OrderFilledEvent): """Handle order fill events - called by SourceInfoEventForwarder""" try: asyncio.create_task(self._handle_order_filled(event)) except Exception as e: logger.error(f"Error in _did_fill_order: {e}") - + def _did_cancel_order(self, event_tag: int, market: ConnectorBase, event: Any): """Handle order cancel events - called by SourceInfoEventForwarder""" try: asyncio.create_task(self._handle_order_cancelled(event)) except Exception as e: logger.error(f"Error in _did_cancel_order: {e}") - + def _did_fail_order(self, event_tag: int, market: ConnectorBase, event: Any): """Handle order failure events - called by SourceInfoEventForwarder""" try: asyncio.create_task(self._handle_order_failed(event)) except Exception as e: logger.error(f"Error in _did_fail_order: {e}") - + def _did_complete_order(self, event_tag: int, market: ConnectorBase, event: Any): """Handle order completion events - called by SourceInfoEventForwarder""" try: asyncio.create_task(self._handle_order_completed(event)) except Exception as e: logger.error(f"Error in _did_complete_order: {e}") - - async def _handle_order_created(self, event: Union[BuyOrderCreatedEvent, SellOrderCreatedEvent], trade_type: TradeType): + + async def _handle_order_created(self, event: Union[BuyOrderCreatedEvent, SellOrderCreatedEvent], + trade_type: TradeType): """Handle order creation events""" logger.info(f"OrdersRecorder: _handle_order_created started for order {event.order_id}") try: async with self.db_manager.get_session_context() as session: order_repo = OrderRepository(session) - + # Check if order already exists first existing_order = await order_repo.get_order_by_client_id(event.order_id) if existing_order: - logger.info(f"OrdersRecorder: Order {event.order_id} already exists with status {existing_order.status}") - + logger.info( + f"OrdersRecorder: Order {event.order_id} already exists with status {existing_order.status}") + # Update exchange_order_id if we have it now and it was missing exchange_order_id = getattr(event, 'exchange_order_id', None) if exchange_order_id and not existing_order.exchange_order_id: existing_order.exchange_order_id = exchange_order_id - logger.info(f"OrdersRecorder: Updated exchange_order_id to {exchange_order_id} for order {event.order_id}") - + logger.info( + f"OrdersRecorder: Updated exchange_order_id to {exchange_order_id} for order {event.order_id}") + # Update status if it's still in PENDING_CREATE or similar early state if existing_order.status in ["PENDING_CREATE", "PENDING", "SUBMITTED"]: existing_order.status = "OPEN" logger.info(f"OrdersRecorder: Updated status to OPEN for order {event.order_id}") - + await session.flush() return - + order_data = { "client_order_id": event.order_id, "account_name": self.account_name, @@ -180,11 +186,11 @@ async def _handle_order_created(self, event: Union[BuyOrderCreatedEvent, SellOrd "exchange_order_id": getattr(event, 'exchange_order_id', None) } await order_repo.create_order(order_data) - + logger.info(f"OrdersRecorder: Successfully recorded order created: {event.order_id}") except Exception as e: logger.error(f"OrdersRecorder: Error recording order created: {e}") - + async def _handle_order_filled(self, event: OrderFilledEvent): """Handle order fill events""" try: @@ -195,7 +201,7 @@ async def _handle_order_filled(self, event: OrderFilledEvent): # Calculate fees trade_fee_paid = 0 trade_fee_currency = None - + if event.trade_fee: try: base_asset, quote_asset = event.trading_pair.split("-") @@ -204,7 +210,6 @@ async def _handle_order_filled(self, event: OrderFilledEvent): price=event.price, order_amount=event.amount, token=quote_asset, - exchange=self._connector, ) trade_fee_paid = float(fee_in_quote) trade_fee_currency = quote_asset @@ -222,7 +227,8 @@ async def _handle_order_filled(self, event: OrderFilledEvent): if fallback_fee is not None: trade_fee_paid = float(fallback_fee) trade_fee_currency = quote_asset - logger.info(f"Fallback fee calculation succeeded: {trade_fee_paid} {trade_fee_currency}") + logger.info( + f"Fallback fee calculation succeeded: {trade_fee_paid} {trade_fee_currency}") else: logger.error(f"Fallback fee calculation returned None for {event.order_id}") trade_fee_paid = 0 @@ -236,7 +242,7 @@ async def _handle_order_filled(self, event: OrderFilledEvent): filled_amount = Decimal(str(event.amount)) average_fill_price = Decimal(str(event.price)) fee_paid_decimal = Decimal(str(trade_fee_paid)) if trade_fee_paid else None - + order = await order_repo.update_order_fill( client_order_id=event.order_id, filled_amount=filled_amount, @@ -247,12 +253,13 @@ async def _handle_order_filled(self, event: OrderFilledEvent): except (ValueError, InvalidOperation) as e: logger.error(f"Error processing order fill for {event.order_id}: {e}, skipping update") return - + # Create trade record using validated values if order: try: # Validate all values before creating trade record - validated_timestamp = event.timestamp if event.timestamp and not math.isnan(event.timestamp) else time.time() + validated_timestamp = event.timestamp if event.timestamp and not math.isnan( + event.timestamp) else time.time() validated_fee = trade_fee_paid if trade_fee_paid and not math.isnan(trade_fee_paid) else 0 # Use exchange_trade_id if available (unique per fill), fallback to generated id @@ -279,12 +286,14 @@ async def _handle_order_filled(self, event: OrderFilledEvent): logger.debug(f"Trade {trade_id} already exists, skipping duplicate") except (ValueError, TypeError) as e: logger.error(f"Error creating trade record for {event.order_id}: {e}") - logger.error(f"Trade data that failed: timestamp={event.timestamp}, amount={event.amount}, price={event.price}, fee={trade_fee_paid}") - + logger.error( + f"Trade data that failed: timestamp={event.timestamp}, " + f"amount={event.amount}, price={event.price}, fee={trade_fee_paid}") + logger.debug(f"Recorded order fill: {event.order_id} - {event.amount} @ {event.price}") except Exception as e: logger.error(f"Error recording order fill: {e}") - + async def _handle_order_cancelled(self, event: Any): """Handle order cancellation events""" try: @@ -294,11 +303,11 @@ async def _handle_order_cancelled(self, event: Any): client_order_id=event.order_id, status="CANCELLED" ) - + logger.debug(f"Recorded order cancelled: {event.order_id}") except Exception as e: logger.error(f"Error recording order cancellation: {e}") - + def _get_order_details_from_connector(self, order_id: str) -> Optional[dict]: """Try to get order details from connector's tracked orders""" try: @@ -344,12 +353,12 @@ async def _fetch_conversion_rate(self, from_token: str, to_token: str) -> Option return None async def _calculate_fee_fallback( - self, - trade_fee, - base_asset: str, - quote_asset: str, - fill_price: Decimal, - order_amount: Decimal, + self, + trade_fee, + base_asset: str, + quote_asset: str, + fill_price: Decimal, + order_amount: Decimal, ) -> Optional[Decimal]: """Manually compute the trade fee in quote asset when the primary method fails.""" fee_amount = Decimal(0) @@ -381,13 +390,13 @@ async def _handle_order_failed(self, event: Any): try: async with self.db_manager.get_session_context() as session: order_repo = OrderRepository(session) - + # Check if order exists, if not try to get details from connector's tracked orders existing_order = await order_repo.get_order_by_client_id(event.order_id) if existing_order: # Extract error message from various possible attributes error_msg = self._extract_error_message(event) - + # Update existing order with failure status and error message await order_repo.update_order_status( client_order_id=event.order_id, @@ -400,7 +409,7 @@ async def _handle_order_failed(self, event: Any): order_details = self._get_order_details_from_connector(event.order_id) if order_details: logger.info(f"Retrieved order details from connector for {event.order_id}: {order_details}") - + # Create order record as FAILED with available details if order_details: order_data = { @@ -422,20 +431,21 @@ async def _handle_order_failed(self, event: Any): "account_name": self.account_name, "connector_name": self.connector_name, "trading_pair": "UNKNOWN", - "trade_type": "UNKNOWN", + "trade_type": "UNKNOWN", "order_type": "UNKNOWN", "amount": 0.0, "price": None, "status": "FAILED", "error_message": self._extract_error_message(event) } - + try: await order_repo.create_order(order_data) logger.info(f"Created failed order record for {event.order_id}") except Exception as create_error: # If creation fails due to duplicate key, try to update existing order - if "duplicate key" in str(create_error).lower() or "unique constraint" in str(create_error).lower(): + if "duplicate key" in str(create_error).lower() or "unique constraint" in str( + create_error).lower(): logger.info(f"Order {event.order_id} already exists, updating status to FAILED") await order_repo.update_order_status( client_order_id=event.order_id, @@ -444,10 +454,10 @@ async def _handle_order_failed(self, event: Any): ) else: raise create_error - + except Exception as e: logger.error(f"Error recording order failure: {e}") - + async def _handle_order_completed(self, event: Any): """Handle order completion events""" try: @@ -457,7 +467,7 @@ async def _handle_order_completed(self, event: Any): if order: order.status = "FILLED" order.exchange_order_id = getattr(event, 'exchange_order_id', None) - + logger.debug(f"Recorded order completed: {event.order_id}") except Exception as e: - logger.error(f"Error recording order completion: {e}") \ No newline at end of file + logger.error(f"Error recording order completion: {e}") From 7b15987a6f393c22ec015adaa649e56d5eb23af1 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Fri, 17 Apr 2026 16:32:59 -0300 Subject: [PATCH 3/5] (feat) revert version --- environment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/environment.yml b/environment.yml index 4a3606a1..6100a57c 100644 --- a/environment.yml +++ b/environment.yml @@ -3,7 +3,7 @@ channels: - conda-forge - defaults dependencies: - - python=3.13 + - python=3.12 - fastapi - uvicorn - boto3 From 67d57b2f03d249d2bf1ff00ed3f45d3ba76e3a10 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Mon, 20 Apr 2026 11:49:50 -0300 Subject: [PATCH 4/5] (feat) update pmm mister --- bots/controllers/generic/pmm_mister.py | 1347 +++++++++--------------- 1 file changed, 493 insertions(+), 854 deletions(-) diff --git a/bots/controllers/generic/pmm_mister.py b/bots/controllers/generic/pmm_mister.py index 03aef7f5..353beee0 100644 --- a/bots/controllers/generic/pmm_mister.py +++ b/bots/controllers/generic/pmm_mister.py @@ -1,5 +1,6 @@ +from collections import defaultdict from decimal import Decimal -from typing import Dict, List, Optional, Set, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union from hummingbot.core.data_type.common import MarketDict, OrderType, PositionMode, PriceType, TradeType from hummingbot.strategy_v2.controllers.controller_base import ControllerBase, ControllerConfigBase @@ -19,7 +20,7 @@ class PMMisterConfig(ControllerConfigBase): controller_type: str = "generic" controller_name: str = "pmm_mister" connector_name: str = Field(default="binance") - trading_pair: str = Field(default="BTC-FDUSD") + trading_pair: str = Field(default="BTC-USDT") portfolio_allocation: Decimal = Field(default=Decimal("0.1"), json_schema_extra={"is_updatable": True}) target_base_pct: Decimal = Field(default=Decimal("0.5"), json_schema_extra={"is_updatable": True}) min_base_pct: Decimal = Field(default=Decimal("0.3"), json_schema_extra={"is_updatable": True}) @@ -194,99 +195,361 @@ def __init__(self, config: PMMisterConfig, *args, **kwargs): self.market_data_provider.initialize_rate_sources( [ConnectorPair(connector_name=config.connector_name, trading_pair=config.trading_pair)] ) - # Price history for visualization (last 60 price points) self.price_history = [] self.max_price_history = 60 - # Order history for visualization self.order_history = [] self.max_order_history = 20 - # Initialize processed_data to prevent access errors self.processed_data = {} + # ── Market data (called by framework) ───────────────────────────────── + + async def update_processed_data(self): + """Compute reference price and spread multiplier only. All executor analysis + is done in _compute_executor_analysis called from determine_executor_actions.""" + try: + reference_price = self.market_data_provider.get_price_by_type( + self.config.connector_name, self.config.trading_pair, PriceType.MidPrice + ) + if reference_price is None or reference_price <= 0: + self.logger().warning("Invalid reference price received, using previous price if available") + reference_price = self.processed_data.get("reference_price", Decimal("100")) + except Exception as e: + self.logger().warning(f"Error getting reference price: {e}, using previous price if available") + reference_price = self.processed_data.get("reference_price", Decimal("100")) + + current_time = self.market_data_provider.time() + + self.price_history.append({'timestamp': current_time, 'price': Decimal(reference_price)}) + if len(self.price_history) > self.max_price_history: + self.price_history.pop(0) + + if self.config.tick_mode: + spread_multiplier = (self.market_data_provider.get_trading_rules( + self.config.connector_name, self.config.trading_pair + ).min_price_increment / reference_price) + else: + spread_multiplier = Decimal("1") + + self.processed_data = { + "reference_price": Decimal(reference_price), + "spread_multiplier": spread_multiplier, + } + + # ── Executor actions (called by framework) ──────────────────────────── + def determine_executor_actions(self) -> List[ExecutorAction]: - """ - Determine actions based on the current state with advanced position management. - """ - actions = [] + self._update_position_state() + self._compute_executor_analysis() - # Create new executors + actions = [] actions.extend(self.create_actions_proposal()) - - # Stop executors (refresh and early stop) actions.extend(self.stop_actions_proposal()) - return actions - def should_effectivize_executor(self, executor_info, current_time: int) -> bool: - """Check if a hanging executor should be effectivized""" - level_id = executor_info.custom_info.get("level_id", "") - fill_time = executor_info.custom_info["open_order_last_update"] - if not level_id or not fill_time: - return False + # ── Single-pass executor analysis ───────────────────────────────────── - trade_type = self.get_trade_type_from_level_id(level_id) - effectivization_time = self.config.get_position_effectivization_time(trade_type) + def _compute_executor_analysis(self): + """Analyse every executor and level once per tick. Results are stored + in self.processed_data and consumed by create/stop proposals and status display.""" + current_time = self.market_data_provider.time() + reference_price = Decimal(str(self.processed_data.get("reference_price", 0))) + if reference_price <= 0: + return + + # -- 1. Group executors by level_id in a single pass ----------------- + executors_by_level: Dict[str, list] = defaultdict(list) + for e in self.executors_info: + level_id = e.custom_info.get("level_id") + if level_id: + executors_by_level[level_id].append(e) + + # All configured levels (may not have executors yet) + all_level_ids = set() + for i in range(len(self.config.buy_spreads)): + all_level_ids.add(f"buy_{i}") + for i in range(len(self.config.sell_spreads)): + all_level_ids.add(f"sell_{i}") + all_level_ids.update(executors_by_level.keys()) + + # -- 2. Per-level analysis + blocking conditions ---------------------- + levels_analysis: Dict[str, Dict] = {} + level_conditions: Dict[str, Dict] = {} + working_levels = set() - return current_time - fill_time >= effectivization_time + cooldown_status = { + "buy": {"active": False, "remaining_time": 0, "progress_pct": Decimal("0")}, + "sell": {"active": False, "remaining_time": 0, "progress_pct": Decimal("0")}, + } - def calculate_theoretical_price(self, level_id: str, reference_price: Decimal) -> Decimal: - """Calculate the theoretical price for a given level""" - trade_type = self.get_trade_type_from_level_id(level_id) - level = self.get_level_from_level_id(level_id) + current_pct = self.processed_data.get("current_base_pct", Decimal("0")) + breakeven_price = self.processed_data.get("breakeven_price") - if trade_type == TradeType.BUY: - spreads = self.config.buy_spreads - else: - spreads = self.config.sell_spreads + for level_id in all_level_ids: + executors = executors_by_level.get(level_id, []) + active = [e for e in executors if e.is_active] + active_not_trading = [e for e in active if not e.is_trading] + active_trading = [e for e in active if e.is_trading] - if level >= len(spreads): - return reference_price + open_order_updates = [ + e.custom_info.get("open_order_last_update") for e in executors + if e.custom_info.get("open_order_last_update") is not None + ] + latest_update = max(open_order_updates) if open_order_updates else None + prices = [Decimal(str(e.config.entry_price)) for e in active if hasattr(e.config, 'entry_price')] + + analysis = { + "active_not_trading": active_not_trading, + "active_trading": active_trading, + "total_active": len(active), + "open_order_last_update": latest_update, + "min_price": min(prices) if prices else None, + "max_price": max(prices) if prices else None, + } + levels_analysis[level_id] = analysis - spread_in_pct = Decimal(spreads[level]) * Decimal(self.processed_data.get("spread_multiplier", 1)) - side_multiplier = Decimal("-1") if trade_type == TradeType.BUY else Decimal("1") - theoretical_price = reference_price * (Decimal("1") + side_multiplier * spread_in_pct) + trade_type = self.get_trade_type_from_level_id(level_id) + is_buy = level_id.startswith("buy") + level = self.get_level_from_level_id(level_id) - return theoretical_price + blocking: List[str] = [] - def should_refresh_executor_by_distance(self, executor_info, reference_price: Decimal) -> bool: - """Check if executor should be refreshed due to price distance deviation""" - level_id = executor_info.custom_info.get("level_id", "") - if not level_id or not hasattr(executor_info.config, 'entry_price'): - return False + # a) Has open (not yet filled) executors + if active_not_trading: + blocking.append("active_not_trading") - current_order_price = executor_info.config.entry_price - theoretical_price = self.calculate_theoretical_price(level_id, reference_price) + # b) Max executor cap reached + if analysis["total_active"] >= self.config.max_active_executors_by_level: + blocking.append("max_active_executors") - # Calculate distance deviation percentage - if theoretical_price == 0: - return False + # c) Cooldown + if latest_update is not None: + cooldown_time = self.config.get_cooldown_time(trade_type) + time_since = current_time - latest_update + if time_since < cooldown_time: + blocking.append("cooldown") + # Track cooldown progress for display (keep the most recent) + side = "buy" if is_buy else "sell" + remaining = cooldown_time - time_since + progress = Decimal(str(time_since)) / Decimal(str(cooldown_time)) + if not cooldown_status[side]["active"] or remaining > cooldown_status[side]["remaining_time"]: + cooldown_status[side].update(active=True, remaining_time=remaining, progress_pct=progress) + + # d) Price distance violation + level_tolerance = self.config.get_price_distance_level_tolerance(level) + if is_buy and analysis["min_price"] is not None: + distance = (analysis["min_price"] - reference_price) / reference_price + if distance < level_tolerance: + blocking.append("price_distance") + elif not is_buy and analysis["max_price"] is not None: + distance = (reference_price - analysis["max_price"]) / reference_price + if distance < level_tolerance: + blocking.append("price_distance") - distance_deviation = abs(current_order_price - theoretical_price) / theoretical_price + # e) Position constraints + if current_pct < self.config.min_base_pct and not is_buy: + blocking.append("below_min_position") + elif current_pct > self.config.max_base_pct and is_buy: + blocking.append("above_max_position") - # Check if deviation exceeds level-specific refresh tolerance - level = self.get_level_from_level_id(level_id) - level_tolerance = self.config.get_refresh_level_tolerance(level) - return distance_deviation > level_tolerance + # f) Position profit protection + if (self.config.position_profit_protection and not is_buy + and breakeven_price and breakeven_price > 0 and reference_price < breakeven_price): + blocking.append("position_profit_protection") + + # Execution-blocking conditions determine "working" levels + execution_blocking = {"active_not_trading", "max_active_executors", "cooldown", "price_distance"} + if any(b in execution_blocking for b in blocking): + working_levels.add(level_id) + + level_conditions[level_id] = { + "trade_type": trade_type.name, + "can_execute": len(blocking) == 0, + "blocking_conditions": blocking, + "active_executors": len(active_not_trading), + "hanging_executors": len(active_trading), + } + + # -- 3. Levels to execute (position-aware) ---------------------------- + levels_to_execute = self._get_executable_levels(working_levels) + + # -- 4. Executors to refresh + refresh tracking ----------------------- + executors_to_refresh = [] + refresh_tracking = { + "refresh_candidates": [], "near_refresh": 0, + "refresh_ready": 0, "distance_violations": 0, + } + + for e in self.executors_info: + if not e.is_active or e.is_trading: + continue + + age = current_time - e.timestamp + time_based = age > self.config.executor_refresh_time + distance_based = reference_price > 0 and self.should_refresh_executor_by_distance(e, reference_price) + + if time_based or distance_based: + executors_to_refresh.append(e) + + # Tracking data for display + time_to_refresh = max(0, self.config.executor_refresh_time - age) + progress = min(Decimal("1"), Decimal(str(age)) / Decimal(str(self.config.executor_refresh_time))) + ready = time_based or distance_based + near = time_to_refresh <= self.config.executor_refresh_time * 0.2 + + distance_deviation_pct = Decimal("0") + e_level_id = e.custom_info.get("level_id", "") + if e_level_id and hasattr(e.config, 'entry_price') and reference_price > 0: + theoretical = self.calculate_theoretical_price(e_level_id, reference_price) + if theoretical > 0: + distance_deviation_pct = abs(e.config.entry_price - theoretical) / theoretical + + if ready: + refresh_tracking["refresh_ready"] += 1 + elif near: + refresh_tracking["near_refresh"] += 1 + if distance_based: + refresh_tracking["distance_violations"] += 1 + + e_level = self.get_level_from_level_id(e_level_id) if e_level_id else 0 + refresh_tracking["refresh_candidates"].append({ + "executor_id": e.id, + "level_id": e_level_id or "unknown", + "level": e_level, + "age": age, + "time_to_refresh": time_to_refresh, + "progress_pct": progress, + "ready": ready, + "ready_by_time": time_based, + "ready_by_distance": distance_based, + "distance_deviation_pct": distance_deviation_pct, + "distance_violation": distance_based, + "level_tolerance": self.config.get_refresh_level_tolerance(e_level), + "near_refresh": near, + }) + + # -- 5. Hanging executors to effectivize + tracking ------------------- + executors_to_effectivize = [] + effectivization_tracking = { + "hanging_executors": [], "total_hanging": 0, "ready_for_effectivization": 0, + } + + for e in self.executors_info: + if not (e.is_active and e.is_trading): + continue + + e_level_id = e.custom_info.get("level_id", "") + fill_time = e.custom_info.get("open_order_last_update") + if not e_level_id or fill_time is None: + continue + + trade_type = self.get_trade_type_from_level_id(e_level_id) + eff_time = self.config.get_position_effectivization_time(trade_type) + elapsed = current_time - fill_time + remaining = max(0, eff_time - elapsed) + progress = min(Decimal("1"), Decimal(str(elapsed)) / Decimal(str(eff_time))) + ready = remaining == 0 + + if ready: + executors_to_effectivize.append(e) + effectivization_tracking["ready_for_effectivization"] += 1 + + effectivization_tracking["total_hanging"] += 1 + effectivization_tracking["hanging_executors"].append({ + "level_id": e_level_id, + "trade_type": trade_type.name, + "time_elapsed": elapsed, + "remaining_time": remaining, + "progress_pct": progress, + "ready": ready, + "executor_id": e.id, + }) + + # -- 6. Executor statistics ------------------------------------------- + active_all = [e for e in self.executors_info if e.is_active] + total_trading = sum(1 for e in active_all if e.is_trading) + executor_stats = { + "total_active": len(active_all), + "total_trading": total_trading, + "total_not_trading": len(active_all) - total_trading, + } + + # -- Store everything ------------------------------------------------- + self.processed_data.update({ + "levels_analysis": levels_analysis, + "level_conditions": level_conditions, + "levels_to_execute": levels_to_execute, + "executors_to_refresh": executors_to_refresh, + "executors_to_effectivize": executors_to_effectivize, + "cooldown_status": cooldown_status, + "effectivization_tracking": effectivization_tracking, + "refresh_tracking": refresh_tracking, + "executor_stats": executor_stats, + "current_time": current_time, + }) + + # ── Position state ──────────────────────────────────────────────────── + + def _update_position_state(self): + """Recalculate position-derived fields (skews, deviation, breakeven) from positions_held.""" + reference_price = self.processed_data.get("reference_price") + if reference_price is None: + return + + position_held = next((p for p in self.positions_held if + p.trading_pair == self.config.trading_pair and + p.connector_name == self.config.connector_name), None) + + target_position = self.config.total_amount_quote * self.config.target_base_pct + + if position_held is not None: + current_base_pct = position_held.amount_quote / self.config.total_amount_quote + deviation = (target_position - position_held.amount_quote) / target_position + unrealized_pnl_pct = (position_held.unrealized_pnl_quote / position_held.amount_quote + if position_held.amount_quote != 0 else Decimal("0")) + breakeven_price = position_held.breakeven_price + position_amount = position_held.amount + else: + current_base_pct = Decimal("0") + deviation = Decimal("1") + unrealized_pnl_pct = Decimal("0") + breakeven_price = None + position_amount = Decimal("0") + + min_pct = self.config.min_base_pct + max_pct = self.config.max_base_pct + if max_pct > min_pct: + buy_skew = (max_pct - current_base_pct) / (max_pct - min_pct) + sell_skew = (current_base_pct - min_pct) / (max_pct - min_pct) + buy_skew = max(min(buy_skew, Decimal("1.0")), self.config.min_skew) + sell_skew = max(min(sell_skew, Decimal("1.0")), self.config.min_skew) + else: + buy_skew = sell_skew = Decimal("1.0") + + self.processed_data.update({ + "deviation": deviation, + "current_base_pct": current_base_pct, + "unrealized_pnl_pct": unrealized_pnl_pct, + "breakeven_price": breakeven_price, + "position_amount": position_amount, + "buy_skew": buy_skew, + "sell_skew": sell_skew, + }) + + # ── Create / stop proposals ─────────────────────────────────────────── def create_actions_proposal(self) -> List[ExecutorAction]: - """ - Create actions proposal with advanced position management logic. - """ create_actions = [] - # Get levels to execute with advanced logic - levels_to_execute = self.get_levels_to_execute() + levels_to_execute = self.processed_data.get("levels_to_execute", []) + if not levels_to_execute: + return create_actions - # Pre-calculate spreads and amounts buy_spreads, buy_amounts_quote = self.config.get_spreads_and_amounts_in_quote(TradeType.BUY) sell_spreads, sell_amounts_quote = self.config.get_spreads_and_amounts_in_quote(TradeType.SELL) reference_price = Decimal(self.processed_data["reference_price"]) - - # Use pre-calculated skew factors from processed_data buy_skew = self.processed_data["buy_skew"] sell_skew = self.processed_data["sell_skew"] - # Create executors for each level for level_id in levels_to_execute: trade_type = self.get_trade_type_from_level_id(level_id) level = self.get_level_from_level_id(level_id) @@ -298,10 +561,7 @@ def create_actions_proposal(self) -> List[ExecutorAction]: spread_in_pct = Decimal(sell_spreads[level]) * Decimal(self.processed_data["spread_multiplier"]) amount_quote = Decimal(sell_amounts_quote[level]) - # Apply skew to amount calculation skew = buy_skew if trade_type == TradeType.BUY else sell_skew - - # Calculate price and amount side_multiplier = Decimal("-1") if trade_type == TradeType.BUY else Decimal("1") price = reference_price * (Decimal("1") + side_multiplier * spread_in_pct) amount = self.market_data_provider.quantize_order_amount( @@ -322,7 +582,6 @@ def create_actions_proposal(self) -> List[ExecutorAction]: executor_config = self.get_executor_config(level_id, price, amount) if executor_config is not None: - # Track order creation for visualization self.order_history.append({ 'timestamp': self.market_data_provider.time(), 'price': price, @@ -340,210 +599,86 @@ def create_actions_proposal(self) -> List[ExecutorAction]: return create_actions - def get_levels_to_execute(self) -> List[str]: - """ - Get levels to execute with advanced hanging executor logic using the analyzer. - """ - current_time = self.market_data_provider.time() - - # Analyze all levels to understand executor states - all_levels_analysis = self.analyze_all_levels() - - # Get working levels (active or hanging with cooldown) - working_levels_ids = [] - - for analysis in all_levels_analysis: - level_id = analysis["level_id"] - trade_type = self.get_trade_type_from_level_id(level_id) - is_buy = level_id.startswith("buy") - current_price = Decimal(self.processed_data["reference_price"]) - - # Evaluate each condition separately for debugging - has_active_not_trading = len(analysis["active_executors_not_trading"]) > 0 - has_too_many_executors = analysis["total_active_executors"] >= self.config.max_active_executors_by_level + def stop_actions_proposal(self) -> List[ExecutorAction]: + stop_actions = [] - # Check cooldown condition - has_active_cooldown = False - if analysis["open_order_last_update"]: - cooldown_time = self.config.get_cooldown_time(trade_type) - has_active_cooldown = current_time - analysis["open_order_last_update"] < cooldown_time + for executor in self.processed_data.get("executors_to_refresh", []): + stop_actions.append(StopExecutorAction( + controller_id=self.config.id, + keep_position=True, + executor_id=executor.id + )) - # Enhanced price distance logic with level-specific tolerance - price_distance_violated = False - level = self.get_level_from_level_id(level_id) + for executor in self.processed_data.get("executors_to_effectivize", []): + stop_actions.append(StopExecutorAction( + controller_id=self.config.id, + keep_position=True, + executor_id=executor.id + )) - if is_buy and analysis["max_price"]: - # For buy orders, ensure they're not too close to current price - distance_from_current = (current_price - analysis["max_price"]) / current_price - level_tolerance = self.config.get_price_distance_level_tolerance(level) - if distance_from_current < level_tolerance: - price_distance_violated = True - elif not is_buy and analysis["min_price"]: - # For sell orders, ensure they're not too close to current price - distance_from_current = (analysis["min_price"] - current_price) / current_price - level_tolerance = self.config.get_price_distance_level_tolerance(level) - if distance_from_current < level_tolerance: - price_distance_violated = True - - # Level is working if any condition is true - if (has_active_not_trading or - has_too_many_executors or - has_active_cooldown or - price_distance_violated): - working_levels_ids.append(level_id) - continue - return self.get_not_active_levels_ids(working_levels_ids) - - def stop_actions_proposal(self) -> List[ExecutorAction]: - """ - Create stop actions with enhanced refresh logic. - """ - stop_actions = [] - stop_actions.extend(self.executors_to_refresh()) - stop_actions.extend(self.process_hanging_executors()) return stop_actions - def executors_to_refresh(self) -> List[ExecutorAction]: - """Refresh executors that have been active too long or deviated too far from theoretical price""" - current_time = self.market_data_provider.time() - reference_price = Decimal(self.processed_data.get("reference_price", Decimal("0"))) - - executors_to_refresh = self.filter_executors( - executors=self.executors_info, - filter_func=lambda x: ( - not x.is_trading and x.is_active and ( - # Time-based refresh condition - current_time - x.timestamp > self.config.executor_refresh_time or - # Distance-based refresh condition - (reference_price > 0 and self.should_refresh_executor_by_distance(x, reference_price)) - ) - ) - ) - return [StopExecutorAction( - controller_id=self.config.id, - keep_position=True, - executor_id=executor.id - ) for executor in executors_to_refresh] - - def process_hanging_executors(self) -> List[ExecutorAction]: - """Process hanging executors and effectivize them when appropriate""" - current_time = self.market_data_provider.time() - # Find hanging executors that should be effectivized (only is_trading) - executors_to_effectivize = self.filter_executors( - executors=self.executors_info, - filter_func=lambda x: x.is_trading and self.should_effectivize_executor(x, current_time) - ) - - # Create actions for effectivization (keep position) - effectivize_actions = [StopExecutorAction( - controller_id=self.config.id, - keep_position=True, - executor_id=executor.id - ) for executor in executors_to_effectivize] + # ── Helpers ─────────────────────────────────────────────────────────── - return effectivize_actions + def _get_executable_levels(self, working_levels: set) -> List[str]: + """Get levels that should be executed, applying position constraints.""" + buy_missing = [ + f"buy_{i}" for i in range(len(self.config.buy_spreads)) + if f"buy_{i}" not in working_levels + ] + sell_missing = [ + f"sell_{i}" for i in range(len(self.config.sell_spreads)) + if f"sell_{i}" not in working_levels + ] - async def update_processed_data(self): - """ - Update processed data with enhanced condition tracking and analysis. - """ - current_time = self.market_data_provider.time() + current_pct = self.processed_data.get("current_base_pct", Decimal("0")) - # Safely get reference price with fallback - try: - reference_price = self.market_data_provider.get_price_by_type( - self.config.connector_name, self.config.trading_pair, PriceType.MidPrice - ) - if reference_price is None or reference_price <= 0: - self.logger().warning("Invalid reference price received, using previous price if available") - reference_price = self.processed_data.get("reference_price", Decimal("100")) # Default fallback - except Exception as e: - self.logger().warning(f"Error getting reference price: {e}, using previous price if available") - reference_price = self.processed_data.get("reference_price", Decimal("100")) # Default fallback + if current_pct < self.config.min_base_pct: + return buy_missing + elif current_pct > self.config.max_base_pct: + return sell_missing - # Update price history for visualization - self.price_history.append({ - 'timestamp': current_time, - 'price': Decimal(reference_price) - }) - if len(self.price_history) > self.max_price_history: - self.price_history.pop(0) + if self.config.position_profit_protection: + breakeven_price = self.processed_data.get("breakeven_price") + reference_price = self.processed_data["reference_price"] + target_pct = self.config.target_base_pct - position_held = next((position for position in self.positions_held if - (position.trading_pair == self.config.trading_pair) & - (position.connector_name == self.config.connector_name)), None) + if breakeven_price is not None and breakeven_price > 0: + if current_pct < target_pct and reference_price < breakeven_price: + return buy_missing + elif current_pct > target_pct and reference_price > breakeven_price: + return sell_missing - target_position = self.config.total_amount_quote * self.config.target_base_pct + return buy_missing + sell_missing - if position_held is not None: - position_amount = position_held.amount - current_base_pct = position_held.amount_quote / self.config.total_amount_quote - deviation = (target_position - position_held.amount_quote) / target_position - unrealized_pnl_pct = ( - position_held.unrealized_pnl_quote / position_held.amount_quote - if position_held.amount_quote != 0 else Decimal("0") - ) - breakeven_price = position_held.breakeven_price - else: - position_amount = 0 - current_base_pct = 0 - deviation = 1 - unrealized_pnl_pct = 0 - breakeven_price = None + def calculate_theoretical_price(self, level_id: str, reference_price: Decimal) -> Decimal: + """Calculate the theoretical price for a given level""" + trade_type = self.get_trade_type_from_level_id(level_id) + level = self.get_level_from_level_id(level_id) - if self.config.tick_mode: - spread_multiplier = ( - self.market_data_provider.get_trading_rules( - self.config.connector_name, self.config.trading_pair - ).min_price_increment / reference_price - ) - else: - spread_multiplier = Decimal("1") + spreads = self.config.buy_spreads if trade_type == TradeType.BUY else self.config.sell_spreads + if level >= len(spreads): + return reference_price - # Calculate skew factors for position balancing - min_pct = self.config.min_base_pct - max_pct = self.config.max_base_pct + spread_in_pct = Decimal(spreads[level]) * Decimal(self.processed_data.get("spread_multiplier", 1)) + side_multiplier = Decimal("-1") if trade_type == TradeType.BUY else Decimal("1") + return reference_price * (Decimal("1") + side_multiplier * spread_in_pct) - if max_pct > min_pct: - # Calculate skew factors based on position deviation - buy_skew = (max_pct - current_base_pct) / (max_pct - min_pct) - sell_skew = (current_base_pct - min_pct) / (max_pct - min_pct) - # Apply minimum skew to prevent orders from becoming too small - buy_skew = max(min(buy_skew, Decimal("1.0")), self.config.min_skew) - sell_skew = max(min(sell_skew, Decimal("1.0")), self.config.min_skew) - else: - buy_skew = sell_skew = Decimal("1.0") + def should_refresh_executor_by_distance(self, executor_info, reference_price: Decimal) -> bool: + """Check if executor should be refreshed due to price distance deviation""" + level_id = executor_info.custom_info.get("level_id", "") + if not level_id or not hasattr(executor_info.config, 'entry_price'): + return False - # Enhanced condition tracking - only if we have valid data - cooldown_status = self._calculate_cooldown_status(current_time) - price_distance_analysis = self._calculate_price_distance_analysis(Decimal(reference_price)) - effectivization_tracking = self._calculate_effectivization_tracking(current_time) - level_conditions = self._analyze_level_conditions(current_time, Decimal(reference_price)) - executor_stats = self._calculate_executor_statistics(current_time) - refresh_tracking = self._calculate_refresh_tracking(current_time) + theoretical_price = self.calculate_theoretical_price(level_id, reference_price) + if theoretical_price == 0: + return False - self.processed_data = { - "reference_price": Decimal(reference_price), - "spread_multiplier": spread_multiplier, - "deviation": deviation, - "current_base_pct": current_base_pct, - "unrealized_pnl_pct": unrealized_pnl_pct, - "position_amount": position_amount, - "breakeven_price": breakeven_price, - "buy_skew": buy_skew, - "sell_skew": sell_skew, - # Enhanced tracking data - "cooldown_status": cooldown_status, - "price_distance_analysis": price_distance_analysis, - "effectivization_tracking": effectivization_tracking, - "level_conditions": level_conditions, - "executor_stats": executor_stats, - "refresh_tracking": refresh_tracking, - "current_time": current_time - } + distance_deviation = abs(executor_info.config.entry_price - theoretical_price) / theoretical_price + level = self.get_level_from_level_id(level_id) + return distance_deviation > self.config.get_refresh_level_tolerance(level) def get_executor_config(self, level_id: str, price: Decimal, amount: Decimal): - """Get executor config for a given level""" trade_type = self.get_trade_type_from_level_id(level_id) return PositionExecutorConfig( timestamp=self.market_data_provider.time(), @@ -558,7 +693,6 @@ def get_executor_config(self, level_id: str, price: Decimal, amount: Decimal): ) def get_level_id_from_side(self, trade_type: TradeType, level: int) -> str: - """Get level ID based on trade type and level""" return f"{trade_type.name.lower()}_{level}" def get_trade_type_from_level_id(self, level_id: str) -> TradeType: @@ -567,91 +701,17 @@ def get_trade_type_from_level_id(self, level_id: str) -> TradeType: def get_level_from_level_id(self, level_id: str) -> int: return int(level_id.split('_')[1]) - def get_not_active_levels_ids(self, active_levels_ids: List[str]) -> List[str]: - """Get levels that should be executed based on position constraints""" - buy_ids_missing = [ - self.get_level_id_from_side(TradeType.BUY, level) - for level in range(len(self.config.buy_spreads)) - if self.get_level_id_from_side(TradeType.BUY, level) not in active_levels_ids - ] - sell_ids_missing = [ - self.get_level_id_from_side(TradeType.SELL, level) - for level in range(len(self.config.sell_spreads)) - if self.get_level_id_from_side(TradeType.SELL, level) not in active_levels_ids - ] - - current_pct = self.processed_data["current_base_pct"] - - if current_pct < self.config.min_base_pct: - return buy_ids_missing - elif current_pct > self.config.max_base_pct: - return sell_ids_missing - - # Position profit protection: filter based on breakeven - if self.config.position_profit_protection: - breakeven_price = self.processed_data.get("breakeven_price") - reference_price = self.processed_data["reference_price"] - target_pct = self.config.target_base_pct - - if breakeven_price is not None and breakeven_price > 0: - if current_pct < target_pct and reference_price < breakeven_price: - return buy_ids_missing # Don't sell at a loss when underweight - elif current_pct > target_pct and reference_price > breakeven_price: - return sell_ids_missing # Don't buy more when overweight and in profit - - return buy_ids_missing + sell_ids_missing - - def analyze_all_levels(self) -> List[Dict]: - """Analyze executors for all levels.""" - level_ids: Set[str] = {e.custom_info.get("level_id") for e in self.executors_info if - "level_id" in e.custom_info} - return [self._analyze_by_level_id(level_id) for level_id in level_ids] - - def _analyze_by_level_id(self, level_id: str) -> Dict: - """Analyze executors for a specific level ID.""" - # Get active executors for level calculations - filtered_executors = [e for e in self.executors_info if - e.custom_info.get("level_id") == level_id and e.is_active] - - active_not_trading = [e for e in filtered_executors if e.is_active and not e.is_trading] - active_trading = [e for e in filtered_executors if e.is_active and e.is_trading] - - # For cooldown calculation, include both active and recently completed executors - all_level_executors = [e for e in self.executors_info if e.custom_info.get("level_id") == level_id] - open_order_last_updates = [ - e.custom_info.get("open_order_last_update") for e in all_level_executors - if "open_order_last_update" in e.custom_info and e.custom_info["open_order_last_update"] is not None - ] - latest_open_order_update = max(open_order_last_updates) if open_order_last_updates else None - - prices = [e.config.entry_price for e in filtered_executors if hasattr(e.config, 'entry_price')] - - return { - "level_id": level_id, - "active_executors_not_trading": active_not_trading, - "active_executors_trading": active_trading, - "total_active_executors": len(active_not_trading) + len(active_trading), - "open_order_last_update": latest_open_order_update, - "min_price": min(prices) if prices else None, - "max_price": max(prices) if prices else None, - } + # ── Status display ──────────────────────────────────────────────────── def to_format_status(self) -> List[str]: - """ - Comprehensive real-time trading conditions dashboard. - """ from decimal import Decimal from itertools import zip_longest status = [] - - # Layout dimensions - set early for error cases outer_width = 170 inner_width = outer_width - 4 - # Get all required data with safe fallbacks if not hasattr(self, 'processed_data') or not self.processed_data: - # Return minimal status if processed_data is not available status.append("╒" + "═" * inner_width + "╕") status.append(f"│ {'Initializing controller... please wait':<{inner_width}} │") status.append(f"╘{'═' * inner_width}╛") @@ -667,34 +727,30 @@ def to_format_status(self) -> List[str]: buy_skew = self.processed_data.get('buy_skew', Decimal("1.0")) sell_skew = self.processed_data.get('sell_skew', Decimal("1.0")) - # Enhanced condition data cooldown_status = self.processed_data.get('cooldown_status', {}) effectivization = self.processed_data.get('effectivization_tracking', {}) level_conditions = self.processed_data.get('level_conditions', {}) executor_stats = self.processed_data.get('executor_stats', {}) refresh_tracking = self.processed_data.get('refresh_tracking', {}) + levels_analysis = self.processed_data.get('levels_analysis', {}) - # Layout dimensions already set above - - # Smart column distribution for 5 columns - col1_width = 28 # Cooldowns - col2_width = 35 # Price distances - col3_width = 28 # Effectivization - col4_width = 25 # Refresh tracking - col5_width = inner_width - col1_width - col2_width - col3_width - col4_width - 4 # Execution status + col1_width = 28 + col2_width = 35 + col3_width = 28 + col4_width = 25 + col5_width = inner_width - col1_width - col2_width - col3_width - col4_width - 4 half_width = inner_width // 2 - 1 bar_width = inner_width - 25 - # Header with enhanced info + # Header status.append("╒" + "═" * inner_width + "╕") header_line = ( f"{self.config.connector_name}:{self.config.trading_pair} @ {current_price:.2f} " f"Alloc: {self.config.portfolio_allocation:.1%} " - f"Spread×{self.processed_data['spread_multiplier']:.3f} " - f"Dist: {self.config.price_distance_tolerance:.4%} " - f"Ref: {self.config.refresh_tolerance:.4%} (×{self.config.tolerance_scaling}) " + f"Spread×{self.processed_data.get('spread_multiplier', Decimal('1')):.3f} " + f"Dist: {self.config.price_distance_tolerance:.4%} Ref: {self.config.refresh_tolerance:.4%} (×{self.config.tolerance_scaling}) " f"Pos Protect: {'ON' if self.config.position_profit_protection else 'OFF'}" ) status.append(f"│ {header_line:<{inner_width}} │") @@ -705,13 +761,11 @@ def to_format_status(self) -> List[str]: status.append( f"├{'─' * col1_width}┬{'─' * col2_width}┬{'─' * col3_width}┬{'─' * col4_width}┬{'─' * col5_width}┤") status.append( - f"│ {'COOLDOWNS':<{col1_width}} │ {'PRICE DISTANCES':<{col2_width}} │ " - f"{'EFFECTIVIZATION':<{col3_width}} │ {'REFRESH TRACKING':<{col4_width}} │ " - f"{'EXECUTION':<{col5_width}} │") + f"│ {'COOLDOWNS':<{col1_width}} │ {'PRICE DISTANCES':<{col2_width}} │ {'EFFECTIVIZATION':<{col3_width}} │ " + f"{'REFRESH TRACKING':<{col4_width}} │ {'EXECUTION':<{col5_width}} │") status.append( f"├{'─' * col1_width}┼{'─' * col2_width}┼{'─' * col3_width}┼{'─' * col4_width}┼{'─' * col5_width}┤") - # Cooldown information buy_cooldown = cooldown_status.get('buy', {}) sell_cooldown = cooldown_status.get('sell', {}) @@ -722,27 +776,21 @@ def to_format_status(self) -> List[str]: "" ] - # Calculate actual distances for current levels + # Calculate actual distances from pre-computed levels_analysis current_buy_distance = "" current_sell_distance = "" - - all_levels_analysis = self.analyze_all_levels() - for analysis in all_levels_analysis: - level_id = analysis["level_id"] + for level_id, analysis in levels_analysis.items(): is_buy = level_id.startswith("buy") - - if is_buy and analysis["max_price"]: - distance = (current_price - analysis["max_price"]) / current_price - current_buy_distance = f"({distance:.3%})" - elif not is_buy and analysis["min_price"]: + if is_buy and analysis.get("min_price"): distance = (analysis["min_price"] - current_price) / current_price + current_buy_distance = f"({distance:.3%})" + elif not is_buy and analysis.get("max_price"): + distance = (current_price - analysis["max_price"]) / current_price current_sell_distance = f"({distance:.3%})" - # Enhanced price info with unified tolerance approach violation_marker = " ⚠️" if (current_buy_distance and "(0.0" in current_buy_distance) or ( - current_sell_distance and "(0.0" in current_sell_distance) else "" + current_sell_distance and "(0.0" in current_sell_distance) else "" - # Show level-specific tolerances dist_l0 = self.config.get_price_distance_level_tolerance(0) dist_l1 = self.config.get_price_distance_level_tolerance(1) if len(self.config.buy_spreads) > 1 else None @@ -753,7 +801,6 @@ def to_format_status(self) -> List[str]: f"SELL Current: {current_sell_distance}" ] - # Effectivization information total_hanging = effectivization.get('total_hanging', 0) ready_count = effectivization.get('ready_for_effectivization', 0) @@ -764,7 +811,6 @@ def to_format_status(self) -> List[str]: "" ] - # Refresh tracking information near_refresh = refresh_tracking.get('near_refresh', 0) refresh_ready = refresh_tracking.get('refresh_ready', 0) distance_violations = refresh_tracking.get('distance_violations', 0) @@ -776,11 +822,10 @@ def to_format_status(self) -> List[str]: f"Threshold: {self.config.executor_refresh_time}s" ] - # Execution status - can_execute_buy = len([level for level in level_conditions.values() if - level.get('trade_type') == 'BUY' and level.get('can_execute')]) - can_execute_sell = len([level for level in level_conditions.values() if - level.get('trade_type') == 'SELL' and level.get('can_execute')]) + can_execute_buy = len( + [lc for lc in level_conditions.values() if lc.get('trade_type') == 'BUY' and lc.get('can_execute')]) + can_execute_sell = len( + [lc for lc in level_conditions.values() if lc.get('trade_type') == 'SELL' and lc.get('can_execute')]) total_buy_levels = len(self.config.buy_spreads) total_sell_levels = len(self.config.sell_spreads) @@ -791,21 +836,18 @@ def to_format_status(self) -> List[str]: "" ] - # Display conditions in 5 columns for cool_line, price_line, effect_line, refresh_line, exec_line in zip_longest(cooldown_info, price_info, effect_info, refresh_info, execution_info, fillvalue=""): status.append( - f"│ {cool_line:<{col1_width}} │ {price_line:<{col2_width}} │ " - f"{effect_line:<{col3_width}} │ {refresh_line:<{col4_width}} │ " - f"{exec_line:<{col5_width}} │") + f"│ {cool_line:<{col1_width}} │ {price_line:<{col2_width}} │ {effect_line:<{col3_width}} │ { + refresh_line:<{col4_width}} │ {exec_line:<{col5_width}} │") # LEVEL-BY-LEVEL ANALYSIS status.append(f"├{'─' * inner_width}┤") status.append(f"│ {'📊 LEVEL-BY-LEVEL ANALYSIS':<{inner_width}} │") status.append(f"├{'─' * inner_width}┤") - # Show level conditions status.extend(self._format_level_conditions(level_conditions, inner_width)) # VISUAL PROGRESS INDICATORS @@ -813,15 +855,12 @@ def to_format_status(self) -> List[str]: status.append(f"│ {'🔄 VISUAL PROGRESS INDICATORS':<{inner_width}} │") status.append(f"├{'─' * inner_width}┤") - # Cooldown progress bars if buy_cooldown.get('active') or sell_cooldown.get('active'): status.extend(self._format_cooldown_bars(buy_cooldown, sell_cooldown, bar_width, inner_width)) - # Effectivization progress if total_hanging > 0: status.extend(self._format_effectivization_bars(effectivization, bar_width, inner_width)) - # Refresh progress bars if refresh_tracking.get('refresh_candidates', []): status.extend(self._format_refresh_bars(refresh_tracking, bar_width, inner_width)) @@ -830,7 +869,6 @@ def to_format_status(self) -> List[str]: status.append(f"│ {'📍 POSITION STATUS':<{half_width}} │ {'💰 PROFIT & LOSS':<{half_width}} │") status.append(f"├{'─' * half_width}┼{'─' * half_width}┤") - # Position data with enhanced skew info skew = base_pct - target_pct skew_pct = skew / target_pct if target_pct != 0 else Decimal('0') position_info = [ @@ -840,7 +878,6 @@ def to_format_status(self) -> List[str]: f"Buy Skew: {buy_skew:.2f} | Sell Skew: {sell_skew:.2f}" ] - # Enhanced PnL data breakeven_str = f"{breakeven:.2f}" if breakeven is not None else "N/A" pnl_sign = "+" if pnl >= 0 else "" distance_to_tp = self.config.global_take_profit - pnl if pnl < self.config.global_take_profit else Decimal('0') @@ -853,64 +890,29 @@ def to_format_status(self) -> List[str]: f"Breakeven: {breakeven_str}" ] - # Display position and PnL info for pos_line, pnl_line in zip_longest(position_info, pnl_info, fillvalue=""): status.append(f"│ {pos_line:<{half_width}} │ {pnl_line:<{half_width}} │") - # Position visualization with enhanced details status.append(f"├{'─' * inner_width}┤") status.extend( self._format_position_visualization(base_pct, target_pct, min_pct, max_pct, skew_pct, pnl, bar_width, inner_width)) - # Bottom border status.append(f"╘{'═' * inner_width}╛") return status - def _is_executor_too_far_from_price(self, executor_info, current_price: Decimal) -> bool: - """Check if hanging executor is too far from current price and should be stopped""" - if not hasattr(executor_info.config, 'entry_price'): - return False - - entry_price = executor_info.config.entry_price - level_id = executor_info.custom_info.get("level_id", "") - - if not level_id: - return False - - is_buy = level_id.startswith("buy") - - # Calculate price distance - if is_buy: - # For buy orders, stop if they're above current price (inverted) - if entry_price >= current_price: - return True - distance = (current_price - entry_price) / current_price - max_distance = Decimal("0.05") # 5% maximum distance - else: - # For sell orders, stop if they're below current price - if entry_price <= current_price: - return True - distance = (entry_price - current_price) / current_price - max_distance = Decimal("0.05") # 5% maximum distance - - return distance > max_distance + # ── Display formatting helpers ──────────────────────────────────────── def _format_cooldown_status(self, cooldown_data: Dict) -> str: - """Format cooldown status for display""" if not cooldown_data.get('active'): return "READY ✓" - remaining = cooldown_data.get('remaining_time', 0) progress = cooldown_data.get('progress_pct', Decimal('0')) return f"{remaining:.1f}s ({progress:.0%})" def _format_level_conditions(self, level_conditions: Dict, inner_width: int) -> List[str]: - """Format level-by-level conditions analysis""" lines = [] - - # Group by trade type buy_levels = {k: v for k, v in level_conditions.items() if v.get('trade_type') == 'BUY'} sell_levels = {k: v for k, v in level_conditions.items() if v.get('trade_type') == 'SELL'} @@ -918,7 +920,6 @@ def _format_level_conditions(self, level_conditions: Dict, inner_width: int) -> lines.append(f"│ {'No levels configured':<{inner_width}} │") return lines - # BUY levels analysis if buy_levels: lines.append(f"│ {'BUY LEVELS:':<{inner_width}} │") for level_id, conditions in sorted(buy_levels.items()): @@ -926,14 +927,11 @@ def _format_level_conditions(self, level_conditions: Dict, inner_width: int) -> blocking = ", ".join(conditions.get('blocking_conditions', [])) active = conditions.get('active_executors', 0) hanging = conditions.get('hanging_executors', 0) - level_line = f" {level_id}: {status_icon} Active:{active} Hanging:{hanging}" if blocking: level_line += f" | Blocked: {blocking}" - lines.append(f"│ {level_line:<{inner_width}} │") - # SELL levels analysis if sell_levels: lines.append(f"│ {'SELL LEVELS:':<{inner_width}} │") for level_id, conditions in sorted(sell_levels.items()): @@ -941,46 +939,37 @@ def _format_level_conditions(self, level_conditions: Dict, inner_width: int) -> blocking = ", ".join(conditions.get('blocking_conditions', [])) active = conditions.get('active_executors', 0) hanging = conditions.get('hanging_executors', 0) - level_line = f" {level_id}: {status_icon} Active:{active} Hanging:{hanging}" if blocking: level_line += f" | Blocked: {blocking}" - lines.append(f"│ {level_line:<{inner_width}} │") return lines - def _format_cooldown_bars( - self, buy_cooldown: Dict, sell_cooldown: Dict, bar_width: int, inner_width: int - ) -> List[str]: - """Format cooldown progress bars""" + def _format_cooldown_bars(self, buy_cooldown: Dict, sell_cooldown: Dict, + bar_width: int, inner_width: int) -> List[ + str]: lines = [] - if buy_cooldown.get('active'): progress = float(buy_cooldown.get('progress_pct', 0)) remaining = buy_cooldown.get('remaining_time', 0) - bar = self._create_progress_bar(progress, bar_width // 2) # Same size as other bars + bar = self._create_progress_bar(progress, bar_width // 2) lines.append(f"│ BUY Cooldown: [{bar}] {remaining:.1f}s remaining │") - if sell_cooldown.get('active'): progress = float(sell_cooldown.get('progress_pct', 0)) remaining = sell_cooldown.get('remaining_time', 0) - bar = self._create_progress_bar(progress, bar_width // 2) # Same size as other bars + bar = self._create_progress_bar(progress, bar_width // 2) lines.append(f"│ SELL Cooldown: [{bar}] {remaining:.1f}s remaining │") - return lines def _format_effectivization_bars(self, effectivization: Dict, bar_width: int, inner_width: int) -> List[str]: - """Format effectivization progress bars""" lines = [] - hanging_executors = effectivization.get('hanging_executors', []) if not hanging_executors: return lines lines.append(f"│ {'EFFECTIVIZATION PROGRESS:':<{inner_width}} │") - # Show up to 5 hanging executors with progress for executor in hanging_executors[:5]: level_id = executor.get('level_id', 'unknown') trade_type = executor.get('trade_type', 'UNKNOWN') @@ -989,23 +978,63 @@ def _format_effectivization_bars(self, effectivization: Dict, bar_width: int, in ready = executor.get('ready', False) bar = self._create_progress_bar(progress, bar_width // 2) - status = "READY!" if ready else f"{remaining}s" - icon = "🔄" if not ready else "✓" - - lines.append(f"│ {icon} {level_id} ({trade_type}): [{bar}] {status:<10} │") + eff_status = "READY!" if ready else f"{remaining}s" + icon = "✓" if ready else "🔄" + lines.append(f"│ {icon} {level_id} ({trade_type}): [{bar}] {eff_status:<10} │") if len(hanging_executors) > 5: lines.append(f"│ {'... and ' + str(len(hanging_executors) - 5) + ' more':<{inner_width}} │") return lines + def _format_refresh_bars(self, refresh_tracking: Dict, bar_width: int, inner_width: int) -> List[str]: + lines = [] + refresh_candidates = refresh_tracking.get('refresh_candidates', []) + if not refresh_candidates: + return lines + + lines.append(f"│ {'REFRESH PROGRESS:':<{inner_width}} │") + + for candidate in refresh_candidates[:5]: + level_id = candidate.get('level_id', 'unknown') + time_to_refresh = candidate.get('time_to_refresh', 0) + progress = float(candidate.get('progress_pct', 0)) + ready = candidate.get('ready', False) + ready_by_distance = candidate.get('ready_by_distance', False) + distance_deviation_pct = candidate.get('distance_deviation_pct', Decimal('0')) + near_refresh = candidate.get('near_refresh', False) + + bar = self._create_progress_bar(progress, bar_width // 2) + + if ready: + if ready_by_distance: + ref_status = f"DISTANCE! ({distance_deviation_pct:.1%})" + icon = "⚠️" + else: + ref_status = "TIME REFRESH!" + icon = "🔄" + elif near_refresh: + ref_status = f"{time_to_refresh}s (Soon)" + icon = "⏰" + else: + if distance_deviation_pct > 0: + ref_status = f"{time_to_refresh}s ({distance_deviation_pct:.1%})" + else: + ref_status = f"{time_to_refresh}s" + icon = "⏳" + + lines.append(f"│ {icon} {level_id}: [{bar}] {ref_status:<15} │") + + if len(refresh_candidates) > 5: + lines.append(f"│ {'... and ' + str(len(refresh_candidates) - 5) + ' more':<{inner_width}} │") + + return lines + def _format_position_visualization(self, base_pct: Decimal, target_pct: Decimal, min_pct: Decimal, max_pct: Decimal, skew_pct: Decimal, pnl: Decimal, bar_width: int, inner_width: int) -> List[str]: - """Format enhanced position visualization""" lines = [] - # Position bar filled_width = int(float(base_pct) * bar_width) min_pos = int(float(min_pct) * bar_width) max_pos = int(float(max_pct) * bar_width) @@ -1014,21 +1043,20 @@ def _format_position_visualization(self, base_pct: Decimal, target_pct: Decimal, position_bar = "" for i in range(bar_width): if i == filled_width: - position_bar += "◆" # Current position marker + position_bar += "◆" elif i == target_pos: - position_bar += "┇" # Target line + position_bar += "┇" elif i == min_pos: - position_bar += "┃" # Min threshold + position_bar += "┃" elif i == max_pos: - position_bar += "┃" # Max threshold + position_bar += "┃" elif i < filled_width: - position_bar += "█" # Filled area + position_bar += "█" else: - position_bar += "░" # Empty area + position_bar += "░" lines.append(f"│ Position: [{position_bar}] {base_pct:.2%} │") - # Skew visualization center = bar_width // 2 skew_pos = center + int(float(skew_pct) * center) skew_pos = max(0, min(bar_width - 1, skew_pos)) @@ -1036,16 +1064,15 @@ def _format_position_visualization(self, base_pct: Decimal, target_pct: Decimal, skew_bar = "" for i in range(bar_width): if i == center: - skew_bar += "┃" # Center line (neutral) + skew_bar += "┃" elif i == skew_pos: - skew_bar += "⬤" # Current skew position + skew_bar += "⬤" else: skew_bar += "─" skew_direction = "BULLISH" if skew_pct > 0 else "BEARISH" if skew_pct < 0 else "NEUTRAL" lines.append(f"│ Skew: [{skew_bar}] {skew_direction} │") - # PnL visualization with dynamic scaling max_range = max(abs(self.config.global_take_profit), abs(self.config.global_stop_loss), abs(pnl)) * Decimal( "1.2") if max_range > 0: @@ -1061,16 +1088,16 @@ def _format_position_visualization(self, base_pct: Decimal, target_pct: Decimal, pnl_bar = "" for i in range(bar_width): if i == center: - pnl_bar += "│" # Zero line + pnl_bar += "│" elif i == pnl_pos: - pnl_bar += "⬤" # Current PnL + pnl_bar += "⬤" elif i == take_profit_pos: - pnl_bar += "T" # Take profit target + pnl_bar += "T" elif i == stop_loss_pos: - pnl_bar += "S" # Stop loss target + pnl_bar += "S" elif ((pnl >= 0 and center <= i < pnl_pos) or (pnl < 0 and pnl_pos < i <= center)): - pnl_bar += "█" if pnl >= 0 else "▓" # Fill to current PnL + pnl_bar += "█" if pnl >= 0 else "▓" else: pnl_bar += "─" else: @@ -1082,474 +1109,89 @@ def _format_position_visualization(self, base_pct: Decimal, target_pct: Decimal, return lines def _create_progress_bar(self, progress: float, width: int) -> str: - """Create a progress bar string""" - progress = max(0, min(1, progress)) # Clamp between 0 and 1 + progress = max(0, min(1, progress)) filled = int(progress * width) - bar = "" for i in range(width): if i < filled: - bar += "█" # Filled + bar += "█" elif i == filled and filled < width: - bar += "▌" # Partial fill + bar += "▌" else: - bar += "░" # Empty - + bar += "░" return bar - def _calculate_cooldown_status(self, current_time: int) -> Dict: - """Calculate cooldown status for buy and sell sides""" - cooldown_status = { - "buy": {"active": False, "remaining_time": 0, "progress_pct": Decimal("0")}, - "sell": {"active": False, "remaining_time": 0, "progress_pct": Decimal("0")} - } - - # Get latest order timestamps for each trade type - buy_executors = [e for e in self.executors_info if e.custom_info.get("level_id", "").startswith("buy")] - sell_executors = [e for e in self.executors_info if e.custom_info.get("level_id", "").startswith("sell")] - - for trade_type, executors in [("buy", buy_executors), ("sell", sell_executors)]: - if not executors: - continue - - # Find most recent open order update - latest_updates = [ - e.custom_info.get("open_order_last_update") for e in executors - if "open_order_last_update" in e.custom_info and e.custom_info["open_order_last_update"] is not None - ] - - if not latest_updates: - continue - - latest_update = max(latest_updates) - cooldown_time = (self.config.buy_cooldown_time if trade_type == "buy" - else self.config.sell_cooldown_time) - - time_since_update = current_time - latest_update - remaining_time = max(0, cooldown_time - time_since_update) - - if remaining_time > 0: - cooldown_status[trade_type]["active"] = True - cooldown_status[trade_type]["remaining_time"] = remaining_time - cooldown_status[trade_type]["progress_pct"] = Decimal(str(time_since_update)) / Decimal( - str(cooldown_time)) - else: - cooldown_status[trade_type]["progress_pct"] = Decimal("1") - - return cooldown_status - - def _calculate_price_distance_analysis(self, reference_price: Decimal) -> Dict: - """Analyze price distance conditions for all levels with unified tolerance approach""" - price_analysis = { - "buy": {"violations": [], "distances": [], "base_tolerance": self.config.price_distance_tolerance}, - "sell": {"violations": [], "distances": [], "base_tolerance": self.config.price_distance_tolerance} - } - - # Analyze all levels for price distance violations - all_levels_analysis = self.analyze_all_levels() - - for analysis in all_levels_analysis: - level_id = analysis["level_id"] - is_buy = level_id.startswith("buy") - level = self.get_level_from_level_id(level_id) - - if is_buy and analysis["max_price"]: - current_distance = (reference_price - analysis["max_price"]) / reference_price - level_tolerance = self.config.get_price_distance_level_tolerance(level) - - price_analysis["buy"]["distances"].append({ - "level_id": level_id, - "level": level, - "current_distance": current_distance, - "distance_pct": current_distance, - "tolerance": level_tolerance, - "violates": current_distance < level_tolerance - }) - - if current_distance < level_tolerance: - price_analysis["buy"]["violations"].append(level_id) - - elif not is_buy and analysis["min_price"]: - current_distance = (analysis["min_price"] - reference_price) / reference_price - level_tolerance = self.config.get_price_distance_level_tolerance(level) - - price_analysis["sell"]["distances"].append({ - "level_id": level_id, - "level": level, - "current_distance": current_distance, - "distance_pct": current_distance, - "tolerance": level_tolerance, - "violates": current_distance < level_tolerance - }) - - if current_distance < level_tolerance: - price_analysis["sell"]["violations"].append(level_id) - - return price_analysis - - def _calculate_effectivization_tracking(self, current_time: int) -> Dict: - """Track hanging executor effectivization progress""" - effectivization_data = { - "hanging_executors": [], - "total_hanging": 0, - "ready_for_effectivization": 0 - } - - hanging_executors = [e for e in self.executors_info if e.is_active and e.is_trading] - effectivization_data["total_hanging"] = len(hanging_executors) - - for executor in hanging_executors: - level_id = executor.custom_info.get("level_id", "") - if not level_id: - continue - - trade_type = self.get_trade_type_from_level_id(level_id) - effectivization_time = self.config.get_position_effectivization_time(trade_type) - fill_time = executor.custom_info.get("open_order_last_update", current_time) - - time_elapsed = current_time - fill_time - remaining_time = max(0, effectivization_time - time_elapsed) - progress_pct = min(Decimal("1"), Decimal(str(time_elapsed)) / Decimal(str(effectivization_time))) - - ready = remaining_time == 0 - if ready: - effectivization_data["ready_for_effectivization"] += 1 - - effectivization_data["hanging_executors"].append({ - "level_id": level_id, - "trade_type": trade_type.name, - "time_elapsed": time_elapsed, - "remaining_time": remaining_time, - "progress_pct": progress_pct, - "ready": ready, - "executor_id": executor.id - }) - - return effectivization_data - - def _analyze_level_conditions(self, current_time: int, reference_price: Decimal) -> Dict: - """Analyze conditions preventing each level from executing""" - level_conditions = {} - - # Get all possible levels - all_buy_levels = [self.get_level_id_from_side(TradeType.BUY, i) for i in range(len(self.config.buy_spreads))] - all_sell_levels = [self.get_level_id_from_side(TradeType.SELL, i) for i in range(len(self.config.sell_spreads))] - all_levels = all_buy_levels + all_sell_levels - - # Cache level analysis to avoid redundant calculations - level_analysis_cache = {} - for level_id in all_levels: - level_analysis_cache[level_id] = self._analyze_by_level_id(level_id) - - # Pre-calculate position constraints with safe defaults - if hasattr(self, 'processed_data') and self.processed_data: - current_pct = self.processed_data.get("current_base_pct", Decimal("0")) - breakeven_price = self.processed_data.get("breakeven_price") - else: - current_pct = Decimal("0") - breakeven_price = None - - below_min_position = current_pct < self.config.min_base_pct - above_max_position = current_pct > self.config.max_base_pct - - # Analyze each level - for level_id in all_levels: - trade_type = self.get_trade_type_from_level_id(level_id) - is_buy = level_id.startswith("buy") - - conditions = { - "level_id": level_id, - "trade_type": trade_type.name, - "can_execute": True, - "blocking_conditions": [], - "active_executors": 0, - "hanging_executors": 0 - } - - # Get cached level analysis - level_analysis = level_analysis_cache[level_id] - - # Check various blocking conditions - # 1. Active executor limit - if level_analysis["total_active_executors"] >= self.config.max_active_executors_by_level: - conditions["blocking_conditions"].append("max_active_executors_reached") - conditions["can_execute"] = False - - # 2. Cooldown check - cooldown_time = self.config.get_cooldown_time(trade_type) - if level_analysis["open_order_last_update"]: - time_since_update = current_time - level_analysis["open_order_last_update"] - if time_since_update < cooldown_time: - conditions["blocking_conditions"].append("cooldown_active") - conditions["can_execute"] = False - - # 3. Price distance check with level-specific tolerance - level = self.get_level_from_level_id(level_id) - if is_buy and level_analysis["max_price"]: - distance = (reference_price - level_analysis["max_price"]) / reference_price - level_tolerance = self.config.get_price_distance_level_tolerance(level) - if distance < level_tolerance: - conditions["blocking_conditions"].append("price_distance_violation") - conditions["can_execute"] = False - elif not is_buy and level_analysis["min_price"]: - distance = (level_analysis["min_price"] - reference_price) / reference_price - level_tolerance = self.config.get_price_distance_level_tolerance(level) - if distance < level_tolerance: - conditions["blocking_conditions"].append("price_distance_violation") - conditions["can_execute"] = False - - # 4. Position constraints - if below_min_position and not is_buy: - conditions["blocking_conditions"].append("below_min_position") - conditions["can_execute"] = False - elif above_max_position and is_buy: - conditions["blocking_conditions"].append("above_max_position") - conditions["can_execute"] = False - - # 5. Position profit protection - if (self.config.position_profit_protection and not is_buy and - breakeven_price and breakeven_price > 0 and reference_price < breakeven_price): - conditions["blocking_conditions"].append("position_profit_protection") - conditions["can_execute"] = False - - conditions["active_executors"] = len(level_analysis["active_executors_not_trading"]) - conditions["hanging_executors"] = len(level_analysis["active_executors_trading"]) - - level_conditions[level_id] = conditions - - return level_conditions - - def _calculate_executor_statistics(self, current_time: int) -> Dict: - """Calculate performance statistics for executors""" - stats = { - "total_active": len([e for e in self.executors_info if e.is_active]), - "total_trading": len([e for e in self.executors_info if e.is_active and e.is_trading]), - "total_not_trading": len([e for e in self.executors_info if e.is_active and not e.is_trading]), - "avg_executor_age": Decimal("0"), - "oldest_executor_age": 0, - "refresh_candidates": 0 - } - - active_executors = [e for e in self.executors_info if e.is_active] - - if active_executors: - ages = [current_time - e.timestamp for e in active_executors] - stats["avg_executor_age"] = Decimal(str(sum(ages))) / Decimal(str(len(ages))) - stats["oldest_executor_age"] = max(ages) - - # Count refresh candidates - stats["refresh_candidates"] = len([ - e for e in active_executors - if not e.is_trading and current_time - e.timestamp > self.config.executor_refresh_time - ]) - - return stats - - def _calculate_refresh_tracking(self, current_time: int) -> Dict: - """Track executor refresh progress including distance-based refresh conditions""" - refresh_data = { - "refresh_candidates": [], - "near_refresh": 0, - "refresh_ready": 0, - "distance_violations": 0 - } - - # Get active non-trading executors - active_not_trading = [e for e in self.executors_info if e.is_active and not e.is_trading] - reference_price = Decimal(self.processed_data.get("reference_price", Decimal("0"))) - - for executor in active_not_trading: - age = current_time - executor.timestamp - time_to_refresh = max(0, self.config.executor_refresh_time - age) - progress_pct = min(Decimal("1"), Decimal(str(age)) / Decimal(str(self.config.executor_refresh_time))) - - # Check distance-based refresh condition - distance_violation = (reference_price > 0 and - self.should_refresh_executor_by_distance(executor, reference_price)) - # Calculate distance deviation for display - distance_deviation_pct = Decimal("0") - if reference_price > 0: - level_id = executor.custom_info.get("level_id", "") - if level_id and hasattr(executor.config, 'entry_price'): - theoretical_price = self.calculate_theoretical_price(level_id, reference_price) - if theoretical_price > 0: - distance_deviation_pct = abs( - executor.config.entry_price - theoretical_price) / theoretical_price - - ready_by_time = time_to_refresh == 0 - ready_by_distance = distance_violation - ready = ready_by_time or ready_by_distance - near_refresh = time_to_refresh <= (self.config.executor_refresh_time * 0.2) # Within 20% of refresh time - - if ready: - refresh_data["refresh_ready"] += 1 - elif near_refresh: - refresh_data["near_refresh"] += 1 - - if distance_violation: - refresh_data["distance_violations"] += 1 - - level_id = executor.custom_info.get("level_id", "unknown") - level = self.get_level_from_level_id(level_id) if level_id != "unknown" else 0 - - # Get level-specific refresh tolerance for display - level_tolerance = self.config.get_refresh_level_tolerance( - level) if level_id != "unknown" else self.config.refresh_tolerance - - refresh_data["refresh_candidates"].append({ - "executor_id": executor.id, - "level_id": level_id, - "level": level, - "age": age, - "time_to_refresh": time_to_refresh, - "progress_pct": progress_pct, - "ready": ready, - "ready_by_time": ready_by_time, - "ready_by_distance": ready_by_distance, - "distance_deviation_pct": distance_deviation_pct, - "distance_violation": distance_violation, - "level_tolerance": level_tolerance, - "near_refresh": near_refresh - }) - - return refresh_data - - def _format_refresh_bars(self, refresh_tracking: Dict, bar_width: int, inner_width: int) -> List[str]: - """Format refresh progress bars""" - lines = [] - - refresh_candidates = refresh_tracking.get('refresh_candidates', []) - if not refresh_candidates: - return lines - - lines.append(f"│ {'REFRESH PROGRESS:':<{inner_width}} │") - - # Show up to 5 executors approaching refresh - for candidate in refresh_candidates[:5]: - level_id = candidate.get('level_id', 'unknown') - time_to_refresh = candidate.get('time_to_refresh', 0) - progress = float(candidate.get('progress_pct', 0)) - ready = candidate.get('ready', False) - ready_by_distance = candidate.get('ready_by_distance', False) - distance_deviation_pct = candidate.get('distance_deviation_pct', Decimal('0')) - near_refresh = candidate.get('near_refresh', False) - - bar = self._create_progress_bar(progress, bar_width // 2) - - if ready: - if ready_by_distance: - status = f"DISTANCE! ({distance_deviation_pct:.1%})" - icon = "⚠️" - else: - status = "TIME REFRESH!" - icon = "🔄" - elif near_refresh: - status = f"{time_to_refresh}s (Soon)" - icon = "⏰" - else: - if distance_deviation_pct > 0: - status = f"{time_to_refresh}s ({distance_deviation_pct:.1%})" - else: - status = f"{time_to_refresh}s" - icon = "⏳" - - lines.append(f"│ {icon} {level_id}: [{bar}] {status:<15} │") - - if len(refresh_candidates) > 5: - lines.append(f"│ {'... and ' + str(len(refresh_candidates) - 5) + ' more':<{inner_width}} │") - - return lines - - def _format_price_graph( - self, current_price: Decimal, breakeven_price: Optional[Decimal], inner_width: int - ) -> List[str]: - """Format price graph with order zones and history""" + def _format_price_graph(self, current_price: Decimal, breakeven_price: Optional[Decimal], + inner_width: int) -> List[str]: lines = [] if len(self.price_history) < 10: lines.append(f"│ {'Collecting price data...':<{inner_width}} │") return lines - # Get last 30 price points for the graph recent_prices = [p['price'] for p in self.price_history[-30:]] min_price = min(recent_prices) max_price = max(recent_prices) - # Calculate price range with some padding price_range = max_price - min_price if price_range == 0: - price_range = current_price * Decimal('0.01') # 1% range if no movement + price_range = current_price * Decimal('0.01') - padding = price_range * Decimal('0.1') # 10% padding + padding = price_range * Decimal('0.1') graph_min = min_price - padding graph_max = max_price + padding graph_range = graph_max - graph_min - # Calculate order zones using level 0 price distance tolerance level_0_tolerance = self.config.get_price_distance_level_tolerance(0) buy_distance = current_price * level_0_tolerance sell_distance = current_price * level_0_tolerance buy_zone_price = current_price - buy_distance sell_zone_price = current_price + sell_distance - # Graph dimensions - graph_width = inner_width - 20 # Leave space for price labels and borders + graph_width = inner_width - 20 graph_height = 8 - # Create the graph graph_lines = [] for row in range(graph_height): - # Calculate price level for this row (top to bottom) price_level = graph_max - (Decimal(row) / Decimal(graph_height - 1)) * graph_range line = "" - - # Price label (left side) price_label = f"{float(price_level):6.2f}" line += price_label + " ┼" - # Graph data for col in range(graph_width): - # Calculate which price point this column represents col_index = int((col / graph_width) * len(recent_prices)) if col_index >= len(recent_prices): col_index = len(recent_prices) - 1 price_at_col = recent_prices[col_index] + char = "─" - # Determine what to show at this position - char = "─" # Default horizontal line - - # Check if current price line crosses this position if abs(float(price_at_col - price_level)) < float(graph_range) / (graph_height * 2): if price_at_col == current_price: - char = "●" # Current price marker + char = "●" else: - char = "·" # Price history point + char = "·" - # Mark breakeven line if breakeven_price and abs(float(breakeven_price - price_level)) < float(graph_range) / ( graph_height * 2): - char = "=" # Breakeven line + char = "=" - # Mark order zones if abs(float(buy_zone_price - price_level)) < float(graph_range) / (graph_height * 4): - char = "B" # Buy zone boundary + char = "B" elif abs(float(sell_zone_price - price_level)) < float(graph_range) / (graph_height * 4): - char = "S" # Sell zone boundary + char = "S" - # Mark recent orders - for order in self.order_history[-10:]: # Last 10 orders + for order in self.order_history[-10:]: order_price = order['price'] if abs(float(order_price - price_level)) < float(graph_range) / (graph_height * 3): if order['side'] == 'BUY': - char = "b" # Buy order + char = "b" else: - char = "s" # Sell order + char = "s" break line += char - # Add right border and annotations annotation = "" if abs(float(current_price - price_level)) < float(graph_range) / (graph_height * 2): annotation = " ← Current" @@ -1564,21 +1206,18 @@ def _format_price_graph( line += annotation graph_lines.append(line) - # Format graph lines with proper padding for graph_line in graph_lines: lines.append(f"│ {graph_line:<{inner_width}} │") - # Add legend lines.append( f"│ {'Legend: ● Current price = Breakeven B/S Zone boundaries b/s Recent orders':<{inner_width}} │") - # Add current metrics dist_l0 = self.config.get_price_distance_level_tolerance(0) ref_l0 = self.config.get_refresh_level_tolerance(0) metrics_line = f"Dist: L0 {dist_l0:.4%} | Refresh: L0 {ref_l0:.4%} | Scaling: ×{self.config.tolerance_scaling}" if breakeven_price: distance_to_breakeven = ( - (current_price - breakeven_price) / current_price) if breakeven_price > 0 else Decimal(0) + (current_price - breakeven_price) / current_price) if breakeven_price > 0 else Decimal(0) metrics_line += f" | Breakeven gap: {distance_to_breakeven:+.2%}" lines.append(f"│ {metrics_line:<{inner_width}} │") From f95a1261fb83feba51a9b79dda5f23824452d2f4 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Mon, 20 Apr 2026 11:51:57 -0300 Subject: [PATCH 5/5] (feat) remove gateway proxy --- main.py | 2 - routers/gateway_proxy.py | 169 --------------------------------------- 2 files changed, 171 deletions(-) delete mode 100644 routers/gateway_proxy.py diff --git a/main.py b/main.py index 4aed40e1..2a94c47f 100644 --- a/main.py +++ b/main.py @@ -51,7 +51,6 @@ def patched_save_to_yml(yml_path, cm): executors, gateway, gateway_clmm, - gateway_proxy, gateway_swap, market_data, portfolio, @@ -384,7 +383,6 @@ def auth_user( app.include_router(archived_bots.router, dependencies=[Depends(auth_user)]) app.include_router(executors.router, dependencies=[Depends(auth_user)]) -app.include_router(gateway_proxy.router, dependencies=[Depends(auth_user)]) # WebSocket router (handles its own auth) app.include_router(websocket.router) diff --git a/routers/gateway_proxy.py b/routers/gateway_proxy.py deleted file mode 100644 index a6a0b596..00000000 --- a/routers/gateway_proxy.py +++ /dev/null @@ -1,169 +0,0 @@ -""" -Gateway Proxy Router - -Catch-all router that forwards requests to Gateway server unchanged. -Dashboard calls /gateway-proxy/* and this router forwards to Gateway at localhost:15888/*. - -This allows the dashboard to access all Gateway endpoints through the API without -needing each endpoint to be explicitly defined. - -Examples: - GET /gateway-proxy/wallet -> GET localhost:15888/wallet - POST /gateway-proxy/wallet/add -> POST localhost:15888/wallet/add - GET /gateway-proxy/config -> GET localhost:15888/config - GET /gateway-proxy/trading/clmm/positions-owned -> GET localhost:15888/trading/clmm/positions-owned -""" - -import json -import logging - -import aiohttp -from fastapi import APIRouter, Depends, HTTPException, Request, Response -from fastapi.responses import JSONResponse - -from deps import get_accounts_service -from services.accounts_service import AccountsService - -logger = logging.getLogger(__name__) - -router = APIRouter(tags=["Gateway Proxy"], prefix="/gateway-proxy") - - -async def _forward_to_gateway( - path: str, - request: Request, - accounts_service: AccountsService -): - """Internal handler that forwards requests to Gateway.""" - gateway_client = accounts_service.gateway_client - gateway_url = gateway_client.base_url - - # Build target URL - target_url = f"{gateway_url}/{path}" - - # Get query parameters - query_params = dict(request.query_params) - - # Get request body if present - body = None - if request.method in ["POST", "PUT", "PATCH", "DELETE"]: - try: - body = await request.json() - except Exception: - # No JSON body or invalid JSON - that's OK for some requests - body = None - - try: - # Get or create aiohttp session - session = await gateway_client._get_session() - - # Forward the request - async with session.request( - method=request.method, - url=target_url, - params=query_params if query_params else None, - json=body if body else None, - ) as response: - # Read response body - response_body = await response.read() - - # Try to parse as JSON, otherwise return as-is - content_type = response.headers.get("Content-Type", "") - - if "application/json" in content_type: - try: - json_body = json.loads(response_body) - return JSONResponse( - content=json_body, - status_code=response.status, - ) - except Exception: - pass - - # Return raw response - return Response( - content=response_body, - status_code=response.status, - media_type=content_type or "application/octet-stream", - ) - - except aiohttp.ClientError as e: - logger.error(f"Gateway proxy error: {e}") - raise HTTPException( - status_code=503, - detail=f"Gateway service unavailable: {str(e)}" - ) - except Exception as e: - logger.error(f"Gateway proxy error: {e}") - raise HTTPException( - status_code=500, - detail=f"Gateway proxy error: {str(e)}" - ) - - -@router.get("/{path:path}", operation_id="gateway_proxy_get") -async def gateway_proxy_get( - path: str, - request: Request, - accounts_service: AccountsService = Depends(get_accounts_service) -): - """GET request to Gateway. Example: GET /gateway-proxy/wallet""" - return await _forward_to_gateway(path, request, accounts_service) - - -@router.post("/{path:path}", operation_id="gateway_proxy_post") -async def gateway_proxy_post( - path: str, - request: Request, - accounts_service: AccountsService = Depends(get_accounts_service) -): - """POST request to Gateway. Example: POST /gateway-proxy/wallet/add""" - return await _forward_to_gateway(path, request, accounts_service) - - -@router.put("/{path:path}", operation_id="gateway_proxy_put") -async def gateway_proxy_put( - path: str, - request: Request, - accounts_service: AccountsService = Depends(get_accounts_service) -): - """PUT request to Gateway.""" - return await _forward_to_gateway(path, request, accounts_service) - - -@router.delete("/{path:path}", operation_id="gateway_proxy_delete") -async def gateway_proxy_delete( - path: str, - request: Request, - accounts_service: AccountsService = Depends(get_accounts_service) -): - """DELETE request to Gateway.""" - return await _forward_to_gateway(path, request, accounts_service) - - -@router.patch("/{path:path}", operation_id="gateway_proxy_patch") -async def gateway_proxy_patch( - path: str, - request: Request, - accounts_service: AccountsService = Depends(get_accounts_service) -): - """PATCH request to Gateway.""" - return await _forward_to_gateway(path, request, accounts_service) - - -# Also expose the root endpoint for health checks -@router.get("") -async def gateway_root( - accounts_service: AccountsService = Depends(get_accounts_service) -): - """ - Gateway health check. - Forwards to Gateway root endpoint to check if it's online. - """ - gateway_client = accounts_service.gateway_client - result = await gateway_client._request("GET", "") - if result is None: - raise HTTPException(status_code=503, detail="Gateway service unavailable") - if "error" in result: - raise HTTPException(status_code=result.get("status", 500), detail=result["error"]) - return result