From ab9ce97d54e3a391acb72130ad5f2255159d5da5 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Mon, 25 May 2026 23:43:10 -0300 Subject: [PATCH 1/5] (feat) improve bot runs deletion --- database/repositories/bot_run_repository.py | 31 +++++++++++++++-- routers/archived_bots.py | 33 +++++++++++++++--- routers/bot_orchestration.py | 38 +++++++++++++++++++++ 3 files changed, 96 insertions(+), 6 deletions(-) diff --git a/database/repositories/bot_run_repository.py b/database/repositories/bot_run_repository.py index 3999389e..57f4bad5 100644 --- a/database/repositories/bot_run_repository.py +++ b/database/repositories/bot_run_repository.py @@ -2,7 +2,7 @@ from datetime import datetime, timezone from typing import Dict, List, Optional, Any -from sqlalchemy import desc, select, and_, or_, func +from sqlalchemy import delete, desc, select, and_, or_, func from sqlalchemy.ext.asyncio import AsyncSession from database.models import BotRun @@ -188,4 +188,31 @@ async def get_bot_run_stats(self) -> Dict[str, Any]: "active_runs": active_runs, "strategy_type_counts": strategy_counts, "status_counts": status_counts - } \ No newline at end of file + } + + async def delete_bot_run(self, bot_run_id: int) -> Optional[BotRun]: + """Delete a bot run record by ID. Returns the deleted record or None.""" + stmt = select(BotRun).where(BotRun.id == bot_run_id) + result = await self.session.execute(stmt) + bot_run = result.scalar_one_or_none() + + if bot_run: + await self.session.delete(bot_run) + await self.session.flush() + + return bot_run + + async def delete_bot_runs_by_bot_name(self, bot_name: str) -> int: + """Delete all bot run records for a given bot_name. Returns count deleted.""" + stmt = select(BotRun).where(BotRun.bot_name == bot_name) + result = await self.session.execute(stmt) + bot_runs = result.scalars().all() + + count = len(bot_runs) + for bot_run in bot_runs: + await self.session.delete(bot_run) + + if count > 0: + await self.session.flush() + + return count \ No newline at end of file diff --git a/routers/archived_bots.py b/routers/archived_bots.py index 179beb50..729c471f 100644 --- a/routers/archived_bots.py +++ b/routers/archived_bots.py @@ -1,10 +1,15 @@ +import logging from typing import List, Optional -from fastapi import APIRouter, HTTPException, Query +from fastapi import APIRouter, Depends, HTTPException, Query +from database import AsyncDatabaseManager, BotRunRepository +from deps import get_database_manager from utils.file_system import fs_util from utils.hummingbot_database_reader import HummingbotDatabase +logger = logging.getLogger(__name__) + router = APIRouter(tags=["Archived Bots"], prefix="/archived-bots") @@ -20,19 +25,22 @@ async def list_databases(): @router.delete("/{db_path:path}") -async def delete_archived_bot(db_path: str): +async def delete_archived_bot( + db_path: str, + db_manager: AsyncDatabaseManager = Depends(get_database_manager) +): """ Delete an archived bot and its entire directory. + Also attempts to delete matching BotRun records from PostgreSQL (best-effort). Args: db_path: Path to the database file (as returned by list_databases) Returns: - Confirmation message with the deleted bot name + Confirmation message with the deleted bot name and count of cleaned PG records """ try: bot_name = fs_util.delete_archived_bot(db_path) - return {"message": f"Archived bot '{bot_name}' deleted successfully", "bot_name": bot_name} except FileNotFoundError as e: raise HTTPException(status_code=404, detail=str(e)) except ValueError as e: @@ -42,6 +50,23 @@ async def delete_archived_bot(db_path: str): except Exception as e: raise HTTPException(status_code=500, detail=f"Error deleting archived bot: {str(e)}") + # Best-effort: also clean matching BotRun records from PG + bot_runs_deleted = 0 + try: + async with db_manager.get_session_context() as session: + bot_run_repo = BotRunRepository(session) + bot_runs_deleted = await bot_run_repo.delete_bot_runs_by_bot_name(bot_name) + if bot_runs_deleted > 0: + logger.info(f"Deleted {bot_runs_deleted} bot run record(s) for '{bot_name}'") + except Exception as e: + logger.warning(f"Failed to clean bot run records for '{bot_name}': {e}") + + return { + "message": f"Archived bot '{bot_name}' deleted successfully", + "bot_name": bot_name, + "bot_runs_deleted": bot_runs_deleted + } + @router.get("/{db_path:path}/status") async def get_database_status(db_path: str): diff --git a/routers/bot_orchestration.py b/routers/bot_orchestration.py index 72254d4a..69a96a16 100644 --- a/routers/bot_orchestration.py +++ b/routers/bot_orchestration.py @@ -387,6 +387,44 @@ async def get_bot_run_by_id( raise HTTPException(status_code=500, detail=str(e)) +@router.delete("/bot-runs/{bot_run_id}") +async def delete_bot_run( + bot_run_id: int, + db_manager: AsyncDatabaseManager = Depends(get_database_manager) +): + """ + Delete a bot run record by ID. + + Args: + bot_run_id: ID of the bot run to delete + db_manager: Database manager dependency + + Returns: + Confirmation of deletion + + Raises: + HTTPException: 404 if bot run not found + """ + try: + async with db_manager.get_session_context() as session: + bot_run_repo = BotRunRepository(session) + bot_run = await bot_run_repo.delete_bot_run(bot_run_id) + + if not bot_run: + raise HTTPException(status_code=404, detail=f"Bot run {bot_run_id} not found") + + return { + "status": "success", + "message": f"Bot run {bot_run_id} deleted successfully", + "bot_name": bot_run.bot_name + } + except HTTPException: + raise + except Exception as e: + logger.error(f"Failed to delete bot run {bot_run_id}: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + @router.get("/bot-runs/stats") async def get_bot_run_stats( db_manager: AsyncDatabaseManager = Depends(get_database_manager) From 6b769aa3a24df5ec4ecdcbdbd469cf167b4bffba Mon Sep 17 00:00:00 2001 From: cardosofede Date: Mon, 25 May 2026 23:43:29 -0300 Subject: [PATCH 2/5] (feat) add controller performance interval configurable --- bots/controllers/generic/pmm_mister.py | 32 ++++++++++++++++++++++++++ bots/scripts/v2_with_controllers.py | 15 +++--------- config.py | 1 + main.py | 3 ++- 4 files changed, 38 insertions(+), 13 deletions(-) diff --git a/bots/controllers/generic/pmm_mister.py b/bots/controllers/generic/pmm_mister.py index fcd37b79..f318651c 100644 --- a/bots/controllers/generic/pmm_mister.py +++ b/bots/controllers/generic/pmm_mister.py @@ -698,6 +698,38 @@ def get_trade_type_from_level_id(self, level_id: str) -> TradeType: def get_level_from_level_id(self, level_id: str) -> int: return int(level_id.split('_')[1]) + # ── Custom info for MQTT reporting ────────────────────────────────── + + def get_custom_info(self) -> dict: + if not self.processed_data: + return {} + + executor_stats = self.processed_data.get("executor_stats", {}) + level_conditions = self.processed_data.get("level_conditions", {}) + + # Count blocking reasons across all levels + blocking_summary = {} + for lc in level_conditions.values(): + for condition in lc.get("blocking_conditions", []): + blocking_summary[condition] = blocking_summary.get(condition, 0) + 1 + + return { + "reference_price": float(self.processed_data.get("reference_price", 0)), + "spread_multiplier": float(self.processed_data.get("spread_multiplier", 1)), + "current_base_pct": float(self.processed_data.get("current_base_pct", 0)), + "deviation": float(self.processed_data.get("deviation", 0)), + "buy_skew": float(self.processed_data.get("buy_skew", 1)), + "sell_skew": float(self.processed_data.get("sell_skew", 1)), + "breakeven_price": float(self.processed_data.get("breakeven_price") or 0), + "position_amount": float(self.processed_data.get("position_amount", 0)), + "unrealized_pnl_pct": float(self.processed_data.get("unrealized_pnl_pct", 0)), + "total_active": executor_stats.get("total_active", 0), + "total_trading": executor_stats.get("total_trading", 0), + "total_not_trading": executor_stats.get("total_not_trading", 0), + "executable_levels": len(self.processed_data.get("levels_to_execute", [])), + "blocking_summary": blocking_summary, + } + # ── Status display ──────────────────────────────────────────────────── def to_format_status(self) -> List[str]: diff --git a/bots/scripts/v2_with_controllers.py b/bots/scripts/v2_with_controllers.py index 241041d4..4c06f97b 100644 --- a/bots/scripts/v2_with_controllers.py +++ b/bots/scripts/v2_with_controllers.py @@ -69,18 +69,12 @@ def check_max_controller_drawdown(self): filter_func=lambda x: x.is_active and not x.is_trading, ) self.executor_orchestrator.execute_actions( - actions=[ - StopExecutorAction(controller_id=controller_id, executor_id=executor.id) - for executor in executors_order_placed - ] + actions=[StopExecutorAction(controller_id=controller_id, executor_id=executor.id) for executor in executors_order_placed] ) self.drawdown_exited_controllers.append(controller_id) def check_max_global_drawdown(self): - current_global_pnl = sum([ - self.get_performance_report(controller_id).global_pnl_quote - for controller_id in self.controllers.keys() - ]) + current_global_pnl = sum([self.get_performance_report(controller_id).global_pnl_quote for controller_id in self.controllers.keys()]) if current_global_pnl > self.max_global_pnl: self.max_global_pnl = current_global_pnl else: @@ -103,10 +97,7 @@ def get_controller_report(self, controller_id: str) -> dict: def send_performance_report(self): if self.current_timestamp - self._last_performance_report_timestamp >= self.performance_report_interval and self._pub: - controller_reports = { - controller_id: self.get_controller_report(controller_id) - for controller_id in self.controllers.keys() - } + controller_reports = {controller_id: self.get_controller_report(controller_id) for controller_id in self.controllers.keys()} self._pub(controller_reports) self._last_performance_report_timestamp = self.current_timestamp diff --git a/config.py b/config.py index d6550923..2f1b08dc 100644 --- a/config.py +++ b/config.py @@ -11,6 +11,7 @@ class BrokerSettings(BaseSettings): port: int = Field(default=1883, description="MQTT broker port") username: str = Field(default="admin", description="MQTT broker username") password: str = Field(default="password", description="MQTT broker password") + performance_dump_interval: int = Field(default=5, description="Controller performance dump interval in minutes") model_config = SettingsConfigDict(env_prefix="BROKER_", extra="ignore") diff --git a/main.py b/main.py index 22e1e00a..d6499aef 100644 --- a/main.py +++ b/main.py @@ -225,7 +225,8 @@ async def lifespan(app: FastAPI): broker_host=settings.broker.host, broker_port=settings.broker.port, broker_username=settings.broker.username, - broker_password=settings.broker.password + broker_password=settings.broker.password, + performance_dump_interval=settings.broker.performance_dump_interval ) backtesting_service = BacktestingService() From a6d275fed3285bb4b0a37a165a59695b01c368b2 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Tue, 26 May 2026 18:16:19 -0300 Subject: [PATCH 3/5] (feat) update pmm mister --- bots/controllers/generic/pmm_mister.py | 547 ++++++++++++++++++++++--- 1 file changed, 491 insertions(+), 56 deletions(-) diff --git a/bots/controllers/generic/pmm_mister.py b/bots/controllers/generic/pmm_mister.py index f318651c..7a9d24fa 100644 --- a/bots/controllers/generic/pmm_mister.py +++ b/bots/controllers/generic/pmm_mister.py @@ -2,14 +2,24 @@ from decimal import Decimal from typing import Dict, List, Optional, Tuple, Union -from hummingbot.core.data_type.common import MarketDict, OrderType, PositionMode, PriceType, TradeType +from pydantic import Field, field_validator +from pydantic_core.core_schema import ValidationInfo + +from hummingbot.core.data_type.common import ( + MarketDict, + OrderType, + PositionAction, + PositionMode, + PositionSide, + PriceType, + TradeType, +) from hummingbot.strategy_v2.controllers.controller_base import ControllerBase, ControllerConfigBase from hummingbot.strategy_v2.executors.data_types import ConnectorPair +from hummingbot.strategy_v2.executors.order_executor.data_types import ExecutionStrategy, OrderExecutorConfig from hummingbot.strategy_v2.executors.position_executor.data_types import PositionExecutorConfig, TripleBarrierConfig from hummingbot.strategy_v2.models.executor_actions import CreateExecutorAction, ExecutorAction, StopExecutorAction from hummingbot.strategy_v2.utils.common import parse_comma_separated_list, parse_enum_value -from pydantic import Field, field_validator -from pydantic_core.core_schema import ValidationInfo class PMMisterConfig(ControllerConfigBase): @@ -44,10 +54,12 @@ class PMMisterConfig(ControllerConfigBase): tolerance_scaling: Decimal = Field(default=Decimal("1.2"), json_schema_extra={"is_updatable": True}) leverage: int = Field(default=20, json_schema_extra={"is_updatable": True}) - position_mode: PositionMode = Field(default="ONEWAY") + position_mode: PositionMode = Field(default=PositionMode.ONEWAY) + # LONG: buys accumulate, sells reduce. SHORT: sells accumulate, buys reduce. + position_side: TradeType = Field(default=TradeType.BUY) take_profit: Optional[Decimal] = Field(default=Decimal("0.0001"), gt=0, json_schema_extra={"is_updatable": True}) - take_profit_order_type: Optional[OrderType] = Field(default="LIMIT_MAKER", json_schema_extra={"is_updatable": True}) - open_order_type: Optional[OrderType] = Field(default="LIMIT_MAKER", json_schema_extra={"is_updatable": True}) + take_profit_order_type: Optional[OrderType] = Field(default=OrderType.LIMIT_MAKER, json_schema_extra={"is_updatable": True}) + open_order_type: Optional[OrderType] = Field(default=OrderType.LIMIT_MAKER, json_schema_extra={"is_updatable": True}) max_active_executors_by_level: Optional[int] = Field(default=4, json_schema_extra={"is_updatable": True}) tick_mode: bool = Field(default=False, json_schema_extra={"is_updatable": True}) position_profit_protection: bool = Field(default=False, json_schema_extra={"is_updatable": True}) @@ -55,6 +67,20 @@ class PMMisterConfig(ControllerConfigBase): global_take_profit: Decimal = Field(default=Decimal("0.03"), json_schema_extra={"is_updatable": True}) global_stop_loss: Decimal = Field(default=Decimal("0.05"), json_schema_extra={"is_updatable": True}) + # Global TP/SL activation settings + global_tp_enabled: bool = Field(default=False, json_schema_extra={"is_updatable": True}) + global_sl_enabled: bool = Field(default=False, json_schema_extra={"is_updatable": True}) + # TP activates when position >= this threshold: "min_base" (earlier) or "target_base" (later) + global_tp_activation_from: str = Field(default="min_base", json_schema_extra={"is_updatable": True}) + # SL activates when position >= this threshold: "target_base" (earlier) or "max_base" (later) + global_sl_activation_from: str = Field(default="target_base", json_schema_extra={"is_updatable": True}) + # PnL reference: "position" = pnl/position_value, "portfolio" = pnl/total_amount_quote + global_pnl_reference: str = Field(default="position", json_schema_extra={"is_updatable": True}) + + # Limit chaser config for position closing (tight values for fast fills) + close_chaser_distance: Decimal = Field(default=Decimal("0.0001"), json_schema_extra={"is_updatable": True}) + close_chaser_refresh_threshold: Decimal = Field(default=Decimal("0.0005"), json_schema_extra={"is_updatable": True}) + @field_validator("take_profit", mode="before") @classmethod def validate_target(cls, v): @@ -101,6 +127,41 @@ def parse_and_validate_amounts(cls, v, validation_info: ValidationInfo): def validate_position_mode(cls, v) -> PositionMode: return parse_enum_value(PositionMode, v, "position_mode") + @field_validator('position_side', mode="before") + @classmethod + def validate_position_side(cls, v) -> TradeType: + if isinstance(v, TradeType): + return v + mapping = {"BUY": TradeType.BUY, "SELL": TradeType.SELL, "LONG": TradeType.BUY, "SHORT": TradeType.SELL} + upper = str(v).upper() + if upper in mapping: + return mapping[upper] + raise ValueError(f"position_side must be BUY/LONG or SELL/SHORT, got {v}") + + @field_validator('global_tp_activation_from', mode="before") + @classmethod + def validate_tp_activation_from(cls, v): + valid = {"always", "min_base", "target_base"} + if v not in valid: + raise ValueError(f"global_tp_activation_from must be one of {valid}") + return v + + @field_validator('global_sl_activation_from', mode="before") + @classmethod + def validate_sl_activation_from(cls, v): + valid = {"target_base", "max_base"} + if v not in valid: + raise ValueError(f"global_sl_activation_from must be one of {valid}") + return v + + @field_validator('global_pnl_reference', mode="before") + @classmethod + def validate_pnl_reference(cls, v): + valid = {"position", "portfolio"} + if v not in valid: + raise ValueError(f"global_pnl_reference must be one of {valid}") + return v + @field_validator('price_distance_tolerance', 'refresh_tolerance', 'tolerance_scaling', mode="before") @classmethod def validate_tolerance_fields(cls, v, validation_info: ValidationInfo): @@ -111,6 +172,10 @@ def validate_tolerance_fields(cls, v, validation_info: ValidationInfo): raise ValueError(f"{field_name} must be greater than 0") return v + @property + def is_short(self) -> bool: + return self.position_side == TradeType.SELL + @property def triple_barrier_config(self) -> TripleBarrierConfig: # Ensure we're passing OrderType enum values, not strings @@ -197,6 +262,35 @@ def __init__(self, config: PMMisterConfig, *args, **kwargs): self.order_history = [] self.max_order_history = 20 self.processed_data = {} + self._position_mode_verified = False + self._global_close_phase: Optional[str] = None # None | "stopping" | "closing" + self._global_close_side: Optional[TradeType] = None # Side of the position when TP/SL triggered + self._global_close_retries: int = 0 # Count how many times PHASE 2 has created a close executor + + def _verify_position_mode(self) -> bool: + """Check that the connector's position mode matches the config. Blocks trading until confirmed.""" + if self._position_mode_verified: + return True + try: + connector = self.market_data_provider.get_connector(self.config.connector_name) + # Only perpetual connectors have position_mode; skip check for spot connectors + if not hasattr(connector, 'position_mode'): + self._position_mode_verified = True + return True + exchange_mode = connector.position_mode + config_mode = self.config.position_mode + if exchange_mode != config_mode: + self.logger().warning( + f"Position mode mismatch: exchange={exchange_mode}, config={config_mode}. " + f"Waiting for position mode to be set correctly before trading.") + return False + self._position_mode_verified = True + self.logger().info( + f"Position mode verified: {exchange_mode} matches config. Trading enabled.") + return True + except Exception as e: + self.logger().warning(f"Could not verify position mode: {e}. Blocking trading.") + return False # ── Market data (called by framework) ───────────────────────────────── @@ -235,14 +329,242 @@ async def update_processed_data(self): # ── Executor actions (called by framework) ──────────────────────────── def determine_executor_actions(self) -> List[ExecutorAction]: + # Guard: verify position mode matches config before operating + if not self._verify_position_mode(): + return [] + self._update_position_state() self._compute_executor_analysis() actions = [] + + # Check global TP/SL — two-phase: stop executors, then close position + tp_sl_actions = self._check_global_tp_sl() + if tp_sl_actions: + actions.extend(tp_sl_actions) + + # Block normal trading while global close is in progress + if self._global_close_phase is not None: + return actions + actions.extend(self.create_actions_proposal()) actions.extend(self.stop_actions_proposal()) return actions + # ── Global TP/SL ────────────────────────────────────────────────────── + + def _get_tp_activation_threshold(self) -> Decimal: + if self.config.global_tp_activation_from == "always": + return Decimal("0") + if self.config.global_tp_activation_from == "min_base": + return self.config.min_base_pct + return self.config.target_base_pct + + def _get_sl_activation_threshold(self) -> Decimal: + if self.config.global_sl_activation_from == "target_base": + return self.config.target_base_pct + return self.config.max_base_pct + + def _get_exchange_position(self) -> Tuple[Decimal, Optional[TradeType]]: + """Read the REAL position from the exchange connector (WebSocket-updated, no orchestrator delay). + Returns (abs_amount, side) where side is BUY for long, SELL for short, None if no position.""" + try: + connector = self.market_data_provider.get_connector(self.config.connector_name) + if not hasattr(connector, '_perpetual_trading'): + return Decimal("0"), None + perp = connector._perpetual_trading + pos = perp.get_position(self.config.trading_pair, PositionSide.BOTH) + if pos is None or pos.amount == Decimal("0"): + return Decimal("0"), None + amount = pos.amount + # Binance ONEWAY: positive amount = long, negative = short + if amount > 0: + return amount, TradeType.BUY + else: + return abs(amount), TradeType.SELL + except Exception as e: + self.logger().warning(f"Failed to read exchange position: {e}") + return Decimal("0"), None + + def _check_global_tp_sl(self) -> List[ExecutorAction]: + """Check global TP/SL using a two-phase approach: + Phase 1 (stopping): Stop all active executors with keep_position=True. + Phase 2 (closing): Once no active executors remain, close the actual position.""" + + # --- Phase: stopping --- wait for all executors to finish, then transition to closing + if self._global_close_phase == "stopping": + active_non_close = [ + e for e in self.executors_info + if e.is_active and e.custom_info.get("level_id") != "global_close" + ] + if active_non_close: + self.logger().debug( + f"Global close phase=stopping: waiting for {len(active_non_close)} executors to finish" + ) + return [] + # All executors stopped — transition to closing phase + self._global_close_phase = "closing" + self.logger().info("Global close phase=stopping complete. All executors stopped. Transitioning to closing.") + + # --- Phase: closing --- create close executor for the real position + if self._global_close_phase == "closing": + # If a close executor is already active, wait for it + close_executors = [ + e for e in self.executors_info + if e.is_active and e.custom_info.get("level_id") == "global_close" + ] + if close_executors: + return [] + + # Guard: abort after too many failed close attempts (e.g. below min notional) + if self._global_close_retries >= 3: + self.logger().warning( + f"=== GLOBAL CLOSE ABORTED: {self._global_close_retries} close attempts failed. ===\n" + f" Position may be below minimum notional. Aborting to prevent infinite loop." + ) + self._global_close_phase = None + self._global_close_side = None + self._global_close_retries = 0 + return [] + + # Read position from the EXCHANGE CONNECTOR (WebSocket-updated, no orchestrator delay) + # This avoids the race condition where positions_held is stale + exchange_amount, exchange_side = self._get_exchange_position() + + if exchange_amount == Decimal("0") or exchange_side is None: + self.logger().info("Global close phase=closing: exchange position is 0. Done.") + self._global_close_phase = None + self._global_close_side = None + self._global_close_retries = 0 + return [] + + # SAFETY: Detect position side flip — if position flipped direction, abort close + if self._global_close_side is not None and exchange_side != self._global_close_side: + self.logger().warning( + f"=== GLOBAL CLOSE ABORTED: Position side flipped! ===\n" + f" Original side: {self._global_close_side.name} | " + f"Current side: {exchange_side.name} | Amount: {exchange_amount}\n" + f" This indicates over-selling. Aborting global close to prevent further damage." + ) + self._global_close_phase = None + self._global_close_side = None + self._global_close_retries = 0 + return [] + + quantized = self.market_data_provider.quantize_order_amount( + self.config.connector_name, self.config.trading_pair, exchange_amount + ) + if quantized == Decimal("0"): + self._global_close_phase = None + self._global_close_side = None + self._global_close_retries = 0 + return [] + + # Determine close side from the EXCHANGE position side + close_side = TradeType.SELL if exchange_side == TradeType.BUY else TradeType.BUY + + self._global_close_retries += 1 + self.logger().info( + f"=== GLOBAL CLOSE — PHASE 2: CLOSING POSITION (attempt {self._global_close_retries}/3) ===\n" + f" Exchange position: {exchange_side.name} {exchange_amount} | " + f"Close side: {close_side.name} | Creating close executor." + ) + close_action = self._create_close_action_with_side(close_side, exchange_amount) + return [close_action] if close_action else [] + + # --- No phase active: check if TP/SL should trigger --- + current_base_pct = self.processed_data.get("current_base_pct", Decimal("0")) + unrealized_pnl_pct = self.processed_data.get("unrealized_pnl_pct", Decimal("0")) + position_amount = self.processed_data.get("position_amount", Decimal("0")) + + if position_amount == Decimal("0"): + return [] + + triggered = False + trigger_reason = "" + + # Check take profit + tp_threshold = self._get_tp_activation_threshold() + if self.config.global_tp_enabled and current_base_pct >= tp_threshold: + if unrealized_pnl_pct >= self.config.global_take_profit: + triggered = True + trigger_reason = "take_profit" + + # Check stop loss + sl_threshold = self._get_sl_activation_threshold() + if not triggered and self.config.global_sl_enabled and current_base_pct >= sl_threshold: + if unrealized_pnl_pct <= -self.config.global_stop_loss: + triggered = True + trigger_reason = "stop_loss" + + if not triggered: + return [] + + # --- Trigger: enter stopping phase --- stop all active executors first + self._global_close_phase = "stopping" + self._global_close_retries = 0 + # Remember the position side at trigger time so we always close in the right direction + position_held = next((p for p in self.positions_held if + p.trading_pair == self.config.trading_pair and + p.connector_name == self.config.connector_name), None) + self._global_close_side = position_held.side if position_held else None + active_executors = [ + e for e in self.executors_info + if e.is_active and e.custom_info.get("level_id") != "global_close" + ] + + self.logger().info( + f"=== GLOBAL {trigger_reason.upper()} TRIGGERED — PHASE 1: STOPPING EXECUTORS ===\n" + f" PnL: {unrealized_pnl_pct:.4%} | Position: {position_amount} | Base%: {current_base_pct:.4%}\n" + f" Stopping {len(active_executors)} active executors before closing position." + ) + + stop_actions = [] + for executor in active_executors: + stop_actions.append(StopExecutorAction( + controller_id=self.config.id, + keep_position=True, + executor_id=executor.id, + )) + + return stop_actions + + def _create_close_action(self, position_amount: Decimal) -> Optional[CreateExecutorAction]: + """Create a close action by inferring the side from position_held. Kept for backward compat.""" + position_held = next((p for p in self.positions_held if + p.trading_pair == self.config.trading_pair and + p.connector_name == self.config.connector_name), None) + if position_held is None or position_amount == Decimal("0"): + return None + close_side = TradeType.SELL if position_held.side == TradeType.BUY else TradeType.BUY + return self._create_close_action_with_side(close_side, abs(position_amount)) + + def _create_close_action_with_side(self, side: TradeType, amount: Decimal) -> Optional[CreateExecutorAction]: + if amount == Decimal("0"): + return None + + self.logger().info( + f"Creating close executor: side={side.name} amount={amount} " + f"action=CLOSE strategy=MARKET (reduceOnly on exchange)" + ) + + config = OrderExecutorConfig( + timestamp=self.market_data_provider.time(), + trading_pair=self.config.trading_pair, + connector_name=self.config.connector_name, + side=side, + amount=amount, + execution_strategy=ExecutionStrategy.MARKET, + position_action=PositionAction.CLOSE, + leverage=self.config.leverage, + level_id="global_close", + ) + + return CreateExecutorAction( + controller_id=self.config.id, + executor_config=config, + ) + # ── Single-pass executor analysis ───────────────────────────────────── def _compute_executor_analysis(self): @@ -282,6 +604,8 @@ def _compute_executor_analysis(self): breakeven_price = self.processed_data.get("breakeven_price") for level_id in all_level_ids: + if not level_id.startswith(("buy_", "sell_")): + continue executors = executors_by_level.get(level_id, []) active = [e for e in executors if e.is_active] active_not_trading = [e for e in active if not e.is_trading] @@ -343,15 +667,23 @@ def _compute_executor_analysis(self): blocking.append("price_distance") # e) Position constraints - if current_pct < self.config.min_base_pct and not is_buy: + is_accumulation = self._is_accumulation_side(trade_type) + if current_pct < self.config.min_base_pct and not is_accumulation: blocking.append("below_min_position") - elif current_pct > self.config.max_base_pct and is_buy: + elif current_pct > self.config.max_base_pct and is_accumulation: blocking.append("above_max_position") - # f) Position profit protection - if (self.config.position_profit_protection and not is_buy - and breakeven_price and breakeven_price > 0 and reference_price < breakeven_price): - blocking.append("position_profit_protection") + # f) Position profit protection — block the reduction side when price is unfavorable + is_reduction = not is_accumulation + if self.config.position_profit_protection and is_reduction and breakeven_price and breakeven_price > 0: + if self.config.is_short: + # SHORT: buying to reduce — block if price > breakeven (would realize a loss) + if reference_price > breakeven_price: + blocking.append("position_profit_protection") + else: + # LONG: selling to reduce — block if price < breakeven (would realize a loss) + if reference_price < breakeven_price: + blocking.append("position_profit_protection") # Execution-blocking conditions determine "working" levels execution_blocking = {"active_not_trading", "max_active_executors", "cooldown", "price_distance"} @@ -499,24 +831,52 @@ def _update_position_state(self): target_position = self.config.total_amount_quote * self.config.target_base_pct if position_held is not None: - current_base_pct = position_held.amount_quote / self.config.total_amount_quote - deviation = (target_position - position_held.amount_quote) / target_position - unrealized_pnl_pct = (position_held.unrealized_pnl_quote / position_held.amount_quote - if position_held.amount_quote != 0 else Decimal("0")) + # Use abs(amount_quote) so current_base_pct is always positive for both long and short + current_base_pct = abs(position_held.amount_quote) / self.config.total_amount_quote + deviation = (target_position - abs(position_held.amount_quote)) / target_position breakeven_price = position_held.breakeven_price position_amount = position_held.amount + position_cum_fees = position_held.cum_fees_quote + position_realized_pnl = position_held.realized_pnl_quote + position_unrealized_pnl = position_held.unrealized_pnl_quote + position_volume = position_held.volume_traded_quote + if self.config.global_pnl_reference == "portfolio": + pnl_denominator = self.config.total_amount_quote + else: + # Use entry value (breakeven * amount) for stable PnL % instead of mark-price based amount_quote + pnl_denominator = (abs(position_amount) * breakeven_price + if breakeven_price and breakeven_price > 0 + else abs(position_held.amount_quote)) + unrealized_pnl_pct = (position_held.unrealized_pnl_quote / pnl_denominator + if pnl_denominator != 0 else Decimal("0")) else: current_base_pct = Decimal("0") deviation = Decimal("1") unrealized_pnl_pct = Decimal("0") breakeven_price = None position_amount = Decimal("0") + position_cum_fees = Decimal("0") + position_realized_pnl = Decimal("0") + position_unrealized_pnl = Decimal("0") + position_volume = Decimal("0") + + # Executor fees (from active executors) + executor_fees = sum( + (e.cum_fees_quote for e in self.executors_info if e.is_active), + Decimal("0") + ) min_pct = self.config.min_base_pct max_pct = self.config.max_base_pct if max_pct > min_pct: - buy_skew = (max_pct - current_base_pct) / (max_pct - min_pct) - sell_skew = (current_base_pct - min_pct) / (max_pct - min_pct) + if self.config.is_short: + # SHORT: sell accumulates → sell_skew high when position small, buy_skew high when position large + sell_skew = (max_pct - current_base_pct) / (max_pct - min_pct) + buy_skew = (current_base_pct - min_pct) / (max_pct - min_pct) + else: + # LONG: buy accumulates → buy_skew high when position small, sell_skew high when position large + buy_skew = (max_pct - current_base_pct) / (max_pct - min_pct) + sell_skew = (current_base_pct - min_pct) / (max_pct - min_pct) buy_skew = max(min(buy_skew, Decimal("1.0")), self.config.min_skew) sell_skew = max(min(sell_skew, Decimal("1.0")), self.config.min_skew) else: @@ -530,6 +890,12 @@ def _update_position_state(self): "position_amount": position_amount, "buy_skew": buy_skew, "sell_skew": sell_skew, + "position_cum_fees": position_cum_fees, + "position_realized_pnl": position_realized_pnl, + "position_unrealized_pnl": position_unrealized_pnl, + "position_volume": position_volume, + "executor_fees": executor_fees, + "total_fees": position_cum_fees + executor_fees, }) # ── Create / stop proposals ─────────────────────────────────────────── @@ -571,11 +937,16 @@ def create_actions_proposal(self) -> List[ExecutorAction]: self.logger().warning(f"The amount of the level {level_id} is 0. Skipping.") continue - # Position profit protection: don't place sell orders below breakeven - if self.config.position_profit_protection and trade_type == TradeType.SELL: + # Position profit protection: block reduction-side orders at unfavorable prices + if self.config.position_profit_protection and not self._is_accumulation_side(trade_type): breakeven_price = self.processed_data.get("breakeven_price") - if breakeven_price is not None and breakeven_price > 0 and price < breakeven_price: - continue + if breakeven_price is not None and breakeven_price > 0: + # LONG reduces by selling → skip if price < breakeven + # SHORT reduces by buying → skip if price > breakeven + if self.config.is_short and price > breakeven_price: + continue + elif not self.config.is_short and price < breakeven_price: + continue executor_config = self.get_executor_config(level_id, price, amount) if executor_config is not None: @@ -628,12 +999,22 @@ def _get_executable_levels(self, working_levels: set) -> List[str]: if f"sell_{i}" not in working_levels ] + # Determine which side accumulates vs reduces based on position_side + if self.config.is_short: + accumulation_levels = sell_missing + reduction_levels = buy_missing + else: + accumulation_levels = buy_missing + reduction_levels = sell_missing + current_pct = self.processed_data.get("current_base_pct", Decimal("0")) + # Below min → only accumulate if current_pct < self.config.min_base_pct: - return buy_missing + return accumulation_levels + # Above max → only reduce elif current_pct > self.config.max_base_pct: - return sell_missing + return reduction_levels if self.config.position_profit_protection: breakeven_price = self.processed_data.get("breakeven_price") @@ -641,10 +1022,20 @@ def _get_executable_levels(self, working_levels: set) -> List[str]: target_pct = self.config.target_base_pct if breakeven_price is not None and breakeven_price > 0: - if current_pct < target_pct and reference_price < breakeven_price: - return buy_missing - elif current_pct > target_pct and reference_price > breakeven_price: - return sell_missing + if self.config.is_short: + # SHORT: below target & price above breakeven → only accumulate (sell more) + if current_pct < target_pct and reference_price > breakeven_price: + return accumulation_levels + # SHORT: above target & price below breakeven → only reduce (buy back) + elif current_pct > target_pct and reference_price < breakeven_price: + return reduction_levels + else: + # LONG: below target & price below breakeven → only accumulate (buy more) + if current_pct < target_pct and reference_price < breakeven_price: + return accumulation_levels + # LONG: above target & price above breakeven → only reduce (sell) + elif current_pct > target_pct and reference_price > breakeven_price: + return reduction_levels return buy_missing + sell_missing @@ -689,6 +1080,13 @@ def get_executor_config(self, level_id: str, price: Decimal, amount: Decimal): side=trade_type, ) + def _is_accumulation_side(self, trade_type: TradeType) -> bool: + """Returns True if trade_type is the side that accumulates position. + LONG: BUY accumulates. SHORT: SELL accumulates.""" + if self.config.is_short: + return trade_type == TradeType.SELL + return trade_type == TradeType.BUY + def get_level_id_from_side(self, trade_type: TradeType, level: int) -> str: return f"{trade_type.name.lower()}_{level}" @@ -696,38 +1094,54 @@ def get_trade_type_from_level_id(self, level_id: str) -> TradeType: return TradeType.BUY if level_id.startswith("buy") else TradeType.SELL def get_level_from_level_id(self, level_id: str) -> int: - return int(level_id.split('_')[1]) + parts = level_id.split('_') + try: + return int(parts[1]) + except (ValueError, IndexError): + return -1 - # ── Custom info for MQTT reporting ────────────────────────────────── + # ── Custom info (MQTT / broker) ────────────────────────────────────── def get_custom_info(self) -> dict: if not self.processed_data: return {} + reference_price = self.processed_data.get("reference_price", Decimal("0")) + position_amount = self.processed_data.get("position_amount", Decimal("0")) + current_base_pct = self.processed_data.get("current_base_pct", Decimal("0")) + unrealized_pnl_pct = self.processed_data.get("unrealized_pnl_pct", Decimal("0")) + breakeven_price = self.processed_data.get("breakeven_price") + buy_skew = self.processed_data.get("buy_skew", Decimal("1")) + sell_skew = self.processed_data.get("sell_skew", Decimal("1")) executor_stats = self.processed_data.get("executor_stats", {}) level_conditions = self.processed_data.get("level_conditions", {}) - # Count blocking reasons across all levels - blocking_summary = {} - for lc in level_conditions.values(): - for condition in lc.get("blocking_conditions", []): - blocking_summary[condition] = blocking_summary.get(condition, 0) + 1 + # Distance to global TP/SL + distance_to_tp = float(self.config.global_take_profit - unrealized_pnl_pct) + distance_to_sl = float(unrealized_pnl_pct + self.config.global_stop_loss) + + # Executable levels count + can_buy = sum(1 for lc in level_conditions.values() if lc.get("trade_type") == "BUY" and lc.get("can_execute")) + can_sell = sum(1 for lc in level_conditions.values() if lc.get("trade_type") == "SELL" and lc.get("can_execute")) return { - "reference_price": float(self.processed_data.get("reference_price", 0)), - "spread_multiplier": float(self.processed_data.get("spread_multiplier", 1)), - "current_base_pct": float(self.processed_data.get("current_base_pct", 0)), - "deviation": float(self.processed_data.get("deviation", 0)), - "buy_skew": float(self.processed_data.get("buy_skew", 1)), - "sell_skew": float(self.processed_data.get("sell_skew", 1)), - "breakeven_price": float(self.processed_data.get("breakeven_price") or 0), - "position_amount": float(self.processed_data.get("position_amount", 0)), - "unrealized_pnl_pct": float(self.processed_data.get("unrealized_pnl_pct", 0)), - "total_active": executor_stats.get("total_active", 0), - "total_trading": executor_stats.get("total_trading", 0), - "total_not_trading": executor_stats.get("total_not_trading", 0), - "executable_levels": len(self.processed_data.get("levels_to_execute", [])), - "blocking_summary": blocking_summary, + "reference_price": float(reference_price), + "position_amount": float(position_amount), + "current_base_pct": float(current_base_pct), + "unrealized_pnl_pct": float(unrealized_pnl_pct), + "breakeven_price": float(breakeven_price) if breakeven_price is not None else None, + "buy_skew": float(buy_skew), + "sell_skew": float(sell_skew), + "distance_to_tp": distance_to_tp, + "distance_to_sl": distance_to_sl, + "global_tp_enabled": self.config.global_tp_enabled, + "global_sl_enabled": self.config.global_sl_enabled, + "global_close_phase": self._global_close_phase, + "closing_position": self._global_close_phase is not None, + "active_executors": executor_stats.get("total_active", 0), + "trading_executors": executor_stats.get("total_trading", 0), + "executable_buy_levels": can_buy, + "executable_sell_levels": can_sell, } # ── Status display ──────────────────────────────────────────────────── @@ -889,11 +1303,15 @@ def to_format_status(self) -> List[str]: skew = base_pct - target_pct skew_pct = skew / target_pct if target_pct != 0 else Decimal('0') + pos_side_label = "SHORT" if self.config.is_short else "LONG" + pos_amount = self.processed_data.get('position_amount', Decimal('0')) position_info = [ - f"Current: {base_pct:.2%} (Target: {target_pct:.2%})", + f"Current: {base_pct:.2%} (Target: {target_pct:.2%}) [{pos_side_label}]", f"Range: {min_pct:.2%} - {max_pct:.2%}", + f"Amount: {pos_amount}", f"Skew: {skew_pct:+.2%} (min {self.config.min_skew:.2%})", - f"Buy Skew: {buy_skew:.2f} | Sell Skew: {sell_skew:.2f}" + f"Buy Skew: {buy_skew:.2f} | Sell Skew: {sell_skew:.2f}", + "", ] breakeven_str = f"{breakeven:.2f}" if breakeven is not None else "N/A" @@ -901,10 +1319,26 @@ def to_format_status(self) -> List[str]: distance_to_tp = self.config.global_take_profit - pnl if pnl < self.config.global_take_profit else Decimal('0') distance_to_sl = pnl + self.config.global_stop_loss if pnl > -self.config.global_stop_loss else Decimal('0') + tp_active = self.config.global_tp_enabled and base_pct >= self._get_tp_activation_threshold() + sl_active = self.config.global_sl_enabled and base_pct >= self._get_sl_activation_threshold() + tp_status = "ACTIVE" if tp_active else ("OFF" if not self.config.global_tp_enabled else f"from {self.config.global_tp_activation_from}") + sl_status = "ACTIVE" if sl_active else ("OFF" if not self.config.global_sl_enabled else f"from {self.config.global_sl_activation_from}") + + # Fee and PnL data + position_fees = self.processed_data.get('position_cum_fees', Decimal('0')) + executor_fees = self.processed_data.get('executor_fees', Decimal('0')) + total_fees = self.processed_data.get('total_fees', Decimal('0')) + realized_pnl = self.processed_data.get('position_realized_pnl', Decimal('0')) + unrealized_pnl_quote = self.processed_data.get('position_unrealized_pnl', Decimal('0')) + volume = self.processed_data.get('position_volume', Decimal('0')) + quote = self.config.trading_pair.split("-")[1] + pnl_info = [ - f"Unrealized: {pnl_sign}{pnl:.2%}", - f"Take Profit: {self.config.global_take_profit:.2%} (Δ{distance_to_tp:.2%})", - f"Stop Loss: {-self.config.global_stop_loss:.2%} (Δ{distance_to_sl:.2%})", + f"Unrealized: {pnl_sign}{pnl:.2%} ({unrealized_pnl_quote:+.4f} {quote})", + f"Realized: {realized_pnl:+.4f} {quote} | Vol: {volume:.2f} {quote}", + f"Fees: {total_fees:.4f} {quote} (pos:{position_fees:.4f} exec:{executor_fees:.4f})", + f"TP: {self.config.global_take_profit:.2%} (Δ{distance_to_tp:.2%}) [{tp_status}]", + f"SL: {-self.config.global_stop_loss:.2%} (Δ{distance_to_sl:.2%}) [{sl_status}]", f"Breakeven: {breakeven_str}" ] @@ -1116,8 +1550,9 @@ def _format_position_visualization(self, base_pct: Decimal, target_pct: Decimal, else: pnl_bar = "─" * bar_width - pnl_status = "PROFIT" if pnl > 0 else "LOSS" if pnl < 0 else "BREAK-EVEN" - lines.append(f"│ PnL: [{pnl_bar}] {pnl_status} │") + pnl_sign = "+" if pnl > 0 else "" + pnl_status = f"{pnl_sign}{pnl:.2%}" + lines.append(f"│ Position PnL: [{pnl_bar}] {pnl_status} (S={-self.config.global_stop_loss:.2%} T={self.config.global_take_profit:.2%}) │") return lines From be930dbe675156b6d452e7e88dfe58afecc47623 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Wed, 27 May 2026 10:05:57 -0300 Subject: [PATCH 4/5] (feat) fix delete with permissions --- routers/bot_orchestration.py | 19 ++++++++++++++++++- utils/file_system.py | 8 +++++--- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/routers/bot_orchestration.py b/routers/bot_orchestration.py index 69a96a16..659d3610 100644 --- a/routers/bot_orchestration.py +++ b/routers/bot_orchestration.py @@ -1,6 +1,7 @@ import asyncio import logging import os +import shutil from datetime import datetime, timezone from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query @@ -413,10 +414,26 @@ async def delete_bot_run( if not bot_run: raise HTTPException(status_code=404, detail=f"Bot run {bot_run_id} not found") + # Also delete the archived bot folder if it exists + archived_dir = os.path.join('bots', 'archived', bot_run.instance_name) + archived_deleted = False + if os.path.isdir(archived_dir): + try: + import subprocess, platform + if platform.system() == 'Darwin': + # Strip macOS ACLs (Docker adds "deny delete" ACLs) + subprocess.run(['chmod', '-R', '-N', archived_dir], check=False) + shutil.rmtree(archived_dir) + archived_deleted = True + logger.info(f"Deleted archived folder: {archived_dir}") + except Exception as e: + logger.warning(f"Failed to delete archived folder {archived_dir}: {e}") + return { "status": "success", "message": f"Bot run {bot_run_id} deleted successfully", - "bot_name": bot_run.bot_name + "bot_name": bot_run.bot_name, + "archived_folder_deleted": archived_deleted } except HTTPException: raise diff --git a/utils/file_system.py b/utils/file_system.py index d0da1766..2b555202 100644 --- a/utils/file_system.py +++ b/utils/file_system.py @@ -443,9 +443,11 @@ def delete_archived_bot(self, db_path: str) -> str: if not os.path.isdir(archived_bot_dir): raise FileNotFoundError(f"Archived bot directory '{bot_name}' not found") - shutil.rmtree(archived_bot_dir, ignore_errors=False, onerror=lambda func, path, exc: ( - os.chmod(path, 0o777), func(path) - )) + import subprocess, platform + if platform.system() == 'Darwin': + # Strip macOS ACLs (Docker adds "deny delete" ACLs) + subprocess.run(['chmod', '-R', '-N', archived_bot_dir], check=False) + shutil.rmtree(archived_bot_dir) return bot_name def list_databases(self) -> List[str]: From 3384ab348496255c67482433a3b53af5c1aa5d6c Mon Sep 17 00:00:00 2001 From: cardosofede Date: Wed, 27 May 2026 11:47:59 -0300 Subject: [PATCH 5/5] (feat) remove chaser config --- bots/controllers/generic/pmm_mister.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/bots/controllers/generic/pmm_mister.py b/bots/controllers/generic/pmm_mister.py index 7a9d24fa..339a1428 100644 --- a/bots/controllers/generic/pmm_mister.py +++ b/bots/controllers/generic/pmm_mister.py @@ -77,10 +77,6 @@ class PMMisterConfig(ControllerConfigBase): # PnL reference: "position" = pnl/position_value, "portfolio" = pnl/total_amount_quote global_pnl_reference: str = Field(default="position", json_schema_extra={"is_updatable": True}) - # Limit chaser config for position closing (tight values for fast fills) - close_chaser_distance: Decimal = Field(default=Decimal("0.0001"), json_schema_extra={"is_updatable": True}) - close_chaser_refresh_threshold: Decimal = Field(default=Decimal("0.0005"), json_schema_extra={"is_updatable": True}) - @field_validator("take_profit", mode="before") @classmethod def validate_target(cls, v):