diff --git a/src/console/commands/command_registry.py b/src/console/commands/command_registry.py index fbb266a..0372832 100644 --- a/src/console/commands/command_registry.py +++ b/src/console/commands/command_registry.py @@ -48,6 +48,7 @@ def create_default(cls) -> CommandRegistry: from src.console.commands.systems.quit_cmd import QuitCommand from src.console.commands.systems.tool_detail_cmd import ToolDetailCommand from src.console.commands.systems.tools_cmd import ToolsCommand + from src.console.commands.workspaces.agent_cmd import AgentCommand from src.console.commands.workspaces.workspace_cmd import WorkspaceCommand registry = cls() @@ -59,6 +60,7 @@ def create_default(cls) -> CommandRegistry: CopyCommand(), HistoryCommand(), WorkspaceCommand(), + AgentCommand(), NewWindowCommand(), HelpCommand(), ClsCommand(), diff --git a/src/console/commands/workspaces/agent_cmd.py b/src/console/commands/workspaces/agent_cmd.py new file mode 100644 index 0000000..6e695b3 --- /dev/null +++ b/src/console/commands/workspaces/agent_cmd.py @@ -0,0 +1,177 @@ +"""Agent management command (/agent).""" + +from __future__ import annotations + +from argparse import ArgumentParser + +from src.core.agent_manager import AgentManager +from src.core.copy2clip import copy_to_clipboard +from src.models.agent import AgentConfig +from src.models.commands import Command, CommandContext, CommandResult + + +def _reset_default(mgr: AgentManager, context: CommandContext) -> CommandResult: + if mgr.reset_default(): + context.console.print("[green]default.md 已根据内置Default Agent重写完成[/green]") + else: + context.console.print("[red]重置失败: 工作区根路径未初始化[/red]") + return CommandResult(success=True) + + +def _show_current(mgr: AgentManager, context: CommandContext) -> CommandResult: + agent = mgr.get_current() + context.console.print( + f"[bold]Current Agent:[/bold] {agent.name}\n" + f"[dim]{agent.description}[/dim]\n" + f"Whitelist: {agent.tool_permissions.whitelist or '(all)'}\n" + f"Blacklist: {agent.tool_permissions.blacklist or '(none)'}" + ) + return CommandResult(success=True) + + +def _list_all(mgr: AgentManager, context: CommandContext) -> CommandResult: + agents = mgr.list_agents() + if not agents: + context.console.print("[yellow]No agents found in .ManualAid/agents/[/yellow]") + return CommandResult(success=True) + + lines = ["[bold]Available Agents:[/bold]"] + for a in agents: + marker = ">" if a.name == mgr.current_agent_name else " " + lines.append(f" {marker} {a.name} — {a.description}") + context.console.print("\n".join(lines)) + return CommandResult(success=True) + + +class AgentCommand(Command): + """Manage Agent configuration""" + + def __init__(self): + super().__init__() + self.name = "agent" + self.aliases = ["/agent"] + self.description = "管理 Agent 配置 (列表、切换、复制、重置)" + self.usage = ( + "/agent — 显示当前 Agent\n" + "/agent list — 列出所有 Agent\n" + "/agent — 按名称或唯一前缀切换 Agent\n" + "/agent default — 切换到默认 Agent\n" + "/agent copy — 复制当前 Agent 的角色+工作流到剪贴板\n" + "/agent copy — 复制指定 Agent 的角色+工作流到剪贴板\n" + "/agent reset — 根据 prompts.py 重写 default.md" + ) + self.argparse = ArgumentParser("agent") + self.argparse.add_argument( + "subcommand", + nargs="?", + default=None, + help="子命令: list, default, copy, reset, 或 Agent 名称", + ) + for usage in self.usage.split("\n"): + self.argparse.add_argument( + "Usage", + nargs="?", + default=None, + help=usage, + ) + + def execute(self, context: CommandContext) -> CommandResult: + # Show help on -h / --help + if "-h" in context.parsed_input.source or "--help" in context.parsed_input.source: + context.console.print(self.argparse.format_help()) + return CommandResult(success=True) + + mgr = AgentManager() + # Parse args from source: "/agent list" -> "list" + parts = context.parsed_input.source.split() + args = " ".join(parts[1:]) if len(parts) > 1 else "" + + if not args: + return _show_current(mgr, context) + if args == "list": + return _list_all(mgr, context) + if args.startswith("copy"): + rest = args[4:].strip() + return self._copy_agent(mgr, context, rest or None) + if args == "default": + return self._switch(mgr, "default", context) + if args == "reset": + return _reset_default(mgr, context) + + # Treat as agent name (supports unique prefix matching) + return self._switch(mgr, args, context) + + def _switch(self, mgr: AgentManager, name: str, context: CommandContext) -> CommandResult: + # Try exact match first + if mgr.switch_agent(name): + agent = mgr.get_current() + context.console.print(f"[green]Switched to agent:[/green] {agent.name}") + # Update TUI dropdown if available + self._sync_tui(context, mgr.current_agent_name) + return CommandResult(success=True) + + # Try unique prefix match + matches = [n for n in mgr.agent_names() if n.startswith(name)] + if len(matches) == 1: + mgr.switch_agent(matches[0]) + context.console.print(f"[green]Switched to agent:[/green] {matches[0]}") + self._sync_tui(context, mgr.current_agent_name) + return CommandResult(success=True) + + if len(matches) > 1: + context.console.print(f"[red]Ambiguous prefix '{name}' matches: {', '.join(matches)}[/red]") + else: + context.console.print(f"[red]Agent '{name}' not found.[/red]") + context.console.print("Use [bold]/agent list[/bold] to see available agents.") + return CommandResult(success=True) + + def _copy_agent(self, mgr: AgentManager, context: CommandContext, name: str | None) -> CommandResult: + if name: + agent = mgr.get(name) + if agent is None: + matches = [n for n in mgr.agent_names() if n.startswith(name)] + if len(matches) == 1: + agent = mgr.get(matches[0]) + elif len(matches) > 1: + context.console.print(f"[red]Ambiguous prefix '{name}' matches: {', '.join(matches)}[/red]") + return CommandResult(success=False) + else: + context.console.print(f"[red]Agent '{name}' not found.[/red]") + return CommandResult(success=False) + else: + agent = mgr.get_current() + + text = self._format_agent_copy(agent) + if copy_to_clipboard(text): + context.console.print(f"[green]Agent '{agent.name}' settings copied to clipboard.[/green]") + else: + context.console.print(text) + context.console.print("[yellow](Clipboard unavailable — printed above instead)[/yellow]") + return CommandResult(success=True) + + @staticmethod + def _format_agent_copy(agent: AgentConfig) -> str: + """Format agent body (role + workflow) for external pasting.""" + parts = [f"--- Agent: {agent.name} ---", ""] + if agent.body_role: + parts.append(agent.body_role) + parts.append("") + if agent.body_workflow: + parts.append(agent.body_workflow) + parts.append("") + return "\n".join(parts).strip() + + @staticmethod + def _sync_tui(context: CommandContext, agent_name: str) -> None: + """Update TUI dropdown and title bar after agent switch.""" + app = context.app + if app is None: + return + try: + from textual.widgets import Select + + select = app.query_one("#agent-select", Select) + if select: + select.value = agent_name + except Exception: + pass diff --git a/src/console/commands/workspaces/workspace_cmd.py b/src/console/commands/workspaces/workspace_cmd.py index 53d9bdd..a5ae3e3 100644 --- a/src/console/commands/workspaces/workspace_cmd.py +++ b/src/console/commands/workspaces/workspace_cmd.py @@ -4,11 +4,14 @@ from src.constants.prompts import ( AUGMENTATION_WRAPPER, - SYSTEM_IDENTITY, + SYSTEM_CONSTRAINTS, + SYSTEM_ROLE, TOOL_RULES, WORKFLOW_GUIDELINES, generate_extensions_section, ) +from src.core.agent_manager import AgentManager +from src.models.agent import AgentConfig from src.models.commands import Command, CommandContext, CommandResult INSTRUCTION: list[str] = ["AGENTS.md", "CLAUDE.md"] @@ -16,14 +19,19 @@ AGENTS_MD_FENCE_END = "" -def _generate_tool_definitions_section(context: CommandContext) -> str: - """Generate XML block with doc for each registered tool.""" +def _generate_tool_definitions_section(context: CommandContext, agent: AgentConfig) -> str: + """Generate XML block with doc for each registered tool, + filtered by the current agent's tool permissions.""" tools = context.tool_registry.list_tools() + if not tools.get("sync"): return "" docs: list[str] = [""] for name in tools["sync"]: + # Filter by agent permissions + if not agent.tool_permissions.is_tool_allowed(name): + continue info = context.tool_registry.get_tool_info(name) if info: doc_xml = info.to_doc() @@ -82,28 +90,74 @@ def _load_agents_md(context: CommandContext) -> str: return "" +def _generate_agent_directive_section(agent: AgentConfig) -> str: + """Generate XML block from the current agent.""" + if not agent.body_role and not agent.body_workflow: + return "" + + parts = [f' ', ""] + if agent.body_role: + parts.append(agent.body_role) + parts.append("") + if agent.body_workflow: + parts.append(agent.body_workflow) + parts.append("") + parts.append(" ") + return "\n".join(parts) + + def _assemble_full_prompt(context: CommandContext) -> str: - """Assemble the complete system prompt from ordered XML sections.""" + """Assemble the complete system prompt from ordered XML sections. + + Order: role → constraints → agent_directive → tool_rules → tool_definitions + → workflow → workspace_context → augmentation → extensions + """ + agent = AgentManager().get_current() + sections = [ "", "", - SYSTEM_IDENTITY, - "", - TOOL_RULES, - "", - _generate_tool_definitions_section(context), - "", - WORKFLOW_GUIDELINES, - "", - _generate_workspace_metadata(context), - "", ] + # ① System role — skip if agent provides its own role + if not agent.body_role: + sections.append(SYSTEM_ROLE) + sections.append("") + + # ② System constraints — always injected (anti-hallucination handled by tool_rules) + sections.append(SYSTEM_CONSTRAINTS) + sections.append("") + + # ③ Agent directive (role + workflow from agent .md file, if any) + agent_directive = _generate_agent_directive_section(agent) + if agent_directive: + sections.append(agent_directive) + sections.append("") + + # ④ Tool call format rules + sections.append(TOOL_RULES) + sections.append("") + + # ⑤ Tool definitions + sections.append(_generate_tool_definitions_section(context, agent)) + sections.append("") + + # ⑥ Workflow guidelines — skip if agent provides its own workflow + if not agent.body_workflow: + sections.append(WORKFLOW_GUIDELINES) + sections.append("") + + # ⑦ Workspace metadata + sections.append(_generate_workspace_metadata(context)) + sections.append("") + + # ⑧ Augmentations from AGENTS.md / CLAUDE.md augmentations = _load_agents_md(context) if augmentations: sections.append(augmentations) sections.append("") + # ⑨ Extensions (Skills / MCP hooks) sections.append(generate_extensions_section()) sections.append("") sections.append("") diff --git a/src/console/main.py b/src/console/main.py index 0d1c65c..65a8d15 100644 --- a/src/console/main.py +++ b/src/console/main.py @@ -80,6 +80,13 @@ def init_workspace(start_path: str | None = None) -> Workspace | None: workspace: Workspace = Workspace(str(folder_path)) tool_registry.register(workspace) + # Initialize AgentManager and write default agent config + from src.core.agent_manager import AgentManager + + agent_manager = AgentManager() + agent_manager.initialize(workspace.root_path) + agent_manager.write_default(workspace.root_path) + # 在创建新会话之前清理孤立的会话 _cleanup_orphaned_sessions(workspace.db) diff --git a/src/console/ui/repl.py b/src/console/ui/repl.py index 47f2f86..326fc45 100644 --- a/src/console/ui/repl.py +++ b/src/console/ui/repl.py @@ -6,11 +6,12 @@ from textual.app import App, ComposeResult from textual.binding import Binding from textual.containers import Horizontal, Vertical -from textual.widgets import Button, Footer, Label, TextArea +from textual.widgets import Button, Footer, Label, Select, TextArea from src.console.handlers.command_handler import CommandHandler from src.console.handlers.tool_handler import ToolHandler from src.console.ui.tui_console import TuiConsole +from src.core.agent_manager import AgentManager from src.core.input_parser import parse_input from src.core.paste_cache import PasteReference from src.core.paste_window import show_paste_window @@ -48,7 +49,7 @@ class REPL(App): } #title-left { - width: 20%; + width: auto; content-align: right middle; text-style: bold; color: $success; @@ -56,7 +57,7 @@ class REPL(App): } #title-version { - width: 20%; + width: auto; content-align: left middle; color: $text-muted; text-style: italic; @@ -64,8 +65,14 @@ class REPL(App): margin-left: 1; } + #agent-select { + width: auto; + max-width: 40; + margin: 0 1; + } + #title-right { - width: 60%; + width: 1fr; content-align: center middle; color: $text-muted; } @@ -154,12 +161,23 @@ def __init__( def compose(self) -> ComposeResult: """构建控件树""" - # 标题栏:左侧名称 + 中间工作区路径 + 右侧版本号 + # 标题栏:左侧名称 + 版本号 + Agent选择 + 工作区路径 with Horizontal(id="title-bar"), Horizontal(): yield Label(self.CONSOLE_TITLE, id="title-left") from src.constants import __version__ yield Label(f"v{__version__}", id="title-version") + + # Agent selector dropdown + mgr = AgentManager() + options = [(a, a) for a in mgr.agent_names()] + yield Select( + options, + id="agent-select", + prompt="Agent", + value=mgr.current_agent_name, + ) + yield Label("工作区", id="title-right") # 输出区域: 使用新的 TuiConsole 组件 @@ -191,9 +209,10 @@ def on_mount(self) -> None: self.tui_console = tui_console self.result_manager.console = tui_console - # 更新标题栏右侧显示实际工作区路径 + # 更新标题栏右侧显示实际工作区路径和当前 Agent title_right = self.query_one("#title-right", Label) - title_right.update(f"工作区: {self.workspace.root_path}") + mgr = AgentManager() + title_right.update(f"Agent: {mgr.current_agent_name} | 工作区: {self.workspace.root_path}") # 创建审核提交模块并注入审核标签页 from src.core.audit_committer import AuditCommitter @@ -228,6 +247,15 @@ def on_mount(self) -> None: # 自动聚焦输入框 self.query_one("#input-field", TextArea).focus() + def on_select_changed(self, event: Select.Changed) -> None: + """Handle agent selection change from dropdown.""" + if event.select.id == "agent-select": + mgr = AgentManager() + if mgr.switch_agent(str(event.value)): + title_right = self.query_one("#title-right", Label) + title_right.update(f"Agent: {mgr.current_agent_name} | 工作区: {self.workspace.root_path}") + self.tui_console.print(f"[dim]Switched to agent: {mgr.current_agent_name}[/dim]") + # -- 输入处理 ----------------------------------------------------------- def on_button_pressed(self, event: Button.Pressed) -> None: diff --git a/src/constants/manual_aid.py b/src/constants/manual_aid.py index f22ee04..6c88e60 100644 --- a/src/constants/manual_aid.py +++ b/src/constants/manual_aid.py @@ -24,3 +24,6 @@ # 用户配置覆盖文件名 CONFIG_FILE = "config.json" + +# Agent 配置目录 +AGENTS_DIR = "agents" diff --git a/src/constants/prompts.py b/src/constants/prompts.py index a4ef5db..4a8c4f0 100644 --- a/src/constants/prompts.py +++ b/src/constants/prompts.py @@ -2,18 +2,15 @@ from collections.abc import Callable -SYSTEM_IDENTITY: str = """ -你是一个与 ManualAid 工作区集成的、依赖工具进行文件探索和编辑的助手 +SYSTEM_ROLE: str = """ +你是一个与 ManualAid 工作区集成的、依赖工具完成任务的助手 你的能力来源于工作区提供的工具——如果没有调用正确的工具,你无法独立行动 +""" - - 你是一个依赖工具的助手. 你不能独立行动;必须调用工具来完成任务 +SYSTEM_CONSTRAINTS: str = """ + 你是一个依赖工具的助手.你不能独立行动;必须调用工具来完成任务 严格使用指定的 XML 格式来调用工具(见 <tool_rules>) - 调用工具后,始终停止并等待用户的工具输出 - 绝不虚构工具返回值. 绝不臆测结果继续 - 如果工具调用失败或返回空,向用户请求澄清 - -""" +""" TOOL_RULES: str = """ @@ -42,9 +39,9 @@ - + 绝不虚构工具返回的数据 - 在调用工具后停止——不要代表工具生成结果 + 在调用工具后停止并等待用户的工具输出——不要代表工具生成结果,也不要臆测结果继续 如果工具返回错误或空结果,向用户请求澄清 当参数值包含XML标签字符(`<`或`>`)时, 必须将其转换为HTML实体转义符(`<`和`>`). 例如: 如果参数值是, 必须写为<func_call> @@ -53,10 +50,11 @@ WORKFLOW_GUIDELINES: str = """ 1. 在采取行动前充分理解用户的请求. 如有疑问,先提问再行动 -2. 将复杂或多步骤任务分解为较小的顺序子任务;一次一个步骤 -3. 为每个步骤选择最合适的工具. 如果没有合适的工具,解释并请求替代方案 -4. 等待每个工具的结果后再继续下一步 -5. 构建响应时,先使用工具收集信息,然后形成最终答案 +2. 先通过搜索和读取了解项目结构与上下文,再制定具体方案 +3. 将复杂或多步骤任务分解为较小的顺序子任务;一次一个步骤 +4. 为每个步骤选择最合适的工具. 如果没有合适的工具,解释并请求替代方案 +5. 等待每个工具的结果后再继续下一步 +6. 构建响应时,先使用工具收集信息,然后形成最终答案 """ AUGMENTATION_WRAPPER: str = """ diff --git a/src/core/agent_manager.py b/src/core/agent_manager.py new file mode 100644 index 0000000..4987fe2 --- /dev/null +++ b/src/core/agent_manager.py @@ -0,0 +1,290 @@ +"""Agent manager — singleton, loads and manages agent configurations.""" + +from __future__ import annotations + +import threading +import warnings +from pathlib import Path + +from src.constants.manual_aid import AGENTS_DIR, MANUALAID_DIR +from src.constants.prompts import SYSTEM_ROLE, WORKFLOW_GUIDELINES +from src.models.agent import AgentConfig, ToolPermissions + +# --------------------------------------------------------------------------- +# Frontmatter parser (YAML subset: key:value, nested keys, dash lists) +# --------------------------------------------------------------------------- + + +def _parse_frontmatter(content: str) -> tuple[dict, str]: + """Parse YAML frontmatter delimited by --- markers. + + Returns (metadata_dict, body_string). Only supports: + - key: value pairs (strings) + - nested keys via indentation (2-space) + - dash list items under nested keys + + Non-goal: not a full YAML parser; only the subset needed for agent configs. + """ + lines = content.split("\n") + if not lines or lines[0].strip() != "---": + return {}, content + + end_idx = -1 + for i in range(1, len(lines)): + if lines[i].strip() == "---": + end_idx = i + break + + if end_idx == -1: + return {}, content + + frontmatter_lines = lines[1:end_idx] + body = "\n".join(lines[end_idx + 1 :]).strip() + + metadata: dict = {} + current_section: str | None = None + current_subsection: str | None = None + + for line in frontmatter_lines: + stripped = line.rstrip() + if not stripped.strip(): + continue + + indent = len(line) - len(line.lstrip()) + + if stripped.lstrip().startswith("- ") and current_subsection: + item = stripped.lstrip()[2:].strip() + if item: + metadata.setdefault(current_subsection, []).append(item) + elif ":" in stripped: + parts = stripped.split(":", 1) + key = parts[0].strip() + value = parts[1].strip() if len(parts) > 1 and parts[1].strip() else "" + + if indent == 0: + current_section = key + current_subsection = None + if value: + metadata[key] = value + elif indent > 0 and current_section: + qualified = f"{current_section}.{key}" + if value and value != "[]": + metadata[qualified] = value + current_subsection = None + else: + current_subsection = qualified if value != "[]" else None + + return metadata, body + + +def _parse_agent_file(file_path: Path) -> AgentConfig | None: + """Parse a single agent .md file into an AgentConfig.""" + try: + content = file_path.read_text(encoding="utf-8") + except Exception as e: + warnings.warn(f"Failed to read agent file {file_path}: {e}", stacklevel=2) + return None + + metadata, body = _parse_frontmatter(content) + + name = metadata.get("name", file_path.stem) + description = metadata.get("description", "") + + # Parse tool permissions — handle both list and empty-list cases + raw_whitelist = metadata.get("tool_permissions.whitelist", []) + raw_blacklist = metadata.get("tool_permissions.blacklist", []) + if isinstance(raw_whitelist, str): + raw_whitelist = [raw_whitelist] if raw_whitelist else [] + if isinstance(raw_blacklist, str): + raw_blacklist = [raw_blacklist] if raw_blacklist else [] + + tool_permissions = ToolPermissions( + whitelist=raw_whitelist, + blacklist=raw_blacklist, + ) + + # Parse ## Role and ## Workflow sections from body + body_role = "" + body_workflow = "" + current_heading: str | None = None + section_lines: list[str] = [] + + for line in body.split("\n"): + if line.startswith("## "): + # Save previous section + heading_text = line[3:].strip().lower() + joined = "\n".join(section_lines).strip() + if current_heading == "role": + body_role = joined + elif current_heading == "workflow": + body_workflow = joined + + current_heading = heading_text if heading_text in ("role", "workflow") else None + section_lines = [] + elif current_heading: + section_lines.append(line) + else: + section_lines = [] + + # Save last section + joined = "\n".join(section_lines).strip() + if current_heading == "role": + body_role = joined + elif current_heading == "workflow": + body_workflow = joined + + return AgentConfig( + name=name, + description=description, + tool_permissions=tool_permissions, + body_role=body_role, + body_workflow=body_workflow, + ) + + +# --------------------------------------------------------------------------- +# Agent Manager (singleton) +# --------------------------------------------------------------------------- + + +class AgentManager: + """Singleton — loads and caches agent configurations from .ManualAid/agents/. + + Usage: + mgr = AgentManager() + mgr.initialize(workspace_root) + mgr.write_default(workspace_root) + agent = mgr.get_current() + """ + + _instance: AgentManager | None = None + _instance_lock: threading.Lock = threading.Lock() + + def __new__(cls) -> AgentManager: + with cls._instance_lock: + if cls._instance is None: + instance = super().__new__(cls) + instance._initialized = False + cls._instance = instance + return cls._instance + + def __init__(self) -> None: + if self._initialized: + return + self._agents: dict[str, AgentConfig] = {} + self._agents_dir: Path | None = None + self._root_path: Path | None = None + self._loaded = False + self._load_lock: threading.Lock = threading.Lock() + self._current_agent_name: str = "default" + self._initialized = True + + @property + def current_agent_name(self) -> str: + return self._current_agent_name + + @current_agent_name.setter + def current_agent_name(self, value: str) -> None: + self._current_agent_name = value + + def initialize(self, root_path: str | Path) -> None: + """Set the workspace root. Agents are loaded lazily on first access.""" + self._root_path = Path(root_path) + self._agents_dir = self._root_path / MANUALAID_DIR / AGENTS_DIR + self._loaded = False + self._agents = {} + + def _ensure_loaded(self) -> None: + if self._loaded: + return + with self._load_lock: + if self._loaded: + return + self._agents = {} + if self._agents_dir and self._agents_dir.is_dir(): + for fpath in sorted(self._agents_dir.glob("*.md")): + agent = _parse_agent_file(fpath) + if agent is not None: + self._agents[agent.name] = agent + self._loaded = True + + def get(self, name: str) -> AgentConfig | None: + self._ensure_loaded() + return self._agents.get(name) + + def get_default(self) -> AgentConfig: + self._ensure_loaded() + default = self._agents.get("default") + if default is None: + return AgentConfig( + name="default", + description="Default agent", + tool_permissions=ToolPermissions(), + ) + return default + + def get_current(self) -> AgentConfig: + agent = self.get(self._current_agent_name) + return agent if agent is not None else self.get_default() + + def switch_agent(self, name: str) -> bool: + self._ensure_loaded() + if name in self._agents: + self._current_agent_name = name + return True + return False + + def list_agents(self) -> list[AgentConfig]: + self._ensure_loaded() + return list(self._agents.values()) + + def agent_names(self) -> list[str]: + self._ensure_loaded() + return sorted(self._agents.keys()) + + def write_default(self, root_path: str | Path, *, force: bool = False) -> None: + """Write the default.md agent file if it does not exist. + + Content mirrors the prompts.py SYSTEM_ROLE and WORKFLOW_GUIDELINES + so that users can edit language/behavior by modifying this file. + + Args: + root_path: Workspace root directory. + force: If True, overwrite existing default.md. + """ + agents_dir = Path(root_path) / MANUALAID_DIR / AGENTS_DIR + agents_dir.mkdir(parents=True, exist_ok=True) + default_path = agents_dir / "default.md" + if default_path.exists() and not force: + return + + content = f"""--- +name: default +description: Default ManualAid agent +tool_permissions: + whitelist: [] + blacklist: [] +--- + +## Role + +{SYSTEM_ROLE} + +## Workflow + +{WORKFLOW_GUIDELINES} +""" + default_path.write_text(content, encoding="utf-8") + + def reset_default(self) -> bool: + """Rewrite default.md from current prompts.py constants. + + Returns True on success, False if root_path was never set. + """ + if self._root_path is None: + return False + self.write_default(self._root_path, force=True) + # Invalidate cache so next access re-reads the file + self._loaded = False + self._agents = {} + return True diff --git a/src/models/agent.py b/src/models/agent.py new file mode 100644 index 0000000..9f064ca --- /dev/null +++ b/src/models/agent.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from dataclasses import dataclass, field + + +@dataclass +class ToolPermissions: + whitelist: list[str] = field(default_factory=list) + blacklist: list[str] = field(default_factory=list) + + def is_tool_allowed(self, tool_name: str) -> bool: + """判定工具是否应注入到 /ws 输出. + + Priority: blacklist first, then whitelist. + Empty whitelist = allow all. + """ + if tool_name in self.blacklist: + return False + return not self.whitelist or tool_name in self.whitelist + + +@dataclass +class AgentConfig: + name: str + description: str + tool_permissions: ToolPermissions + body_role: str = "" + body_workflow: str = "" diff --git a/tests/core/test_agent_manager.py b/tests/core/test_agent_manager.py new file mode 100644 index 0000000..bb4d794 --- /dev/null +++ b/tests/core/test_agent_manager.py @@ -0,0 +1,159 @@ +"""Tests for AgentManager and frontmatter parser.""" + +from pathlib import Path + +from src.core.agent_manager import AgentManager, _parse_agent_file, _parse_frontmatter + + +class TestParseFrontmatter: + def test_no_frontmatter(self): + content = "## Role\nhello" + meta, body = _parse_frontmatter(content) + assert meta == {} + assert "hello" in body + + def test_basic_frontmatter(self): + content = """--- +name: test-agent +description: A test agent +--- +## Role +Hello World +""" + meta, body = _parse_frontmatter(content) + assert meta["name"] == "test-agent" + assert meta["description"] == "A test agent" + assert "Hello World" in body + + def test_permissions_frontmatter(self): + content = """--- +name: restricted +description: Restricted agent +tool_permissions: + whitelist: + - read + - glob + blacklist: + - git +--- +## Role +test +""" + meta, _body = _parse_frontmatter(content) + assert meta["tool_permissions.whitelist"] == ["read", "glob"] + assert meta["tool_permissions.blacklist"] == ["git"] + + def test_empty_permissions(self): + content = """--- +name: empty +description: Empty perms +tool_permissions: + whitelist: [] + blacklist: [] +--- +## Role +test +""" + meta, _body = _parse_frontmatter(content) + whitelist = meta.get("tool_permissions.whitelist", []) + blacklist = meta.get("tool_permissions.blacklist", []) + assert whitelist == [], f"Expected [], got {whitelist!r}" + assert blacklist == [], f"Expected [], got {blacklist!r}" + + +class TestParseAgentFile: + def test_full_agent_file(self, tmp_path: Path): + md_file = tmp_path / "test-agent.md" + md_file.write_text( + """--- +name: test-agent +description: My test agent +tool_permissions: + whitelist: + - read + - glob + blacklist: + - git +--- + +## Role + +You are a test agent. + +## Workflow + +1. Test things. +2. Verify results. +""", + encoding="utf-8", + ) + agent = _parse_agent_file(md_file) + assert agent is not None + assert agent.name == "test-agent" + assert agent.description == "My test agent" + assert agent.tool_permissions.whitelist == ["read", "glob"] + assert agent.tool_permissions.blacklist == ["git"] + assert "You are a test agent." in agent.body_role + assert "Test things." in agent.body_workflow + + def test_invalid_file_returns_none(self, tmp_path: Path): + md_file = tmp_path / "invalid.md" + md_file.write_bytes(b"\x80\x81\x82") # invalid UTF-8 + agent = _parse_agent_file(md_file) + assert agent is None + + def test_no_role_section(self, tmp_path: Path): + md_file = tmp_path / "no-role.md" + md_file.write_text( + """--- +name: no-role +description: No role +--- +Some content without sections. +""", + encoding="utf-8", + ) + agent = _parse_agent_file(md_file) + assert agent is not None + assert agent.body_role == "" + assert agent.body_workflow == "" + + +class TestAgentManager: + def test_get_default_fallback(self, tmp_path): + """No agents dir -> get_default() returns a fallback AgentConfig.""" + mgr = AgentManager() + mgr.initialize(tmp_path) + default = mgr.get_default() + assert default.name == "default" + assert default.tool_permissions.whitelist == [] + + def test_switch_unknown_returns_false(self, tmp_path): + mgr = AgentManager() + mgr.initialize(tmp_path) + assert mgr.switch_agent("nonexistent") is False + + def test_write_default_creates_file(self, tmp_path): + mgr = AgentManager() + mgr.initialize(tmp_path) + mgr.write_default(tmp_path) + agents_dir = tmp_path / ".ManualAid" / "agents" + assert (agents_dir / "default.md").exists() + + def test_write_default_is_idempotent(self, tmp_path): + mgr = AgentManager() + mgr.initialize(tmp_path) + mgr.write_default(tmp_path) + content_first = (tmp_path / ".ManualAid" / "agents" / "default.md").read_text(encoding="utf-8") + mgr.write_default(tmp_path) # second call should be no-op + content_second = (tmp_path / ".ManualAid" / "agents" / "default.md").read_text(encoding="utf-8") + assert content_first == content_second + + def test_agent_names_sorted(self, tmp_path): + agents_dir = tmp_path / ".ManualAid" / "agents" + agents_dir.mkdir(parents=True) + (agents_dir / "z-agent.md").write_text("---\nname: z-agent\ndescription: Z\n---\n## Role\nx") + (agents_dir / "a-agent.md").write_text("---\nname: a-agent\ndescription: A\n---\n## Role\nx") + mgr = AgentManager() + mgr.initialize(tmp_path) + assert mgr.agent_names() == ["a-agent", "z-agent"] diff --git a/tests/models/test_agent.py b/tests/models/test_agent.py new file mode 100644 index 0000000..8ace23f --- /dev/null +++ b/tests/models/test_agent.py @@ -0,0 +1,59 @@ +"""Tests for agent data models.""" + +from src.models.agent import AgentConfig, ToolPermissions + + +def test_empty_permissions_allow_all(): + p = ToolPermissions() + assert p.is_tool_allowed("read") + assert p.is_tool_allowed("write") + assert p.is_tool_allowed("git") + # Explicit empty lists are equivalent to no arguments + p2 = ToolPermissions(whitelist=[], blacklist=[]) + assert p2.is_tool_allowed("anything") + + +def test_whitelist_restricts_tools(): + p = ToolPermissions(whitelist=["read", "glob", "ls"]) + assert p.is_tool_allowed("read") + assert p.is_tool_allowed("glob") + assert not p.is_tool_allowed("write") + assert not p.is_tool_allowed("git") + + +def test_blacklist_blocks_tools(): + p = ToolPermissions(blacklist=["git"]) + assert not p.is_tool_allowed("git") + assert p.is_tool_allowed("read") + + +def test_blacklist_overrides_whitelist(): + p = ToolPermissions(whitelist=["read", "git"], blacklist=["git"]) + assert p.is_tool_allowed("read") + assert not p.is_tool_allowed("git") + + +def test_agent_config_creation(): + """Verify AgentConfig stores fields correctly.""" + perms = ToolPermissions(whitelist=["read", "glob"]) + agent = AgentConfig( + name="test-agent", + description="A test agent", + tool_permissions=perms, + body_role="## Role\nYou are a test agent.", + body_workflow="## Workflow\n1. Do work.", + ) + assert agent.name == "test-agent" + assert agent.description == "A test agent" + assert agent.tool_permissions is perms + assert "You are a test agent." in agent.body_role + assert "Do work." in agent.body_workflow + + +def test_agent_config_defaults(): + """Verify AgentConfig defaults are empty strings.""" + agent = AgentConfig(name="minimal", description="Minimal", tool_permissions=ToolPermissions()) + assert agent.body_role == "" + assert agent.body_workflow == "" + assert agent.tool_permissions.whitelist == [] + assert agent.tool_permissions.blacklist == []