Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
12b87f9
(docs) add improvements backlog
cardosofede Jun 12, 2026
5c6f278
(perf) PERF-004: single-query multi-account portfolio history
cardosofede Jun 12, 2026
ba05ab7
(refactor) ARCH-010: remove dead AccountTradingInterface from account…
cardosofede Jun 12, 2026
1121ccb
(refactor) ARCH-013: extract shared cursor-pagination helper
cardosofede Jun 12, 2026
f3fb6a6
(refactor) ARCH-014: split create_executor into focused helpers
cardosofede Jun 12, 2026
d0bbd10
(fix) CORR-006: retain strong refs to recorder event tasks
cardosofede Jun 12, 2026
62f463f
(fix) CORR-009: await MQTT teardown in BotsOrchestrator.stop
cardosofede Jun 12, 2026
ead0511
(perf) PERF-002: one DB session per connector in order state sync
cardosofede Jun 12, 2026
1b6d76e
(fix) SEC-016: validate db_path containment in archived-bots endpoints
cardosofede Jun 12, 2026
f19451c
(refactor) READ-024: use logging instead of print in BotArchiver
cardosofede Jun 12, 2026
d7b5d10
(refactor) READ-025: drop redundant local 'import time as _time'
cardosofede Jun 12, 2026
62c393b
(docs) close improvements wave 1 (10 items)
cardosofede Jun 12, 2026
54ab032
(refactor) ARCH-011: remove dead trading methods from TradingService
cardosofede Jun 12, 2026
1f7ba52
(fix) SEC-018: warn loudly when default credentials are in use
cardosofede Jun 12, 2026
0d459f4
(perf) PERF-003: batch controller performance snapshots
cardosofede Jun 12, 2026
51f4ea4
(perf) PERF-005: demote order-event hot-path logging to debug
cardosofede Jun 12, 2026
2b08c4e
(docs) close improvements wave 2 (4 items)
cardosofede Jun 12, 2026
10ea186
(fix) SEC-017: reject path traversal in account/connector names
cardosofede Jun 12, 2026
c0080a0
(fix) SEC-019: configurable CORS origins instead of '*' with credentials
cardosofede Jun 12, 2026
84c6733
(docs) close improvements wave 3 (SEC-017, SEC-019)
cardosofede Jun 12, 2026
833d888
(fix) SEC-020: gate debug_mode auth bypass to non-production environm…
cardosofede Jun 12, 2026
8b6c29c
(docs) close improvements wave 4 (SEC-020, READ-021)
cardosofede Jun 12, 2026
0421b9d
(refactor) READ-022: dedupe cached-price fallback in AccountsService
cardosofede Jun 12, 2026
5dc3633
(fix) CORR-008: make _last_known_prices per-instance state
cardosofede Jun 12, 2026
3a92452
(refactor) ARCH-015: shared balance-entry and gateway-guard helpers
cardosofede Jun 12, 2026
585a804
(fix) CORR-007: snapshot accounts_state before iterating
cardosofede Jun 12, 2026
e2a7c8e
(refactor) READ-023: use typing.Any instead of builtin any in hints
cardosofede Jun 12, 2026
2c65cdf
(perf) PERF-001: commit account snapshots once per dump
cardosofede Jun 12, 2026
9c0db71
(docs) close improvements serial wave (6 accounts_service items)
cardosofede Jun 12, 2026
f4764bb
(refactor) ARCH-012: split AccountsService god-class along its seams
cardosofede Jun 12, 2026
cef5e75
(docs) close ARCH-012 — improvements backlog complete (25/25)
cardosofede Jun 12, 2026
49d9f78
(feat) remove improvements
cardosofede Jun 16, 2026
76ec85e
(feat) update market data services
cardosofede Jun 16, 2026
8c2a51a
(feat) improve routers
cardosofede Jun 16, 2026
053952a
(fix) CORR-032: claim executor atomically to prevent double-completio…
cardosofede Jun 16, 2026
7595a26
(fix) CORR-033: preserve persisted exchange_order_id on order completion
cardosofede Jun 16, 2026
2a2b16d
(fix) CORR-034: use get_session_context for funding payment writes
cardosofede Jun 16, 2026
a3e93b3
(perf) PERF-026: aggregate get_orders_summary with COUNT GROUP BY
cardosofede Jun 16, 2026
fef7332
(refactor) ARCH-040: dedupe token_state to dict mapping in AccountRep…
cardosofede Jun 16, 2026
8b1a14e
(refactor) ARCH-041: add get_position_by_id to GatewayCLMMRepository
cardosofede Jun 16, 2026
3dd3f73
(refactor) ARCH-039: remove dead TradeRepository.get_trades
cardosofede Jun 16, 2026
8495317
(docs) READ-047: remove obsolete comments in bots_orchestrator
cardosofede Jun 16, 2026
919c949
(fix) SEC-044: validate deployment names against path traversal
cardosofede Jun 16, 2026
ee3419a
(perf) PERF-030: fetch gateway chain configs concurrently
cardosofede Jun 16, 2026
8bce907
(perf) PERF-028: sync orders to DB concurrently across connectors
cardosofede Jun 16, 2026
3d0ae72
(refactor) ARCH-038: import ExecutorRepository once at module level
cardosofede Jun 16, 2026
3450b9b
(perf) PERF-029: reconcile_active_orders uses one DB session per conn…
cardosofede Jun 16, 2026
ec2d5d8
(perf) PERF-031: paginate account state history by distinct timestamps
cardosofede Jun 16, 2026
b73ade0
(refactor) ARCH-035: move bot stop/archive DB logic into BotsOrchestr…
cardosofede Jun 16, 2026
b0847b7
(perf) PERF-027: avoid json round-trip in _format_executor_info
cardosofede Jun 16, 2026
6f851a4
(refactor) ARCH-042: add public refresh_connector_state, drop cross-s…
cardosofede Jun 16, 2026
326df6a
(fix) SEC-045: preserve existing credentials on failed update (backup…
cardosofede Jun 16, 2026
bce4975
(refactor) ARCH-036: inject shared db_manager into AccountsService an…
cardosofede Jun 16, 2026
6a30fd3
(refactor) ARCH-037: constructor-inject sibling services into Account…
cardosofede Jun 16, 2026
939d0be
(refactor) ARCH-043: extract TradingHistoryService from AccountsService
cardosofede Jun 16, 2026
60437ca
(refactor) READ-046: remove unused imports across services and database
cardosofede Jun 16, 2026
1402e47
(feat) remove debug mode
cardosofede Jun 18, 2026
6f3298f
(feat) add default quote for lighter
cardosofede Jun 19, 2026
ce8e1ea
(feat) fix filtering of connectors
cardosofede Jun 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,4 @@ bots/conf/
# IDE files
.vscode/
.idea/
improvements
84 changes: 76 additions & 8 deletions config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from typing import List

from pydantic import Field
Expand Down Expand Up @@ -58,19 +59,60 @@ class MarketDataSettings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="MARKET_DATA_", extra="ignore")


class SecuritySettings(BaseSettings):
"""Security and authentication configuration."""
# Insecure default credential values (SEC-018), mapped to the environment variables that override them.
# They are kept only for local development convenience and MUST be overridden in production deployments.
_INSECURE_SECURITY_DEFAULTS = {
"USERNAME": "admin",
"PASSWORD": "admin",
"CONFIG_PASSWORD": "a",
}


username: str = Field(default="admin", description="API basic auth username")
password: str = Field(default="admin", description="API basic auth password")
debug_mode: bool = Field(default=False, description="Enable debug mode (disables auth)")
config_password: str = Field(default="a", description="Bot configuration encryption password")
class SecuritySettings(BaseSettings):
"""Security and authentication configuration.

All fields are read from environment variables without a prefix (or from .env):
- USERNAME: API basic auth username (default "admin" — local development only, never use in production)
- PASSWORD: API basic auth password (default "admin" — local development only, never use in production)
- CONFIG_PASSWORD: password used to encrypt ALL connector credentials (default "a" — local development only,
never use in production)
"""

username: str = Field(default="admin", description="API basic auth username (override via USERNAME in production)")
password: str = Field(default="admin", description="API basic auth password (override via PASSWORD in production)")
config_password: str = Field(
default="a",
description="Bot configuration encryption password (override via CONFIG_PASSWORD in production)"
)

model_config = SettingsConfigDict(
env_prefix="",
extra="ignore" # Ignore extra environment variables
)

def insecure_defaults_in_use(self) -> List[str]:
"""Return the env var names of security settings still set to their insecure default values."""
current_values = {"USERNAME": self.username, "PASSWORD": self.password, "CONFIG_PASSWORD": self.config_password}
return [name for name, default in _INSECURE_SECURITY_DEFAULTS.items() if current_values[name] == default]


def warn_if_insecure_security_defaults(security: SecuritySettings) -> List[str]:
"""Emit a high-severity log if any security setting still uses its insecure default value (SEC-018).

Returns the list of env var names that are still at their defaults (empty list when fully configured).
"""
insecure = security.insecure_defaults_in_use()
if insecure:
logging.critical(
"SECURITY WARNING: insecure default credentials in use for: %s. "
"Anyone who can reach this API can authenticate with the default basic auth credentials, and all "
"connector credentials are encrypted with a trivially guessable password. "
"Set the USERNAME, PASSWORD and CONFIG_PASSWORD environment variables (e.g. in .env) before deploying "
"to production. Do NOT run a production deployment with these defaults.",
", ".join(insecure),
)
return insecure


class AWSSettings(BaseSettings):
"""AWS configuration for S3 archiving."""
Expand All @@ -93,6 +135,33 @@ class GatewaySettings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="GATEWAY_", extra="ignore")


class CORSSettings(BaseSettings):
"""CORS configuration for the API (SEC-019).

A wildcard origin ("*") must never be combined with allow_credentials=True: browsers reject that
combination per the CORS spec, and Starlette works around it by reflecting any Origin, which lets
arbitrary third-party pages call the API from an authenticated operator's browser. Origins are
therefore restricted by default and configurable via environment variables:
- CORS_ALLOW_ORIGINS: JSON list of explicit trusted origins, e.g. '["https://dashboard.example.com"]'
- CORS_ALLOW_ORIGIN_REGEX: regex for trusted origins (defaults to localhost-only for local development;
set to an empty string to disable regex matching entirely)
"""

allow_origins: List[str] = Field(
default=[],
description='Explicit list of trusted CORS origins, e.g. CORS_ALLOW_ORIGINS=\'["https://dashboard.example.com"]\''
)
allow_origin_regex: str = Field(
default=r"https?://(localhost|127\.0\.0\.1)(:\d+)?",
description="Regex matching trusted CORS origins; defaults to localhost-only. Empty string disables regex matching."
)
allow_credentials: bool = Field(default=True, description="Allow credentialed (cookies/auth) cross-origin requests")
allow_methods: List[str] = Field(default=["*"], description="HTTP methods allowed for cross-origin requests")
allow_headers: List[str] = Field(default=["*"], description="HTTP headers allowed for cross-origin requests")

model_config = SettingsConfigDict(env_prefix="CORS_", extra="ignore")


class AppSettings(BaseSettings):
"""Main application settings."""

Expand Down Expand Up @@ -130,6 +199,7 @@ class Settings(BaseSettings):
security: SecuritySettings = Field(default_factory=SecuritySettings)
aws: AWSSettings = Field(default_factory=AWSSettings)
gateway: GatewaySettings = Field(default_factory=GatewaySettings)
cors: CORSSettings = Field(default_factory=CORSSettings)
app: AppSettings = Field(default_factory=AppSettings)

# Direct banned_tokens field to handle env parsing
Expand All @@ -145,6 +215,4 @@ class Settings(BaseSettings):
extra="ignore"
)


# Create global settings instance
settings = Settings()
2 changes: 2 additions & 0 deletions database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
AccountRepository,
BotRunRepository,
ControllerPerformanceRepository,
ExecutorRepository,
FundingRepository,
GatewayCLMMRepository,
GatewaySwapRepository,
Expand All @@ -30,6 +31,7 @@
"ControllerPerformanceSnapshot",
"Base", "AsyncDatabaseManager",
"AccountRepository", "BotRunRepository", "ControllerPerformanceRepository",
"ExecutorRepository",
"OrderRepository", "TradeRepository", "FundingRepository",
"GatewaySwapRepository", "GatewayCLMMRepository"
]
164 changes: 85 additions & 79 deletions database/repositories/account_repository.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from datetime import datetime, timedelta
from datetime import datetime
from decimal import Decimal
from typing import Dict, List, Optional, Tuple
import base64
import json

from sqlalchemy import desc, select, func
from sqlalchemy import desc, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload

Expand All @@ -15,6 +13,17 @@ class AccountRepository:
def __init__(self, session: AsyncSession):
self.session = session

@staticmethod
def _token_state_to_dict(token_state: TokenState) -> Dict:
"""Serialize a TokenState into the standard token info dict with float casts."""
return {
"token": token_state.token,
"units": float(token_state.units),
"price": float(token_state.price),
"value": float(token_state.value),
"available_units": float(token_state.available_units)
}

@staticmethod
def _interval_to_minutes(interval: str) -> int:
"""Convert interval string to minutes."""
Expand Down Expand Up @@ -63,11 +72,16 @@ def _sample_history_by_interval(history: List[Dict], interval_minutes: int) -> L

return sampled

async def save_account_state(self, account_name: str, connector_name: str, tokens_info: List[Dict],
async def save_account_state(self, account_name: str, connector_name: str, tokens_info: List[Dict],
snapshot_timestamp: Optional[datetime] = None) -> AccountState:
"""
Save account state with token information to the database.
If snapshot_timestamp is provided, use it instead of server default.

Note: this method does NOT commit; it only flushes to obtain the AccountState id.
The caller's session context owns the transaction and commits once
(e.g. get_session_context commits on successful exit), so a snapshot spanning
multiple accounts/connectors persists atomically in a single transaction.
"""
account_state_data = {
"account_name": account_name,
Expand All @@ -93,8 +107,7 @@ async def save_account_state(self, account_name: str, connector_name: str, token
available_units=Decimal(str(token_info["available_units"]))
)
self.session.add(token_state)

await self.session.commit()

return account_state

async def get_latest_account_states(self) -> Dict[str, Dict[str, List[Dict]]]:
Expand Down Expand Up @@ -133,23 +146,16 @@ async def get_latest_account_states(self) -> Dict[str, Dict[str, List[Dict]]]:
if account_state.account_name not in accounts_state:
accounts_state[account_state.account_name] = {}

token_info = []
for token_state in account_state.token_states:
token_info.append({
"token": token_state.token,
"units": float(token_state.units),
"price": float(token_state.price),
"value": float(token_state.value),
"available_units": float(token_state.available_units)
})

token_info = [self._token_state_to_dict(token_state) for token_state in account_state.token_states]

accounts_state[account_state.account_name][account_state.connector_name] = token_info

return accounts_state

async def get_account_state_history(self,
limit: Optional[int] = None,
account_name: Optional[str] = None,
account_names: Optional[List[str]] = None,
connector_name: Optional[str] = None,
cursor: Optional[str] = None,
start_time: Optional[datetime] = None,
Expand All @@ -160,7 +166,8 @@ async def get_account_state_history(self,

Args:
limit: Maximum number of records to return
account_name: Filter by account name
account_name: Filter by a single account name
account_names: Filter by multiple account names (IN filter)
connector_name: Filter by connector name
cursor: Cursor for pagination
start_time: Start time filter
Expand All @@ -171,52 +178,67 @@ async def get_account_state_history(self,
Tuple of (data, next_cursor, has_more)
"""
interval_minutes = self._interval_to_minutes(interval)
query = (
select(AccountState)
.options(joinedload(AccountState.token_states))
.order_by(desc(AccountState.timestamp))
)

# Apply filters
if account_name:
query = query.filter(AccountState.account_name == account_name)
if connector_name:
query = query.filter(AccountState.connector_name == connector_name)
if start_time:
query = query.filter(AccountState.timestamp >= start_time)
if end_time:
query = query.filter(AccountState.timestamp <= end_time)

# Handle cursor-based pagination
if cursor:
try:
cursor_time = datetime.fromisoformat(cursor.replace('Z', '+00:00'))
query = query.filter(AccountState.timestamp < cursor_time)
except (ValueError, TypeError):
# Invalid cursor, ignore it
pass

# Fetch more records than requested to ensure we have enough after sampling
# For intervals > 5m, we need to fetch more data to get enough sampled points

# Minute bucket expression: a single logical snapshot fans out into one row per
# (account_name, connector_name) but all share the same minute. Paginate by these
# distinct minute buckets so the limit/cursor are independent of the account/connector
# fan-out (a row-based limit would collapse N*M rows into far fewer buckets than `limit`).
minute_bucket = func.date_trunc("minute", AccountState.timestamp)

def _apply_filters(stmt):
if account_name:
stmt = stmt.filter(AccountState.account_name == account_name)
if account_names:
stmt = stmt.filter(AccountState.account_name.in_(account_names))
if connector_name:
stmt = stmt.filter(AccountState.connector_name == connector_name)
if start_time:
stmt = stmt.filter(AccountState.timestamp >= start_time)
if end_time:
stmt = stmt.filter(AccountState.timestamp <= end_time)
# Handle cursor-based pagination: the cursor is a minute-bucket timestamp, so
# everything strictly before it excludes all already-returned buckets.
if cursor:
try:
cursor_time = datetime.fromisoformat(cursor.replace('Z', '+00:00'))
stmt = stmt.filter(AccountState.timestamp < cursor_time)
except (ValueError, TypeError):
# Invalid cursor, ignore it
pass
return stmt

# Step 1: select the distinct minute buckets that match the filters, most recent first.
# For intervals > 5m we widen the window so sampling still has enough buckets to pick from.
sampling_multiplier = max(1, interval_minutes // 5) # How many 5m intervals per sample
fetch_limit = (limit * sampling_multiplier + 1) if limit else (100 * sampling_multiplier + 1)
query = query.limit(fetch_limit)

result = await self.session.execute(query)
account_states = result.unique().scalars().all()
timestamps_query = (
select(minute_bucket.label("minute"))
.distinct()
.order_by(desc(minute_bucket))
.limit(fetch_limit)
)
timestamps_query = _apply_filters(timestamps_query)
timestamps_result = await self.session.execute(timestamps_query)
selected_minutes = [row.minute for row in timestamps_result.all()]

# Step 2: fetch the AccountState (+token) rows only for the selected minute buckets.
if selected_minutes:
query = (
select(AccountState)
.options(joinedload(AccountState.token_states))
.filter(minute_bucket.in_(selected_minutes))
.order_by(desc(AccountState.timestamp))
)
query = _apply_filters(query)
result = await self.session.execute(query)
account_states = result.unique().scalars().all()
else:
account_states = []

# Format response - Group by minute to aggregate account/connector states
minute_groups = {}
for account_state in account_states:
token_info = []
for token_state in account_state.token_states:
token_info.append({
"token": token_state.token,
"units": float(token_state.units),
"price": float(token_state.price),
"value": float(token_state.value),
"available_units": float(token_state.available_units)
})
token_info = [self._token_state_to_dict(token_state) for token_state in account_state.token_states]

# Round timestamp to the nearest minute for grouping
minute_timestamp = account_state.timestamp.replace(second=0, microsecond=0)
Expand All @@ -235,9 +257,9 @@ async def get_account_state_history(self,

minute_groups[minute_key]["state"][account_state.account_name][account_state.connector_name] = token_info

# Convert to list and maintain chronological order (most recent first)
# Already ordered most-recent-first: Step 2 fetched rows ordered by descending
# timestamp and minute truncation is monotonic, so dict insertion order is descending.
history = list(minute_groups.values())
history.sort(key=lambda x: x["timestamp"], reverse=True)

# Apply interval sampling
sampled_history = self._sample_history_by_interval(history, interval_minutes)
Expand Down Expand Up @@ -284,15 +306,7 @@ async def get_account_current_state(self, account_name: str) -> Dict[str, List[D

state = {}
for account_state in account_states:
token_info = []
for token_state in account_state.token_states:
token_info.append({
"token": token_state.token,
"units": float(token_state.units),
"price": float(token_state.price),
"value": float(token_state.value),
"available_units": float(token_state.available_units)
})
token_info = [self._token_state_to_dict(token_state) for token_state in account_state.token_states]
state[account_state.connector_name] = token_info

return state
Expand All @@ -318,16 +332,8 @@ async def get_connector_current_state(self, account_name: str, connector_name: s
if not account_state:
return []

token_info = []
for token_state in account_state.token_states:
token_info.append({
"token": token_state.token,
"units": float(token_state.units),
"price": float(token_state.price),
"value": float(token_state.value),
"available_units": float(token_state.available_units)
})

token_info = [self._token_state_to_dict(token_state) for token_state in account_state.token_states]

return token_info

async def get_all_unique_tokens(self) -> List[str]:
Expand Down
Loading
Loading