Merged
2 changes: 2 additions & 0 deletions .gitignore
@@ -32,3 +32,5 @@ configs/simulation_runs/
configs/agent_experiment_runs/
configs/agent_config_*.yaml
configs/cached/

.pnpm-store/
46 changes: 28 additions & 18 deletions README.md
@@ -92,6 +92,11 @@
git clone https://github.com/spire-studio/figaro.git
cd figaro
uv sync

cp .env.example .env
# Edit .env and set:
# OPENAI_API_KEY=your-api-key
# POSTGRES_PASSWORD=postgres
```
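Before starting the backend, it can help to confirm that `.env` defines the two variables named above. The following is a minimal sketch, not part of Figaro: `parse_env_text` and `missing_env_keys` are hypothetical helpers, and the parser only understands plain `KEY=value` lines (no quoting or `export` prefixes).

```python
from pathlib import Path

# The two keys the README asks you to set in .env.
REQUIRED_KEYS = ("OPENAI_API_KEY", "POSTGRES_PASSWORD")


def parse_env_text(text: str) -> dict[str, str]:
    """Parse simple KEY=value lines; skip blank lines and '#' comments."""
    values: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def missing_env_keys(text: str, required=REQUIRED_KEYS) -> list[str]:
    """Return the required keys that are absent or empty in the given .env text."""
    values = parse_env_text(text)
    return [key for key in required if not values.get(key)]


if __name__ == "__main__":
    env_path = Path(".env")
    env_text = env_path.read_text() if env_path.exists() else ""
    print("Missing keys:", missing_env_keys(env_text))
```

Commented-out lines such as `# OPENAI_API_KEY=your-api-key` count as missing, which matches the README's instruction to edit the file after copying it.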

## 🚀 Quick Start
@@ -110,15 +115,7 @@ docker run -d --name figaro-pg -p 5433:5432 \
-e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=figaro postgres:16
```

**Step 2 — Configure `.env`**:
```bash
cp .env.example .env
# Edit .env and set:
# OPENAI_API_KEY=your-api-key
# POSTGRES_PASSWORD=postgres
```

**Step 3 — Backend**:
**Step 2 — Backend**:
```bash
POSTGRES_HOST=localhost POSTGRES_PORT=5433 POSTGRES_PASSWORD=postgres POSTGRES_DB=figaro \
PYTHONPATH=libs:apps/backend/runners \
@@ -130,7 +127,7 @@ To use a specific GPU (e.g. GPU 1):
CUDA_VISIBLE_DEVICES=1 POSTGRES_HOST=localhost ... uv run uvicorn ...
```

**Step 4 — Frontend** (in another terminal):
**Step 3 — Frontend** (in another terminal):
```bash
cd apps/frontend && pnpm install && pnpm dev
```
@@ -207,14 +204,27 @@ figaro/

PRs welcome! Figaro is meant to be a readable, research-friendly FL platform.

**Roadmap** — tentative, contributions welcome:

- [ ] **Richer agent planning** — multi-step reflection and failure recovery in the LangGraph pipeline
- [ ] **More aggregation strategies** — FedProx, FedAvgM, Scaffold on top of the existing FedAvg baseline
- [ ] **Hardened distributed mode** — fault tolerance, client reconnection, and heterogeneous workers
- [ ] **Expanded datasets & models** — beyond CIFAR-10 / MNIST and CNN / ResNet
- [ ] **End-to-end reproducibility** — deterministic seeds, artifact lineage, and one-click replay
- [ ] **Observability** — per-run metrics dashboard and structured logs
**Roadmap**:

**Phase 1: Solidifying the Agentic Platform**
- [x] **Interactive Agent Planning** — Multi-turn dialogue support for refining experiments, plus visual topology previews (Plan Preview) before execution.
- [x] **Execution Transparency** — Real-time tracking of node-level status during execution and automated natural-language interpretation of results.
- [ ] **Strict Configuration Engine** — Implement strict Pydantic/JSON Schema validation to resolve historical inconsistencies between `config_schema` and underlying algorithms.
- [ ] **Advanced Experiment Tracking** — Multi-dimensional search filtering (by metrics, hyperparameters, status) and configuration version control (diffing).

**Phase 2: LLM & LoRA Federated Fine-Tuning**
- [ ] **Native LLM Ecosystem Integration** — Seamless Hugging Face model loading (e.g., Llama 3, Qwen) and efficient parsing of JSONL instruction-tuning datasets.
- [ ] **Parameter-Efficient Runtime** — Deep integration with LoRA/PEFT, including support for QLoRA (4-bit/8-bit quantization) to lower client-side memory barriers.
- [ ] **Specialized Adapter Aggregation** — Custom aggregation mechanisms for LoRA adapters, exploring support for heterogeneous LoRA ranks across clients.
- [ ] **LLM Evaluation Metrics** — Built-in evaluation for generative tasks (ROUGE, BLEU, perplexity) and automated LLM-as-a-Judge capabilities.
- [ ] **Hardware Guardrails** — Pre-run dynamic GPU memory estimation (OOM prevention) and automated tuning of gradient accumulation and checkpointing.

**Phase 3: Enterprise & Team Collaboration**
- [ ] **Multi-Tenant Workspaces** — Isolated project environments with Role-Based Access Control (RBAC) and comprehensive audit logging.
- [ ] **Robust Distributed Scheduling** — Global GPU resource queuing, quota management, and enhanced fault tolerance for client reconnections/dropouts.
- [ ] **Cloud-Native Infrastructure** — Native Kubernetes (K8s) Runner integration with auto-scaling workers based on queue volume.
- [ ] **Model Asset Registry** — Centralized Artifact Registry to track complete data lineage from dataset versions to final aggregated weights.
- [ ] **Compliance & Governance** — Automated privacy compliance reporting (e.g., auditing Differential Privacy parameters) to ensure enterprise-grade security.

<p align="center">
<sub>Figaro is for research and educational use.</sub>
46 changes: 28 additions & 18 deletions README.zh-CN.md
@@ -93,6 +93,11 @@
git clone https://github.com/spire-studio/figaro.git
cd figaro
uv sync

cp .env.example .env
# Edit .env and set:
# OPENAI_API_KEY=your-api-key
# POSTGRES_PASSWORD=postgres
```

## 🚀 Quick Start
@@ -111,15 +116,7 @@ docker run -d --name figaro-pg -p 5433:5432 \
-e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=figaro postgres:16
```

**Step 2 — Configure `.env`**:
```bash
cp .env.example .env
# Edit .env and set:
# OPENAI_API_KEY=your-api-key
# POSTGRES_PASSWORD=postgres
```

**Step 3 — Backend**:
**Step 2 — Backend**:
```bash
POSTGRES_HOST=localhost POSTGRES_PORT=5433 POSTGRES_PASSWORD=postgres POSTGRES_DB=figaro \
PYTHONPATH=libs:apps/backend/runners \
@@ -131,7 +128,7 @@ POSTGRES_HOST=localhost POSTGRES_PORT=5433 POSTGRES_PASSWORD=postgres POSTGRES_D
CUDA_VISIBLE_DEVICES=1 POSTGRES_HOST=localhost ... uv run uvicorn ...
```

**Step 4 — Frontend** (in another terminal):
**Step 3 — Frontend** (in another terminal):
```bash
cd apps/frontend && pnpm install && pnpm dev
```
@@ -208,14 +205,27 @@ figaro/

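PRs welcome! Figaro aims to be a readable, research-friendly federated learning platform.

**Roadmap** (tentative, contributions welcome):

- [ ] **Richer agent planning** — multi-step reflection and failure recovery in the LangGraph pipeline
- [ ] **More aggregation strategies** — FedProx, FedAvgM, and Scaffold on top of the existing FedAvg
- [ ] **Hardened distributed mode** — fault tolerance, client reconnection, and heterogeneous worker support
- [ ] **Expanded datasets and models** — beyond CIFAR-10 / MNIST and CNN / ResNet
- [ ] **End-to-end reproducibility** — deterministic seeds, artifact lineage, and one-click replay
- [ ] **Observability** — per-experiment metrics dashboard and structured logs
**Roadmap**:

**Phase 1: Solidifying the Agentic Experiment Platform**
- [x] **Agent interaction upgrade** — Multi-turn dialogue for refining experiment plans, plus a Plan Preview before execution.
- [x] **Transparent execution and analysis** — Real-time status tracking of experiment nodes, plus agent-driven automated interpretation of result charts.
- [ ] **Configuration engine refactor** — Introduce strict Pydantic/JSON Schema validation to fully resolve inconsistencies between `config_schema` and the underlying algorithm implementations.
- [ ] **Advanced experiment management** — Search and filter experiment history along multiple dimensions such as metrics and hyperparameters, with configuration version control and diffing.

**Phase 2: LLM / LoRA Federated Fine-Tuning Support**
- [ ] **Native LLM ecosystem integration** — Built-in Hugging Face adapter layer for one-click loading of mainstream open-source models, with efficient parsing of JSONL instruction-tuning datasets.
- [ ] **Efficient distributed fine-tuning** — Deep integration with the LoRA/PEFT training environment, including QLoRA (4-bit/8-bit quantization) to significantly lower the GPU memory barrier on edge nodes.
- [ ] **Dedicated weight aggregation strategies** — Adapter weight aggregation tailored to LLM fine-tuning, exploring aggregation across clients with heterogeneous LoRA ranks.
- [ ] **LLM-specific evaluation** — Integrated generative NLP metrics, plus automated LLM-based evaluation of instruction following (LLM-as-a-Judge).
- [ ] **Resource guardrails and estimation** — Dynamic GPU memory estimation before a training job starts (OOM prevention), with automated tuning of gradient accumulation and checkpointing.

**Phase 3: Platform and Enterprise Collaboration**
- [ ] **Multi-tenancy and fine-grained permissions** — Isolated multi-user project spaces (Workspaces), with Role-Based Access Control (RBAC) and complete operation audit logs.
- [ ] **Production-grade scheduling and fault tolerance** — Global GPU resource queuing and quota limits, with stronger handling of client disconnects, reconnections, and crashes in distributed environments.
- [ ] **Cloud-native infrastructure** — Native Kubernetes (K8s) Runner support, with worker nodes scaled up and down based on the number of queued jobs.
- [ ] **Model asset and lineage management** — A centralized Artifact Registry that tracks the complete data lineage from dataset versions to final model weights.
- [ ] **Governance and security compliance** — Automated privacy compliance checks and report generation (for example, auditing the differential privacy $\epsilon$ parameter) to ensure data security for enterprise-grade federated learning.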

<p align="center">
<sub>Figaro is for research and educational use only.</sub>
149 changes: 148 additions & 1 deletion apps/backend/app/api/v1/endpoints/agent.py
@@ -4,7 +4,8 @@

from __future__ import annotations

from fastapi import APIRouter, status
from fastapi import APIRouter, status, HTTPException
import json

from app.api.deps import AsyncSessionDep
from app.schemas.agent import (
@@ -27,6 +28,8 @@
from app.services.agent.graph import FederatedAgentGraphBuilder
from app.services.agent.objectives import AgentOptimizationObjective, resolve_objective
from app.services.llm import LLMRegistry, LLMService
from app.schemas.agent import AgentPlanPreviewRequest, AgentPlanPreviewResponse, ExperimentPlanPreview, AgentPlanReviseRequest
import uuid

agent_router = APIRouter()
agent_runtime_service = AgentRuntimeService()
@@ -143,6 +146,7 @@ def _build_progress_response(snapshot: dict) -> AgentOptimizeProgressResponse:
best_config=snapshot.get("best_config"),
best_metrics=best_metrics,
experiments=_build_experiments(snapshot.get("experiments", [])),
draft_experiments=snapshot.get("draft_experiments", []),
summary_text=snapshot.get("summary_text"),
error_message=snapshot.get("error_message"),
created_at=snapshot.get("created_at"),
@@ -243,6 +247,7 @@ async def start_optimization(
system_mode=payload.system_mode,
model_name=payload.model_name,
objective=payload.objective,
planned_experiments=payload.planned_experiments,
)
return _build_progress_response(snapshot)

@@ -463,3 +468,145 @@ async def get_agent_run_logs(
)
for log in logs
]


@agent_router.post(
    "/optimize/plan",
    response_model=AgentPlanPreviewResponse,
    status_code=status.HTTP_200_OK,
    summary="Generate a plan preview and save as draft",
)
async def generate_plan_preview(
    payload: AgentPlanPreviewRequest,
    session: AsyncSessionDep,
) -> AgentPlanPreviewResponse:
    """Generate a plan preview with the LLM and persist it as a draft.

    1. Call the LLM to generate the plan.
    2. Create a job record with ``pending_review`` status.
    """
    # Reuse the graph builder for the LLM parsing step.
    llm_service = LLMService()
    builder = FederatedAgentGraphBuilder(llm_service=llm_service, session=session)

    # Minimal initial state, just enough to run the parse node.
    temp_state = AgentState(
        goal=payload.goal,
        job_name=payload.job_name,
        model_name=payload.model_name,
        system_mode=payload.system_mode,
        max_iterations=10,
        objective=AgentOptimizationObjective.AUTO,
        resolved_objective=AgentOptimizationObjective.ACCURACY,
    )

    # NOTE: this calls the graph's private parse node directly to get the LLM result.
    parsed_state = await builder._node_parse(temp_state)

    previews = [
        ExperimentPlanPreview(
            name=exp.name,
            plan_summary=exp.plan_summary or "",
            config_patch=exp.config_patch,
            # Placeholder values: no time or VRAM estimate is computed here.
            estimated_minutes=0,
            estimated_gpu_vram_gb=0.0,
        )
        for exp in parsed_state.experiments
    ]

    # Persist the draft: write the snapshot to the DB with status pending_review.
    history_service = AgentOptimizationHistoryService(session)

    job = await history_service.create_job(
        task_id=f"draft-{uuid.uuid4().hex[:8]}",
        goal=payload.goal,
        job_name=payload.job_name,
        model_name=payload.model_name,
        system_mode=payload.system_mode,
        max_iterations=len(previews) or 1,
        status="pending_review",
        snapshot={
            "goal": payload.goal,
            "job_name": payload.job_name,
            # Stored because the revise endpoint reads both keys back from the
            # snapshot (assumes the values are JSON-serializable).
            "model_name": payload.model_name,
            "system_mode": payload.system_mode,
            "status": "pending_review",
            "experiments": [],
            "draft_experiments": [p.model_dump() for p in previews],
        },
    )

    return AgentPlanPreviewResponse(
        optimization_job_id=job.id,
        goal=payload.goal,
        experiments=previews,
        system_mode=payload.system_mode,
    )

@agent_router.post(
    "/optimization-jobs/{job_id}/revise",
    response_model=AgentPlanPreviewResponse,
    status_code=status.HTTP_200_OK,
    summary="Revise a pending plan draft using natural language",
)
async def revise_plan_preview(
    job_id: int,
    payload: AgentPlanReviseRequest,
    session: AsyncSessionDep,
) -> AgentPlanPreviewResponse:
    history_service = AgentOptimizationHistoryService(session)
    job = await history_service.get_job_or_raise(job_id)

    # job.status may be an enum member or a plain string.
    current_status = job.status.value if hasattr(job.status, "value") else str(job.status)
    if current_status != "pending_review":
        raise HTTPException(status_code=400, detail="Only pending_review jobs can be revised.")

    # Copy the snapshot so the stored draft changes only after validation succeeds.
    snapshot = dict(job.snapshot_json or {})
    current_experiments = snapshot.get("draft_experiments", [])
    if not current_experiments:
        raise HTTPException(status_code=400, detail="No draft experiments found to revise.")

    instructions = (
        "You are an AI assistant managing a Federated Learning experiment plan. "
        "The user wants to modify the current experiment configuration based on their feedback. "
        "Update the JSON configuration to reflect their request. "
        "Respond ONLY with a valid JSON array of experiment objects, matching the original schema. "
        "Do not include any other text."
    )

    input_text = (
        f"Current Plan (JSON):\n{json.dumps(current_experiments, indent=2)}\n\n"
        f"User Modification Request:\n{payload.instruction}\n\n"
        "Please output the updated JSON array of experiments:"
    )

    llm_service = LLMService()
    text, _ = await llm_service.generate_text(
        model=snapshot.get("model_name") or None,
        instructions=instructions,
        input_text=input_text,
    )

    # Strip an optional Markdown code fence around the JSON.
    clean_text = text.strip()
    if clean_text.startswith("```json"):
        clean_text = clean_text[7:]
    elif clean_text.startswith("```"):
        clean_text = clean_text[3:]
    if clean_text.endswith("```"):
        clean_text = clean_text[:-3]
    clean_text = clean_text.strip()

    try:
        updated_experiments = json.loads(clean_text)
        if not isinstance(updated_experiments, list):
            raise ValueError("LLM did not return a list.")
        # Validate before persisting so a malformed reply cannot overwrite the draft.
        validated = [ExperimentPlanPreview.model_validate(exp) for exp in updated_experiments]
    except (ValueError, TypeError) as e:
        raise HTTPException(status_code=500, detail=f"Failed to parse LLM response: {e}") from e

    snapshot["draft_experiments"] = [exp.model_dump() for exp in validated]
    await history_service.update_job_snapshot(task_id=job.task_id, snapshot=snapshot)

    return AgentPlanPreviewResponse(
        optimization_job_id=job.id,
        goal=snapshot.get("goal", ""),
        experiments=validated,
        system_mode=snapshot.get("system_mode", "simulation"),
    )
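
The fence stripping and list check in `revise_plan_preview` can be pulled into a small pure function, which makes the behavior testable without an LLM or a database. The following is a sketch under that assumption; `strip_code_fence` and `parse_experiment_list` are hypothetical names, not functions in the repo, and the per-item check only requires each element to be a JSON object.

```python
import json
from typing import Any

# Built at runtime so the marker is not written literally in this snippet.
FENCE = "`" * 3


def strip_code_fence(text: str) -> str:
    """Remove one leading and one trailing Markdown code fence, if present."""
    cleaned = text.strip()
    for prefix in (FENCE + "json", FENCE):
        if cleaned.startswith(prefix):
            cleaned = cleaned[len(prefix):]
            break
    if cleaned.endswith(FENCE):
        cleaned = cleaned[: -len(FENCE)]
    return cleaned.strip()


def parse_experiment_list(text: str) -> list[dict[str, Any]]:
    """Parse an LLM reply into a list of experiment dicts or raise ValueError."""
    data = json.loads(strip_code_fence(text))  # JSONDecodeError is a ValueError
    if not isinstance(data, list):
        raise ValueError("expected a JSON array of experiments")
    for index, item in enumerate(data):
        if not isinstance(item, dict):
            raise ValueError(f"experiment {index} is not a JSON object")
    return data


if __name__ == "__main__":
    reply = FENCE + 'json\n[{"name": "baseline", "config_patch": {}}]\n' + FENCE
    print(parse_experiment_list(reply))
```

Because every failure surfaces as `ValueError`, the endpoint can map a bad reply to a single HTTP error before anything is written to the job snapshot.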
10 changes: 5 additions & 5 deletions apps/backend/app/core/db.py
@@ -72,9 +72,9 @@ async def init_db() -> None:
async with engine.begin() as conn:
# In dev mode we hard-reset postgres schema so renamed tables/types are
# fully cleaned up (including legacy enum dependencies).
if conn.dialect.name == "postgresql":
await conn.execute(text("DROP SCHEMA IF EXISTS public CASCADE"))
await conn.execute(text("CREATE SCHEMA public"))
else:
await conn.run_sync(SQLModel.metadata.drop_all)
# if conn.dialect.name == "postgresql":
# await conn.execute(text("DROP SCHEMA IF EXISTS public CASCADE"))
# await conn.execute(text("CREATE SCHEMA public"))
# else:
# await conn.run_sync(SQLModel.metadata.drop_all)
await conn.run_sync(SQLModel.metadata.create_all)
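
This hunk disables the schema reset by commenting it out, so `init_db` now only runs `create_all` and existing rows survive a restart. If an occasional reset is still wanted during development, one alternative is an explicit opt-in flag. The sketch below assumes an environment variable named `FIGARO_RESET_DB`; that name and the `should_reset_schema` helper are hypothetical and do not exist in the repo.

```python
import os
from collections.abc import Mapping

RESET_FLAG = "FIGARO_RESET_DB"  # hypothetical variable name, not defined by the repo
TRUTHY = {"1", "true", "yes", "on"}


def should_reset_schema(environ: Mapping[str, str] = os.environ) -> bool:
    """Return True only when the opt-in flag is explicitly set to a truthy value."""
    return environ.get(RESET_FLAG, "").strip().lower() in TRUTHY


if __name__ == "__main__":
    # Default: keep existing tables and data.
    print(should_reset_schema({}))
    # Explicit opt-in: the caller would run the DROP/CREATE branch.
    print(should_reset_schema({RESET_FLAG: "1"}))
```

With a guard like this, the destructive branch could stay in the code and run only when requested, instead of living on as commented-out lines.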
1 change: 1 addition & 0 deletions apps/backend/app/models/agent/optimization_jobs.py
@@ -21,6 +21,7 @@ class AgentOptimizationJobStatus(str, Enum):
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
PENDING_REVIEW = "pending_review"
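
Because the status enum mixes in `str`, its members compare equal to their raw string values, which is what lets the endpoints pass `status="pending_review"` as a plain string. A small sketch of that behavior, reproducing only the members visible in this hunk (the real class may define more) and adding a hypothetical `status_value` helper:

```python
from enum import Enum


class AgentOptimizationJobStatus(str, Enum):
    # Only the members visible in the hunk above.
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    PENDING_REVIEW = "pending_review"


def status_value(status: object) -> str:
    """Normalize an enum member or a raw string to its string value."""
    return status.value if isinstance(status, Enum) else str(status)


if __name__ == "__main__":
    member = AgentOptimizationJobStatus.PENDING_REVIEW
    print(member == "pending_review")
    print(status_value(member), status_value("pending_review"))
```

Normalizing through `.value` avoids relying on `str(member)`, whose output for mixed-in enums is not the raw value.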


class AgentOptimizationJob(SQLModel, table=True):
2 changes: 1 addition & 1 deletion apps/backend/app/models/distributed/sessions.py
@@ -37,7 +37,7 @@ class DistributedSession(SQLModel, table=True):
sa_column=Column(Integer, ForeignKey("distributed_jobs.id"), nullable=False, index=True),
)
server_ip: str = Field(
default="127.0.0.1",
default="localhost",
sa_column=Column(String(DistributedLimits.SERVER_IP_MAX_LENGTH), nullable=False),
)
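
The new default, `localhost`, is a hostname rather than an IP literal, while the column is still named `server_ip`. Whether that matters depends on how callers use the value, which this diff does not show. The difference is easy to check with the standard library; `is_ip_literal` below is a hypothetical helper for illustration.

```python
import ipaddress


def is_ip_literal(host: str) -> bool:
    """Return True if host parses as an IPv4 or IPv6 address literal."""
    try:
        ipaddress.ip_address(host)
    except ValueError:
        return False
    return True


if __name__ == "__main__":
    for candidate in ("127.0.0.1", "::1", "localhost"):
        print(candidate, is_ip_literal(candidate))
```

Code that only passes the value to a socket or HTTP client will resolve `localhost` normally; code that validates or parses it as an address would reject it.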
server_port: int = Field(