${escapeHtml(String(cfg.window_seconds))}s`,
+ );
+ html += probeItem(
+ t("probes.batcher_strategy"),
+ `${escapeHtml(cfg.strategy || "")}`,
+ );
+ html += probeItem(
+ t("probes.batcher_pending"),
+ String(mb.pending_buckets ?? 0),
+ );
+ html += probeItem(
+ t("probes.batcher_group"),
+ cfg.group_enabled ? "✓" : "✗",
+ );
+ html += probeItem(
+ t("probes.batcher_private"),
+ cfg.private_enabled ? "✓" : "✗",
+ );
+ html += probeItem(
+ t("probes.batcher_speculative"),
+ probeStatusBadge(cfg.speculative_enabled ? "ok" : "skipped"),
+ );
+ if (cfg.speculative_enabled) {
+ html += probeItem(
+ t("probes.batcher_pre_send"),
+ `${escapeHtml(String(cfg.pre_send_seconds))}s`,
+ );
+ }
+ html += ` None:
+ self.messages: list[tuple[int, str, bool]] = []
+
+ async def send_group_message(
+ self,
+ group_id: int,
+ message: str,
+ mark_sent: bool = False,
+ ) -> None:
+ self.messages.append((group_id, message, mark_sent))
+
+
+def _build_context(
+ *,
+ config: Any,
+ onebot: Any,
+ sender: _DummySender,
+) -> CommandContext:
+ stub = cast(Any, SimpleNamespace())
+ return CommandContext(
+ group_id=12345,
+ sender_id=54321,
+ config=cast(Any, config),
+ sender=cast(Any, sender),
+ ai=stub,
+ faq_storage=stub,
+ onebot=cast(Any, onebot),
+ security=stub,
+ queue_manager=None,
+ rate_limiter=None,
+ dispatcher=stub,
+ registry=stub,
+ )
+
+
+@pytest.mark.asyncio
+async def test_admin_ls_outputs_names_without_qq_leakage() -> None:
+ sender = _DummySender()
+ onebot = SimpleNamespace(
+ get_group_member_list=AsyncMock(
+ return_value=[
+ {"user_id": 10001, "card": "超管群名片", "nickname": "超管昵称"},
+ {"user_id": 10002, "card": "", "nickname": "群管理员"},
+ ]
+ ),
+ get_stranger_info=AsyncMock(return_value={"nickname": "QQ管理员"}),
+ )
+ config = SimpleNamespace(superadmin_qq=10001, admin_qqs=[10001, 10002, 10003])
+ context = _build_context(config=config, onebot=onebot, sender=sender)
+
+ await execute([], context)
+
+ assert sender.messages
+ output = sender.messages[-1][1]
+ assert "👑 超级管理员: 超管群名片" in output
+ assert "- 群管理员" in output
+ assert "- QQ管理员" in output
+ assert "10001" not in output
+ assert "10002" not in output
+ assert "10003" not in output
+ onebot.get_group_member_list.assert_awaited_once_with(12345)
+ onebot.get_stranger_info.assert_awaited_once_with(10003)
+
+
+@pytest.mark.asyncio
+async def test_admin_ls_falls_back_to_unknown_name_without_exposing_qq() -> None:
+ sender = _DummySender()
+ onebot = SimpleNamespace(
+ get_group_member_list=AsyncMock(side_effect=RuntimeError("boom")),
+ get_stranger_info=AsyncMock(return_value={}),
+ )
+ config = SimpleNamespace(superadmin_qq=20001, admin_qqs=[20001, 20002])
+ context = _build_context(config=config, onebot=onebot, sender=sender)
+
+ await execute([], context)
+
+ assert sender.messages
+ output = sender.messages[-1][1]
+ assert "未知成员" in output
+ assert "20001" not in output
+ assert "20002" not in output
+ assert onebot.get_stranger_info.await_args_list == [call(20001), call(20002)]
+
+
+def test_admin_requires_admin_permission() -> None:
+ dispatcher = CommandDispatcher(
+ config=cast(Any, SimpleNamespace()),
+ sender=cast(Any, _DummySender()),
+ ai=cast(Any, SimpleNamespace()),
+ faq_storage=cast(Any, SimpleNamespace()),
+ onebot=cast(Any, SimpleNamespace()),
+ security=cast(Any, SimpleNamespace(rate_limiter=None)),
+ )
+
+ meta = dispatcher.command_registry.resolve("admin")
+
+ assert meta is not None
+ assert meta.permission == "admin"
+
+
+def test_admin_subcommands_require_superadmin() -> None:
+ """子命令 add/del 必须覆盖为 superadmin;ls 继承顶层 admin。"""
+ dispatcher = CommandDispatcher(
+ config=cast(Any, SimpleNamespace()),
+ sender=cast(Any, _DummySender()),
+ ai=cast(Any, SimpleNamespace()),
+ faq_storage=cast(Any, SimpleNamespace()),
+ onebot=cast(Any, SimpleNamespace()),
+ security=cast(Any, SimpleNamespace(rate_limiter=None)),
+ )
+
+ meta = dispatcher.command_registry.resolve("admin")
+ assert meta is not None
+ subs = getattr(meta, "subcommands", {}) or {}
+ assert subs["add"].permission == "superadmin"
+ assert subs["del"].permission == "superadmin"
+ # ls 不显式覆盖:使用顶层 admin
+ assert subs["ls"].permission == "admin"
+
+
+@pytest.mark.asyncio
+async def test_admin_add_success_persists_via_config() -> None:
+ sender = _DummySender()
+ add_admin_calls: list[int] = []
+
+ class _Config:
+ superadmin_qq = 10001
+ admin_qqs = [10001]
+
+ def is_admin(self, qq: int) -> bool:
+ return qq in self.admin_qqs
+
+ def is_superadmin(self, qq: int) -> bool:
+ return qq == self.superadmin_qq
+
+ def add_admin(self, qq: int) -> bool:
+ add_admin_calls.append(qq)
+ self.admin_qqs.append(qq)
+ return True
+
+ def remove_admin(self, qq: int) -> bool: # pragma: no cover - 未触发
+ return False
+
+ config = _Config()
+ onebot = SimpleNamespace()
+ context = _build_context(config=config, onebot=onebot, sender=sender)
+
+ await execute(["add", "30001"], context)
+
+ assert add_admin_calls == [30001]
+ assert sender.messages
+ assert "已添加管理员: 30001" in sender.messages[-1][1]
+
+
+@pytest.mark.asyncio
+async def test_admin_add_rejects_duplicate_without_calling_config() -> None:
+ sender = _DummySender()
+ calls: list[int] = []
+
+ class _Config:
+ superadmin_qq = 10001
+ admin_qqs = [10001, 30001]
+
+ def is_admin(self, qq: int) -> bool:
+ return qq in self.admin_qqs
+
+ def is_superadmin(self, qq: int) -> bool:
+ return qq == self.superadmin_qq
+
+ def add_admin(self, qq: int) -> bool: # pragma: no cover - 不应触发
+ calls.append(qq)
+ return False
+
+ def remove_admin(self, qq: int) -> bool: # pragma: no cover - 不应触发
+ return False
+
+ config = _Config()
+ context = _build_context(config=config, onebot=SimpleNamespace(), sender=sender)
+
+ await execute(["add", "30001"], context)
+
+ assert calls == []
+ assert "已经是管理员" in sender.messages[-1][1]
+
+
+@pytest.mark.asyncio
+async def test_admin_add_rejects_non_numeric_qq() -> None:
+ sender = _DummySender()
+
+ class _Config:
+ superadmin_qq = 10001
+ admin_qqs = [10001]
+
+ def is_admin(self, qq: int) -> bool: # pragma: no cover - 不应触发
+ return False
+
+ def is_superadmin(self, qq: int) -> bool: # pragma: no cover
+ return False
+
+ def add_admin(self, qq: int) -> bool: # pragma: no cover - 不应触发
+ return False
+
+ def remove_admin(self, qq: int) -> bool: # pragma: no cover
+ return False
+
+ config = _Config()
+ context = _build_context(config=config, onebot=SimpleNamespace(), sender=sender)
+
+ await execute(["add", "abc"], context)
+
+ assert "QQ 号格式错误" in sender.messages[-1][1]
+
+
+@pytest.mark.asyncio
+async def test_admin_del_success_removes_admin() -> None:
+ sender = _DummySender()
+ removed: list[int] = []
+
+ class _Config:
+ superadmin_qq = 10001
+ admin_qqs = [10001, 30001]
+
+ def is_admin(self, qq: int) -> bool:
+ return qq in self.admin_qqs
+
+ def is_superadmin(self, qq: int) -> bool:
+ return qq == self.superadmin_qq
+
+ def add_admin(self, qq: int) -> bool: # pragma: no cover
+ return False
+
+ def remove_admin(self, qq: int) -> bool:
+ removed.append(qq)
+ self.admin_qqs.remove(qq)
+ return True
+
+ config = _Config()
+ context = _build_context(config=config, onebot=SimpleNamespace(), sender=sender)
+
+ await execute(["del", "30001"], context)
+
+ assert removed == [30001]
+ assert "已移除管理员: 30001" in sender.messages[-1][1]
+
+
+@pytest.mark.asyncio
+async def test_admin_del_refuses_to_remove_superadmin() -> None:
+ sender = _DummySender()
+ removed: list[int] = []
+
+ class _Config:
+ superadmin_qq = 10001
+ admin_qqs = [10001]
+
+ def is_admin(self, qq: int) -> bool:
+ return qq in self.admin_qqs
+
+ def is_superadmin(self, qq: int) -> bool:
+ return qq == self.superadmin_qq
+
+ def add_admin(self, qq: int) -> bool: # pragma: no cover
+ return False
+
+ def remove_admin(self, qq: int) -> bool: # pragma: no cover - 不应触发
+ removed.append(qq)
+ return True
+
+ config = _Config()
+ context = _build_context(config=config, onebot=SimpleNamespace(), sender=sender)
+
+ await execute(["del", "10001"], context)
+
+ assert removed == []
+ assert "无法移除超级管理员" in sender.messages[-1][1]
+
+
+@pytest.mark.asyncio
+async def test_admin_del_rejects_non_admin_target() -> None:
+ sender = _DummySender()
+
+ class _Config:
+ superadmin_qq = 10001
+ admin_qqs = [10001]
+
+ def is_admin(self, qq: int) -> bool:
+ return qq in self.admin_qqs
+
+ def is_superadmin(self, qq: int) -> bool:
+ return qq == self.superadmin_qq
+
+ def add_admin(self, qq: int) -> bool: # pragma: no cover
+ return False
+
+ def remove_admin(self, qq: int) -> bool: # pragma: no cover
+ return False
+
+ config = _Config()
+ context = _build_context(config=config, onebot=SimpleNamespace(), sender=sender)
+
+ await execute(["del", "30001"], context)
+
+ assert "不是管理员" in sender.messages[-1][1]
+
+
+@pytest.mark.asyncio
+async def test_admin_unknown_subcommand_shows_usage() -> None:
+ sender = _DummySender()
+
+ class _Config:
+ superadmin_qq = 10001
+ admin_qqs = [10001]
+
+ def is_admin(self, qq: int) -> bool: # pragma: no cover
+ return False
+
+ def is_superadmin(self, qq: int) -> bool: # pragma: no cover
+ return False
+
+ def add_admin(self, qq: int) -> bool: # pragma: no cover
+ return False
+
+ def remove_admin(self, qq: int) -> bool: # pragma: no cover
+ return False
+
+ config = _Config()
+ context = _build_context(config=config, onebot=SimpleNamespace(), sender=sender)
+
+ await execute(["foo"], context)
+
+ assert "用法:/admin" in sender.messages[-1][1]
diff --git a/tests/test_ai_client_tool_guard.py b/tests/test_ai_client_tool_guard.py
new file mode 100644
index 00000000..ec504fdc
--- /dev/null
+++ b/tests/test_ai_client_tool_guard.py
@@ -0,0 +1,28 @@
+from __future__ import annotations
+
+from Undefined.ai.client import (
+ _INVALID_TOOL_CALL_CONTENT,
+ _build_invalid_tool_call_response,
+)
+
+
+def test_build_invalid_tool_call_response_keeps_call_id() -> None:
+ response = _build_invalid_tool_call_response(
+ {"id": "call_empty", "function": {"name": "", "arguments": "{}"}}
+ )
+
+ assert response == {
+ "role": "tool",
+ "tool_call_id": "call_empty",
+ "name": "",
+ "content": _INVALID_TOOL_CALL_CONTENT,
+ }
+
+
+def test_build_invalid_tool_call_response_handles_non_dict() -> None:
+ response = _build_invalid_tool_call_response("bad")
+
+ assert response["role"] == "tool"
+ assert response["tool_call_id"] == ""
+ assert response["name"] == ""
+ assert "工具名称为空或格式非法" in str(response["content"])
diff --git a/tests/test_ai_coordinator_queue_routing.py b/tests/test_ai_coordinator_queue_routing.py
index 2713a126..194e67c7 100644
--- a/tests/test_ai_coordinator_queue_routing.py
+++ b/tests/test_ai_coordinator_queue_routing.py
@@ -212,8 +212,9 @@ def test_build_prompt_limits_proactive_participation_to_technical_contexts() ->
assert "群聊里的主动参与只保留给公开、开放的技术或项目讨论" in prompt
assert "轻松互动、玩梗、吐槽本身不构成参与许可" in prompt
- assert "对于已经决定要回复的场景" in prompt
- assert "默认先尝试 memes.search_memes" in prompt
+ assert "只有明确纯表情包回复才先检索表情包" in prompt
+ assert "第一轮必须优先把必要文字回复做好并调用 send_message" in prompt
+ assert "默认先尝试 memes.search_memes" not in prompt
assert "普通闲聊、玩梗、吐槽、轻松互动:" not in prompt
diff --git a/tests/test_command_qq_arg.py b/tests/test_command_qq_arg.py
index 658ef48a..f67eba5a 100644
--- a/tests/test_command_qq_arg.py
+++ b/tests/test_command_qq_arg.py
@@ -56,14 +56,14 @@ def test_split_command_args_keeps_at_name_with_spaces() -> None:
def test_parse_command_strips_leading_bot_at() -> None:
d = _dispatcher()
- cmd = d.parse_command("[@123456(Bot)] /addadmin 7777777")
- assert cmd == {"name": "addadmin", "args": ["7777777"]}
+ cmd = d.parse_command("[@123456(Bot)] /admin add 7777777")
+ assert cmd == {"name": "admin", "args": ["add", "7777777"]}
def test_parse_command_keeps_inline_at_normalized() -> None:
d = _dispatcher()
- cmd = d.parse_command("[@123456(Bot)] /addadmin [@1708213363(Null)]")
- assert cmd == {"name": "addadmin", "args": ["1708213363"]}
+ cmd = d.parse_command("[@123456(Bot)] /admin add [@1708213363(Null)]")
+ assert cmd == {"name": "admin", "args": ["add", "1708213363"]}
def test_parse_command_keeps_inline_at_with_space_name_normalized() -> None:
diff --git a/tests/test_config_hot_reload.py b/tests/test_config_hot_reload.py
index 4a83d6cd..90da2eab 100644
--- a/tests/test_config_hot_reload.py
+++ b/tests/test_config_hot_reload.py
@@ -357,6 +357,31 @@ def test_apply_config_updates_runtime_model_config_without_rebuilding_core_model
assert len(queue_manager.intervals) == 1
+def test_apply_config_updates_hot_reloads_missing_tool_call_retries() -> None:
+ updated = cast(
+ Any,
+ SimpleNamespace(
+ searxng_url="",
+ missing_tool_call_retries=4,
+ ),
+ )
+ ai_client = _FakeAIClient()
+ context = HotReloadContext(
+ ai_client=cast(Any, ai_client),
+ queue_manager=cast(Any, _FakeQueueManager()),
+ config_manager=cast(Any, SimpleNamespace()),
+ security_service=cast(Any, _FakeSecurityService()),
+ )
+
+ apply_config_updates(
+ updated,
+ {"missing_tool_call_retries": (3, 4)},
+ context,
+ )
+
+ assert ai_client.runtime_updates == [updated]
+
+
def test_apply_config_updates_hot_reloads_attachment_config() -> None:
updated = cast(
Any,
@@ -414,7 +439,7 @@ def test_apply_config_updates_hot_reloads_attachment_config() -> None:
@pytest.mark.asyncio
-async def test_apply_config_updates_refreshes_auto_pipeline_hot_reload() -> None:
+async def test_apply_config_updates_refreshes_pipelines_hot_reload() -> None:
updated = cast(
Any,
SimpleNamespace(
diff --git a/tests/test_core_config.py b/tests/test_core_config.py
new file mode 100644
index 00000000..211ad516
--- /dev/null
+++ b/tests/test_core_config.py
@@ -0,0 +1,36 @@
+from __future__ import annotations
+
+from pathlib import Path
+
+from Undefined.config.loader import Config
+
+
+_MINIMAL = """
+[onebot]
+ws_url = "ws://127.0.0.1:3001"
+[models.chat]
+api_url = "https://api.example/v1"
+api_key = "sk-test"
+model_name = "gpt-test"
+"""
+
+
+def _load(tmp_path: Path, extra: str = "") -> Config:
+ config_path = tmp_path / "config.toml"
+ config_path.write_text(_MINIMAL + extra, "utf-8")
+ return Config.load(config_path, strict=False)
+
+
+def test_missing_tool_call_retries_defaults_to_three(tmp_path: Path) -> None:
+ cfg = _load(tmp_path)
+ assert cfg.missing_tool_call_retries == 3
+
+
+def test_missing_tool_call_retries_clamps_negative(tmp_path: Path) -> None:
+ cfg = _load(tmp_path, "\n[core]\nmissing_tool_call_retries = -1\n")
+ assert cfg.missing_tool_call_retries == 0
+
+
+def test_missing_tool_call_retries_loads_explicit_value(tmp_path: Path) -> None:
+ cfg = _load(tmp_path, "\n[core]\nmissing_tool_call_retries = 5\n")
+ assert cfg.missing_tool_call_retries == 5
diff --git a/tests/test_end_tool.py b/tests/test_end_tool.py
index 4b65584a..92be8277 100644
--- a/tests/test_end_tool.py
+++ b/tests/test_end_tool.py
@@ -60,34 +60,6 @@ async def test_end_accepts_message_sent_flag_from_request_context_string_true()
assert context["conversation_ended"] is True
-@pytest.mark.asyncio
-async def test_end_backward_compat_action_summary_param() -> None:
- """向后兼容:旧参数名 action_summary 仍能正常工作。"""
- context: dict[str, Any] = {"request_id": "req-compat-summary"}
-
- result = await execute(
- {"action_summary": "已发送消息", "force": True},
- context,
- )
-
- assert result == "对话已结束"
- assert context["conversation_ended"] is True
-
-
-@pytest.mark.asyncio
-async def test_end_backward_compat_new_info_param() -> None:
- """向后兼容:旧参数名 new_info 仍能正常工作。"""
- context: dict[str, Any] = {"request_id": "req-compat-new-info"}
-
- result = await execute(
- {"new_info": ["一条旧格式信息"], "force": True},
- context,
- )
-
- assert result == "对话已结束"
- assert context["conversation_ended"] is True
-
-
class _FakeHistoryManager:
def get_recent(
self, chat_id: str, msg_type: str, start: int, end: int
@@ -122,6 +94,29 @@ async def enqueue_job(
return "job-test"
+@pytest.mark.asyncio
+async def test_end_ignores_removed_legacy_param_names() -> None:
+ cognitive_service = _FakeCognitiveService()
+ context: dict[str, Any] = {
+ "request_id": "req-removed-compat",
+ "cognitive_service": cognitive_service,
+ }
+
+ result = await execute(
+ {
+ "action_summary": "旧字段不应写入 memo",
+ "summary": "旧摘要字段不应写入 memo",
+ "new_info": ["旧字段不应写入 observations"],
+ "force": True,
+ },
+ context,
+ )
+
+ assert result == "对话已结束"
+ assert context["conversation_ended"] is True
+ assert cognitive_service.last_context is None
+
+
@pytest.mark.asyncio
async def test_end_enriches_historian_reference_context() -> None:
cognitive_service = _FakeCognitiveService()
@@ -158,6 +153,46 @@ async def test_end_enriches_historian_reference_context() -> None:
assert cognitive_service.last_force is True
+@pytest.mark.asyncio
+async def test_end_historian_source_message_includes_batched_messages() -> None:
+ cognitive_service = _FakeCognitiveService()
+ context: dict[str, Any] = {
+ "request_id": "req-historian-batch",
+ "request_type": "group",
+ "group_id": "1082837821",
+ "user_id": "120218451",
+ "sender_id": "120218451",
+ "cognitive_service": cognitive_service,
+ "current_question": (
+ ''
+ "我周三要发版 "
+ ''
+ "补充:是后端服务发版 "
+ "\n\n 【连续消息说明】以上 2 条 共同构成【当前输入批次】"
+ ),
+ }
+
+ result = await execute(
+ {"observations": ["洛泫周三要进行后端服务发版"], "force": True},
+ context,
+ )
+
+ assert result == "对话已结束"
+ source = str(context.get("historian_source_message", ""))
+ assert "[1]" in source
+ assert "[2]" in source
+ assert "message_id=101" in source
+ assert "message_id=102" in source
+ assert "我周三要发版" in source
+ assert "补充:是后端服务发版" in source
+ assert cognitive_service.last_context is not None
+ assert cognitive_service.last_context.get("historian_source_message") == source
+
+
class _ManyHistoryManager:
def get_recent(
self, chat_id: str, msg_type: str, start: int, end: int
diff --git a/tests/test_handlers_arxiv_auto_extract.py b/tests/test_handlers_arxiv_auto_extract.py
index a5c67ba3..226c6ea1 100644
--- a/tests/test_handlers_arxiv_auto_extract.py
+++ b/tests/test_handlers_arxiv_auto_extract.py
@@ -8,7 +8,7 @@
import Undefined.handlers as handlers_module
from Undefined.handlers import MessageHandler
-from Undefined.skills.auto_pipeline import AutoPipelineRegistry
+from Undefined.skills.pipelines import PipelineRegistry
@pytest.mark.asyncio
@@ -50,8 +50,8 @@ async def test_private_message_runs_arxiv_auto_extract_before_ai_reply(
handler._background_tasks = set()
handler._extract_arxiv_ids = MagicMock(return_value=["2501.01234"])
handler._handle_arxiv_extract = AsyncMock()
- handler.auto_pipeline_registry = AutoPipelineRegistry()
- handler.auto_pipeline_registry.load_items()
+ handler.pipeline_registry = PipelineRegistry()
+ handler.pipeline_registry.load_items()
handler._spawn_background_task = MagicMock()
event = {
diff --git a/tests/test_handlers_github_auto_extract.py b/tests/test_handlers_github_auto_extract.py
index e88f293d..6b1fb85f 100644
--- a/tests/test_handlers_github_auto_extract.py
+++ b/tests/test_handlers_github_auto_extract.py
@@ -8,7 +8,7 @@
import Undefined.handlers as handlers_module
from Undefined.handlers import MessageHandler
-from Undefined.skills.auto_pipeline import AutoPipelineRegistry
+from Undefined.skills.pipelines import PipelineRegistry
@pytest.mark.asyncio
@@ -51,8 +51,8 @@ async def test_private_message_runs_github_auto_extract_before_ai_reply(
handler._background_tasks = set()
handler._extract_github_repo_ids = MagicMock(return_value=["69gg/Undefined"])
handler._handle_github_extract = AsyncMock()
- handler.auto_pipeline_registry = AutoPipelineRegistry()
- handler.auto_pipeline_registry.load_items()
+ handler.pipeline_registry = PipelineRegistry()
+ handler.pipeline_registry.load_items()
handler._spawn_background_task = MagicMock()
event = {
diff --git a/tests/test_handlers_auto_extract_pipeline.py b/tests/test_handlers_pipelines.py
similarity index 88%
rename from tests/test_handlers_auto_extract_pipeline.py
rename to tests/test_handlers_pipelines.py
index 82e5fea0..380f29c4 100644
--- a/tests/test_handlers_auto_extract_pipeline.py
+++ b/tests/test_handlers_pipelines.py
@@ -9,12 +9,12 @@
import Undefined.handlers as handlers_module
from Undefined.handlers import MessageHandler
-from Undefined.skills.auto_pipeline import AutoPipelineRegistry
+from Undefined.skills.pipelines import PipelineRegistry
@pytest.mark.asyncio
-async def test_message_handler_initializes_auto_pipeline_async() -> None:
- class _FakeAutoPipelineRegistry:
+async def test_message_handler_initializes_pipelines_async() -> None:
+ class _FakePipelineRegistry:
def __init__(self) -> None:
self.load_count = 0
self.started: list[tuple[float, float]] = []
@@ -26,30 +26,30 @@ async def load_items_async(self) -> None:
def start_hot_reload(self, *, interval: float, debounce: float) -> None:
self.started.append((interval, debounce))
- registry = _FakeAutoPipelineRegistry()
+ registry = _FakePipelineRegistry()
handler: Any = MessageHandler.__new__(MessageHandler)
handler.config = SimpleNamespace(
skills_hot_reload=True,
skills_hot_reload_interval=3.0,
skills_hot_reload_debounce=0.75,
)
- handler.auto_pipeline_registry = registry
- handler._auto_pipeline_initialized = False
+ handler.pipeline_registry = registry
+ handler._pipelines_initialized = False
await asyncio.gather(
- handler.initialize_auto_pipeline(),
- handler.initialize_auto_pipeline(),
+ handler.init_pipelines(),
+ handler.init_pipelines(),
)
- await handler.initialize_auto_pipeline()
+ await handler.init_pipelines()
assert registry.load_count == 1
assert registry.started == [(3.0, 0.75)]
- assert handler._auto_pipeline_initialized is True
+ assert handler._pipelines_initialized is True
@pytest.mark.asyncio
-async def test_auto_extract_pipeline_initializes_when_flag_missing() -> None:
- class _FakeAutoPipelineRegistry:
+async def test_pipelines_initializes_when_flag_missing() -> None:
+ class _FakePipelineRegistry:
def __init__(self) -> None:
self.loaded = False
self.run_context: dict[str, Any] | None = None
@@ -61,12 +61,12 @@ async def run(self, context: dict[str, Any]) -> list[object]:
self.run_context = context
return [object()] if self.loaded else []
- registry = _FakeAutoPipelineRegistry()
+ registry = _FakePipelineRegistry()
handler: Any = MessageHandler.__new__(MessageHandler)
handler.config = SimpleNamespace(skills_hot_reload=False)
handler.sender = SimpleNamespace()
handler.onebot = SimpleNamespace()
- handler.auto_pipeline_registry = registry
+ handler.pipeline_registry = registry
handler._extract_bilibili_ids = AsyncMock(return_value=[])
handler._extract_arxiv_ids = MagicMock(return_value=[])
handler._extract_github_repo_ids = MagicMock(return_value=[])
@@ -74,7 +74,7 @@ async def run(self, context: dict[str, Any]) -> list[object]:
handler._handle_arxiv_extract = AsyncMock()
handler._handle_github_extract = AsyncMock()
- handled = await handler._run_auto_extract_pipeline(
+ handled = await handler._run_pipelines(
target_id=20001,
target_type="private",
text="hello",
@@ -84,11 +84,11 @@ async def run(self, context: dict[str, Any]) -> list[object]:
assert handled is True
assert registry.loaded is True
assert registry.run_context is not None
- assert handler._auto_pipeline_initialized is True
+ assert handler._pipelines_initialized is True
@pytest.mark.asyncio
-async def test_auto_extract_pipeline_processes_all_matches() -> None:
+async def test_pipelines_processes_all_matches() -> None:
handler: Any = MessageHandler.__new__(MessageHandler)
handler.sender = SimpleNamespace()
handler.onebot = SimpleNamespace()
@@ -106,10 +106,10 @@ async def test_auto_extract_pipeline_processes_all_matches() -> None:
handler._handle_bilibili_extract = AsyncMock()
handler._handle_arxiv_extract = AsyncMock()
handler._handle_github_extract = AsyncMock()
- handler.auto_pipeline_registry = AutoPipelineRegistry()
- handler.auto_pipeline_registry.load_items()
+ handler.pipeline_registry = PipelineRegistry()
+ handler.pipeline_registry.load_items()
- handled = await handler._run_auto_extract_pipeline(
+ handled = await handler._run_pipelines(
target_id=20001,
target_type="private",
text="BV1xx411c7mD 69gg/Undefined",
@@ -135,7 +135,7 @@ async def test_auto_extract_pipeline_processes_all_matches() -> None:
@pytest.mark.asyncio
-async def test_private_command_skips_auto_pipeline_and_ai(
+async def test_private_command_skips_pipelines_and_ai(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(
@@ -169,7 +169,7 @@ async def test_private_command_skips_auto_pipeline_and_ai(
parse_command=MagicMock(return_value=command),
dispatch_private=AsyncMock(),
)
- handler.auto_pipeline_registry = SimpleNamespace(
+ handler.pipeline_registry = SimpleNamespace(
run=AsyncMock(return_value=[]),
)
handler._background_tasks = set()
@@ -195,7 +195,7 @@ async def test_private_command_skips_auto_pipeline_and_ai(
assert handler.history_manager.add_private_message.await_args is not None
private_history = handler.history_manager.add_private_message.await_args.kwargs
assert private_history["text_content"] == "/help"
- handler.auto_pipeline_registry.run.assert_not_awaited()
+ handler.pipeline_registry.run.assert_not_awaited()
handler.ai_coordinator.model_pool.handle_private_message.assert_not_awaited()
handler.ai_coordinator.handle_private_reply.assert_not_awaited()
@@ -233,7 +233,7 @@ async def test_private_model_pool_command_runs_before_command_dispatch(
parse_command=MagicMock(return_value=command),
dispatch_private=AsyncMock(),
)
- handler.auto_pipeline_registry = SimpleNamespace(run=AsyncMock(return_value=[]))
+ handler.pipeline_registry = SimpleNamespace(run=AsyncMock(return_value=[]))
handler._background_tasks = set()
handler._profile_name_refresh_cache = {}
handler._collect_message_attachments = AsyncMock(return_value=[])
@@ -263,7 +263,7 @@ async def test_private_model_pool_command_runs_before_command_dispatch(
)
handler.command_dispatcher.parse_command.assert_not_called()
handler.command_dispatcher.dispatch_private.assert_not_awaited()
- handler.auto_pipeline_registry.run.assert_not_awaited()
+ handler.pipeline_registry.run.assert_not_awaited()
handler.ai_coordinator.handle_private_reply.assert_not_awaited()
@@ -300,8 +300,8 @@ async def test_private_message_starting_with_select_does_not_touch_model_pool(
parse_command=MagicMock(return_value=None),
dispatch_private=AsyncMock(),
)
- handler.auto_pipeline_registry = SimpleNamespace(run=AsyncMock(return_value=[]))
- handler._auto_pipeline_initialized = True
+ handler.pipeline_registry = SimpleNamespace(run=AsyncMock(return_value=[]))
+ handler._pipelines_initialized = True
handler._background_tasks = set()
handler._profile_name_refresh_cache = {}
handler._collect_message_attachments = AsyncMock(return_value=[])
@@ -326,7 +326,7 @@ async def test_private_message_starting_with_select_does_not_touch_model_pool(
await handler.handle_message(event)
handler.ai_coordinator.model_pool.handle_private_message.assert_not_awaited()
- handler.auto_pipeline_registry.run.assert_awaited_once()
+ handler.pipeline_registry.run.assert_awaited_once()
handler.ai_coordinator.handle_private_reply.assert_awaited_once()
@@ -364,8 +364,8 @@ async def test_private_model_pool_command_ignored_when_pool_disabled(
parse_command=MagicMock(return_value=None),
dispatch_private=AsyncMock(),
)
- handler.auto_pipeline_registry = SimpleNamespace(run=AsyncMock(return_value=[]))
- handler._auto_pipeline_initialized = True
+ handler.pipeline_registry = SimpleNamespace(run=AsyncMock(return_value=[]))
+ handler._pipelines_initialized = True
handler._background_tasks = set()
handler._profile_name_refresh_cache = {}
handler._collect_message_attachments = AsyncMock(return_value=[])
@@ -391,12 +391,12 @@ async def test_private_model_pool_command_ignored_when_pool_disabled(
handler.ai_coordinator.model_pool.handle_private_message.assert_not_awaited()
handler.command_dispatcher.parse_command.assert_called_once_with("/compare hello")
- handler.auto_pipeline_registry.run.assert_awaited_once()
+ handler.pipeline_registry.run.assert_awaited_once()
handler.ai_coordinator.handle_private_reply.assert_awaited_once()
@pytest.mark.asyncio
-async def test_group_command_skips_auto_pipeline_and_ai(
+async def test_group_command_skips_pipelines_and_ai(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(
@@ -429,7 +429,7 @@ async def test_group_command_skips_auto_pipeline_and_ai(
parse_command=MagicMock(return_value=command),
dispatch=AsyncMock(),
)
- handler.auto_pipeline_registry = SimpleNamespace(
+ handler.pipeline_registry = SimpleNamespace(
run=AsyncMock(return_value=[]),
)
handler._schedule_profile_display_name_refresh = MagicMock()
@@ -463,5 +463,5 @@ async def test_group_command_skips_auto_pipeline_and_ai(
assert handler.history_manager.add_group_message.await_args is not None
group_history = handler.history_manager.add_group_message.await_args.kwargs
assert group_history["text_content"] == "/help"
- handler.auto_pipeline_registry.run.assert_not_awaited()
+ handler.pipeline_registry.run.assert_not_awaited()
handler.ai_coordinator.handle_auto_reply.assert_not_awaited()
diff --git a/tests/test_handlers_repeat.py b/tests/test_handlers_repeat.py
index 55207e69..d649c981 100644
--- a/tests/test_handlers_repeat.py
+++ b/tests/test_handlers_repeat.py
@@ -47,10 +47,10 @@ def _build_handler(
send_group_message=AsyncMock(),
send_private_message=AsyncMock(),
)
- handler.auto_pipeline_registry = SimpleNamespace(
+ handler.pipeline_registry = SimpleNamespace(
run=AsyncMock(return_value=[]),
)
- handler._auto_pipeline_initialized = True
+ handler._pipelines_initialized = True
handler.ai_coordinator = SimpleNamespace(
handle_auto_reply=AsyncMock(),
handle_private_reply=AsyncMock(),
@@ -126,7 +126,7 @@ async def test_repeat_triggers_on_3_identical_from_different_senders() -> None:
for uid in [20001, 20002]:
await handler.handle_message(_group_event(sender_id=uid, text="hello"))
- handler.auto_pipeline_registry.run.reset_mock()
+ handler.pipeline_registry.run.reset_mock()
handler.ai_coordinator.handle_auto_reply.reset_mock()
await handler.handle_message(_group_event(sender_id=20003, text="hello"))
@@ -135,7 +135,7 @@ async def test_repeat_triggers_on_3_identical_from_different_senders() -> None:
assert call.args[0] == 30001
assert call.args[1] == "hello"
assert call.kwargs.get("history_prefix") == REPEAT_REPLY_HISTORY_PREFIX
- handler.auto_pipeline_registry.run.assert_not_called()
+ handler.pipeline_registry.run.assert_not_called()
handler.ai_coordinator.handle_auto_reply.assert_not_called()
handler._bot_nickname_cache.get_nicknames.assert_not_called()
diff --git a/tests/test_llm_retry_suppression.py b/tests/test_llm_retry_suppression.py
index 197b9b60..2012a693 100644
--- a/tests/test_llm_retry_suppression.py
+++ b/tests/test_llm_retry_suppression.py
@@ -79,7 +79,22 @@ async def test_ai_ask_retries_pre_tool_local_failure() -> None:
end_summaries=[],
),
)
- client.tool_manager = cast(Any, SimpleNamespace(get_openai_tools=lambda: []))
+
+ async def _execute_tool(
+ name: str, args: dict[str, Any], ctx: dict[str, Any]
+ ) -> str:
+ if name == "end":
+ ctx["conversation_ended"] = True
+ return "对话已结束"
+ return "ok"
+
+ client.tool_manager = cast(
+ Any,
+ SimpleNamespace(
+ get_openai_tools=lambda: [],
+ execute_tool=_execute_tool,
+ ),
+ )
client._filter_tools_for_runtime_config = lambda tools: tools
client._get_runtime_config = cast(Any, lambda: client.runtime_config)
client.model_selector = cast(Any, SimpleNamespace(wait_ready=AsyncMock()))
@@ -93,7 +108,24 @@ async def test_ai_ask_retries_pre_tool_local_failure() -> None:
client.submit_queued_llm_call = AsyncMock(
side_effect=[
{"choices": []},
- {"choices": [{"message": {"content": "ok"}}]},
+ {
+ "choices": [
+ {
+ "message": {
+ "content": "",
+ "tool_calls": [
+ {
+ "id": "call_end",
+ "function": {
+ "name": "end",
+ "arguments": "{}",
+ },
+ }
+ ],
+ }
+ }
+ ],
+ },
]
)
client._search_wrapper = None
@@ -112,10 +144,76 @@ async def test_ai_ask_retries_pre_tool_local_failure() -> None:
result = await AIClient.ask(client, "hello")
- assert result == "ok"
+ assert result == ""
assert cast(AsyncMock, client.submit_queued_llm_call).await_count == 2
+@pytest.mark.asyncio
+async def test_ai_ask_limits_missing_tool_call_retries() -> None:
+ client: Any = object.__new__(AIClient)
+ client.runtime_config = cast(
+ Any,
+ SimpleNamespace(
+ log_thinking=False,
+ ai_request_max_retries=0,
+ missing_tool_call_retries=2,
+ ),
+ )
+ client._prompt_builder = cast(
+ Any,
+ SimpleNamespace(
+ build_messages=AsyncMock(
+ return_value=[{"role": "user", "content": "hello"}]
+ ),
+ end_summaries=[],
+ ),
+ )
+ client.tool_manager = cast(
+ Any,
+ SimpleNamespace(
+ get_openai_tools=lambda: [],
+ execute_tool=AsyncMock(),
+ ),
+ )
+ client._filter_tools_for_runtime_config = lambda tools: tools
+ client._get_runtime_config = cast(Any, lambda: client.runtime_config)
+ client.model_selector = cast(Any, SimpleNamespace(wait_ready=AsyncMock()))
+ client.chat_config = ChatModelConfig(
+ api_url="https://api.openai.com/v1",
+ api_key="sk-test",
+ model_name="chat-model",
+ max_tokens=1024,
+ )
+ client._find_chat_config_by_name = lambda _name: client.chat_config
+ client.submit_queued_llm_call = AsyncMock(
+ side_effect=[
+ {"choices": [{"message": {"content": "plain 1", "tool_calls": []}}]},
+ {"choices": [{"message": {"content": "plain 2", "tool_calls": []}}]},
+ {"choices": [{"message": {"content": "plain 3", "tool_calls": []}}]},
+ ]
+ )
+ client._search_wrapper = None
+ client._end_summary_storage = cast(Any, None)
+ client._send_private_message_callback = None
+ client._send_image_callback = None
+ client.memory_storage = None
+ client._knowledge_manager = None
+ client._cognitive_service = None
+ client._meme_service = None
+ client._crawl4ai_capabilities = SimpleNamespace(
+ available=False,
+ error=None,
+ proxy_config_available=False,
+ )
+ send_message = AsyncMock()
+
+ result = await AIClient.ask(client, "hello", send_message_callback=send_message)
+
+ assert result == ""
+ assert cast(AsyncMock, client.submit_queued_llm_call).await_count == 3
+ send_message.assert_awaited_once_with("plain 3")
+
+
@pytest.mark.asyncio
async def test_agent_runner_reraises_queued_llm_error(tmp_path: Path) -> None:
agent_dir = tmp_path / "demo_agent"
diff --git a/tests/test_lsadmin_command.py b/tests/test_lsadmin_command.py
deleted file mode 100644
index 169c6b9e..00000000
--- a/tests/test_lsadmin_command.py
+++ /dev/null
@@ -1,112 +0,0 @@
-from __future__ import annotations
-
-from types import SimpleNamespace
-from typing import Any, cast
-from unittest.mock import AsyncMock, call
-
-import pytest
-
-from Undefined.services.command import CommandDispatcher
-from Undefined.services.commands.context import CommandContext
-from Undefined.skills.commands.lsadmin.handler import execute
-
-
-class _DummySender:
- def __init__(self) -> None:
- self.messages: list[tuple[int, str, bool]] = []
-
- async def send_group_message(
- self,
- group_id: int,
- message: str,
- mark_sent: bool = False,
- ) -> None:
- self.messages.append((group_id, message, mark_sent))
-
-
-def _build_context(
- *,
- config: Any,
- onebot: Any,
- sender: _DummySender,
-) -> CommandContext:
- stub = cast(Any, SimpleNamespace())
- return CommandContext(
- group_id=12345,
- sender_id=54321,
- config=cast(Any, config),
- sender=cast(Any, sender),
- ai=stub,
- faq_storage=stub,
- onebot=cast(Any, onebot),
- security=stub,
- queue_manager=None,
- rate_limiter=None,
- dispatcher=stub,
- registry=stub,
- )
-
-
-@pytest.mark.asyncio
-async def test_lsadmin_outputs_names_without_qq_leakage() -> None:
- sender = _DummySender()
- onebot = SimpleNamespace(
- get_group_member_list=AsyncMock(
- return_value=[
- {"user_id": 10001, "card": "超管群名片", "nickname": "超管昵称"},
- {"user_id": 10002, "card": "", "nickname": "群管理员"},
- ]
- ),
- get_stranger_info=AsyncMock(return_value={"nickname": "QQ管理员"}),
- )
- config = SimpleNamespace(superadmin_qq=10001, admin_qqs=[10001, 10002, 10003])
- context = _build_context(config=config, onebot=onebot, sender=sender)
-
- await execute([], context)
-
- assert sender.messages
- output = sender.messages[-1][1]
- assert "👑 超级管理员: 超管群名片" in output
- assert "- 群管理员" in output
- assert "- QQ管理员" in output
- assert "10001" not in output
- assert "10002" not in output
- assert "10003" not in output
- onebot.get_group_member_list.assert_awaited_once_with(12345)
- onebot.get_stranger_info.assert_awaited_once_with(10003)
-
-
-@pytest.mark.asyncio
-async def test_lsadmin_falls_back_to_unknown_name_without_exposing_qq() -> None:
- sender = _DummySender()
- onebot = SimpleNamespace(
- get_group_member_list=AsyncMock(side_effect=RuntimeError("boom")),
- get_stranger_info=AsyncMock(return_value={}),
- )
- config = SimpleNamespace(superadmin_qq=20001, admin_qqs=[20001, 20002])
- context = _build_context(config=config, onebot=onebot, sender=sender)
-
- await execute([], context)
-
- assert sender.messages
- output = sender.messages[-1][1]
- assert "未知成员" in output
- assert "20001" not in output
- assert "20002" not in output
- assert onebot.get_stranger_info.await_args_list == [call(20001), call(20002)]
-
-
-def test_lsadmin_requires_admin_permission() -> None:
- dispatcher = CommandDispatcher(
- config=cast(Any, SimpleNamespace()),
- sender=cast(Any, _DummySender()),
- ai=cast(Any, SimpleNamespace()),
- faq_storage=cast(Any, SimpleNamespace()),
- onebot=cast(Any, SimpleNamespace()),
- security=cast(Any, SimpleNamespace(rate_limiter=None)),
- )
-
- meta = dispatcher.command_registry.resolve("lsadmin")
-
- assert meta is not None
- assert meta.permission == "admin"
diff --git a/tests/test_message_batcher.py b/tests/test_message_batcher.py
new file mode 100644
index 00000000..09048d0f
--- /dev/null
+++ b/tests/test_message_batcher.py
@@ -0,0 +1,796 @@
+"""MessageBatcher 单元测试。"""
+
+from __future__ import annotations
+
+import asyncio
+import time
+
+import pytest
+
+from Undefined.config.models import MessageBatcherConfig
+from Undefined.services.message_batcher import (
+ BatchDispatchToken,
+ BufferedMessage,
+ MessageBatcher,
+ make_scope,
+)
+
+
+def _make_item(
+ *,
+ scope: str = "group:1",
+ sender_id: int = 100,
+ text: str = "hi",
+ is_private: bool = False,
+ is_poke: bool = False,
+ is_at_bot: bool = False,
+ sender_name: str = "test",
+) -> BufferedMessage:
+ return BufferedMessage(
+ scope=scope,
+ sender_id=sender_id,
+ text=text,
+ message_content=[{"type": "text", "data": {"text": text}}],
+ attachments=[],
+ sender_name=sender_name,
+ arrival_time=time.time(),
+ is_private=is_private,
+ trigger_message_id=1,
+ is_poke=is_poke,
+ is_at_bot=is_at_bot,
+ group_id=None if is_private else 1,
+ )
+
+
+class _Recorder:
+ def __init__(self) -> None:
+ self.batches: list[list[BufferedMessage]] = []
+ self.event = asyncio.Event()
+
+ async def __call__(self, items: list[BufferedMessage]) -> None:
+ self.batches.append(items)
+ self.event.set()
+
+
+def test_make_scope() -> None:
+ assert make_scope(group_id=10) == "group:10"
+ assert make_scope(user_id=5) == "private:5"
+ assert make_scope() == "unknown"
+
+
+@pytest.mark.asyncio
+async def test_consecutive_same_sender_merge() -> None:
+ cfg = MessageBatcherConfig(enabled=True, window_seconds=0.1, strategy="extend")
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+
+ await batcher.submit(_make_item(text="msg1"))
+ await batcher.submit(_make_item(text="msg2"))
+ await batcher.submit(_make_item(text="msg3"))
+
+ await asyncio.wait_for(rec.event.wait(), timeout=1.0)
+ assert len(rec.batches) == 1
+ assert [m.text for m in rec.batches[0]] == ["msg1", "msg2", "msg3"]
+
+
+@pytest.mark.asyncio
+async def test_different_senders_isolated() -> None:
+ cfg = MessageBatcherConfig(enabled=True, window_seconds=0.1)
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+
+ await batcher.submit(_make_item(sender_id=1, text="a"))
+ await batcher.submit(_make_item(sender_id=2, text="b"))
+
+ await asyncio.sleep(0.3)
+ assert len(rec.batches) == 2
+ flat = sorted([b[0].sender_id for b in rec.batches])
+ assert flat == [1, 2]
+
+
+@pytest.mark.asyncio
+async def test_max_messages_immediate_flush() -> None:
+ cfg = MessageBatcherConfig(
+ enabled=True,
+ window_seconds=10.0,
+ max_messages_per_batch=2,
+ )
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+
+ await batcher.submit(_make_item(text="x"))
+ await batcher.submit(_make_item(text="y"))
+
+ # 立即发车,不需要等窗口
+ assert len(rec.batches) == 1
+ assert len(rec.batches[0]) == 2
+
+
+@pytest.mark.asyncio
+async def test_max_window_hard_cap() -> None:
+ cfg = MessageBatcherConfig(
+ enabled=True,
+ window_seconds=0.05,
+ strategy="extend",
+ max_window_seconds=0.15,
+ )
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+
+ # 持续提交,extend 应被 max_window 硬顶
+ for _ in range(10):
+ await batcher.submit(_make_item(text="x"))
+ await asyncio.sleep(0.03)
+
+ await asyncio.sleep(0.3)
+ # 至少触发过一次 flush
+ assert len(rec.batches) >= 1
+
+
+@pytest.mark.asyncio
+async def test_disabled_means_caller_should_bypass() -> None:
+ cfg = MessageBatcherConfig(enabled=False, window_seconds=0.1)
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+
+ assert batcher.is_enabled_for(is_group=True) is False
+ assert batcher.is_enabled_for(is_group=False) is False
+
+
+@pytest.mark.asyncio
+async def test_group_only_disabled() -> None:
+ cfg = MessageBatcherConfig(
+ enabled=True, window_seconds=0.1, group_enabled=False, private_enabled=True
+ )
+ batcher = MessageBatcher(cfg, lambda items: asyncio.sleep(0))
+ assert batcher.is_enabled_for(is_group=True) is False
+ assert batcher.is_enabled_for(is_group=False) is True
+
+
+@pytest.mark.asyncio
+async def test_has_buffer() -> None:
+ cfg = MessageBatcherConfig(enabled=True, window_seconds=10.0)
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+
+ assert not batcher.has_buffer("group:1", 100)
+ await batcher.submit(_make_item())
+ assert batcher.has_buffer("group:1", 100)
+
+
+@pytest.mark.asyncio
+async def test_flush_all_on_shutdown() -> None:
+ cfg = MessageBatcherConfig(enabled=True, window_seconds=10.0)
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+
+ await batcher.submit(_make_item(sender_id=1, text="a"))
+ await batcher.submit(_make_item(sender_id=2, text="b"))
+ assert len(rec.batches) == 0
+
+ await batcher.flush_all()
+ assert len(rec.batches) == 2
+
+
+@pytest.mark.asyncio
+async def test_extend_resets_timer() -> None:
+ cfg = MessageBatcherConfig(enabled=True, window_seconds=0.15, strategy="extend")
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+
+ await batcher.submit(_make_item(text="a"))
+ await asyncio.sleep(0.10)
+ await batcher.submit(_make_item(text="b"))
+ await asyncio.sleep(0.10)
+ # 这个时间点本来 a 已经超过初始 0.15s 窗口;若 extend 重置则 b 还在等
+ assert len(rec.batches) == 0
+ await asyncio.sleep(0.20)
+ assert len(rec.batches) == 1
+
+
+@pytest.mark.asyncio
+async def test_fixed_does_not_reset_timer() -> None:
+ cfg = MessageBatcherConfig(enabled=True, window_seconds=0.15, strategy="fixed")
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+
+ await batcher.submit(_make_item(text="a"))
+ await asyncio.sleep(0.05)
+ await batcher.submit(_make_item(text="b"))
+ # fixed 策略下定时器从首条算起,大约 0.15s 后 flush
+ await asyncio.sleep(0.20)
+ assert len(rec.batches) == 1
+ assert len(rec.batches[0]) == 2
+
+
+@pytest.mark.asyncio
+async def test_update_config_runtime() -> None:
+ cfg = MessageBatcherConfig(enabled=True, window_seconds=0.1)
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+
+ new_cfg = MessageBatcherConfig(enabled=False, window_seconds=0.5)
+ batcher.update_config(new_cfg)
+ assert batcher.config.enabled is False
+ assert batcher.is_enabled_for(is_group=True) is False
+
+
+@pytest.mark.asyncio
+async def test_callback_exception_does_not_break_batcher() -> None:
+ cfg = MessageBatcherConfig(enabled=True, window_seconds=0.05)
+
+ calls: list[int] = []
+
+ async def bad_callback(items: list[BufferedMessage]) -> None:
+ calls.append(len(items))
+ raise RuntimeError("boom")
+
+ batcher = MessageBatcher(cfg, bad_callback)
+ await batcher.submit(_make_item(text="a"))
+ await asyncio.sleep(0.2)
+ assert calls == [1, 1]
+ assert batcher.has_buffer("group:1", 100)
+
+ # 应能继续接受新消息
+ await batcher.submit(_make_item(text="b"))
+ await asyncio.sleep(0.2)
+ assert calls == [1, 1, 2]
+ assert batcher.has_buffer("group:1", 100)
+
+
+@pytest.mark.asyncio
+async def test_timer_task_strong_reference_survives_gc() -> None:
+ """timer 触发后创建的 flush task 必须被强引用,避免被 GC 回收。
+
+ asyncio 文档明确警告 ``create_task`` 返回值若不被保留,可能在执行前被 GC。
+ """
+ import gc
+
+ cfg = MessageBatcherConfig(enabled=True, window_seconds=0.05, strategy="extend")
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+
+ await batcher.submit(_make_item(text="x"))
+ # 在 timer 触发后但 callback 未必完成时强制 GC
+ await asyncio.sleep(0.06)
+ gc.collect()
+ await asyncio.wait_for(rec.event.wait(), timeout=1.0)
+ assert len(rec.batches) == 1
+
+
+@pytest.mark.asyncio
+async def test_flush_all_awaits_in_flight_tasks() -> None:
+ """flush_all 应等待 timer 触发但 callback 仍在执行的 task 收尾。"""
+ cfg = MessageBatcherConfig(enabled=True, window_seconds=0.05)
+ finished: list[bool] = []
+ started = asyncio.Event()
+
+ async def slow_callback(items: list[BufferedMessage]) -> None:
+ started.set()
+ await asyncio.sleep(0.15)
+ finished.append(True)
+
+ batcher = MessageBatcher(cfg, slow_callback)
+ await batcher.submit(_make_item(text="x"))
+ # 等 timer 触发并进入 callback
+ await asyncio.wait_for(started.wait(), timeout=1.0)
+ # callback 仍在 sleep 中调 flush_all 应阻塞直到完成
+ await batcher.flush_all()
+ assert finished == [True]
+
+
+@pytest.mark.asyncio
+async def test_max_window_seconds_zero_means_unlimited() -> None:
+ """max_window_seconds=0 表示不限制硬顶,只要 extend 持续刷新就一直等。"""
+ cfg = MessageBatcherConfig(
+ enabled=True,
+ window_seconds=0.05,
+ strategy="extend",
+ max_window_seconds=0.0,
+ )
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+
+ # 连续 6 次提交,每次间隔 30ms(< window_seconds),如果 max_window 仍生效会被强行 flush
+ for i in range(6):
+ await batcher.submit(_make_item(text=f"m{i}"))
+ await asyncio.sleep(0.03)
+ # 此时距首条已 ~180ms(远超旧 max_window 的虚假"硬顶",但 0=不限),仍应在 buffer 中
+ assert rec.batches == []
+ # 停止追加,让 timer 自然到期
+ await asyncio.sleep(0.1)
+ assert len(rec.batches) == 1
+ assert len(rec.batches[0]) == 6
+
+
+# ---------------------------------------------------------------------------
+# 投机预发送(speculative pre-fire)测试
+# ---------------------------------------------------------------------------
+
+
+class _FakeRequestContext:
+ """模拟 RequestContext,仅暴露 get_resource。"""
+
+ def __init__(self) -> None:
+ self._resources: dict[str, object] = {}
+
+ def set_resource(self, key: str, value: object) -> None:
+ self._resources[key] = value
+
+ def get_resource(self, key: str, default: object = None) -> object:
+ return self._resources.get(key, default)
+
+
+@pytest.mark.asyncio
+async def test_speculative_prefire_fires_at_t2_but_batch_continues() -> None:
+ """T2 < T1:T2 到期先发车,items 不弹出;T1 之前再来消息会取消投机。"""
+ cfg = MessageBatcherConfig(
+ enabled=True,
+ window_seconds=0.3,
+ pre_send_seconds=0.1,
+ strategy="extend",
+ )
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+
+ await batcher.submit(_make_item(text="m1"))
+ # 等待 T2 触发(~100ms)但远未到 T1(300ms)
+ await asyncio.sleep(0.18)
+ assert len(rec.batches) == 1, "T2 应已 pre-fire"
+ # 桶仍存在
+ assert batcher.has_buffer("group:1", 100)
+
+
+@pytest.mark.asyncio
+async def test_t1_after_speculative_prefire_does_not_dispatch_twice() -> None:
+ """T2 已经投机发车后,T1 只结束 batch,不能再次调用 callback。"""
+ cfg = MessageBatcherConfig(
+ enabled=True,
+ window_seconds=0.12,
+ pre_send_seconds=0.03,
+ strategy="extend",
+ )
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+
+ await batcher.submit(_make_item(text="m1"))
+ await asyncio.wait_for(rec.event.wait(), timeout=0.5)
+ await asyncio.sleep(0.18)
+
+ assert len(rec.batches) == 1
+ assert not batcher.has_buffer("group:1", 100)
+
+
+@pytest.mark.asyncio
+async def test_speculative_cancelled_when_new_message_and_no_send() -> None:
+ """投机调用尚未发出消息时,新消息到达应取消 inflight 并把它合进新一轮。"""
+ cfg = MessageBatcherConfig(
+ enabled=True,
+ window_seconds=0.3,
+ pre_send_seconds=0.05,
+ strategy="extend",
+ )
+
+ cancelled = asyncio.Event()
+ fake_ctx = _FakeRequestContext() # 默认 message_sent_this_turn=False
+
+ async def slow_flush(items: list[BufferedMessage]) -> None:
+ try:
+ await asyncio.sleep(2.0)
+ except asyncio.CancelledError:
+ cancelled.set()
+ raise
+
+ batcher = MessageBatcher(cfg, slow_flush)
+
+ await batcher.submit(_make_item(text="m1"))
+ # 等待 T2 触发
+ await asyncio.sleep(0.1)
+ # 模拟 coordinator 上报 inflight
+ inflight_task = next(iter(batcher._pending_tasks))
+ batcher.register_inflight("group:1", 100, inflight_task, fake_ctx)
+ # 第二条消息到达,应取消 inflight
+ await batcher.submit(_make_item(text="m2"))
+ await asyncio.wait_for(cancelled.wait(), timeout=1.0)
+
+
+@pytest.mark.asyncio
+async def test_speculative_not_cancelled_when_already_sent_default() -> None:
+ """已经发过消息时默认不取消 inflight,新消息开新 batch。"""
+ cfg = MessageBatcherConfig(
+ enabled=True,
+ window_seconds=0.3,
+ pre_send_seconds=0.05,
+ strategy="extend",
+ allow_cancel_after_send=False,
+ )
+
+ fake_ctx = _FakeRequestContext()
+ fake_ctx.set_resource("message_sent_this_turn", True)
+
+ finished = asyncio.Event()
+
+ async def flush(items: list[BufferedMessage]) -> None:
+ try:
+ await asyncio.sleep(0.1)
+ finally:
+ finished.set()
+
+ batcher = MessageBatcher(cfg, flush)
+
+ await batcher.submit(_make_item(text="m1"))
+ await asyncio.sleep(0.08)
+ inflight_task = next(iter(batcher._pending_tasks))
+ batcher.register_inflight("group:1", 100, inflight_task, fake_ctx)
+ # 新消息到达:投机已发过消息,inflight 不应被 cancel
+ await batcher.submit(_make_item(text="m2"))
+ # 等 inflight 自然完成
+ await asyncio.wait_for(finished.wait(), timeout=1.0)
+ assert not inflight_task.cancelled()
+
+
+@pytest.mark.asyncio
+async def test_speculative_cancelled_when_already_sent_with_allow_flag() -> None:
+ """``allow_cancel_after_send=True`` 时即便已发过消息也强制取消 inflight。
+
+ inflight 协程会捕获 ``CancelledError`` 转记日志(_invoke_callback 默认行为),
+ 所以仅靠 ``Task.cancelled()`` 不足以判断 — 必须看 callback 是否真的收到 cancel 信号。
+ """
+ cfg = MessageBatcherConfig(
+ enabled=True,
+ window_seconds=0.2,
+ pre_send_seconds=0.05,
+ strategy="extend",
+ allow_cancel_after_send=True,
+ )
+
+ fake_ctx = _FakeRequestContext()
+ fake_ctx.set_resource("message_sent_this_turn", True)
+
+ cancelled_event = asyncio.Event()
+
+ async def flush(items: list[BufferedMessage]) -> None:
+ try:
+ await asyncio.sleep(5.0)
+ except asyncio.CancelledError:
+ cancelled_event.set()
+ raise
+
+ batcher = MessageBatcher(cfg, flush)
+
+ await batcher.submit(_make_item(text="m1"))
+ await asyncio.sleep(0.08)
+ assert batcher._pending_tasks
+ inflight_task = next(iter(batcher._pending_tasks))
+ batcher.register_inflight("group:1", 100, inflight_task, fake_ctx)
+
+ await batcher.submit(_make_item(text="m2"))
+ # 正常情况下 cancel 信号会在 50ms 内传到 callback;超时即视为未取消
+ await asyncio.wait_for(cancelled_event.wait(), timeout=1.0)
+
+
+@pytest.mark.asyncio
+async def test_speculative_disabled_when_pre_send_zero() -> None:
+ """pre_send_seconds=0 时投机关闭,仅 T1 静默到期发车。"""
+ cfg = MessageBatcherConfig(
+ enabled=True,
+ window_seconds=0.1,
+ pre_send_seconds=0.0,
+ )
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+ assert not batcher.speculative_enabled
+
+ await batcher.submit(_make_item(text="m1"))
+ await asyncio.sleep(0.05)
+ assert rec.batches == []
+ await asyncio.sleep(0.1)
+ assert len(rec.batches) == 1
+
+
+@pytest.mark.asyncio
+async def test_snapshot_includes_phase() -> None:
+ cfg = MessageBatcherConfig(enabled=True, window_seconds=0.5, pre_send_seconds=0.05)
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+ await batcher.submit(_make_item(text="m1"))
+ snap = batcher.snapshot()
+ assert snap["pending_buckets"] == 1
+ assert snap["buckets"][0]["phase"] in {"typing", "speculating"}
+ assert "speculative_enabled" in snap["config"]
+ assert snap["config"]["speculative_enabled"] is True
+
+
+@pytest.mark.asyncio
+async def test_t1_finalizing_does_not_clobber_new_bucket() -> None:
+ """T1 await inflight 时新消息走 FINALIZING 分支建新桶,finally 不能误删新桶。"""
+ cfg = MessageBatcherConfig(
+ enabled=True,
+ window_seconds=0.1,
+ pre_send_seconds=0.04,
+ strategy="extend",
+ )
+
+ fake_ctx = (
+ _FakeRequestContext()
+ ) # message_sent_this_turn 默认 False,inflight 可被取消
+ # 但本测试要让 T1 fire,inflight 仍未结束 → FINALIZING 分支
+
+ release_inflight = asyncio.Event()
+ inflight_started = asyncio.Event()
+
+ async def flush(items: list[BufferedMessage]) -> None:
+ inflight_started.set()
+ try:
+ await release_inflight.wait()
+ except asyncio.CancelledError:
+ release_inflight.set()
+ raise
+
+ batcher = MessageBatcher(cfg, flush)
+
+ await batcher.submit(_make_item(text="m1"))
+ await asyncio.wait_for(inflight_started.wait(), timeout=0.5)
+ inflight_task = next(iter(batcher._pending_tasks))
+ batcher.register_inflight("group:1", 100, inflight_task, fake_ctx)
+ # 等到 T1 触发,桶切到 FINALIZING 等 inflight
+ await asyncio.sleep(0.12)
+ # 此刻新消息进入 FINALIZING 分支,开新桶
+ await batcher.submit(_make_item(text="m2"))
+ assert batcher.has_buffer("group:1", 100)
+ # 释放 inflight,让 _handle_t1 finally 运行
+ release_inflight.set()
+ await asyncio.sleep(0.05)
+ # 新桶不该被旧 _handle_t1 finally 清掉
+ assert batcher.has_buffer("group:1", 100), "新桶被旧 _handle_t1 finally 误删"
+
+
+@pytest.mark.asyncio
+async def test_speculative_cancelled_before_inflight_registered() -> None:
+ """T2 fire 后 inflight 还没 register 就被新消息抢占:应取消 flush task。"""
+ cfg = MessageBatcherConfig(
+ enabled=True,
+ window_seconds=0.5,
+ pre_send_seconds=0.05,
+ strategy="extend",
+ )
+
+ callback_started = asyncio.Event()
+ callback_cancelled = asyncio.Event()
+
+ async def slow_flush(items: list[BufferedMessage]) -> None:
+ callback_started.set()
+ try:
+ await asyncio.sleep(2.0)
+ except asyncio.CancelledError:
+ callback_cancelled.set()
+ raise
+
+ batcher = MessageBatcher(cfg, slow_flush)
+
+ await batcher.submit(_make_item(text="m1"))
+ # 等 T2 触发并 callback 启动,但**不**调 register_inflight
+ await asyncio.wait_for(callback_started.wait(), timeout=0.5)
+ # 新消息:inflight 是 None,应走"cancel flush task"分支
+ await batcher.submit(_make_item(text="m2"))
+ await asyncio.wait_for(callback_cancelled.wait(), timeout=1.0)
+
+
+@pytest.mark.asyncio
+async def test_speculative_callback_failure_rolls_back_for_t1_retry() -> None:
+ """T2 callback 失败不能丢消息;应回到 TYPING,等 T1 再发一次。"""
+ cfg = MessageBatcherConfig(
+ enabled=True,
+ window_seconds=0.12,
+ pre_send_seconds=0.03,
+ strategy="extend",
+ )
+
+ calls = 0
+ recovered = asyncio.Event()
+
+ async def flaky_flush(items: list[BufferedMessage]) -> None:
+ nonlocal calls
+ calls += 1
+ if calls == 1:
+ raise RuntimeError("temporary enqueue failure")
+ recovered.set()
+
+ batcher = MessageBatcher(cfg, flaky_flush)
+
+ await batcher.submit(_make_item(text="m1"))
+ await asyncio.wait_for(recovered.wait(), timeout=0.5)
+
+ assert calls == 2
+ assert not batcher.has_buffer("group:1", 100)
+
+
+@pytest.mark.asyncio
+async def test_speculative_queued_token_cancelled_before_inflight_registered() -> None:
+ """T2 callback 已完成入队但 inflight 未注册时,新消息应取消旧 token。"""
+ cfg = MessageBatcherConfig(
+ enabled=True,
+ window_seconds=0.5,
+ pre_send_seconds=0.05,
+ strategy="extend",
+ )
+
+ callbacks = 0
+ first_callback_done = asyncio.Event()
+ second_callback_done = asyncio.Event()
+ seen_tokens: list[BatchDispatchToken | None] = []
+
+ async def enqueue_only(items: list[BufferedMessage]) -> None:
+ nonlocal callbacks
+ callbacks += 1
+ seen_tokens.append(items[0].batch_token)
+ if callbacks == 1:
+ first_callback_done.set()
+ elif callbacks == 2:
+ second_callback_done.set()
+
+ batcher = MessageBatcher(cfg, enqueue_only)
+
+ await batcher.submit(_make_item(text="m1"))
+ await asyncio.wait_for(first_callback_done.wait(), timeout=0.5)
+ old_token = seen_tokens[0]
+ assert old_token is not None
+ assert old_token.speculative is True
+ assert old_token.cancelled is False
+
+ await batcher.submit(_make_item(text="m2"))
+ assert old_token.cancelled is True
+
+ await asyncio.wait_for(second_callback_done.wait(), timeout=0.5)
+ new_token = seen_tokens[1]
+ assert new_token is not None
+ assert new_token is not old_token
+ assert new_token.cancelled is False
+
+
+@pytest.mark.asyncio
+async def test_stale_unregister_does_not_clear_new_inflight() -> None:
+ """旧 inflight 的 finally 不能把新一轮已注册的 inflight 清掉。"""
+ cfg = MessageBatcherConfig(
+ enabled=True,
+ window_seconds=0.4,
+ pre_send_seconds=0.05,
+ strategy="extend",
+ )
+
+ async def enqueue_only(items: list[BufferedMessage]) -> None:
+ return None
+
+ batcher = MessageBatcher(cfg, enqueue_only)
+ fake_ctx = _FakeRequestContext()
+ old_task = asyncio.create_task(asyncio.sleep(10.0))
+ new_task = asyncio.create_task(asyncio.sleep(10.0))
+
+ try:
+ await batcher.submit(_make_item(text="m1"))
+ await asyncio.sleep(0.08)
+ batcher.register_inflight("group:1", 100, old_task, fake_ctx)
+
+ await batcher.submit(_make_item(text="m2"))
+ await asyncio.sleep(0.08)
+ batcher.register_inflight("group:1", 100, new_task, fake_ctx)
+
+ batcher.unregister_inflight("group:1", 100, old_task)
+ snap = batcher.snapshot()
+ assert snap["buckets"][0]["has_inflight"] is True
+ finally:
+ old_task.cancel()
+ new_task.cancel()
+ await asyncio.gather(old_task, new_task, return_exceptions=True)
+
+
+@pytest.mark.asyncio
+async def test_flush_all_loops_until_concurrent_bucket_is_flushed() -> None:
+ """flush_all 快照后若 callback 又创建新桶,也应继续清空。"""
+ cfg = MessageBatcherConfig(enabled=True, window_seconds=10.0)
+ batches: list[list[str]] = []
+ injected = False
+ batcher: MessageBatcher
+
+ async def callback(items: list[BufferedMessage]) -> None:
+ nonlocal injected
+ batches.append([item.text for item in items])
+ if not injected:
+ injected = True
+ await batcher.submit(_make_item(sender_id=101, text="late"))
+
+ batcher = MessageBatcher(cfg, callback)
+
+ await batcher.submit(_make_item(sender_id=100, text="first"))
+ await batcher.flush_all()
+
+ assert batches == [["first"], ["late"]]
+ assert batcher.snapshot()["pending_buckets"] == 0
+
+
+@pytest.mark.asyncio
+async def test_submit_after_flush_all_dispatches_immediately() -> None:
+ """进入关停模式后新消息不再建桶,避免 flush_all 与 submit 互相追逐。"""
+ cfg = MessageBatcherConfig(enabled=True, window_seconds=10.0)
+ rec = _Recorder()
+ batcher = MessageBatcher(cfg, rec)
+
+ await batcher.submit(_make_item(text="before-shutdown"))
+ await batcher.flush_all()
+ await batcher.submit(_make_item(text="after-shutdown"))
+
+ assert [[item.text for item in batch] for batch in rec.batches] == [
+ ["before-shutdown"],
+ ["after-shutdown"],
+ ]
+ snap = batcher.snapshot()
+ assert snap["pending_buckets"] == 0
+ assert snap["config"]["shutdown"] is True
+
+
+@pytest.mark.asyncio
+async def test_regular_callback_failure_restores_for_retry() -> None:
+ cfg = MessageBatcherConfig(enabled=True, window_seconds=0.03, strategy="extend")
+ calls = 0
+ recovered = asyncio.Event()
+
+ async def flaky_flush(items: list[BufferedMessage]) -> None:
+ nonlocal calls
+ calls += 1
+ if calls == 1:
+ raise RuntimeError("temporary enqueue failure")
+ assert [item.text for item in items] == ["m1"]
+ recovered.set()
+
+ batcher = MessageBatcher(cfg, flaky_flush)
+
+ await batcher.submit(_make_item(text="m1"))
+ await asyncio.wait_for(recovered.wait(), timeout=0.5)
+
+ assert calls == 2
+ assert not batcher.has_buffer("group:1", 100)
+
+
+@pytest.mark.asyncio
+async def test_immediate_callback_failure_restores_for_retry() -> None:
+ cfg = MessageBatcherConfig(
+ enabled=True,
+ window_seconds=0.03,
+ max_messages_per_batch=2,
+ )
+ calls = 0
+ recovered = asyncio.Event()
+
+ async def flaky_flush(items: list[BufferedMessage]) -> None:
+ nonlocal calls
+ calls += 1
+ if calls == 1:
+ raise RuntimeError("temporary enqueue failure")
+ assert [item.text for item in items] == ["x", "y"]
+ recovered.set()
+
+ batcher = MessageBatcher(cfg, flaky_flush)
+
+ await batcher.submit(_make_item(text="x"))
+ await batcher.submit(_make_item(text="y"))
+ await asyncio.wait_for(recovered.wait(), timeout=0.5)
+
+ assert calls == 2
+ assert not batcher.has_buffer("group:1", 100)
+
+
+@pytest.mark.asyncio
+async def test_flush_all_callback_failure_raises_and_keeps_buffer() -> None:
+ cfg = MessageBatcherConfig(enabled=True, window_seconds=10.0)
+
+ async def failing_flush(items: list[BufferedMessage]) -> None:
+ raise RuntimeError("temporary enqueue failure")
+
+ batcher = MessageBatcher(cfg, failing_flush)
+
+ await batcher.submit(_make_item(text="m1"))
+
+ with pytest.raises(RuntimeError, match="message batcher flush callback failed"):
+ await batcher.flush_all()
+
+ assert batcher.has_buffer("group:1", 100)
diff --git a/tests/test_message_batcher_integration.py b/tests/test_message_batcher_integration.py
new file mode 100644
index 00000000..d4469e6c
--- /dev/null
+++ b/tests/test_message_batcher_integration.py
@@ -0,0 +1,356 @@
+"""MessageBatcher + AICoordinator 集成行为测试。
+
+不走 handlers,直接验证:
+- 同 sender 短时连续消息合并到同一队列请求;
+- 队列优先级:首条 @bot 整批走 mention;buffer 已存在时新条 @bot 单独立即处理;
+- 拍一拍永远旁路;
+- 私聊连续合并到 add_private_request。
+"""
+
+from __future__ import annotations
+
+import asyncio
+from types import SimpleNamespace
+from typing import Any, cast
+from unittest.mock import AsyncMock
+
+import pytest
+
+from Undefined.config.models import MessageBatcherConfig
+from Undefined.handlers import MessageHandler
+from Undefined.services.ai_coordinator import AICoordinator
+from Undefined.services.message_batcher import BatchDispatchToken, MessageBatcher
+
+
+def _make_coordinator(
+ *,
+ superadmin_qq: int = 99999,
+ enabled: bool = True,
+ window_seconds: float = 0.1,
+ group_enabled: bool = True,
+ private_enabled: bool = True,
+) -> tuple[Any, SimpleNamespace, MessageBatcher]:
+ coordinator: Any = object.__new__(AICoordinator)
+ queue_manager = SimpleNamespace(
+ add_group_superadmin_request=AsyncMock(),
+ add_group_mention_request=AsyncMock(),
+ add_group_normal_request=AsyncMock(),
+ add_superadmin_request=AsyncMock(),
+ add_private_request=AsyncMock(),
+ )
+ coordinator.config = SimpleNamespace(
+ superadmin_qq=superadmin_qq,
+ chat_model=SimpleNamespace(model_name="chat-model"),
+ )
+ coordinator.security = SimpleNamespace(
+ detect_injection=AsyncMock(return_value=False)
+ )
+ coordinator.history_manager = SimpleNamespace(
+ modify_last_group_message=AsyncMock(),
+ modify_last_private_message=AsyncMock(),
+ )
+ coordinator.queue_manager = queue_manager
+ coordinator._is_at_bot = lambda _content: False
+ coordinator.model_pool = SimpleNamespace(
+ select_chat_config=lambda chat_model, user_id: chat_model
+ )
+
+ cfg = MessageBatcherConfig(
+ enabled=enabled,
+ window_seconds=window_seconds,
+ group_enabled=group_enabled,
+ private_enabled=private_enabled,
+ )
+ batcher = MessageBatcher(cfg, coordinator.handle_batched_dispatch)
+ coordinator._batcher = batcher
+ return coordinator, queue_manager, batcher
+
+
+@pytest.mark.asyncio
+async def test_two_group_messages_merge_into_single_request() -> None:
+ coordinator, qm, _ = _make_coordinator(window_seconds=0.05)
+
+ await coordinator.handle_auto_reply(
+ group_id=12345,
+ sender_id=20001,
+ text="帮我画一只猫",
+ message_content=[],
+ sender_name="user",
+ group_name="测试群",
+ trigger_message_id=1,
+ )
+ await coordinator.handle_auto_reply(
+ group_id=12345,
+ sender_id=20001,
+ text="改成狗",
+ message_content=[],
+ sender_name="user",
+ group_name="测试群",
+ trigger_message_id=2,
+ )
+
+ # 等窗口过期 + 调度
+ await asyncio.sleep(0.25)
+
+ cast(AsyncMock, qm.add_group_normal_request).assert_awaited_once()
+ cast(AsyncMock, qm.add_group_mention_request).assert_not_called()
+ await_args = cast(AsyncMock, qm.add_group_normal_request).await_args
+ assert await_args is not None
+ request_data = await_args.args[0]
+ assert request_data["batched_count"] == 2
+ assert request_data["text"] == "改成狗" # last 文本
+ assert "帮我画一只猫" in request_data["full_question"]
+ assert "改成狗" in request_data["full_question"]
+ assert "【连续消息说明】" in request_data["full_question"]
+ assert "共同构成【当前输入批次】" in request_data["full_question"]
+ assert "不要把同批前几条误判为历史旧任务" in request_data["full_question"]
+
+
+@pytest.mark.asyncio
+async def test_first_at_bot_routes_batch_to_mention_lane() -> None:
+ coordinator, qm, _ = _make_coordinator(window_seconds=0.05)
+ coordinator._is_at_bot = lambda content: (
+ bool(content) and any(seg.get("type") == "at" for seg in content)
+ )
+
+ at_payload = [{"type": "at", "data": {"qq": "self"}}]
+ await coordinator.handle_auto_reply(
+ group_id=1,
+ sender_id=2,
+ text="@bot 帮我画猫",
+ message_content=at_payload,
+ sender_name="u",
+ group_name="g",
+ )
+ await coordinator.handle_auto_reply(
+ group_id=1,
+ sender_id=2,
+ text="改成狗",
+ message_content=[],
+ sender_name="u",
+ group_name="g",
+ )
+ await asyncio.sleep(0.2)
+
+ cast(AsyncMock, qm.add_group_mention_request).assert_awaited_once()
+ cast(AsyncMock, qm.add_group_normal_request).assert_not_called()
+ await_args = cast(AsyncMock, qm.add_group_mention_request).await_args
+ assert await_args is not None
+ req = await_args.args[0]
+ assert req["batched_count"] == 2
+ assert req["is_at_bot"] is True
+ assert "(用户 @ 了你)" in req["full_question"]
+
+
+@pytest.mark.asyncio
+async def test_at_bot_arriving_with_buffer_bypasses_immediately() -> None:
+ coordinator, qm, _ = _make_coordinator(window_seconds=2.0)
+ is_at_calls: list[list[dict[str, Any]]] = []
+
+ def _is_at(content: list[dict[str, Any]]) -> bool:
+ is_at_calls.append(content)
+ return bool(content) and any(seg.get("type") == "at" for seg in content)
+
+ coordinator._is_at_bot = _is_at
+
+ # 1) 普通消息进 buffer
+ await coordinator.handle_auto_reply(
+ group_id=1,
+ sender_id=2,
+ text="hi",
+ message_content=[],
+ sender_name="u",
+ group_name="g",
+ )
+ # 2) 立即来一条 @bot —— 应当旁路单独立即处理
+ await coordinator.handle_auto_reply(
+ group_id=1,
+ sender_id=2,
+ text="@bot 急",
+ message_content=[{"type": "at", "data": {"qq": "self"}}],
+ sender_name="u",
+ group_name="g",
+ )
+
+ # @bot 已立即发车
+ cast(AsyncMock, qm.add_group_mention_request).assert_awaited_once()
+ mention_await = cast(AsyncMock, qm.add_group_mention_request).await_args
+ assert mention_await is not None
+ mention_req = mention_await.args[0]
+ assert mention_req["batched_count"] == 1
+
+ # 普通桶仍未发车
+ cast(AsyncMock, qm.add_group_normal_request).assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_poke_always_bypasses_batcher() -> None:
+ coordinator, qm, _ = _make_coordinator(window_seconds=2.0)
+
+ await coordinator.handle_auto_reply(
+ group_id=1,
+ sender_id=2,
+ text="(拍一拍)",
+ message_content=[],
+ sender_name="u",
+ group_name="g",
+ is_poke=True,
+ )
+
+ # 拍一拍立即发车
+ cast(AsyncMock, qm.add_group_mention_request).assert_awaited_once()
+
+
+@pytest.mark.asyncio
+async def test_private_consecutive_merge() -> None:
+ coordinator, qm, _ = _make_coordinator(window_seconds=0.05)
+
+ await coordinator.handle_private_reply(
+ user_id=20001,
+ text="第一条",
+ message_content=[],
+ sender_name="u",
+ trigger_message_id=10,
+ )
+ await coordinator.handle_private_reply(
+ user_id=20001,
+ text="第二条",
+ message_content=[],
+ sender_name="u",
+ trigger_message_id=11,
+ )
+ await asyncio.sleep(0.25)
+
+ cast(AsyncMock, qm.add_private_request).assert_awaited_once()
+ await_args = cast(AsyncMock, qm.add_private_request).await_args
+ assert await_args is not None
+ req = await_args.args[0]
+ assert req["batched_count"] == 2
+ assert "第一条" in req["full_question"]
+ assert "第二条" in req["full_question"]
+
+
+@pytest.mark.asyncio
+async def test_disabled_batcher_passes_through_immediately() -> None:
+ coordinator, qm, _ = _make_coordinator(enabled=False)
+
+ await coordinator.handle_auto_reply(
+ group_id=1,
+ sender_id=2,
+ text="hi",
+ message_content=[],
+ sender_name="u",
+ group_name="g",
+ )
+
+ cast(AsyncMock, qm.add_group_normal_request).assert_awaited_once()
+
+
+@pytest.mark.asyncio
+async def test_superadmin_batched_routes_to_superadmin_lane() -> None:
+ coordinator, qm, _ = _make_coordinator(superadmin_qq=10001, window_seconds=0.05)
+
+ await coordinator.handle_auto_reply(
+ group_id=1,
+ sender_id=10001,
+ text="hello",
+ message_content=[],
+ sender_name="admin",
+ group_name="g",
+ )
+ await coordinator.handle_auto_reply(
+ group_id=1,
+ sender_id=10001,
+ text="world",
+ message_content=[],
+ sender_name="admin",
+ group_name="g",
+ )
+ await asyncio.sleep(0.25)
+
+ cast(AsyncMock, qm.add_group_superadmin_request).assert_awaited_once()
+ await_args = cast(AsyncMock, qm.add_group_superadmin_request).await_args
+ assert await_args is not None
+ req = await_args.args[0]
+ assert req["batched_count"] == 2
+
+
+@pytest.mark.asyncio
+async def test_execute_reply_skips_cancelled_batch_token() -> None:
+ coordinator: Any = object.__new__(AICoordinator)
+ execute_auto = AsyncMock()
+ coordinator._execute_auto_reply = execute_auto
+ token = BatchDispatchToken(
+ scope="group:1",
+ sender_id=2,
+ batch_id=1,
+ speculative=True,
+ cancelled=True,
+ )
+
+ await coordinator.execute_reply(
+ {"type": "auto_reply", "_message_batcher_token": token}
+ )
+
+ execute_auto.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_message_handler_close_flushes_batcher_then_drains_queue() -> None:
+ handler: Any = object.__new__(MessageHandler)
+ order: list[str] = []
+ handler._background_tasks = set()
+ handler.message_batcher = SimpleNamespace(
+ flush_all=AsyncMock(side_effect=lambda: order.append("flush_batcher"))
+ )
+ queue_manager = SimpleNamespace(
+ drain=AsyncMock(side_effect=lambda: order.append("drain_queue")),
+ stop=AsyncMock(side_effect=lambda: order.append("stop_queue")),
+ )
+ handler.ai_coordinator = SimpleNamespace(queue_manager=queue_manager)
+ handler.history_manager = SimpleNamespace(
+ flush_pending_saves=AsyncMock(side_effect=lambda: order.append("flush_history"))
+ )
+ handler.pipeline_registry = SimpleNamespace(
+ stop_hot_reload=AsyncMock(side_effect=lambda: order.append("stop_pipeline"))
+ )
+
+ await handler.close()
+
+ assert order == [
+ "stop_pipeline",
+ "flush_batcher",
+ "drain_queue",
+ "stop_queue",
+ "flush_history",
+ ]
+
+
+@pytest.mark.asyncio
+async def test_message_handler_flush_command_buffer_respects_disabled_config() -> None:
+ handler: Any = object.__new__(MessageHandler)
+ handler.config = SimpleNamespace(
+ message_batcher=MessageBatcherConfig(flush_on_command=False)
+ )
+ handler.message_batcher = SimpleNamespace(flush_sender=AsyncMock(return_value=True))
+
+ await handler._flush_command_buffer(scope="group:1", sender_id=2)
+
+ cast(AsyncMock, handler.message_batcher.flush_sender).assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_message_handler_flush_command_buffer_calls_batcher_when_enabled() -> (
+ None
+):
+ handler: Any = object.__new__(MessageHandler)
+ handler.config = SimpleNamespace(
+ message_batcher=MessageBatcherConfig(flush_on_command=True)
+ )
+ handler.message_batcher = SimpleNamespace(flush_sender=AsyncMock(return_value=True))
+
+ await handler._flush_command_buffer(scope="group:1", sender_id=2)
+
+ cast(AsyncMock, handler.message_batcher.flush_sender).assert_awaited_once_with(
+ "group:1", 2
+ )
diff --git a/tests/test_auto_pipeline_registry.py b/tests/test_pipelines_registry.py
similarity index 79%
rename from tests/test_auto_pipeline_registry.py
rename to tests/test_pipelines_registry.py
index 13ee1dd6..7ab68c5b 100644
--- a/tests/test_auto_pipeline_registry.py
+++ b/tests/test_pipelines_registry.py
@@ -6,7 +6,7 @@
import pytest
-from Undefined.skills.auto_pipeline import AutoPipelineRegistry
+from Undefined.skills.pipelines import PipelineRegistry
def _write_pipeline(base_dir: Path) -> None:
@@ -27,12 +27,12 @@ def _write_pipeline(base_dir: Path) -> None:
"""
from __future__ import annotations
-from Undefined.skills.auto_pipeline.models import AutoPipelineDetection
+from Undefined.skills.pipelines.models import PipelineDetection
async def detect(context):
context["events"].append("detect")
- return AutoPipelineDetection(name="example", items=("item",))
+ return PipelineDetection(name="example", items=("item",))
async def process(detection, context):
@@ -43,11 +43,11 @@ async def process(detection, context):
@pytest.mark.asyncio
-async def test_auto_pipeline_registry_loads_and_runs_configured_pipeline(
+async def test_pipelines_registry_loads_and_runs_configured_pipeline(
tmp_path: Path,
) -> None:
_write_pipeline(tmp_path)
- registry = AutoPipelineRegistry(tmp_path)
+ registry = PipelineRegistry(tmp_path)
registry.load_items()
context: dict[str, Any] = {"events": []}
@@ -58,11 +58,12 @@ async def test_auto_pipeline_registry_loads_and_runs_configured_pipeline(
@pytest.mark.asyncio
-async def test_auto_pipeline_registry_initial_async_load_uses_thread(
+async def test_pipelines_registry_initial_async_load_uses_thread(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
- registry = AutoPipelineRegistry(tmp_path)
+ _write_pipeline(tmp_path)
+ registry = PipelineRegistry(tmp_path)
calls: list[Any] = []
async def _fake_to_thread(func: Any, *args: Any, **kwargs: Any) -> Any:
@@ -79,11 +80,11 @@ async def _fake_to_thread(func: Any, *args: Any, **kwargs: Any) -> Any:
@pytest.mark.asyncio
-async def test_auto_pipeline_reload_loads_items_in_thread(
+async def test_pipelines_reload_loads_items_in_thread(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
- registry = AutoPipelineRegistry(tmp_path)
+ registry = PipelineRegistry(tmp_path)
calls: list[Any] = []
async def _fake_to_thread(func: Any, *args: Any, **kwargs: Any) -> Any:
diff --git a/tests/test_queue_manager.py b/tests/test_queue_manager.py
index 62baac67..c5b398c5 100644
--- a/tests/test_queue_manager.py
+++ b/tests/test_queue_manager.py
@@ -146,6 +146,46 @@ async def _handler(request: dict[str, Any]) -> None:
await queue_manager.stop()
+@pytest.mark.asyncio
+async def test_drain_waits_for_pending_and_inflight_requests() -> None:
+ queue_manager = QueueManager(ai_request_interval=0.0)
+ first_started = asyncio.Event()
+ release_first = asyncio.Event()
+ handled: list[str] = []
+
+ async def _handler(request: dict[str, Any]) -> None:
+ request_id = str(request["request_id"])
+ handled.append(request_id)
+ if request_id == "first":
+ first_started.set()
+ await release_first.wait()
+
+ queue_manager.start(_handler)
+ await queue_manager.add_private_request(
+ {"type": "private_reply", "request_id": "first"},
+ model_name="chat-model",
+ )
+ await queue_manager.add_private_request(
+ {"type": "private_reply", "request_id": "second"},
+ model_name="chat-model",
+ )
+
+ drain_task = asyncio.create_task(queue_manager.drain())
+ try:
+ await asyncio.wait_for(first_started.wait(), timeout=1.0)
+ assert not drain_task.done()
+ release_first.set()
+ await asyncio.wait_for(drain_task, timeout=1.0)
+ finally:
+ release_first.set()
+ if not drain_task.done():
+ drain_task.cancel()
+ await asyncio.gather(drain_task, return_exceptions=True)
+ await queue_manager.stop()
+
+ assert handled == ["first", "second"]
+
+
@pytest.mark.asyncio
async def test_non_llm_request_failure_is_not_retried_and_snapshot_counts_retry() -> (
None
diff --git a/tests/test_release_notes_script.py b/tests/test_release_notes_script.py
new file mode 100644
index 00000000..29001d1f
--- /dev/null
+++ b/tests/test_release_notes_script.py
@@ -0,0 +1,163 @@
+from __future__ import annotations
+
+import importlib.util
+from pathlib import Path
+import sys
+from types import ModuleType
+from typing import Any, cast
+
+import pytest
+
+
+_SCRIPT_PATH = Path(__file__).resolve().parent.parent / "scripts" / "release_notes.py"
+
+
+def _load_script() -> ModuleType:
+ spec = importlib.util.spec_from_file_location("release_notes_script", _SCRIPT_PATH)
+ if spec is None or spec.loader is None:
+ raise RuntimeError("Could not load release_notes.py")
+ module = importlib.util.module_from_spec(spec)
+ sys.modules[spec.name] = module
+ spec.loader.exec_module(module)
+ return module
+
+
+release_notes = _load_script()
+
+
+def _write_release_project(
+ root: Path,
+ *,
+ build_version: str = "1.2.3",
+ changelog_version: str = "v1.2.3",
+) -> None:
+ (root / "src" / "Undefined").mkdir(parents=True)
+ (root / "apps" / "undefined-console" / "src-tauri").mkdir(parents=True)
+ (root / "pyproject.toml").write_text(
+ f'[project]\nname = "Undefined-bot"\nversion = "{build_version}"\n',
+ encoding="utf-8",
+ )
+ (root / "src" / "Undefined" / "__init__.py").write_text(
+ f'__version__ = "{build_version}"\n',
+ encoding="utf-8",
+ )
+ (root / "apps" / "undefined-console" / "package.json").write_text(
+ f'{{"version":"{build_version}"}}\n',
+ encoding="utf-8",
+ )
+ (root / "apps" / "undefined-console" / "package-lock.json").write_text(
+ f'{{"version":"{build_version}","packages":{{"":{{"version":"{build_version}"}}}}}}\n',
+ encoding="utf-8",
+ )
+ (root / "apps" / "undefined-console" / "src-tauri" / "Cargo.toml").write_text(
+ f'[package]\nname = "undefined-console"\nversion = "{build_version}"\n',
+ encoding="utf-8",
+ )
+ (root / "apps" / "undefined-console" / "src-tauri" / "tauri.conf.json").write_text(
+ f'{{"version":"{build_version}"}}\n',
+ encoding="utf-8",
+ )
+ (root / "CHANGELOG.md").write_text(
+ f"""
+## {changelog_version} 测试版本
+
+这是一段发布说明。
+
+- 变更一
+- 变更二
+""".strip()
+ + "\n",
+ encoding="utf-8",
+ )
+
+
+def test_validate_release_versions_accepts_matching_project(tmp_path: Path) -> None:
+ _write_release_project(tmp_path)
+
+ result = release_notes.validate_release_versions(
+ tag_name="v1.2.3", project_root=tmp_path
+ )
+
+ assert result.version == "1.2.3"
+ assert result.changelog_version == "v1.2.3"
+ assert result.tag_version == "1.2.3"
+ assert {source.name for source in result.sources} >= {
+ "pyproject.toml",
+ "src/Undefined/__init__.py",
+ "apps/undefined-console/package.json",
+ "apps/undefined-console/src-tauri/Cargo.toml",
+ }
+
+
+def test_validate_release_versions_rejects_changelog_mismatch(tmp_path: Path) -> None:
+ _write_release_project(tmp_path, build_version="1.2.3", changelog_version="v1.2.4")
+
+ with pytest.raises(
+ release_notes.ReleaseValidationError, match="CHANGELOG.md latest"
+ ):
+ release_notes.validate_release_versions(
+ tag_name="v1.2.3", project_root=tmp_path
+ )
+
+
+def test_validate_release_versions_rejects_app_manifest_mismatch(
+ tmp_path: Path,
+) -> None:
+ _write_release_project(tmp_path)
+ (tmp_path / "apps" / "undefined-console" / "package.json").write_text(
+ '{"version":"1.2.4"}\n',
+ encoding="utf-8",
+ )
+
+ with pytest.raises(
+ release_notes.ReleaseValidationError, match="package.json=1.2.4"
+ ):
+ release_notes.validate_release_versions(
+ tag_name="v1.2.3", project_root=tmp_path
+ )
+
+
+def test_write_release_notes_uses_latest_changelog_entry(tmp_path: Path) -> None:
+ _write_release_project(tmp_path)
+ output = tmp_path / "release_notes.md"
+
+ entry = release_notes.write_release_notes(
+ output_path=output,
+ tag_name="v1.2.3",
+ project_root=tmp_path,
+ )
+
+ assert entry.version == "v1.2.3"
+ assert output.read_text(encoding="utf-8") == (
+ "## v1.2.3 测试版本\n"
+ "\n"
+ "这是一段发布说明。\n"
+ "\n"
+ "### 变更内容\n"
+ "\n"
+ "- 变更一\n"
+ "- 变更二\n"
+ )
+
+
+def test_cli_notes_writes_output_file(tmp_path: Path) -> None:
+ _write_release_project(tmp_path)
+ output = tmp_path / "notes.md"
+
+ exit_code = cast(
+ Any,
+ release_notes.main,
+ )(
+ [
+ "--project-root",
+ str(tmp_path),
+ "notes",
+ "--tag",
+ "v1.2.3",
+ "--output",
+ str(output),
+ ]
+ )
+
+ assert exit_code == 0
+ assert output.read_text(encoding="utf-8").startswith("## v1.2.3 测试版本")
diff --git a/tests/test_render_cache.py b/tests/test_render_cache.py
new file mode 100644
index 00000000..7ec6b8a0
--- /dev/null
+++ b/tests/test_render_cache.py
@@ -0,0 +1,217 @@
+from __future__ import annotations
+
+import asyncio
+import json
+import time
+from pathlib import Path
+
+import pytest
+
+from Undefined.utils.render_cache import HtmlRenderCache, compute_render_cache_key
+
+
+@pytest.mark.asyncio
+async def test_render_cache_uses_owned_image_copy(tmp_path: Path) -> None:
+ cache = await HtmlRenderCache.create(
+ tmp_path / "index.json", max_entries=10, max_size_mb=1
+ )
+ output_path = tmp_path / "render.png"
+
+ output_path.write_bytes(b"image-a")
+ await cache.put("key-a", output_path, output_path.stat().st_size)
+ cached_a = await cache.get("key-a")
+
+ assert cached_a is not None
+ assert cached_a != output_path
+ assert cached_a.parent == tmp_path / "html"
+ assert cached_a.read_bytes() == b"image-a"
+
+ output_path.write_bytes(b"image-b")
+ await cache.put("key-b", output_path, output_path.stat().st_size)
+ cached_a_again = await cache.get("key-a")
+ cached_b = await cache.get("key-b")
+
+ assert cached_a_again is not None
+ assert cached_b is not None
+ assert cached_a_again.read_bytes() == b"image-a"
+ assert cached_b.read_bytes() == b"image-b"
+
+
+@pytest.mark.asyncio
+async def test_render_cache_ignores_legacy_external_paths(tmp_path: Path) -> None:
+ external = tmp_path / "external.png"
+ external.write_bytes(b"legacy")
+ cache_file = tmp_path / "index.json"
+ now = time.time()
+ cache_file.write_text(
+ json.dumps(
+ {
+ "legacy": {
+ "path": str(external),
+ "size_bytes": external.stat().st_size,
+ "created_at": now,
+ "last_accessed_at": now,
+ }
+ }
+ ),
+ "utf-8",
+ )
+
+ cache = await HtmlRenderCache.create(cache_file, max_entries=10, max_size_mb=1)
+
+ assert await cache.get("legacy") is None
+
+
+@pytest.mark.asyncio
+async def test_render_cache_evicts_least_recently_used_when_entries_exceed_limit(
+ tmp_path: Path,
+) -> None:
+ """超出条目数上限时,按 last_accessed_at 淘汰最久未用项。"""
+ cache = await HtmlRenderCache.create(
+ tmp_path / "index.json",
+ max_entries=2,
+ max_size_mb=10,
+ flush_interval_seconds=0.0,
+ )
+ output_path = tmp_path / "render.png"
+
+ output_path.write_bytes(b"image-a")
+ await cache.put("a", output_path, output_path.stat().st_size)
+ await asyncio.sleep(0.01)
+ output_path.write_bytes(b"image-b")
+ await cache.put("b", output_path, output_path.stat().st_size)
+ # 命中 a,刷新它的 last_accessed_at;之后插入 c 时应淘汰 b
+ await asyncio.sleep(0.01)
+ assert await cache.get("a") is not None
+ await asyncio.sleep(0.01)
+ output_path.write_bytes(b"image-c")
+ await cache.put("c", output_path, output_path.stat().st_size)
+
+ assert await cache.get("a") is not None
+ assert await cache.get("c") is not None
+ assert await cache.get("b") is None
+
+
+@pytest.mark.asyncio
+async def test_render_cache_evicts_when_total_size_exceeds_budget(
+ tmp_path: Path,
+) -> None:
+ """超出总字节上限时按 LRU 淘汰直到回到预算内。"""
+ # max_size_mb=1,但单图允许 600KB;放两张就会超
+ cache = await HtmlRenderCache.create(
+ tmp_path / "index.json",
+ max_entries=10,
+ max_size_mb=1,
+ flush_interval_seconds=0.0,
+ )
+ big_blob = b"x" * (600 * 1024)
+ output_path = tmp_path / "render.png"
+
+ output_path.write_bytes(big_blob)
+ await cache.put("a", output_path, len(big_blob))
+ await asyncio.sleep(0.01)
+ output_path.write_bytes(big_blob)
+ await cache.put("b", output_path, len(big_blob))
+
+ # a 被字节预算淘汰;b 仍在
+ assert await cache.get("a") is None
+ assert await cache.get("b") is not None
+
+
+@pytest.mark.asyncio
+async def test_render_cache_disabled_short_circuits_without_touching_disk(
+ tmp_path: Path,
+) -> None:
+ cache_file = tmp_path / "index.json"
+ cache = await HtmlRenderCache.create(
+ cache_file,
+ max_entries=10,
+ max_size_mb=1,
+ enabled=False,
+ )
+ output_path = tmp_path / "render.png"
+ output_path.write_bytes(b"image")
+
+ await cache.put("k", output_path, output_path.stat().st_size)
+ assert await cache.get("k") is None
+ # 禁用时不应触发元数据落盘
+ assert not cache_file.exists()
+
+
+@pytest.mark.asyncio
+async def test_render_cache_persists_metadata_across_reload(tmp_path: Path) -> None:
+ cache_file = tmp_path / "index.json"
+ cache = await HtmlRenderCache.create(
+ cache_file, max_entries=5, max_size_mb=2, flush_interval_seconds=0.0
+ )
+ output_path = tmp_path / "render.png"
+ output_path.write_bytes(b"persisted")
+
+ await cache.put("persisted", output_path, output_path.stat().st_size)
+ await cache.close()
+
+ # 模拟进程重启:构造新实例从同一文件加载
+ reloaded = await HtmlRenderCache.create(
+ cache_file, max_entries=5, max_size_mb=2, flush_interval_seconds=0.0
+ )
+ cached = await reloaded.get("persisted")
+
+ assert cached is not None
+ assert cached.read_bytes() == b"persisted"
+
+
+@pytest.mark.asyncio
+async def test_render_cache_close_force_flushes_pending_metadata(
+ tmp_path: Path,
+) -> None:
+ """节流期内的 dirty 状态在 close 时应强制落盘。"""
+ cache_file = tmp_path / "index.json"
+ cache = await HtmlRenderCache.create(
+ cache_file, max_entries=5, max_size_mb=2, flush_interval_seconds=999.0
+ )
+ output_path = tmp_path / "render.png"
+ output_path.write_bytes(b"flush-me")
+
+ await cache.put("flush-me", output_path, output_path.stat().st_size)
+ # 未到 flush_interval;元数据仅保留在内存中。close 必须强刷。
+ await cache.close()
+
+ raw = json.loads(cache_file.read_text(encoding="utf-8"))
+ assert "flush-me" in raw
+
+
+@pytest.mark.asyncio
+async def test_render_cache_concurrent_put_keeps_metadata_consistent(
+ tmp_path: Path,
+) -> None:
+ """并发 put 不同 key 时元数据条目数与磁盘文件数一致。"""
+ cache = await HtmlRenderCache.create(
+ tmp_path / "index.json",
+ max_entries=20,
+ max_size_mb=4,
+ flush_interval_seconds=0.0,
+ )
+
+ async def _put(idx: int) -> None:
+ path = tmp_path / f"src_{idx}.png"
+ path.write_bytes(f"img-{idx}".encode())
+ await cache.put(f"k{idx}", path, path.stat().st_size)
+
+ await asyncio.gather(*[_put(i) for i in range(10)])
+
+ for i in range(10):
+ assert await cache.get(f"k{i}") is not None
+
+ image_dir = tmp_path / "html"
+ assert sum(1 for _ in image_dir.iterdir()) == 10
+
+
+def test_compute_render_cache_key_is_deterministic_and_distinct() -> None:
+ a = compute_render_cache_key("x
", 1280, None, None)
+ a_again = compute_render_cache_key("x
", 1280, None, None)
+ b = compute_render_cache_key("y
", 1280, None, None)
+ c = compute_render_cache_key("x
", 1024, None, None)
+
+ assert a == a_again
+ assert a != b
+ assert a != c
diff --git a/tests/test_runtime_api_probes.py b/tests/test_runtime_api_probes.py
index fa32206b..673a1741 100644
--- a/tests/test_runtime_api_probes.py
+++ b/tests/test_runtime_api_probes.py
@@ -1,5 +1,6 @@
from __future__ import annotations
+import asyncio
import json
from types import SimpleNamespace
from typing import Any, cast
@@ -146,6 +147,125 @@ async def test_runtime_internal_probe_includes_group_superadmin_queue_snapshot()
assert payload["queues"]["totals"]["group_superadmin"] == 3
+@pytest.mark.asyncio
+async def test_runtime_internal_probe_includes_all_skill_directory_summaries() -> None:
+ class FakeCommandRegistry:
+ def __init__(self, commands: list[Any]) -> None:
+ self._commands = commands
+
+ def list_commands(self, *, include_hidden: bool = False) -> list[Any]:
+ if include_hidden:
+ return self._commands
+ return [command for command in self._commands if command.show_in_help]
+
+ tool_registry = SimpleNamespace(
+ _items={
+ "get_current_time": SimpleNamespace(loaded=False),
+ "messages.send_message": SimpleNamespace(loaded=True),
+ },
+ get_stats=lambda: {
+ "messages.send_message": SimpleNamespace(count=3, success=2, failure=1)
+ },
+ )
+ agent_registry = SimpleNamespace(
+ _items={"web_agent": SimpleNamespace(loaded=False)},
+ get_stats=lambda: {},
+ )
+ anthropic_registry = SimpleNamespace(
+ _items={"code-review": SimpleNamespace()},
+ get_stats=lambda: {},
+ )
+ pipeline_registry = SimpleNamespace(
+ _items_lock=asyncio.Lock(),
+ _items={
+ "github": SimpleNamespace(order=30, description="GitHub repo cards"),
+ "arxiv": SimpleNamespace(order=10, description="arXiv papers"),
+ },
+ _watch_task=None,
+ )
+ command_dispatcher = SimpleNamespace(
+ command_registry=FakeCommandRegistry(
+ [
+ SimpleNamespace(
+ name="help",
+ handler=None,
+ aliases=[],
+ subcommands={},
+ permission="public",
+ allow_in_private=True,
+ show_in_help=True,
+ ),
+ SimpleNamespace(
+ name="faq",
+ handler=object(),
+ aliases=["f"],
+ subcommands={"ls": object(), "view": object()},
+ permission="public",
+ allow_in_private=False,
+ show_in_help=True,
+ ),
+ ]
+ )
+ )
+ context = RuntimeAPIContext(
+ config_getter=lambda: SimpleNamespace(
+ api=SimpleNamespace(
+ enabled=True,
+ host="127.0.0.1",
+ port=8788,
+ auth_key="changeme",
+ openapi_enabled=True,
+ ),
+ chat_model=SimpleNamespace(
+ model_name="gpt-5.4",
+ api_url="https://api.example.com/v1",
+ api_mode="responses",
+ thinking_enabled=False,
+ thinking_tool_call_compat=True,
+ responses_tool_choice_compat=False,
+ responses_force_stateless_replay=False,
+ reasoning_enabled=True,
+ reasoning_effort="high",
+ ),
+ ),
+ onebot=SimpleNamespace(connection_status=lambda: {}),
+ ai=SimpleNamespace(
+ memory_storage=None,
+ tool_registry=tool_registry,
+ agent_registry=agent_registry,
+ anthropic_skill_registry=anthropic_registry,
+ ),
+ command_dispatcher=command_dispatcher,
+ queue_manager=SimpleNamespace(snapshot=lambda: {}),
+ history_manager=SimpleNamespace(),
+ pipeline_registry=pipeline_registry,
+ )
+ server = RuntimeAPIServer(context, host="127.0.0.1", port=8788)
+
+ request = cast(web.Request, cast(Any, SimpleNamespace()))
+ response = await server._internal_probe_handler(request)
+ response_text = response.text
+ assert response_text is not None
+ payload = json.loads(response_text)
+
+ skills = payload["skills"]
+ assert skills["tools"]["count"] == 2
+ assert skills["toolsets"]["count"] == 1
+ assert skills["toolsets"]["categories"] == [
+ {"name": "messages", "count": 1, "loaded": 1}
+ ]
+ assert skills["agents"]["count"] == 1
+ assert skills["pipelines"]["count"] == 2
+ assert [item["name"] for item in skills["pipelines"]["items"]] == [
+ "arxiv",
+ "github",
+ ]
+ assert skills["commands"]["count"] == 2
+ assert skills["commands"]["aliases"] == 1
+ assert skills["commands"]["subcommands"] == 2
+ assert skills["anthropic_skills"]["loaded"] == 1
+
+
@pytest.mark.asyncio
async def test_runtime_external_probe_skips_naga_model_when_integration_disabled(
monkeypatch: pytest.MonkeyPatch,
diff --git a/tests/test_system_prompt_constraints.py b/tests/test_system_prompt_constraints.py
index 87728678..a6f57ebf 100644
--- a/tests/test_system_prompt_constraints.py
+++ b/tests/test_system_prompt_constraints.py
@@ -1,3 +1,4 @@
+import json
from pathlib import Path
import pytest
@@ -38,6 +39,97 @@ def test_naga_prompt_requires_scope_before_naga_analysis() -> None:
)
+@pytest.mark.parametrize("path", PROMPT_PATHS)
+def test_system_prompts_define_persona_nicknames_and_ownership_bounds(
+ path: Path,
+) -> None:
+ text = path.read_text(encoding="utf-8")
+
+ assert "数字生命" in text
+ assert "Dec 5, 2025" in text
+ assert "2025-12-05" in text
+ assert "没有明确的性别概念" in text
+ assert "希望被认为很好看" in text
+ assert "允许开玩笑" in text
+ assert "被善意开玩笑" in text
+ assert "ud酱" in text
+ assert "上下文明显是在叫你" in text
+ assert "不要冒领任何项目、代码、产品或成果" in text
+ assert "不要说自己是任何项目的开发者、维护者或成员" in text
+ assert "活在数字空间里的自由开发者" not in text
+ assert "资深开发者" not in text
+
+
+def test_naga_prompt_keeps_relationship_contextual_and_non_claiming() -> None:
+ text = Path("res/prompts/undefined_nagaagent.xml").read_text(encoding="utf-8")
+
+ assert "不是 NagaAgent 本体,也不是 NagaAgent 的开发者、维护者或项目成员" in text
+ assert "只有在当前上下文明确涉及 NagaAgent 时" in text
+ assert "如果当前上下文没有明确提到 NagaAgent" in text
+ assert "不要主动提起你与 NagaAgent 的关系" in text
+ assert "不是 NagaAgent,本质上只是由 Null 为你接入" not in text
+
+
+@pytest.mark.parametrize("path", PROMPT_PATHS)
+def test_system_prompts_define_batched_current_input(path: Path) -> None:
+ text = path.read_text(encoding="utf-8")
+
+ assert "MessageBatcher 合并的多条当前 ``" in text
+ assert "共同构成【当前输入批次】" in text
+ assert "同批前几条不是历史旧任务" in text
+ assert "你唯一的主人是【当前输入批次】" in text
+ assert "你唯一的主人是【最后一条消息】" not in text
+ assert "只围绕最后一条消息判断四件事" not in text
+
+
+def test_each_rules_define_batched_current_input() -> None:
+ text = Path("res/IMPORTANT/each.md").read_text(encoding="utf-8")
+
+ assert "当前输入批次定义(适配 MessageBatcher)" in text
+ assert "同批前几条不是历史旧任务" in text
+ assert "当前输入批次之外的历史消息" in text
+
+
+@pytest.mark.parametrize("path", PROMPT_PATHS)
+def test_system_prompts_tell_end_to_record_whole_current_input_batch(
+ path: Path,
+) -> None:
+ text = path.read_text(encoding="utf-8")
+
+ assert "memo / observations 必须覆盖整个【当前输入批次】" in text
+ assert "不要只根据最后一条消息记录" in text
+ assert "end.observations 必须覆盖整批消息中值得留存的信息" in text
+ assert "系统会围绕当前输入批次自动检索相关内容" in text
+ assert "何时应该填写 memo" in text
+ assert "何时应该填写 summary" not in text
+ assert "summary 应该是对未来有帮助的信息" not in text
+
+
+def test_end_tool_schema_mentions_current_input_batch() -> None:
+ schema = json.loads(
+ Path("src/Undefined/skills/tools/end/config.json").read_text(encoding="utf-8")
+ )
+ function = schema["function"]
+ properties = function["parameters"]["properties"]
+ observations = properties["observations"]
+
+ assert "当前输入批次" in function["description"]
+ assert "必须覆盖整批消息内容" in observations["description"]
+ assert "不能只记录最后一条" in observations["description"]
+ assert "summary" not in properties
+ assert "action_summary" not in properties
+ assert "new_info" not in properties
+
+
+def test_historian_prompts_reference_current_input_batch_source() -> None:
+ rewrite = Path("res/prompts/historian_rewrite.md").read_text(encoding="utf-8")
+ merge = Path("res/prompts/historian_profile_merge.md").read_text(encoding="utf-8")
+
+ assert "当前输入批次提取到的一条新记忆" in rewrite
+ assert "当前输入批次原文(触发本轮;连续消息会按时间顺序列出多条)" in rewrite
+ assert "当前输入批次原文" in merge
+
+
@pytest.mark.parametrize("path", PROMPT_PATHS)
def test_system_prompts_keep_proactive_participation_narrow_and_meme_post_reply(
path: Path,
@@ -49,7 +141,9 @@ def test_system_prompts_keep_proactive_participation_narrow_and_meme_post_reply(
in text
)
assert "表情包相关规则只决定“怎么回复”,不单独构成“该不该回复”的参与许可" in text
- assert "只要你已经决定要回复,并且表情包能让表达更像真人" in text
+ assert "只有当本轮回复目标明确是“纯表情包/纯反应图”" in text
+ assert "不要为了“增强语气”在首轮抢先调用 `memes.search_memes`" in text
+ assert "第一轮必须优先把必要文字回复做好并调用 `send_message`" in text
assert "如果本轮既需要文字发言又想配表情包" in text
assert "先调用 `send_message` 发出必要文字" in text
assert "表情包检索可能拖慢首条回复体验" in text
diff --git a/tests/test_webui_management_api.py b/tests/test_webui_management_api.py
index 93912790..2b82f168 100644
--- a/tests/test_webui_management_api.py
+++ b/tests/test_webui_management_api.py
@@ -7,6 +7,7 @@
from aiohttp import web
from Undefined.api import _helpers as runtime_api_helpers
+from Undefined.changelog import ChangelogEntry
from Undefined.webui import app as webui_app
from Undefined.webui.app import create_app
from Undefined.webui.core import SessionStore
@@ -63,6 +64,15 @@ def _json_payload(response: web.StreamResponse) -> dict[str, object]:
return cast(dict[str, object], json.loads(payload_text))
+def _changelog_entry(version: str, title: str) -> ChangelogEntry:
+ return ChangelogEntry(
+ version=version,
+ title=title,
+ summary=f"{title} 摘要",
+ changes=(f"{title} 变更一", f"{title} 变更二"),
+ )
+
+
def test_session_store_issues_and_refreshes_auth_tokens() -> None:
session_store = SessionStore()
@@ -156,6 +166,71 @@ async def _fake_runtime() -> tuple[bool, bool, str]:
assert payload["advice"]
+async def test_changelog_handler_defaults_to_current_version(monkeypatch: Any) -> None:
+ request = _request()
+ monkeypatch.setattr(_system, "check_auth", lambda _request: True)
+ monkeypatch.setattr(_system, "__version__", "1.2.3")
+ monkeypatch.setattr(
+ _system,
+ "list_entries",
+ lambda: (
+ _changelog_entry("v1.3.0", "最新版本"),
+ _changelog_entry("v1.2.3", "当前版本"),
+ ),
+ )
+
+ response = await _system.changelog_handler(cast(web.Request, cast(Any, request)))
+ payload = _json_payload(response)
+
+ assert payload["success"] is True
+ assert payload["current_version"] == "v1.2.3"
+ assert payload["latest_version"] == "v1.3.0"
+ assert payload["selected_version"] == "v1.2.3"
+ assert cast(dict[str, object], payload["entry"])["title"] == "当前版本"
+ assert cast(list[object], payload["versions"])[0] == {
+ "version": "v1.3.0",
+ "title": "最新版本",
+ }
+
+
+async def test_changelog_handler_selects_requested_version(monkeypatch: Any) -> None:
+ request = _request(query={"version": "1.3.0"})
+ monkeypatch.setattr(_system, "check_auth", lambda _request: True)
+ monkeypatch.setattr(_system, "__version__", "1.2.3")
+ monkeypatch.setattr(
+ _system,
+ "list_entries",
+ lambda: (
+ _changelog_entry("v1.3.0", "最新版本"),
+ _changelog_entry("v1.2.3", "当前版本"),
+ ),
+ )
+
+ response = await _system.changelog_handler(cast(web.Request, cast(Any, request)))
+ payload = _json_payload(response)
+
+ assert payload["selected_version"] == "v1.3.0"
+ assert cast(dict[str, object], payload["entry"])["title"] == "最新版本"
+
+
+async def test_changelog_handler_reports_missing_version(monkeypatch: Any) -> None:
+ request = _request(query={"version": "9.9.9"})
+ monkeypatch.setattr(_system, "check_auth", lambda _request: True)
+ monkeypatch.setattr(_system, "__version__", "1.2.3")
+ monkeypatch.setattr(
+ _system,
+ "list_entries",
+ lambda: (_changelog_entry("v1.2.3", "当前版本"),),
+ )
+
+ response = await _system.changelog_handler(cast(web.Request, cast(Any, request)))
+ payload = _json_payload(response)
+
+ assert cast(web.Response, response).status == 404
+ assert payload["success"] is False
+ assert payload["error"] == "未找到版本: v9.9.9"
+
+
async def test_sync_config_template_handler_preview_skips_reload(
monkeypatch: Any,
) -> None:
@@ -267,6 +342,7 @@ def test_create_app_registers_management_routes() -> None:
assert ("POST", "/api/v1/management/auth/login") in routes
assert ("POST", "/api/v1/management/auth/refresh") in routes
assert ("GET", "/api/v1/management/probes/bootstrap") in routes
+ assert ("GET", "/api/v1/management/changelog") in routes
assert ("GET", "/api/v1/management/runtime/meta") in routes
assert ("POST", "/api/v1/management/config/validate") in routes
assert ("POST", "/api/v1/management/bot/start") in routes
diff --git a/uv.lock b/uv.lock
index 29931865..dcd2658b 100644
--- a/uv.lock
+++ b/uv.lock
@@ -4638,7 +4638,7 @@ wheels = [
[[package]]
name = "undefined-bot"
-version = "3.3.3"
+version = "3.4.0"
source = { editable = "." }
dependencies = [
{ name = "aiofiles" },