diff --git a/.gitignore b/.gitignore index c96ea54..3b88f71 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,5 @@ configs/simulation_runs/ configs/agent_experiment_runs/ configs/agent_config_*.yaml configs/cached/ + +.pnpm-store/ \ No newline at end of file diff --git a/README.md b/README.md index b80fdcd..05002f2 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,11 @@ git clone https://github.com/spire-studio/figaro.git cd figaro uv sync + +cp .env.example .env +# Edit .env and set: +# OPENAI_API_KEY=your-api-key +# POSTGRES_PASSWORD=postgres ``` ## 🚀 Quick Start @@ -110,15 +115,7 @@ docker run -d --name figaro-pg -p 5433:5432 \ -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=figaro postgres:16 ``` -**Step 2 — Configure `.env`**: -```bash -cp .env.example .env -# Edit .env and set: -# OPENAI_API_KEY=your-api-key -# POSTGRES_PASSWORD=postgres -``` - -**Step 3 — Backend**: +**Step 2 — Backend**: ```bash POSTGRES_HOST=localhost POSTGRES_PORT=5433 POSTGRES_PASSWORD=postgres POSTGRES_DB=figaro \ PYTHONPATH=libs:apps/backend/runners \ @@ -130,7 +127,7 @@ To use a specific GPU (e.g. GPU 1): CUDA_VISIBLE_DEVICES=1 POSTGRES_HOST=localhost ... uv run uvicorn ... ``` -**Step 4 — Frontend** (in another terminal): +**Step 3 — Frontend** (in another terminal): ```bash cd apps/frontend && pnpm install && pnpm dev ``` @@ -207,14 +204,27 @@ figaro/ PRs welcome! Figaro is meant to be a readable, research-friendly FL platform. 
-**Roadmap** — tentative, contributions welcome: - -- [ ] **Richer agent planning** — multi-step reflection and failure recovery in the LangGraph pipeline -- [ ] **More aggregation strategies** — FedProx, FedAvgM, Scaffold on top of the existing FedAvg baseline -- [ ] **Hardened distributed mode** — fault tolerance, client reconnection, and heterogeneous workers -- [ ] **Expanded datasets & models** — beyond CIFAR-10 / MNIST and CNN / ResNet -- [ ] **End-to-end reproducibility** — deterministic seeds, artifact lineage, and one-click replay -- [ ] **Observability** — per-run metrics dashboard and structured logs +**Roadmap**: + +**Phase 1: Solidifying the Agentic Platform** +- [x] **Interactive Agent Planning** — Multi-turn dialogue support for refining experiments, plus visual topology previews (Plan Preview) before execution. +- [x] **Execution Transparency** — Real-time tracking of node-level status during execution and automated natural-language interpretation of results. +- [ ] **Strict Configuration Engine** — Implement strict Pydantic/JSON Schema validation to resolve historical inconsistencies between `config_schema` and underlying algorithms. +- [ ] **Advanced Experiment Tracking** — Multi-dimensional search filtering (by metrics, hyperparameters, status) and configuration version control (diffing). + +**Phase 2: LLM & LoRA Federated Fine-Tuning** +- [ ] **Native LLM Ecosystem Integration** — Seamless Hugging Face model loading (e.g., Llama 3, Qwen) and efficient parsing of JSONL instruction-tuning datasets. +- [ ] **Parameter-Efficient Runtime** — Deep integration with LoRA/PEFT, including support for QLoRA (4-bit/8-bit quantization) to lower client-side memory barriers. +- [ ] **Specialized Adapter Aggregation** — Custom aggregation mechanisms for LoRA adapters, exploring support for heterogeneous LoRA ranks across clients. 
+- [ ] **LLM Evaluation Metrics** — Built-in evaluation for generative tasks (ROUGE, BLEU, Perplexity) and automated LLM-as-a-Judge capabilities. +- [ ] **Hardware Guardrails** — Pre-run dynamic GPU memory estimation (OOM prevention) and automated tuning of gradient accumulation and checkpointing. + +**Phase 3: Enterprise & Team Collaboration** +- [ ] **Multi-Tenant Workspaces** — Isolated project environments with Role-Based Access Control (RBAC) and comprehensive audit logging. +- [ ] **Robust Distributed Scheduling** — Global GPU resource queuing, quota management, and enhanced fault tolerance for client reconnections/dropouts. +- [ ] **Cloud-Native Infrastructure** — Native Kubernetes (K8s) Runner integration with auto-scaling workers based on queue volume. +- [ ] **Model Asset Registry** — Centralized Artifact Registry to track complete data lineage from dataset versions to final aggregated weights. +- [ ] **Compliance & Governance** — Automated privacy compliance reporting (e.g., auditing Differential Privacy parameters) to ensure enterprise-grade security.

Figaro is for research and educational use. diff --git a/README.zh-CN.md b/README.zh-CN.md index d97145d..64e4396 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -93,6 +93,11 @@ git clone https://github.com/spire-studio/figaro.git cd figaro uv sync + +cp .env.example .env +# 编辑 .env,设置: +# OPENAI_API_KEY=your-api-key +# POSTGRES_PASSWORD=postgres ``` ## 🚀 快速开始 @@ -111,15 +116,7 @@ docker run -d --name figaro-pg -p 5433:5432 \ -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=figaro postgres:16 ``` -**第 2 步 — 配置 `.env`**: -```bash -cp .env.example .env -# 编辑 .env,设置: -# OPENAI_API_KEY=your-api-key -# POSTGRES_PASSWORD=postgres -``` - -**第 3 步 — 后端**: +**第 2 步 — 后端**: ```bash POSTGRES_HOST=localhost POSTGRES_PORT=5433 POSTGRES_PASSWORD=postgres POSTGRES_DB=figaro \ PYTHONPATH=libs:apps/backend/runners \ @@ -131,7 +128,7 @@ POSTGRES_HOST=localhost POSTGRES_PORT=5433 POSTGRES_PASSWORD=postgres POSTGRES_D CUDA_VISIBLE_DEVICES=1 POSTGRES_HOST=localhost ... uv run uvicorn ... ``` -**第 4 步 — 前端**(在另一个终端): +**第 3 步 — 前端**(在另一个终端): ```bash cd apps/frontend && pnpm install && pnpm dev ``` @@ -208,14 +205,27 @@ figaro/ 欢迎提 PR!Figaro 的目标是做一个可读性强、研究友好的联邦学习平台。 -**Roadmap**(暂定,欢迎贡献): - -- [ ] **更丰富的 Agent 规划** —— LangGraph 流水线加入多步反思和失败恢复 -- [ ] **更多聚合策略** —— 在现有 FedAvg 之上补充 FedProx、FedAvgM、Scaffold -- [ ] **分布式模式加固** —— 容错、客户端重连、异构 worker 支持 -- [ ] **扩展数据集和模型** —— 突破 CIFAR-10 / MNIST 和 CNN / ResNet 的范围 -- [ ] **端到端可复现** —— 确定性种子、产物血缘、一键回放 -- [ ] **可观测性** —— 单实验指标面板与结构化日志 +**Roadmap**: + +**Phase 1:夯实 Agentic 实验平台** +- [x] **Agent 交互体验升级** —— 支持多轮对话微调实验计划,提供实验执行前的Plan Preview。 +- [x] **执行与分析透明化** —— 支持实验节点的实时状态追踪,以及 Agent 驱动的运行结果自动化图表解释。 +- [ ] **配置引擎重构** —— 引入基于 Pydantic/JSON Schema 的严格强校验,彻底修复 `config_schema` 与底层算法实现不一致的问题。 +- [ ] **高阶实验管理** —— 支持按指标、超参等多维度搜索过滤实验历史,支持配置文件版本控制与 Diff 差异对比。 + +**Phase 2:LLM / LoRA 联邦微调支持** +- [ ] **大模型生态原生接入** —— 内置 Hugging Face 适配层,一键加载主流开源模型,支持 JSONL 格式的指令微调数据集高效解析。 +- [ ] **高效分布式微调** —— 深度集成 LoRA/PEFT 训练环境,支持 QLoRA (4-bit/8-bit 量化) 
以显著降低边缘节点的显存门槛。 +- [ ] **专属权重聚合策略** —— 针对 LLM 微调定制的 Adapter 权重聚合机制,探索支持客户端异构 LoRA Rank 的聚合方案。 +- [ ] **大模型专项评估体系** —— 集成生成式 NLP 指标,并引入基于大模型的自动化指令跟随能力评估 (LLM-as-a-Judge)。 +- [ ] **资源护栏与预判** —— 训练任务启动前进行动态 GPU 显存预估(防 OOM 机制),并支持梯度累积与 Checkpointing 的自动调优。 + +**Phase 3:平台化与企业级协作** +- [ ] **多租户与细粒度权限** —— 构建多用户隔离的项目空间 (Workspaces),引入基于角色的访问控制 (RBAC) 和完整的操作审计日志。 +- [ ] **生产级调度与容错** —— 实现全局 GPU 资源排队与配额限制,增强分布式环境下的客户端掉线重连与死机容错机制。 +- [ ] **云原生基础设施** —— 提供原生的 Kubernetes (K8s) Runner 支持,支持基于排队任务量的 Worker 节点动态扩缩容。 +- [ ] **模型资产与血缘管理** —— 建立集中式的产物注册表 (Artifact Registry),追踪从数据集版本到最终模型权重的完整数据血缘 (Lineage)。 +- [ ] **治理与安全合规** —— 提供自动化的隐私合规检查与报告生成(例如审计差分隐私的 $\epsilon$ 参数),确保企业级联邦学习的数据安全。

Figaro 仅用于科研和教学目的。 diff --git a/apps/backend/app/api/v1/endpoints/agent.py b/apps/backend/app/api/v1/endpoints/agent.py index 2265a3f..63c5888 100644 --- a/apps/backend/app/api/v1/endpoints/agent.py +++ b/apps/backend/app/api/v1/endpoints/agent.py @@ -4,7 +4,8 @@ from __future__ import annotations -from fastapi import APIRouter, status +from fastapi import APIRouter, status, HTTPException +import json from app.api.deps import AsyncSessionDep from app.schemas.agent import ( @@ -27,6 +28,8 @@ from app.services.agent.graph import FederatedAgentGraphBuilder from app.services.agent.objectives import AgentOptimizationObjective, resolve_objective from app.services.llm import LLMRegistry, LLMService +from app.schemas.agent import AgentPlanPreviewRequest, AgentPlanPreviewResponse, ExperimentPlanPreview, AgentPlanReviseRequest +import uuid agent_router = APIRouter() agent_runtime_service = AgentRuntimeService() @@ -143,6 +146,7 @@ def _build_progress_response(snapshot: dict) -> AgentOptimizeProgressResponse: best_config=snapshot.get("best_config"), best_metrics=best_metrics, experiments=_build_experiments(snapshot.get("experiments", [])), + draft_experiments=snapshot.get("draft_experiments", []), summary_text=snapshot.get("summary_text"), error_message=snapshot.get("error_message"), created_at=snapshot.get("created_at"), @@ -243,6 +247,7 @@ async def start_optimization( system_mode=payload.system_mode, model_name=payload.model_name, objective=payload.objective, + planned_experiments=payload.planned_experiments, ) return _build_progress_response(snapshot) @@ -463,3 +468,145 @@ async def get_agent_run_logs( ) for log in logs ] + + +@agent_router.post( + "/optimize/plan", + response_model=AgentPlanPreviewResponse, + status_code=status.HTTP_200_OK, + summary="Generate a plan preview and save as draft", +) +async def generate_plan_preview( + payload: AgentPlanPreviewRequest, + session: AsyncSessionDep, +) -> AgentPlanPreviewResponse: + """ + 1. Call LLM to generate the plan. + 2. 
Create a Job record with PENDING_REVIEW status. + """ + # Borrow the graph builder to do the LLM parsing + llm_service = LLMService() + builder = FederatedAgentGraphBuilder(llm_service=llm_service, session=session) + + # Construct a dummy initial state to run through the parse node + temp_state = AgentState( + goal=payload.goal, + job_name=payload.job_name, + model_name=payload.model_name, + system_mode=payload.system_mode, + max_iterations=10, + objective=AgentOptimizationObjective.AUTO, + resolved_objective=AgentOptimizationObjective.ACCURACY, + ) + + # Call the graph's parse node directly to get the LLM result + parsed_state = await builder._node_parse(temp_state) + + previews = [] + for exp in parsed_state.experiments: + previews.append(ExperimentPlanPreview( + name=exp.name, + plan_summary=exp.plan_summary or "", + config_patch=exp.config_patch, + # Fallback values if LLM doesn't generate them + estimated_minutes=0, + estimated_gpu_vram_gb=0.0, + )) + + # Persist the draft (write snapshot to DB, set status to pending_review) + history_service = AgentOptimizationHistoryService(session) + + job = await history_service.create_job( + task_id=f"draft-{uuid.uuid4().hex[:8]}", + goal=payload.goal, + job_name=payload.job_name, + model_name=payload.model_name, + system_mode=payload.system_mode, + max_iterations=len(previews) or 1, + status="pending_review", + snapshot={ + "goal": payload.goal, + "job_name": payload.job_name, + "status": "pending_review", + "experiments": [], + "draft_experiments": [p.model_dump() for p in previews] + } + ) + + return AgentPlanPreviewResponse( + optimization_job_id=job.id, + goal=payload.goal, + experiments=previews, + system_mode=payload.system_mode + ) + +@agent_router.post( + "/optimization-jobs/{job_id}/revise", + response_model=AgentPlanPreviewResponse, + status_code=status.HTTP_200_OK, + summary="Revise a pending plan draft using natural language", +) +async def revise_plan_preview( + job_id: int, + payload: 
AgentPlanReviseRequest, + session: AsyncSessionDep, +) -> AgentPlanPreviewResponse: + history_service = AgentOptimizationHistoryService(session) + job = await history_service.get_job_or_raise(job_id) + + current_status = job.status.value if hasattr(job.status, "value") else str(job.status) + if current_status != "pending_review": + raise HTTPException(status_code=400, detail="Only pending_review jobs can be revised.") + + snapshot = job.snapshot_json + current_experiments = snapshot.get("draft_experiments", []) + if not current_experiments: + raise HTTPException(status_code=400, detail="No draft experiments found to revise.") + + instructions = ( + "You are an AI assistant managing a Federated Learning experiment plan. " + "The user wants to modify the current experiment configuration based on their feedback. " + "Update the JSON configuration to reflect their request. " + "Respond ONLY with a valid JSON array of experiment objects, matching the original schema. " + "Do not include any other text." 
+ ) + + input_text = ( + f"Current Plan (JSON):\n{json.dumps(current_experiments, indent=2)}\n\n" + f"User Modification Request:\n{payload.instruction}\n\n" + "Please output the updated JSON array of experiments:" + ) + + llm_service = LLMService() + model_name = snapshot.get("model_name") + text, _ = await llm_service.generate_text( + model=model_name if model_name else None, + instructions=instructions, + input_text=input_text, + ) + + clean_text = text.strip() + if clean_text.startswith("```json"): + clean_text = clean_text[7:] + elif clean_text.startswith("```"): + clean_text = clean_text[3:] + if clean_text.endswith("```"): + clean_text = clean_text[:-3] + clean_text = clean_text.strip() + + try: + updated_experiments = json.loads(clean_text) + if not isinstance(updated_experiments, list): + raise ValueError("LLM did not return a list.") + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to parse LLM response: {str(e)}") + + snapshot["draft_experiments"] = updated_experiments + await history_service.update_job_snapshot(task_id=job.task_id, snapshot=snapshot) + + return AgentPlanPreviewResponse( + optimization_job_id=job.id, + goal=snapshot.get("goal", ""), + experiments=[ExperimentPlanPreview.model_validate(exp) for exp in updated_experiments], + system_mode=snapshot.get("system_mode", "simulation") + ) \ No newline at end of file diff --git a/apps/backend/app/core/db.py b/apps/backend/app/core/db.py index 0298e43..85a856c 100644 --- a/apps/backend/app/core/db.py +++ b/apps/backend/app/core/db.py @@ -72,9 +72,9 @@ async def init_db() -> None: async with engine.begin() as conn: # In dev mode we hard-reset postgres schema so renamed tables/types are # fully cleaned up (including legacy enum dependencies). 
- if conn.dialect.name == "postgresql": - await conn.execute(text("DROP SCHEMA IF EXISTS public CASCADE")) - await conn.execute(text("CREATE SCHEMA public")) - else: - await conn.run_sync(SQLModel.metadata.drop_all) + # if conn.dialect.name == "postgresql": + # await conn.execute(text("DROP SCHEMA IF EXISTS public CASCADE")) + # await conn.execute(text("CREATE SCHEMA public")) + # else: + # await conn.run_sync(SQLModel.metadata.drop_all) await conn.run_sync(SQLModel.metadata.create_all) diff --git a/apps/backend/app/models/agent/optimization_jobs.py b/apps/backend/app/models/agent/optimization_jobs.py index bde431c..8848516 100644 --- a/apps/backend/app/models/agent/optimization_jobs.py +++ b/apps/backend/app/models/agent/optimization_jobs.py @@ -21,6 +21,7 @@ class AgentOptimizationJobStatus(str, Enum): RUNNING = "running" COMPLETED = "completed" FAILED = "failed" + PENDING_REVIEW = "pending_review" class AgentOptimizationJob(SQLModel, table=True): diff --git a/apps/backend/app/models/distributed/sessions.py b/apps/backend/app/models/distributed/sessions.py index 65fcd41..fd9bd1c 100644 --- a/apps/backend/app/models/distributed/sessions.py +++ b/apps/backend/app/models/distributed/sessions.py @@ -37,7 +37,7 @@ class DistributedSession(SQLModel, table=True): sa_column=Column(Integer, ForeignKey("distributed_jobs.id"), nullable=False, index=True), ) server_ip: str = Field( - default="127.0.0.1", + default="localhost", sa_column=Column(String(DistributedLimits.SERVER_IP_MAX_LENGTH), nullable=False), ) server_port: int = Field( diff --git a/apps/backend/app/schemas/agent.py b/apps/backend/app/schemas/agent.py index 89ea8e4..f11614f 100644 --- a/apps/backend/app/schemas/agent.py +++ b/apps/backend/app/schemas/agent.py @@ -41,6 +41,10 @@ class AgentOptimizeRequest(BaseModel): default=AgentOptimizationObjective.AUTO, description="Objective for the experiment batch. 
'accuracy' maximizes accuracy; 'auto' defaults to accuracy.", ) + planned_experiments: Optional[list[dict[str, Any]]] = Field( + default=None, + description="Explicit list of experiment patches to run, bypassing LLM parse." + ) class AgentModelsResponse(BaseModel): @@ -143,6 +147,7 @@ class AgentOptimizeProgressResponse(BaseModel): best_config: dict[str, Any] | None = None best_metrics: SimulationRunMetricsResponse | None = None experiments: list[AgentExperimentSummary] = Field(default_factory=list) + draft_experiments: list[dict[str, Any]] = Field(default_factory=list) summary_text: str | None = None error_message: str | None = None created_at: datetime | None = None @@ -215,3 +220,31 @@ class AgentRunMetricsResponse(BaseModel): run_id: str metrics: dict[str, Any] = Field(default_factory=dict) + + +class ExperimentPlanPreview(BaseModel): + """Single experiment draft for frontend UI display and editing.""" + name: str + plan_summary: str + config_patch: dict[str, Any] + estimated_minutes: int = Field(default=0, description="Estimated time in minutes") + estimated_gpu_vram_gb: float = Field(default=0.0, description="Estimated VRAM in GB") + risk_warnings: list[str] = Field(default_factory=list) + +class AgentPlanPreviewRequest(BaseModel): + """Payload for requesting a draft generation without execution.""" + goal: str = Field(..., description="Natural-language experiment request.") + job_name: str = Field(..., description="Name for the job group.") + model_name: Optional[str] = None + system_mode: str = "simulation" + +class AgentPlanPreviewResponse(BaseModel): + """Draft results returned to the frontend.""" + optimization_job_id: int + goal: str + experiments: list[ExperimentPlanPreview] + system_mode: str + +class AgentPlanReviseRequest(BaseModel): + """Payload for requesting a revision to an existing plan draft.""" + instruction: str = Field(..., description="Natural language feedback to revise the plan.") \ No newline at end of file diff --git 
a/apps/backend/app/schemas/distributed/sessions.py b/apps/backend/app/schemas/distributed/sessions.py index d658712..1d3f738 100644 --- a/apps/backend/app/schemas/distributed/sessions.py +++ b/apps/backend/app/schemas/distributed/sessions.py @@ -17,7 +17,7 @@ class DistributedSessionCreateRequest(BaseModel): Request payload for creating or getting distributed session. """ - server_ip: str = "127.0.0.1" + server_ip: str = "localhost" server_port: int = 50052 diff --git a/apps/backend/app/services/agent/experiment_service.py b/apps/backend/app/services/agent/experiment_service.py index f37afca..601e932 100644 --- a/apps/backend/app/services/agent/experiment_service.py +++ b/apps/backend/app/services/agent/experiment_service.py @@ -33,6 +33,10 @@ ) from app.repositories.agent import AgentExperimentRepository +from app.core.logger import get_logger + +logger = get_logger(__name__) + # --------------------------------------------------------------------------- # Shared constants (mirrored from simulation run_service / run_metrics_service) @@ -472,11 +476,9 @@ async def get_run_metrics(self, run_id: str) -> dict[str, Any]: run = await self.repo.get_run(run_id) if not run: raise exceptions.RunNotFound("Run not found") - # If terminal and already has metrics, return them if run.status in TERMINAL_RUN_STATUSES and isinstance(run.metrics_json, dict) and run.metrics_json: return self._normalize_metrics_payload(run.metrics_json) - # Try live result file live_result_path = self._results_dir() / self._build_live_results_filename(run_id) loaded = self._load_metrics_file(live_result_path) diff --git a/apps/backend/app/services/agent/graph.py b/apps/backend/app/services/agent/graph.py index 3e6fcd6..cdaabd8 100644 --- a/apps/backend/app/services/agent/graph.py +++ b/apps/backend/app/services/agent/graph.py @@ -13,6 +13,9 @@ import uuid from collections.abc import Awaitable, Callable +from app.core.logger import get_logger +logger = get_logger(__name__) + from langgraph.graph import 
END, StateGraph # type: ignore[import-untyped] from sqlalchemy.ext.asyncio import AsyncSession @@ -71,6 +74,25 @@ async def _node_parse(self, state: AgentState) -> AgentState: state.phase = "parsing" await self._publish_progress(state) + if getattr(state, "planned_experiments", None): + experiments: list[ExperimentPlan] = [] + for idx, exp in enumerate(state.planned_experiments): + name = exp.get("name", f"exp-{idx + 1}") + experiments.append( + ExperimentPlan( + iteration=idx + 1, + iteration_goal=f"Run experiment: {name}", + name=name, + plan_summary=exp.get("plan_summary", ""), + config_patch=exp.get("config_patch", {}), + ) + ) + state.experiments = experiments + state.max_iterations = len(experiments) + state.phase = "parsed" + await self._publish_progress(state) + return state + experiment_service = AgentExperimentService(self._session) schema = experiment_service.get_config_schema() capabilities = get_platform_capabilities() @@ -83,16 +105,30 @@ async def _node_parse(self, state: AgentState) -> AgentState: base_config=base_config, ) + logger.info("llm input instructions=%s", instructions) + logger.info("llm input prompt=%s", prompt) + text, _ = await self._llm.generate_text( model=select_llm_model(state.model_name), instructions=instructions, input_text=prompt, ) + logger.info("llm raw output=%s", text) + + clean_text = text.strip() + if clean_text.startswith("```json"): + clean_text = clean_text[7:] + elif clean_text.startswith("```"): + clean_text = clean_text[3:] + if clean_text.endswith("```"): + clean_text = clean_text[:-3] + clean_text = clean_text.strip() + # Parse LLM response into experiment plans experiments: list[ExperimentPlan] = [] try: - parsed = json.loads(text) + parsed = json.loads(clean_text) if isinstance(parsed, dict): plan_summary = parsed.get("plan_summary", "") raw_experiments = parsed.get("experiments", []) @@ -101,14 +137,16 @@ async def _node_parse(self, state: AgentState) -> AgentState: if not isinstance(exp, dict): continue name = 
exp.get("name", f"exp-{idx + 1}") - config = exp.get("config", {}) - if not isinstance(config, dict): - config = {} + patch_config = exp.get("config", {}) + if not isinstance(patch_config, dict): + patch_config = {} # Merge with base config and normalize - merged = self._deep_merge_dicts(base_config, config) + current_full_config = copy.deepcopy(base_config) + merged = self._deep_merge_dicts(current_full_config, patch_config) try: normalized = experiment_service.normalize_simulation_config(merged) - except exceptions.BadRequestError: + except Exception: + # 如果合并出错,至少保证能跑,回退到基础配置 normalized = experiment_service.normalize_simulation_config( copy.deepcopy(base_config) ) @@ -118,7 +156,7 @@ async def _node_parse(self, state: AgentState) -> AgentState: iteration_goal=f"Run experiment: {name}", name=name, plan_summary=plan_summary, - config_patch=config, + config_patch=normalized, ) ) except (json.JSONDecodeError, Exception): diff --git a/apps/backend/app/services/agent/history_service.py b/apps/backend/app/services/agent/history_service.py index 637132a..7dce611 100644 --- a/apps/backend/app/services/agent/history_service.py +++ b/apps/backend/app/services/agent/history_service.py @@ -37,9 +37,15 @@ async def create_job( model_name: str | None, max_iterations: int, snapshot: dict[str, Any], + status: AgentOptimizationJobStatus | str = AgentOptimizationJobStatus.QUEUED, ) -> AgentOptimizationJob: if job_name: await self._ensure_unique_job_name(job_name) + + resolved_status = status + if isinstance(status, str): + resolved_status = AgentOptimizationJobStatus(status.lower()) + try: job = await self.repository.create_job( task_id=task_id, @@ -47,7 +53,7 @@ async def create_job( goal=goal, system_mode=system_mode, model_name=model_name, - status=AgentOptimizationJobStatus.QUEUED, + status=resolved_status, max_iterations=max_iterations, snapshot_json=self._to_json_safe(snapshot), ) @@ -120,8 +126,12 @@ async def mark_job_failed( async def _ensure_unique_job_name(self, 
job_name: str, *, exclude_job_id: int | None = None) -> None: existing = await self.repository.get_job_by_name(job_name) if existing and (exclude_job_id is None or existing.id != exclude_job_id): - raise exceptions.JobAlreadyExists(self.JOB_NAME_CONFLICT_MESSAGE) - + current_status = existing.status.value if hasattr(existing.status, "value") else str(existing.status) + if current_status.lower() == "pending_review": + await self.session.delete(existing) + await self.session.flush() + else: + raise exceptions.JobAlreadyExists(self.JOB_NAME_CONFLICT_MESSAGE) @staticmethod def _extract_simulation_job_id(snapshot: dict[str, Any]) -> int | None: current_experiment = snapshot.get("current_experiment") diff --git a/apps/backend/app/services/agent/runtime_service.py b/apps/backend/app/services/agent/runtime_service.py index 79661f3..5a3335b 100644 --- a/apps/backend/app/services/agent/runtime_service.py +++ b/apps/backend/app/services/agent/runtime_service.py @@ -39,6 +39,7 @@ async def start_optimization( model_name: str | None, job_name: str | None, objective: AgentOptimizationObjective, + planned_experiments: list[dict[str, Any]], ) -> dict[str, Any]: """ Create a background experiment task and return its initial snapshot. 
@@ -98,6 +99,7 @@ async def start_optimization( job_name=job_name, objective=objective, resolved_objective=resolved_objective, + planned_experiments=planned_experiments, ) ) task.add_done_callback(lambda finished_task, current_task_id=task_id: self._on_task_done(current_task_id, finished_task)) @@ -124,6 +126,7 @@ async def _run_task( job_name: str | None, objective: AgentOptimizationObjective, resolved_objective: AgentOptimizationObjective, + planned_experiments: list[dict[str, Any]] | None = None, ) -> None: """Execute one experiment task in the background.""" logger.info("agent_task_started task_id=%s", task_id) @@ -138,6 +141,7 @@ async def _run_task( objective=objective, resolved_objective=resolved_objective, phase="parsing", + planned_experiments=planned_experiments, ) await self._update_from_state(task_id, initial_state, status="running") async with AsyncSessionLocal() as session: diff --git a/apps/backend/app/services/agent/state.py b/apps/backend/app/services/agent/state.py index e34ba4d..911274b 100644 --- a/apps/backend/app/services/agent/state.py +++ b/apps/backend/app/services/agent/state.py @@ -102,3 +102,5 @@ class AgentState: history: list[ExperimentRecord] = field(default_factory=list) terminated: bool = False summary: str | None = None + + planned_experiments: list[dict[str, Any]] | None = None diff --git a/apps/backend/app/services/agent/summary.py b/apps/backend/app/services/agent/summary.py index 2fac63e..04e0b5c 100644 --- a/apps/backend/app/services/agent/summary.py +++ b/apps/backend/app/services/agent/summary.py @@ -137,3 +137,23 @@ def build_lessons_learned( ) -> list[str]: """Bench mode does not generate iterative lessons.""" return [] + +def calculate_communication_cost(config: dict[str, Any]) -> int: + """Estimate communication cost: total rounds * clients per round.""" + federated = config.get("federated") or {} + rounds = int(federated.get("num_rounds", 0)) + clients = int(federated.get("clients_per_round", 0)) + return rounds * 
clients + +def calculate_computation_cost(config: dict[str, Any]) -> int: + """Estimate computation cost: total rounds * clients per round * local epochs.""" + federated = config.get("federated") or {} + epochs = int(federated.get("local_epochs", 0)) + return calculate_communication_cost(config) * epochs + +def get_best_experiment(state: AgentState) -> ExperimentRecord | None: + """Find the best performing experiment (the Winner).""" + scored = [r for r in state.experiment_results if r.score is not None] + if not scored: + return None + return max(scored, key=lambda r: r.score) \ No newline at end of file diff --git a/apps/backend/app/services/llm.py b/apps/backend/app/services/llm.py index 88e28db..00caa25 100644 --- a/apps/backend/app/services/llm.py +++ b/apps/backend/app/services/llm.py @@ -56,6 +56,14 @@ class LLMRegistry: "name": "gpt-4o", "llm": _build_openai_model("gpt-4o"), }, + { + "name": "bytedance-seed/seed-2.0-mini", + "llm": _build_openai_model("bytedance-seed/seed-2.0-mini"), + }, + { + "name": "moonshotai/kimi-k2.5", + "llm": _build_openai_model("moonshotai/kimi-k2.5"), + } ] @classmethod diff --git a/apps/backend/tests/api/v1/endpoints/test_agent.py b/apps/backend/tests/api/v1/endpoints/test_agent.py index 30db778..cefb45a 100644 --- a/apps/backend/tests/api/v1/endpoints/test_agent.py +++ b/apps/backend/tests/api/v1/endpoints/test_agent.py @@ -155,12 +155,14 @@ def build(): def test_agent_optimize_start_endpoint_returns_live_progress(client, monkeypatch): import app.api.v1.endpoints.agent as agent_module - async def _fake_start_optimization(*, goal, max_iterations, system_mode, model_name, job_name, objective): + # ADD `planned_experiments` here 👇 + async def _fake_start_optimization(*, goal, max_iterations, system_mode, model_name, job_name, objective, planned_experiments): assert goal == "live optimize" assert max_iterations == 3 assert system_mode == "simulation" assert model_name == "gpt-live" assert job_name == "opt-job-live" + assert 
planned_experiments is None # Optional: verify the default value is passed return { "task_id": "task-1", "status": "running", diff --git a/apps/backend/tests/api/v1/endpoints/test_distributed.py b/apps/backend/tests/api/v1/endpoints/test_distributed.py index 00cbf77..0a9ed4b 100644 --- a/apps/backend/tests/api/v1/endpoints/test_distributed.py +++ b/apps/backend/tests/api/v1/endpoints/test_distributed.py @@ -23,7 +23,7 @@ def test_distributed_session_lifecycle(client): created_session = client.post( f"/api/v1/distributed/jobs/{job_id}/session", - json={"server_ip": "127.0.0.1", "server_port": 50052}, + json={"server_ip": "localhost", "server_port": 50052}, ) assert created_session.status_code == 201 session_id = created_session.json()["id"] @@ -102,7 +102,7 @@ def test_distributed_start_requires_all_clients_ready(client): created_session = client.post( f"/api/v1/distributed/jobs/{job_id}/session", - json={"server_ip": "127.0.0.1", "server_port": 50052}, + json={"server_ip": "localhost", "server_port": 50052}, ) assert created_session.status_code == 201 session_id = created_session.json()["id"] diff --git a/apps/backend/tests/core/test_config.py b/apps/backend/tests/core/test_config.py index ca7dab9..b05f308 100644 --- a/apps/backend/tests/core/test_config.py +++ b/apps/backend/tests/core/test_config.py @@ -10,7 +10,7 @@ def _base_settings_kwargs() -> dict: "PROJECT_NAME": "Figaro", "API_PREFIX": "/api/v1", "FRONTEND_URL": "http://localhost:5173", - "ALLOWED_ORIGINS": "http://localhost:5173, http://127.0.0.1:5173", + "ALLOWED_ORIGINS": "http://localhost:5173, http://localhost:5173", "POSTGRES_HOST": "localhost", "POSTGRES_PORT": 5432, "POSTGRES_DB": "figaro", diff --git a/apps/backend/tests/core/test_exception_handlers.py b/apps/backend/tests/core/test_exception_handlers.py index 385975f..e339d87 100644 --- a/apps/backend/tests/core/test_exception_handlers.py +++ b/apps/backend/tests/core/test_exception_handlers.py @@ -19,7 +19,7 @@ def _build_request(path: str = 
"/test") -> Request: "raw_path": path.encode("utf-8"), "query_string": b"", "headers": [], - "client": ("127.0.0.1", 12345), + "client": ("localhost", 12345), "server": ("testserver", 80), } return Request(scope) diff --git a/apps/backend/tests/models/test_models.py b/apps/backend/tests/models/test_models.py index 9d08999..2a52ff8 100644 --- a/apps/backend/tests/models/test_models.py +++ b/apps/backend/tests/models/test_models.py @@ -46,7 +46,7 @@ def test_distributed_models_have_expected_defaults(): assert job.status == DistributedJobStatus.DRAFT assert job.expected_clients == 1 assert session.status == DistributedSessionStatus.WAITING_CLIENTS - assert session.server_ip == "127.0.0.1" + assert session.server_ip == "localhost" assert session.server_port == 50052 assert len(session.id) == 8 assert participant.status == DistributedParticipantStatus.PENDING_APPROVAL diff --git a/apps/backend/tests/schemas/test_contracts.py b/apps/backend/tests/schemas/test_contracts.py index ba6b545..488a136 100644 --- a/apps/backend/tests/schemas/test_contracts.py +++ b/apps/backend/tests/schemas/test_contracts.py @@ -33,7 +33,7 @@ def test_basic_request_schema_defaults(): sim_cfg = SimulationJobConfigUpdateRequest(config={"x": 1}) sim_copy = SimulationJobCopyRequest() - assert dist_session.server_ip == "127.0.0.1" + assert dist_session.server_ip == "localhost" assert dist_session.server_port == 50052 assert dist_cfg.config == {} assert message.detail is None @@ -64,7 +64,7 @@ def test_from_attributes_response_models_round_trip(): dist_session = DistributedSession( id="sess0001", job_id=2, - server_ip="127.0.0.1", + server_ip="localhost", server_port=50052, ) diff --git a/apps/frontend/.env.example b/apps/frontend/.env.example index 52caf89..1d9b979 100644 --- a/apps/frontend/.env.example +++ b/apps/frontend/.env.example @@ -1,2 +1,2 @@ -VITE_API_BASE_URL=http://127.0.0.1:8000 -OPENAPI_URL=http://127.0.0.1:8000/openapi.json +VITE_API_BASE_URL=http://localhost:8000 
+OPENAPI_URL=http://localhost:8000/openapi.json diff --git a/apps/frontend/README.md b/apps/frontend/README.md index 2c21b42..25051f0 100644 --- a/apps/frontend/README.md +++ b/apps/frontend/README.md @@ -23,7 +23,7 @@ This updates `src/api/openapi.d.ts` from FastAPI OpenAPI schema. pnpm run dev ``` -Frontend uses `VITE_API_BASE_URL` (default: `http://127.0.0.1:8000`). +Frontend uses `VITE_API_BASE_URL` (default: `http://localhost:8000`). ## Pages/Features diff --git a/apps/frontend/package.json b/apps/frontend/package.json index d59a501..c5d3584 100644 --- a/apps/frontend/package.json +++ b/apps/frontend/package.json @@ -8,8 +8,8 @@ "dev": "vite", "build": "tsc -b && vite build", "preview": "vite preview", - "gen:api": "openapi-typescript ${OPENAPI_URL:-http://127.0.0.1:8000/openapi.json} -o src/api/openapi.d.ts" - }, + "gen:api": "openapi-typescript ${OPENAPI_URL:-http://localhost:8000/openapi.json} -o src/api/openapi.d.ts" + }, "dependencies": { "@radix-ui/react-alert-dialog": "^1.1.14", "@radix-ui/react-dropdown-menu": "^2.1.16", diff --git a/apps/frontend/src/App.tsx b/apps/frontend/src/App.tsx index caeaace..3faca2f 100644 --- a/apps/frontend/src/App.tsx +++ b/apps/frontend/src/App.tsx @@ -1,58 +1,74 @@ -import { useState } from "react"; -import { BriefcaseBusiness, Loader2 } from "lucide-react"; - +import { BriefcaseBusiness, Sparkles, LayoutDashboard, LineChart } from "lucide-react"; import { Card, CardContent } from "./components/ui/card"; import { Tabs, TabsList, TabsTrigger } from "./components/ui/tabs"; + import { useAgentController } from "./features/agent/useAgentController"; -import { useDistributedController } from "./features/distributed/useDistributedController"; -import { useSimulationController } from "./features/simulation/useSimulationController"; -import { AgentPage } from "./pages/AgentPage"; -import { DistributedPage } from "./pages/DistributedPage"; -import { SimulationPage } from "./pages/SimulationPage"; -import type { PageMode } from 
"./pages/types"; +import { AgentExperimentStudio } from "./features/agent/components/AgentExperimentStudio"; +import { AgentPlanPreview } from "./features/agent/components/AgentPlanPreview"; +import { AgentRunDashboard } from "./features/agent/components/AgentRunDashboard"; +import { AgentResultsCompare } from "./features/agent/components/AgentResultsCompare"; export default function App() { - const [pageMode, setPageMode] = useState("agent"); - const simulationPageProps = useSimulationController(); - const agentPageProps = useAgentController(); - const distributedPageProps = useDistributedController({ - pageMode, - configSchema: simulationPageProps.configSchema, - }); - const busy = simulationPageProps.busy || distributedPageProps.busy || agentPageProps.busy; + const agentProps = useAgentController(); + const { workflowStep, setWorkflowStep, draftPlan } = agentProps; + + const activeTab = + (workflowStep === "home" || workflowStep === "preview") ? "studio" : + (workflowStep === "running") ? "dashboard" : + "results"; + + const handleTabChange = (val: string) => { + if (val === "studio") { + setWorkflowStep(draftPlan ? "preview" : "home"); + } else if (val === "dashboard") { + setWorkflowStep("running"); + } else if (val === "results") { + setWorkflowStep("results"); + } + }; return ( -

-
- +
+
+ + {/* 顶层全局导航栏 */} +
- -

Figaro Control Center

+
+ +
+

+ Figaro Agent Studio +

+
- setPageMode(value as PageMode)}> - - Agent - Simulation - Distributed + + + + Agent Studio + + + Run Dashboard + + + Results & History +
- {pageMode === "simulation" && } - {pageMode === "distributed" && } - {pageMode === "agent" && } - - {busy && ( -
- - Processing... -
- )} + {/* 动态内容渲染区 */} +
+ {activeTab === "studio" && workflowStep === "home" && } + {activeTab === "studio" && workflowStep === "preview" && } + {activeTab === "dashboard" && } + {activeTab === "results" && } +
); -} +} \ No newline at end of file diff --git a/apps/frontend/src/api/agent.ts b/apps/frontend/src/api/agent.ts index d5e61ac..3a67e27 100644 --- a/apps/frontend/src/api/agent.ts +++ b/apps/frontend/src/api/agent.ts @@ -90,6 +90,7 @@ export type AgentOptimizeProgressResponse = { best_config: Record | null; best_metrics: RunMetrics | null; experiments: AgentExperimentSummary[]; + draft_experiments?: any[]; summary_text: string | null; error_message: string | null; created_at: string | null; @@ -153,6 +154,33 @@ export type AgentRunLogResponse = { created_at: string; }; +export interface ExperimentPlanPreview { + name: string; + plan_summary: string; + config_patch: Record; + estimated_minutes?: number; + estimated_gpu_vram_gb?: number; + risk_warnings?: string[]; +}; + +export interface AgentPlanPreviewRequest { + goal: string; + job_name: string; + model_name: string | null; + system_mode: string; +}; + +export interface AgentPlanPreviewResponse { + optimization_job_id: number; + goal: string; + experiments: ExperimentPlanPreview[]; + system_mode: string; +}; + +export interface AgentPlanReviseRequest { + instruction: string; +}; + async function readJson(response: Response): Promise { if (!response.ok) { throw new Error(await getHttpErrorMessage(response)); @@ -235,4 +263,35 @@ export const agentApi = { const response = await fetch(`${baseUrl}/api/v1/agent/runs/${encodeURIComponent(runId)}/logs`); return readJson(response); }, -}; + + async generatePlan(request: AgentPlanPreviewRequest): Promise { + const response = await fetch(`${baseUrl}/api/v1/agent/optimize/plan`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(request), + }); + return readJson(response); + }, + + async startOptimizeFromDraft(request: any): Promise { + const response = await fetch(`${baseUrl}/api/v1/agent/optimize/start`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(request), + }); + return 
readJson(response); + }, + + async revisePlan(jobId: number, request: AgentPlanReviseRequest): Promise { + const response = await fetch(`${baseUrl}/api/v1/agent/optimization-jobs/${encodeURIComponent(jobId)}/revise`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(request), + }); + if (!response.ok) { + const err = await response.json().catch(() => ({})); + throw new Error(err.detail || "Failed to revise plan"); + } + return response.json(); + }, +}; \ No newline at end of file diff --git a/apps/frontend/src/api/client.ts b/apps/frontend/src/api/client.ts index 2924daf..1c53624 100644 --- a/apps/frontend/src/api/client.ts +++ b/apps/frontend/src/api/client.ts @@ -1,7 +1,7 @@ import createClient from "openapi-fetch"; import type { paths } from "./openapi"; -export const baseUrl = import.meta.env.VITE_API_BASE_URL ?? "http://127.0.0.1:8000"; +export const baseUrl = import.meta.env.VITE_API_BASE_URL ?? "http://localhost:8000"; export const api = createClient({ baseUrl diff --git a/apps/frontend/src/api/runs.ts b/apps/frontend/src/api/runs.ts index 50c30fc..799cddd 100644 --- a/apps/frontend/src/api/runs.ts +++ b/apps/frontend/src/api/runs.ts @@ -24,7 +24,7 @@ export type RunMetrics = { client_results: Record; }; -const baseUrl = import.meta.env.VITE_API_BASE_URL ?? "http://127.0.0.1:8000"; +const baseUrl = import.meta.env.VITE_API_BASE_URL ?? "http://localhost:8000"; async function readJson(response: Response): Promise { if (!response.ok) { diff --git a/apps/frontend/src/components/ui/separator.tsx b/apps/frontend/src/components/ui/separator.tsx index b49016d..fa85fbe 100644 --- a/apps/frontend/src/components/ui/separator.tsx +++ b/apps/frontend/src/components/ui/separator.tsx @@ -1,3 +1,10 @@ -export function Separator() { - return
; -} +import * as React from "react"; + +export function Separator({ className, ...props }: React.HTMLAttributes) { + return ( +
+ ); +} \ No newline at end of file diff --git a/apps/frontend/src/features/agent/components/AgentExperimentStudio.tsx b/apps/frontend/src/features/agent/components/AgentExperimentStudio.tsx new file mode 100644 index 0000000..887a3b8 --- /dev/null +++ b/apps/frontend/src/features/agent/components/AgentExperimentStudio.tsx @@ -0,0 +1,73 @@ +import { Sparkles, ArrowRight, Loader2 } from "lucide-react"; +import { Button } from "../../../components/ui/button"; +import { Card, CardContent, CardHeader, CardTitle, CardDescription } from "../../../components/ui/card"; +import { Textarea } from "../../../components/ui/textarea"; +import { Input } from "../../../components/ui/input"; +import type { AgentPageProps } from "../../../pages/types"; + +export function AgentExperimentStudio(props: AgentPageProps) { + const { goal, setGoal, jobName, setJobName, presets, busy, handleGeneratePlan } = props; + + return ( +
+
+

Figaro Studio

+

Describe your federated learning goals. The agent will design the configuration.

+
+ + + +
+ + setJobName(e.target.value)} + placeholder="e.g., resnet-cifar-tuning" + /> +
+ +
+ +