diff --git a/backend/app/api/__init__.py b/backend/app/api/__init__.py index 7c28f22..02fc736 100644 --- a/backend/app/api/__init__.py +++ b/backend/app/api/__init__.py @@ -16,6 +16,7 @@ model_files, models, ollama, + semantic_router, storage, system, workers, @@ -58,3 +59,6 @@ # Headscale VPN api_router.include_router(headscale.router, prefix="/headscale", tags=["headscale"]) + +# Semantic Router +api_router.include_router(semantic_router.router) diff --git a/backend/app/api/api_keys.py b/backend/app/api/api_keys.py index 0841785..64f4cc7 100644 --- a/backend/app/api/api_keys.py +++ b/backend/app/api/api_keys.py @@ -265,3 +265,114 @@ async def get_all_api_keys_stats( ), "per_key_stats": per_key_stats, } + + +@router.get("/stats/by-model") +async def get_stats_by_model( + db: AsyncSession = Depends(get_db), + current_user: User = Depends(require_viewer), +): + """Get usage statistics grouped by model (requires viewer+)""" + from datetime import timedelta + + from sqlalchemy.orm import selectinload + + from app.models.api_key import Usage + from app.models.deployment import Deployment + from app.models.llm_model import LLMModel + + thirty_days_ago = datetime.utcnow() - timedelta(days=30) + + # Get per-model stats + model_result = await db.execute( + select( + Usage.model_id, + func.sum(Usage.request_count).label("requests"), + func.sum(Usage.prompt_tokens).label("prompt_tokens"), + func.sum(Usage.completion_tokens).label("completion_tokens"), + ) + .where(Usage.date >= thirty_days_ago) + .where(Usage.model_id.isnot(None)) + .group_by(Usage.model_id) + ) + + # Get model info + models_result = await db.execute(select(LLMModel)) + models_map = {m.id: m for m in models_result.scalars().all()} + + # Get running deployments to know which models are active + deployments_result = await db.execute( + select(Deployment) + .options(selectinload(Deployment.model)) + .where(Deployment.status == "running") + ) + running_deployments = deployments_result.scalars().all() + 
running_model_ids = {d.model_id for d in running_deployments} + + per_model_stats = [] + for row in model_result: + model = models_map.get(row.model_id) + if model: + per_model_stats.append( + { + "model_id": row.model_id, + "model_name": model.name, + "model_source": model.source, + "requests": row.requests or 0, + "prompt_tokens": row.prompt_tokens or 0, + "completion_tokens": row.completion_tokens or 0, + "total_tokens": (row.prompt_tokens or 0) + (row.completion_tokens or 0), + "is_running": row.model_id in running_model_ids, + } + ) + + # Add models with no usage but are running + for deployment in running_deployments: + if deployment.model_id not in {s["model_id"] for s in per_model_stats}: + model = deployment.model + if model: + per_model_stats.append( + { + "model_id": model.id, + "model_name": model.name, + "model_source": model.source, + "requests": 0, + "prompt_tokens": 0, + "completion_tokens": 0, + "total_tokens": 0, + "is_running": True, + } + ) + + # Get stats for MoM (Semantic Router) - model_id is NULL + mom_result = await db.execute( + select( + func.sum(Usage.request_count).label("requests"), + func.sum(Usage.prompt_tokens).label("prompt_tokens"), + func.sum(Usage.completion_tokens).label("completion_tokens"), + ) + .where(Usage.date >= thirty_days_ago) + .where(Usage.model_id.is_(None)) + ) + mom_row = mom_result.first() + + mom_stats = None + if mom_row and (mom_row.requests or 0) > 0: + mom_stats = { + "model_id": None, + "model_name": "MoM (Semantic Router)", + "model_source": "semantic-router", + "requests": mom_row.requests or 0, + "prompt_tokens": mom_row.prompt_tokens or 0, + "completion_tokens": mom_row.completion_tokens or 0, + "total_tokens": (mom_row.prompt_tokens or 0) + (mom_row.completion_tokens or 0), + "is_running": True, # Check from Semantic Router status + } + + # Sort by total tokens descending + per_model_stats.sort(key=lambda x: x["total_tokens"], reverse=True) + + return { + "models": per_model_stats, + "mom_stats": 
mom_stats, + } diff --git a/backend/app/api/apps/deployment.py b/backend/app/api/apps/deployment.py index 4002c5d..261d920 100644 --- a/backend/app/api/apps/deployment.py +++ b/backend/app/api/apps/deployment.py @@ -4,6 +4,7 @@ - Image pulling with progress tracking - Container creation and health checking - Nginx proxy setup +- Semantic Router config generation """ import asyncio @@ -16,6 +17,7 @@ from app.models.app import App, AppStatus, AppType from app.models.worker import Worker from app.services.app_proxy_manager import get_proxy_manager +from app.services.semantic_router import semantic_router_service logger = logging.getLogger(__name__) @@ -277,14 +279,18 @@ async def _verify_http_access( app_url: str, app_id: int, ) -> bool: - """Verify app is accessible via HTTP.""" + """Verify app is accessible via HTTP. + + Returns True if the app responds to HTTP requests (any status code). + A 500 error still means the app is running and accepting connections, + just that it may be initializing or the endpoint doesn't exist. + """ try: http_check = await client.get(app_url, timeout=10.0) - if http_check.status_code < 500: - logger.info(f"App {app_id} HTTP check passed: {http_check.status_code}") - return True - else: - logger.warning(f"App {app_id} HTTP check failed: {http_check.status_code}") + # Any HTTP response (including 500) means the app is running + # Only connection errors should be treated as failures + logger.info(f"App {app_id} HTTP check passed: {http_check.status_code}") + return True except Exception as http_err: logger.warning(f"App {app_id} HTTP check error: {http_err}") return False @@ -305,6 +311,7 @@ async def deploy_app_background( app_def: dict, lmstack_port: str, use_proxy: bool = True, + lmstack_host: str | None = None, ) -> None: """Background task to deploy an app. 
@@ -324,6 +331,7 @@ async def deploy_app_background( app_def: App definition from APP_DEFINITIONS lmstack_port: LMStack API port use_proxy: Whether to setup nginx proxy + lmstack_host: LMStack API host (for semantic router config) """ from app.database import async_session_maker @@ -362,12 +370,36 @@ async def deploy_app_background( await db.commit() return - # Phase 2: Create container + # Phase 2: Pre-deployment setup (e.g., config files) + if app_type == AppType.SEMANTIC_ROUTER: + set_deployment_progress(app_id, "starting", 0, "Generating config...") + try: + # Use lmstack_host parameter, or fallback to LMSTACK_BACKEND_URL env var + import os + + if lmstack_host: + lmstack_api_url = f"http://{lmstack_host}:{lmstack_port}" + else: + backend_url = os.environ.get("LMSTACK_BACKEND_URL") + if backend_url: + lmstack_api_url = backend_url.rstrip("/") + else: + # Last resort: use worker host (may not work from container) + lmstack_api_url = f"http://{worker_host}:{lmstack_port}" + logger.info(f"Semantic router will use LMStack API: {lmstack_api_url}") + # Get API key from app config + api_key = (app.config or {}).get("api_key") + await write_semantic_router_config(worker_address, lmstack_api_url, db, api_key) + except Exception as e: + logger.warning(f"Failed to write semantic router config: {e}") + # Continue anyway, config can be updated later + + # Phase 3: Create container app.status = AppStatus.STARTING.value app.status_message = "Starting container..." 
await db.commit() - set_deployment_progress(app_id, "starting", 0, "Creating container...") + set_deployment_progress(app_id, "starting", 10, "Creating container...") container_id = await _create_container( worker_address, app_id, app_type, app_def, env_vars, port @@ -397,7 +429,7 @@ async def deploy_app_background( # Phase 4: Setup proxy if use_proxy: - await _setup_nginx_proxy(app_id, app_type, worker_address, port) + await _setup_nginx_proxy(app_id, app_type, worker_address, port, app_def) else: logger.info(f"Proxy disabled for app {app_id}, using direct worker connection") @@ -409,6 +441,11 @@ async def deploy_app_background( set_deployment_progress(app_id, "running", 100, "App deployed successfully") logger.info(f"App {app_id} deployed successfully") + # Auto-deploy monitoring services for apps that support it + if app_def.get("has_monitoring"): + logger.info(f"Auto-deploying monitoring services for app {app_id}") + await _auto_deploy_monitoring(db, app, worker, port) + except Exception as e: logger.exception(f"Failed to deploy app {app_id}: {e}") try: @@ -452,17 +489,38 @@ async def _create_container( } ) + # Build port mappings + ports = [ + { + "container_port": app_def["internal_port"], + "host_port": port, + "protocol": "tcp", + } + ] + + # Add additional ports (e.g., dashboard port for semantic router) + additional_ports = app_def.get("additional_ports", []) + for i, additional_port_info in enumerate(additional_ports): + # Handle both old format (int) and new format (dict with container_port and name) + if isinstance(additional_port_info, dict): + container_port = additional_port_info["container_port"] + else: + container_port = additional_port_info + # Map additional ports starting from port + 1 + host_port = port + 1 + i + ports.append( + { + "container_port": container_port, + "host_port": host_port, + "protocol": "tcp", + } + ) + payload = { "name": container_name, "image": app_def["image"], "env": env_vars, - "ports": [ - { - "container_port": 
app_def["internal_port"], - "host_port": port, - "protocol": "tcp", - } - ], + "ports": ports, "volumes": volumes, "restart_policy": "unless-stopped", "labels": { @@ -474,6 +532,12 @@ async def _create_container( "extra_hosts": {"host.docker.internal": "host-gateway"}, } + # Add entrypoint/command if specified (e.g., for semantic router config symlink) + if app_def.get("entrypoint"): + payload["entrypoint"] = app_def["entrypoint"] + if app_def.get("command"): + payload["command"] = app_def["command"] + # Add Linux capabilities if specified (e.g., SYS_ADMIN for AnythingLLM) if app_def.get("cap_add"): payload["cap_add"] = app_def["cap_add"] @@ -508,13 +572,16 @@ async def _setup_nginx_proxy( app_type: AppType, worker_address: str, port: int, + app_def: dict | None = None, ) -> None: - """Setup nginx proxy for app.""" + """Setup nginx proxy for app and its additional ports.""" set_deployment_progress(app_id, "starting", 95, "Setting up proxy...") try: proxy_manager = get_proxy_manager() proxy_worker_host = worker_address.split(":")[0] + + # Setup main port proxy await proxy_manager.add_app_proxy( app_id=app_id, app_type=app_type.value, @@ -522,7 +589,239 @@ async def _setup_nginx_proxy( worker_host=proxy_worker_host, worker_port=port, ) - logger.info(f"Nginx proxy configured for app {app_id}") + logger.info(f"Nginx proxy configured for app {app_id} main port {port}") + + # Setup additional port proxies (e.g., dashboard for semantic router) + if app_def: + additional_ports = app_def.get("additional_ports", []) + for i, port_info in enumerate(additional_ports): + if isinstance(port_info, dict): + port_name = port_info.get("name", f"port{i+1}") + else: + port_name = f"port{i+1}" + + host_port = port + 1 + i + await proxy_manager.add_app_proxy( + app_id=app_id * 1000 + i + 1, # Unique ID for additional port + app_type=f"{app_type.value}-{port_name.lower()}", + listen_port=host_port, + worker_host=proxy_worker_host, + worker_port=host_port, + ) + logger.info(f"Nginx proxy 
configured for app {app_id} {port_name} port {host_port}") + except Exception as e: logger.warning(f"Failed to setup nginx proxy: {e}") # Continue anyway, user can access directly via worker IP + + +# ============================================================================= +# Semantic Router Config +# ============================================================================= + + +async def write_semantic_router_config( + worker_address: str, + lmstack_api_url: str, + db, + api_key: str | None = None, +) -> None: + """Write semantic router config.yaml to the worker volume. + + Args: + worker_address: Worker address (host:port) + lmstack_api_url: LMStack API URL for the semantic router to call + db: Database session + api_key: LMStack API key for authentication + """ + # Generate config with API key + config = await semantic_router_service.generate_config(db, lmstack_api_url, api_key) + config_yaml = semantic_router_service.config_to_yaml(config) + + # Write to worker volume + volume_name = "lmstack-app-semantic-router-semantic-router-config" + + async with httpx.AsyncClient(timeout=CONTAINER_ACTION_TIMEOUT) as client: + response = await client.post( + f"http://{worker_address}/storage/volumes/write-file", + json={ + "volume_name": volume_name, + "file_path": "config.yaml", + "content": config_yaml, + }, + ) + if response.status_code >= 400: + raise Exception(f"Failed to write config: {response.text}") + + logger.info(f"Wrote semantic router config to {volume_name}/config.yaml") + + +async def update_semantic_router_config_if_deployed(db) -> bool: + """Update semantic router config if it's deployed. + + This should be called when deployments change (add/remove models). 
+ + Args: + db: Database session + + Returns: + True if config was updated, False if semantic router not deployed + """ + import os + + # Check if semantic router is deployed + app = await semantic_router_service.get_semantic_router_app(db) + if not app: + return False + + # Get worker address + result = await db.execute(select(Worker).where(Worker.id == app.worker_id)) + worker = result.scalar_one_or_none() + if not worker: + return False + + # Build LMStack API URL + # Priority: 1) stored lmstack_host, 2) LMSTACK_BACKEND_URL env var, 3) worker IP + app_config = app.config or {} + lmstack_host = app_config.get("lmstack_host") + + if lmstack_host: + lmstack_api_url = f"http://{lmstack_host}:52000" + else: + backend_url = os.environ.get("LMSTACK_BACKEND_URL") + if backend_url: + lmstack_api_url = backend_url.rstrip("/") + else: + # Fallback: use worker IP (may not work from container) + worker_host = worker.address.split(":")[0] + lmstack_api_url = f"http://{worker_host}:52000" + + logger.info(f"Updating semantic router config with LMStack API: {lmstack_api_url}") + + # Get API key from app config + api_key = app_config.get("api_key") + + try: + await write_semantic_router_config(worker.address, lmstack_api_url, db, api_key) + logger.info("Updated semantic router config with latest deployments") + + # Restart envoy to apply new config + if app.container_id: + await _restart_semantic_router_envoy(worker.address, app.container_id) + + return True + except Exception as e: + logger.error(f"Failed to update semantic router config: {e}") + return False + + +async def _restart_semantic_router_envoy(worker_address: str, container_id: str) -> None: + """Restart envoy process inside semantic router container to apply new config. 
+ + Args: + worker_address: Worker address (host:port) + container_id: Semantic router container ID + """ + try: + async with httpx.AsyncClient(timeout=CONTAINER_ACTION_TIMEOUT) as client: + # Execute supervisorctl restart envoy inside the container + response = await client.post( + f"http://{worker_address}/containers/{container_id}/exec", + json={ + "command": ["supervisorctl", "restart", "envoy"], + }, + ) + if response.status_code >= 400: + logger.warning(f"Failed to restart envoy: {response.text}") + else: + logger.info("Restarted semantic router envoy to apply new config") + except Exception as e: + logger.warning(f"Failed to restart semantic router envoy: {e}") + # Don't raise - config was updated, envoy restart is best-effort + + +# ============================================================================= +# Auto-deploy Monitoring +# ============================================================================= + + +async def _auto_deploy_monitoring( + db, + parent_app: App, + worker: Worker, + parent_port: int, +) -> None: + """Auto-deploy monitoring services (Grafana, Prometheus, Jaeger) for apps that support it. + + This is called automatically after Semantic Router deployment completes. + Deploys services sequentially: Prometheus first (Grafana needs it), then Grafana, then Jaeger. 
+ """ + from app.api.apps.monitoring import deploy_monitoring_background + from app.models.app import MONITORING_DEFINITIONS + + logger.info(f"Starting auto-deployment of monitoring services for app {parent_app.id}") + + # Find available ports starting from parent app's port + 10 + base_port = parent_port + 10 + result = await db.execute( + select(App.port).where(App.worker_id == worker.id, App.port.isnot(None)) + ) + used_ports = {row[0] for row in result.fetchall()} + + # Create monitoring app records + services_to_deploy = ["prometheus", "grafana", "jaeger"] + created_apps = [] + port = base_port + + for svc_type in services_to_deploy: + # Find next available port + while port in used_ports: + port += 1 + + svc_def = MONITORING_DEFINITIONS[svc_type] + svc_app = App( + app_type=svc_type, + name=f"{svc_def['name']} ({parent_app.name})", + worker_id=worker.id, + parent_app_id=parent_app.id, + status=AppStatus.PENDING.value, + proxy_path=f"/apps/{parent_app.app_type}/monitoring/{svc_type}", + port=port, + use_proxy=parent_app.use_proxy, + ) + db.add(svc_app) + await db.flush() + created_apps.append((svc_app, svc_def)) + used_ports.add(port) + port += 1 + + await db.commit() + + # Find prometheus port for Grafana configuration + prometheus_port = None + for svc_app, _ in created_apps: + if svc_app.app_type == "prometheus": + prometheus_port = svc_app.port + break + + # Deploy services sequentially (not in parallel) to ensure proper ordering + # Prometheus must be ready before Grafana tries to configure its datasource + for svc_app, svc_def in created_apps: + logger.info(f"Deploying monitoring service: {svc_app.app_type}") + try: + await deploy_monitoring_background( + app_id=svc_app.id, + parent_app_id=parent_app.id, + svc_type=svc_app.app_type, + worker_address=worker.address, + port=svc_app.port, + svc_def=svc_def, + use_proxy=svc_app.use_proxy, + parent_app_port=parent_port, + prometheus_port=prometheus_port, + ) + except Exception as e: + logger.error(f"Failed to 
deploy monitoring service {svc_app.app_type}: {e}") + # Continue with other services even if one fails + + logger.info(f"Completed auto-deployment of monitoring services for app {parent_app.id}") diff --git a/backend/app/api/apps/lifecycle.py b/backend/app/api/apps/lifecycle.py index 53ff99c..d41ef3e 100644 --- a/backend/app/api/apps/lifecycle.py +++ b/backend/app/api/apps/lifecycle.py @@ -70,6 +70,9 @@ async def stop_app( app.status = AppStatus.STOPPED.value await db.commit() + # Refresh the entire app object to ensure all attributes are loaded + await db.refresh(app) + await db.refresh(app, ["worker"]) except Exception as e: logger.exception(f"Failed to stop app: {e}") @@ -124,6 +127,9 @@ async def start_app( app.status = AppStatus.RUNNING.value app.status_message = None await db.commit() + # Refresh the entire app object to ensure all attributes are loaded + await db.refresh(app) + await db.refresh(app, ["worker"]) except Exception as e: logger.exception(f"Failed to start app: {e}") @@ -150,6 +156,34 @@ async def delete_app( await db.refresh(app, ["worker", "api_key"]) + # Delete child apps (monitoring services) first + child_result = await db.execute(select(App).where(App.parent_app_id == app_id)) + child_apps = child_result.scalars().all() + + for child in child_apps: + # Remove child container + if child.container_id and app.worker and app.worker.status == "online": + try: + await call_worker_api( + app.worker, + "DELETE", + f"/containers/{child.container_id}", + params={"force": True, "volumes": False}, + ) + except Exception as e: + logger.warning(f"Failed to remove child container {child.app_type}: {e}") + + # Remove child nginx proxy + if child.use_proxy: + try: + proxy_manager = get_proxy_manager() + await proxy_manager.remove_app_proxy(child.id) + except Exception as e: + logger.warning(f"Failed to remove child nginx proxy: {e}") + + # Delete child app record + await db.delete(child) + # Try to remove container if it exists if app.container_id and 
app.worker and app.worker.status == "online": try: diff --git a/backend/app/api/apps/monitoring.py b/backend/app/api/apps/monitoring.py new file mode 100644 index 0000000..d353834 --- /dev/null +++ b/backend/app/api/apps/monitoring.py @@ -0,0 +1,1095 @@ +"""API routes for app monitoring services. + +Manages Grafana, Prometheus, and Jaeger as sub-services of apps like Semantic Router. +""" + +import asyncio +import base64 +import logging + +import httpx +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request +from pydantic import BaseModel +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.apps.deployment import ( + pull_image_with_progress, + set_deployment_progress, + wait_for_container_healthy, +) +from app.api.apps.utils import CONTAINER_ACTION_TIMEOUT +from app.core.deps import require_operator, require_viewer +from app.database import get_db +from app.models.app import APP_DEFINITIONS, MONITORING_DEFINITIONS, App, AppStatus, AppType +from app.models.user import User +from app.models.worker import Worker +from app.services.app_proxy_manager import get_proxy_manager + +logger = logging.getLogger(__name__) +router = APIRouter() + + +# ============================================================================= +# Schemas +# ============================================================================= + + +class MonitoringServiceStatus(BaseModel): + """Status of a single monitoring service.""" + + name: str + type: str # grafana, prometheus, jaeger + status: str + port: int | None = None + url: str | None = None + + +class MonitoringStatus(BaseModel): + """Overall monitoring status for an app.""" + + enabled: bool + services: list[MonitoringServiceStatus] + + +class MonitoringDeployRequest(BaseModel): + """Request to deploy monitoring services.""" + + services: list[str] | None = None # ["grafana", "prometheus", "jaeger"], None = all + + +# 
============================================================================= +# Routes +# ============================================================================= + + +@router.get("/{app_id}/monitoring", response_model=MonitoringStatus) +async def get_monitoring_status( + app_id: int, + request: Request, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(require_viewer), +): + """Get monitoring status for an app.""" + # Get parent app + result = await db.execute(select(App).where(App.id == app_id)) + app = result.scalar_one_or_none() + if not app: + raise HTTPException(status_code=404, detail="App not found") + + # Check if app supports monitoring + try: + app_type = AppType(app.app_type) + app_def = APP_DEFINITIONS.get(app_type, {}) + if not app_def.get("has_monitoring"): + raise HTTPException(status_code=400, detail="This app does not support monitoring") + except ValueError: + raise HTTPException(status_code=400, detail="Invalid app type") + + # Get child monitoring apps + result = await db.execute(select(App).where(App.parent_app_id == app_id)) + child_apps = result.scalars().all() + + # Get worker for URL building + await db.refresh(app, ["worker"]) + worker_host = app.worker.address.split(":")[0] + + # Use browser hostname if available + host = request.headers.get("host", "").split(":")[0] + if not host or host in ("localhost", "127.0.0.1"): + host = worker_host + + services = [] + for child in child_apps: + url = ( + f"http://{host}:{child.port}" + if child.port and child.status == AppStatus.RUNNING.value + else None + ) + services.append( + MonitoringServiceStatus( + name=MONITORING_DEFINITIONS.get(child.app_type, {}).get("name", child.app_type), + type=child.app_type, + status=child.status, + port=child.port, + url=url, + ) + ) + + return MonitoringStatus( + enabled=len(services) > 0, + services=services, + ) + + +@router.post("/{app_id}/monitoring", response_model=MonitoringStatus) +async def deploy_monitoring( + app_id: int, + 
deploy_request: MonitoringDeployRequest, + request: Request, + background_tasks: BackgroundTasks, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(require_operator), +): + """Deploy monitoring services for an app.""" + # Get parent app + result = await db.execute(select(App).where(App.id == app_id)) + app = result.scalar_one_or_none() + if not app: + raise HTTPException(status_code=404, detail="App not found") + + # Check if app supports monitoring + try: + app_type = AppType(app.app_type) + app_def = APP_DEFINITIONS.get(app_type, {}) + if not app_def.get("has_monitoring"): + raise HTTPException(status_code=400, detail="This app does not support monitoring") + except ValueError: + raise HTTPException(status_code=400, detail="Invalid app type") + + # Determine which services to deploy + services_to_deploy = deploy_request.services or list(MONITORING_DEFINITIONS.keys()) + + # Validate services + for svc in services_to_deploy: + if svc not in MONITORING_DEFINITIONS: + raise HTTPException(status_code=400, detail=f"Unknown monitoring service: {svc}") + + # Check for already deployed services + result = await db.execute(select(App).where(App.parent_app_id == app_id)) + existing = {a.app_type: a for a in result.scalars().all()} + + # Get worker + await db.refresh(app, ["worker"]) + worker = app.worker + + # Find available ports starting from parent app's port + 10 + base_port = (app.port or 9000) + 10 + result = await db.execute( + select(App.port).where(App.worker_id == worker.id, App.port.isnot(None)) + ) + used_ports = {row[0] for row in result.fetchall()} + + created_apps = [] + port = base_port + for svc_type in services_to_deploy: + if svc_type in existing: + # Already deployed, skip + created_apps.append(existing[svc_type]) + continue + + # Find next available port + while port in used_ports: + port += 1 + + svc_def = MONITORING_DEFINITIONS[svc_type] + svc_app = App( + app_type=svc_type, + name=f"{svc_def['name']} ({app.name})", + 
worker_id=worker.id, + parent_app_id=app_id, + status=AppStatus.PENDING.value, + proxy_path=f"/apps/{app.app_type}/monitoring/{svc_type}", + port=port, + use_proxy=app.use_proxy, + ) + db.add(svc_app) + await db.flush() + created_apps.append(svc_app) + used_ports.add(port) + port += 1 + + await db.commit() + + # Find prometheus port for Grafana configuration + prometheus_port = None + for svc_app in created_apps: + if svc_app.app_type == "prometheus": + prometheus_port = svc_app.port + break + + # Start background deployment for new services + # Deploy in order: prometheus first (Grafana needs it), then others + deploy_order = ["prometheus", "grafana", "jaeger"] + sorted_apps = sorted( + created_apps, + key=lambda a: deploy_order.index(a.app_type) if a.app_type in deploy_order else 99, + ) + + for svc_app in sorted_apps: + if svc_app.status == AppStatus.PENDING.value: + svc_def = MONITORING_DEFINITIONS[svc_app.app_type] + background_tasks.add_task( + deploy_monitoring_background, + app_id=svc_app.id, + parent_app_id=app_id, + svc_type=svc_app.app_type, + worker_address=worker.address, + port=svc_app.port, + svc_def=svc_def, + use_proxy=svc_app.use_proxy, + parent_app_port=app.port, + prometheus_port=prometheus_port, + ) + + # Return status + host = request.headers.get("host", "").split(":")[0] + worker_host = worker.address.split(":")[0] + if not host or host in ("localhost", "127.0.0.1"): + host = worker_host + + services = [] + for svc_app in created_apps: + await db.refresh(svc_app) + url = ( + f"http://{host}:{svc_app.port}" + if svc_app.port and svc_app.status == AppStatus.RUNNING.value + else None + ) + services.append( + MonitoringServiceStatus( + name=MONITORING_DEFINITIONS.get(svc_app.app_type, {}).get("name", svc_app.app_type), + type=svc_app.app_type, + status=svc_app.status, + port=svc_app.port, + url=url, + ) + ) + + return MonitoringStatus(enabled=True, services=services) + + +@router.delete("/{app_id}/monitoring") +async def remove_monitoring( + 
app_id: int, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(require_operator), +): + """Remove all monitoring services for an app.""" + # Get parent app + result = await db.execute(select(App).where(App.id == app_id)) + app = result.scalar_one_or_none() + if not app: + raise HTTPException(status_code=404, detail="App not found") + + # Get child monitoring apps + result = await db.execute(select(App).where(App.parent_app_id == app_id)) + child_apps = result.scalars().all() + + if not child_apps: + return {"message": "No monitoring services to remove"} + + # Get worker + await db.refresh(app, ["worker"]) + worker = app.worker + + # Stop and remove containers + async with httpx.AsyncClient(timeout=CONTAINER_ACTION_TIMEOUT) as client: + for child in child_apps: + if child.container_id: + try: + # Stop container + await client.post( + f"http://{worker.address}/containers/{child.container_id}/stop" + ) + # Remove container + await client.delete(f"http://{worker.address}/containers/{child.container_id}") + except Exception as e: + logger.warning(f"Failed to remove container for {child.app_type}: {e}") + + # Remove nginx proxy + if child.use_proxy: + try: + proxy_manager = get_proxy_manager() + await proxy_manager.remove_app_proxy(child.id) + except Exception as e: + logger.warning(f"Failed to remove proxy for {child.app_type}: {e}") + + # Delete from database + await db.delete(child) + + await db.commit() + + return {"message": f"Removed {len(child_apps)} monitoring service(s)"} + + +@router.delete("/{app_id}/monitoring/{service_type}") +async def remove_monitoring_service( + app_id: int, + service_type: str, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(require_operator), +): + """Remove a specific monitoring service.""" + # Get parent app + result = await db.execute(select(App).where(App.id == app_id)) + app = result.scalar_one_or_none() + if not app: + raise HTTPException(status_code=404, detail="App not found") + + # Get 
specific monitoring app + result = await db.execute( + select(App).where(App.parent_app_id == app_id, App.app_type == service_type) + ) + child = result.scalar_one_or_none() + if not child: + raise HTTPException(status_code=404, detail=f"Monitoring service {service_type} not found") + + # Get worker + await db.refresh(app, ["worker"]) + worker = app.worker + + # Stop and remove container + if child.container_id: + async with httpx.AsyncClient(timeout=CONTAINER_ACTION_TIMEOUT) as client: + try: + await client.post(f"http://{worker.address}/containers/{child.container_id}/stop") + await client.delete(f"http://{worker.address}/containers/{child.container_id}") + except Exception as e: + logger.warning(f"Failed to remove container for {service_type}: {e}") + + # Remove nginx proxy + if child.use_proxy: + try: + proxy_manager = get_proxy_manager() + await proxy_manager.remove_app_proxy(child.id) + except Exception as e: + logger.warning(f"Failed to remove proxy for {service_type}: {e}") + + # Delete from database + await db.delete(child) + await db.commit() + + return {"message": f"Removed {service_type} monitoring service"} + + +# ============================================================================= +# Background Tasks +# ============================================================================= + + +async def deploy_monitoring_background( + app_id: int, + parent_app_id: int, + svc_type: str, + worker_address: str, + port: int, + svc_def: dict, + use_proxy: bool, + parent_app_port: int | None = None, + prometheus_port: int | None = None, +) -> None: + """Background task to deploy a monitoring service. 
+ + Args: + app_id: Monitoring service app ID + parent_app_id: Parent app (Semantic Router) ID + svc_type: Service type (grafana, prometheus, jaeger) + worker_address: Worker address + port: Host port for this service + svc_def: Service definition + use_proxy: Whether to setup nginx proxy + parent_app_port: Parent app's port (for calculating metrics port) + prometheus_port: Prometheus port (for Grafana datasource config) + """ + from app.database import async_session_maker + + async with async_session_maker() as db: + try: + # Get app + result = await db.execute(select(App).where(App.id == app_id)) + app = result.scalar_one_or_none() + if not app: + logger.error(f"Monitoring app {app_id} not found") + return + + # Get worker + result = await db.execute(select(Worker).where(Worker.id == app.worker_id)) + worker = result.scalar_one_or_none() + if not worker: + logger.error(f"Worker not found for monitoring app {app_id}") + app.status = AppStatus.ERROR.value + app.status_message = "Worker not found" + await db.commit() + return + + # Get parent app for config + parent_app = None + if parent_app_id: + result = await db.execute(select(App).where(App.id == parent_app_id)) + parent_app = result.scalar_one_or_none() + + # Phase 1: Pull image + app.status = AppStatus.PULLING.value + app.status_message = "Pulling image..." + await db.commit() + + try: + await pull_image_with_progress(worker, svc_def["image"], app_id) + except Exception as e: + app.status = AppStatus.ERROR.value + app.status_message = f"Failed to pull image: {e}" + await db.commit() + return + + # Phase 2: Setup configuration (Prometheus needs config file) + if svc_type == "prometheus" and parent_app: + set_deployment_progress(app_id, "starting", 5, "Creating Prometheus config...") + await _create_prometheus_config(worker_address, parent_app, port) + + # Phase 3: Create container + app.status = AppStatus.STARTING.value + app.status_message = "Starting container..." 
+ await db.commit() + + set_deployment_progress(app_id, "starting", 10, "Creating container...") + + container_id = await _create_monitoring_container( + worker_address=worker_address, + app_id=app_id, + svc_type=svc_type, + svc_def=svc_def, + port=port, + parent_app=parent_app, + prometheus_port=prometheus_port, + ) + if not container_id: + return + + app.container_id = container_id + await db.commit() + + # Phase 4: Wait for health + set_deployment_progress(app_id, "starting", 50, "Waiting for service to start...") + + await wait_for_container_healthy( + worker_address=worker_address, + container_id=container_id, + app_id=app_id, + port=port, + ) + + # Phase 5: Post-deployment setup (Grafana needs datasource and dashboard) + if svc_type == "grafana" and prometheus_port: + set_deployment_progress(app_id, "starting", 80, "Configuring Grafana datasource...") + await _configure_grafana(worker_address, port, prometheus_port) + + # Phase 6: Setup proxy + if use_proxy: + await _setup_monitoring_proxy(app_id, svc_type, worker_address, port) + + # Mark as running FIRST (before checking if all monitoring services are ready) + app.status = AppStatus.RUNNING.value + app.status_message = None + await db.commit() + + set_deployment_progress(app_id, "running", 100, f"{svc_def['name']} deployed") + logger.info(f"Monitoring service {svc_type} deployed for app {parent_app_id}") + + # Update parent app's environment with monitoring URLs + # This must be called AFTER marking current service as running + await _update_parent_monitoring_urls(db, parent_app_id) + + except Exception as e: + logger.exception(f"Failed to deploy monitoring {svc_type}: {e}") + try: + result = await db.execute(select(App).where(App.id == app_id)) + app = result.scalar_one_or_none() + if app: + app.status = AppStatus.ERROR.value + app.status_message = str(e) + await db.commit() + except Exception as db_error: + logger.error(f"Failed to update monitoring error status: {db_error}") + + 
set_deployment_progress(app_id, "error", 0, str(e)) + + +async def _create_monitoring_container( + worker_address: str, + app_id: int, + svc_type: str, + svc_def: dict, + port: int, + parent_app: App | None = None, + prometheus_port: int | None = None, +) -> str | None: + """Create monitoring container on worker. + + Args: + worker_address: Worker address + app_id: Monitoring app ID + svc_type: Service type (grafana, prometheus, jaeger) + svc_def: Service definition from MONITORING_DEFINITIONS + port: Host port for this service + parent_app: Parent app (Semantic Router) for getting metrics endpoint + prometheus_port: Prometheus port (needed for Grafana datasource config) + """ + from app.database import async_session_maker + + container_name = f"lmstack-monitoring-{svc_type}" + worker_host = worker_address.split(":")[0] + + # Build volumes + volumes = [] + for vol in svc_def.get("volumes", []): + volumes.append( + { + "source": f"{container_name}-{vol['name']}", + "destination": vol["destination"], + "mode": "rw", + } + ) + + # Build port mappings + ports = [ + { + "container_port": svc_def["internal_port"], + "host_port": port, + "protocol": "tcp", + } + ] + + # Build env vars + env_vars = dict(svc_def.get("env_template", {})) + + # Service-specific configuration + command = None + + if svc_type == "prometheus" and parent_app: + # Prometheus needs to know where to scrape metrics from + # Semantic Router exposes metrics on port 9190 + parent_metrics_port = ( + parent_app.port + 2 if parent_app.port else 9192 + ) # Main port + 2 = metrics + + # For local workers, use host.docker.internal + # For remote workers, use the worker's actual address + if worker_host in ("localhost", "127.0.0.1"): + metrics_target = f"host.docker.internal:{parent_metrics_port}" + else: + metrics_target = f"{worker_host}:{parent_metrics_port}" + + # Prometheus config passed via command line args + # We use a minimal config that scrapes the semantic router + command = [ + 
"--config.file=/etc/prometheus/prometheus.yml", + "--storage.tsdb.path=/prometheus", + "--web.console.libraries=/usr/share/prometheus/console_libraries", + "--web.console.templates=/usr/share/prometheus/consoles", + "--web.enable-lifecycle", + ] + + # We'll need to create the config file via init container or bind mount + # For now, use file_configs volume approach - create config on worker + env_vars["ROUTER_TARGET"] = metrics_target + + elif svc_type == "grafana" and prometheus_port: + # Grafana needs to know where Prometheus is + if worker_host in ("localhost", "127.0.0.1"): + prometheus_url = f"http://host.docker.internal:{prometheus_port}" + else: + prometheus_url = f"http://{worker_host}:{prometheus_port}" + + # Use Grafana's environment-based datasource provisioning + env_vars["GF_DATASOURCES_DEFAULT_NAME"] = "Prometheus" + env_vars["GF_DATASOURCES_DEFAULT_TYPE"] = "prometheus" + env_vars["GF_DATASOURCES_DEFAULT_URL"] = prometheus_url + env_vars["GF_DATASOURCES_DEFAULT_ACCESS"] = "proxy" + env_vars["GF_DATASOURCES_DEFAULT_ISDEFAULT"] = "true" + + # CRITICAL: Set Grafana's root URL so redirects work correctly + # Without this, Grafana redirects to localhost:3000 which breaks iframe embedding + env_vars["GF_SERVER_ROOT_URL"] = f"http://{worker_host}:{port}" + # Also set serve_from_sub_path since we access via proxy + env_vars["GF_SERVER_SERVE_FROM_SUB_PATH"] = "false" + + payload = { + "name": container_name, + "image": svc_def["image"], + "env": env_vars, + "ports": ports, + "volumes": volumes, + "restart_policy": "unless-stopped", + "labels": { + "lmstack.monitoring": "true", + "lmstack.monitoring.type": svc_type, + "lmstack.monitoring.app_id": str(app_id), + }, + "extra_hosts": {"host.docker.internal": "host-gateway"}, + } + + if command: + payload["command"] = command + + try: + async with httpx.AsyncClient(timeout=CONTAINER_ACTION_TIMEOUT) as client: + response = await client.post( + f"http://{worker_address}/containers", + json=payload, + ) + if 
response.status_code >= 400: + raise Exception(f"Failed to create container: {response.text}") + + container_data = response.json() + return container_data.get("id") + + except Exception as e: + async with async_session_maker() as db: + result = await db.execute(select(App).where(App.id == app_id)) + app = result.scalar_one_or_none() + if app: + app.status = AppStatus.ERROR.value + app.status_message = f"Failed to create container: {e}" + await db.commit() + + set_deployment_progress(app_id, "error", 0, str(e)) + return None + + +async def _setup_monitoring_proxy( + app_id: int, + svc_type: str, + worker_address: str, + port: int, +) -> None: + """Setup nginx proxy for monitoring service.""" + set_deployment_progress(app_id, "starting", 95, "Setting up proxy...") + + try: + proxy_manager = get_proxy_manager() + proxy_worker_host = worker_address.split(":")[0] + + await proxy_manager.add_app_proxy( + app_id=app_id, + app_type=f"monitoring-{svc_type}", + listen_port=port, + worker_host=proxy_worker_host, + worker_port=port, + ) + logger.info(f"Nginx proxy configured for monitoring {svc_type} on port {port}") + + except Exception as e: + logger.warning(f"Failed to setup nginx proxy for monitoring: {e}") + + +async def _update_parent_monitoring_urls(db: AsyncSession, parent_app_id: int) -> None: + """Update parent app's environment with monitoring URLs. + + This updates the Semantic Router container with the URLs of deployed monitoring services. + When all monitoring services are running, it restarts the parent container with new env vars. 
+ """ + # IMPORTANT: Use a fresh session to see commits from other parallel background tasks + # Each background task has its own isolated session, so we need a new one to get + # the current state of all monitoring services + from app.database import async_session_maker + + async with async_session_maker() as fresh_db: + # Get parent app + result = await fresh_db.execute(select(App).where(App.id == parent_app_id)) + parent_app = result.scalar_one_or_none() + if not parent_app: + return + + # Get all monitoring services + result = await fresh_db.execute(select(App).where(App.parent_app_id == parent_app_id)) + monitoring_apps = list(result.scalars().all()) + + # Build monitoring URLs + # These URLs are used by the Semantic Router DASHBOARD (runs in user's browser) + # to render iframes for Grafana/Jaeger. Since the browser can't resolve + # host.docker.internal, we must use the worker's external IP. + await fresh_db.refresh(parent_app, ["worker"]) + + # Use the worker's external IP for browser-accessible URLs + worker_host = parent_app.worker.address.split(":")[0] + + monitoring_urls = {} + all_running = True + for mon_app in monitoring_apps: + if mon_app.status == AppStatus.RUNNING.value and mon_app.port: + if mon_app.app_type == "grafana": + monitoring_urls["grafana_url"] = f"http://{worker_host}:{mon_app.port}" + elif mon_app.app_type == "prometheus": + monitoring_urls["prometheus_url"] = f"http://{worker_host}:{mon_app.port}" + elif mon_app.app_type == "jaeger": + monitoring_urls["jaeger_url"] = f"http://{worker_host}:{mon_app.port}" + elif mon_app.status not in (AppStatus.ERROR.value, AppStatus.STOPPED.value): + all_running = False + + # Store in parent app's config for reference + config = parent_app.config or {} + config["monitoring_urls"] = monitoring_urls + parent_app.config = config + await fresh_db.commit() + + logger.info(f"Updated monitoring URLs for app {parent_app_id}: {monitoring_urls}") + logger.info( + f"Restart check: all_running={all_running}, 
has_urls={bool(monitoring_urls)}, " + f"parent_status={parent_app.status}, services={[(m.app_type, m.status) for m in monitoring_apps]}" + ) + + # If all monitoring services are running, restart parent app to pick up new URLs + if all_running and monitoring_urls and parent_app.status == AppStatus.RUNNING.value: + logger.info( + f"All monitoring services running, restarting parent app {parent_app_id} to apply URLs" + ) + try: + await _restart_parent_with_monitoring_urls(fresh_db, parent_app, monitoring_urls) + except Exception as e: + logger.exception(f"Failed to restart parent app with monitoring URLs: {e}") + + +async def _restart_parent_with_monitoring_urls( + db: AsyncSession, + parent_app: App, + monitoring_urls: dict, +) -> None: + """Restart parent app container with monitoring URLs injected into environment. + + This stops the old container, creates a new one with updated env vars, and starts it. + """ + from app.api.apps.deployment import wait_for_container_healthy + from app.models.app import APP_DEFINITIONS, AppType + + if not parent_app.container_id: + return + + try: + app_type = AppType(parent_app.app_type) + app_def = APP_DEFINITIONS.get(app_type) + if not app_def: + return + except ValueError: + return + + await db.refresh(parent_app, ["worker"]) + worker = parent_app.worker + worker_address = worker.address + + logger.info(f"Restarting {parent_app.name} with monitoring URLs: {monitoring_urls}") + + async with httpx.AsyncClient(timeout=CONTAINER_ACTION_TIMEOUT) as client: + # Stop and remove old container + try: + await client.post(f"http://{worker_address}/containers/{parent_app.container_id}/stop") + await client.delete(f"http://{worker_address}/containers/{parent_app.container_id}") + except Exception as e: + logger.warning(f"Failed to stop/remove old container: {e}") + + # Build env vars from template, injecting monitoring URLs + new_env = {} + for key, value in app_def.get("env_template", {}).items(): + if value == "{grafana_url}": + 
new_env[key] = monitoring_urls.get("grafana_url", "") + elif value == "{prometheus_url}": + new_env[key] = monitoring_urls.get("prometheus_url", "") + elif value == "{jaeger_url}": + new_env[key] = monitoring_urls.get("jaeger_url", "") + elif value == "{hf_token}": + # Try to get HF_TOKEN from app config if stored, otherwise empty + app_config = parent_app.config or {} + new_env[key] = app_config.get("hf_token", "") + elif value.startswith("{") and value.endswith("}"): + # Other placeholders - use empty or defaults + new_env[key] = "" + else: + # Static values + new_env[key] = value + + # Rebuild container with same config but new env + container_name = f"lmstack-app-{app_type.value}" + + volumes = [] + for vol in app_def.get("volumes", []): + volumes.append( + { + "source": f"{container_name}-{vol['name']}", + "destination": vol["destination"], + "mode": "rw", + } + ) + + ports = [ + { + "container_port": app_def["internal_port"], + "host_port": parent_app.port, + "protocol": "tcp", + } + ] + + # Add additional ports + for i, port_info in enumerate(app_def.get("additional_ports", [])): + if isinstance(port_info, dict): + container_port = port_info["container_port"] + else: + container_port = port_info + ports.append( + { + "container_port": container_port, + "host_port": parent_app.port + 1 + i, + "protocol": "tcp", + } + ) + + payload = { + "name": container_name, + "image": app_def["image"], + "env": new_env, + "ports": ports, + "volumes": volumes, + "restart_policy": "unless-stopped", + "labels": { + "lmstack.app": "true", + "lmstack.app.type": app_type.value, + "lmstack.app.id": str(parent_app.id), + }, + "extra_hosts": {"host.docker.internal": "host-gateway"}, + } + + if app_def.get("entrypoint"): + payload["entrypoint"] = app_def["entrypoint"] + if app_def.get("command"): + payload["command"] = app_def["command"] + if app_def.get("cap_add"): + payload["cap_add"] = app_def["cap_add"] + + try: + resp = await client.post(f"http://{worker_address}/containers", 
json=payload) + if resp.status_code >= 400: + logger.error(f"Failed to create new container: {resp.text}") + return + + new_container_id = resp.json().get("id") + parent_app.container_id = new_container_id + await db.commit() + + logger.info(f"Created new container {new_container_id} with monitoring URLs") + + # Wait for container to be healthy + await wait_for_container_healthy( + worker_address=worker_address, + container_id=new_container_id, + app_id=parent_app.id, + port=parent_app.port, + ) + + logger.info(f"Parent app {parent_app.id} restarted successfully with monitoring URLs") + + except Exception as e: + logger.error(f"Failed to restart parent app: {e}") + + +async def _create_prometheus_config( + worker_address: str, + parent_app: App, + prometheus_port: int, +) -> None: + """Create Prometheus configuration file on worker. + + Uses a helper container to write the prometheus.yml config to the volume. + """ + # Calculate metrics port: Semantic Router main port + 2 = metrics port (9190 internally mapped) + # The router exposes :8888 (API), :8700 (Dashboard), and :9190 (metrics) + # We map them as: port, port+1, port+2 + metrics_port = parent_app.port + 2 if parent_app.port else 9192 + + # Prometheus runs in a container, so it must use host.docker.internal + # to access services on the same host (regardless of worker's external IP) + metrics_target = f"host.docker.internal:{metrics_port}" + + # Prometheus configuration YAML - using cat with heredoc to avoid quote issues + prometheus_config = f"""global: + scrape_interval: 15s + evaluation_interval: 15s + +scrape_configs: + - job_name: semantic-router + static_configs: + - targets: ["{metrics_target}"] + metrics_path: /metrics + scrape_interval: 5s +""" + + volume_name = "lmstack-monitoring-prometheus-prometheus-config" + + # Use a helper container to write the config file + # Using base64 encoding to avoid shell quoting issues + config_b64 = base64.b64encode(prometheus_config.encode()).decode() + + 
helper_payload = { + "name": "lmstack-prometheus-config-helper", + "image": "alpine:latest", + "command": [ + "sh", + "-c", + f"mkdir -p /etc/prometheus && echo '{config_b64}' | base64 -d > /etc/prometheus/prometheus.yml && cat /etc/prometheus/prometheus.yml", + ], + "volumes": [ + { + "source": volume_name, + "destination": "/etc/prometheus", + "mode": "rw", + } + ], + "restart_policy": "no", + "labels": {"lmstack.helper": "true"}, + } + + async with httpx.AsyncClient(timeout=CONTAINER_ACTION_TIMEOUT) as client: + try: + # Create and run helper container + resp = await client.post( + f"http://{worker_address}/containers", + json=helper_payload, + ) + if resp.status_code >= 400: + logger.warning(f"Failed to create prometheus config helper: {resp.text}") + return + + helper_id = resp.json().get("id") + logger.info(f"Created prometheus config with helper container {helper_id}") + + # Wait a moment for the container to finish + await asyncio.sleep(2) + + # Clean up helper container + try: + await client.delete( + f"http://{worker_address}/containers/{helper_id}", + params={"force": True}, + ) + except Exception: + pass + + except Exception as e: + logger.warning(f"Failed to create prometheus config: {e}") + + +async def _configure_grafana( + worker_address: str, + grafana_port: int, + prometheus_port: int, +) -> None: + """Configure Grafana datasource and dashboard via API. + + Grafana needs a moment to start, so we retry a few times. 
+ """ + import json + from pathlib import Path + + worker_host = worker_address.split(":")[0] + + # Grafana runs in a container, so it must use host.docker.internal + # to access Prometheus on the same host + prometheus_url = f"http://host.docker.internal:{prometheus_port}" + + # For API calls from the backend, use the actual host + if worker_host in ("localhost", "127.0.0.1"): + grafana_url = f"http://localhost:{grafana_port}" + else: + grafana_url = f"http://{worker_host}:{grafana_port}" + + # Grafana datasource payload + datasource_payload = { + "name": "Prometheus", + "type": "prometheus", + "url": prometheus_url, + "access": "proxy", + "isDefault": True, + } + + datasource_uid = None + + # Try to add datasource (Grafana needs time to start) + max_retries = 5 + for i in range(max_retries): + try: + async with httpx.AsyncClient(timeout=30.0) as client: + # Check if Grafana is ready + health_resp = await client.get(f"{grafana_url}/api/health") + if health_resp.status_code != 200: + raise Exception("Grafana not ready") + + # Add datasource + ds_resp = await client.post( + f"{grafana_url}/api/datasources", + json=datasource_payload, + auth=("admin", "admin"), + ) + + if ds_resp.status_code == 200: + ds_data = ds_resp.json() + datasource_uid = ds_data.get("datasource", {}).get("uid") + logger.info(f"Grafana datasource created: uid={datasource_uid}") + break + elif ds_resp.status_code == 409: # Already exists + # Get existing datasource UID + get_ds_resp = await client.get( + f"{grafana_url}/api/datasources/name/Prometheus", + auth=("admin", "admin"), + ) + if get_ds_resp.status_code == 200: + datasource_uid = get_ds_resp.json().get("uid") + logger.info(f"Grafana datasource exists: uid={datasource_uid}") + break + else: + logger.warning(f"Failed to add Grafana datasource: {ds_resp.text}") + + except Exception as e: + logger.debug(f"Grafana not ready yet (attempt {i+1}/{max_retries}): {e}") + await asyncio.sleep(3) + + if not datasource_uid: + logger.warning("Failed 
to configure Grafana datasource after retries") + return + + # Import LLM Router dashboard + dashboard_path = Path( + "/home/rickychen/Desktop/llm/lmstack/semantic-router/deploy/docker-compose/addons/llm-router-dashboard.json" + ) + if not dashboard_path.exists(): + logger.warning(f"Dashboard file not found: {dashboard_path}") + return + + try: + dashboard_json = json.loads(dashboard_path.read_text()) + + # Replace datasource variable with actual UID + dashboard_str = json.dumps(dashboard_json) + dashboard_str = dashboard_str.replace("${DS_PROMETHEUS}", datasource_uid) + dashboard_str = dashboard_str.replace('"uid": "prometheus"', f'"uid": "{datasource_uid}"') + dashboard_json = json.loads(dashboard_str) + + # Remove id to create new dashboard + dashboard_json.pop("id", None) + dashboard_json["uid"] = "llm-router-metrics" + + # Import dashboard + import_payload = { + "dashboard": dashboard_json, + "overwrite": True, + "inputs": [ + { + "name": "DS_PROMETHEUS", + "type": "datasource", + "pluginId": "prometheus", + "value": datasource_uid, + } + ], + } + + async with httpx.AsyncClient(timeout=30.0) as client: + dash_resp = await client.post( + f"{grafana_url}/api/dashboards/import", + json=import_payload, + auth=("admin", "admin"), + ) + + if dash_resp.status_code == 200: + logger.info("LLM Router dashboard imported successfully") + else: + logger.warning(f"Failed to import dashboard: {dash_resp.text}") + + except Exception as e: + logger.exception(f"Failed to import Grafana dashboard: {e}") diff --git a/backend/app/api/apps/routes.py b/backend/app/api/apps/routes.py index f694a9f..9509e42 100644 --- a/backend/app/api/apps/routes.py +++ b/backend/app/api/apps/routes.py @@ -17,6 +17,7 @@ set_deployment_progress, ) from app.api.apps.lifecycle import router as lifecycle_router +from app.api.apps.monitoring import router as monitoring_router from app.api.apps.utils import ( API_KEY_PREFIX, app_to_response, @@ -48,6 +49,9 @@ # Include lifecycle routes 
(start/stop/delete/logs) router.include_router(lifecycle_router) +# Include monitoring routes (grafana/prometheus/jaeger for semantic router) +router.include_router(monitoring_router) + # ============================================================================= # List & Discovery Endpoints @@ -67,6 +71,7 @@ async def list_available_apps( name=definition["name"], description=definition["description"], image=definition["image"], + has_monitoring=definition.get("has_monitoring", False), ) ) return AvailableAppsResponse(items=items) @@ -80,12 +85,24 @@ async def list_apps( db: AsyncSession = Depends(get_db), current_user: User = Depends(require_viewer), ): - """List all deployed apps (requires viewer+).""" - # Count total - total = await db.scalar(select(func.count()).select_from(App)) + """List all deployed apps (requires viewer+). + + Note: Child apps (monitoring services) are filtered out - they appear + in their parent app's monitoring section instead. + """ + # Count total (excluding child apps) + total = await db.scalar( + select(func.count()).select_from(App).where(App.parent_app_id.is_(None)) + ) - # Get paginated results with worker relationship - result = await db.execute(select(App).offset(skip).limit(limit).order_by(App.created_at.desc())) + # Get paginated results, excluding child apps (monitoring services) + result = await db.execute( + select(App) + .where(App.parent_app_id.is_(None)) + .offset(skip) + .limit(limit) + .order_by(App.created_at.desc()) + ) apps = result.scalars().all() # Load worker relationships @@ -188,6 +205,18 @@ async def deploy_app( use_proxy = False logger.info(f"Auto-disabled proxy for localhost worker {worker.name}") + # Store deployment config for later use (e.g., when restarting with monitoring URLs) + app_config = {} + if deploy_request.hf_token: + app_config["hf_token"] = deploy_request.hf_token + + # Store API key in config for apps that need it (e.g., Semantic Router config generation) + if full_key: + 
app_config["api_key"] = full_key + + # Store LMStack host for apps that need to call back to API Gateway + app_config["lmstack_host"] = get_host_ip(request, worker) + # Create app record app = App( app_type=app_type.value, @@ -198,6 +227,7 @@ async def deploy_app( proxy_path=proxy_path, port=port, use_proxy=use_proxy, + config=app_config if app_config else None, ) db.add(app) await db.commit() @@ -211,6 +241,7 @@ async def deploy_app( full_key=full_key, port=port, db=db, + hf_token=deploy_request.hf_token, ) # Initialize progress @@ -219,6 +250,9 @@ async def deploy_app( # Always use backend API port (52000) lmstack_port = "52000" + # Get correct LMStack host for apps that need to call back to API Gateway + lmstack_host = get_host_ip(request, worker) + # Start background deployment background_tasks.add_task( deploy_app_background, @@ -231,6 +265,7 @@ async def deploy_app( app_def=app_def, lmstack_port=lmstack_port, use_proxy=use_proxy, + lmstack_host=lmstack_host, ) return app_to_response(app, request) @@ -273,11 +308,31 @@ async def _create_api_key_if_needed( async def _find_available_port(db: AsyncSession, worker_id: int) -> int: - """Find an available port on the worker.""" + """Find an available port on the worker. + + Also considers additional ports used by apps (e.g., Semantic Router uses + main_port, main_port+1 for dashboard, main_port+2 for metrics). 
+ """ + from app.models.app import APP_DEFINITIONS, AppType + result = await db.execute( - select(App.port).where(App.worker_id == worker_id, App.port.isnot(None)) + select(App.port, App.app_type).where(App.worker_id == worker_id, App.port.isnot(None)) ) - used_ports = {row[0] for row in result.fetchall()} + + used_ports = set() + for row in result.fetchall(): + port, app_type = row + used_ports.add(port) + + # Add additional ports for apps that use them + try: + app_type_enum = AppType(app_type) + app_def = APP_DEFINITIONS.get(app_type_enum, {}) + additional_ports = app_def.get("additional_ports", []) + for i in range(len(additional_ports)): + used_ports.add(port + 1 + i) + except (ValueError, KeyError): + pass port = 9000 # Start from 9000 to avoid conflicts with dev servers while port in used_ports: @@ -293,6 +348,7 @@ async def _build_env_vars( full_key: str, port: int, db: AsyncSession, + hf_token: str | None = None, ) -> dict: """Build environment variables for the app container.""" # Always use backend API port (52000), not the frontend port from request @@ -337,6 +393,12 @@ async def _build_env_vars( env_vars[key] = app_secret_key elif value == "{model_list}": env_vars[key] = model_list + elif value == "{hf_token}": + # HuggingFace token - use provided token or empty string + env_vars[key] = hf_token or "" + elif value in ("{grafana_url}", "{prometheus_url}", "{jaeger_url}"): + # Optional monitoring URLs - leave empty if not configured + env_vars[key] = "" else: env_vars[key] = value diff --git a/backend/app/api/apps/utils.py b/backend/app/api/apps/utils.py index 36b52b2..50b220c 100644 --- a/backend/app/api/apps/utils.py +++ b/backend/app/api/apps/utils.py @@ -13,9 +13,9 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -from app.models.app import App, AppStatus +from app.models.app import APP_DEFINITIONS, App, AppStatus, AppType from app.models.worker import Worker -from app.schemas.app import AppResponse +from 
app.schemas.app import AppPortInfo, AppResponse logger = logging.getLogger(__name__) @@ -202,18 +202,48 @@ def app_to_response(app: App, request: Request) -> AppResponse: if app.status == AppStatus.RUNNING.value and app.proxy_path: proxy_url = str(request.base_url).rstrip("/") + app.proxy_path + # Determine host for URL building + if app.use_proxy: + # Use LMStack host (nginx proxy) + url_host = request.headers.get("host", "localhost:52000").split(":")[0] + else: + # Direct connection to worker + url_host = worker_address.split(":")[0] if worker_address else None + # Build access URL based on proxy setting access_url = None - if app.status == AppStatus.RUNNING.value and app.port: - if app.use_proxy: - # Use LMStack host with app port (nginx proxy) - host = request.headers.get("host", "localhost:52000").split(":")[0] - access_url = f"http://{host}:{app.port}" - else: - # Direct connection to worker - if worker_address: - worker_host = worker_address.split(":")[0] - access_url = f"http://{worker_host}:{app.port}" + if app.status == AppStatus.RUNNING.value and app.port and url_host: + access_url = f"http://{url_host}:{app.port}" + + # Build additional URLs for apps with multiple ports + additional_urls = None + has_monitoring = False + try: + app_type = AppType(app.app_type) + app_def = APP_DEFINITIONS.get(app_type, {}) + has_monitoring = app_def.get("has_monitoring", False) + + if app.status == AppStatus.RUNNING.value and app.port and url_host: + additional_ports = app_def.get("additional_ports", []) + + if additional_ports: + additional_urls = [] + for i, port_info in enumerate(additional_ports): + if isinstance(port_info, dict): + port_name = port_info.get("name", f"Port {i + 1}") + else: + port_name = f"Port {i + 1}" + + host_port = app.port + 1 + i + additional_urls.append( + AppPortInfo( + name=port_name, + port=host_port, + url=f"http://{url_host}:{host_port}", + ) + ) + except (ValueError, KeyError): + pass # Invalid app type, skip additional URLs return 
AppResponse( id=app.id, @@ -230,7 +260,9 @@ def app_to_response(app: App, request: Request) -> AppResponse: proxy_url=proxy_url, use_proxy=app.use_proxy, access_url=access_url, + additional_urls=additional_urls, api_key_id=app.api_key_id, + has_monitoring=has_monitoring, created_at=app.created_at, updated_at=app.updated_at, ) diff --git a/backend/app/api/deployments.py b/backend/app/api/deployments.py index e974213..bec1a31 100644 --- a/backend/app/api/deployments.py +++ b/backend/app/api/deployments.py @@ -26,7 +26,7 @@ ModelSummary, WorkerSummary, ) -from app.services.deployer import DeployerService +from app.services.deployer import DeployerService, _update_semantic_router_config_background from app.services.gateway import gateway_service logger = logging.getLogger(__name__) @@ -279,6 +279,9 @@ async def delete_deployment( await db.delete(deployment) await db.commit() + # Update semantic router config to remove this model + background_tasks.add_task(_update_semantic_router_config_background) + @router.post("/{deployment_id}/stop", response_model=DeploymentResponse) async def stop_deployment( @@ -314,6 +317,9 @@ async def stop_deployment( await db.commit() await db.refresh(deployment) + # Update semantic router config to remove this model + background_tasks.add_task(_update_semantic_router_config_background) + return deployment_to_response(deployment) diff --git a/backend/app/api/gateway.py b/backend/app/api/gateway.py index c814267..02be605 100644 --- a/backend/app/api/gateway.py +++ b/backend/app/api/gateway.py @@ -17,6 +17,10 @@ from app.database import async_session_maker, get_db from app.models.deployment import Deployment from app.services.gateway import gateway_service +from app.services.semantic_router import semantic_router_service + +# Special model names that trigger semantic routing +SEMANTIC_ROUTER_MODEL_NAMES = {"mom", "mixture-of-models", "auto", "semantic-router"} logger = logging.getLogger(__name__) @@ -87,6 +91,47 @@ async def list_models( 
models = await gateway_service.get_available_models(db, api_key) + # Add Semantic Router "MoM" model if deployed and running + # Only show if api_key has access to MoM + router_app = await semantic_router_service.get_semantic_router_app(db) + if router_app and router_app.status == "running": + # Check MoM access if api_key is provided + show_mom = True + if api_key: + show_mom = await gateway_service.check_mom_access(api_key) + + if show_mom: + created_timestamp = ( + int(router_app.created_at.timestamp()) if router_app.created_at else 0 + ) + models.append( + { + "id": "MoM", + "object": "model", + "created": created_timestamp, + "owned_by": "lmstack-semantic-router", + "root": "Mixture-of-Models", + "parent": None, + "description": "Semantic Router that automatically selects the best model for each request", + "permission": [ + { + "id": "modelperm-semantic-router", + "object": "model_permission", + "created": created_timestamp, + "allow_create_engine": False, + "allow_sampling": True, + "allow_logprobs": True, + "allow_search_indices": False, + "allow_view": True, + "allow_fine_tuning": False, + "organization": "*", + "group": None, + "is_blocking": False, + } + ], + } + ) + return { "object": "list", "data": models, @@ -177,6 +222,26 @@ async def chat_completions( }, ) + # Check if using semantic routing (model="MoM", "auto", etc.) 
+ if model_name.lower() in SEMANTIC_ROUTER_MODEL_NAMES: + # Check MoM access + if not await gateway_service.check_mom_access(api_key): + raise HTTPException( + status_code=403, + detail={ + "error": { + "message": "API key does not have access to MoM (Semantic Router)", + "type": "permission_error", + } + }, + ) + return await _proxy_to_semantic_router( + db=db, + body=body, + api_key=api_key, + endpoint="/v1/chat/completions", + ) + # Find deployment for model result = await gateway_service.find_deployment_for_model(db, model_name) if not result: @@ -407,8 +472,8 @@ async def proxy_request( async def record_usage_background( api_key_id: int, - model_id: int, - deployment_id: int, + model_id: int | None, + deployment_id: int | None, prompt_tokens: int, completion_tokens: int, ) -> None: @@ -1011,3 +1076,244 @@ async def responses( logger.info(f"Responses API response: {json.dumps(responses_response)[:500]}") return JSONResponse(content=responses_response) + + +# ============================================================================= +# Semantic Router Proxy +# ============================================================================= + + +async def _proxy_to_semantic_router( + db: AsyncSession, + body: dict, + api_key, + endpoint: str, +) -> JSONResponse | StreamingResponse: + """Proxy request to Semantic Router for intelligent model selection. + + Args: + db: Database session + body: Request body + api_key: Validated API key + endpoint: API endpoint (e.g., /v1/chat/completions) + + Returns: + Response from Semantic Router + """ + # Check if Semantic Router is deployed + router_app = await semantic_router_service.get_semantic_router_app(db) + if not router_app or router_app.status != "running": + raise HTTPException( + status_code=503, + detail={ + "error": { + "message": "Semantic Router is not deployed. 
Deploy it from the Apps page to use automatic model routing.", + "type": "service_unavailable", + "hint": "Use a specific model name instead, or deploy Semantic Router first.", + } + }, + ) + + router_url = await semantic_router_service.get_semantic_router_url(db) + if not router_url: + raise HTTPException( + status_code=503, + detail={ + "error": { + "message": "Semantic Router URL not available", + "type": "service_unavailable", + } + }, + ) + + # Get API key from router app config for Semantic Router authentication + router_config = router_app.config or {} + sr_api_key = router_config.get("api_key") + + # Set a default model - Semantic Router requires a model field + body_copy = body.copy() + model_name = body_copy.get("model", "") + if not model_name or model_name.lower() in SEMANTIC_ROUTER_MODEL_NAMES: + # Get the first running deployment's model name + from sqlalchemy.orm import selectinload + + from app.models.deployment import Deployment, DeploymentStatus + + result = await db.execute( + select(Deployment) + .where(Deployment.status == DeploymentStatus.RUNNING.value) + .options(selectinload(Deployment.model)) + .limit(1) + ) + deployment = result.scalar_one_or_none() + if deployment and deployment.model: + default_model = deployment.model.name.replace("/", "-").replace(":", "-") + else: + default_model = "default" + body_copy["model"] = default_model + + upstream_url = f"{router_url}{endpoint}" + is_streaming = body.get("stream", False) + + # Use the caller's API key for usage tracking + # This allows users to see their MoM usage in their own API Key dashboard + caller_api_key_id = api_key.id if api_key else None + + if is_streaming: + return await _proxy_semantic_router_streaming( + upstream_url, body_copy, caller_api_key_id, db, sr_api_key + ) + else: + return await _proxy_semantic_router_request( + upstream_url, body_copy, caller_api_key_id, db, sr_api_key + ) + + +async def _proxy_semantic_router_request( + upstream_url: str, + body: dict, + api_key_id: 
int | None, + db: AsyncSession, + sr_api_key: str | None = None, +) -> JSONResponse: + """Proxy non-streaming request to Semantic Router.""" + headers = {"Content-Type": "application/json"} + if sr_api_key: + headers["Authorization"] = f"Bearer {sr_api_key}" + + try: + async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: + response = await client.post( + upstream_url, + json=body, + headers=headers, + ) + response_data = response.json() + + # Record usage for Semantic Router + if api_key_id: + usage = response_data.get("usage", {}) + prompt_tokens = usage.get("prompt_tokens", 0) + completion_tokens = usage.get("completion_tokens", 0) + + await gateway_service.record_usage( + db=db, + api_key_id=api_key_id, + model_id=None, # No specific model for Semantic Router + deployment_id=None, + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + ) + + return JSONResponse(content=response_data, status_code=response.status_code) + + except httpx.TimeoutException: + raise HTTPException( + status_code=504, + detail={ + "error": { + "message": "Request to Semantic Router timed out", + "type": "timeout_error", + } + }, + ) + except httpx.RequestError as e: + logger.error(f"Semantic Router request error: {e}") + raise HTTPException( + status_code=502, + detail={ + "error": { + "message": "Failed to connect to Semantic Router", + "type": "connection_error", + } + }, + ) + + +async def _proxy_semantic_router_streaming( + upstream_url: str, + body: dict, + api_key_id: int | None, + db: AsyncSession, + sr_api_key: str | None = None, +) -> StreamingResponse: + """Proxy streaming request to Semantic Router.""" + headers = {"Content-Type": "application/json"} + if sr_api_key: + headers["Authorization"] = f"Bearer {sr_api_key}" + + usage_info = {"prompt_tokens": 0, "completion_tokens": 0} + + async def stream_generator() -> AsyncGenerator[bytes, None]: + try: + async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: + async with client.stream( + "POST", + 
upstream_url, + json=body, + headers=headers, + ) as response: + async for chunk in response.aiter_bytes(): + yield chunk + + # Try to extract usage from final chunk + try: + chunk_str = chunk.decode("utf-8") + for line in chunk_str.split("\n"): + if line.startswith("data: ") and line != "data: [DONE]": + data = json.loads(line[6:]) + if "usage" in data: + usage_info["prompt_tokens"] = data["usage"].get( + "prompt_tokens", 0 + ) + usage_info["completion_tokens"] = data["usage"].get( + "completion_tokens", 0 + ) + except (json.JSONDecodeError, UnicodeDecodeError): + pass + + except httpx.TimeoutException: + error_data = { + "error": { + "message": "Request to Semantic Router timed out", + "type": "timeout_error", + } + } + yield f"data: {json.dumps(error_data)}\n\n".encode() + except httpx.RequestError as e: + logger.error(f"Semantic Router streaming error: {e}") + error_data = { + "error": { + "message": f"Connection error: {e}", + "type": "connection_error", + } + } + yield f"data: {json.dumps(error_data)}\n\n".encode() + + # Estimate tokens if not provided + if usage_info["prompt_tokens"] == 0: + messages = body.get("messages", []) + for msg in messages: + content = msg.get("content", "") + if isinstance(content, str): + usage_info["prompt_tokens"] += len(content) // 4 + + # Record usage with background task + if api_key_id: + await record_usage_background( + api_key_id=api_key_id, + model_id=None, + deployment_id=None, + prompt_tokens=usage_info["prompt_tokens"], + completion_tokens=usage_info["completion_tokens"], + ) + + return StreamingResponse( + stream_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) diff --git a/backend/app/api/semantic_router.py b/backend/app/api/semantic_router.py new file mode 100644 index 0000000..74fce22 --- /dev/null +++ b/backend/app/api/semantic_router.py @@ -0,0 +1,285 @@ +"""Semantic Router API endpoints. 
+ +Provides endpoints for: +- Checking if Semantic Router is deployed +- Updating Semantic Router config +- Getting Semantic Router status +- Proxying chat requests to Semantic Router (for logged-in users) +""" + +import json +import logging +from collections.abc import AsyncGenerator + +import httpx +from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi.responses import StreamingResponse +from pydantic import BaseModel +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.apps.deployment import update_semantic_router_config_if_deployed +from app.core.deps import require_viewer +from app.database import get_db +from app.models.user import User +from app.services.semantic_router import semantic_router_service + +logger = logging.getLogger(__name__) + +# Timeout for chat requests (5 minutes for long model responses) +CHAT_PROXY_TIMEOUT = 300.0 + +router = APIRouter(prefix="/semantic-router", tags=["semantic-router"]) + + +class SemanticRouterStatus(BaseModel): + """Semantic Router deployment status.""" + + deployed: bool + url: str | None = None + dashboard_url: str | None = None + message: str | None = None + + +class ConfigUpdateResponse(BaseModel): + """Response for config update.""" + + success: bool + message: str + + +@router.get("/status", response_model=SemanticRouterStatus) +async def get_semantic_router_status( + db: AsyncSession = Depends(get_db), +): + """Check if Semantic Router is deployed and get its URLs.""" + app = await semantic_router_service.get_semantic_router_app(db) + + if not app: + return SemanticRouterStatus( + deployed=False, + message="Semantic Router is not deployed. 
Deploy it from the Apps page to enable intelligent model routing.", + ) + + if not app.worker: + return SemanticRouterStatus( + deployed=False, + message="Semantic Router worker not found.", + ) + + # Build URLs + worker_host = app.worker.address.split(":")[0] + api_url = f"http://{worker_host}:{app.port}" + dashboard_url = f"http://{worker_host}:{app.port + 1}" # Dashboard is on port + 1 + + return SemanticRouterStatus( + deployed=True, + url=api_url, + dashboard_url=dashboard_url, + message="Semantic Router is running. Use model='MoM' for automatic routing.", + ) + + +@router.post("/update-config", response_model=ConfigUpdateResponse) +async def update_semantic_router_config( + db: AsyncSession = Depends(get_db), +): + """Update Semantic Router config with latest deployments. + + This endpoint regenerates the config.yaml with current running models + and writes it to the Semantic Router volume. The router will automatically + reload the config (hot-reload supported). + """ + try: + updated = await update_semantic_router_config_if_deployed(db) + + if updated: + return ConfigUpdateResponse( + success=True, + message="Semantic Router config updated successfully. Changes will take effect automatically.", + ) + else: + return ConfigUpdateResponse( + success=False, + message="Semantic Router is not deployed. Deploy it first from the Apps page.", + ) + + except Exception as e: + logger.error(f"Failed to update semantic router config: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/config-preview") +async def preview_semantic_router_config( + db: AsyncSession = Depends(get_db), +): + """Preview the Semantic Router config that would be generated. + + This is useful for debugging or understanding how the config is built. 
+ """ + # Try to get lmstack_host from deployed Semantic Router config, fallback to placeholder + app = await semantic_router_service.get_semantic_router_app(db) + if app: + app_config = app.config or {} + lmstack_host = app_config.get("lmstack_host") + if lmstack_host: + lmstack_api_url = f"http://{lmstack_host}:52000" + elif app.worker: + # Fallback to worker IP (may not be correct for container access) + worker_host = app.worker.address.split(":")[0] + lmstack_api_url = f"http://{worker_host}:52000" + else: + lmstack_api_url = "http://:52000" + else: + lmstack_api_url = "http://:52000" + config = await semantic_router_service.generate_config(db, lmstack_api_url) + return config + + +@router.post("/chat") +async def proxy_semantic_router_chat( + request: Request, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(require_viewer), +): + """Proxy chat requests to Semantic Router (requires viewer+). + + This endpoint allows logged-in users to chat with the Semantic Router + using JWT authentication instead of API keys. The Semantic Router will + intelligently select the best model for each request. + """ + # Check if Semantic Router is deployed + router_app = await semantic_router_service.get_semantic_router_app(db) + if not router_app or router_app.status != "running": + raise HTTPException( + status_code=503, + detail="Semantic Router is not deployed or not running. 
Deploy it from the Apps page.", + ) + + router_url = await semantic_router_service.get_semantic_router_url(db) + if not router_url: + raise HTTPException(status_code=503, detail="Semantic Router URL not available") + + # Get request body + try: + body = await request.json() + except (json.JSONDecodeError, ValueError): + raise HTTPException(status_code=400, detail="Invalid JSON body") + + # Get the default model from running deployments + # Semantic Router requires a model field, it will route based on domain detection + model_name = body.get("model", "") + if not model_name or model_name.lower() in ("mom", "mom (intelligent router)"): + # Query the first running deployment to get a valid model name + from sqlalchemy import select + from sqlalchemy.orm import selectinload + + from app.models.deployment import Deployment, DeploymentStatus + + result = await db.execute( + select(Deployment) + .where(Deployment.status == DeploymentStatus.RUNNING.value) + .options(selectinload(Deployment.model)) + .limit(1) + ) + deployment = result.scalar_one_or_none() + if deployment and deployment.model: + # Use the model name format that matches Semantic Router config + default_model = deployment.model.name.replace("/", "-").replace(":", "-") + else: + default_model = "default" + body["model"] = default_model + + # Get API key from app config for Semantic Router authentication + app_config = router_app.config or {} + api_key = app_config.get("api_key") + + # Check if streaming + is_streaming = body.get("stream", False) + + chat_endpoint = f"{router_url}/v1/chat/completions" + + if is_streaming: + return await _proxy_streaming_chat(chat_endpoint, body, api_key) + else: + return await _proxy_chat(chat_endpoint, body, api_key) + + +async def _proxy_chat(upstream_url: str, body: dict, api_key: str | None = None) -> dict: + """Proxy a non-streaming chat request.""" + headers = {"Content-Type": "application/json"} + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + + try: + 
async with httpx.AsyncClient(timeout=CHAT_PROXY_TIMEOUT) as client: + response = await client.post( + upstream_url, + json=body, + headers=headers, + ) + return response.json() + + except httpx.TimeoutException: + raise HTTPException(status_code=504, detail="Request to Semantic Router timed out") + except httpx.ConnectError: + raise HTTPException(status_code=502, detail="Failed to connect to Semantic Router") + except httpx.RequestError as e: + logger.error(f"Chat proxy request error: {e}") + raise HTTPException(status_code=502, detail=f"Request error: {str(e)}") + + +async def _proxy_streaming_chat( + upstream_url: str, body: dict, api_key: str | None = None +) -> StreamingResponse: + """Proxy a streaming chat request.""" + headers = {"Content-Type": "application/json"} + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + + async def stream_generator() -> AsyncGenerator[bytes, None]: + try: + timeout = httpx.Timeout(CHAT_PROXY_TIMEOUT, connect=10.0) + async with httpx.AsyncClient(timeout=timeout) as client: + async with client.stream( + "POST", + upstream_url, + json=body, + headers=headers, + ) as response: + # Stream each line separately for better real-time delivery + async for line in response.aiter_lines(): + if line: + yield (line + "\n").encode() + + except httpx.TimeoutException: + logger.error(f"Streaming timeout for {upstream_url}") + error_data = { + "error": { + "message": "Request to Semantic Router timed out", + "type": "timeout_error", + } + } + yield f"data: {json.dumps(error_data)}\n\n".encode() + except httpx.ConnectError: + logger.error(f"Connection error for {upstream_url}") + error_data = { + "error": { + "message": "Failed to connect to Semantic Router", + "type": "connection_error", + } + } + yield f"data: {json.dumps(error_data)}\n\n".encode() + except httpx.RequestError as e: + logger.error(f"Streaming request error: {e}") + error_data = {"error": {"message": f"Request error: {str(e)}", "type": "request_error"}} + yield f"data: 
{json.dumps(error_data)}\n\n".encode() + + return StreamingResponse( + stream_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) diff --git a/backend/app/api/workers.py b/backend/app/api/workers.py index 7e2e047..398fd20 100644 --- a/backend/app/api/workers.py +++ b/backend/app/api/workers.py @@ -1,15 +1,17 @@ """Worker API routes""" +import logging from datetime import UTC, datetime -from fastapi import APIRouter, Depends, HTTPException, Query, Request +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, Request from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.config import get_settings from app.core.deps import require_operator, require_viewer -from app.database import get_db -from app.models.deployment import Deployment +from app.database import async_session_maker, get_db +from app.models.app import App, AppStatus +from app.models.deployment import Deployment, DeploymentStatus from app.models.registration_token import RegistrationToken from app.models.user import User from app.models.worker import Worker, WorkerStatus @@ -26,6 +28,8 @@ ) from app.services.local_worker import get_local_hostname, spawn_docker_worker, stop_docker_worker +logger = logging.getLogger(__name__) + router = APIRouter() @@ -97,6 +101,7 @@ def _get_client_ip(request: Request) -> str: async def create_worker( worker_in: WorkerCreate | WorkerRegisterWithToken, request: Request, + background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db), ): """Register a new worker (requires registration token)""" @@ -129,46 +134,51 @@ async def create_worker( ) original_worker = original_worker_result.scalar_one_or_none() - if existing_worker and token.used_by_worker_id == existing_worker.id: - # Allow reconnection - update existing worker with real IP + if original_worker is not None: + # Allow reconnection - update 
existing worker with new info + # Worker name may have changed (e.g., new container ID), update it client_ip = _get_client_ip(request) reported_port = "52001" if ":" in worker_in.address: reported_port = worker_in.address.split(":")[-1] - existing_worker.address = f"{client_ip}:{reported_port}" - existing_worker.gpu_info = ( + original_worker.name = worker_in.name # Update name in case it changed + original_worker.address = f"{client_ip}:{reported_port}" + original_worker.gpu_info = ( [gpu.model_dump() for gpu in worker_in.gpu_info] if worker_in.gpu_info else None ) - existing_worker.system_info = ( + original_worker.system_info = ( worker_in.system_info.model_dump() if worker_in.system_info else None ) - existing_worker.status = WorkerStatus.ONLINE.value - existing_worker.last_heartbeat = datetime.now(UTC) + original_worker.status = WorkerStatus.ONLINE.value + original_worker.last_heartbeat = datetime.now(UTC) # Update labels to mark as local if token is for local worker if token.is_local: - worker_labels = dict(existing_worker.labels) if existing_worker.labels else {} + worker_labels = dict(original_worker.labels) if original_worker.labels else {} worker_labels["type"] = "local" - existing_worker.labels = worker_labels + original_worker.labels = worker_labels await db.commit() - await db.refresh(existing_worker) + await db.refresh(original_worker) + + # Refresh deployments and apps status on this worker + background_tasks.add_task(_refresh_worker_resources, original_worker.id) return WorkerResponse( - id=existing_worker.id, - name=existing_worker.name, - address=existing_worker.address, - description=existing_worker.description, - labels=existing_worker.labels, - status=existing_worker.status, - gpu_info=existing_worker.gpu_info, - system_info=existing_worker.system_info, - created_at=existing_worker.created_at, - updated_at=existing_worker.updated_at, - last_heartbeat=existing_worker.last_heartbeat, + id=original_worker.id, + name=original_worker.name, + 
address=original_worker.address, + description=original_worker.description, + labels=original_worker.labels, + status=original_worker.status, + gpu_info=original_worker.gpu_info, + system_info=original_worker.system_info, + created_at=original_worker.created_at, + updated_at=original_worker.updated_at, + last_heartbeat=original_worker.last_heartbeat, deployment_count=0, ) - elif original_worker is None: + else: # Original worker was deleted, allow re-registration with new worker # Reset token for reuse token.is_used = False @@ -176,8 +186,6 @@ async def create_worker( token.used_by_worker_id = None await db.commit() # Continue to create new worker below - else: - raise HTTPException(status_code=401, detail="Registration token has already been used") if not token.is_valid: raise HTTPException(status_code=401, detail="Registration token has expired") @@ -348,6 +356,7 @@ async def delete_worker( @router.post("/heartbeat") async def worker_heartbeat( heartbeat: WorkerHeartbeat, + background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db), ): """Receive heartbeat from worker""" @@ -357,6 +366,10 @@ async def worker_heartbeat( if not worker: raise HTTPException(status_code=404, detail="Worker not found") + # Check if worker is coming back online + was_offline = worker.status == WorkerStatus.OFFLINE.value + is_now_online = heartbeat.status == WorkerStatus.ONLINE + worker.last_heartbeat = datetime.now(UTC) worker.status = heartbeat.status.value @@ -366,8 +379,19 @@ async def worker_heartbeat( if heartbeat.system_info: worker.system_info = heartbeat.system_info.model_dump() + # Check if worker is going offline + is_going_offline = heartbeat.status == WorkerStatus.OFFLINE + + # If worker is going offline, immediately update all deployments and apps + if is_going_offline: + await _mark_worker_resources_offline(db, worker.id, worker.name) + await db.commit() + # If worker came back online, refresh deployments and apps status + if was_offline and is_now_online: + 
background_tasks.add_task(_refresh_worker_resources, worker.id) + return {"status": "ok"} @@ -614,3 +638,156 @@ async def delete_registration_token( await db.delete(token) await db.commit() + + +# ============================================================================= +# Helper Functions +# ============================================================================= + + +async def _mark_worker_resources_offline(db: AsyncSession, worker_id: int, worker_name: str): + """Mark all deployments and apps on an offline worker as unavailable. + + This is called synchronously when a worker sends an offline heartbeat. + """ + # Update deployments on this worker + dep_result = await db.execute( + select(Deployment).where( + Deployment.worker_id == worker_id, + Deployment.status.in_( + [ + DeploymentStatus.RUNNING.value, + DeploymentStatus.STARTING.value, + ] + ), + ) + ) + deployments = dep_result.scalars().all() + + for deployment in deployments: + deployment.status = DeploymentStatus.ERROR.value + deployment.status_message = f"Worker {worker_name} is offline" + + # Update apps on this worker + app_result = await db.execute( + select(App).where( + App.worker_id == worker_id, + App.status.in_( + [ + AppStatus.RUNNING.value, + AppStatus.STARTING.value, + AppStatus.PULLING.value, + ] + ), + ) + ) + apps = app_result.scalars().all() + + for app in apps: + app.status = AppStatus.ERROR.value + app.status_message = f"Worker {worker_name} is offline" + + if deployments or apps: + logger.info( + f"Marked {len(deployments)} deployments and {len(apps)} apps as offline " + f"for worker {worker_name}" + ) + + +async def _refresh_worker_resources(worker_id: int): + """Refresh status of deployments and apps on a worker that just came online. + + This is called as a background task when a worker's heartbeat indicates + it has come back online after being offline. 
+ """ + import httpx + + logger.info(f"Refreshing resources for worker {worker_id} after coming online") + + async with async_session_maker() as db: + # Get the worker + result = await db.execute(select(Worker).where(Worker.id == worker_id)) + worker = result.scalar_one_or_none() + if not worker: + return + + # Refresh deployments on this worker + dep_result = await db.execute( + select(Deployment).where( + Deployment.worker_id == worker_id, + Deployment.status.in_( + [ + DeploymentStatus.ERROR.value, + DeploymentStatus.STARTING.value, + DeploymentStatus.RUNNING.value, + ] + ), + ) + ) + deployments = dep_result.scalars().all() + + for deployment in deployments: + if not deployment.container_id: + continue + try: + async with httpx.AsyncClient(timeout=5.0) as client: + response = await client.get( + f"http://{worker.address}/containers/{deployment.container_id}" + ) + if response.status_code == 200: + container_info = response.json() + state = container_info.get("state", "").lower() + if state == "running": + deployment.status = DeploymentStatus.RUNNING.value + deployment.status_message = "Model ready" + elif state in ("exited", "dead"): + deployment.status = DeploymentStatus.STOPPED.value + deployment.status_message = f"Container {state}" + elif response.status_code == 404: + deployment.status = DeploymentStatus.ERROR.value + deployment.status_message = "Container not found" + except Exception as e: + logger.warning(f"Failed to check deployment {deployment.id}: {e}") + + # Refresh apps on this worker + app_result = await db.execute( + select(App).where( + App.worker_id == worker_id, + App.status.in_( + [ + AppStatus.ERROR.value, + AppStatus.STARTING.value, + AppStatus.RUNNING.value, + ] + ), + ) + ) + apps = app_result.scalars().all() + + for app in apps: + if not app.container_id: + continue + try: + async with httpx.AsyncClient(timeout=5.0) as client: + response = await client.get( + f"http://{worker.address}/containers/{app.container_id}" + ) + if 
response.status_code == 200: + container_info = response.json() + state = container_info.get("state", "").lower() + if state == "running": + app.status = AppStatus.RUNNING.value + app.status_message = None + elif state in ("exited", "dead"): + app.status = AppStatus.STOPPED.value + app.status_message = f"Container {state}" + elif response.status_code == 404: + app.status = AppStatus.ERROR.value + app.status_message = "Container not found" + except Exception as e: + logger.warning(f"Failed to check app {app.id}: {e}") + + await db.commit() + logger.info( + f"Refreshed {len(deployments)} deployments and {len(apps)} apps for worker {worker_id}" + ) diff --git a/backend/app/main.py b/backend/app/main.py index 703a75f..b8cae25 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -19,6 +19,7 @@ from app.models.worker import Worker, WorkerStatus from app.services.app_sync import app_sync_service from app.services.deployment_sync import deployment_sync_service +from app.services.worker_sync import worker_sync_service # Configure logging logging.basicConfig( @@ -126,10 +127,13 @@ async def check_app_health(): stats = await app_sync_service.sync_all_apps() if stats["total"] > 0: - logger.debug( - f"App health check: {stats['running_verified']} healthy, " - f"{stats['container_missing']} missing" - ) + log_parts = [ + f"{stats['running_verified']} healthy", + f"{stats['container_missing']} missing", + ] + if stats.get("proxy_repaired", 0) > 0: + log_parts.append(f"{stats['proxy_repaired']} proxy repaired") + logger.debug(f"App health check: {', '.join(log_parts)}") except asyncio.CancelledError: logger.info("App health check task cancelled") @@ -149,6 +153,21 @@ async def lifespan(app: FastAPI): await init_db() logger.info("Database initialized") + # Check all workers' status first, then refresh resources on online workers + try: + logger.info("Checking worker status...") + worker_stats = await worker_sync_service.sync_all_workers() + if worker_stats["total"] > 0: + 
logger.info( + f"Worker sync complete: {worker_stats['online']} online, " + f"{worker_stats['offline']} offline" + ) + # Refresh resources on online workers + if worker_stats["online"] > 0: + await worker_sync_service.refresh_online_workers_resources() + except Exception as e: + logger.error(f"Failed to sync workers on startup: {e}") + # Synchronize deployment status with actual container state # This is important after system reboot try: @@ -168,10 +187,13 @@ async def lifespan(app: FastAPI): logger.info("Synchronizing app status...") app_stats = await app_sync_service.sync_all_apps() if app_stats["total"] > 0: - logger.info( - f"App sync complete: {app_stats['running_verified']} running, " - f"{app_stats['container_missing']} missing" - ) + log_parts = [ + f"{app_stats['running_verified']} running", + f"{app_stats['container_missing']} missing", + ] + if app_stats.get("proxy_repaired", 0) > 0: + log_parts.append(f"{app_stats['proxy_repaired']} proxy repaired") + logger.info(f"App sync complete: {', '.join(log_parts)}") except Exception as e: logger.error(f"Failed to sync apps on startup: {e}") diff --git a/backend/app/models/app.py b/backend/app/models/app.py index 845d1af..661af3f 100644 --- a/backend/app/models/app.py +++ b/backend/app/models/app.py @@ -22,6 +22,7 @@ class AppType(str, Enum): FLOWISE = "flowise" ANYTHINGLLM = "anythingllm" LOBECHAT = "lobechat" + SEMANTIC_ROUTER = "semantic-router" class AppStatus(str, Enum): @@ -109,6 +110,116 @@ class AppStatus(str, Enum): }, "volumes": [], # LobeChat is stateless by default }, + AppType.SEMANTIC_ROUTER: { + "name": "Semantic Router", + "description": "Intelligent LLM router that automatically selects the best model based on query intent", + "image": "ghcr.io/vllm-project/semantic-router/vllm-sr:latest", + "internal_port": 8888, # Main OpenAI-compatible API port (Envoy listens on 8888) + "port_name": "API", # Name for the main port + "additional_ports": [ + {"container_port": 8700, "name": "Dashboard"}, + 
{"container_port": 9190, "name": "Metrics"}, + ], + "env_template": { + "ENVOY_LISTEN_PORT": "8888", + "DASHBOARD_PORT": "8700", + "HF_TOKEN": "{hf_token}", # Optional: for gated models + # LMStack API key for authentication (stored in config.yaml, not used as env var) + "LMSTACK_API_KEY": "{api_key}", + # Redirect HuggingFace cache to the models volume for persistence + "HF_HOME": "/app/models", + "TRANSFORMERS_CACHE": "/app/models", + # Router API URL (internal) + "TARGET_ROUTER_API_URL": "http://localhost:8080", + "TARGET_ROUTER_METRICS_URL": "http://localhost:9190/metrics", + # Optional: Monitoring URLs (leave empty to disable) + "TARGET_GRAFANA_URL": "{grafana_url}", + "TARGET_PROMETHEUS_URL": "{prometheus_url}", + "TARGET_JAEGER_URL": "{jaeger_url}", + }, + "volumes": [ + {"name": "semantic-router-config", "destination": "/app/config"}, + {"name": "semantic-router-models", "destination": "/app/models"}, + ], + # Override entrypoint to create symlink before starting supervisord + # (supervisord.conf hardcodes /app/config.yaml path) + # Patch router-defaults.yaml to enable metrics BEFORE supervisord starts + # (patching router-config.yaml doesn't work because it gets regenerated on restart) + "entrypoint": ["/bin/sh", "-c"], + "command": [ + # Patch the defaults file to enable metrics + 'python3 -c "' + "import yaml; " + "f='/app/cli/templates/router-defaults.yaml'; " + "c=yaml.safe_load(open(f)); " + "c.setdefault('observability',{}).setdefault('metrics',{})['enabled']=True; " + "yaml.dump(c,open(f,'w'),default_flow_style=False); " + "print('Patched router-defaults.yaml to enable metrics')\" && " + # Create symlink for config + "ln -sf /app/config/config.yaml /app/config.yaml && " + # Start supervisord + "exec /usr/bin/supervisord -c /etc/supervisor/supervisord.conf" + ], + "requires_config": True, # Indicates this app needs dynamic config generation + "singleton": True, # Only one instance should be deployed per cluster + "has_monitoring": True, # Supports 
optional monitoring stack + }, +} + + +# Internal monitoring service definitions (not user-deployable, only as sub-services) +MONITORING_DEFINITIONS = { + "grafana": { + "name": "Grafana", + "description": "Metrics visualization dashboard", + "image": "grafana/grafana:latest", + "internal_port": 3000, + "env_template": { + "GF_SECURITY_ADMIN_PASSWORD": "admin", + "GF_USERS_ALLOW_SIGN_UP": "false", + "GF_AUTH_ANONYMOUS_ENABLED": "true", + "GF_AUTH_ANONYMOUS_ORG_ROLE": "Viewer", + # Allow embedding in iframes (for Semantic Router dashboard) + "GF_SECURITY_ALLOW_EMBEDDING": "true", + "GF_AUTH_ANONYMOUS_ORG_NAME": "Main Org.", + # Allow any origin for live websocket (needed when accessed via proxy) + "GF_LIVE_ALLOWED_ORIGINS": "*", + # Don't enforce strict origin checks + "GF_SECURITY_COOKIE_SAMESITE": "disabled", + # Disable features we don't need + "GF_ALERTING_ENABLED": "false", + "GF_UNIFIED_ALERTING_ENABLED": "false", + }, + "volumes": [ + {"name": "grafana-data", "destination": "/var/lib/grafana"}, + # Provisioning directories will be mounted + ], + # Grafana needs special provisioning - see monitoring.py + "requires_provisioning": True, + }, + "prometheus": { + "name": "Prometheus", + "description": "Metrics collection and alerting", + "image": "prom/prometheus:latest", + "internal_port": 9090, + "env_template": {}, + "volumes": [ + {"name": "prometheus-data", "destination": "/prometheus"}, + {"name": "prometheus-config", "destination": "/etc/prometheus"}, + ], + # Prometheus needs custom config - see monitoring.py + "requires_config": True, + }, + "jaeger": { + "name": "Jaeger", + "description": "Distributed tracing", + "image": "jaegertracing/all-in-one:latest", + "internal_port": 16686, + "env_template": { + "COLLECTOR_OTLP_ENABLED": "true", + }, + "volumes": [], + }, } @@ -128,6 +239,11 @@ class App(Base): # Worker where app is deployed worker_id: Mapped[int] = mapped_column(Integer, ForeignKey("workers.id"), nullable=False) + # Parent app (for monitoring 
services linked to Semantic Router, etc.) + parent_app_id: Mapped[int | None] = mapped_column( + Integer, ForeignKey("apps.id", ondelete="CASCADE"), nullable=True + ) + # Associated API key (auto-created for the app) api_key_id: Mapped[int | None] = mapped_column( Integer, ForeignKey("api_keys.id"), nullable=True @@ -159,6 +275,12 @@ class App(Base): # Relationships worker: Mapped["Worker"] = relationship("Worker") api_key: Mapped[Optional["ApiKey"]] = relationship("ApiKey") + parent_app: Mapped[Optional["App"]] = relationship( + "App", remote_side="App.id", back_populates="child_apps" + ) + child_apps: Mapped[list["App"]] = relationship( + "App", back_populates="parent_app", cascade="all, delete-orphan" + ) def __repr__(self) -> str: return f"" diff --git a/backend/app/schemas/api_key.py b/backend/app/schemas/api_key.py index 3da3719..7c5625f 100644 --- a/backend/app/schemas/api_key.py +++ b/backend/app/schemas/api_key.py @@ -10,7 +10,8 @@ class ApiKeyCreate(BaseModel): name: str = Field(..., min_length=1, max_length=100) description: str | None = None - allowed_model_ids: list[int] | None = None # None = all models allowed + # Can include model IDs (int) and "mom" (str) for MoM access + allowed_model_ids: list[int | str] | None = None # None = all models allowed monthly_token_limit: int | None = None # None = unlimited expires_in_days: int | None = None # None = never expires @@ -20,7 +21,8 @@ class ApiKeyUpdate(BaseModel): name: str | None = None description: str | None = None - allowed_model_ids: list[int] | None = None + # Can include model IDs (int) and "mom" (str) for MoM access + allowed_model_ids: list[int | str] | None = None monthly_token_limit: int | None = None @@ -31,7 +33,8 @@ class ApiKeyResponse(BaseModel): name: str description: str | None access_key: str - allowed_model_ids: list[int] | None + # Can include model IDs (int) and "mom" (str) for MoM access + allowed_model_ids: list[int | str] | None monthly_token_limit: int | None expires_at: 
datetime | None created_at: datetime diff --git a/backend/app/schemas/app.py b/backend/app/schemas/app.py index 2229d90..3c68546 100644 --- a/backend/app/schemas/app.py +++ b/backend/app/schemas/app.py @@ -12,6 +12,7 @@ class AppDefinition(BaseModel): name: str description: str image: str + has_monitoring: bool = False # Whether app supports monitoring stack class AppDeploy(BaseModel): @@ -24,6 +25,18 @@ class AppDeploy(BaseModel): True, description="Use LMStack nginx proxy (recommended) or direct worker connection", ) + hf_token: str | None = Field( + None, + description="HuggingFace API token (required for Semantic Router)", + ) + + +class AppPortInfo(BaseModel): + """Information about an app port""" + + name: str # e.g., "API", "Dashboard" + port: int # Host port + url: str | None = None # Access URL class AppResponse(BaseModel): @@ -42,8 +55,10 @@ class AppResponse(BaseModel): proxy_path: str proxy_url: str | None = None use_proxy: bool = True - access_url: str | None = None # The URL to access the app + access_url: str | None = None # The URL to access the app (main port) + additional_urls: list[AppPortInfo] | None = None # Additional ports/URLs api_key_id: int | None = None + has_monitoring: bool = False # Whether app supports monitoring stack created_at: datetime updated_at: datetime diff --git a/backend/app/services/app_proxy_manager.py b/backend/app/services/app_proxy_manager.py index e831179..05ffc34 100644 --- a/backend/app/services/app_proxy_manager.py +++ b/backend/app/services/app_proxy_manager.py @@ -146,22 +146,18 @@ async def ensure_running(self) -> bool: logger.info(f"Pulling {NGINX_IMAGE}...") self.client.images.pull(NGINX_IMAGE) - # Get all ports from existing configs - ports = self._get_used_ports() - port_bindings = {f"{p}/tcp": ("0.0.0.0", p) for p in ports} - - # Start container + # Start container with host network so it can access worker IPs logger.info("Creating nginx proxy container") try: self.client.containers.run( NGINX_IMAGE, 
name=NGINX_CONTAINER_NAME, detach=True, + network_mode="host", # Use host network to access worker IPs volumes={ NGINX_CONF_PATH: {"bind": "/etc/nginx/nginx.conf", "mode": "ro"}, NGINX_CONFD_PATH: {"bind": "/etc/nginx/conf.d", "mode": "ro"}, }, - ports=port_bindings, restart_policy={"Name": "unless-stopped"}, ) return True @@ -200,20 +196,19 @@ async def add_app_proxy( container.stop() container.remove() - # Start with all port bindings + # Start with host network ports = self._get_used_ports() - port_bindings = {f"{p}/tcp": ("0.0.0.0", p) for p in ports} try: self.client.containers.run( NGINX_IMAGE, name=NGINX_CONTAINER_NAME, detach=True, + network_mode="host", # Use host network to access worker IPs volumes={ NGINX_CONF_PATH: {"bind": "/etc/nginx/nginx.conf", "mode": "ro"}, NGINX_CONFD_PATH: {"bind": "/etc/nginx/conf.d", "mode": "ro"}, }, - ports=port_bindings, restart_policy={"Name": "unless-stopped"}, ) logger.info(f"Nginx proxy container started with ports: {list(ports)}") @@ -241,17 +236,16 @@ async def remove_app_proxy(self, app_id: int) -> bool: logger.info("No more app proxies, nginx container removed") return True - port_bindings = {f"{p}/tcp": ("0.0.0.0", p) for p in ports} try: self.client.containers.run( NGINX_IMAGE, name=NGINX_CONTAINER_NAME, detach=True, + network_mode="host", # Use host network to access worker IPs volumes={ NGINX_CONF_PATH: {"bind": "/etc/nginx/nginx.conf", "mode": "ro"}, NGINX_CONFD_PATH: {"bind": "/etc/nginx/conf.d", "mode": "ro"}, }, - ports=port_bindings, restart_policy={"Name": "unless-stopped"}, ) return True diff --git a/backend/app/services/app_sync.py b/backend/app/services/app_sync.py index b8ddaef..3227407 100644 --- a/backend/app/services/app_sync.py +++ b/backend/app/services/app_sync.py @@ -2,10 +2,12 @@ Synchronizes app status with actual container state. This is important after system reboot to ensure database status matches reality. +Also verifies and repairs nginx proxy configuration for running apps. 
""" import asyncio import logging +from pathlib import Path import httpx from sqlalchemy import select @@ -13,6 +15,7 @@ from app.database import async_session_maker from app.models.app import App, AppStatus +from app.services.app_proxy_manager import NGINX_CONFD_PATH, get_proxy_manager logger = logging.getLogger(__name__) @@ -27,6 +30,8 @@ class AppSyncService: async def sync_all_apps(self) -> dict: """Synchronize all app statuses. + Also verifies and repairs nginx proxy configurations for running apps. + Returns: dict with sync statistics """ @@ -38,6 +43,7 @@ async def sync_all_apps(self) -> dict: "container_missing": 0, "errors": 0, "skipped": 0, + "proxy_repaired": 0, } async with async_session_maker() as db: @@ -89,13 +95,80 @@ async def check_with_semaphore(app: App): await db.commit() + # Verify and repair proxy configurations for running apps + proxy_repair_count = await self._verify_and_repair_proxies(apps) + stats["proxy_repaired"] = proxy_repair_count + logger.info( f"App sync complete: {stats['running_verified']} running, " - f"{stats['container_missing']} missing, {stats['errors']} errors" + f"{stats['container_missing']} missing, {stats['errors']} errors, " + f"{stats['proxy_repaired']} proxy configs repaired" ) return stats + async def _verify_and_repair_proxies(self, apps: list[App]) -> int: + """Verify and repair nginx proxy configurations for running apps. + + This ensures that all running apps with use_proxy=True have their + nginx proxy configuration file. If missing, creates the config and + restarts nginx. 
+ + Args: + apps: List of apps to check + + Returns: + Number of proxy configs repaired + """ + # Filter to running apps that need proxy + running_apps_with_proxy = [ + app + for app in apps + if app.status == AppStatus.RUNNING.value and app.use_proxy and app.port and app.worker + ] + + if not running_apps_with_proxy: + return 0 + + # Check which apps are missing proxy configs + apps_missing_proxy = [] + confd_path = Path(NGINX_CONFD_PATH) + + for app in running_apps_with_proxy: + config_file = confd_path / f"app_{app.id}.conf" + if not config_file.exists(): + logger.warning( + f"App {app.id} ({app.app_type}) is running with use_proxy=True " + f"but missing proxy config file" + ) + apps_missing_proxy.append(app) + + if not apps_missing_proxy: + logger.debug("All running apps have proxy configs") + return 0 + + # Repair missing proxy configs + logger.info(f"Repairing {len(apps_missing_proxy)} missing proxy configs...") + proxy_manager = get_proxy_manager() + repaired = 0 + + for app in apps_missing_proxy: + try: + worker_host = app.worker.address.split(":")[0] + await proxy_manager.add_app_proxy( + app_id=app.id, + app_type=app.app_type, + listen_port=app.port, + worker_host=worker_host, + worker_port=app.port, + ) + logger.info(f"Repaired proxy config for app {app.id}: port {app.port}") + repaired += 1 + except Exception as e: + logger.error(f"Failed to repair proxy config for app {app.id}: {e}") + + return repaired + async def _check_and_update_app(self, app: App, db) -> str: """Check a single app and update its status. 
async def _update_semantic_router_config_background() -> None:
    """Refresh the Semantic Router config after a deployment change.

    Intended to run as a fire-and-forget background task, so it never raises:
    any failure is logged (with traceback) and otherwise ignored. The update
    itself is a no-op when no Semantic Router is deployed, as implied by the
    helper's name.
    """
    try:
        # Imported at call time rather than module load (presumably to avoid a
        # circular import between the services and API packages - confirm).
        from app.api.apps.deployment import update_semantic_router_config_if_deployed

        async with async_session_maker() as db:
            await update_semantic_router_config_if_deployed(db)
    except Exception:
        # Best effort, but visible: a stale router config silently routes to
        # models that are gone, so this must not be hidden at DEBUG level.
        logger.warning("Failed to update semantic router config", exc_info=True)
"skipped" if not deployment.container_id: + # If deployment is still starting, skip it + if deployment.status == DeploymentStatus.STARTING.value: + logger.debug(f"Deployment {deployment.id} is still starting, skipping") + return "skipped" + # Only mark as error if deployment claims to be RUNNING but has no container logger.warning(f"Deployment {deployment.id} has no container_id, marking as error") deployment.status = DeploymentStatus.ERROR.value deployment.status_message = "Container ID missing after restart" diff --git a/backend/app/services/gateway.py b/backend/app/services/gateway.py index a1dc05e..b4a033d 100644 --- a/backend/app/services/gateway.py +++ b/backend/app/services/gateway.py @@ -81,6 +81,20 @@ async def check_model_access( return model_id in api_key.allowed_model_ids + @staticmethod + async def check_mom_access(api_key: ApiKey) -> bool: + """Check if API key has access to MoM (Semantic Router). + + MoM access is granted if: + - allowed_model_ids is empty/null (all models allowed) + - "mom" is explicitly in allowed_model_ids + """ + if not api_key.allowed_model_ids: + # No restrictions, allow all including MoM + return True + + return "mom" in api_key.allowed_model_ids + @staticmethod async def find_deployment_for_model( db: AsyncSession, diff --git a/backend/app/services/semantic_router.py b/backend/app/services/semantic_router.py new file mode 100644 index 0000000..5e17ed0 --- /dev/null +++ b/backend/app/services/semantic_router.py @@ -0,0 +1,309 @@ +"""Semantic Router Configuration Service + +Generates and manages config.yaml for the Semantic Router app. +Supports hot-reload by updating the config file when models change. 
class SemanticRouterService:
    """Generate and locate configuration for the Semantic Router app.

    The service turns the set of running deployments into the router's
    ``config.yaml`` structure (v0.1 format) and looks up the running
    Semantic Router app so callers can reach it.
    """

    # Port used when the LMStack API URL does not name one explicitly.
    DEFAULT_API_PORT = 52000

    # Default domains for semantic routing
    DEFAULT_DOMAINS = [
        {
            "name": "math",
            "description": "Mathematics and quantitative reasoning",
            "mmlu_categories": ["math"],
        },
        {
            "name": "coding",
            "description": "Programming and software development",
            "mmlu_categories": ["computer_science"],
        },
        {
            "name": "science",
            "description": "Scientific questions and research",
            "mmlu_categories": ["physics", "chemistry", "biology"],
        },
        {
            "name": "creative",
            "description": "Creative writing and brainstorming",
            "mmlu_categories": ["other"],
        },
        {
            "name": "general",
            "description": "General knowledge and conversation",
            "mmlu_categories": ["other"],
        },
    ]

    # Routing decisions, one row each:
    # (domain, description, priority, use_reasoning, system_prompt).
    # NOTE(review): the "creative" domain has no decision row - confirm that
    # is intended.
    _DECISION_SPECS = (
        (
            "math",
            "Mathematics and quantitative reasoning",
            100,
            True,
            "You are a mathematics expert. Provide step-by-step solutions with clear reasoning.",
        ),
        (
            "coding",
            "Programming and software development",
            100,
            False,
            "You are a programming expert. Provide clean, well-documented code with explanations.",
        ),
        (
            "science",
            "Scientific questions and research",
            100,
            True,
            "You are a science expert. Explain concepts clearly with scientific accuracy.",
        ),
        # General/default decision (lowest priority)
        (
            "general",
            "General knowledge and miscellaneous",
            50,
            False,
            "You are a helpful assistant.",
        ),
    )

    @classmethod
    def _split_host_port(cls, url: str, default_port: int | None = None) -> tuple[str, int]:
        """Extract ``(host, port)`` from an API URL.

        Accepts full URLs (``http://host:port/path``) as well as bare
        ``host:port`` strings. IPv6 hosts are returned in brackets so that
        ``f"{host}:{port}"`` stays unambiguous. The host is lower-cased by the
        URL parser.

        Args:
            url: LMStack API URL or ``host[:port]`` string.
            default_port: Port to use when *url* has none; defaults to
                ``DEFAULT_API_PORT``.

        Raises:
            ValueError: If no host can be found or the port is not a valid
                port number.
        """
        from urllib.parse import urlsplit

        candidate = url.strip()
        if "://" not in candidate:
            # Without "//" urlsplit would read "host:port" as "scheme:path".
            candidate = f"//{candidate}"

        parts = urlsplit(candidate)
        host = parts.hostname or ""
        if not host:
            raise ValueError(f"Cannot determine host from LMStack API URL: {url!r}")
        if ":" in host:
            host = f"[{host}]"

        fallback = cls.DEFAULT_API_PORT if default_port is None else default_port
        return host, parts.port or fallback

    @staticmethod
    def _build_provider_models(
        deployments, endpoint: str, api_key: str | None
    ) -> list[dict[str, Any]]:
        """Build the ``providers.models`` entries for the given deployments.

        Deployments without a model or worker are ignored. Model names are
        sanitised (``/`` and ``:`` become ``-``), and each resulting name is
        emitted once: every entry points at the same LMStack endpoint, so
        repeating a name for additional replicas adds nothing.
        """
        models: list[dict[str, Any]] = []
        seen: set[str] = set()

        for deployment in deployments:
            if not deployment.model or not deployment.worker:
                continue

            model_name = deployment.model.name.replace("/", "-").replace(":", "-")
            if model_name in seen:
                continue
            seen.add(model_name)

            model_config: dict[str, Any] = {
                "name": model_name,
                "endpoints": [
                    {
                        "name": f"lmstack-{model_name}",
                        "weight": 1,
                        "endpoint": endpoint,
                        "protocol": "http",
                    }
                ],
            }
            # Add API key for authentication if provided
            if api_key:
                model_config["access_key"] = api_key
            models.append(model_config)

        return models

    async def generate_config(
        self,
        db: AsyncSession,
        lmstack_api_url: str,
        api_key: str | None = None,
    ) -> dict[str, Any]:
        """Generate semantic router config.yaml content.

        Uses the v0.1 config format with version, listeners, providers, etc.

        Args:
            db: Database session
            lmstack_api_url: LMStack API URL (e.g., http://192.168.201.16:52000)
            api_key: LMStack API key for authentication

        Returns:
            Config dictionary ready to be serialized to YAML

        Raises:
            ValueError: If *lmstack_api_url* has no host or an invalid port.
        """
        # Validate the URL first so a bad value fails before touching the DB.
        host, port = self._split_host_port(lmstack_api_url)
        endpoint = f"{host}:{port}"

        # Running deployments with their models; ordered so the default model
        # (the first one) is stable across regenerations.
        result = await db.execute(
            select(Deployment)
            .where(Deployment.status == DeploymentStatus.RUNNING.value)
            .options(selectinload(Deployment.model), selectinload(Deployment.worker))
            .order_by(Deployment.id)
        )
        deployments = result.scalars().all()

        models = self._build_provider_models(deployments, endpoint, api_key)

        # If no deployments, add a placeholder model
        if not models:
            models.append(
                {
                    "name": "default",
                    "endpoints": [
                        {
                            "name": "placeholder",
                            "weight": 1,
                            "endpoint": endpoint,
                            "protocol": "http",
                        }
                    ],
                }
            )

        model_names = [model["name"] for model in models]
        default_model = model_names[0]

        # Key order is preserved when serialised (see config_to_yaml).
        return {
            "version": "v0.1",
            # Listener configuration
            "listeners": [
                {
                    "name": "http-8888",
                    "address": "0.0.0.0",
                    "port": 8888,
                    "timeout": "300s",
                }
            ],
            # Observability (metrics endpoint)
            "observability": {
                "metrics": {
                    "enabled": True,
                    "port": 9190,
                }
            },
            # Response API caching for faster repeated queries
            "response_api": {
                "enabled": True,
                "store_backend": "memory",
                "ttl_seconds": 3600,
                "max_responses": 1000,
            },
            # Note: The following features require ML models to be downloaded.
            # They are omitted from config to avoid startup errors.
            # To enable: prompt_guard, classifier, semantic_cache, hallucination_mitigation
            # See: https://vllm-semantic-router.com/docs/installation/configuration
            # Signals (domains for routing)
            "signals": {
                "domains": self.DEFAULT_DOMAINS,
            },
            # Decisions (routing rules)
            "decisions": self._generate_decisions(model_names, default_model),
            # Providers (models and endpoints)
            "providers": {
                "models": models,
                "default_model": default_model,
                "reasoning_families": {},
                "default_reasoning_effort": "high",
            },
        }

    def _generate_decisions(self, model_names: list[str], default_model: str) -> list[dict]:
        """Generate routing decisions based on available models.

        Produces one decision per row of ``_DECISION_SPECS``; every decision
        currently routes to *default_model*. Returns an empty list when no
        models are available.
        """
        if not model_names:
            return []

        return [
            {
                "name": f"{domain}_decision",
                "description": description,
                "priority": priority,
                "rules": {
                    "operator": "AND",
                    "conditions": [{"type": "domain", "name": domain}],
                },
                "modelRefs": [{"model": default_model, "use_reasoning": use_reasoning}],
                "plugins": [
                    {
                        "type": "system_prompt",
                        "configuration": {"system_prompt": system_prompt},
                    },
                ],
            }
            for domain, description, priority, use_reasoning, system_prompt in self._DECISION_SPECS
        ]

    def config_to_yaml(self, config: dict) -> str:
        """Convert config dict to YAML string, keeping the dict's key order."""
        return yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False)

    async def get_semantic_router_app(self, db: AsyncSession) -> App | None:
        """Get the deployed Semantic Router app if exists.

        Only apps in RUNNING state are considered. If several are running,
        the one with the lowest id is returned instead of raising.
        """
        result = await db.execute(
            select(App)
            .where(
                App.app_type == AppType.SEMANTIC_ROUTER.value,
                App.status == AppStatus.RUNNING.value,
            )
            .options(selectinload(App.worker))
            .order_by(App.id)
            .limit(1)
        )
        return result.scalars().first()

    async def is_semantic_router_deployed(self, db: AsyncSession) -> bool:
        """Check if Semantic Router is deployed and running."""
        app = await self.get_semantic_router_app(db)
        return app is not None

    async def get_semantic_router_url(self, db: AsyncSession) -> str | None:
        """Get the Semantic Router API URL if deployed.

        Returns ``None`` when no running router app (or its worker) is found.
        """
        app = await self.get_semantic_router_app(db)
        if not app or not app.worker:
            return None

        # Worker address is "host[:port]"; keep the host, use the app's port.
        worker_address = app.worker.address.split(":")[0]
        return f"http://{worker_address}:{app.port}"
class WorkerSyncService:
    """Service for synchronizing worker status with actual state.

    Two passes are offered: ``sync_all_workers`` pings each worker's
    ``/health`` endpoint and marks unreachable ones offline, and
    ``refresh_online_workers_resources`` then checks the containers behind
    deployments and apps on the workers that are still online.
    """

    HEALTH_CHECK_TIMEOUT = 5  # seconds
    MAX_CONCURRENT_CHECKS = 10

    # Container states that mean the workload is no longer running.
    _TERMINAL_CONTAINER_STATES = ("exited", "dead")
    # Sentinel for "the worker answered 404 for this container".
    _CONTAINER_MISSING = "<missing>"

    async def sync_all_workers(self) -> dict:
        """Check all workers' online status.

        Only workers currently marked ONLINE are checked; those that fail the
        health check are marked OFFLINE and the change is committed.

        Returns:
            dict with sync statistics
        """
        logger.info("Starting worker status synchronization...")

        stats = {
            "total": 0,
            "online": 0,
            "offline": 0,
            "errors": 0,
        }

        async with async_session_maker() as db:
            # Get all workers that are marked as online
            query_result = await db.execute(
                select(Worker).where(Worker.status == WorkerStatus.ONLINE.value)
            )
            workers = query_result.scalars().all()
            stats["total"] = len(workers)

            if not workers:
                logger.info("No online workers to check")
                return stats

            logger.info(f"Checking {len(workers)} workers...")

            # Check workers with limited concurrency
            semaphore = asyncio.Semaphore(self.MAX_CONCURRENT_CHECKS)

            async def check_with_semaphore(worker: Worker):
                async with semaphore:
                    return await self._check_worker(worker, db)

            outcomes = await asyncio.gather(
                *(check_with_semaphore(w) for w in workers), return_exceptions=True
            )

            # Aggregate results
            for outcome in outcomes:
                if isinstance(outcome, Exception):
                    logger.error(f"Worker check failed: {outcome}")
                    stats["errors"] += 1
                elif outcome in ("online", "offline"):
                    stats[outcome] += 1

            await db.commit()

        logger.info(
            f"Worker sync complete: {stats['online']} online, "
            f"{stats['offline']} offline, {stats['errors']} errors"
        )

        return stats

    async def _check_worker(self, worker: Worker, db) -> str:
        """Check a single worker's status by pinging its health endpoint.

        Marks the worker OFFLINE on any non-200 answer or request failure.
        ``db`` is accepted for interface compatibility and is not used.

        Returns:
            "online" or "offline"
        """
        # Skip local workers that don't have a real address
        if not worker.address or worker.address.startswith("local"):
            return "online"

        try:
            async with httpx.AsyncClient(timeout=self.HEALTH_CHECK_TIMEOUT) as client:
                response = await client.get(f"http://{worker.address}/health")
        except (httpx.ConnectError, httpx.TimeoutException):
            logger.warning(f"Worker {worker.name}: offline (unreachable)")
            worker.status = WorkerStatus.OFFLINE.value
            return "offline"
        except Exception as e:
            logger.error(f"Error checking worker {worker.name}: {e}")
            worker.status = WorkerStatus.OFFLINE.value
            return "offline"

        if response.status_code == 200:
            logger.debug(f"Worker {worker.name}: online")
            return "online"

        logger.warning(f"Worker {worker.name}: unhealthy (status {response.status_code})")
        worker.status = WorkerStatus.OFFLINE.value
        return "offline"

    async def refresh_online_workers_resources(self) -> dict:
        """Refresh deployment and app status for all online workers.

        This should be called after worker sync to update resource status.

        Returns:
            dict with refresh statistics
        """
        logger.info("Refreshing resources on online workers...")

        stats = {
            "workers_checked": 0,
            "deployments_updated": 0,
            "apps_updated": 0,
            "errors": 0,
        }

        async with async_session_maker() as db:
            # Get all online workers
            query_result = await db.execute(
                select(Worker).where(Worker.status == WorkerStatus.ONLINE.value)
            )
            workers = query_result.scalars().all()

            if not workers:
                logger.info("No online workers to refresh")
                return stats

            stats["workers_checked"] = len(workers)
            worker_ids = [w.id for w in workers]

            # Refresh deployments on these workers
            deploy_result = await db.execute(
                select(Deployment)
                .where(
                    Deployment.worker_id.in_(worker_ids),
                    Deployment.status.in_(
                        [
                            DeploymentStatus.RUNNING.value,
                            DeploymentStatus.STARTING.value,
                        ]
                    ),
                )
                .options(selectinload(Deployment.worker))
            )

            for deployment in deploy_result.scalars().all():
                try:
                    if await self._refresh_deployment_status(deployment, db):
                        stats["deployments_updated"] += 1
                except Exception as e:
                    logger.error(f"Error refreshing deployment {deployment.id}: {e}")
                    stats["errors"] += 1

            # Refresh apps on these workers
            app_result = await db.execute(
                select(App)
                .where(
                    App.worker_id.in_(worker_ids),
                    App.status.in_(
                        [
                            AppStatus.RUNNING.value,
                            AppStatus.STARTING.value,
                            AppStatus.PULLING.value,
                        ]
                    ),
                )
                .options(selectinload(App.worker))
            )

            for app in app_result.scalars().all():
                try:
                    if await self._refresh_app_status(app, db):
                        stats["apps_updated"] += 1
                except Exception as e:
                    logger.error(f"Error refreshing app {app.id}: {e}")
                    stats["errors"] += 1

            await db.commit()

        logger.info(
            f"Resource refresh complete: {stats['deployments_updated']} deployments, "
            f"{stats['apps_updated']} apps updated, {stats['errors']} errors"
        )

        return stats

    @classmethod
    def _classify_container(cls, status_code: int, payload) -> str | None:
        """Reduce a worker ``/containers/<id>`` answer to a container state.

        Returns:
            ``_CONTAINER_MISSING`` for a 404, the lower-cased ``state`` field
            for a 200 with a JSON object (empty string when the field is
            absent or null), and ``None`` for anything else.
        """
        if status_code == 404:
            return cls._CONTAINER_MISSING
        if status_code != 200 or not isinstance(payload, dict):
            return None
        return str(payload.get("state") or "").lower()

    @staticmethod
    def _set_status(resource, status: str, message: str) -> bool:
        """Set status and message on *resource*; return True only if status changed."""
        if resource.status == status:
            return False
        resource.status = status
        resource.status_message = message
        return True

    async def _probe_container(
        self, address: str, container_id: str, kind: str, name: str
    ) -> str | None:
        """Ask a worker for a container's state.

        Never raises: an unreachable worker or any other failure is logged
        and reported as ``None`` so callers leave the stored status alone.

        Args:
            address: Worker address (``host:port``).
            container_id: Container to look up.
            kind: "deployment" or "app", used only in log messages.
            name: Resource name, used only in log messages.

        Returns:
            See ``_classify_container``; ``None`` when nothing could be learned.
        """
        try:
            async with httpx.AsyncClient(timeout=self.HEALTH_CHECK_TIMEOUT) as client:
                response = await client.get(f"http://{address}/containers/{container_id}")
            payload = response.json() if response.status_code == 200 else None
            return self._classify_container(response.status_code, payload)
        except (httpx.ConnectError, httpx.TimeoutException):
            # Worker unreachable - don't change status, worker sync handles this
            logger.debug(f"{kind.capitalize()} {name}: worker unreachable")
            return None
        except Exception as e:
            logger.error(f"Error checking {kind} {name}: {e}")
            return None

    async def _refresh_deployment_status(self, deployment: Deployment, db) -> bool:
        """Refresh a single deployment's status by checking container.

        A missing, exited or dead container moves the deployment to ERROR.

        Returns:
            True if status was updated, False otherwise
        """
        if not deployment.worker or not deployment.container_id:
            return False

        state = await self._probe_container(
            deployment.worker.address, deployment.container_id, "deployment", deployment.name
        )

        if state == self._CONTAINER_MISSING:
            changed = self._set_status(
                deployment, DeploymentStatus.ERROR.value, "Container not found. Please redeploy."
            )
            if changed:
                logger.warning(f"Deployment {deployment.name}: container not found")
            return changed

        if state == "running":
            # Container is running, status is valid
            logger.debug(f"Deployment {deployment.name}: container running")
            return False

        if state in self._TERMINAL_CONTAINER_STATES:
            changed = self._set_status(
                deployment, DeploymentStatus.ERROR.value, f"Container {state}. Please restart."
            )
            if changed:
                logger.warning(f"Deployment {deployment.name}: container {state}")
            return changed

        return False

    async def _refresh_app_status(self, app: App, db) -> bool:
        """Refresh a single app's status by checking container.

        A missing container moves the app to ERROR; an exited or dead one
        moves it to STOPPED.

        Returns:
            True if status was updated, False otherwise
        """
        if not app.worker or not app.container_id:
            return False

        state = await self._probe_container(
            app.worker.address, app.container_id, "app", app.name
        )

        if state == self._CONTAINER_MISSING:
            changed = self._set_status(
                app, AppStatus.ERROR.value, "Container not found. Please redeploy."
            )
            if changed:
                logger.warning(f"App {app.name}: container not found")
            return changed

        if state == "running":
            # Container is running, status is valid
            logger.debug(f"App {app.name}: container running")
            return False

        if state in self._TERMINAL_CONTAINER_STATES:
            changed = self._set_status(app, AppStatus.STOPPED.value, f"Container {state}")
            if changed:
                logger.warning(f"App {app.name}: container {state}")
            return changed

        return False
async def column_exists(conn, table_name: str, column_name: str) -> bool:
    """Check if a column exists in a table (SQLite compatible).

    Args:
        conn: Async connection to run the PRAGMA on.
        table_name: Table to inspect; must be a plain identifier.
        column_name: Column to look for.

    Raises:
        ValueError: If *table_name* is not a plain identifier.
    """
    # The table name is interpolated into the PRAGMA text rather than bound
    # as a parameter, so anything but a bare identifier is rejected.
    if not table_name.isidentifier():
        raise ValueError(f"Invalid table name: {table_name!r}")

    result = await conn.execute(text(f"PRAGMA table_info({table_name})"))
    # Each table_info row is (cid, name, type, ...); index 1 is the column name.
    return any(row[1] == column_name for row in result.fetchall())


async def migrate():
    """Add the ``parent_app_id`` column to ``apps`` if it is not there yet.

    Safe to run repeatedly. The engine is disposed even when the migration
    fails.
    """
    settings = get_settings()
    engine = create_async_engine(settings.database_url, echo=True)

    try:
        async with engine.begin() as conn:
            # Add parent_app_id column if not exists
            if not await column_exists(conn, "apps", "parent_app_id"):
                print("Adding 'parent_app_id' column to apps table...")
                await conn.execute(
                    text(
                        """
                    ALTER TABLE apps ADD COLUMN parent_app_id INTEGER REFERENCES apps(id) ON DELETE CASCADE
                """
                    )
                )
                print("'parent_app_id' column added successfully!")
            else:
                print("'parent_app_id' column already exists")

        print("\n" + "=" * 50)
        print("Migration completed successfully!")
        print("=" * 50)
    finally:
        await engine.dispose()


if __name__ == "__main__":
    asyncio.run(migrate())
mode 100644 index 0000000..b56f1e4 Binary files /dev/null and b/frontend/public/favicon.png differ diff --git a/frontend/src/api/apiKeys.ts b/frontend/src/api/apiKeys.ts index 5cf968a..a691fcf 100644 --- a/frontend/src/api/apiKeys.ts +++ b/frontend/src/api/apiKeys.ts @@ -22,6 +22,22 @@ export interface ApiKeyStats { per_key_stats: Record; } +export interface ModelUsageStats { + model_id: number | null; + model_name: string; + model_source: string; + requests: number; + prompt_tokens: number; + completion_tokens: number; + total_tokens: number; + is_running: boolean; +} + +export interface ModelStatsResponse { + models: ModelUsageStats[]; + mom_stats: ModelUsageStats | null; +} + export const apiKeysApi = { list: async (params?: ApiKeyListParams): Promise> => { const response = await api.get>("/api-keys", { @@ -53,4 +69,11 @@ export const apiKeysApi = { const response = await api.get("/api-keys/stats/summary"); return response.data; }, + + getModelStats: async (): Promise => { + const response = await api.get( + "/api-keys/stats/by-model", + ); + return response.data; + }, }; diff --git a/frontend/src/api/apps.ts b/frontend/src/api/apps.ts index a305adf..3f4e373 100644 --- a/frontend/src/api/apps.ts +++ b/frontend/src/api/apps.ts @@ -9,6 +9,13 @@ export interface AppDefinition { name: string; description: string; image: string; + has_monitoring?: boolean; +} + +export interface AppPortInfo { + name: string; + port: number; + url?: string; } export interface DeployedApp { @@ -26,16 +33,32 @@ export interface DeployedApp { proxy_url?: string; use_proxy: boolean; access_url?: string; + additional_urls?: AppPortInfo[]; api_key_id?: number; + has_monitoring?: boolean; created_at: string; updated_at: string; } +export interface MonitoringServiceStatus { + name: string; + type: string; + status: string; + port?: number; + url?: string; +} + +export interface MonitoringStatus { + enabled: boolean; + services: MonitoringServiceStatus[]; +} + export interface AppDeployRequest 
{ app_type: string; worker_id: number; name?: string; use_proxy?: boolean; + hf_token?: string; } export interface DeployProgress { @@ -97,4 +120,34 @@ export const appsApi = { }); return response.data; }, + + // Monitoring endpoints + getMonitoringStatus: async (id: number): Promise => { + const response = await api.get(`/apps/${id}/monitoring`); + return response.data; + }, + + deployMonitoring: async ( + id: number, + services?: string[], + ): Promise => { + const response = await api.post( + `/apps/${id}/monitoring`, + { + services, + }, + ); + return response.data; + }, + + removeMonitoring: async (id: number): Promise => { + await api.delete(`/apps/${id}/monitoring`); + }, + + removeMonitoringService: async ( + id: number, + serviceType: string, + ): Promise => { + await api.delete(`/apps/${id}/monitoring/${serviceType}`); + }, }; diff --git a/frontend/src/api/index.ts b/frontend/src/api/index.ts index f7974c2..48f8ec6 100644 --- a/frontend/src/api/index.ts +++ b/frontend/src/api/index.ts @@ -30,6 +30,7 @@ export { ollamaApi } from "./ollama"; export { conversationsApi } from "./conversations"; export { appsApi } from "./apps"; export { headscaleApi } from "./headscale"; +export { semanticRouterApi } from "./semanticRouter"; // Types - Workers export type { WorkerListParams } from "./workers"; @@ -50,7 +51,12 @@ export type { ChangePasswordRequest } from "./auth"; export type { UserListParams } from "./users"; // Types - API Keys -export type { ApiKeyListParams, ApiKeyStats } from "./apiKeys"; +export type { + ApiKeyListParams, + ApiKeyStats, + ModelUsageStats, + ModelStatsResponse, +} from "./apiKeys"; // Types - Images export type { ImageListParams, ImageSearchResult } from "./images"; @@ -102,6 +108,8 @@ export type { DeployedApp, AppDeployRequest, DeployProgress, + MonitoringServiceStatus, + MonitoringStatus, } from "./apps"; // Types - Headscale @@ -111,3 +119,6 @@ export type { PreauthKeyResponse, HeadscaleProgress, } from "./headscale"; + +// Types - 
Semantic Router +export type { SemanticRouterStatus } from "./semanticRouter"; diff --git a/frontend/src/api/semanticRouter.ts b/frontend/src/api/semanticRouter.ts new file mode 100644 index 0000000..7542748 --- /dev/null +++ b/frontend/src/api/semanticRouter.ts @@ -0,0 +1,27 @@ +/** + * Semantic Router API + */ +import { api } from "./client"; + +export interface SemanticRouterStatus { + deployed: boolean; + url?: string; + dashboard_url?: string; + message?: string; +} + +export const semanticRouterApi = { + getStatus: async (): Promise => { + const response = await api.get( + "/semantic-router/status", + ); + return response.data; + }, + + updateConfig: async (): Promise<{ success: boolean; message: string }> => { + const response = await api.post<{ success: boolean; message: string }>( + "/semantic-router/update-config", + ); + return response.data; + }, +}; diff --git a/frontend/src/assets/apps/semantic-router.webp b/frontend/src/assets/apps/semantic-router.webp new file mode 100644 index 0000000..183e1dd Binary files /dev/null and b/frontend/src/assets/apps/semantic-router.webp differ diff --git a/frontend/src/components/HuggingFaceModelPicker.tsx b/frontend/src/components/HuggingFaceModelPicker.tsx index a1efe73..2533866 100644 --- a/frontend/src/components/HuggingFaceModelPicker.tsx +++ b/frontend/src/components/HuggingFaceModelPicker.tsx @@ -627,7 +627,7 @@ export default function HuggingFaceModelPicker({ : "success" } format={() => - `${vramEstimate.estimated_vram_gb.toFixed(1)} / ${gpuMemoryGb} GB` + `${vramEstimate.estimated_vram_gb.toFixed(1)} / ${gpuMemoryGb?.toFixed(1) ?? 
"N/A"} GB` } /> diff --git a/frontend/src/components/ModelCompatibilityCheck.tsx b/frontend/src/components/ModelCompatibilityCheck.tsx index 61c0832..cd8edf8 100644 --- a/frontend/src/components/ModelCompatibilityCheck.tsx +++ b/frontend/src/components/ModelCompatibilityCheck.tsx @@ -319,7 +319,7 @@ export default function ModelCompatibilityCheck({ : "success" } format={() => - `${vramEstimate.estimated_vram_gb.toFixed(1)} / ${gpuMemoryGb} GB` + `${vramEstimate.estimated_vram_gb.toFixed(1)} / ${gpuMemoryGb?.toFixed(1) ?? "N/A"} GB` } /> diff --git a/frontend/src/pages/ApiKeys.tsx b/frontend/src/pages/ApiKeys.tsx index 561d8d9..44f17f7 100644 --- a/frontend/src/pages/ApiKeys.tsx +++ b/frontend/src/pages/ApiKeys.tsx @@ -39,12 +39,17 @@ import { CheckCircleOutlined, EyeOutlined, EyeInvisibleOutlined, + RobotOutlined, + EditOutlined, } from "@ant-design/icons"; import { apiKeysApi, deploymentsApi, modelsApi, + semanticRouterApi, type ApiKeyStats, + type ModelStatsResponse, + type SemanticRouterStatus, } from "../services/api"; import type { ApiKey, ApiKeyCreate, Deployment, LLMModel } from "../types"; import { useAppTheme, useResponsive } from "../hooks"; @@ -299,12 +304,17 @@ export default function ApiKeys() { [], ); const [stats, setStats] = useState(null); + const [modelStats, setModelStats] = useState(null); + const [semanticRouterStatus, setSemanticRouterStatus] = + useState(null); const [loading, setLoading] = useState(true); const [createModalOpen, setCreateModalOpen] = useState(false); + const [editModalKey, setEditModalKey] = useState(null); const [codeModalKey, setCodeModalKey] = useState(null); const [newKeyModal, setNewKeyModal] = useState(null); const [revealedKeys, setRevealedKeys] = useState>(new Set()); const [createForm] = Form.useForm(); + const [editForm] = Form.useForm(); const { isMobile } = useResponsive(); const { isDark } = useAppTheme(); const { canEdit } = useAuth(); @@ -317,11 +327,20 @@ export default function ApiKeys() { const fetchData = 
useCallback(async () => { try { - const [keysRes, modelsRes, deploymentsRes, statsRes] = await Promise.all([ + const [ + keysRes, + modelsRes, + deploymentsRes, + statsRes, + modelStatsRes, + srStatusRes, + ] = await Promise.all([ apiKeysApi.list(), modelsApi.list(), deploymentsApi.list(), apiKeysApi.getStats().catch(() => null), + apiKeysApi.getModelStats().catch(() => null), + semanticRouterApi.getStatus().catch(() => null), ]); setApiKeys(keysRes.items); setModels(modelsRes.items); @@ -330,6 +349,8 @@ export default function ApiKeys() { deploymentsRes.items.filter((d) => d.status === "running"), ); setStats(statsRes); + setModelStats(modelStatsRes); + setSemanticRouterStatus(srStatusRes); } catch (error) { console.error("Failed to fetch data:", error); } finally { @@ -366,6 +387,30 @@ export default function ApiKeys() { } }; + const handleUpdate = async (values: Partial) => { + if (!editModalKey) return; + try { + await apiKeysApi.update(editModalKey.id, values); + message.success("API key updated"); + setEditModalKey(null); + editForm.resetFields(); + fetchData(); + } catch (error: unknown) { + const err = error as { response?: { data?: { detail?: string } } }; + message.error(err.response?.data?.detail || "Failed to update API key"); + } + }; + + const openEditModal = (record: ApiKey) => { + setEditModalKey(record); + editForm.setFieldsValue({ + name: record.name, + description: record.description, + allowed_model_ids: record.allowed_model_ids, + monthly_token_limit: record.monthly_token_limit, + }); + }; + const copyToClipboard = async (text: string, label = "Copied") => { try { // Try modern clipboard API first @@ -465,7 +510,7 @@ export default function ApiKeys() { { title: "", key: "actions", - width: 80, + width: 100, render: (_: unknown, record: ApiKey) => (