diff --git a/README.md b/README.md index 2675310..f1c4c43 100644 --- a/README.md +++ b/README.md @@ -9,25 +9,34 @@ LLM chat interfaces. Paste LLM-generated tool calls (in XML format), review and audit dangerous operations, and manage sessions with full history tracking -- all running locally on your machine. -> **Version**: 0.5.0 | **Python**: >=3.14 +> **Version**: 0.6.0 | **Python**: >=3.14 --- ## Features -- **TUI Console** -- Four-tab Textual interface: RichLog, Tool Calls, Audit, - Statistics -- **12 Built-in Tools** -- File system exploration, search, editing, Git - integration +- **TUI Console** -- Multi-tab Textual interface: RichLog, Tool Calls, Audit, + Statistics, Environment Config, Skill Config +- **14 Built-in Tools** -- File system exploration, search, editing, Git + integration, Shell execution, Skill system +- **Agent System** -- Support Agent configuration via YAML frontmatter for + different roles and permission workflows +- **Skill System** -- Dynamically load and execute custom Shell scripts as + extension tools - **Safe Editing** -- Two-phase commit for write/edit operations with diff preview and manual approval - **Git Integration** -- Whitelist-based Git command execution with safety filters +- **Gitignore Support** -- Automatically parse and apply `.gitignore` file rules + to search and file operations +- **Sensitive File Protection** -- Automatically block access to `.env`, + `*.pem`, `id_rsa` and other sensitive files - **Session Management** -- Automatic session tracking, rename, delete, and switch - **Tool Usage Analytics** -- Per-session and global tool call statistics with ranking -- **Audit System** -- Pending write/edit snapshots with approve/reject workflow +- **Audit System** -- Pending write/edit/Shell snapshots with approve/reject + workflow - **Result Caching** -- Auto-copy results to clipboard with configurable expiration - **Multi-window Launch** -- Spawn new ManualAid windows for different @@ -84,15 +93,17 @@ tabs. ## Console Interface -The TUI is built with [Textual](https://textual.textualize.io/) and has four +The TUI is built with [Textual](https://textual.textualize.io/) and has six tabs: -| Tab | Purpose | -| ---------- | ---------------------------------------------------- | -| RichLog | General log output and messages | -| Tool Calls | Collapsible tool execution results | -| Audit | Pending write/edit operations awaiting approval | -| Statistics | Session summaries, tool rankings, session management | +| Tab | Purpose | +| ------------------ | ----------------------------------------------------- | +| RichLog | General log output and messages | +| Tool Calls | Collapsible tool execution results | +| Audit | Pending write/edit/Shell operations awaiting approval | +| Statistics | Session summaries, tool rankings, session management | +| Environment Config | Environment variable and configuration management | +| Skill Config | Custom skill script management and configuration | ### Keyboard Shortcuts @@ -111,13 +122,14 @@ tabs: | `/help` | `/h` / `/?` | Show help text | | `/cls` | | Clear the log display | | `/workspace` | `/ws` | Generate a system prompt containing workspace information and tool definitions. | +| `/agent` | | List, switch, copy, or reset agent configurations | | `/new` | | Launch new ManualAid window | --- ## Available Tools -ManualAid registers 12 tools for LLM use via XML function calls: +ManualAid registers 14 tools for LLM use via XML function calls: ### Query Tools (read-only) @@ -140,9 +152,16 @@ ManualAid registers 12 tools for LLM use via XML function calls: ### Dangerous Tools (require audit approval) -| Tool | Description | -| ----- | ------------------------------------- | -| `git` | Whitelist-based Git command execution | +| Tool | Description | +| ------- | ------------------------------------------ | +| `git` | Whitelist-based Git command execution | +| `shell` | Execute Shell commands (requires approval) | + +### Extension Tools + +| Tool | Description | +| ------- | ------------------------------------------------------- | +| `skill` | Dynamically load and execute custom Shell script skills | > Tool calls use XML format. See `/help` in the console for syntax examples. @@ -150,12 +169,14 @@ ManualAid registers 12 tools for LLM use via XML function calls: ## Audit Workflow -Write and edit operations go through a two-phase safety workflow: +Write, edit, and Shell command operations go through a two-phase safety +workflow: -1. **Preview** -- The tool computes a diff and stores a snapshot with - `PENDING_AUDIT` status -2. **Review** -- Switch to the Audit tab to review the diff -3. **Decide** -- Click Approve to commit the change, or Reject to discard it +1. **Preview** -- The tool computes a diff or command content and stores a + snapshot with `PENDING_AUDIT` status +2. **Review** -- Switch to the Audit tab to review the diff or command details +3. **Decide** -- Click Approve to commit the change/execute command, or Reject + to discard it Git commands that are not in the safe list (`status`, `diff`, `log`, `show`) also require audit approval before execution. diff --git a/README_ZH.md b/README_ZH.md index 3479c05..965e5d7 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -8,19 +8,26 @@ ManualAid 提供了一个基于 Textual 的 TUI 控制台,在剪贴板和 LLM 聊天界面之间架起桥梁. 粘贴 LLM 生成的工具调用(XML 格式),审查和审计危险操作,并通过完整的历史追踪管理会话 -- 一切都在本地运行. -> **版本**: 0.5.0 | **Python**: >=3.14 +> **版本**: 0.6.0 | **Python**: >=3.14 --- ## 功能特性 -- **TUI 控制台** -- 四个标签页的 Textual 界面:RichLog、工具调用、审计、统计 -- **12 个内置工具** -- 文件系统探索、搜索、编辑、Git 集成 +- **TUI 控制台** + -- 多标签页的 Textual 界面:RichLog、工具调用、审计、统计、环境配置、Skill 配置 +- **14 个内置工具** + -- 文件系统探索、搜索、编辑、Git 集成、Shell 执行、Skill 系统 +- **Agent 系统** -- 支持通过 YAML + frontmatter 定义 Agent 配置,实现不同角色和权限的工作流 +- **Skill 系统** -- 动态加载和执行自定义 Shell 脚本作为扩展工具 - **安全编辑** -- 写入/编辑操作采用两阶段提交,包含 diff 预览和人工审批 - **Git 集成** -- 基于白名单的 Git 命令执行,带有安全过滤 +- **Gitignore 支持** -- 自动解析和应用 `.gitignore` 文件规则到搜索和文件操作 +- **敏感文件保护** -- 自动拦截对 `.env`、`*.pem`、`id_rsa` 等敏感文件的访问 - **会话管理** -- 自动会话追踪,支持重命名、删除和切换 - **工具使用分析** -- 按会话和全局的工具调用统计及排名 -- **审计系统** -- 待处理的写入/编辑快照,支持批准/拒绝工作流 +- **审计系统** -- 待处理的写入/编辑/Shell 快照,支持批准/拒绝工作流 - **结果缓存** -- 自动将结果复制到剪贴板,可配置过期时间 - **多窗口启动** -- 为不同工作区生成新的 ManualAid 窗口 - **跨平台** -- 支持 Windows、macOS 和 Linux(后两个没测试) @@ -72,14 +79,16 @@ python main.py -p /path/to/your/project ## 控制台界面 -TUI 基于 [Textual](https://textual.textualize.io/) 构建,包含四个标签页: +TUI 基于 [Textual](https://textual.textualize.io/) 构建,包含六个标签页: -| 标签页 | 用途 | -| -------- | ---------------------------- | -| RichLog | 通用日志输出和消息 | -| 工具调用 | 可折叠的工具执行结果 | -| 审计 | 等待审批的写入/编辑操作 | -| 统计 | 会话摘要、工具排名、会话管理 | +| 标签页 | 用途 | +| ---------- | ------------------------------ | +| RichLog | 通用日志输出和消息 | +| 工具调用 | 可折叠的工具执行结果 | +| 审计 | 等待审批的写入/编辑/Shell 操作 | +| 统计 | 会话摘要、工具排名、会话管理 | +| 环境配置 | 环境变量和配置项管理 | +| Skill 配置 | 自定义技能脚本管理和配置 | ### 键盘快捷键 @@ -98,13 +107,14 @@ TUI 基于 [Textual](https://textual.textualize.io/) 构建,包含四个标签 | `/help` | `/h` / `/?` | 显示帮助文本 | | `/cls` | | 清除日志显示 | | `/workspace` | `/ws` | 生成包含工作区信息, 工具定义的系统提示词 | +| `/agent` | | 列出、切换、复制或重置 Agent 配置 | | `/new` | `/n` | 启动新的 ManualAid 窗口 | --- ## 可用工具 -ManualAid 注册了 12 个工具供 LLM 通过 XML 函数调用使用: +ManualAid 注册了 14 个工具供 LLM 通过 XML 函数调用使用: ### 查询工具(只读) @@ -127,9 +137,16 @@ ManualAid 注册了 12 个工具供 LLM 通过 XML 函数调用使用: ### 危险工具(需审计审批) -| 工具 | 描述 | -| ----- | ------------------------- | -| `git` | 基于白名单的 Git 命令执行 | +| 工具 | 描述 | +| ------- | --------------------------- | +| `git` | 基于白名单的 Git 命令执行 | +| `shell` | 执行 Shell 命令(需审核通过) | + +### 扩展工具 + +| 工具 | 描述 | +| ------- | ----------------------------------- | +| `skill` | 动态加载和执行自定义 Shell 脚本技能 | > 工具调用使用 XML 格式. 在控制台中使用 `/help` 查看语法示例. @@ -137,11 +154,11 @@ ManualAid 注册了 12 个工具供 LLM 通过 XML 函数调用使用: ## 审计工作流 -写入和编辑操作经过两阶段安全流程: +写入、编辑和 Shell 命令操作经过两阶段安全流程: -1. **预览** -- 工具计算 diff 并存储状态为 `PENDING_AUDIT` 的快照 -2. **审查** -- 切换到审计标签页审查 diff -3. **决定** -- 点击批准提交更改,或点击拒绝放弃更改 +1. **预览** -- 工具计算 diff 或命令内容并存储状态为 `PENDING_AUDIT` 的快照 +2. **审查** -- 切换到审计标签页审查 diff 或命令详情 +3. **决定** -- 点击批准提交更改/执行命令,或点击拒绝放弃操作 不在安全列表(`status`、`diff`、`log`、`show`)中的 Git 命令也需在执行前通过审计审批. diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 70e2416..8616e1f 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -8,6 +8,76 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.6.0] - 2026-05-08 + +### Added + +- **Agent System**: Introduced Agent configuration management with YAML + frontmatter support for defining system prompts, constraints, and permissions. + Implemented `AgentManager` singleton pattern supporting Agent loading, + switching, and persistence + ([#149](https://github.com/SunYanbox/ManualAid/issues/149)). +- **Skill System**: Added Skill management functionality supporting dynamic + loading and execution of custom Shell scripts as extension tools. Includes + `SkillManager`, `skill_tool`, and related TUI configuration interface + ([#155](https://github.com/SunYanbox/ManualAid/issues/155)). +- **Shell Tool**: New `shell` tool for executing Shell commands with safety + auditing ([#155](https://github.com/SunYanbox/ManualAid/issues/155)). +- **Path Validation Enhancement**: Introduced `ExclusionManager` class to unify + performance exclusion (.gitignore rules) and security exclusion (sensitive + file blocking). Added `SensitiveFileError` exception to directly block access + to `.env`, `*.pem`, `id_rsa` and other sensitive files + ([#151](https://github.com/SunYanbox/ManualAid/issues/151)). +- **Gitignore Support**: New `GitignoreLoader` module to parse `.gitignore` + files and apply exclusion rules to search and file operation tools + ([#151](https://github.com/SunYanbox/ManualAid/issues/151)). +- **Binary Detection Extension**: Extended binary file detector to support Godot + project formats (.godot, .gd, .gd.uid, .tscn) and compilation artifacts (.pdb, + .pyd, .o) ([#151](https://github.com/SunYanbox/ManualAid/issues/154)). +- **Search Tool Binary Filtering**: Integrated binary file detection mechanism + in `regex_search` and `exact_search` tools to automatically skip binary files + ([#154](https://github.com/SunYanbox/ManualAid/issues/154)). +- **Agent Command**: New `/agent` command supporting listing agents, switching + current agent, copying agent configuration, and resetting default agent + ([#149](https://github.com/SunYanbox/ManualAid/issues/149)). +- **Config Manager**: New `ConfigManager` class for unified environment variable + configuration management + ([#155](https://github.com/SunYanbox/ManualAid/issues/155)). +- **Audit Committer**: New `AuditCommitter` class to handle audit system commit + logic ([#155](https://github.com/SunYanbox/ManualAid/issues/155)). +- **TUI Configuration Tabs**: New environment configuration and skill + configuration tabs providing graphical configuration interface + ([#155](https://github.com/SunYanbox/ManualAid/issues/155)). +- **Shell Result Tab**: New shell execution result tab displaying command output + ([#155](https://github.com/SunYanbox/ManualAid/issues/155)). + +### Changed + +- **System Prompt Refactor**: Refactored system prompt assembly logic to support + Agent-based dynamic overrides. Explicitly defined prompt assembly order: Role + → Constraints → Agent Directive → Tool Rules → Tool Definitions → Workflow → + Workspace Context → Augmentation → Extensions + ([#149](https://github.com/SunYanbox/ManualAid/issues/149)). +- **Tool Permission Filtering**: `/ws` command now filters tool definitions + based on current Agent's tool permission whitelist + ([#149](https://github.com/SunYanbox/ManualAid/issues/149)). +- **Path Exclusion Logic Refactor**: Removed `PermissionManager` class and + integrated sensitive file rules into `ExclusionManager`. Simplified search and + traversal logic with unified `ExclusionManager` for path filtering + ([#151](https://github.com/SunYanbox/ManualAid/issues/151)). +- **TUI Startup Initialization**: TUI now automatically initializes default + agent on startup ([#149](https://github.com/SunYanbox/ManualAid/issues/149)). +- **Database Extension**: Extended database manager to support Agent and Skill + related data persistence + ([#155](https://github.com/SunYanbox/ManualAid/issues/155)). + +### Fixed + +- **Git Tool Output Preservation**: Fixed issue where Git commands returning + non-zero exit code but still producing stdout (e.g., `git diff --exit-code`) + would discard stdout entirely. Now always preserves stdout even when + returncode != 0 ([#150](https://github.com/SunYanbox/ManualAid/issues/150)). + ## [0.5.0] - 2026-05-05 ### Added @@ -251,6 +321,7 @@ and this project adheres to _Initial release features and history._ +[0.6.0]: https://github.com/SunYanbox/ManualAid/releases/tag/v0.6.0 [0.5.0]: https://github.com/SunYanbox/ManualAid/releases/tag/v0.5.0 [0.4.1]: https://github.com/SunYanbox/ManualAid/releases/tag/v0.4.1 [0.4.0]: https://github.com/SunYanbox/ManualAid/releases/tag/v0.4.0 diff --git a/docs/CHANGELOG_ZH.md b/docs/CHANGELOG_ZH.md index 1540562..591644f 100644 --- a/docs/CHANGELOG_ZH.md +++ b/docs/CHANGELOG_ZH.md @@ -8,6 +8,59 @@ [Keep a Changelog](https://keepachangelog.com/zh-CN/1.0.0/). 并采用 [语义化版本](https://semver.org/lang/Chinese/). +## [0.6.0] - 2026-05-08 + +### 新增 + +- **Agent 系统**: 引入 Agent 配置管理功能,支持通过 YAML + frontmatter 定义 Agent 的系统提示词、约束和权限. 实现了 `AgentManager` + 单例模式, 支持 Agent 的加载、切换和持久化 ([#149](https://github.com/SunYanbox/ManualAid/issues/149)). +- **Skill 系统**: 添加 Skill 管理功能,支持动态加载和执行自定义 Shell 脚本作为扩展工具. 包含 + `SkillManager`、`skill_tool` + 和相关的 TUI 配置界面 ([#155](https://github.com/SunYanbox/ManualAid/issues/155)). +- **Shell 工具**: 新增 `shell` + 工具,支持执行 Shell 命令并进行安全审核 ([#155](https://github.com/SunYanbox/ManualAid/issues/155)). +- **路径验证增强**: 引入 `ExclusionManager` + 类,统一管理性能排除(.gitignore 规则)和安全排除(敏感文件拦截). 新增 + `SensitiveFileError` 异常, 直接禁止访问 `.env`、`*.pem`、`id_rsa` + 等敏感文件 ([#151](https://github.com/SunYanbox/ManualAid/issues/151)). +- **Gitignore 支持**: 新增 `GitignoreLoader` 模块,支持解析 `.gitignore` + 文件并应用排除规则到搜索和文件操作工具 ([#151](https://github.com/SunYanbox/ManualAid/issues/151)). +- **二进制文件检测扩展**: 扩展二进制文件检测器以支持 Godot 项目格式(.godot、.gd、.gd.uid、.tscn)和编译产物(.pdb、.pyd、.o) + ([#151](https://github.com/SunYanbox/ManualAid/issues/154)). +- **搜索工具二进制文件过滤**: 在 `regex_search` 和 `exact_search` + 工具中集成二进制文件检测机制,自动跳过二进制文件 ([#154](https://github.com/SunYanbox/ManualAid/issues/154)). +- **Agent 命令**: 新增 `/agent` + 命令,支持列出 Agent、切换当前 Agent、复制 Agent 配置和重置默认 Agent + ([#149](https://github.com/SunYanbox/ManualAid/issues/149)). +- **配置管理器**: 新增 `ConfigManager` + 类,统一管理环境变量配置 ([#155](https://github.com/SunYanbox/ManualAid/issues/155)). +- **审计提交器**: 新增 `AuditCommitter` + 类,处理审计系统的提交逻辑 ([#155](https://github.com/SunYanbox/ManualAid/issues/155)). +- **TUI 配置标签页**: 新增环境配置和 Skill 配置标签页,提供图形化配置界面 ([#155](https://github.com/SunYanbox/ManualAid/issues/155)). +- **Shell 结果标签页**: 新增 Shell 执行结果标签页,展示命令输出 ([#155](https://github.com/SunYanbox/ManualAid/issues/155)). + +### 更改 + +- **系统提示词重构**: 重构系统提示词组装逻辑,支持基于 Agent 配置的动态覆盖. 明确定义提示词组装顺序:Role + → Constraints → Agent Directive → Tool Rules → Tool Definitions → Workflow + →Workspace Context → Augmentation → Extensions + ([#149](https://github.com/SunYanbox/ManualAid/issues/149)). +- **工具权限过滤**: `/ws` + 命令现在根据当前 Agent 的工具权限白名单过滤工具定义 ([#149](https://github.com/SunYanbox/ManualAid/issues/149)). +- **路径排除逻辑重构**: 移除 `PermissionManager` 类,整合敏感文件规则至 + `ExclusionManager`. 简化搜索与遍历逻辑,统一使用 `ExclusionManager` + 进行路径过滤 ([#151](https://github.com/SunYanbox/ManualAid/issues/151)). +- **TUI 启动初始化**: TUI 启动时自动初始化默认 Agent + ([#149](https://github.com/SunYanbox/ManualAid/issues/149)). +- **数据库扩展**: 扩展数据库管理器,支持 Agent 和 Skill 相关数据的持久化 ([#155](https://github.com/SunYanbox/ManualAid/issues/155)). + +### 修复 + +- **Git 工具输出保留**: 修复当 Git 命令返回非零退出码但仍有标准输出时(如 + `git diff --exit-code`),之前的代码会完全丢弃 stdout 的问题. 现在总是保留 stdout,即使 returncode + != 0 ([#150](https://github.com/SunYanbox/ManualAid/issues/150)). + ## [0.5.0] - 2026-05-05 ### 新增 @@ -198,6 +251,7 @@ _初始发布的功能和历史记录._ +[0.6.0]: https://github.com/SunYanbox/ManualAid/releases/tag/v0.6.0 [0.5.0]: https://github.com/SunYanbox/ManualAid/releases/tag/v0.5.0 [0.4.1]: https://github.com/SunYanbox/ManualAid/releases/tag/v0.4.1 [0.4.0]: https://github.com/SunYanbox/ManualAid/releases/tag/v0.4.0 diff --git a/pyproject.toml b/pyproject.toml index ed36fd5..c099493 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "ManualAid" -version = "0.5.0" +version = "0.6.0" description = "" requires-python = ">=3.14" dependencies = [ diff --git a/requirements.txt b/requirements.txt index e022fb4..f38a9b6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,5 @@ pytest-cov==7.1.0 rich==15.0.0 ruff==0.15.11 textual -python-dotenv \ No newline at end of file +python-dotenv +pyyaml \ No newline at end of file diff --git a/src/console/commands/command_registry.py b/src/console/commands/command_registry.py index fbb266a..0372832 100644 --- a/src/console/commands/command_registry.py +++ b/src/console/commands/command_registry.py @@ -48,6 +48,7 @@ def create_default(cls) -> CommandRegistry: from src.console.commands.systems.quit_cmd import QuitCommand from src.console.commands.systems.tool_detail_cmd import ToolDetailCommand from src.console.commands.systems.tools_cmd import ToolsCommand + from src.console.commands.workspaces.agent_cmd import AgentCommand from src.console.commands.workspaces.workspace_cmd import WorkspaceCommand registry = cls() @@ -59,6 +60,7 @@ def create_default(cls) -> CommandRegistry: CopyCommand(), HistoryCommand(), WorkspaceCommand(), + AgentCommand(), NewWindowCommand(), HelpCommand(), ClsCommand(), diff --git a/src/console/commands/workspaces/agent_cmd.py b/src/console/commands/workspaces/agent_cmd.py new file mode 100644 index 0000000..6e695b3 --- /dev/null +++ b/src/console/commands/workspaces/agent_cmd.py @@ -0,0 +1,177 @@ +"""Agent management command (/agent).""" + +from __future__ import annotations + +from argparse import ArgumentParser + +from src.core.agent_manager import AgentManager +from src.core.copy2clip import copy_to_clipboard +from src.models.agent import AgentConfig +from src.models.commands import Command, CommandContext, CommandResult + + +def _reset_default(mgr: AgentManager, context: CommandContext) -> CommandResult: + if mgr.reset_default(): + context.console.print("[green]default.md 已根据内置Default Agent重写完成[/green]") + else: + context.console.print("[red]重置失败: 工作区根路径未初始化[/red]") + return CommandResult(success=True) + + +def _show_current(mgr: AgentManager, context: CommandContext) -> CommandResult: + agent = mgr.get_current() + context.console.print( + f"[bold]Current Agent:[/bold] {agent.name}\n" + f"[dim]{agent.description}[/dim]\n" + f"Whitelist: {agent.tool_permissions.whitelist or '(all)'}\n" + f"Blacklist: {agent.tool_permissions.blacklist or '(none)'}" + ) + return CommandResult(success=True) + + +def _list_all(mgr: AgentManager, context: CommandContext) -> CommandResult: + agents = mgr.list_agents() + if not agents: + context.console.print("[yellow]No agents found in .ManualAid/agents/[/yellow]") + return CommandResult(success=True) + + lines = ["[bold]Available Agents:[/bold]"] + for a in agents: + marker = ">" if a.name == mgr.current_agent_name else " " + lines.append(f" {marker} {a.name} — {a.description}") + context.console.print("\n".join(lines)) + return CommandResult(success=True) + + +class AgentCommand(Command): + """Manage Agent configuration""" + + def __init__(self): + super().__init__() + self.name = "agent" + self.aliases = ["/agent"] + self.description = "管理 Agent 配置 (列表、切换、复制、重置)" + self.usage = ( + "/agent — 显示当前 Agent\n" + "/agent list — 列出所有 Agent\n" + "/agent — 按名称或唯一前缀切换 Agent\n" + "/agent default — 切换到默认 Agent\n" + "/agent copy — 复制当前 Agent 的角色+工作流到剪贴板\n" + "/agent copy — 复制指定 Agent 的角色+工作流到剪贴板\n" + "/agent reset — 根据 prompts.py 重写 default.md" + ) + self.argparse = ArgumentParser("agent") + self.argparse.add_argument( + "subcommand", + nargs="?", + default=None, + help="子命令: list, default, copy, reset, 或 Agent 名称", + ) + for usage in self.usage.split("\n"): + self.argparse.add_argument( + "Usage", + nargs="?", + default=None, + help=usage, + ) + + def execute(self, context: CommandContext) -> CommandResult: + # Show help on -h / --help + if "-h" in context.parsed_input.source or "--help" in context.parsed_input.source: + context.console.print(self.argparse.format_help()) + return CommandResult(success=True) + + mgr = AgentManager() + # Parse args from source: "/agent list" -> "list" + parts = context.parsed_input.source.split() + args = " ".join(parts[1:]) if len(parts) > 1 else "" + + if not args: + return _show_current(mgr, context) + if args == "list": + return _list_all(mgr, context) + if args.startswith("copy"): + rest = args[4:].strip() + return self._copy_agent(mgr, context, rest or None) + if args == "default": + return self._switch(mgr, "default", context) + if args == "reset": + return _reset_default(mgr, context) + + # Treat as agent name (supports unique prefix matching) + return self._switch(mgr, args, context) + + def _switch(self, mgr: AgentManager, name: str, context: CommandContext) -> CommandResult: + # Try exact match first + if mgr.switch_agent(name): + agent = mgr.get_current() + context.console.print(f"[green]Switched to agent:[/green] {agent.name}") + # Update TUI dropdown if available + self._sync_tui(context, mgr.current_agent_name) + return CommandResult(success=True) + + # Try unique prefix match + matches = [n for n in mgr.agent_names() if n.startswith(name)] + if len(matches) == 1: + mgr.switch_agent(matches[0]) + context.console.print(f"[green]Switched to agent:[/green] {matches[0]}") + self._sync_tui(context, mgr.current_agent_name) + return CommandResult(success=True) + + if len(matches) > 1: + context.console.print(f"[red]Ambiguous prefix '{name}' matches: {', '.join(matches)}[/red]") + else: + context.console.print(f"[red]Agent '{name}' not found.[/red]") + context.console.print("Use [bold]/agent list[/bold] to see available agents.") + return CommandResult(success=True) + + def _copy_agent(self, mgr: AgentManager, context: CommandContext, name: str | None) -> CommandResult: + if name: + agent = mgr.get(name) + if agent is None: + matches = [n for n in mgr.agent_names() if n.startswith(name)] + if len(matches) == 1: + agent = mgr.get(matches[0]) + elif len(matches) > 1: + context.console.print(f"[red]Ambiguous prefix '{name}' matches: {', '.join(matches)}[/red]") + return CommandResult(success=False) + else: + context.console.print(f"[red]Agent '{name}' not found.[/red]") + return CommandResult(success=False) + else: + agent = mgr.get_current() + + text = self._format_agent_copy(agent) + if copy_to_clipboard(text): + context.console.print(f"[green]Agent '{agent.name}' settings copied to clipboard.[/green]") + else: + context.console.print(text) + context.console.print("[yellow](Clipboard unavailable — printed above instead)[/yellow]") + return CommandResult(success=True) + + @staticmethod + def _format_agent_copy(agent: AgentConfig) -> str: + """Format agent body (role + workflow) for external pasting.""" + parts = [f"--- Agent: {agent.name} ---", ""] + if agent.body_role: + parts.append(agent.body_role) + parts.append("") + if agent.body_workflow: + parts.append(agent.body_workflow) + parts.append("") + return "\n".join(parts).strip() + + @staticmethod + def _sync_tui(context: CommandContext, agent_name: str) -> None: + """Update TUI dropdown and title bar after agent switch.""" + app = context.app + if app is None: + return + try: + from textual.widgets import Select + + select = app.query_one("#agent-select", Select) + if select: + select.value = agent_name + except Exception: + pass diff --git a/src/console/commands/workspaces/workspace_cmd.py b/src/console/commands/workspaces/workspace_cmd.py index 53d9bdd..bb94736 100644 --- a/src/console/commands/workspaces/workspace_cmd.py +++ b/src/console/commands/workspaces/workspace_cmd.py @@ -4,11 +4,17 @@ from src.constants.prompts import ( AUGMENTATION_WRAPPER, - SYSTEM_IDENTITY, + SYSTEM_CONSTRAINTS, + SYSTEM_ROLE, TOOL_RULES, WORKFLOW_GUIDELINES, + clear_extension_hooks, generate_extensions_section, + register_extension_hook, ) +from src.core.agent_manager import AgentManager +from src.core.skill_manager import SkillManager +from src.models.agent import AgentConfig from src.models.commands import Command, CommandContext, CommandResult INSTRUCTION: list[str] = ["AGENTS.md", "CLAUDE.md"] @@ -16,14 +22,22 @@ AGENTS_MD_FENCE_END = "" -def _generate_tool_definitions_section(context: CommandContext) -> str: - """Generate XML block with doc for each registered tool.""" +def _generate_tool_definitions_section(context: CommandContext, agent: AgentConfig, enable_skill: bool = False) -> str: + """Generate XML block with doc for each registered tool, + filtered by the current agent's tool permissions.""" tools = context.tool_registry.list_tools() + if not tools.get("sync"): return "" docs: list[str] = [""] for name in tools["sync"]: + # 与src/workspace/tools/skill_tool.py一致 + if (not enable_skill) and name == "skill": + continue + # Filter by agent permissions + if not agent.tool_permissions.is_tool_allowed(name): + continue info = context.tool_registry.get_tool_info(name) if info: doc_xml = info.to_doc() @@ -82,32 +96,111 @@ def _load_agents_md(context: CommandContext) -> str: return "" +def _generate_agent_directive_section(agent: AgentConfig) -> str: + """Generate XML block from the current agent.""" + if not agent.body_role and not agent.body_workflow: + return "" + + parts = [f' ', ""] + if agent.body_role: + parts.append(agent.body_role) + parts.append("") + if agent.body_workflow: + parts.append(agent.body_workflow) + parts.append("") + parts.append(" ") + return "\n".join(parts) + + +def _generate_skill_prompt_section() -> str: + """Generate Skill-related prompt sections: skill instructions and available skills list. + + Returns: + XML-formatted string containing skill prompt and available skills + """ + skill_manager = SkillManager() + + # 获取启用的 skills(会根据数据库中的禁用状态过滤) + enabled_skills = skill_manager.get_enabled() + + if not enabled_skills: + return "" + + parts = ["", ""] + + # 可用 Skill 列表 + for skill in sorted(enabled_skills.values(), key=lambda s: s.name): + parts.append(f""" + {skill.name} + {skill.description} + """) + parts.append("") + + return "\n".join(parts) + + def _assemble_full_prompt(context: CommandContext) -> str: - """Assemble the complete system prompt from ordered XML sections.""" + """Assemble the complete system prompt from ordered XML sections. + + Order: role → constraints → agent_directive → tool_rules → tool_definitions + → workflow → workspace_context → augmentation → extensions + """ + agent = AgentManager().get_current() + skill_prompt = _generate_skill_prompt_section() + sections = [ "", "", - SYSTEM_IDENTITY, - "", - TOOL_RULES, - "", - _generate_tool_definitions_section(context), - "", - WORKFLOW_GUIDELINES, - "", - _generate_workspace_metadata(context), - "", ] + # ① System role — skip if agent provides its own role + if not agent.body_role: + sections.append(SYSTEM_ROLE) + sections.append("") + + # ② System constraints — always injected (anti-hallucination handled by tool_rules) + sections.append(SYSTEM_CONSTRAINTS) + sections.append("") + + # ③ Agent directive (role + workflow from agent .md file, if any) + agent_directive = _generate_agent_directive_section(agent) + if agent_directive: + sections.append(agent_directive) + sections.append("") + + # ④ Tool call format rules + sections.append(TOOL_RULES) + sections.append("") + + # ⑤ Tool definitions + sections.append(_generate_tool_definitions_section(context, agent, len(skill_prompt) > 0)) + sections.append("") + + # ⑥ Workflow guidelines — skip if agent provides its own workflow + if not agent.body_workflow: + sections.append(WORKFLOW_GUIDELINES) + sections.append("") + + # ⑦ Workspace metadata + sections.append(_generate_workspace_metadata(context)) + sections.append("") + + # ⑧ Augmentations from AGENTS.md / CLAUDE.md augmentations = _load_agents_md(context) if augmentations: sections.append(augmentations) sections.append("") + if len(skill_prompt) > 0: + register_extension_hook(lambda: skill_prompt) + + # ⑨ Extensions (Skills / MCP hooks) sections.append(generate_extensions_section()) sections.append("") sections.append("") + clear_extension_hooks() + return "\n".join(sections) diff --git a/src/console/main.py b/src/console/main.py index 0d1c65c..07fe49f 100644 --- a/src/console/main.py +++ b/src/console/main.py @@ -14,6 +14,8 @@ from src.console.folder_picker import pick_folder from src.console.result_manager import ResultManager from src.console.ui.repl import REPL +from src.core.config_manager import ConfigManager +from src.core.skill_manager import SkillManager from src.core.tool_registry import ToolRegistry from src.workspace.workspace import Workspace @@ -80,6 +82,22 @@ def init_workspace(start_path: str | None = None) -> Workspace | None: workspace: Workspace = Workspace(str(folder_path)) tool_registry.register(workspace) + # Initialize AgentManager and write default agent config + from src.core.agent_manager import AgentManager + + agent_manager = AgentManager() + agent_manager.initialize(workspace.root_path) + agent_manager.write_default(workspace.root_path) + + # Initialize ConfigManager and apply environment configs + config_manager = ConfigManager() + config_manager.initialize(workspace.root_path) + config_manager.apply_env_configs() + + # Initialize SkillManager and discover skills + skill_manager = SkillManager() + skill_manager.discover(workspace.root_path) + # 在创建新会话之前清理孤立的会话 _cleanup_orphaned_sessions(workspace.db) diff --git a/src/console/ui/repl.py b/src/console/ui/repl.py index 47f2f86..a3900bc 100644 --- a/src/console/ui/repl.py +++ b/src/console/ui/repl.py @@ -6,11 +6,12 @@ from textual.app import App, ComposeResult from textual.binding import Binding from textual.containers import Horizontal, Vertical -from textual.widgets import Button, Footer, Label, TextArea +from textual.widgets import Button, Footer, Label, Select, TextArea from src.console.handlers.command_handler import CommandHandler from src.console.handlers.tool_handler import ToolHandler from src.console.ui.tui_console import TuiConsole +from src.core.agent_manager import AgentManager from src.core.input_parser import parse_input from src.core.paste_cache import PasteReference from src.core.paste_window import show_paste_window @@ -48,7 +49,7 @@ class REPL(App): } #title-left { - width: 20%; + width: auto; content-align: right middle; text-style: bold; color: $success; @@ -56,7 +57,7 @@ class REPL(App): } #title-version { - width: 20%; + width: auto; content-align: left middle; color: $text-muted; text-style: italic; @@ -64,8 +65,14 @@ class REPL(App): margin-left: 1; } + #agent-select { + width: auto; + max-width: 40; + margin: 0 1; + } + #title-right { - width: 60%; + width: 1fr; content-align: center middle; color: $text-muted; } @@ -154,12 +161,23 @@ def __init__( def compose(self) -> ComposeResult: """构建控件树""" - # 标题栏:左侧名称 + 中间工作区路径 + 右侧版本号 + # 标题栏:左侧名称 + 版本号 + Agent选择 + 工作区路径 with Horizontal(id="title-bar"), Horizontal(): yield Label(self.CONSOLE_TITLE, id="title-left") from src.constants import __version__ yield Label(f"v{__version__}", id="title-version") + + # Agent selector dropdown + mgr = AgentManager() + options = [(a, a) for a in mgr.agent_names()] + yield Select( + options, + id="agent-select", + prompt="Agent", + value=mgr.current_agent_name, + ) + yield Label("工作区", id="title-right") # 输出区域: 使用新的 TuiConsole 组件 @@ -191,9 +209,10 @@ def on_mount(self) -> None: self.tui_console = tui_console self.result_manager.console = tui_console - # 更新标题栏右侧显示实际工作区路径 + # 更新标题栏右侧显示实际工作区路径和当前 Agent title_right = self.query_one("#title-right", Label) - title_right.update(f"工作区: {self.workspace.root_path}") + mgr = AgentManager() + title_right.update(f"Agent: {mgr.current_agent_name} | 工作区: {self.workspace.root_path}") # 创建审核提交模块并注入审核标签页 from src.core.audit_committer import AuditCommitter @@ -201,12 +220,21 @@ def on_mount(self) -> None: audit_committer = AuditCommitter(self.workspace) tui_console.audit_tab.set_committer(audit_committer) + # 注入 Shell 结果标签页 + tui_console.shell_result_tab.set_database(self.workspace.db) + # 注入统计标签页 tui_console.stats_tab.set_database( self.workspace.db, getattr(self.tool_registry, "_current_session_id", None), ) + # 注入设置标签页 + from src.core.skill_manager import SkillManager + + skill_manager = SkillManager() + tui_console.settings_tab.set_managers(self.workspace.root_path, skill_manager) + # 创建 command handler self.command_handler = CommandHandler( self.workspace, @@ -228,6 +256,15 @@ def on_mount(self) -> None: # 自动聚焦输入框 self.query_one("#input-field", TextArea).focus() + def on_select_changed(self, event: Select.Changed) -> None: + """Handle agent selection change from dropdown.""" + if event.select.id == "agent-select": + mgr = AgentManager() + if mgr.switch_agent(str(event.value)): + title_right = self.query_one("#title-right", Label) + title_right.update(f"Agent: {mgr.current_agent_name} | 工作区: {self.workspace.root_path}") + self.tui_console.print(f"[dim]Switched to agent: {mgr.current_agent_name}[/dim]") + # -- 输入处理 ----------------------------------------------------------- def on_button_pressed(self, event: Button.Pressed) -> None: diff --git a/src/console/ui/tui_console.py b/src/console/ui/tui_console.py index f93f1e1..282f9ce 100644 --- a/src/console/ui/tui_console.py +++ b/src/console/ui/tui_console.py @@ -9,6 +9,8 @@ from textual.widgets import Collapsible, RichLog, Static, TabbedContent, TabPane from src.console.ui.widgets.audit_tab import AuditTab +from src.console.ui.widgets.settings_tab import SettingsTab +from src.console.ui.widgets.shell_result_tab import ShellResultTab from src.console.ui.widgets.stats_tab import StatsTab @@ -19,7 +21,9 @@ class TuiConsole(Vertical): - Tab 1 (RichLog): 用于普通富文本日志. - Tab 2 (Tool Calls): 用于显示工具调用情况. - Tab 3 (Audit): 用于审核待处理的写入/编辑操作. - - Tab 4 (Statistics): 用于查看会话统计与工具使用排名. + - Tab 4 (Shell Results): 用于查看已执行的 Shell 命令输出. + - Tab 5 (Statistics): 用于查看会话统计与工具使用排名. + - Tab 6 (Settings): 用于配置环境变量和 Skill. """ DEFAULT_CSS = """ @@ -60,8 +64,12 @@ def compose(self): yield Vertical(id="tui-console-tool-calls") with TabPane("Audit", id="tab-audit"): yield AuditTab() + with TabPane("Shell Results", id="tab-shell-results"): + yield ShellResultTab() with TabPane("Statistics", id="tab-stats"): yield StatsTab() + with TabPane("Settings", id="tab-settings"): + yield SettingsTab() @property def main_log(self) -> RichLog: @@ -75,16 +83,29 @@ def tool_calls_container(self) -> Vertical: def audit_tab(self) -> AuditTab: return self.query_one(AuditTab) + @property + def shell_result_tab(self) -> ShellResultTab: + return self.query_one(ShellResultTab) + @property def stats_tab(self) -> StatsTab: return self.query_one(StatsTab) + @property + def settings_tab(self) -> SettingsTab: + return self.query_one(SettingsTab) + async def on_tabbed_content_tab_activated(self, event: TabbedContent.TabActivated) -> None: """切换标签页时刷新内容.""" if event.pane.id == "tab-audit": await self.audit_tab._refresh() + elif event.pane.id == "tab-shell-results": + await self.shell_result_tab._refresh() elif event.pane.id == "tab-stats": await self.stats_tab._refresh() + elif event.pane.id == "tab-settings": + # Settings tab refreshes automatically when managers are set + pass def print(self, *args) -> None: """将内容写入主日志区""" diff --git a/src/console/ui/widgets/__init__.py b/src/console/ui/widgets/__init__.py index e69de29..28c73c8 100644 --- a/src/console/ui/widgets/__init__.py +++ b/src/console/ui/widgets/__init__.py @@ -0,0 +1,17 @@ +"""TUI widgets package.""" + +from src.console.ui.widgets.audit_tab import AuditTab +from src.console.ui.widgets.env_config_tab import EnvConfigTab +from src.console.ui.widgets.settings_tab import SettingsTab +from src.console.ui.widgets.shell_result_tab import ShellResultTab +from src.console.ui.widgets.skill_config_tab import SkillConfigTab +from src.console.ui.widgets.stats_tab import StatsTab + +__all__ = [ + "AuditTab", + "EnvConfigTab", + "SettingsTab", + "ShellResultTab", + "SkillConfigTab", + "StatsTab", +] diff --git a/src/console/ui/widgets/audit_tab.py b/src/console/ui/widgets/audit_tab.py index 3233a89..c955b60 100644 --- a/src/console/ui/widgets/audit_tab.py +++ b/src/console/ui/widgets/audit_tab.py @@ -81,6 +81,13 @@ class AuditTab(Vertical): padding: 0 1; margin-bottom: 1; } + + .audit-section-label { + height: auto; + padding: 0 0 0 0; + text-style: bold; + color: $text; + } """ def __init__(self) -> None: @@ -108,55 +115,80 @@ async def _refresh(self) -> None: await self.remove_children() - pending = self._committer.workspace.db.get_snapshots_by_audit_status("PENDING_AUDIT") + # Result area for showing commit results + result_log = Vertical(id="audit-result-log") + await self.mount(result_log) + + pending_files = self._committer.workspace.db.get_snapshots_by_audit_status("PENDING_AUDIT") + pending_shells = self._committer.workspace.db.get_shell_pending_audits() - if not pending: + total_items = len(pending_files) + len(pending_shells) + + if total_items == 0: await self.mount(Label("没有待审核的更改.", id="audit-empty")) return - # Group by file_path - grouped: defaultdict[str, list[tuple]] = defaultdict(list) - for snap in pending: - grouped[snap[1]].append(snap) - - # Result area for showing commit results - result_log = Vertical(id="audit-result-log") - await self.mount(result_log) + # Group file snapshots by file_path + file_grouped: defaultdict[str, list[tuple]] = defaultdict(list) + for snap in pending_files: + file_grouped[snap[1]].append(snap) header = Label( - f"待审核更改 ({sum(len(snaps) for snaps in grouped.values())} 项)", + f"待审核更改 ({total_items} 项)", id="audit-header", ) await self.mount(header) - for file_path in sorted(grouped): - snaps = grouped[file_path] - # Collect all children for all snaps of this file - all_snap_widgets: list[Static | Horizontal] = [] - for snap in snaps: - snap_id = snap[0] - diff_content = snap[4] or "(空 diff)" + # Render pending shell commands first + if pending_shells: + shell_children: list[Label | Collapsible] = [Label("Shell 命令:", classes="audit-section-label")] + for shell in pending_shells: + shell_id, command, description, _ts, _sid, _status = shell + preview = f"$ {command}" + if description: + preview += f"\n # {description}" - diff_container = Vertical(Static(diff_content, markup=False), classes="audit-diff") + cmd_display = Static(preview, markup=False, classes="audit-diff") btn_row = Horizontal( - Button("批准", variant="primary", id=f"approve-{snap_id}", classes="audit-approve"), - Button("拒绝", variant="error", id=f"reject-{snap_id}", classes="audit-reject"), + Button("批准", variant="primary", id=f"shell_approve-{shell_id}", classes="audit-approve"), + Button("拒绝", variant="error", id=f"shell_reject-{shell_id}", classes="audit-reject"), classes="audit-buttons", ) - all_snap_widgets.append(diff_container) - all_snap_widgets.append(btn_row) - - content_widgets = Vertical(*all_snap_widgets) - - collapsible = Collapsible( - content_widgets, - title=f"{file_path} ({len(snaps)} 次更改)", - classes="audit-collapsible", - ) - # Collapse excess items initially — keep first 3 expanded - if len(self.children) > 6: # header + result_log + 3 expanded - collapsible.collapsed = True - await self.mount(collapsible) + collapsible = Collapsible( + Vertical(cmd_display, btn_row), + title=f"Shell #{shell_id}: {command.strip()[:60]}{'...' if len(command.strip()) > 60 else ''}", + classes="audit-collapsible", + ) + shell_children.append(collapsible) + await self.mount(Vertical(*shell_children)) + + # Render file snapshot changes + if pending_files: + for file_path in sorted(file_grouped): + snaps = file_grouped[file_path] + all_snap_widgets: list[Static | Horizontal] = [] + for snap in snaps: + snap_id = snap[0] + diff_content = snap[4] or "(空 diff)" + + diff_container = Vertical(Static(diff_content, markup=False), classes="audit-diff") + btn_row = Horizontal( + Button("批准", variant="primary", id=f"approve-{snap_id}", classes="audit-approve"), + Button("拒绝", variant="error", id=f"reject-{snap_id}", classes="audit-reject"), + classes="audit-buttons", + ) + all_snap_widgets.append(diff_container) + all_snap_widgets.append(btn_row) + + content_widgets = Vertical(*all_snap_widgets) + collapsible = Collapsible( + content_widgets, + title=f"{file_path} ({len(snaps)} 次更改)", + classes="audit-collapsible", + ) + if len(self.children) > 6: + collapsible.collapsed = True + await self.mount(collapsible) async def on_button_pressed(self, event: Button.Pressed) -> None: """处理批准/拒绝按钮点击.""" @@ -169,16 +201,20 @@ async def on_button_pressed(self, event: Button.Pressed) -> None: if len(parts) != 2: return - action, snap_id_str = parts + action, id_str = parts try: - snapshot_id = int(snap_id_str) + item_id = int(id_str) except ValueError: return if action == "approve": - result = self._committer.commit(snapshot_id, approved=True) + result = self._committer.commit(item_id, approved=True) elif action == "reject": - result = self._committer.commit(snapshot_id, approved=False) + result = self._committer.commit(item_id, approved=False) + elif action == "shell_approve": + result = self._committer.commit_shell(item_id, approved=True) + elif action == "shell_reject": + result = self._committer.commit_shell(item_id, approved=False) else: return diff --git a/src/console/ui/widgets/env_config_tab.py b/src/console/ui/widgets/env_config_tab.py new file mode 100644 index 0000000..4c4ba45 --- /dev/null +++ b/src/console/ui/widgets/env_config_tab.py @@ -0,0 +1,283 @@ +from __future__ import annotations + +from pathlib import Path +from typing import ClassVar + +from textual.containers import Horizontal, Vertical +from textual.screen import ModalScreen +from textual.widgets import Button, DataTable, Input, Label, Static + +from src.core.config_manager import DEFAULT_ENVS + + +class EnvEditDialog(ModalScreen[str | None]): + """Modal dialog for editing an environment variable value""" + + DEFAULT_CSS = """ + EnvEditDialog { + align: center middle; + } + + #env-edit-dialog { + width: 50; + height: auto; + padding: 2; + border: thick $primary; + background: $surface; + } + + #env-edit-dialog > Label { + text-style: bold; + margin-bottom: 1; + } + + .env-edit-field { + margin-bottom: 1; + } + + #env-key-display { + margin-bottom: 1; + color: $text; + text-style: bold; + } + + #env-edit-buttons { + height: auto; + align: right middle; + } + + #env-edit-buttons Button { + margin-left: 1; + } + """ + + def __init__(self, key: str = "", value: str = "") -> None: + super().__init__() + self._key = key + self._value = value + + def compose(self): + with Vertical(id="env-edit-dialog"): + yield Label("编辑环境变量") + yield Label("键:", classes="env-edit-field") + yield Static(self._key, id="env-key-display") + yield Label("值:", classes="env-edit-field") + yield Input(value=self._value, id="env-value-input", placeholder="配置值") + with Horizontal(id="env-edit-buttons"): + yield Button("取消", id="cancel-btn", variant="default") + yield Button("确定", id="ok-btn", variant="primary") + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "ok-btn": + value = self.query_one("#env-value-input", Input).value + self.dismiss(value) + elif event.button.id == "cancel-btn": + self.dismiss(None) + + +class EnvConfigTab(Vertical): + """环境变量配置标签页. + + 显示和编辑预定义的环境变量配置,支持: + - 查看所有预定义的环境变量 + - 编辑环境变量的值 + - 恢复环境变量为默认值 + - 从 .env 文件读取/写入配置 + + 注意:不支持添加自定义环境变量,键字段为只读. + """ + + DEFAULT_CSS: ClassVar[str] = """ + EnvConfigTab { + height: 1fr; + width: 1fr; + padding: 0 1; + overflow-y: auto; + } + + #env-header { + height: auto; + padding: 1 0; + text-style: bold; + color: $text; + border-bottom: solid $primary; + } + + #env-toolbar { + height: 1fr; + padding: 1 0; + align: left middle; + } + + #env-toolbar Button { + margin-right: 1; + } + + #env-table { + height: 1fr; + } + + #env-empty { + height: 100%; + content-align: center middle; + color: $text-muted; + } + + #env-help { + height: auto; + padding: 1; + margin-top: 1; + background: $surface; + border: solid $primary; + color: $text-muted; + } + """ + + def __init__(self) -> None: + super().__init__() + self._workspace_root: Path | None = None + self._env_data: dict[str, str] = {} # 当前环境变量(包含默认值和用户自定义) + self._user_envs: dict[str, str] = {} # 用户在 .env 中的配置 + + def compose(self): + yield Label("环境变量配置", id="env-header") + with Horizontal(id="env-toolbar"): + yield Button("编辑", id="env-edit-btn", variant="primary") + yield Button("恢复默认", id="env-reset-btn", variant="warning") + yield DataTable(id="env-table") + + def set_workspace_root(self, workspace_root: Path) -> None: + """设置工作区根目录.""" + self._workspace_root = workspace_root + self._load_env_file() + self._refresh() + + def on_mount(self) -> None: + table = self.query_one("#env-table", DataTable) + table.add_columns("键", "值", "默认值", "说明") + table.cursor_type = "row" + + def _load_env_file(self) -> None: + """从 .env 文件加载环境变量. + + 只加载预定义的环境变量,忽略自定义变量. + """ + self._user_envs.clear() + self._env_data.clear() + + # 加载默认值 + for key, config in DEFAULT_ENVS.items(): + self._env_data[key] = config["value"] + + # 从 .env 文件加载用户配置(仅限预定义变量) + if self._workspace_root: + env_file = self._workspace_root / ".env" + if env_file.exists(): + try: + for line in env_file.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if line and not line.startswith("#") and "=" in line: + key, _, value = line.partition("=") + key = key.strip() + value = value.strip().strip('"').strip("'") + # 只接受预定义的环境变量 + if key in DEFAULT_ENVS: + self._user_envs[key] = value + self._env_data[key] = value + except Exception: + pass + + def _save_env_file(self) -> None: + """保存非默认值到 .env 文件. + + 只保存预定义变量中与默认值不同的配置. + """ + if not self._workspace_root: + return + + env_file = self._workspace_root / ".env" + + # 只保存与默认值不同的预定义变量 + non_default = {} + for key, value in self._env_data.items(): + if key in DEFAULT_ENVS: + default_config = DEFAULT_ENVS[key] + if value != default_config["value"]: + non_default[key] = value + + try: + if non_default: + lines = ["# ManualAid 环境变量配置\n"] + for key, value in sorted(non_default.items()): + lines.append(f"{key}={value}\n") + env_file.write_text("".join(lines), encoding="utf-8") + elif env_file.exists(): + # 如果所有值都是默认值,删除 .env 文件 + env_file.unlink() + except Exception as e: + self.notify(f"保存失败: {e}", severity="error") + + def _refresh(self) -> None: + """刷新环境变量列表. + + 只显示预定义的环境变量. + """ + table = self.query_one("#env-table", DataTable) + table.clear() + + # 只显示预定义的环境变量 + for key, config in DEFAULT_ENVS.items(): + value = self._env_data.get(key, config["value"]) + default_value = config["value"] + desc = config["description"] + table.add_row(key, value, default_value, desc) + + async def on_button_pressed(self, event: Button.Pressed) -> None: + button_id = event.button.id or "" + + if button_id == "env-edit-btn": + table = self.query_one("#env-table", DataTable) + if table.cursor_row is None or table.cursor_row < 0: + self.notify("请先选择一行", severity="warning") + return + + row_data = table.get_row_at(table.cursor_row) + if row_data: + key, value, _, _ = row_data + self.app.push_screen(EnvEditDialog(key=key, value=value), self._on_edit_result) + + elif button_id == "env-reset-btn": + # 重置选中行为默认值 + table = self.query_one("#env-table", DataTable) + if table.cursor_row is None or table.cursor_row < 0: + self.notify("请先选择一行", severity="warning") + return + + row_data = table.get_row_at(table.cursor_row) + if row_data: + key = row_data[0] + if key in DEFAULT_ENVS: + self._env_data[key] = DEFAULT_ENVS[key]["value"] + if key in self._user_envs: + del self._user_envs[key] + self._save_env_file() + self._refresh() + self.notify(f"已恢复默认值: {key}") + + def _on_edit_result(self, result: str | None) -> None: + """处理编辑对话框的结果. + + Args: + result: 编辑后的值,或 None 表示取消 + """ + if result is not None: + table = self.query_one("#env-table", DataTable) + if table.cursor_row is not None and table.cursor_row >= 0: + row_data = table.get_row_at(table.cursor_row) + if row_data: + key = row_data[0] + self._env_data[key] = result + self._user_envs[key] = result + self._save_env_file() + self._refresh() + self.notify(f"已更新: {key}") diff --git a/src/console/ui/widgets/settings_tab.py b/src/console/ui/widgets/settings_tab.py new file mode 100644 index 0000000..b46a2e2 --- /dev/null +++ b/src/console/ui/widgets/settings_tab.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +from pathlib import Path +from typing import ClassVar + +from textual.containers import Vertical +from textual.widgets import TabbedContent, TabPane + +from src.console.ui.widgets.env_config_tab import EnvConfigTab +from src.console.ui.widgets.skill_config_tab import SkillConfigTab + + +class SettingsTab(Vertical): + """设置标签页. + + 包含多个子标签页: + - 环境变量配置 + - Skill 配置 + """ + + DEFAULT_CSS: ClassVar[str] = """ + SettingsTab { + height: 1fr; + width: 1fr; + padding: 0; + } + + SettingsTab TabbedContent { + height: 1fr; + } + + SettingsTab TabbedContent > TabPane { + padding: 0; + } + """ + + def __init__(self) -> None: + super().__init__() + self._workspace_root: Path | None = None + self._skill_manager = None + + def compose(self): + with TabbedContent(): + with TabPane("环境变量", id="tab-env"): + yield EnvConfigTab() + with TabPane("Skills", id="tab-skills"): + yield SkillConfigTab() + + def on_mount(self) -> None: + """初始化子标签页.""" + pass + + def set_managers(self, workspace_root: Path, skill_manager) -> None: + """设置工作区根目录和管理器.""" + self._workspace_root = workspace_root + self._skill_manager = skill_manager + + # 传递给环境变量配置 + env_tab = self.query_one(EnvConfigTab) + env_tab.set_workspace_root(workspace_root) + + # 传递给 Skill 配置 + skill_tab = self.query_one(SkillConfigTab) + skill_tab.set_managers(skill_manager, workspace_root) + + @property + def env_config_tab(self) -> EnvConfigTab: + return self.query_one(EnvConfigTab) + + @property + def skill_config_tab(self) -> SkillConfigTab: + return self.query_one(SkillConfigTab) diff --git a/src/console/ui/widgets/shell_result_tab.py b/src/console/ui/widgets/shell_result_tab.py new file mode 100644 index 0000000..5986d37 --- /dev/null +++ b/src/console/ui/widgets/shell_result_tab.py @@ -0,0 +1,173 @@ +"""Shell 命令结果标签页 — 查看/复制已执行的 Shell 命令输出.""" + +from __future__ import annotations + +import datetime +from typing import ClassVar + +from textual.containers import Horizontal, Vertical +from textual.widgets import Button, Collapsible, Label, Static + +from src.core.copy2clip import copy_to_clipboard + + +class ShellResultTab(Vertical): + """Shell 命令执行结果标签页. + + 展示所有已完成(已批准/已拒绝)的 Shell 命令及其输出, + 支持展开查看详细输出并复制. + """ + + DEFAULT_CSS: ClassVar[str] = """ + ShellResultTab { + height: 1fr; + width: 1fr; + padding: 0 1; + overflow-y: auto; + } + + #shell-result-placeholder { + height: 100%; + content-align: center middle; + color: $text-muted; + } + + #shell-result-empty { + height: 100%; + content-align: center middle; + color: $text-muted; + } + + #shell-result-header { + height: auto; + padding: 1 0; + text-style: bold; + color: $text; + } + + .shell-collapsible { + height: auto; + margin-bottom: 1; + } + + .shell-output-container { + max-height: 20; + overflow-y: auto; + padding: 1; + background: $surface; + border: solid $primary; + margin-bottom: 1; + } + + .shell-button-row { + height: auto; + align: left middle; + margin-bottom: 1; + } + """ + + def __init__(self) -> None: + super().__init__() + self._db = None + + def compose(self): + yield Label("正在加载...", id="shell-result-placeholder") + + def set_database(self, db) -> None: + """设置数据库引用并刷新.""" + self._db = db + self.set_timer(0.0, self._refresh) + + def on_mount(self) -> None: + if self._db is not None: + self.set_timer(0.1, self._refresh) + + async def _refresh(self) -> None: + """查询已完成的 Shell 命令并重建 UI.""" + if self._db is None: + return + + await self.remove_children() + + shells = self._db.get_shell_completed() + + if not shells: + await self.mount(Label("暂无已执行的 Shell 命令.", id="shell-result-empty")) + return + + header = Label(f"Shell 命令执行记录 ({len(shells)} 项)", id="shell-result-header") + await self.mount(header) + + for i, shell in enumerate(shells): + ( + shell_id, + command, + description, + _ts, + _sid, + audit_status, + output, + exit_code, + executed_at, + ) = shell + + is_approved = audit_status == "APPROVED" + status_icon = "✓" if is_approved else "✗" + status_color = "green" if is_approved else "red" + + # Build detailed content + lines: list[str] = [ + f"[bold]Command:[/bold] $ {command}", + f"[bold]Status:[/bold] [{status_color}]{status_icon} {audit_status}[/{status_color}]", + ] + if description: + lines.append(f"[bold]Description:[/bold] {description}") + if exit_code is not None: + lines.append(f"[bold]Exit Code:[/bold] {exit_code}") + if executed_at: + dt_str = datetime.datetime.fromtimestamp(executed_at).strftime("%Y-%m-%d %H:%M:%S") + lines.append(f"[bold]Executed At:[/bold] {dt_str}") + if output: + lines.append(f"\n[bold]Output:[/bold]\n{output}") + + content = "\n".join(lines) + + output_text = Static(content, markup=True) + output_container = Vertical(output_text, classes="shell-output-container") + copy_btn = Button("复制输出", id=f"shell_copy-{shell_id}") + btn_row = Horizontal(copy_btn, classes="shell-button-row") + + # First items expanded by default, rest collapsed + collapsed = i > 3 + collapsible = Collapsible( + Vertical(output_container, btn_row), + title=f"[{status_color}]{status_icon}[/{status_color}] " + f"Shell #{shell_id}: {command.strip()[:60]}{'...' if len(command.strip()) > 60 else ''}", + classes="shell-collapsible", + collapsed=collapsed, + ) + await self.mount(collapsible) + + async def on_button_pressed(self, event: Button.Pressed) -> None: + """处理复制按钮点击.""" + button_id = event.button.id or "" + if not button_id.startswith("shell_copy-"): + return + + try: + shell_id = int(button_id.split("-", 1)[1]) + except ValueError, IndexError: + return + + if self._db is None: + return + + shells = self._db.get_shell_completed() + for shell in shells: + if shell[0] == shell_id: + output = shell[6] or "(空输出)" + copy_to_clipboard(output) + self.notify("输出已复制到剪贴板", timeout=3) + return + + self.notify("未找到对应记录", severity="error", timeout=3) diff --git a/src/console/ui/widgets/skill_config_tab.py b/src/console/ui/widgets/skill_config_tab.py new file mode 100644 index 0000000..1e16296 --- /dev/null +++ b/src/console/ui/widgets/skill_config_tab.py @@ -0,0 +1,272 @@ +from __future__ import annotations + +from typing import ClassVar + +from textual.containers import Horizontal, Vertical +from textual.widgets import Button, DataTable, Label, Static + + +class SkillConfigTab(Vertical): + """Skill 配置标签页. + + 显示所有发现的 Skill,支持: + - 查看全局和项目级 Skill + - 启用/禁用 Skill + - 查看 Skill 详情 + """ + + DEFAULT_CSS: ClassVar[str] = """ + SkillConfigTab { + height: 1fr; + width: 1fr; + padding: 0 1; + overflow-y: auto; + } + + #skill-header { + height: auto; + padding: 1 0; + text-style: bold; + color: $text; + border-bottom: solid $primary; + } + + #skill-toolbar { + height: auto; + padding: 1 0; + align: left middle; + } + + #skill-toolbar Button { + margin-right: 1; + } + + #skill-table { + height: 1fr; + } + + #skill-detail { + height: auto; + max-height: 10; + padding: 1; + margin-top: 1; + background: $surface; + border: solid $primary; + overflow-y: auto; + } + + #skill-empty { + height: 100%; + content-align: center middle; + color: $text-muted; + } + + .skill-row-enabled { + color: $text; + } + + .skill-row-disabled { + color: $text-muted; + } + """ + + def __init__(self) -> None: + super().__init__() + self._skill_manager = None + self._workspace_root = None + self._skills_data: dict = {} + self._columns_initialized = False # 标记列是否已初始化 + + def compose(self): + yield Label("Skill 配置", id="skill-header") + with Horizontal(id="skill-toolbar"): + yield Button("刷新", id="skill-refresh-btn", variant="default") + yield Button("启用选中", id="skill-enable-btn", variant="success") + yield Button("禁用选中", id="skill-disable-btn", variant="warning") + yield Button("启用全部", id="skill-enable-all-btn", variant="success") + yield Button("禁用全部", id="skill-disable-all-btn", variant="warning") + yield DataTable(id="skill-table") + yield Static("选择 Skill 查看详情", id="skill-detail") + + def set_managers(self, skill_manager, workspace_root) -> None: + """设置 Skill 管理器和工作区根目录.""" + self._skill_manager = skill_manager + self._workspace_root = workspace_root + # 发现 skills + if workspace_root: + from pathlib import Path + + skill_manager.discover(Path(workspace_root)) + self._refresh() + + def on_mount(self) -> None: + table = self.query_one("#skill-table", DataTable) + table.add_columns("启用", "名称", "类型", "描述") + table.cursor_type = "row" + self._columns_initialized = True + # 如果已经有数据,刷新显示 + if self._skills_data: + self._refresh() + + def _refresh(self) -> None: + """刷新 Skill 列表.""" + if self._skill_manager is None: + return + + # 重新获取所有技能(会从数据库加载禁用状态) + self._skills_data = self._skill_manager.get_all() + + # 如果列还没初始化,不刷新(等 on_mount 后自动刷新) + if not self._columns_initialized: + return + + table = self.query_one("#skill-table", DataTable) + table.clear() + + # 获取当前禁用状态用于显示 + disabled_set = self._skill_manager.get_disabled() + + for name, skill in self._skills_data.items(): + is_global = skill.metadata.get("is_global", True) + skill_type = "全局" if is_global else "项目" + # 使用禁用集合判断状态,确保与持久化数据一致 + is_enabled = name not in disabled_set + status = "✓" if is_enabled else "✗" + description = skill.description[:50] + "..." if len(skill.description) > 50 else skill.description + table.add_row(status, name, skill_type, description) + + def _update_detail(self, row_index: int) -> None: + """更新详情显示.""" + if self._skill_manager is None: + return + + table = self.query_one("#skill-table", DataTable) + if row_index is None or row_index < 0: + return + + row_data = table.get_row_at(row_index) + if not row_data: + return + + name = row_data[1] + skill = self._skills_data.get(name) + if not skill: + return + + detail_text = ( + f"[bold]{skill.name}[/bold]\n" + f"位置: {skill.location}\n" + f"类型: {'全局' if skill.metadata.get('is_global', True) else '项目'}\n" + f"状态: {'启用' if skill.enabled else '禁用'}\n\n" + f"描述: {skill.description}" + ) + + detail = self.query_one("#skill-detail", Static) + detail.update(detail_text) + + async def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None: + """行高亮时更新详情.""" + if event.data_table.id == "skill-table": + self._update_detail(event.cursor_row) + + async def on_button_pressed(self, event: Button.Pressed) -> None: + if self._skill_manager is None: + return + + button_id = event.button.id or "" + + if button_id == "skill-refresh-btn": + if self._workspace_root: + from pathlib import Path + + self._skill_manager.discover(Path(self._workspace_root)) + self._refresh() + self.notify("已刷新 Skill 列表") + + elif button_id == "skill-enable-btn": + # 启用选中的 Skill + table = self.query_one("#skill-table", DataTable) + row_index = table.cursor_row + if row_index is None or row_index < 0: + self.notify("请先选择一个 Skill", severity="warning") + return + + row_data = table.get_row_at(row_index) + if not row_data: + return + + name = row_data[1] + disabled = self._skill_manager.get_disabled() + if name in disabled: + disabled.discard(name) + self._skill_manager.set_disabled(disabled, persist=True) + self._refresh() + self.notify(f"已启用: {name}") + else: + self.notify(f"{name} 已经是启用状态", severity="information") + + elif button_id == "skill-disable-btn": + # 禁用选中的 Skill + table = self.query_one("#skill-table", DataTable) + row_index = table.cursor_row + if row_index is None or row_index < 0: + self.notify("请先选择一个 Skill", severity="warning") + return + + row_data = table.get_row_at(row_index) + if not row_data: + return + + name = row_data[1] + disabled = self._skill_manager.get_disabled() + if name not in disabled: + disabled.add(name) + self._skill_manager.set_disabled(disabled, persist=True) + self._refresh() + self.notify(f"已禁用: {name}") + else: + self.notify(f"{name} 已经是禁用状态", severity="information") + + elif button_id == "skill-enable-all-btn": + self._skill_manager.set_disabled(set(), persist=True) + self._refresh() + self.notify("已启用所有 Skill") + + elif button_id == "skill-disable-all-btn": + all_names = set(self._skills_data.keys()) + self._skill_manager.set_disabled(all_names, persist=True) + self._refresh() + self.notify("已禁用所有 Skill") + + async def on_data_table_cell_selected(self, event: DataTable.CellSelected) -> None: + """单元格选中时切换启用状态(点击启用列).""" + if event.data_table.id != "skill-table": + return + + if event.column_key != 0: # 只在"启用"列点击时切换 + return + + if self._skill_manager is None: + return + + row_data = event.data_table.get_row_at(event.cursor_row) + if not row_data: + return + + name = row_data[1] + skill = self._skills_data.get(name) + if not skill: + return + + # 切换状态 + disabled = self._skill_manager.get_disabled() + if name in disabled: + disabled.discard(name) + else: + disabled.add(name) + + self._skill_manager.set_disabled(disabled, persist=True) + self._refresh() + + status = "启用" if name not in disabled else "禁用" + self.notify(f"已{status}: {name}") diff --git a/src/constants/__init__.py b/src/constants/__init__.py index 3d18726..906d362 100644 --- a/src/constants/__init__.py +++ b/src/constants/__init__.py @@ -1 +1 @@ -__version__ = "0.5.0" +__version__ = "0.6.0" diff --git a/src/constants/manual_aid.py b/src/constants/manual_aid.py index f22ee04..6c88e60 100644 --- a/src/constants/manual_aid.py +++ b/src/constants/manual_aid.py @@ -24,3 +24,6 @@ # 用户配置覆盖文件名 CONFIG_FILE = "config.json" + +# Agent 配置目录 +AGENTS_DIR = "agents" diff --git a/src/constants/prompts.py b/src/constants/prompts.py index a4ef5db..e342577 100644 --- a/src/constants/prompts.py +++ b/src/constants/prompts.py @@ -2,18 +2,15 @@ from collections.abc import Callable -SYSTEM_IDENTITY: str = """ -你是一个与 ManualAid 工作区集成的、依赖工具进行文件探索和编辑的助手 +SYSTEM_ROLE: str = """ +你是一个与 ManualAid 工作区集成的、依赖工具完成任务的助手 你的能力来源于工作区提供的工具——如果没有调用正确的工具,你无法独立行动 +""" - - 你是一个依赖工具的助手. 你不能独立行动;必须调用工具来完成任务 +SYSTEM_CONSTRAINTS: str = """ + 你是一个依赖工具的助手.你不能独立行动;必须调用工具来完成任务 严格使用指定的 XML 格式来调用工具(见 <tool_rules>) - 调用工具后,始终停止并等待用户的工具输出 - 绝不虚构工具返回值. 绝不臆测结果继续 - 如果工具调用失败或返回空,向用户请求澄清 - -""" +""" TOOL_RULES: str = """ @@ -42,9 +39,9 @@ - + 绝不虚构工具返回的数据 - 在调用工具后停止——不要代表工具生成结果 + 在调用工具后停止并等待用户的工具输出——不要代表工具生成结果,也不要臆测结果继续 如果工具返回错误或空结果,向用户请求澄清 当参数值包含XML标签字符(`<`或`>`)时, 必须将其转换为HTML实体转义符(`<`和`>`). 例如: 如果参数值是, 必须写为<func_call> @@ -53,10 +50,11 @@ WORKFLOW_GUIDELINES: str = """ 1. 在采取行动前充分理解用户的请求. 如有疑问,先提问再行动 -2. 将复杂或多步骤任务分解为较小的顺序子任务;一次一个步骤 -3. 为每个步骤选择最合适的工具. 如果没有合适的工具,解释并请求替代方案 -4. 等待每个工具的结果后再继续下一步 -5. 构建响应时,先使用工具收集信息,然后形成最终答案 +2. 先通过搜索和读取了解项目结构与上下文,再制定具体方案 +3. 将复杂或多步骤任务分解为较小的顺序子任务;一次一个步骤 +4. 为每个步骤选择最合适的工具. 如果没有合适的工具,解释并请求替代方案 +5. 等待每个工具的结果后再继续下一步 +6. 构建响应时,先使用工具收集信息,然后形成最终答案 """ AUGMENTATION_WRAPPER: str = """ @@ -75,7 +73,7 @@ # Extension hooks(Skills / MCP placeholders) # --------------------------------------------------------------------------- -EXTENSION_HOOKS: list[Callable[[], str]] = [] +__EXTENSION_HOOKS: list[Callable[[], str]] = [] def register_extension_hook(hook: Callable[[], str]) -> None: @@ -83,15 +81,24 @@ def register_extension_hook(hook: Callable[[], str]) -> None: For use by future Skills/MCP modules at import time. """ - EXTENSION_HOOKS.append(hook) + __EXTENSION_HOOKS.append(hook) + + +def clear_extension_hooks() -> None: + """Clear all registered extension hooks. + + Should be called after generating extensions section to prevent + duplicate registrations on subsequent command invocations. + """ + __EXTENSION_HOOKS.clear() def generate_extensions_section() -> str: """Run all registered extension hooks and emit their output inside .""" - if not EXTENSION_HOOKS: + if not __EXTENSION_HOOKS: return "\n \n" parts = [""] - for hook in EXTENSION_HOOKS: + for hook in __EXTENSION_HOOKS: content = hook() if content: parts.append(content) diff --git a/src/core/agent_manager.py b/src/core/agent_manager.py new file mode 100644 index 0000000..4987fe2 --- /dev/null +++ b/src/core/agent_manager.py @@ -0,0 +1,290 @@ +"""Agent manager — singleton, loads and manages agent configurations.""" + +from __future__ import annotations + +import threading +import warnings +from pathlib import Path + +from src.constants.manual_aid import AGENTS_DIR, MANUALAID_DIR +from src.constants.prompts import SYSTEM_ROLE, WORKFLOW_GUIDELINES +from src.models.agent import AgentConfig, ToolPermissions + +# --------------------------------------------------------------------------- +# Frontmatter parser (YAML subset: key:value, nested keys, dash lists) +# --------------------------------------------------------------------------- + + +def _parse_frontmatter(content: str) -> tuple[dict, str]: + """Parse YAML frontmatter delimited by --- markers. + + Returns (metadata_dict, body_string). Only supports: + - key: value pairs (strings) + - nested keys via indentation (2-space) + - dash list items under nested keys + + Non-goal: not a full YAML parser; only the subset needed for agent configs. + """ + lines = content.split("\n") + if not lines or lines[0].strip() != "---": + return {}, content + + end_idx = -1 + for i in range(1, len(lines)): + if lines[i].strip() == "---": + end_idx = i + break + + if end_idx == -1: + return {}, content + + frontmatter_lines = lines[1:end_idx] + body = "\n".join(lines[end_idx + 1 :]).strip() + + metadata: dict = {} + current_section: str | None = None + current_subsection: str | None = None + + for line in frontmatter_lines: + stripped = line.rstrip() + if not stripped.strip(): + continue + + indent = len(line) - len(line.lstrip()) + + if stripped.lstrip().startswith("- ") and current_subsection: + item = stripped.lstrip()[2:].strip() + if item: + metadata.setdefault(current_subsection, []).append(item) + elif ":" in stripped: + parts = stripped.split(":", 1) + key = parts[0].strip() + value = parts[1].strip() if len(parts) > 1 and parts[1].strip() else "" + + if indent == 0: + current_section = key + current_subsection = None + if value: + metadata[key] = value + elif indent > 0 and current_section: + qualified = f"{current_section}.{key}" + if value and value != "[]": + metadata[qualified] = value + current_subsection = None + else: + current_subsection = qualified if value != "[]" else None + + return metadata, body + + +def _parse_agent_file(file_path: Path) -> AgentConfig | None: + """Parse a single agent .md file into an AgentConfig.""" + try: + content = file_path.read_text(encoding="utf-8") + except Exception as e: + warnings.warn(f"Failed to read agent file {file_path}: {e}", stacklevel=2) + return None + + metadata, body = _parse_frontmatter(content) + + name = metadata.get("name", file_path.stem) + description = metadata.get("description", "") + + # Parse tool permissions — handle both list and empty-list cases + raw_whitelist = metadata.get("tool_permissions.whitelist", []) + raw_blacklist = metadata.get("tool_permissions.blacklist", []) + if isinstance(raw_whitelist, str): + raw_whitelist = [raw_whitelist] if raw_whitelist else [] + if isinstance(raw_blacklist, str): + raw_blacklist = [raw_blacklist] if raw_blacklist else [] + + tool_permissions = ToolPermissions( + whitelist=raw_whitelist, + blacklist=raw_blacklist, + ) + + # Parse ## Role and ## Workflow sections from body + body_role = "" + body_workflow = "" + current_heading: str | None = None + section_lines: list[str] = [] + + for line in body.split("\n"): + if line.startswith("## "): + # Save previous section + heading_text = line[3:].strip().lower() + joined = "\n".join(section_lines).strip() + if current_heading == "role": + body_role = joined + elif current_heading == "workflow": + body_workflow = joined + + current_heading = heading_text if heading_text in ("role", "workflow") else None + section_lines = [] + elif current_heading: + section_lines.append(line) + else: + section_lines = [] + + # Save last section + joined = "\n".join(section_lines).strip() + if current_heading == "role": + body_role = joined + elif current_heading == "workflow": + body_workflow = joined + + return AgentConfig( + name=name, + description=description, + tool_permissions=tool_permissions, + body_role=body_role, + body_workflow=body_workflow, + ) + + +# --------------------------------------------------------------------------- +# Agent Manager (singleton) +# --------------------------------------------------------------------------- + + +class AgentManager: + """Singleton — loads and caches agent configurations from .ManualAid/agents/. + + Usage: + mgr = AgentManager() + mgr.initialize(workspace_root) + mgr.write_default(workspace_root) + agent = mgr.get_current() + """ + + _instance: AgentManager | None = None + _instance_lock: threading.Lock = threading.Lock() + + def __new__(cls) -> AgentManager: + with cls._instance_lock: + if cls._instance is None: + instance = super().__new__(cls) + instance._initialized = False + cls._instance = instance + return cls._instance + + def __init__(self) -> None: + if self._initialized: + return + self._agents: dict[str, AgentConfig] = {} + self._agents_dir: Path | None = None + self._root_path: Path | None = None + self._loaded = False + self._load_lock: threading.Lock = threading.Lock() + self._current_agent_name: str = "default" + self._initialized = True + + @property + def current_agent_name(self) -> str: + return self._current_agent_name + + @current_agent_name.setter + def current_agent_name(self, value: str) -> None: + self._current_agent_name = value + + def initialize(self, root_path: str | Path) -> None: + """Set the workspace root. Agents are loaded lazily on first access.""" + self._root_path = Path(root_path) + self._agents_dir = self._root_path / MANUALAID_DIR / AGENTS_DIR + self._loaded = False + self._agents = {} + + def _ensure_loaded(self) -> None: + if self._loaded: + return + with self._load_lock: + if self._loaded: + return + self._agents = {} + if self._agents_dir and self._agents_dir.is_dir(): + for fpath in sorted(self._agents_dir.glob("*.md")): + agent = _parse_agent_file(fpath) + if agent is not None: + self._agents[agent.name] = agent + self._loaded = True + + def get(self, name: str) -> AgentConfig | None: + self._ensure_loaded() + return self._agents.get(name) + + def get_default(self) -> AgentConfig: + self._ensure_loaded() + default = self._agents.get("default") + if default is None: + return AgentConfig( + name="default", + description="Default agent", + tool_permissions=ToolPermissions(), + ) + return default + + def get_current(self) -> AgentConfig: + agent = self.get(self._current_agent_name) + return agent if agent is not None else self.get_default() + + def switch_agent(self, name: str) -> bool: + self._ensure_loaded() + if name in self._agents: + self._current_agent_name = name + return True + return False + + def list_agents(self) -> list[AgentConfig]: + self._ensure_loaded() + return list(self._agents.values()) + + def agent_names(self) -> list[str]: + self._ensure_loaded() + return sorted(self._agents.keys()) + + def write_default(self, root_path: str | Path, *, force: bool = False) -> None: + """Write the default.md agent file if it does not exist. + + Content mirrors the prompts.py SYSTEM_ROLE and WORKFLOW_GUIDELINES + so that users can edit language/behavior by modifying this file. + + Args: + root_path: Workspace root directory. + force: If True, overwrite existing default.md. + """ + agents_dir = Path(root_path) / MANUALAID_DIR / AGENTS_DIR + agents_dir.mkdir(parents=True, exist_ok=True) + default_path = agents_dir / "default.md" + if default_path.exists() and not force: + return + + content = f"""--- +name: default +description: Default ManualAid agent +tool_permissions: + whitelist: [] + blacklist: [] +--- + +## Role + +{SYSTEM_ROLE} + +## Workflow + +{WORKFLOW_GUIDELINES} +""" + default_path.write_text(content, encoding="utf-8") + + def reset_default(self) -> bool: + """Rewrite default.md from current prompts.py constants. + + Returns True on success, False if root_path was never set. + """ + if self._root_path is None: + return False + self.write_default(self._root_path, force=True) + # Invalidate cache so next access re-reads the file + self._loaded = False + self._agents = {} + return True diff --git a/src/core/audit_committer.py b/src/core/audit_committer.py index 5e8b916..e89550a 100644 --- a/src/core/audit_committer.py +++ b/src/core/audit_committer.py @@ -1,3 +1,4 @@ +import subprocess from pathlib import Path from src.utils.binary_detector import is_binary_file @@ -90,3 +91,63 @@ def commit(self, snapshot_id: int, approved: bool = True) -> str: except Exception as e: return f"写入失败: {e.__class__.__name__}({e})" + + def commit_shell(self, shell_id: int, approved: bool = True) -> str: + """审核 Shell 命令: 通过后执行, 拒绝则忽略. + + Args: + shell_id: Shell 审核记录 ID + approved: True=批准执行, False=拒绝 + + Returns: + 操作结果消息 + """ + db = self.workspace.db + + shell = db.get_shell_by_id(shell_id) + if shell is None: + return f"Shell 命令不存在: {shell_id}" + + (_id, command, _, _ts, _session_id, audit_status, _output, _exit_code, _executed_at) = shell + + if audit_status != "PENDING_AUDIT": + return f"Shell 命令已处理 (当前状态: {audit_status})" + + if not approved: + db.update_shell_audit(shell_id, "REJECTED") + return f"已拒绝 Shell 命令 (id={shell_id})" + + # 批准 -- 执行命令 + try: + result = subprocess.run( + command, + capture_output=True, + text=True, + timeout=300, + cwd=str(self.workspace.root_path), + shell=True, + ) + + output_parts = [] + if result.stdout: + output_parts.append(result.stdout.rstrip("\n")) + if result.stderr: + output_parts.append(result.stderr.rstrip("\n")) + + output = "\n".join(output_parts) if output_parts else "(no output)" + exit_code = result.returncode + + # 截断超长输出防止 DB 膨胀 + if len(output) > 10000: + output = output[:10000] + "\n... (output truncated at 10000 chars)" + + db.update_shell_audit(shell_id, "APPROVED", output=output, exit_code=exit_code) + + return f"已批准并执行 Shell 命令 (id={shell_id})\nExit code: {exit_code}\nOutput:\n{output}" + + except subprocess.TimeoutExpired: + db.update_shell_audit(shell_id, "APPROVED", output="TIMEOUT: 命令执行超时(300s)", exit_code=-1) + return f"Shell 命令执行超时 (id={shell_id})" + except Exception as e: + db.update_shell_audit(shell_id, "APPROVED", output=f"ERROR: {e}", exit_code=-1) + return f"Shell 命令执行失败: {e}" diff --git a/src/core/config_manager.py b/src/core/config_manager.py new file mode 100644 index 0000000..8acc952 --- /dev/null +++ b/src/core/config_manager.py @@ -0,0 +1,244 @@ +"""配置管理器 - 统一管理应用程序配置.""" + +from __future__ import annotations + +import json +import threading +from pathlib import Path +from typing import Any, ClassVar + +from src.core.database_manager import DatabaseManager + +# 默认环境变量配置(与 .env.example 保持一致) +DEFAULT_ENVS = { + "TOOL_MAX_DOC_LENGTH": {"value": "360", "description": "工具文档最大长度(字符数)"}, + "TOOL_MAX_FUNC_NAME_LENGTH": {"value": "80", "description": "函数名最大长度(字符数)"}, + "TOOL_MAX_RESULT_LENGTH": {"value": "30000", "description": "结果输出最大长度(字符数)"}, + "TOOL_LIST_TRUNCATE_THRESHOLD": {"value": "100", "description": "列表截断阈值(项目数量上限)"}, + "TOOL_DICT_TRUNCATE_THRESHOLD": {"value": "100", "description": "字典截断阈值(键值对数量上限)"}, + "RESULT_EXPIRE_MINUTES": {"value": "5", "description": "结果过期时间(分钟)"}, + "RESULT_CLEANUP_MINUTES": {"value": "15", "description": "清理任务间隔时间(分钟)"}, + "MANUALAID_AUTO_COPY": {"value": "true", "description": "是否自动复制结果(支持 true/false/yes/no/on/off)"}, + "SESSION_UPDATE_INTERVAL": {"value": "30", "description": "会话持续时间持久化间隔(秒)"}, + "SESSION_FLAG_CHECK_INTERVAL": {"value": "5", "description": "会话标志检查间隔(秒)"}, +} + +# 默认配置值 +DEFAULTS: dict[str, Any] = { + # Skill 配置 + "skills.disabled": [], + # 通用配置 + "general.theme": "dark", + "general.log_level": "INFO", +} + +for __k, __v in DEFAULT_ENVS.items(): + DEFAULTS[f"env.{__k}"] = __v + + +def _parse_value(value: str) -> Any: + """解析配置值. + + 尝试解析为 JSON,失败则返回原始字符串. + + Args: + value: 原始字符串值 + + Returns: + 解析后的值 + """ + try: + return json.loads(value) + except json.JSONDecodeError, TypeError: + return value + + +def _serialize_value(value: Any) -> str: + """序列化配置值. + + Args: + value: 配置值 + + Returns: + 序列化后的字符串 + """ + if isinstance(value, str): + return value + return json.dumps(value, ensure_ascii=False) + + +class ConfigManager: + """配置管理器(单例模式). + + 提供统一的配置访问接口,支持多种配置类型: + - 环境变量配置 + - Skill 配置 + - 通用配置 + """ + + _instance: ClassVar[ConfigManager | None] = None + _instance_lock: ClassVar[threading.Lock] = threading.Lock() + + def __new__(cls) -> ConfigManager: + with cls._instance_lock: + if cls._instance is None: + instance = super().__new__(cls) + instance._initialized = False + cls._instance = instance + return cls._instance + + def __init__(self) -> None: + if self._initialized: + return + + self._db: DatabaseManager | None = None + self._cache: dict[str, Any] = {} + self._initialized = True + + def initialize(self, workspace_root: Path) -> None: + """初始化配置管理器. + + Args: + workspace_root: 工作区根目录 + """ + self._db = DatabaseManager(str(workspace_root)) + self._cache.clear() + self._load_from_db() + + def _load_from_db(self) -> None: + """从数据库加载所有配置到缓存.""" + if self._db is None: + return + + rows = self._db.get_all_config() + for key, value, _category, _updated_at in rows: + self._cache[key] = _parse_value(value) + + def get(self, key: str, default: Any = None) -> Any: + """获取配置值. + + Args: + key: 配置键 + default: 默认值 + + Returns: + 配置值 + """ + # 优先从缓存读取 + if key in self._cache: + return self._cache[key] + + # 尝试从默认值获取 + if key in DEFAULTS: + return DEFAULTS[key] + + return default + + def set(self, key: str, value: Any, category: str = "general") -> None: + """设置配置值. + + Args: + key: 配置键 + value: 配置值 + category: 配置类别 + """ + serialized = _serialize_value(value) + self._cache[key] = value + + if self._db: + self._db.set_config(key, serialized, category) + + def delete(self, key: str) -> None: + """删除配置值. + + Args: + key: 配置键 + """ + if key in self._cache: + del self._cache[key] + + if self._db: + self._db.delete_config(key) + + def get_category(self, category: str) -> dict[str, Any]: + """获取指定类别的所有配置. + + Args: + category: 配置类别 + + Returns: + 配置字典 + """ + if self._db is None: + return {} + + rows = self._db.get_all_config(category) + return {row[0]: _parse_value(row[1]) for row in rows} + + def get_env_configs(self) -> dict[str, str]: + """获取所有环境变量配置. + + Returns: + 环境变量配置字典 + """ + return {k[4:]: v for k, v in self._cache.items() if k.startswith("env.")} + + def set_env_config(self, key: str, value: str) -> None: + """设置环境变量配置. + + Args: + key: 环境变量名(不含 env. 前缀) + value: 配置值 + """ + self.set(f"env.{key}", value, category="env") + + def get_env_config(self, key: str, default: str = "") -> str: + """获取环境变量配置. + + Args: + key: 环境变量名(不含 env. 前缀) + default: 默认值 + + Returns: + 配置值 + """ + return str(self.get(f"env.{key}", default)) + + def apply_env_configs(self) -> None: + """应用环境变量配置到 os.environ.""" + import os + + env_configs = self.get_env_configs() + for key, value in env_configs.items(): + if value: # 只设置非空值 + os.environ[key] = str(value) + + # -- Skill 配置快捷方法 -- + + def get_disabled_skills(self) -> set[str]: + """获取禁用的 Skill 列表. + + Returns: + 禁用的 Skill 名称集合 + """ + if self._db: + return self._db.get_disabled_skills() + return set() + + def set_disabled_skills(self, *names: str) -> None: + """设置禁用的 Skill 列表. + + Args: + names: 禁用的 Skill 名称集合 + """ + if self._db: + self._db.set_disabled_skills(*names) + + @classmethod + def reset_instance(cls) -> None: + """重置单例实例(用于测试).""" + with cls._instance_lock: + if cls._instance is not None: + cls._instance._cache.clear() + cls._instance._db = None + cls._instance = None diff --git a/src/core/database_manager.py b/src/core/database_manager.py index e3a7f41..64b2fe6 100644 --- a/src/core/database_manager.py +++ b/src/core/database_manager.py @@ -1,4 +1,5 @@ import contextlib +import json import sqlite3 import threading import time @@ -114,6 +115,26 @@ def _init_tables(self) -> None: PRIMARY KEY (session_id, func_name, kwargs_json), FOREIGN KEY (session_id) REFERENCES sessions(id) ); + + CREATE TABLE IF NOT EXISTS config ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + category TEXT NOT NULL DEFAULT 'general', + updated_at REAL NOT NULL + ); + + CREATE TABLE IF NOT EXISTS shell_audit ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + command TEXT NOT NULL, + description TEXT NOT NULL DEFAULT '', + timestamp REAL NOT NULL, + session_id INTEGER, + audit_status TEXT NOT NULL DEFAULT 'PENDING_AUDIT', + output TEXT NOT NULL DEFAULT '', + exit_code INTEGER, + executed_at REAL, + FOREIGN KEY (session_id) REFERENCES sessions(id) + ); """ ) @@ -132,6 +153,19 @@ def _init_tables(self) -> None: if not any(row[1] == "deleted" for row in conn.execute("PRAGMA table_info(sessions)")): conn.execute("ALTER TABLE sessions ADD COLUMN deleted INTEGER NOT NULL DEFAULT 0") + # Phase 6 migration: add config table + if not any(row[1] == "key" for row in conn.execute("PRAGMA table_info(config)")): + conn.execute( + """ + CREATE TABLE IF NOT EXISTS config ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + category TEXT NOT NULL DEFAULT 'general', + updated_at REAL NOT NULL + ) + """ + ) + # Create all indexes after migrations so they apply to both # fresh databases and those upgraded from older schemas. conn.execute("CREATE INDEX IF NOT EXISTS idx_tool_calls_session ON tool_calls(session_id)") @@ -141,6 +175,8 @@ def _init_tables(self) -> None: "CREATE INDEX IF NOT EXISTS idx_file_read_records_session_path ON file_read_records(session_id, file_path)" ) conn.execute("CREATE INDEX IF NOT EXISTS idx_tool_call_summaries_session ON tool_call_summaries(session_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_shell_audit_status ON shell_audit(audit_status)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_shell_audit_session ON shell_audit(session_id)") @staticmethod def _migrate_add_pending_content(conn: sqlite3.Connection) -> None: @@ -257,7 +293,7 @@ def is_session_orphaned(self, session_id: int) -> bool: 检查一个会话是否在任何相关表中都没有关联数据 """ - tables = ["tool_calls", "file_read_records", "file_snapshots", "tool_call_summaries"] + tables = ["tool_calls", "file_read_records", "file_snapshots", "tool_call_summaries", "shell_audit"] for table in tables: row = self.fetchone( f"SELECT COUNT(*) FROM {table} WHERE session_id = ?", @@ -383,6 +419,102 @@ def get_snapshots_by_audit_status(self, status: str) -> list[tuple]: (status,), ) + # -- Shell command audit -- + + def record_shell_command( + self, + command: str, + description: str = "", + session_id: int | None = None, + ) -> int: + """记录一条待审核的 Shell 命令. + + Args: + command: Shell 命令内容 + description: 命令描述 + session_id: 会话 ID + + Returns: + 新记录的 ID + """ + cursor = self.execute( + "INSERT INTO shell_audit (command, description, timestamp, session_id, audit_status) " + "VALUES (?, ?, ?, ?, 'PENDING_AUDIT')", + (command, description, time.time(), session_id), + ) + return cursor.lastrowid + + def update_shell_audit( + self, + shell_id: int, + audit_status: str, + output: str = "", + exit_code: int | None = None, + ) -> None: + """更新 Shell 命令审核状态及执行结果. + + Args: + shell_id: 记录 ID + audit_status: 审核状态 (APPROVED/REJECTED) + output: 命令执行输出 + exit_code: 命令退出码 + """ + if output or exit_code is not None: + self.execute( + "UPDATE shell_audit SET audit_status = ?, output = ?, exit_code = ?, executed_at = ? WHERE id = ?", + (audit_status, output, exit_code, time.time(), shell_id), + ) + else: + self.execute( + "UPDATE shell_audit SET audit_status = ? WHERE id = ?", + (audit_status, shell_id), + ) + + def get_shell_pending_audits(self) -> list[tuple]: + """获取所有待审核的 Shell 命令. + + Returns: + 待审核记录列表 (id, command, description, timestamp, session_id, audit_status) + """ + return self.fetchall( + "SELECT id, command, description, timestamp, session_id, audit_status" + " FROM shell_audit WHERE audit_status = 'PENDING_AUDIT'" + ) + + def get_shell_by_id(self, shell_id: int) -> tuple | None: + """根据 ID 获取 Shell 命令审核记录. + + Args: + shell_id: 记录 ID + + Returns: + 记录元组或 None + """ + return self.fetchone( + "SELECT id, command, description, timestamp, session_id," + " audit_status, output, exit_code, executed_at" + " FROM shell_audit WHERE id = ?", + (shell_id,), + ) + + def get_shell_completed(self, limit: int = 200) -> list[tuple]: + """获取所有已完成的 Shell 命令(已批准/已拒绝),按执行时间倒序. + + Args: + limit: 最大返回条数 + + Returns: + 已完成记录列表, 每条含 (id, command, description, timestamp, + session_id, audit_status, output, exit_code, executed_at) + """ + return self.fetchall( + "SELECT id, command, description, timestamp, session_id," + " audit_status, output, exit_code, executed_at" + " FROM shell_audit WHERE audit_status != 'PENDING_AUDIT'" + " ORDER BY COALESCE(executed_at, timestamp) DESC LIMIT ?", + (limit,), + ) + # -- Session statistics and management -- def get_session_summary(self, session_id: int) -> dict: @@ -428,6 +560,7 @@ def delete_session(self, session_id: int) -> None: conn.execute("DELETE FROM tool_calls WHERE session_id = ?", (session_id,)) conn.execute("DELETE FROM file_snapshots WHERE session_id = ?", (session_id,)) conn.execute("DELETE FROM file_read_records WHERE session_id = ?", (session_id,)) + conn.execute("DELETE FROM shell_audit WHERE session_id = ?", (session_id,)) conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,)) conn.execute("COMMIT") except Exception: @@ -486,3 +619,98 @@ def get_tool_call_summaries(self, session_id: int) -> list[tuple]: "FROM tool_call_summaries WHERE session_id = ? ORDER BY timestamp DESC", (session_id,), ) + + # -- Configuration management -- + + def get_config(self, key: str, default: str | None = None) -> str | None: + """Get a configuration value by key. + + Args: + key: Configuration key + default: Default value if key not found + + Returns: + Configuration value or default + """ + row = self.fetchone("SELECT value FROM config WHERE key = ?", (key,)) + return row[0] if row else default + + def set_config(self, key: str, value: str, category: str = "general") -> None: + """Set a configuration value. + + Args: + key: Configuration key + value: Configuration value + category: Configuration category (general, skill, env, etc.) + """ + self.execute( + "INSERT INTO config (key, value, category, updated_at) VALUES (?, ?, ?, ?) " + "ON CONFLICT(key) DO UPDATE SET value = excluded.value, category = excluded.category, " + "updated_at = excluded.updated_at", + (key, value, category, time.time()), + ) + + def delete_config(self, key: str) -> None: + """Delete a configuration value. + + Args: + key: Configuration key + """ + self.execute("DELETE FROM config WHERE key = ?", (key,)) + + def get_all_config(self, category: str | None = None) -> list[tuple]: + """Get all configuration values, optionally filtered by category. + + Args: + category: Optional category filter + + Returns: + List of (key, value, category, updated_at) tuples + """ + if category: + return self.fetchall( + "SELECT key, value, category, updated_at FROM config WHERE category = ? ORDER BY key", + (category,), + ) + return self.fetchall("SELECT key, value, category, updated_at FROM config ORDER BY category, key") + + def get_config_by_prefix(self, prefix: str) -> dict[str, str]: + """Get all configuration values with a given key prefix. + + Args: + prefix: Key prefix to filter by + + Returns: + Dictionary of key-value pairs + """ + rows = self.fetchall( + "SELECT key, value FROM config WHERE key LIKE ? ORDER BY key", + (f"{prefix}%",), + ) + return {row[0]: row[1] for row in rows} + + # -- Skill configuration shortcuts -- + + def get_disabled_skills(self) -> set[str]: + """Get the set of disabled skill names. + + Returns: + Set of disabled skill names + """ + value = self.get_config("skills.disabled") + if not value: + return set() + import json + + try: + return set(json.loads(value)) + except json.JSONDecodeError, TypeError: + return set() + + def set_disabled_skills(self, names) -> None: + """Set the disabled skill names. + + Args: + names: Collection of skill names to disable (set, list, or tuple) + """ + self.set_config("skills.disabled", json.dumps(sorted(set(names))), category="skill") diff --git a/src/core/skill_manager.py b/src/core/skill_manager.py new file mode 100644 index 0000000..248f737 --- /dev/null +++ b/src/core/skill_manager.py @@ -0,0 +1,257 @@ +"""Skill 发现和管理服务.""" + +from __future__ import annotations + +import threading +from pathlib import Path +from typing import ClassVar + +from src.models.skill import SkillInfo + + +class SkillManager: + """Skill 发现和管理服务(单例模式). + + 负责从多个位置发现 Skill,并提供查询和加载功能. + """ + + _instance: ClassVar[SkillManager | None] = None + _instance_lock: ClassVar[threading.Lock] = threading.Lock() + + # Skill 发现路径模板 + GLOBAL_PATHS: ClassVar[list[str]] = [ + "~/.claude/skills", + "~/.agents/skills", + ] + + PROJECT_PATHS: ClassVar[list[str]] = [ + ".claude/skills", + ".agents/skills", + ".ManualAid/skills", + ".opencode/skill", + ".opencode/skills", + ] + + def __new__(cls) -> SkillManager: + with cls._instance_lock: + if cls._instance is None: + instance = super().__new__(cls) + instance._initialized = False + cls._instance = instance + return cls._instance + + def __init__(self) -> None: + if self._initialized: + return + + self._skills: dict[str, SkillInfo] = {} + self._disabled_skills: set[str] = set() + self._workspace_root: Path | None = None + self._initialized = True + + def discover(self, workspace_root: Path | None = None) -> dict[str, SkillInfo]: + """发现所有可用的 Skill. + + Args: + workspace_root: 工作区根目录,用于发现项目级 Skill + + Returns: + Skill 名称到 SkillInfo 的映射 + """ + self._skills.clear() + self._workspace_root = workspace_root + + # 1. 从数据库加载禁用的 Skill 列表 + if workspace_root: + self._load_disabled_from_db(workspace_root) + + # 2. 发现全局 Skill + for path_template in self.GLOBAL_PATHS: + skills_dir = Path(path_template).expanduser() + if skills_dir.is_dir(): + self._discover_in_dir(skills_dir, is_global=True) + + # 3. 发现项目级 Skill + if workspace_root: + for relative_path in self.PROJECT_PATHS: + skills_dir = workspace_root / relative_path + if skills_dir.is_dir(): + self._discover_in_dir(skills_dir, is_global=False) + + return self._skills + + def _load_disabled_from_db(self, workspace_root: Path) -> None: + """从数据库加载禁用的 Skill 列表. + + Args: + workspace_root: 工作区根目录 + """ + try: + from src.core.database_manager import DatabaseManager + + db = DatabaseManager(str(workspace_root)) + self._disabled_skills = db.get_disabled_skills() + except Exception: + self._disabled_skills = set() + + def save_disabled_to_db(self, workspace_root: Path | None = None) -> None: + """保存禁用的 Skill 列表到数据库. + + Args: + workspace_root: 工作区根目录,如果为 None 则使用缓存的值 + """ + root = workspace_root or self._workspace_root + if root is None: + return + + try: + from src.core.database_manager import DatabaseManager + + db = DatabaseManager(str(root)) + db.set_disabled_skills(self._disabled_skills) + except Exception: + pass + + def _discover_in_dir(self, skills_dir: Path, is_global: bool = True) -> None: + """在指定目录中发现 Skill. + + Args: + skills_dir: Skill 目录 + is_global: 是否为全局目录 + """ + try: + for item in skills_dir.iterdir(): + if item.is_dir(): + skill_info = SkillInfo.from_dir(item) + if skill_info: + skill_info.metadata["is_global"] = is_global + # 如果已存在同名 Skill,项目级覆盖全局 + if skill_info.name in self._skills: + existing = self._skills[skill_info.name] + # 项目级优先 + if not existing.metadata.get("is_global", True): + continue + # 应用禁用状态 + if skill_info.name in self._disabled_skills: + skill_info.enabled = False + self._skills[skill_info.name] = skill_info + except Exception: + pass + + def get(self, name: str) -> SkillInfo | None: + """获取指定名称的 Skill. + + Args: + name: Skill 名称 + + Returns: + SkillInfo 实例,如果不存在则返回 None + """ + return self._skills.get(name) + + def get_all(self) -> dict[str, SkillInfo]: + """获取所有 Skill. + + Returns: + Skill 名称到 SkillInfo 的映射 + """ + return self._skills.copy() + + def get_enabled(self) -> dict[str, SkillInfo]: + """获取所有启用的 Skill. + + Returns: + Skill 名称到 SkillInfo 的映射 + """ + return {name: skill for name, skill in self._skills.items() if skill.enabled} + + def set_disabled(self, names: set[str], persist: bool = True) -> None: + """设置禁用的 Skill 列表. + + Args: + names: 要禁用的 Skill 名称集合 + persist: 是否持久化到数据库 + """ + self._disabled_skills = names.copy() + for skill in self._skills.values(): + skill.enabled = skill.name not in self._disabled_skills + + if persist: + self.save_disabled_to_db() + + def get_disabled(self) -> set[str]: + """获取禁用的 Skill 名称集合. + + Returns: + 禁用的 Skill 名称集合 + """ + return self._disabled_skills.copy() + + def format_skills_list(self, verbose: bool = False) -> str: + """格式化 Skill 列表为字符串. + + Args: + verbose: 是否输出详细信息 + + Returns: + 格式化后的字符串 + """ + if not self._skills: + return "No skills available." + + if verbose: + lines = [""] + for skill in sorted(self._skills.values(), key=lambda s: s.name): + lines.append(" ") + lines.append(f" {skill.name}") + lines.append(f" {skill.description}") + lines.append(" ") + lines.append("") + return "\n".join(lines) + else: + lines = ["## Available Skills"] + for skill in sorted(self._skills.values(), key=lambda s: s.name): + status = "" if skill.enabled else " [DISABLED]" + lines.append(f"- **{skill.name}**: {skill.description}{status}") + return "\n".join(lines) + + def load_skill_content(self, name: str) -> str | None: + """加载 Skill 的完整内容(用于注入到提示词). + + Args: + name: Skill 名称 + + Returns: + Skill 内容字符串,如果不存在则返回 None + """ + skill = self.get(name) + if not skill: + return None + + lines = [ + f'', + f"# Skill: {skill.name}", + "", + skill.content.strip(), + "", + f"Base directory for this skill: {skill.location}", + "", + ] + + for filename in skill.files: + lines.append(f" - {filename}") + + lines.append("") + lines.append("") + + return "\n".join(lines) + + @classmethod + def reset_instance(cls) -> None: + """重置单例实例(用于测试).""" + with cls._instance_lock: + if cls._instance is not None: + cls._instance._skills.clear() + cls._instance._disabled_skills.clear() + cls._instance._workspace_root = None + cls._instance = None diff --git a/src/core/tool_registry.py b/src/core/tool_registry.py index 55f8650..aed8639 100644 --- a/src/core/tool_registry.py +++ b/src/core/tool_registry.py @@ -85,6 +85,8 @@ def register(self, workspace: Workspace) -> None: from src.workspace.tools.ls_tool import LsTool from src.workspace.tools.read_tool import ReadTool from src.workspace.tools.regex_search_tool import RegexSearchTool + from src.workspace.tools.shell_tool import ShellTool + from src.workspace.tools.skill_tool import SkillTool from src.workspace.tools.stat_tool import StatTool from src.workspace.tools.symbol_ref_tool import SymbolRefTool from src.workspace.tools.write_tool import WriteTool @@ -97,11 +99,13 @@ def register(self, workspace: Workspace) -> None: LsTool, ReadTool, RegexSearchTool, + ShellTool, WriteTool, StatTool, SymbolRefTool, EditTool, GitTool, + SkillTool, ): try: tool = cls(workspace) diff --git a/src/models/agent.py b/src/models/agent.py new file mode 100644 index 0000000..9f064ca --- /dev/null +++ b/src/models/agent.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from dataclasses import dataclass, field + + +@dataclass +class ToolPermissions: + whitelist: list[str] = field(default_factory=list) + blacklist: list[str] = field(default_factory=list) + + def is_tool_allowed(self, tool_name: str) -> bool: + """判定工具是否应注入到 /ws 输出. + + Priority: blacklist first, then whitelist. + Empty whitelist = allow all. + """ + if tool_name in self.blacklist: + return False + return not self.whitelist or tool_name in self.whitelist + + +@dataclass +class AgentConfig: + name: str + description: str + tool_permissions: ToolPermissions + body_role: str = "" + body_workflow: str = "" diff --git a/src/models/skill.py b/src/models/skill.py new file mode 100644 index 0000000..1a7c2f2 --- /dev/null +++ b/src/models/skill.py @@ -0,0 +1,127 @@ +"""Skill 数据模型.""" + +from __future__ import annotations + +from contextlib import suppress +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +import yaml + + +@dataclass +class SkillInfo: + """Skill 信息模型. + + Attributes: + name: Skill 名称(来自目录名) + description: Skill 描述(来自 SKILL.md 第一行或 skill.txt) + location: Skill 所在目录的绝对路径 + content: SKILL.md 的完整内容 + files: Skill 目录中的其他文件列表(排除 SKILL.md) + enabled: 是否启用(用于配置) + """ + + name: str + description: str = "" + location: str = "" + content: str = "" + files: list[str] = field(default_factory=list) + enabled: bool = True + metadata: dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_dir(cls, skill_dir: Path) -> SkillInfo | None: + """从目录加载 Skill 信息. + + Args: + skill_dir: Skill 目录路径 + + Returns: + SkillInfo 实例,如果目录无效则返回 None + """ + skill_md = skill_dir / "SKILL.md" + skill_txt = skill_dir / "skill.txt" + + # 必须有 SKILL.md 文件 + if not skill_md.exists(): + return None + + try: + content = skill_md.read_text(encoding="utf-8") + except Exception: + return None + + # 解析 YAML frontmatter + name = skill_dir.name # 默认使用目录名 + description = "" + + if content.startswith("---"): + # 解析 YAML frontmatter + parts = content.split("---", 2) + if len(parts) >= 3: + try: + frontmatter = yaml.safe_load(parts[1]) + if frontmatter: + name = frontmatter.get("name", name) + description = frontmatter.get("description", "") + except Exception: + pass + + # 如果没有从 frontmatter 获取到描述,尝试其他方式 + if not description: + # 优先从 skill.txt + if skill_txt.exists(): + with suppress(Exception): + description = skill_txt.read_text(encoding="utf-8").strip() + + if not description: + # 从 SKILL.md 第一行提取标题 + first_line = content.split("\n")[0] if content else "" + if first_line.startswith("#"): + description = first_line.lstrip("#").strip() + else: + description = first_line.strip() or skill_dir.name + + # 收集其他文件 + files: list[str] = [] + try: + for f in skill_dir.iterdir(): + if f.is_file() and f.name not in ("SKILL.md", "skill.txt"): + files.append(f.name) + except Exception: + pass + + return cls( + name=name, + description=description, + location=str(skill_dir), + content=content, + files=sorted(files), + enabled=True, + ) + + def to_dict(self) -> dict[str, Any]: + """转换为字典格式.""" + return { + "name": self.name, + "description": self.description, + "location": self.location, + "files": self.files, + "enabled": self.enabled, + "metadata": self.metadata, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> SkillInfo: + """从字典创建实例.""" + return cls( + name=data.get("name", ""), + description=data.get("description", ""), + location=data.get("location", ""), + content=data.get("content", ""), + files=data.get("files", []), + enabled=data.get("enabled", True), + metadata=data.get("metadata", {}), + ) diff --git a/src/utils/binary_detector.py b/src/utils/binary_detector.py index cdbf2a8..39af0f7 100644 --- a/src/utils/binary_detector.py +++ b/src/utils/binary_detector.py @@ -83,6 +83,11 @@ ".vbs", # VBScript ".reg", # Windows Registry ".desktop", + # Godot + ".godot", + ".gd", + ".gd.uid", + ".tscn", } ) @@ -183,6 +188,9 @@ ".db", ".sqlite", ".sqlite3", + ".pdb", + ".pyd", + ".o", } ) diff --git a/src/workspace/exclusion_manager.py b/src/workspace/exclusion_manager.py new file mode 100644 index 0000000..7e8bbe3 --- /dev/null +++ b/src/workspace/exclusion_manager.py @@ -0,0 +1,177 @@ +"""统一排除管理器 —— 合并 gitignore、用户 ignore、默认排除规则. + +区分两类排除: +- 性能排除: 缓存/构建产物等不影响安全的目录 +- 安全排除: 隐私/凭据文件等不应被 AI 访问的路径 +""" + +from __future__ import annotations + +import os +import re +from pathlib import Path +from typing import ClassVar + +from src.workspace.gitignore_loader import is_ignored_by_gitignore, load_gitignore + + +class ExclusionManager: + """排除规则统一管理器 + + 聚合三类排除源: + 1. 默认排除(内置的缓存/构建/IDE 目录) + 2. .gitignore 规则(如项目中有 .gitignore 文件) + 3. 用户临时 ignore 参数 + + Args: + workspace_root: 工作区根目录 + """ + + # 性能排除 —— 缓存、构建产物、IDE 配置等 + PERFORMANCE_EXCLUSIONS: frozenset[str] = frozenset( + { + ".git", + "__pycache__", + "node_modules", + ".venv", + "venv", + "dist", + "build", + ".idea", + ".vscode", + ".ruff_cache", + ".pytest_cache", + ".mypy_cache", + ".hypothesis", + "htmlcov", + ".coverage", + "*.pyc", + "*.pyo", + ".eggs", + "*.egg-info", + ".tox", + ".nox", + ".svn", + ".hg", + ".bzr", + "target", # Rust build + ".next", # Next.js build + ".nuxt", # Nuxt build + ".output", # Nuxt output + } + ) + + # 安全排除 —— 需要精确匹配的敏感文件正则(与 SECURITY_EXCLUSIONS 合并后的唯一来源) + # 注意: (^|/) 前缀表示匹配路径开始或目录分隔符后; .* 前缀表示匹配任意位置的文件扩展名 + SENSITIVE_FILE_PATTERNS: ClassVar[list[str]] = [ + r"(^|/)\.env$", + r"(^|/)\.env\..+$", + r".*\.pem$", + r"(^|/)credentials\..*$", + r".*\.key$", + r".*\.cert$", + r"(^|/)id_rsa$", + r"(^|/)id_ed25519$", + r".*\.cred$", + r".*\.secret$", + r"(^|/)\.ManualAid[/\\].*\.db$", + ] + + def __init__(self, workspace_root: str | Path): + self._workspace_root = Path(workspace_root).resolve() + # 从 .gitignore 加载 + self._raw_gitignore_patterns: list[str] = [] + self._gitignore_exclude_res: list[re.Pattern] = [] + self._gitignore_negate_res: list[re.Pattern] = [] + + self._reload_gitignore() + + def _reload_gitignore(self) -> None: + """(重新)加载 .gitignore.""" + raw, exclude_res, negate_res = load_gitignore(self._workspace_root) + self._raw_gitignore_patterns = raw + self._gitignore_exclude_res = exclude_res + self._gitignore_negate_res = negate_res + + def _check_performance_exclusion(self, rel_path_str: str) -> bool: + """检查路径是否匹配性能排除规则(基于目录名)""" + # 将路径拆分为各层, 检查每层是否在排除集合中 + parts = rel_path_str.replace(os.sep, "/").split("/") + for part in parts: + # 检查部分匹配: "node_modules" 或通配匹配 + if part in self.PERFORMANCE_EXCLUSIONS: + return True + # 检查 *.xxx 模式 + for exclude in self.PERFORMANCE_EXCLUSIONS: + if exclude.startswith("*.") and part.endswith(exclude[1:]): + return True + return False + + def should_exclude_path(self, path: Path) -> bool: + """检查路径是否应被排除(全面检查) + + 依次检查: 默认排除目录名 → gitignore 规则 → 否定规则 + + Args: + path: 文件的绝对路径 + + Returns: + True 表示应排除 + """ + try: + rel_path = path.relative_to(self._workspace_root) + except ValueError: + # 在工作区外, 不在这里处理(由 PathValidator 处理) + return False + + rel_str = str(rel_path).replace(os.sep, "/") + + # 1. 性能排除: 检查所有父目录 + if self._check_performance_exclusion(rel_str): + return True + + # 2. gitignore 排除 + return is_ignored_by_gitignore(rel_str, self._gitignore_exclude_res, self._gitignore_negate_res) + + def merge_ignore_regexes(self, user_ignore: list[str] | None = None) -> list[re.Pattern]: + """合并默认排除 + gitignore + 用户 ignore 为正则列表 + + 用于 search_content 等需要正则匹配排除的场景 + + 敏感文件由 PathValidator 在写入/读取时拦截,搜索场景不额外过滤 + + Args: + user_ignore: 用户传入的忽略正则列表 + + Returns: + 编译后的正则列表 + """ + result: list[re.Pattern] = [] + + # 默认排除目录名 → 正则 + for excl in self.PERFORMANCE_EXCLUSIONS: + # 处理 *.pyc 类模式 + if excl.startswith("*."): + pat = excl[1:] # .pyc + result.append(re.compile(re.escape(pat) + "$")) + else: + # 匹配路径中的此目录名 + result.append(re.compile(r"(^|/)" + re.escape(excl) + r"(/|$)")) + + # gitignore 排除正则 + result.extend(self._gitignore_exclude_res) + + # 用户传入的 ignore 正则 + if user_ignore: + for ign in user_ignore: + try: + result.append(re.compile(ign)) + except re.error: + continue + + return result + + @property + def excluded_dir_names(self) -> set[str]: + """获取所有排除目录名集合(用于快速 in 检查)""" + return {d for d in self.PERFORMANCE_EXCLUSIONS if not d.startswith("*")} diff --git a/src/workspace/gitignore_loader.py b/src/workspace/gitignore_loader.py new file mode 100644 index 0000000..d1a136e --- /dev/null +++ b/src/workspace/gitignore_loader.py @@ -0,0 +1,178 @@ +"""Parse .gitignore files and convert patterns to regex for exclusion matching. + +已知局限性: +- 不支持嵌套 .gitignore(仅读取根目录下的 .gitignore) +- 不支持行尾 \\ 续行 +- 不支持 gitignore 扩展语法中的字符类(如 [abc] 会被错误转义) +- 否定模式的优先级处理与真实 Git 不一致: 当前实现将所有否定模式提升为最高优先级, + 而真实 Git 按行号顺序逐条处理(后出现的规则覆盖先出现的). + 当前行为对于 AI 工具场景偏安全(宁可少排除), 故保留此简化实现. +""" + +from __future__ import annotations + +import os +import re +from pathlib import Path + + +def _convert_gitignore_to_regex(pattern: str) -> str | None: + """将 .gitignore 模式转换为正则表达式. + + Args: + pattern: .gitignore 模式(如 *.log, build/, /foo) + + Returns: + 对应的正则表达式字符串, 如果模式无效则返回 None + """ + # 保留原始模式用于锚定判断 + original = pattern + is_dir_only = pattern.endswith("/") + if is_dir_only: + pattern = pattern.rstrip("/") + + # 处理否定模式(仅用于判断是否为目录模式, 不处理逻辑) + if pattern.startswith("!"): + pattern = pattern[1:] + + # 转义正则特殊字符, 再处理 gitignore 通配符 + # 先处理 ** (多级通配符) + parts = [] + i = 0 + while i < len(pattern): + if pattern[i : i + 2] == "**": + parts.append(".*") + i += 2 + elif pattern[i] == "*": + # 单级通配符, 不匹配路径分隔符 + parts.append(r"[^/]*") + i += 1 + elif pattern[i] == "?": + parts.append(r"[^/]") + i += 1 + elif pattern[i] in ".+^${}()|[]\\": + parts.append("\\" + pattern[i]) + i += 1 + else: + parts.append(pattern[i]) + i += 1 + + regex_str = "".join(parts) + + # 锚定: / 开头表示从根目录匹配, 否则匹配任意路径 + if original.startswith("/"): + regex_str = "^" + regex_str[1:] # 去掉开头的 / + elif original.startswith("!"): + # 处理否定模式 - 保持锚定逻辑不变 + regex_str = "^" + regex_str[1:] if original[1:].startswith("/") else "(^|/)" + regex_str + else: + regex_str = "(^|/)" + regex_str + + if is_dir_only: + regex_str += "(/.*)?$" + else: + regex_str += "$" + + return regex_str + + +def parse_gitignore(gitignore_path: str | Path) -> list[str]: + """解析 .gitignore 文件, 返回非否定排除模式列表. + + Args: + gitignore_path: .gitignore 文件路径 + + Returns: + 排除模式列表(目录名/通配符等原始 gitignore 格式) + """ + patterns: list[str] = [] + gitignore_path = Path(gitignore_path) + + if not gitignore_path.exists(): + return patterns + + try: + text = gitignore_path.read_text(encoding="utf-8") + except Exception: + return patterns + + for line in text.splitlines(): + stripped = line.strip() + + # 跳过空行和注释 + if not stripped or stripped.startswith("#"): + continue + + # 保留否定模式供外部处理, 返回原始行 + patterns.append(stripped) + + return patterns + + +def compile_gitignore_patterns(patterns: list[str]) -> tuple[list[re.Pattern], list[re.Pattern]]: + """将 gitignore 模式编译为正则表达式. + + Args: + patterns: 原始 gitignore 模式列表 + + Returns: + (排除正则列表, 否定排除正则列表) 的元组 + """ + exclude_res: list[re.Pattern] = [] + negate_res: list[re.Pattern] = [] + + for pattern in patterns: + if pattern.startswith("!"): + # 否定模式: 取消排除 + negate_regex = _convert_gitignore_to_regex(pattern) + if negate_regex: + try: + negate_res.append(re.compile(negate_regex)) + except re.error: + continue + else: + regex = _convert_gitignore_to_regex(pattern) + if regex: + try: + exclude_res.append(re.compile(regex)) + except re.error: + continue + + return exclude_res, negate_res + + +def is_ignored_by_gitignore(path: str | Path, exclude_res: list[re.Pattern], negate_res: list[re.Pattern]) -> bool: + """检查路径是否被 .gitignore 规则忽略. + + Args: + path: 要检查的相对路径(字符串形式) + exclude_res: 排除正则列表 + negate_res: 否定排除正则列表 + + Returns: + 是否应该被忽略 + """ + path_str = str(path).replace(os.sep, "/") + + # 先检查否定模式(优先级更高) + for negate_re in negate_res: + if negate_re.search(path_str): + return False + + # 再检查排除模式 + return any(exclude_re.search(path_str) for exclude_re in exclude_res) + + +def load_gitignore(workspace_root: str | Path) -> tuple[list[str], list[re.Pattern], list[re.Pattern]]: + """从工作区根目录加载 .gitignore. + + Args: + workspace_root: 工作区根目录 + + Returns: + (原始模式列表, 排除正则列表, 否定排除正则列表) + """ + gitignore_path = Path(workspace_root) / ".gitignore" + raw_patterns = parse_gitignore(gitignore_path) + exclude_res, negate_res = compile_gitignore_patterns(raw_patterns) + return raw_patterns, exclude_res, negate_res diff --git a/src/workspace/path_validator.py b/src/workspace/path_validator.py index 2a12d7d..99a4bee 100644 --- a/src/workspace/path_validator.py +++ b/src/workspace/path_validator.py @@ -1,5 +1,9 @@ import os +import re from pathlib import Path +from typing import ClassVar + +from src.workspace.exclusion_manager import ExclusionManager class WorkspaceBoundaryError(Exception): @@ -14,6 +18,12 @@ class PathNotFoundError(Exception): pass +class SensitiveFileError(Exception): + """访问敏感文件时抛出""" + + pass + + class PathValidator: """工作区路径安全校验器,防止路径遍历和符号链接逃逸 @@ -21,12 +31,17 @@ class PathValidator: workspace_root: 工作区根目录,默认为当前目录 """ + # 敏感文件匹配模式(从 ExclusionManager 统一来源引用) + SENSITIVE_FILE_PATTERNS: ClassVar[list[re.Pattern]] = [ + re.compile(p) for p in ExclusionManager.SENSITIVE_FILE_PATTERNS + ] + def __init__(self, workspace_root: str | Path = "."): """初始化路径验证器. Args: workspace_root: 工作区根目录路径,可以是字符串或 Path 对象 - 所有后续的路径验证都将以此目录为边界 + 所有后续的路径验证都将以此目录为基准 Raises: FileNotFoundError: 当 workspace_root 不存在时抛出 @@ -71,8 +86,19 @@ def resolve_path(self, target: str | Path) -> Path: if not str(resolved).startswith(str(self.root) + os.sep) and resolved != self.root: raise WorkspaceBoundaryError(f"路径越界: {target}") + # 敏感文件检查 + self._raise_if_sensitive(resolved, target) + return resolved + @classmethod + def _raise_if_sensitive(cls, resolved: Path, original_target: str | Path) -> None: + """检查路径是否匹配敏感文件模式.""" + resolved_str = str(resolved).replace(os.sep, "/") + for pattern in cls.SENSITIVE_FILE_PATTERNS: + if pattern.search(resolved_str): + raise SensitiveFileError(f"禁止访问敏感文件: {original_target}") + def create_file_with_parents(self, target: str | Path, content: str = "") -> Path: """在工作区内创建文件,自动创建所有不存在的父目录. diff --git a/src/workspace/tools/base_tool.py b/src/workspace/tools/base_tool.py index 70e6ab4..e958559 100644 --- a/src/workspace/tools/base_tool.py +++ b/src/workspace/tools/base_tool.py @@ -244,7 +244,7 @@ def handle_tool_exceptions(func) -> Callable[..., ToolResult]: """工具方法异常处理装饰器 —— 将异常转换为 ToolResult 失败结果""" from functools import wraps - from src.workspace.path_validator import PathNotFoundError, WorkspaceBoundaryError + from src.workspace.path_validator import PathNotFoundError, SensitiveFileError, WorkspaceBoundaryError @wraps(func) def wrapper(self, *args, **kwargs): @@ -269,13 +269,20 @@ def wrapper(self, *args, **kwargs): func_kwargs=kwargs, error=f"{err2.__class__.__name__}: {err2}", ) - except PermissionError as err3: + except SensitiveFileError as err3: return ToolResult( success=False, func_name=func.__name__, func_kwargs=kwargs, error=f"{err3.__class__.__name__}: {err3}", ) + except PermissionError as err4: + return ToolResult( + success=False, + func_name=func.__name__, + func_kwargs=kwargs, + error=f"{err4.__class__.__name__}: {err4}", + ) except Exception as err: return ToolResult( success=False, func_name=func.__name__, func_kwargs=kwargs, error=f"{err.__class__.__name__}: {err}" diff --git a/src/workspace/tools/exact_search_tool.py b/src/workspace/tools/exact_search_tool.py index 873df3a..8456b04 100644 --- a/src/workspace/tools/exact_search_tool.py +++ b/src/workspace/tools/exact_search_tool.py @@ -1,8 +1,8 @@ -import contextlib import re from pathlib import Path from src.models.tools.tool_result import ToolResult +from src.utils.binary_detector import is_binary_file from src.workspace.tools.base_tool import BaseTool from src.workspace.workspace import Workspace @@ -86,6 +86,7 @@ def __init__(self, workspace: Workspace): "limit": "最大匹配数量限制", "ignore": "忽略匹配正则的文件或文件夹列表", } + self._exclusion_manager = workspace.exclusion_manager @BaseTool.handle_tool_exceptions def exact_search( @@ -107,12 +108,8 @@ def exact_search( # 准备搜索字符串 search_string = pattern if case_sensitive else pattern.lower() - # 收集忽略模式 - ignore_patterns = [] - if ignore: - for ignore_pattern in ignore: - with contextlib.suppress(re.error): - ignore_patterns.append(re.compile(ignore_pattern)) + # 收集忽略模式: 合并默认排除 + 用户传入的 ignore + ignore_patterns = self._exclusion_manager.merge_ignore_regexes(ignore) # 搜索结果 results = [] @@ -121,12 +118,24 @@ def exact_search( warnings = [""] # 确定要搜索的文件列表(支持单文件或目录) - files_to_search = [search_path] if search_path.is_file() else list(search_path.rglob(file_pattern)) + files_to_search = ( + [search_path] + if search_path.is_file() + else [ + p + for p in search_path.rglob(file_pattern) + if p.is_file() and not self._exclusion_manager.should_exclude_path(p) + ] + ) # 遍历所有文件 for file_path in files_to_search: if not file_path.is_file(): continue + + if is_binary_file(file_path): + continue + # 检查是否达到限制 if total_matches >= limit: break diff --git a/src/workspace/tools/git_tool.py b/src/workspace/tools/git_tool.py index a0cc2eb..d64f4ce 100644 --- a/src/workspace/tools/git_tool.py +++ b/src/workspace/tools/git_tool.py @@ -133,20 +133,25 @@ def git(self, command_str: str) -> ToolResult: kwargs=locals().copy(), error=f"TimeoutExpired(Git 命令执行超时: {time_out_exception})" ) - # 5. 处理输出 + # 5. 处理输出 — 总是保留 stdout 和 stderr, 即使 returncode != 0 (如 git diff --exit-code) + output_parts = [] + if result.stdout: + output_parts.append(result.stdout.rstrip("\n")) + if result.stderr: + output_parts.append(result.stderr.rstrip("\n")) + if result.returncode != 0: stderr = (result.stderr or "").strip() + error_msg = f"Git command exited with code {result.returncode}" + if stderr: + error_msg += f":\n{stderr}" + # 保留 stdout 在 data 中, 同时返回 error return self.make_failed_response( kwargs=locals().copy(), - error=f"Git command failed (exit code {result.returncode})" + f":\n{stderr}" if stderr else "", + data="\n".join(output_parts) if output_parts else "(no output)", + error=error_msg, ) - # Combine stdout and stderr - output_parts = [] - if result.stdout: - output_parts.append(result.stdout.rstrip("\n")) - if result.stderr: - output_parts.append(result.stderr.rstrip("\n")) if result.stdout is None and not result.stderr: # HACK: subprocess.run with capture_output=True returns None for stdout # on this platform. Fallback: re-run with explicit PIPE (bytes mode). @@ -167,9 +172,14 @@ def git(self, command_str: str) -> ToolResult: output_parts.append(out.rstrip("\n")) if err: output_parts.append(err.rstrip("\n")) - if not output_parts and _result2.returncode != 0: + if _result2.returncode != 0: + error_msg2 = f"Git command exited with code {_result2.returncode}" + if err: + error_msg2 += f":\n{err}" return self.make_failed_response( - kwargs=locals().copy(), error=f"Git command failed (exit code {_result2.returncode})" + kwargs=locals().copy(), + data="\n".join(output_parts) if output_parts else "(no output)", + error=error_msg2, ) return self.make_success_response( diff --git a/src/workspace/tools/glob_tool.py b/src/workspace/tools/glob_tool.py index df158c0..f47274a 100644 --- a/src/workspace/tools/glob_tool.py +++ b/src/workspace/tools/glob_tool.py @@ -15,6 +15,7 @@ def __init__(self, workspace: Workspace): "path": "目录路径", "max_ret": "最多返回多少条检索结果", } + self._exclusion_manager = workspace.exclusion_manager @BaseTool.handle_tool_exceptions def glob(self, pattern: str, path: str = ".", max_ret: int = 1000) -> ToolResult: @@ -30,5 +31,6 @@ def glob(self, pattern: str, path: str = ".", max_ret: int = 1000) -> ToolResult data=[ f"{'[Folder]' if item.is_dir() else '[File]'} {item.relative_to(self.workspace.root_path)}" for item in islice(root_path.glob(pattern), max_ret) + if not self._exclusion_manager.should_exclude_path(item) ], ) diff --git a/src/workspace/tools/ls_tool.py b/src/workspace/tools/ls_tool.py index 33f035d..cbb176a 100644 --- a/src/workspace/tools/ls_tool.py +++ b/src/workspace/tools/ls_tool.py @@ -13,6 +13,7 @@ def __init__(self, workspace: Workspace): self.param_descriptions = { "path": "目录路径", } + self._exclusion_manager = workspace.exclusion_manager @BaseTool.handle_tool_exceptions def ls(self, path: str = ".") -> ToolResult: @@ -27,5 +28,6 @@ def ls(self, path: str = ".") -> ToolResult: data=[ f"{'[Folder]' if item.is_dir() else '[File]'} {item.relative_to(self.workspace.root_path)}" for item in folder_path.iterdir() + if not self._exclusion_manager.should_exclude_path(item) ], ) diff --git a/src/workspace/tools/regex_search_tool.py b/src/workspace/tools/regex_search_tool.py index a8b6549..7ec6fad 100644 --- a/src/workspace/tools/regex_search_tool.py +++ b/src/workspace/tools/regex_search_tool.py @@ -1,8 +1,8 @@ -import contextlib import re from pathlib import Path from src.models.tools.tool_result import ToolResult +from src.utils.binary_detector import is_binary_file from src.workspace.tools.base_tool import BaseTool from src.workspace.workspace import Workspace @@ -111,6 +111,7 @@ def __init__(self, workspace: Workspace): "limit": "最大匹配数量限制", "ignore": "忽略匹配正则的文件或文件夹列表", } + self._exclusion_manager = workspace.exclusion_manager @BaseTool.handle_tool_exceptions def regex_search( @@ -134,12 +135,8 @@ def regex_search( except re.error as e: return self.make_failed_response(kwargs=locals().copy(), error=f"无效的正则表达式: {e}") - # 收集忽略模式 - ignore_patterns = [] - if ignore: - for ignore_pattern in ignore: - with contextlib.suppress(re.error): - ignore_patterns.append(re.compile(ignore_pattern)) + # 收集忽略模式: 合并默认排除 + 用户传入的 ignore + ignore_patterns = self._exclusion_manager.merge_ignore_regexes(ignore) # 搜索结果 results = [] @@ -148,12 +145,24 @@ def regex_search( warnings = [""] # 确定要搜索的文件列表(支持单文件或目录) - files_to_search = [search_path] if search_path.is_file() else list(search_path.rglob(file_pattern)) + files_to_search = ( + [search_path] + if search_path.is_file() + else [ + p + for p in search_path.rglob(file_pattern) + if p.is_file() and not self._exclusion_manager.should_exclude_path(p) + ] + ) # 遍历文件 for file_path in files_to_search: if not file_path.is_file(): continue + + if is_binary_file(file_path): + continue + # 检查是否达到限制 if total_matches >= limit: break diff --git a/src/workspace/tools/shell_tool.py b/src/workspace/tools/shell_tool.py new file mode 100644 index 0000000..e2bcfdb --- /dev/null +++ b/src/workspace/tools/shell_tool.py @@ -0,0 +1,53 @@ +"""Shell 命令执行工具 — 审核通过后执行.""" + +from src.models.tools.tool_result import ToolResult +from src.workspace.tools.base_tool import BaseTool +from src.workspace.workspace import Workspace + + +class ShellTool(BaseTool): + """Shell 命令执行工具. + + 所有 Shell 命令均需经过审核机制: + 1. 调用时创建 PENDING_AUDIT 记录, 不立即执行 + 2. 审核 UI 中展示待审核命令 + 3. 审核通过后实际执行命令并记录输出 + """ + + def __init__(self, workspace: Workspace): + super().__init__(workspace, "shell", self.shell.__doc__, write_permission=True) + self.func = self.shell + self.params = BaseTool.extract_params(self.shell) + self.param_descriptions = { + "command": "要执行的 Shell 命令", + "description": "命令描述, 说明命令的目的和作用, 帮助审核人员理解命令意图", + } + + @BaseTool.handle_tool_exceptions + def shell(self, command: str, description: str = "") -> ToolResult: + """ + 执行 Shell 命令(需审核通过) + """ + if not command or not command.strip(): + return self.make_failed_response(kwargs=locals().copy(), error=str(ValueError("command 不能为空"))) + + session_id = self.workspace.session_id + shell_id = self.workspace.db.record_shell_command( + command=command.strip(), + description=description.strip(), + session_id=session_id, + ) + + preview_parts = [ + "命令已提交审核系统, 审核通过后将自动执行", + "", + f"[Shell Preview] ID: {shell_id}", + f"Command: {command.strip()}", + ] + if description.strip(): + preview_parts.append(f"Description: {description.strip()}") + + return self.make_success_response( + kwargs=locals().copy(), + data="\n".join(preview_parts), + ) diff --git a/src/workspace/tools/skill_tool.py b/src/workspace/tools/skill_tool.py new file mode 100644 index 0000000..95c60b0 --- /dev/null +++ b/src/workspace/tools/skill_tool.py @@ -0,0 +1,75 @@ +"""Skill 工具实现.""" + +from __future__ import annotations + +from src.models.tools.tool_result import ToolResult +from src.workspace.tools.base_tool import BaseTool +from src.workspace.workspace import Workspace + + +class SkillTool(BaseTool): + """Skill 加载工具. + + 用于加载指定的 Skill 并将其内容注入到当前会话中. + """ + + def __init__(self, workspace: Workspace) -> None: + super().__init__( + workspace=workspace, + name="skill", + doc="当**当前任务**与系统提示中列出的某一技能相匹配时, 加载该专业技能" + "使用此工具将技能的指令和资源注入当前对话.输出内容可能包含详细的工作流程指导, " + "以及对该技能所在目录中的脚本、文件等的引用" + "技能名称必须与系统提示中列出的某一技能完全一致", + read_permission=True, + write_permission=False, + ) + self.func = self._execute + self.params = self.extract_params(self._execute) + self.param_descriptions = { + "name": "The name of the skill from available_skills", + } + + def _execute(self, name: str) -> ToolResult: + """执行 Skill 加载. + + Args: + name: Skill 名称 + + Returns: + ToolResult 包含 Skill 内容或错误信息 + """ + from src.core.skill_manager import SkillManager + + skill_manager = SkillManager() + + # 确保 Skill 已发现 + if not skill_manager.get_all(): + skill_manager.discover(self.workspace.root_path) + + skill = skill_manager.get(name) + + if skill is None: + return self.make_failed_response( + kwargs={"name": name}, + error=f'Skill "{name}" not found. Use a skill name from the available_skills list.', + ) + + if not skill.enabled: + return self.make_failed_response( + kwargs={"name": name}, + error=f'Skill "{name}" is disabled. Enable it in the configuration.', + ) + + content = skill_manager.load_skill_content(name) + + if content is None: + return self.make_failed_response( + kwargs={"name": name}, + error=f'Failed to load skill "{name}".', + ) + + return self.make_success_response( + kwargs={"name": name}, + data={"title": f"Loaded skill: {name}", "output": content}, + ) diff --git a/src/workspace/workspace.py b/src/workspace/workspace.py index 56699f6..295b2d6 100644 --- a/src/workspace/workspace.py +++ b/src/workspace/workspace.py @@ -5,11 +5,9 @@ from pathlib import Path from src.models.tool_error_response import ToolErrorResponse +from src.workspace.exclusion_manager import ExclusionManager from src.workspace.path_validator import PathNotFoundError, PathValidator, WorkspaceBoundaryError -# 默认排除的目录 后续改为从项目配置加载 -DEFAULT_EXCLUDED_DIRS = {".git", "__pycache__", "node_modules", ".venv", "venv", "dist", "build", ".idea", ".vscode"} - def _highlight_matches(line: str, regex: re.Pattern) -> str: """ @@ -47,6 +45,7 @@ def __init__(self, path: str): return self.root_path = Path(path).resolve() self.path_validator: PathValidator = PathValidator(self.root_path) + self.exclusion_manager: ExclusionManager = ExclusionManager(self.root_path) self.is_git_repo: bool = (self.root_path / ".git").is_dir() self.platform: str = sys.platform self.date: str = date.today().strftime("%y-%m-%d") @@ -84,8 +83,11 @@ def search_content( try: path = self.path_validator.validate(folder_path) - # 初始化排除目录集合 - exclude_set = set(exclude_dirs or DEFAULT_EXCLUDED_DIRS) + # 初始化排除目录集合: 合并默认排除 + 用户传入排除 + if exclude_dirs is not None: + exclude_set = set(exclude_dirs) | self.exclusion_manager.excluded_dir_names + else: + exclude_set = self.exclusion_manager.excluded_dir_names # 编译正则表达式 flags = 0 if case_sensitive else re.IGNORECASE @@ -218,14 +220,8 @@ def search_content_multi_pattern( try: path = self.path_validator.validate(folder_path) - # 预编译 ignore 正则 - ignore_res: list[re.Pattern] = [] - if ignore: - for ign in ignore: - try: - ignore_res.append(re.compile(ign)) - except re.error: - continue + # 预编译 ignore 正则: 合并默认排除 + 用户传入的 ignore + ignore_res: list[re.Pattern] = self.exclusion_manager.merge_ignore_regexes(ignore) # 收集文件(一次遍历) files_to_search: list[Path] = [] @@ -234,8 +230,6 @@ def search_content_multi_pattern( else: for file_path in path.rglob(file_pattern): if file_path.is_file(): - if any(p.name in DEFAULT_EXCLUDED_DIRS for p in file_path.parents): - continue rel = str(file_path.relative_to(self.root_path)) if any(ir.search(rel) for ir in ignore_res): continue diff --git a/tests/core/test_agent_manager.py b/tests/core/test_agent_manager.py new file mode 100644 index 0000000..bb4d794 --- /dev/null +++ b/tests/core/test_agent_manager.py @@ -0,0 +1,159 @@ +"""Tests for AgentManager and frontmatter parser.""" + +from pathlib import Path + +from src.core.agent_manager import AgentManager, _parse_agent_file, _parse_frontmatter + + +class TestParseFrontmatter: + def test_no_frontmatter(self): + content = "## Role\nhello" + meta, body = _parse_frontmatter(content) + assert meta == {} + assert "hello" in body + + def test_basic_frontmatter(self): + content = """--- +name: test-agent +description: A test agent +--- +## Role +Hello World +""" + meta, body = _parse_frontmatter(content) + assert meta["name"] == "test-agent" + assert meta["description"] == "A test agent" + assert "Hello World" in body + + def test_permissions_frontmatter(self): + content = """--- +name: restricted +description: Restricted agent +tool_permissions: + whitelist: + - read + - glob + blacklist: + - git +--- +## Role +test +""" + meta, _body = _parse_frontmatter(content) + assert meta["tool_permissions.whitelist"] == ["read", "glob"] + assert meta["tool_permissions.blacklist"] == ["git"] + + def test_empty_permissions(self): + content = """--- +name: empty +description: Empty perms +tool_permissions: + whitelist: [] + blacklist: [] +--- +## Role +test +""" + meta, _body = _parse_frontmatter(content) + whitelist = meta.get("tool_permissions.whitelist", []) + blacklist = meta.get("tool_permissions.blacklist", []) + assert whitelist == [], f"Expected [], got {whitelist!r}" + assert blacklist == [], f"Expected [], got {blacklist!r}" + + +class TestParseAgentFile: + def test_full_agent_file(self, tmp_path: Path): + md_file = tmp_path / "test-agent.md" + md_file.write_text( + """--- +name: test-agent +description: My test agent +tool_permissions: + whitelist: + - read + - glob + blacklist: + - git +--- + +## Role + +You are a test agent. + +## Workflow + +1. Test things. +2. Verify results. +""", + encoding="utf-8", + ) + agent = _parse_agent_file(md_file) + assert agent is not None + assert agent.name == "test-agent" + assert agent.description == "My test agent" + assert agent.tool_permissions.whitelist == ["read", "glob"] + assert agent.tool_permissions.blacklist == ["git"] + assert "You are a test agent." in agent.body_role + assert "Test things." in agent.body_workflow + + def test_invalid_file_returns_none(self, tmp_path: Path): + md_file = tmp_path / "invalid.md" + md_file.write_bytes(b"\x80\x81\x82") # invalid UTF-8 + agent = _parse_agent_file(md_file) + assert agent is None + + def test_no_role_section(self, tmp_path: Path): + md_file = tmp_path / "no-role.md" + md_file.write_text( + """--- +name: no-role +description: No role +--- +Some content without sections. +""", + encoding="utf-8", + ) + agent = _parse_agent_file(md_file) + assert agent is not None + assert agent.body_role == "" + assert agent.body_workflow == "" + + +class TestAgentManager: + def test_get_default_fallback(self, tmp_path): + """No agents dir -> get_default() returns a fallback AgentConfig.""" + mgr = AgentManager() + mgr.initialize(tmp_path) + default = mgr.get_default() + assert default.name == "default" + assert default.tool_permissions.whitelist == [] + + def test_switch_unknown_returns_false(self, tmp_path): + mgr = AgentManager() + mgr.initialize(tmp_path) + assert mgr.switch_agent("nonexistent") is False + + def test_write_default_creates_file(self, tmp_path): + mgr = AgentManager() + mgr.initialize(tmp_path) + mgr.write_default(tmp_path) + agents_dir = tmp_path / ".ManualAid" / "agents" + assert (agents_dir / "default.md").exists() + + def test_write_default_is_idempotent(self, tmp_path): + mgr = AgentManager() + mgr.initialize(tmp_path) + mgr.write_default(tmp_path) + content_first = (tmp_path / ".ManualAid" / "agents" / "default.md").read_text(encoding="utf-8") + mgr.write_default(tmp_path) # second call should be no-op + content_second = (tmp_path / ".ManualAid" / "agents" / "default.md").read_text(encoding="utf-8") + assert content_first == content_second + + def test_agent_names_sorted(self, tmp_path): + agents_dir = tmp_path / ".ManualAid" / "agents" + agents_dir.mkdir(parents=True) + (agents_dir / "z-agent.md").write_text("---\nname: z-agent\ndescription: Z\n---\n## Role\nx") + (agents_dir / "a-agent.md").write_text("---\nname: a-agent\ndescription: A\n---\n## Role\nx") + mgr = AgentManager() + mgr.initialize(tmp_path) + assert mgr.agent_names() == ["a-agent", "z-agent"] diff --git a/tests/models/test_agent.py b/tests/models/test_agent.py new file mode 100644 index 0000000..8ace23f --- /dev/null +++ b/tests/models/test_agent.py @@ -0,0 +1,59 @@ +"""Tests for agent data models.""" + +from src.models.agent import AgentConfig, ToolPermissions + + +def test_empty_permissions_allow_all(): + p = ToolPermissions() + assert p.is_tool_allowed("read") + assert p.is_tool_allowed("write") + assert p.is_tool_allowed("git") + # Explicit empty lists are equivalent to no arguments + p2 = ToolPermissions(whitelist=[], blacklist=[]) + assert p2.is_tool_allowed("anything") + + +def test_whitelist_restricts_tools(): + p = ToolPermissions(whitelist=["read", "glob", "ls"]) + assert p.is_tool_allowed("read") + assert p.is_tool_allowed("glob") + assert not p.is_tool_allowed("write") + assert not p.is_tool_allowed("git") + + +def test_blacklist_blocks_tools(): + p = ToolPermissions(blacklist=["git"]) + assert not p.is_tool_allowed("git") + assert p.is_tool_allowed("read") + + +def test_blacklist_overrides_whitelist(): + p = ToolPermissions(whitelist=["read", "git"], blacklist=["git"]) + assert p.is_tool_allowed("read") + assert not p.is_tool_allowed("git") + + +def test_agent_config_creation(): + """Verify AgentConfig stores fields correctly.""" + perms = ToolPermissions(whitelist=["read", "glob"]) + agent = AgentConfig( + name="test-agent", + description="A test agent", + tool_permissions=perms, + body_role="## Role\nYou are a test agent.", + body_workflow="## Workflow\n1. Do work.", + ) + assert agent.name == "test-agent" + assert agent.description == "A test agent" + assert agent.tool_permissions is perms + assert "You are a test agent." in agent.body_role + assert "Do work." in agent.body_workflow + + +def test_agent_config_defaults(): + """Verify AgentConfig defaults are empty strings.""" + agent = AgentConfig(name="minimal", description="Minimal", tool_permissions=ToolPermissions()) + assert agent.body_role == "" + assert agent.body_workflow == "" + assert agent.tool_permissions.whitelist == [] + assert agent.tool_permissions.blacklist == []