diff --git a/README.md b/README.md index 6e3ca7f..91ac717 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,33 @@ docker compose -f docker-compose.deploy.yml up -d - Frontend: http://localhost:3000 - Backend API: http://localhost:52000 +### Windows Docker Desktop - LAN Access + +Windows Firewall blocks LAN access by default. Choose one of the following options: + +**Option 1: Disable Firewall (Simplest)** + +```powershell +# Run in PowerShell (Administrator) +Set-NetFirewallProfile -Profile Domain,Public,Private -Enabled False +``` + +**Option 2: Add Firewall Rules (More Secure)** + +```powershell +# Run in PowerShell (Administrator) +# Base ports (Frontend + Backend API) +New-NetFirewallRule -DisplayName "LMStack" -Direction Inbound -LocalPort 3000,52000 -Protocol TCP -Action Allow + +# Model deployment ports (add ports as needed, e.g., 40000-40100) +New-NetFirewallRule -DisplayName "LMStack Models" -Direction Inbound -LocalPort 40000-40100 -Protocol TCP -Action Allow + +# App ports (e.g., Open WebUI on 46488) +New-NetFirewallRule -DisplayName "LMStack Apps" -Direction Inbound -LocalPort 46000-46500 -Protocol TCP -Action Allow +``` + +> **Note**: When you deploy models or apps, check the assigned port in the UI and ensure it's allowed through the firewall. + ### Usage 1. Login with `admin` / `admin` (change password after first login) diff --git a/README_zh-TW.md b/README_zh-TW.md index 28c4500..561d06f 100644 --- a/README_zh-TW.md +++ b/README_zh-TW.md @@ -52,6 +52,33 @@ docker compose -f docker-compose.deploy.yml up -d - 前端: http://localhost:3000 - 後端 API: http://localhost:52000 +### Windows Docker Desktop - 區域網路存取 + +Windows 防火牆預設會阻擋區域網路存取。請選擇以下其中一種方式: + +**方式一:關閉防火牆(最簡單)** + +```powershell +# 在 PowerShell(系統管理員)中執行 +Set-NetFirewallProfile -Profile Domain,Public,Private -Enabled False +``` + +**方式二:新增防火牆規則(較安全)** + +```powershell +# 在 PowerShell(系統管理員)中執行 +# 基本端口(前端 + 後端 API) +New-NetFirewallRule -DisplayName "LMStack" -Direction Inbound -LocalPort 3000,52000 -Protocol TCP -Action Allow + +# 模型部署端口(依需求新增,例如 40000-40100) +New-NetFirewallRule -DisplayName "LMStack Models" -Direction Inbound -LocalPort 40000-40100 -Protocol TCP -Action Allow + +# App 端口(例如 Open WebUI 使用 46488) +New-NetFirewallRule -DisplayName "LMStack Apps" -Direction Inbound -LocalPort 46000-46500 -Protocol TCP -Action Allow +``` + +> **注意**:部署模型或 App 時,請在 UI 中查看分配的端口,並確保該端口已在防火牆中開放。 + ### 使用方式 1. 使用 `admin` / `admin` 登入(首次登入後請更改密碼) diff --git a/backend/Dockerfile b/backend/Dockerfile index 11fc738..323cd1e 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -20,9 +20,10 @@ FROM python:3.11-slim WORKDIR /app # Install docker CLI for local worker spawn feature +# Using Docker 27.x for API version 1.47 compatibility RUN apt-get update && apt-get install -y --no-install-recommends \ curl \ - && curl -fsSL https://download.docker.com/linux/static/stable/x86_64/docker-24.0.7.tgz | tar xz --strip-components=1 -C /usr/local/bin docker/docker \ + && curl -fsSL https://download.docker.com/linux/static/stable/x86_64/docker-27.4.1.tgz | tar xz --strip-components=1 -C /usr/local/bin docker/docker \ && rm -rf /var/lib/apt/lists/* # Copy installed packages from builder diff --git a/backend/app/api/apps/deployment.py b/backend/app/api/apps/deployment.py index 6400d42..4002c5d 100644 --- a/backend/app/api/apps/deployment.py +++ b/backend/app/api/apps/deployment.py @@ -96,6 +96,9 @@ async def pull_image_with_progress( ) # Poll for progress while waiting + # Track last known progress to avoid regression when status is "unknown" + last_known_progress = 0 + while not pull_task.done(): try: progress_resp = await client.get(progress_url, timeout=5.0) @@ -105,12 +108,15 @@ async def pull_image_with_progress( progress = progress_data.get("progress", 0) if status == "pulling": - set_deployment_progress( - app_id, - "pulling", - progress, - f"Pulling image {image}... ({progress}%)", - ) + # Only update if progress is moving forward (avoid regression) + if progress >= last_known_progress: + last_known_progress = progress + set_deployment_progress( + app_id, + "pulling", + progress, + f"Pulling image {image}... ({progress}%)", + ) elif status == "completed": set_deployment_progress( app_id, @@ -118,6 +124,7 @@ async def pull_image_with_progress( 100, "Image pulled successfully", ) + # Ignore "unknown" status - keep showing last known progress except Exception: pass # Progress polling is best-effort @@ -145,7 +152,6 @@ async def wait_for_container_healthy( container_id: str, app_id: int, port: int, - max_wait: int = 600, poll_interval: int = 2, ) -> bool: """Wait for container to become healthy. @@ -155,21 +161,22 @@ async def wait_for_container_healthy( container_id: Container ID to check app_id: App ID for progress tracking port: App port for HTTP health check - max_wait: Maximum wait time in seconds poll_interval: Time between checks in seconds Returns: - True if healthy, False if timeout + True if healthy (waits indefinitely until healthy or error) """ waited = 0 consecutive_failures = 0 max_consecutive_failures = 10 # Fail after 20 seconds of no connection + slow_threshold = 1800 # 30 minutes before showing "check" message + shown_slow_message = False worker_host = worker_address.split(":")[0] app_url = f"http://{worker_host}:{port}" async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client: - while waited < max_wait: + while True: # Wait indefinitely try: response = await client.get(f"http://{worker_address}/containers/{container_id}") @@ -205,13 +212,34 @@ async def wait_for_container_healthy( elif "health:" in status or "starting)" in status: # Health check still running - progress_pct = min(50 + int(waited / max_wait * 40), 90) - set_deployment_progress( - app_id, - "starting", - progress_pct, - f"App is initializing ({waited}s, please wait)...", - ) + mins = waited // 60 + secs = waited % 60 + time_str = f"{mins}m {secs}s" if mins > 0 else f"{secs}s" + + if waited >= slow_threshold and not shown_slow_message: + set_deployment_progress( + app_id, + "starting", + 80, + f"App is initializing ({time_str}) - Taking longer than expected. " + "Please check container logs for issues.", + ) + shown_slow_message = True + elif shown_slow_message: + set_deployment_progress( + app_id, + "starting", + 80, + f"App is initializing ({time_str}) - Please check logs if needed.", + ) + else: + progress_pct = min(50 + int(waited / 600 * 40), 90) + set_deployment_progress( + app_id, + "starting", + progress_pct, + f"App is initializing ({time_str}, please wait)...", + ) elif "health" not in status: # No health check defined, verify HTTP access directly @@ -243,8 +271,6 @@ async def wait_for_container_healthy( await asyncio.sleep(poll_interval) waited += poll_interval - return False - async def _verify_http_access( client: httpx.AsyncClient, @@ -354,28 +380,21 @@ async def deploy_app_background( app.port = port await db.commit() - # Phase 3: Wait for health + # Phase 3: Wait for health (waits indefinitely until healthy or error) set_deployment_progress( app_id, "starting", 50, - "Waiting for app to start (this may take 1-3 minutes)...", + "Waiting for app to start...", ) - is_healthy = await wait_for_container_healthy( + await wait_for_container_healthy( worker_address=worker_address, container_id=container_id, app_id=app_id, port=port, ) - if not is_healthy: - app.status = AppStatus.ERROR.value - app.status_message = "Container health check timed out after 10 minutes" - await db.commit() - set_deployment_progress(app_id, "error", 0, "Container health check timed out") - return - # Phase 4: Setup proxy if use_proxy: await _setup_nginx_proxy(app_id, app_type, worker_address, port) @@ -451,6 +470,8 @@ async def _create_container( "lmstack.app.type": app_type.value, "lmstack.app.id": str(app_id), }, + # Add host.docker.internal mapping for container to access host services + "extra_hosts": {"host.docker.internal": "host-gateway"}, } # Add Linux capabilities if specified (e.g., SYS_ADMIN for AnythingLLM) diff --git a/backend/app/api/apps/lifecycle.py b/backend/app/api/apps/lifecycle.py index 80e6d21..53ff99c 100644 --- a/backend/app/api/apps/lifecycle.py +++ b/backend/app/api/apps/lifecycle.py @@ -44,6 +44,12 @@ async def stop_app( if app.status != AppStatus.RUNNING.value: raise HTTPException(status_code=400, detail="App is not running") + if not app.container_id: + # No container to stop, just mark as stopped + app.status = AppStatus.STOPPED.value + await db.commit() + return app_to_response(app, request) + await db.refresh(app, ["worker"]) worker = app.worker @@ -92,6 +98,12 @@ async def start_app( if app.status not in [AppStatus.STOPPED.value, AppStatus.ERROR.value]: raise HTTPException(status_code=400, detail="App is not stopped") + if not app.container_id: + raise HTTPException( + status_code=400, + detail="Container not found. Please delete and redeploy the app.", + ) + await db.refresh(app, ["worker"]) worker = app.worker diff --git a/backend/app/api/apps/routes.py b/backend/app/api/apps/routes.py index 675bd51..f694a9f 100644 --- a/backend/app/api/apps/routes.py +++ b/backend/app/api/apps/routes.py @@ -216,9 +216,8 @@ async def deploy_app( # Initialize progress set_deployment_progress(app.id, "pending", 0, "Deployment queued...") - # Extract lmstack_port for background task - lmstack_host = request.headers.get("host", "localhost:52000") - lmstack_port = lmstack_host.split(":")[-1] if ":" in lmstack_host else "8000" + # Always use backend API port (52000) + lmstack_port = "52000" # Start background deployment background_tasks.add_task( @@ -296,8 +295,9 @@ async def _build_env_vars( db: AsyncSession, ) -> dict: """Build environment variables for the app container.""" - lmstack_host = request.headers.get("host", "localhost:52000") - lmstack_port = lmstack_host.split(":")[-1] if ":" in lmstack_host else "8000" + # Always use backend API port (52000), not the frontend port from request + # The request may come from frontend (port 3000) but API is on 52000 + lmstack_port = "52000" host_ip = get_host_ip(request, worker) diff --git a/backend/app/api/apps/utils.py b/backend/app/api/apps/utils.py index 94e16a9..36b52b2 100644 --- a/backend/app/api/apps/utils.py +++ b/backend/app/api/apps/utils.py @@ -141,6 +141,9 @@ async def call_worker_api( def get_host_ip(request: Request, worker: Worker) -> str: """Determine the host IP that the container can use to reach LMStack. + For Docker containers to reach the host, we use host.docker.internal which + is mapped via extra_hosts to host-gateway when creating the container. + Args: request: FastAPI request object worker: Worker where app is deployed @@ -148,27 +151,35 @@ def get_host_ip(request: Request, worker: Worker) -> str: Returns: Host IP address string """ - import socket + # Check if worker is local (on same machine as LMStack) + worker_ip = worker.address.split(":")[0] + worker_labels = worker.labels or {} + is_local_worker = ( + worker_ip in ("localhost", "127.0.0.1") or worker_labels.get("type") == "local" + ) + if is_local_worker: + # For local workers, use host.docker.internal which is mapped to + # host-gateway via extra_hosts when creating the container. + # This works on all platforms (Linux, Windows, Mac). + return "host.docker.internal" + + # For remote workers, use the LMStack host IP that the worker can reach lmstack_host = request.headers.get("host", "localhost:52000") host_ip = lmstack_host.split(":")[0] if ":" in lmstack_host else lmstack_host - # If host is localhost, try alternatives + # If host is localhost, try to find our external IP if host_ip in ("localhost", "127.0.0.1"): - forwarded_host = request.headers.get("x-forwarded-host") - if forwarded_host: - host_ip = forwarded_host.split(":")[0] - else: - # Try to get our IP on the same network as the worker - try: - worker_ip = worker.address.split(":")[0] - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.connect((worker_ip, 80)) - host_ip = s.getsockname()[0] - s.close() - except OSError as e: - logger.warning(f"Could not determine host IP for worker {worker_ip}: {e}") - host_ip = "host.docker.internal" # Fallback for Docker Desktop + import socket + + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect((worker_ip, 80)) + host_ip = s.getsockname()[0] + s.close() + except OSError as e: + logger.warning(f"Could not determine host IP for worker {worker_ip}: {e}") + host_ip = "host.docker.internal" # Fallback return host_ip diff --git a/backend/app/api/deployments.py b/backend/app/api/deployments.py index 75d60d6..e974213 100644 --- a/backend/app/api/deployments.py +++ b/backend/app/api/deployments.py @@ -1,6 +1,12 @@ """Deployment API routes""" -from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query +import json +import logging +from collections.abc import AsyncGenerator + +import httpx +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, Request +from fastapi.responses import StreamingResponse from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload @@ -21,6 +27,9 @@ WorkerSummary, ) from app.services.deployer import DeployerService +from app.services.gateway import gateway_service + +logger = logging.getLogger(__name__) router = APIRouter() @@ -377,3 +386,134 @@ async def get_deployment_logs( logs = await deployer.get_logs(deployment, tail=tail) return DeploymentLogsResponse(deployment_id=deployment_id, logs=logs) + + +# Chat proxy timeout (5 minutes for long model responses) +CHAT_PROXY_TIMEOUT = 300.0 + + +@router.post("/{deployment_id}/chat") +async def proxy_chat( + deployment_id: int, + request: Request, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(require_viewer), +): + """Proxy chat requests to deployment (requires viewer+). + + This endpoint proxies chat completion requests to the model container, + allowing the frontend to communicate with models without needing direct + network access to Docker internal IPs. + + The request body should be an OpenAI-compatible chat completion request. + Supports both streaming and non-streaming responses. + """ + # Get deployment with worker info + result = await db.execute( + select(Deployment) + .where(Deployment.id == deployment_id) + .options( + selectinload(Deployment.worker), + selectinload(Deployment.model), + ) + ) + deployment = result.scalar_one_or_none() + + if not deployment: + raise HTTPException(status_code=404, detail="Deployment not found") + + if deployment.status != DeploymentStatus.RUNNING.value: + raise HTTPException( + status_code=400, + detail=f"Deployment is not running (status: {deployment.status})", + ) + + if not deployment.worker or not deployment.port: + raise HTTPException(status_code=400, detail="Deployment has no worker or port assigned") + + # Build upstream URL using the gateway service (handles Docker networking correctly) + upstream_url = gateway_service.build_upstream_url( + deployment.worker.address, + deployment.port, + deployment.container_name, + ) + chat_endpoint = f"{upstream_url}/v1/chat/completions" + + # Get request body + try: + body = await request.json() + except (json.JSONDecodeError, ValueError): + raise HTTPException(status_code=400, detail="Invalid JSON body") + + # Check if streaming + is_streaming = body.get("stream", False) + + logger.debug(f"Proxying chat to {chat_endpoint}, streaming={is_streaming}") + + if is_streaming: + return await _proxy_streaming_chat(chat_endpoint, body) + else: + return await _proxy_chat(chat_endpoint, body) + + +async def _proxy_chat(upstream_url: str, body: dict) -> dict: + """Proxy a non-streaming chat request.""" + try: + async with httpx.AsyncClient(timeout=CHAT_PROXY_TIMEOUT) as client: + response = await client.post( + upstream_url, + json=body, + headers={"Content-Type": "application/json"}, + ) + return response.json() + + except httpx.TimeoutException: + raise HTTPException(status_code=504, detail="Request to model timed out") + except httpx.ConnectError: + raise HTTPException(status_code=502, detail="Failed to connect to model") + except httpx.RequestError as e: + logger.error(f"Chat proxy request error: {e}") + raise HTTPException(status_code=502, detail=f"Request error: {str(e)}") + + +async def _proxy_streaming_chat(upstream_url: str, body: dict) -> StreamingResponse: + """Proxy a streaming chat request.""" + + async def stream_generator() -> AsyncGenerator[bytes, None]: + try: + async with httpx.AsyncClient(timeout=CHAT_PROXY_TIMEOUT) as client: + async with client.stream( + "POST", + upstream_url, + json=body, + headers={"Content-Type": "application/json"}, + ) as response: + async for chunk in response.aiter_bytes(): + yield chunk + + except httpx.TimeoutException: + logger.error(f"Streaming timeout for {upstream_url}") + error_data = { + "error": {"message": "Request to model timed out", "type": "timeout_error"} + } + yield f"data: {json.dumps(error_data)}\n\n".encode() + except httpx.ConnectError: + logger.error(f"Connection error for {upstream_url}") + error_data = { + "error": {"message": "Failed to connect to model", "type": "connection_error"} + } + yield f"data: {json.dumps(error_data)}\n\n".encode() + except httpx.RequestError as e: + logger.error(f"Streaming request error: {e}") + error_data = {"error": {"message": f"Request error: {str(e)}", "type": "request_error"}} + yield f"data: {json.dumps(error_data)}\n\n".encode() + + return StreamingResponse( + stream_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) diff --git a/backend/app/api/gateway.py b/backend/app/api/gateway.py index 50d7ad7..c814267 100644 --- a/backend/app/api/gateway.py +++ b/backend/app/api/gateway.py @@ -215,6 +215,7 @@ async def chat_completions( upstream_url = gateway_service.build_upstream_url( deployment.worker.address, deployment.port, + deployment.container_name, ) # Replace model name with the actual model_id for vLLM @@ -317,6 +318,7 @@ async def completions( upstream_url = gateway_service.build_upstream_url( deployment.worker.address, deployment.port, + deployment.container_name, ) # Replace model name with the actual model_id for vLLM @@ -587,6 +589,7 @@ async def embeddings( upstream_url = gateway_service.build_upstream_url( deployment.worker.address, deployment.port, + deployment.container_name, ) # Replace model name with the actual model_id for vLLM @@ -930,6 +933,7 @@ async def responses( upstream_url = gateway_service.build_upstream_url( deployment.worker.address, deployment.port, + deployment.container_name, ) # Convert Responses API format to Chat Completions format diff --git a/backend/app/api/workers.py b/backend/app/api/workers.py index 8c5954d..7e2e047 100644 --- a/backend/app/api/workers.py +++ b/backend/app/api/workers.py @@ -24,12 +24,7 @@ WorkerResponse, WorkerUpdate, ) -from app.services.local_worker import ( - get_local_hostname, - get_local_ip, - spawn_docker_worker, - stop_docker_worker, -) +from app.services.local_worker import get_local_hostname, spawn_docker_worker, stop_docker_worker router = APIRouter() @@ -150,6 +145,12 @@ async def create_worker( existing_worker.status = WorkerStatus.ONLINE.value existing_worker.last_heartbeat = datetime.now(UTC) + # Update labels to mark as local if token is for local worker + if token.is_local: + worker_labels = dict(existing_worker.labels) if existing_worker.labels else {} + worker_labels["type"] = "local" + existing_worker.labels = worker_labels + await db.commit() await db.refresh(existing_worker) @@ -192,11 +193,16 @@ async def create_worker( reported_port = worker_in.address.split(":")[-1] real_address = f"{client_ip}:{reported_port}" + # Set labels for local workers (created via /local endpoint) + worker_labels = dict(worker_in.labels) if worker_in.labels else {} + if token.is_local: + worker_labels["type"] = "local" + worker = Worker( name=worker_in.name, address=real_address, description=worker_in.description, - labels=worker_in.labels, + labels=worker_labels if worker_labels else None, gpu_info=([gpu.model_dump() for gpu in worker_in.gpu_info] if worker_in.gpu_info else None), system_info=(worker_in.system_info.model_dump() if worker_in.system_info else None), status=WorkerStatus.ONLINE.value, @@ -380,15 +386,15 @@ async def register_local_worker( hostname = get_local_hostname() worker_name = hostname - # Get backend URL - use local IP so the Docker container can reach it - local_ip = get_local_ip() + # Use localhost since local worker uses --network host mode settings = get_settings() - backend_url = f"http://{local_ip}:{settings.port}" + backend_url = f"http://localhost:{settings.port}" - # Create a registration token for this worker + # Create a registration token for this worker (marked as local) token = RegistrationToken.create( name=worker_name, expires_in_hours=24, # Token valid for 24 hours + is_local=True, # Mark as local worker ) db.add(token) @@ -425,19 +431,42 @@ def _generate_docker_command(token: str, name: str, backend_url: str) -> str: 1. Worker registers with localhost/host IP (not Docker internal IP) 2. Apps deployed by Worker are accessible via host network 3. Works seamlessly on both regular machines and WSL + + Uses --restart unless-stopped so worker auto-starts after system reboot. + + Command is single-line for cross-platform compatibility (Linux/Mac/Windows). """ - return f"""docker run -d \\ - --name lmstack-worker \\ - --network host \\ - --gpus all \\ - --privileged \\ - -v /var/run/docker.sock:/var/run/docker.sock \\ - -v ~/.cache/huggingface:/root/.cache/huggingface \\ - -v /:/host:ro \\ - -e BACKEND_URL={backend_url} \\ - -e WORKER_NAME={name} \\ - -e REGISTRATION_TOKEN={token} \\ - infinirc/lmstack-worker:latest""" + return ( + f"docker run -d --name lmstack-worker --restart unless-stopped " + f"--network host --gpus all --privileged " + f"-v /var/run/docker.sock:/var/run/docker.sock " + f"-v ~/.cache/huggingface:/root/.cache/huggingface " + f"-v /:/host:ro " + f"-e BACKEND_URL={backend_url} " + f"-e WORKER_NAME={name} " + f"-e REGISTRATION_TOKEN={token} " + f"infinirc/lmstack-worker:latest" + ) + + +def _get_wsl_windows_ip() -> str | None: + """Get Windows host IP from WSL.""" + try: + # Check if we're in WSL + with open("/proc/version") as f: + if "microsoft" not in f.read().lower(): + return None + # Read Windows host IP from resolv.conf (nameserver is Windows host) + with open("/etc/resolv.conf") as f: + for line in f: + if line.startswith("nameserver"): + ip = line.split()[1].strip() + # Filter out localhost entries + if not ip.startswith("127."): + return ip + except Exception: + pass + return None def _get_backend_url(request: Request) -> str: @@ -445,6 +474,12 @@ def _get_backend_url(request: Request) -> str: settings = get_settings() if settings.external_url: return settings.external_url.rstrip("/") + + # For WSL, try to get the Windows host IP for external access + windows_ip = _get_wsl_windows_ip() + if windows_ip: + return f"http://{windows_ip}:{settings.port}" + # Check X-Forwarded headers (from nginx/vite proxy) forwarded_host = request.headers.get("X-Forwarded-Host") if forwarded_host: diff --git a/backend/app/database.py b/backend/app/database.py index 19df838..eaa925a 100644 --- a/backend/app/database.py +++ b/backend/app/database.py @@ -37,11 +37,38 @@ async def get_db() -> AsyncSession: await session.close() +async def _run_migrations(conn): + """Run schema migrations for new columns (SQLite compatible).""" + from sqlalchemy import text + + async def column_exists(table_name: str, column_name: str) -> bool: + """Check if a column exists in a table.""" + result = await conn.execute(text(f"PRAGMA table_info({table_name})")) + columns = [row[1] for row in result.fetchall()] + return column_name in columns + + # Migration: Add container_name to deployments (for Windows Docker compatibility) + if not await column_exists("deployments", "container_name"): + logger.info("Adding 'container_name' column to deployments table...") + await conn.execute(text("ALTER TABLE deployments ADD COLUMN container_name VARCHAR(255)")) + logger.info("'container_name' column added!") + + # Migration: Add is_local to registration_tokens (for local worker detection) + if not await column_exists("registration_tokens", "is_local"): + logger.info("Adding 'is_local' column to registration_tokens table...") + await conn.execute( + text("ALTER TABLE registration_tokens ADD COLUMN is_local BOOLEAN DEFAULT 0") + ) + logger.info("'is_local' column added!") + + async def init_db(): - """Initialize database tables""" + """Initialize database tables and run migrations""" try: async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) + # Run schema migrations for any new columns + await _run_migrations(conn) except Exception as e: # Ignore "already exists" errors from race conditions with multiple workers if "already exists" in str(e): diff --git a/backend/app/main.py b/backend/app/main.py index 3e0a87f..703a75f 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -17,6 +17,8 @@ from app.core.exceptions import LMStackError from app.database import async_session_maker, init_db from app.models.worker import Worker, WorkerStatus +from app.services.app_sync import app_sync_service +from app.services.deployment_sync import deployment_sync_service # Configure logging logging.basicConfig( @@ -29,6 +31,12 @@ # Background task control _worker_check_task = None +_deployment_health_task = None +_app_health_task = None + +# Health check interval (in seconds) +DEPLOYMENT_HEALTH_CHECK_INTERVAL = 60 # Check every minute +APP_HEALTH_CHECK_INTERVAL = 30 # Check apps more frequently async def check_worker_status(): @@ -72,26 +80,119 @@ async def check_worker_status(): await asyncio.sleep(10) # Wait before retrying on error +async def check_deployment_health(): + """Background task to periodically check deployment health. + + This ensures that deployments marked as 'starting' eventually become + 'running' or 'error', and catches any containers that crash. + """ + # Initial delay to let containers stabilize after startup sync + await asyncio.sleep(30) + + while True: + try: + await asyncio.sleep(DEPLOYMENT_HEALTH_CHECK_INTERVAL) + + # Only sync deployments that are in transitional states + stats = await deployment_sync_service.sync_all_deployments() + + if stats["total"] > 0: + logger.debug( + f"Deployment health check: {stats['running_verified']} healthy, " + f"{stats['api_not_ready']} loading, {stats['container_missing']} missing" + ) + + except asyncio.CancelledError: + logger.info("Deployment health check task cancelled") + break + except Exception as e: + logger.error(f"Error in deployment health check: {e}") + await asyncio.sleep(30) # Wait before retrying on error + + +async def check_app_health(): + """Background task to periodically check app container health. + + This ensures that apps marked as 'running' are actually running, + and catches any containers that are manually deleted or crashed. + """ + # Initial delay to let system stabilize + await asyncio.sleep(15) + + while True: + try: + await asyncio.sleep(APP_HEALTH_CHECK_INTERVAL) + + stats = await app_sync_service.sync_all_apps() + + if stats["total"] > 0: + logger.debug( + f"App health check: {stats['running_verified']} healthy, " + f"{stats['container_missing']} missing" + ) + + except asyncio.CancelledError: + logger.info("App health check task cancelled") + break + except Exception as e: + logger.error(f"Error in app health check: {e}") + await asyncio.sleep(30) # Wait before retrying on error + + @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan handler""" - global _worker_check_task + global _worker_check_task, _deployment_health_task, _app_health_task # Startup logger.info("Starting LMStack API Server...") await init_db() logger.info("Database initialized") + # Synchronize deployment status with actual container state + # This is important after system reboot + try: + logger.info("Synchronizing deployment status...") + sync_stats = await deployment_sync_service.sync_all_deployments() + if sync_stats["total"] > 0: + logger.info( + f"Deployment sync complete: {sync_stats['running_verified']} running, " + f"{sync_stats['restarting']} restarting, {sync_stats['api_not_ready']} loading, " + f"{sync_stats['container_missing']} missing" + ) + except Exception as e: + logger.error(f"Failed to sync deployments on startup: {e}") + + # Synchronize app status with actual container state + try: + logger.info("Synchronizing app status...") + app_stats = await app_sync_service.sync_all_apps() + if app_stats["total"] > 0: + logger.info( + f"App sync complete: {app_stats['running_verified']} running, " + f"{app_stats['container_missing']} missing" + ) + except Exception as e: + logger.error(f"Failed to sync apps on startup: {e}") + # Start background task for checking worker status _worker_check_task = asyncio.create_task(check_worker_status()) logger.info("Worker status check task started") + # Start background task for checking deployment health + _deployment_health_task = asyncio.create_task(check_deployment_health()) + logger.info("Deployment health check task started") + + # Start background task for checking app health + _app_health_task = asyncio.create_task(check_app_health()) + logger.info("App health check task started") + yield # Shutdown logger.info("Shutting down LMStack API Server...") - # Cancel background task + # Cancel background tasks if _worker_check_task: _worker_check_task.cancel() try: @@ -100,6 +201,22 @@ async def lifespan(app: FastAPI): pass logger.info("Worker status check task stopped") + if _deployment_health_task: + _deployment_health_task.cancel() + try: + await _deployment_health_task + except asyncio.CancelledError: + pass + logger.info("Deployment health check task stopped") + + if _app_health_task: + _app_health_task.cancel() + try: + await _app_health_task + except asyncio.CancelledError: + pass + logger.info("App health check task stopped") + app = FastAPI( title=settings.app_name, diff --git a/backend/app/models/app.py b/backend/app/models/app.py index 0c6a553..845d1af 100644 --- a/backend/app/models/app.py +++ b/backend/app/models/app.py @@ -49,6 +49,7 @@ class AppStatus(str, Enum): "OLLAMA_BASE_URL": "", # Disable Ollama "WEBUI_SECRET_KEY": "", "ENABLE_OLLAMA_API": "false", + "RESET_CONFIG_ON_START": "true", # Force use env vars on restart }, "volumes": [{"name": "open-webui-data", "destination": "/app/backend/data"}], }, diff --git a/backend/app/models/deployment.py b/backend/app/models/deployment.py index 9064637..83eff06 100644 --- a/backend/app/models/deployment.py +++ b/backend/app/models/deployment.py @@ -48,6 +48,7 @@ class Deployment(Base): # Container info container_id: Mapped[str | None] = mapped_column(String(255), nullable=True) + container_name: Mapped[str | None] = mapped_column(String(255), nullable=True) port: Mapped[int | None] = mapped_column(Integer, nullable=True) # Configuration diff --git a/backend/app/models/registration_token.py b/backend/app/models/registration_token.py index a26d84d..9dfdfd8 100644 --- a/backend/app/models/registration_token.py +++ b/backend/app/models/registration_token.py @@ -25,6 +25,9 @@ class RegistrationToken(Base): ) name: Mapped[str] = mapped_column(String(255), nullable=False) # Suggested worker name is_used: Mapped[bool] = mapped_column(Boolean, default=False) + is_local: Mapped[bool] = mapped_column( + Boolean, default=False + ) # True if created via /local endpoint used_by_worker_id: Mapped[int | None] = mapped_column(Integer, nullable=True) created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) @@ -32,11 +35,14 @@ class RegistrationToken(Base): used_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) @classmethod - def create(cls, name: str, expires_in_hours: int = 24) -> "RegistrationToken": + def create( + cls, name: str, expires_in_hours: int = 24, is_local: bool = False + ) -> "RegistrationToken": """Create a new registration token""" return cls( token=generate_token(), name=name, + is_local=is_local, expires_at=datetime.utcnow() + timedelta(hours=expires_in_hours), ) diff --git a/backend/app/services/__init__.py b/backend/app/services/__init__.py index 8f4b144..97b5f64 100644 --- a/backend/app/services/__init__.py +++ b/backend/app/services/__init__.py @@ -2,10 +2,13 @@ from app.services.auth import AuthService, auth_service from app.services.deployer import DeployerService +from app.services.deployment_sync import DeploymentSyncService, deployment_sync_service from app.services.gateway import GatewayService, gateway_service __all__ = [ "DeployerService", + "DeploymentSyncService", + "deployment_sync_service", "GatewayService", "gateway_service", "AuthService", diff --git a/backend/app/services/app_sync.py b/backend/app/services/app_sync.py new file mode 100644 index 0000000..b8ddaef --- /dev/null +++ b/backend/app/services/app_sync.py @@ -0,0 +1,173 @@ +"""App Sync Service + +Synchronizes app status with actual container state. +This is important after system reboot to ensure database status matches reality. +""" + +import asyncio +import logging + +import httpx +from sqlalchemy import select +from sqlalchemy.orm import selectinload + +from app.database import async_session_maker +from app.models.app import App, AppStatus + +logger = logging.getLogger(__name__) + + +class AppSyncService: + """Service for synchronizing app status with actual container state.""" + + # Configuration + CONTAINER_CHECK_TIMEOUT = 10 # seconds + MAX_CONCURRENT_CHECKS = 5 # limit concurrent checks + + async def sync_all_apps(self) -> dict: + """Synchronize all app statuses. + + Returns: + dict with sync statistics + """ + logger.info("Starting app status synchronization...") + + stats = { + "total": 0, + "running_verified": 0, + "container_missing": 0, + "errors": 0, + "skipped": 0, + } + + async with async_session_maker() as db: + # Get all apps that should be running or are in transitional states + result = await db.execute( + select(App) + .where( + App.status.in_( + [ + AppStatus.RUNNING.value, + AppStatus.STARTING.value, + AppStatus.PULLING.value, + ] + ) + ) + .options(selectinload(App.worker)) + ) + apps = result.scalars().all() + stats["total"] = len(apps) + + if not apps: + logger.info("No active apps to sync") + return stats + + logger.info(f"Found {len(apps)} active apps to check") + + # Check apps with limited concurrency + semaphore = asyncio.Semaphore(self.MAX_CONCURRENT_CHECKS) + + async def check_with_semaphore(app: App): + async with semaphore: + return await self._check_and_update_app(app, db) + + tasks = [check_with_semaphore(a) for a in apps] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Aggregate results + for result in results: + if isinstance(result, Exception): + logger.error(f"App check failed: {result}") + stats["errors"] += 1 + elif isinstance(result, str): + if result == "running_verified": + stats["running_verified"] += 1 + elif result == "container_missing": + stats["container_missing"] += 1 + elif result == "skipped": + stats["skipped"] += 1 + + await db.commit() + + logger.info( + f"App sync complete: {stats['running_verified']} running, " + f"{stats['container_missing']} missing, {stats['errors']} errors" + ) + + return stats + + async def _check_and_update_app(self, app: App, db) -> str: + """Check a single app and update its status. + + Returns: + Status string: running_verified, container_missing, skipped + """ + logger.debug(f"Checking app {app.id}: {app.name}") + + if not app.worker: + logger.warning(f"App {app.id} has no worker, skipping") + return "skipped" + + if not app.container_id: + logger.warning(f"App {app.id} has no container_id, marking as error") + app.status = AppStatus.ERROR.value + app.status_message = "Container ID missing" + return "container_missing" + + # Check if worker is online + if app.worker.status != "online": + logger.warning(f"App {app.id}: worker offline") + app.status = AppStatus.ERROR.value + app.status_message = f"Worker {app.worker.name} is offline" + return "container_missing" + + # Check container status via worker API + try: + async with httpx.AsyncClient(timeout=self.CONTAINER_CHECK_TIMEOUT) as client: + response = await client.get( + f"http://{app.worker.address}/containers/{app.container_id}" + ) + + if response.status_code == 404: + # Container doesn't exist + logger.warning(f"App {app.name}: container not found") + app.status = AppStatus.ERROR.value + app.status_message = "Container not found. Please redeploy." + return "container_missing" + + if response.status_code == 200: + container_info = response.json() + state = container_info.get("state", "").lower() + + if state == "running": + if app.status != AppStatus.RUNNING.value: + app.status = AppStatus.RUNNING.value + app.status_message = None + logger.debug(f"App {app.name}: running and verified") + return "running_verified" + + elif state in ("exited", "dead"): + logger.warning(f"App {app.name}: container {state}") + app.status = AppStatus.STOPPED.value + app.status_message = f"Container {state}" + return "container_missing" + + else: + logger.debug(f"App {app.name}: container state {state}") + return "running_verified" + + except httpx.ConnectError: + logger.warning(f"App {app.id}: cannot connect to worker") + app.status = AppStatus.ERROR.value + app.status_message = f"Cannot connect to worker {app.worker.name}" + return "container_missing" + + except Exception as e: + logger.error(f"Error checking app {app.name}: {e}") + return "skipped" + + return "skipped" + + +# Global instance +app_sync_service = AppSyncService() diff --git a/backend/app/services/deployer.py b/backend/app/services/deployer.py index 103dcec..aff7a4c 100644 --- a/backend/app/services/deployer.py +++ b/backend/app/services/deployer.py @@ -23,7 +23,7 @@ class DeployerService: # Health check configuration HEALTH_CHECK_INTERVAL = 5 # seconds between checks - HEALTH_CHECK_TIMEOUT = 600 # max seconds to wait (10 minutes for large models) + HEALTH_CHECK_SLOW_THRESHOLD = 600 # seconds before showing "slow loading" message (10 min) HEALTH_CHECK_REQUEST_TIMEOUT = 10 # timeout for each health check request async def deploy(self, deployment_id: int) -> None: @@ -81,7 +81,11 @@ async def deploy(self, deployment_id: int) -> None: return deployment.container_id = result.get("container_id") deployment.port = result.get("port") + # Store container_name for internal Docker network communication + local_container_name = result.get("container_name") + deployment.container_name = local_container_name else: + local_container_name = None # Remote workers use IP:port # Send to remote worker agent worker_url = f"http://{deployment.worker.address}/deploy" progress_url = ( @@ -149,6 +153,7 @@ async def deploy(self, deployment_id: int) -> None: ollama_ready = await self._wait_for_ollama_ready( deployment.worker.address, deployment.port, + container_name=local_container_name, ) if not ollama_ready: deployment.status = DeploymentStatus.ERROR.value @@ -163,6 +168,7 @@ async def deploy(self, deployment_id: int) -> None: deployment.worker.address, deployment.port, deployment.model.model_id, + container_name=local_container_name, ) if not pull_success: deployment.status = DeploymentStatus.ERROR.value @@ -180,6 +186,7 @@ async def deploy(self, deployment_id: int) -> None: deployment_id, db, backend=deployment.backend, + container_name=local_container_name, ) # Refresh deployment object after health check updates @@ -189,12 +196,10 @@ async def deploy(self, deployment_id: int) -> None: # Deployment was cancelled, don't update status logger.info(f"Deployment {deployment_id} cancelled during startup") return - elif api_ready: + else: + # api_ready is True, model is ready deployment.status = DeploymentStatus.RUNNING.value deployment.status_message = "Model ready" - else: - deployment.status = DeploymentStatus.ERROR.value - deployment.status_message = "Model failed to start within timeout" except httpx.ConnectError: deployment.status = DeploymentStatus.ERROR.value @@ -213,6 +218,7 @@ async def _wait_for_ollama_ready( worker_address: str, port: int, timeout: int = 60, + container_name: str | None = None, ) -> bool: """Wait for Ollama API to be available. @@ -220,12 +226,17 @@ async def _wait_for_ollama_ready( worker_address: Worker address (host:port) port: Ollama container port timeout: Maximum wait time in seconds + container_name: Container name for Docker network (Windows compatibility) Returns: True if Ollama is ready, False on timeout """ - worker_ip = worker_address.split(":")[0] - api_url = f"http://{worker_ip}:{port}/api/tags" + # Ollama is configured to use port 8000 (OLLAMA_HOST=0.0.0.0:8000) + if container_name: + api_url = f"http://{container_name}:8000/api/tags" + else: + worker_ip = worker_address.split(":")[0] + api_url = f"http://{worker_ip}:{port}/api/tags" logger.info(f"Waiting for Ollama API at {api_url}") @@ -255,14 +266,19 @@ async def _ollama_pull_model( worker_address: str, port: int, model_id: str, + container_name: str | None = None, ) -> bool: """Pull a model using Ollama API. Ollama requires models to be pulled before they can be used. This method calls the /api/pull endpoint and waits for completion. """ - worker_ip = worker_address.split(":")[0] - api_url = f"http://{worker_ip}:{port}/api/pull" + # Ollama is configured to use port 8000 (OLLAMA_HOST=0.0.0.0:8000) + if container_name: + api_url = f"http://{container_name}:8000/api/pull" + else: + worker_ip = worker_address.split(":")[0] + api_url = f"http://{worker_ip}:{port}/api/pull" logger.info(f"Pulling Ollama model: {model_id}") @@ -319,17 +335,33 @@ async def _wait_for_api_ready( deployment_id: int, db, backend: str = BackendType.VLLM.value, + container_name: str | None = None, ) -> bool | None: """ - Poll the OpenAI API endpoint until it's ready or timeout. + Poll the OpenAI API endpoint until it's ready or cancelled. + + Args: + worker_address: Worker address (host:port) + port: Host port for the model API + deployment_id: Deployment ID for status updates + db: Database session + backend: Backend type (vllm, ollama, etc.) + container_name: Container name for local Docker network communication. + If set, uses container_name:8000 instead of worker_ip:port. + This is needed for Windows Docker Desktop compatibility. Returns: True: API is ready - False: Timeout or error None: Cancelled (user stopped deployment) """ - worker_ip = worker_address.split(":")[0] - api_base_url = f"http://{worker_ip}:{port}" + # For local deployments with container_name, use Docker internal networking + # All backends (vLLM, SGLang, Ollama) are configured to use port 8000 + if container_name: + api_base_url = f"http://{container_name}:8000" + logger.info(f"Using Docker network for API: {api_base_url}") + else: + worker_ip = worker_address.split(":")[0] + api_base_url = f"http://{worker_ip}:{port}" # Both vLLM and Ollama support OpenAI-compatible /v1/models endpoint health_endpoint = f"{api_base_url}/v1/models" @@ -339,11 +371,12 @@ async def _wait_for_api_ready( elapsed = 0 check_count = 0 + shown_slow_message = False logger.info(f"Waiting for API to be ready at {health_endpoint} (backend={backend})") async with httpx.AsyncClient(timeout=self.HEALTH_CHECK_REQUEST_TIMEOUT) as client: - while elapsed < self.HEALTH_CHECK_TIMEOUT: + while True: # Wait indefinitely until ready or cancelled check_count += 1 # Check if deployment was cancelled @@ -408,9 +441,25 @@ async def _wait_for_api_ready( mins = elapsed // 60 secs = elapsed % 60 time_str = f"{mins}m {secs}s" if mins > 0 else f"{secs}s" - deployment.status_message = ( - f"Loading model into GPU memory... ({time_str})" - ) + + # Show patience message after threshold + if ( + elapsed >= self.HEALTH_CHECK_SLOW_THRESHOLD + and not shown_slow_message + ): + deployment.status_message = ( + f"Loading model... ({time_str}) - " + "Large model or slow network detected. Please be patient." + ) + shown_slow_message = True + elif shown_slow_message: + deployment.status_message = ( + f"Loading model... ({time_str}) - Please be patient." + ) + else: + deployment.status_message = ( + f"Loading model into GPU memory... ({time_str})" + ) await db.commit() except Exception as e: logger.debug(f"Error updating deployment status message: {e}") @@ -418,9 +467,6 @@ async def _wait_for_api_ready( await asyncio.sleep(self.HEALTH_CHECK_INTERVAL) elapsed += self.HEALTH_CHECK_INTERVAL - logger.warning(f"API health check timed out after {elapsed}s ({check_count} checks)") - return False - def _is_local_worker(self, address: str) -> bool: """Check if the worker address refers to the local machine.""" if not address: @@ -479,6 +525,10 @@ async def _deploy_local(self, deploy_request: dict) -> dict: This is used for local workers where we don't need to go through a remote worker agent. + + On Windows Docker Desktop, containers must be on the same network + to communicate. We put model containers on the 'lmstack' network + so the backend can reach them via container name. """ try: client = docker.from_env() @@ -489,12 +539,26 @@ async def _deploy_local(self, deploy_request: dict) -> dict: gpu_indexes = deploy_request.get("gpu_indexes", [0]) deployment_name = deploy_request.get("deployment_name", "lmstack-deployment") - # Find available port + # Find available port (still used for external access) host_port = self._find_available_port() - # Container name + # Container name - used for internal Docker network communication container_name = f"lmstack-{deployment_name}-{deploy_request['deployment_id']}" + # Ensure lmstack network exists (for Windows Docker Desktop compatibility) + network_name = "lmstack_lmstack" + try: + client.networks.get(network_name) + except docker.errors.NotFound: + # Try alternative network name (depends on compose project name) + try: + network_name = "lmstack" + client.networks.get(network_name) + except docker.errors.NotFound: + # Create the network if it doesn't exist + logger.info(f"Creating Docker network: {network_name}") + client.networks.create(network_name, driver="bridge") + # Build GPU device requests device_requests = [ docker.types.DeviceRequest( @@ -526,12 +590,17 @@ async def _deploy_local(self, deploy_request: dict) -> dict: }, shm_size="16g", # Required for large model inference restart_policy={"Name": "unless-stopped"}, + network=network_name, # Join lmstack network for Windows compatibility ) - logger.info(f"Started local container: {container.id[:12]} on port {host_port}") + logger.info( + f"Started local container: {container.id[:12]} " + f"(name={container_name}) on network={network_name}, port={host_port}" + ) return { "container_id": container.id, + "container_name": container_name, "port": host_port, } diff --git a/backend/app/services/deployment_sync.py b/backend/app/services/deployment_sync.py new file mode 100644 index 0000000..d99b7e0 --- /dev/null +++ b/backend/app/services/deployment_sync.py @@ -0,0 +1,360 @@ +"""Deployment Sync Service + +Synchronizes deployment status with actual container state. +This is important after system reboot to ensure database status matches reality. +""" + +import asyncio +import logging + +import docker +import httpx +from sqlalchemy import select +from sqlalchemy.orm import selectinload + +from app.database import async_session_maker +from app.models.deployment import Deployment, DeploymentStatus +from app.models.llm_model import BackendType + +logger = logging.getLogger(__name__) + + +class DeploymentSyncService: + """Service for synchronizing deployment status with actual container state.""" + + # Health check configuration + HEALTH_CHECK_TIMEOUT = 10 # seconds + CONTAINER_CHECK_TIMEOUT = 5 # seconds + MAX_CONCURRENT_CHECKS = 5 # limit concurrent health checks + + def __init__(self): + self._docker_client: docker.DockerClient | None = None + + @property + def docker_client(self) -> docker.DockerClient: + """Lazy-load Docker client.""" + if self._docker_client is None: + try: + self._docker_client = docker.from_env() + except docker.errors.DockerException as e: + logger.warning(f"Failed to connect to Docker: {e}") + raise + return self._docker_client + + async def sync_all_deployments(self) -> dict: + """Synchronize all deployment statuses on startup. + + Returns: + dict with sync statistics + """ + logger.info("Starting deployment status synchronization...") + + stats = { + "total": 0, + "running_verified": 0, + "restarting": 0, + "container_missing": 0, + "api_not_ready": 0, + "errors": 0, + "skipped": 0, + } + + async with async_session_maker() as db: + # Get all deployments that should be running + result = await db.execute( + select(Deployment) + .where( + Deployment.status.in_( + [ + DeploymentStatus.RUNNING.value, + DeploymentStatus.STARTING.value, + ] + ) + ) + .options( + selectinload(Deployment.worker), + selectinload(Deployment.model), + ) + ) + deployments = result.scalars().all() + stats["total"] = len(deployments) + + if not deployments: + logger.info("No active deployments to sync") + return stats + + logger.info(f"Found {len(deployments)} active deployments to check") + + # Check deployments with limited concurrency + semaphore = asyncio.Semaphore(self.MAX_CONCURRENT_CHECKS) + + async def check_with_semaphore(deployment: Deployment): + async with semaphore: + return await self._check_and_update_deployment(deployment, db) + + tasks = [check_with_semaphore(d) for d in deployments] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Aggregate results + for result in results: + if isinstance(result, Exception): + logger.error(f"Deployment check failed: {result}") + stats["errors"] += 1 + elif isinstance(result, str): + if result == "running_verified": + stats["running_verified"] += 1 + elif result == "restarting": + stats["restarting"] += 1 + elif result == "container_missing": + stats["container_missing"] += 1 + elif result == "api_not_ready": + stats["api_not_ready"] += 1 + elif result == "skipped": + stats["skipped"] += 1 + + await db.commit() + + logger.info( + f"Deployment sync complete: {stats['running_verified']} running, " + f"{stats['restarting']} restarting, {stats['container_missing']} missing, " + f"{stats['api_not_ready']} not ready, {stats['errors']} errors" + ) + + return stats + + async def _check_and_update_deployment(self, deployment: Deployment, db) -> str: + """Check a single deployment and update its status. + + Returns: + Status string: running_verified, restarting, container_missing, api_not_ready, skipped + """ + logger.debug(f"Checking deployment {deployment.id}: {deployment.name}") + + if not deployment.worker: + logger.warning(f"Deployment {deployment.id} has no worker, skipping") + return "skipped" + + if not deployment.container_id: + logger.warning(f"Deployment {deployment.id} has no container_id, marking as error") + deployment.status = DeploymentStatus.ERROR.value + deployment.status_message = "Container ID missing after restart" + return "container_missing" + + # Check if this is a local worker + is_local = self._is_local_worker(deployment.worker.address) + + if is_local: + return await self._check_local_deployment(deployment) + else: + return await self._check_remote_deployment(deployment) + + def _is_local_worker(self, address: str) -> bool: + """Check if the worker address refers to the local machine.""" + if not address: + return False + host = address.split(":")[0].lower() + return host in ("localhost", "127.0.0.1", "local") + + async def _check_local_deployment(self, deployment: Deployment) -> str: + """Check a local deployment's container and API status.""" + try: + # Check container status + container = self.docker_client.containers.get(deployment.container_id) + container_status = container.status + + if container_status == "running": + # Container is running, check API health + api_healthy = await self._check_api_health( + deployment.worker.address, + deployment.port, + deployment.backend, + deployment.container_name, + ) + + if api_healthy: + # Everything is good + if deployment.status != DeploymentStatus.RUNNING.value: + deployment.status = DeploymentStatus.RUNNING.value + deployment.status_message = "Model ready (verified after restart)" + logger.info(f"Deployment {deployment.name}: running and healthy") + return "running_verified" + else: + # Container running but API not ready yet + deployment.status = DeploymentStatus.STARTING.value + deployment.status_message = "Container running, waiting for model to load..." + logger.info(f"Deployment {deployment.name}: container running, API not ready") + return "api_not_ready" + + elif container_status == "restarting": + deployment.status = DeploymentStatus.STARTING.value + deployment.status_message = "Container restarting after system reboot..." + logger.info(f"Deployment {deployment.name}: container restarting") + return "restarting" + + elif container_status in ("exited", "dead"): + # Container exists but stopped - try to restart it + logger.info( + f"Deployment {deployment.name}: container {container_status}, attempting restart" + ) + try: + container.start() + deployment.status = DeploymentStatus.STARTING.value + deployment.status_message = "Restarting container after system reboot..." + return "restarting" + except docker.errors.APIError as e: + logger.error(f"Failed to restart container: {e}") + deployment.status = DeploymentStatus.ERROR.value + deployment.status_message = f"Failed to restart container: {e}" + return "container_missing" + + else: + # Unknown status + deployment.status = DeploymentStatus.STARTING.value + deployment.status_message = f"Container status: {container_status}" + logger.warning( + f"Deployment {deployment.name}: unknown container status {container_status}" + ) + return "restarting" + + except docker.errors.NotFound: + # Container doesn't exist + logger.warning(f"Deployment {deployment.name}: container not found") + deployment.status = DeploymentStatus.ERROR.value + deployment.status_message = "Container not found after restart. Please redeploy." + return "container_missing" + + except docker.errors.DockerException as e: + logger.error(f"Docker error checking deployment {deployment.name}: {e}") + deployment.status = DeploymentStatus.ERROR.value + deployment.status_message = f"Docker error: {e}" + return "container_missing" + + async def _check_remote_deployment(self, deployment: Deployment) -> str: + """Check a remote deployment's status via worker API.""" + try: + # For remote workers, check if worker is online first + if deployment.worker.status != "online": + deployment.status = DeploymentStatus.ERROR.value + deployment.status_message = f"Worker {deployment.worker.name} is offline" + logger.warning(f"Deployment {deployment.name}: worker offline") + return "container_missing" + + # Check API health + api_healthy = await self._check_api_health( + deployment.worker.address, + deployment.port, + deployment.backend, + None, # No container_name for remote + ) + + if api_healthy: + if deployment.status != DeploymentStatus.RUNNING.value: + deployment.status = DeploymentStatus.RUNNING.value + deployment.status_message = "Model ready (verified after restart)" + logger.info(f"Deployment {deployment.name}: remote deployment healthy") + return "running_verified" + else: + deployment.status = DeploymentStatus.STARTING.value + deployment.status_message = "Waiting for model to load..." + logger.info(f"Deployment {deployment.name}: remote API not ready") + return "api_not_ready" + + except Exception as e: + logger.error(f"Error checking remote deployment {deployment.name}: {e}") + deployment.status = DeploymentStatus.ERROR.value + deployment.status_message = f"Error checking status: {e}" + return "container_missing" + + async def _check_api_health( + self, + worker_address: str, + port: int, + backend: str, + container_name: str | None = None, + ) -> bool: + """Check if the deployment API is healthy. + + Args: + worker_address: Worker address (host:port) + port: Host port for the model API + backend: Backend type (vllm, ollama, etc.) + container_name: Container name for Docker network (local deployments) + + Returns: + True if API is healthy, False otherwise + """ + # Build API URL + if container_name: + # Local deployment - use container name on Docker network + api_base_url = f"http://{container_name}:8000" + else: + # Remote deployment - use worker IP and port + worker_ip = worker_address.split(":")[0] + api_base_url = f"http://{worker_ip}:{port}" + + # Check /v1/models endpoint (supported by vLLM, SGLang, and Ollama) + health_endpoint = f"{api_base_url}/v1/models" + + try: + async with httpx.AsyncClient(timeout=self.HEALTH_CHECK_TIMEOUT) as client: + response = await client.get(health_endpoint) + + if response.status_code == 200: + data = response.json() + # Check if models are loaded + if data.get("data") and len(data["data"]) > 0: + return True + + # For Ollama, also try native endpoint + if backend == BackendType.OLLAMA.value: + ollama_endpoint = f"{api_base_url}/api/tags" + ollama_response = await client.get(ollama_endpoint) + if ollama_response.status_code == 200: + ollama_data = ollama_response.json() + if ollama_data.get("models") and len(ollama_data["models"]) > 0: + return True + + return False + + except httpx.ConnectError: + logger.debug(f"API not reachable at {health_endpoint}") + return False + except httpx.ReadTimeout: + logger.debug(f"API timeout at {health_endpoint}") + return False + except Exception as e: + logger.debug(f"API health check error: {e}") + return False + + async def check_deployment_health(self, deployment_id: int) -> dict: + """Check health of a single deployment. + + Returns: + dict with status and message + """ + async with async_session_maker() as db: + result = await db.execute( + select(Deployment) + .where(Deployment.id == deployment_id) + .options( + selectinload(Deployment.worker), + selectinload(Deployment.model), + ) + ) + deployment = result.scalar_one_or_none() + + if not deployment: + return {"status": "error", "message": "Deployment not found"} + + status_result = await self._check_and_update_deployment(deployment, db) + await db.commit() + + return { + "status": deployment.status, + "message": deployment.status_message, + "check_result": status_result, + } + + +# Global instance +deployment_sync_service = DeploymentSyncService() diff --git a/backend/app/services/gateway.py b/backend/app/services/gateway.py index 5439c1d..a1dc05e 100644 --- a/backend/app/services/gateway.py +++ b/backend/app/services/gateway.py @@ -263,12 +263,27 @@ async def record_usage( await db.commit() @staticmethod - def build_upstream_url(worker_address: str, port: int) -> str: + def build_upstream_url( + worker_address: str, port: int, container_name: str | None = None + ) -> str: """Build the upstream URL for the deployment. - worker_address may include port (e.g., "192.168.1.1:8080"), - we only need the host part. + For local deployments with container_name, use Docker internal networking + (container_name:8000) for container-to-container communication. + This is required for Windows Docker Desktop where host.docker.internal:port + doesn't work for backend-to-model communication. + + For remote workers, use worker_address:port as before. + + Args: + worker_address: Worker address (may include port, e.g., "192.168.1.1:8080") + port: Host port for the deployment + container_name: Docker container name for local deployments """ + # For local deployments with container name, use Docker internal networking + if container_name: + return f"http://{container_name}:8000" + # Extract host from worker address (remove agent port if present) host = worker_address.split(":")[0] return f"http://{host}:{port}" diff --git a/backend/app/services/local_worker.py b/backend/app/services/local_worker.py index 28f4456..4b5e499 100644 --- a/backend/app/services/local_worker.py +++ b/backend/app/services/local_worker.py @@ -160,12 +160,15 @@ def spawn_docker_worker( # Build the docker run command # Use --network host so worker can access backend and deployed apps + # Use --restart unless-stopped so worker auto-starts after reboot cmd = [ "docker", "run", "-d", "--name", container_name, + "--restart", + "unless-stopped", "--network", "host", "--gpus", diff --git a/backend/migrations/007_add_deployment_container_name.py b/backend/migrations/007_add_deployment_container_name.py new file mode 100644 index 0000000..af4b769 --- /dev/null +++ b/backend/migrations/007_add_deployment_container_name.py @@ -0,0 +1,59 @@ +""" +Migration: Add container_name column to deployments table + +This column stores the Docker container name for local deployments, +enabling internal Docker network communication (container-to-container). +Required for Windows Docker Desktop compatibility where host.docker.internal:port +doesn't work for backend-to-model communication. + +Run with: python -m migrations.007_add_deployment_container_name +""" + +import asyncio +import sys +from pathlib import Path + +# Add parent directory to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import create_async_engine + +from app.config import get_settings + + +async def column_exists(conn, table_name: str, column_name: str) -> bool: + """Check if a column exists in a table (SQLite compatible)""" + result = await conn.execute(text(f"PRAGMA table_info({table_name})")) + columns = [row[1] for row in result.fetchall()] + return column_name in columns + + +async def migrate(): + settings = get_settings() + engine = create_async_engine(settings.database_url, echo=True) + + async with engine.begin() as conn: + # Add container_name column + if not await column_exists(conn, "deployments", "container_name"): + print("Adding 'container_name' column to deployments table...") + await conn.execute( + text( + """ + ALTER TABLE deployments ADD COLUMN container_name VARCHAR(255) + """ + ) + ) + print("'container_name' column added!") + else: + print("'container_name' column already exists") + + print("\n" + "=" * 50) + print("Migration completed successfully!") + print("=" * 50) + + await engine.dispose() + + +if __name__ == "__main__": + asyncio.run(migrate()) diff --git a/docker-compose.local.yml b/docker-compose.local.yml index 8e2d736..4423911 100644 --- a/docker-compose.local.yml +++ b/docker-compose.local.yml @@ -9,7 +9,9 @@ services: image: infinirc/lmstack-backend:local container_name: lmstack-backend user: root - network_mode: host + # Use bridge network for Windows compatibility (network_mode: host doesn't work on Windows) + ports: + - "0.0.0.0:52000:52000" volumes: - lmstack-data:/app/data - /var/run/docker.sock:/var/run/docker.sock @@ -18,21 +20,27 @@ services: - LMSTACK_SECRET_KEY=${SECRET_KEY:-dev-secret-key} - LMSTACK_EXTERNAL_URL=${EXTERNAL_URL:-} restart: unless-stopped + networks: + - lmstack frontend: image: infinirc/lmstack-frontend:local container_name: lmstack-frontend ports: - - "3000:80" - extra_hosts: - - "host.docker.internal:host-gateway" + - "0.0.0.0:3000:80" environment: - - BACKEND_HOST=host.docker.internal + - BACKEND_HOST=server - NGINX_ENVSUBST_FILTER=BACKEND_HOST depends_on: - server restart: unless-stopped + networks: + - lmstack volumes: lmstack-data: driver: local + +networks: + lmstack: + driver: bridge diff --git a/docker-compose.yml b/docker-compose.yml index 1254427..388d427 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -36,6 +36,9 @@ services: LMSTACK_CORS_ORIGINS: ${CORS_ORIGINS:-*} ports: - "${BACKEND_PORT:-52000}:52000" + volumes: + # Mount docker socket for local worker management + - /var/run/docker.sock:/var/run/docker.sock depends_on: db: condition: service_healthy diff --git a/frontend/src/pages/ApiKeys.tsx b/frontend/src/pages/ApiKeys.tsx index bb3580f..561d8d9 100644 --- a/frontend/src/pages/ApiKeys.tsx +++ b/frontend/src/pages/ApiKeys.tsx @@ -764,11 +764,18 @@ export default function ApiKeys() {
Use the OpenAI SDK with your LMStack endpoint. Base URL:{" "} {baseUrl}/v1 + + For Docker containers (e.g., Open WebUI, n8n), use:{" "} + http://host.docker.internal:52000/v1 + { - if ( - !deployment.worker || - !deployment.port || - deployment.status !== "running" - ) { + if (deployment.status !== "running") { return null; } - const workerIp = deployment.worker.address.split(":")[0]; - return `http://${workerIp}:${deployment.port}/v1/chat/completions`; + // Use backend proxy endpoint instead of direct model URL + // This handles Docker internal networking correctly (especially on Windows) + return `/api/deployments/${deployment.id}/chat`; }; /** @@ -354,9 +352,13 @@ export default function Chat() { try { abortControllerRef.current = new AbortController(); + const token = localStorage.getItem(STORAGE_KEYS.TOKEN); const response = await fetch(endpoint, { method: "POST", - headers: { "Content-Type": "application/json" }, + headers: { + "Content-Type": "application/json", + ...(token && { Authorization: `Bearer ${token}` }), + }, body: JSON.stringify({ model: selectedDeployment.model?.model_id || "default", messages: [ diff --git a/frontend/src/pages/Containers.tsx b/frontend/src/pages/Containers.tsx index 08d88aa..d7bfae9 100644 --- a/frontend/src/pages/Containers.tsx +++ b/frontend/src/pages/Containers.tsx @@ -81,6 +81,7 @@ export default function Containers() { const [logs, setLogs] = useState(""); const [logsLoading, setLogsLoading] = useState(false); const [logsFullscreen, setLogsFullscreen] = useState(true); + const [logsAutoRefresh, setLogsAutoRefresh] = useState(true); const [autoScroll, setAutoScroll] = useState(true); const logsRef = useRef(null); const [execModal, setExecModal] = useState(null); @@ -123,9 +124,9 @@ export default function Containers() { return () => clearInterval(interval); }, [fetchData]); - // Auto-refresh logs when modal is open + // Auto-refresh logs when modal is open and auto-refresh is enabled useEffect(() => { - if (!logsModal) return; + if (!logsModal || !logsAutoRefresh) return; const interval = setInterval(async () => { try { @@ -141,7 +142,7 @@ export default function Containers() { }, 2000); // Refresh every 2 seconds return () => clearInterval(interval); - }, [logsModal]); + }, [logsModal, logsAutoRefresh]); // Auto-scroll logs to bottom useEffect(() => { @@ -783,7 +784,13 @@ export default function Containers() { }} > Logs: {logsModal?.name} - Auto-refresh + setLogsAutoRefresh(!logsAutoRefresh)} + > + Auto-refresh {logsAutoRefresh ? "ON" : "OFF"} +
} open={!!logsModal} @@ -791,6 +798,7 @@ export default function Containers() { setLogsModal(null); setLogs(""); setLogsFullscreen(true); + setLogsAutoRefresh(true); setAutoScroll(true); }} footer={ @@ -821,7 +829,7 @@ export default function Containers() { onClick={() => setLogsFullscreen(!logsFullscreen)} size="small" > - {logsFullscreen ? "Exit" : "Fullscreen"} + {logsFullscreen ? "Minimize" : "Fullscreen"} } diff --git a/frontend/src/pages/DeployApps.tsx b/frontend/src/pages/DeployApps.tsx index 2047199..96ec530 100644 --- a/frontend/src/pages/DeployApps.tsx +++ b/frontend/src/pages/DeployApps.tsx @@ -122,6 +122,7 @@ export default function DeployApps() { const [logs, setLogs] = useState(""); const [logsLoading, setLogsLoading] = useState(false); const [logsFullscreen, setLogsFullscreen] = useState(true); + const [logsAutoRefresh, setLogsAutoRefresh] = useState(true); const [autoScroll, setAutoScroll] = useState(true); const logsRef = useRef(null); const { isMobile } = useResponsive(); @@ -175,9 +176,9 @@ export default function DeployApps() { return () => clearInterval(interval); }, [deployedApps]); - // Auto-refresh logs when modal is open + // Auto-refresh logs when modal is open and auto-refresh is enabled useEffect(() => { - if (!logsModal) return; + if (!logsModal || !logsAutoRefresh) return; const interval = setInterval(async () => { try { @@ -189,7 +190,7 @@ export default function DeployApps() { }, 2000); // Refresh every 2 seconds return () => clearInterval(interval); - }, [logsModal]); + }, [logsModal, logsAutoRefresh]); // Auto-scroll logs to bottom useEffect(() => { @@ -303,10 +304,24 @@ export default function DeployApps() { // Use current hostname (LMStack IP) + app port return `http://${window.location.hostname}:${app.port}`; } else { - // Direct connection to worker + // Direct connection - for local workers, use current hostname + // Check if worker_address is a private/internal IP (Docker network, etc.) if (app.worker_address) { const workerHost = app.worker_address.split(":")[0]; - return `http://${workerHost}:${app.port}`; + const isInternalIp = + workerHost.startsWith("172.") || + workerHost.startsWith("10.") || + workerHost.startsWith("192.168.") || + workerHost === "localhost" || + workerHost === "127.0.0.1"; + + if (isInternalIp) { + // Local worker - use current browser hostname + return `http://${window.location.hostname}:${app.port}`; + } else { + // Remote worker - use worker's IP + return `http://${workerHost}:${app.port}`; + } } return null; } @@ -343,6 +358,12 @@ export default function DeployApps() { Deploy companion applications that integrate with LMStack +
+ + Apps connect to LMStack API via{" "} + http://host.docker.internal:52000/v1 + +
{/* Available Apps */} @@ -510,18 +531,19 @@ export default function DeployApps() { {app.status === "starting" && } {app.status === "pending" && } - {progress?.stage === "unknown" || - !progress?.message - ? app.status === "starting" - ? "Starting app (first startup may take 1-3 minutes)..." - : app.status === "pulling" + {app.status === "starting" + ? "Starting app (first startup may take 1-3 minutes)..." + : progress?.stage === "unknown" || + !progress?.message + ? app.status === "pulling" ? "Pulling image..." : "Preparing..." - : progress.message} + : progress.message} - {/* Use indeterminate style when no real progress data */} - {!progress || + {/* Use indeterminate style for starting stage or when no real progress data */} + {app.status === "starting" || + !progress || progress.stage === "unknown" || (app.status === "pulling" && progress.progress === 0) ? ( @@ -560,15 +582,25 @@ export default function DeployApps() { )} {app.status === "running" && appUrl && ( - +
+ + {app.port && ( + + Port: {app.port} + + )} +
)}
@@ -659,12 +691,14 @@ export default function DeployApps() { value={selectedWorker} onChange={(value) => { setSelectedWorker(value); - // Auto-disable proxy for localhost workers + // Auto-disable proxy for local workers (same machine as LMStack) const worker = workers.find((w) => w.id === value); if (worker) { - const workerHost = worker.address.split(":")[0]; - if (workerHost === "localhost" || workerHost === "127.0.0.1") { + const isLocalWorker = worker.labels?.type === "local"; + if (isLocalWorker) { setUseProxy(false); + } else { + setUseProxy(true); } } }} @@ -675,13 +709,11 @@ export default function DeployApps() { />
- {/* Hide proxy option for localhost workers (they use direct connection) */} + {/* Hide proxy option for local workers (same machine as LMStack) */} {(() => { const worker = workers.find((w) => w.id === selectedWorker); - const workerHost = worker?.address.split(":")[0]; - const isLocalhost = - workerHost === "localhost" || workerHost === "127.0.0.1"; - if (isLocalhost) return null; + const isLocalWorker = worker?.labels?.type === "local"; + if (isLocalWorker) return null; return (
Logs: {logsModal?.name} - Auto-refresh + setLogsAutoRefresh(!logsAutoRefresh)} + > + Auto-refresh {logsAutoRefresh ? "ON" : "OFF"} +
} open={!!logsModal} onCancel={() => { setLogsModal(null); setLogsFullscreen(false); + setLogsAutoRefresh(true); setAutoScroll(true); }} footer={ @@ -765,7 +804,7 @@ export default function DeployApps() { onClick={() => setLogsFullscreen(!logsFullscreen)} size="small" > - {logsFullscreen ? "Exit" : "Fullscreen"} + {logsFullscreen ? "Minimize" : "Fullscreen"} } diff --git a/frontend/src/pages/Deployments.tsx b/frontend/src/pages/Deployments.tsx index 9c69c11..50da57a 100644 --- a/frontend/src/pages/Deployments.tsx +++ b/frontend/src/pages/Deployments.tsx @@ -66,6 +66,7 @@ export default function Deployments() { const [logs, setLogs] = useState(""); const [logsLoading, setLogsLoading] = useState(false); const [logsFullscreen, setLogsFullscreen] = useState(true); + const [logsAutoRefresh, setLogsAutoRefresh] = useState(true); const [autoScroll, setAutoScroll] = useState(true); const logsRef = useRef(null); const [form] = Form.useForm(); @@ -148,9 +149,9 @@ export default function Deployments() { return () => clearInterval(interval); }, [fetchDeployments]); - // Auto-refresh logs when modal is open + // Auto-refresh logs when modal is open and auto-refresh is enabled useEffect(() => { - if (!logsModal) return; + if (!logsModal || !logsAutoRefresh) return; const interval = setInterval(async () => { try { @@ -162,7 +163,7 @@ export default function Deployments() { }, 2000); // Refresh every 2 seconds return () => clearInterval(interval); - }, [logsModal]); + }, [logsModal, logsAutoRefresh]); // Auto-scroll logs to bottom useEffect(() => { @@ -1021,13 +1022,20 @@ export default function Deployments() { }} > Logs: {logsModal?.name} - Auto-refresh + setLogsAutoRefresh(!logsAutoRefresh)} + > + Auto-refresh {logsAutoRefresh ? "ON" : "OFF"} +
} open={!!logsModal} onCancel={() => { setLogsModal(null); setLogsFullscreen(false); + setLogsAutoRefresh(true); setAutoScroll(true); }} footer={ @@ -1058,7 +1066,7 @@ export default function Deployments() { onClick={() => setLogsFullscreen(!logsFullscreen)} size="small" > - {logsFullscreen ? "Exit" : "Fullscreen"} + {logsFullscreen ? "Minimize" : "Fullscreen"} } diff --git a/frontend/src/pages/Workers.tsx b/frontend/src/pages/Workers.tsx index 0ba1293..d4e85e4 100644 --- a/frontend/src/pages/Workers.tsx +++ b/frontend/src/pages/Workers.tsx @@ -857,6 +857,69 @@ export default function Workers() { ), }, + { + key: "windows-local", + label: "Windows (Same Machine as Backend)", + children: ( +
+
+                        {generatedToken.docker_command?.replace(
+                          /BACKEND_URL=http:\/\/[^:]+:52000/,
+                          "BACKEND_URL=http://host.docker.internal:52000",
+                        )}
+                      
+ +
+ ), + }, { key: "dev-python", label: "Development Mode (Python)", diff --git a/worker/agent.py b/worker/agent.py index c1dbafa..3fbe87a 100644 --- a/worker/agent.py +++ b/worker/agent.py @@ -80,17 +80,31 @@ def __init__( self._heartbeat_task: Optional[asyncio.Task] = None self._running = True + def _is_local_worker(self) -> bool: + """Check if this worker is running on the same machine as the server.""" + # Check if server_url contains localhost or host.docker.internal + local_indicators = ["localhost", "127.0.0.1", "host.docker.internal", "::1"] + server_host = self.server_url.split("://")[-1].split(":")[0].split("/")[0].lower() + return server_host in local_indicators + async def register(self) -> bool: """Register this worker with the server.""" try: gpu_info = self.gpu_detector.detect() system_info = self.system_detector.detect() + # Build labels - mark as local if connecting to localhost/host.docker.internal + labels = {} + if self._is_local_worker(): + labels["type"] = "local" + logger.info("Detected local worker (same machine as LMStack server)") + registration_data = { "name": self.name, "address": f"{self._get_advertise_address()}:{self.port}", "gpu_info": gpu_info, "system_info": system_info, + "labels": labels if labels else None, } # Include registration token if provided diff --git a/worker/docker_ops/containers.py b/worker/docker_ops/containers.py index d4e501e..3ace542 100644 --- a/worker/docker_ops/containers.py +++ b/worker/docker_ops/containers.py @@ -8,7 +8,7 @@ from typing import Any, Optional import docker -from docker.errors import APIError, NotFound +from docker.errors import APIError, ImageNotFound, NotFound logger = logging.getLogger(__name__) @@ -265,6 +265,7 @@ def create_container( cpu_limit: Optional[float] = None, memory_limit: Optional[int] = None, cap_add: Optional[list[str]] = None, + extra_hosts: Optional[dict[str, str]] = None, ) -> dict[str, Any]: """Create and start a new container. @@ -282,12 +283,31 @@ def create_container( cpu_limit: CPU limit (number of CPUs) memory_limit: Memory limit in bytes cap_add: Linux capabilities to add (e.g., ["SYS_ADMIN"]) + extra_hosts: Extra hostname mappings (e.g., {"host.docker.internal": "host-gateway"}) Returns: Created container information """ logger.info(f"Creating container: {name} from image {image}") + # Verify image exists and is valid, pull if needed + # Note: We need to handle the case where a tag exists but points to a + # deleted/pruned image SHA. In this case, images.get() may succeed but + # container creation will fail with a 404 error. + image_valid = False + try: + img = self.client.images.get(image) + # Try to inspect the image to verify the SHA exists + img.attrs # This will fail if SHA is pruned + logger.info(f"Image {image} found locally and valid") + image_valid = True + except (ImageNotFound, APIError) as e: + logger.info(f"Image {image} not found or invalid ({type(e).__name__}), pulling...") + + if not image_valid: + self.client.images.pull(image) + logger.info(f"Image {image} pulled successfully") + # Remove existing container with same name try: existing = self.client.containers.get(name) @@ -334,24 +354,38 @@ def create_container( if restart_policy == "on-failure": restart_config["MaximumRetryCount"] = 3 - container = self.client.containers.run( - image=image, - name=name, - command=command, - entrypoint=entrypoint, - detach=True, - remove=False, - ports=port_bindings if port_bindings else None, - volumes=volume_bindings if volume_bindings else None, - device_requests=device_requests, - environment=environment, - labels=container_labels, - restart_policy=restart_config, - cpu_period=100000 if cpu_limit else None, - cpu_quota=int(cpu_limit * 100000) if cpu_limit else None, - mem_limit=memory_limit, - cap_add=cap_add, - ) + # Try to create container, retry with fresh pull if image is stale + run_kwargs = { + "image": image, + "name": name, + "command": command, + "entrypoint": entrypoint, + "detach": True, + "remove": False, + "ports": port_bindings if port_bindings else None, + "volumes": volume_bindings if volume_bindings else None, + "device_requests": device_requests, + "environment": environment, + "labels": container_labels, + "restart_policy": restart_config, + "cpu_period": 100000 if cpu_limit else None, + "cpu_quota": int(cpu_limit * 100000) if cpu_limit else None, + "mem_limit": memory_limit, + "cap_add": cap_add, + "extra_hosts": extra_hosts, + } + + try: + container = self.client.containers.run(**run_kwargs) + except APIError as e: + # If error is related to missing image SHA, re-pull and retry + if "No such image" in str(e) or "not found" in str(e).lower(): + logger.warning(f"Image {image} appears stale, re-pulling...") + self.client.images.pull(image) + logger.info(f"Image {image} re-pulled, retrying container creation") + container = self.client.containers.run(**run_kwargs) + else: + raise logger.info(f"Container {name} created with ID {container.short_id}") return self.get_container_detail(container.id) diff --git a/worker/docker_ops/images.py b/worker/docker_ops/images.py index f0f3b86..9689a3f 100644 --- a/worker/docker_ops/images.py +++ b/worker/docker_ops/images.py @@ -160,22 +160,54 @@ def pull_image( # Pull with progress tracking layers_progress: dict[str, dict] = {} + max_progress = 0 # Track max progress to prevent going backwards for line in self.client.api.pull(image, stream=True, decode=True, auth_config=auth): if progress_callback and "id" in line: layer_id = line["id"] + status = line.get("status", "") detail = line.get("progressDetail", {}) - layers_progress[layer_id] = { - "status": line.get("status", ""), - "current": detail.get("current", 0), - "total": detail.get("total", 0), - } - - # Calculate overall progress - total_size = sum(lp.get("total", 0) for lp in layers_progress.values()) - downloaded = sum(lp.get("current", 0) for lp in layers_progress.values()) + + # Only update download progress for "Downloading" status + # Once downloaded, keep the layer at 100% (current = total) + if status == "Downloading": + layers_progress[layer_id] = { + "status": status, + "current": detail.get("current", 0), + "total": detail.get("total", 0), + } + elif status in ("Download complete", "Pull complete", "Already exists"): + # Layer is complete, mark as 100% + existing = layers_progress.get(layer_id, {}) + total = existing.get("total", 0) + layers_progress[layer_id] = { + "status": status, + "current": total, + "total": total, + } + elif status == "Pulling fs layer": + # New layer, initialize with 0 + layers_progress[layer_id] = { + "status": status, + "current": 0, + "total": 0, + } + # Ignore "Extracting" and other statuses to avoid progress reset + + # Calculate overall progress (only count layers with total > 0) + layers_with_size = [ + lyr for lyr in layers_progress.values() if lyr.get("total", 0) > 0 + ] + total_size = sum(lyr.get("total", 0) for lyr in layers_with_size) + downloaded = sum(lyr.get("current", 0) for lyr in layers_with_size) progress = int((downloaded / total_size) * 100) if total_size > 0 else 0 + # Never let progress go backwards + if progress > max_progress: + max_progress = progress + else: + progress = max_progress + progress_callback(progress, layers_progress) # Get the pulled image info diff --git a/worker/docker_ops/runner.py b/worker/docker_ops/runner.py index 8931c94..62a97bb 100644 --- a/worker/docker_ops/runner.py +++ b/worker/docker_ops/runner.py @@ -131,23 +131,46 @@ def pull_image_with_progress(self, image: str, deployment_id: int) -> None: try: for line in self.client.api.pull(image, stream=True, decode=True): - if "id" in line and "progressDetail" in line: + if "id" in line: layer_id = line["id"] + status = line.get("status", "") detail = line.get("progressDetail", {}) - current = detail.get("current", 0) - total = detail.get("total", 0) progress_data = _pull_progress.get(str(deployment_id), {}) layers = progress_data.get("layers", {}) - layers[layer_id] = { - "status": line.get("status", ""), - "current": current, - "total": total, - } - # Calculate overall progress - total_size = sum(layer.get("total", 0) for layer in layers.values()) - downloaded = sum(layer.get("current", 0) for layer in layers.values()) + # Only update download progress for "Downloading" status + # Once downloaded, keep the layer at 100% (total = total, current = total) + if status == "Downloading": + current = detail.get("current", 0) + total = detail.get("total", 0) + layers[layer_id] = { + "status": status, + "current": current, + "total": total, + } + elif status in ("Download complete", "Pull complete", "Already exists"): + # Layer is complete, mark as 100% + existing = layers.get(layer_id, {}) + total = existing.get("total", 0) + layers[layer_id] = { + "status": status, + "current": total, # current = total = 100% + "total": total, + } + elif status == "Pulling fs layer": + # New layer, initialize with 0 + layers[layer_id] = { + "status": status, + "current": 0, + "total": 0, + } + # Ignore "Extracting" and other statuses to avoid progress reset + + # Calculate overall progress (only count layers with total > 0) + layers_with_size = [lyr for lyr in layers.values() if lyr.get("total", 0) > 0] + total_size = sum(lyr.get("total", 0) for lyr in layers_with_size) + downloaded = sum(lyr.get("current", 0) for lyr in layers_with_size) overall_progress = int((downloaded / total_size) * 100) if total_size > 0 else 0 _set_pull_progress( @@ -193,8 +216,20 @@ async def run( environment: dict[str, str], deployment_id: int = 0, port: Optional[int] = None, + network: Optional[str] = None, ) -> tuple[str, int]: - """Run a container and return (container_id, port).""" + """Run a container and return (container_id, port). + + Args: + name: Container name + image: Docker image + command: Container command + gpu_indexes: GPU indices to use + environment: Environment variables + deployment_id: Deployment ID for progress tracking + port: Optional specific port to use + network: Optional Docker network to join (for Windows compatibility) + """ loop = asyncio.get_event_loop() return await loop.run_in_executor( None, @@ -206,6 +241,7 @@ async def run( environment, deployment_id, port, + network, ) def _run_sync( @@ -217,6 +253,7 @@ def _run_sync( environment: dict[str, str], deployment_id: int = 0, port: Optional[int] = None, + network: Optional[str] = None, ) -> tuple[str, int]: """Synchronous container run.""" # Check if container with same name exists @@ -228,10 +265,13 @@ def _run_sync( pass # Pull image if not exists + # Note: We catch both NotFound and APIError because when a tag exists + # but points to a deleted/pruned image SHA, Docker returns a 404 APIError + # instead of NotFound. try: self.client.images.get(image) - except NotFound: - logger.info(f"Pulling image {image}...") + except (NotFound, APIError) as e: + logger.info(f"Image {image} not found or invalid ({type(e).__name__}), pulling...") self.pull_image_with_progress(image, deployment_id) # Update progress to starting @@ -263,24 +303,32 @@ def _run_sync( **environment, } - # Run container - container = self.client.containers.run( - image=image, - name=name, - command=command, - detach=True, - remove=False, - ports={"8000/tcp": host_port}, - device_requests=device_requests, - environment=env, - shm_size="16g", - volumes={ + # Build container run kwargs + run_kwargs = { + "image": image, + "name": name, + "command": command, + "detach": True, + "remove": False, + "ports": {"8000/tcp": host_port}, + "device_requests": device_requests, + "environment": env, + "shm_size": "16g", + "volumes": { "/root/.cache/huggingface": { "bind": "/root/.cache/huggingface", "mode": "rw", }, }, - ) + } + + # Add network if specified (for Windows Docker Desktop compatibility) + if network: + run_kwargs["network"] = network + logger.info(f"Creating container on network: {network}") + + # Run container + container = self.client.containers.run(**run_kwargs) return container.id, host_port diff --git a/worker/models.py b/worker/models.py index 99057cd..ec982c9 100644 --- a/worker/models.py +++ b/worker/models.py @@ -87,6 +87,7 @@ class ContainerCreateRequest(BaseModel): cpu_limit: Optional[float] = None memory_limit: Optional[int] = None cap_add: Optional[list[str]] = None + extra_hosts: Optional[dict[str, str]] = None class ContainerActionRequest(BaseModel): diff --git a/worker/routes/containers.py b/worker/routes/containers.py index 5c7c4e1..1c38c4b 100644 --- a/worker/routes/containers.py +++ b/worker/routes/containers.py @@ -95,6 +95,7 @@ async def create_container(request: ContainerCreateRequest): cpu_limit=request.cpu_limit, memory_limit=request.memory_limit, cap_add=request.cap_add, + extra_hosts=request.extra_hosts, ) except Exception as e: logger.error(f"Failed to create container {request.name}: {e}")