From 368b6c1f6a5ac6767ed8ea69ef2dc0e6b30091cc Mon Sep 17 00:00:00 2001 From: yishuiliunian Date: Thu, 25 Jun 2026 22:07:19 +0800 Subject: [PATCH] =?UTF-8?q?feat(compaction):=20=E8=B7=A8=E5=8E=8B=E7=BC=A9?= =?UTF-8?q?=E8=BE=B9=E7=95=8C=E6=90=BA=E5=B8=A6=E6=9C=AA=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=20+=20=E5=BC=BA=E5=8C=96=20task=20=E5=AF=B9?= =?UTF-8?q?=E8=B4=A6=E6=8F=90=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit cron 驱动的长跑 session 里,agent 标了 in_progress 的任务在一次压缩后 就从上下文消失(TaskCreate/TaskUpdate 记录被摘要掉),于是再也不回头 关闭它们 —— 生产 session 攒了 6 个孤儿 in_progress + 5 个没消费的 pending。 根因是任务这种持久状态在压缩边界被丢失、且没有机制提醒 agent 对账。 两层修复: - **awareness**:新增 `OutstandingTasksDigest` trait(loopal-tool-api), AgentShared 从 TaskStore 拉未完成任务拼成 digest;compaction 在 CompactionSummary.summary_text 末尾确定性追加(仅压缩时一次,resume 自动覆盖,不做每轮注入)。注入链复刻 one_shot_chat 模式,runtime 依赖 trait 抽象、由 agent-server 注入,无新增跨层依赖。 - **action**:`task-decomposition.md` 加 "Reconcile your task list every turn" 段 —— 关闭已完成、绝不遗留 in_progress、drain/prune pending、 resumed/scheduled 轮次先 TaskList。 测试:`outstanding_digest` 格式化单测(非完成/状态/active_form/空→None) + e2e 证明 digest 端到端进入 CompactionSummary。全量 94/94、clippy 零警告。 --- .../src/agent_loop_params_factory.rs | 6 +- crates/loopal-agent-server/src/agent_setup.rs | 2 + crates/loopal-agent/src/lib.rs | 1 + .../src/outstanding_tasks_impl.rs | 79 +++++++++++++++++++ .../prompts/tools/task-decomposition.md | 12 +++ .../src/agent_loop/compaction_run.rs | 14 +++- .../loopal-runtime/src/agent_loop/params.rs | 5 +- .../src/agent_loop/params_builder.rs | 11 ++- .../compact_tasks_carryover_e2e_test.rs | 52 ++++++++++++ crates/loopal-runtime/tests/agent_loop/mod.rs | 1 + crates/loopal-test-support/src/harness.rs | 9 ++- crates/loopal-test-support/src/wiring.rs | 1 + crates/loopal-tool-api/src/lib.rs | 2 + .../loopal-tool-api/src/outstanding_tasks.rs | 12 +++ 14 files changed, 202 insertions(+), 5 deletions(-) create mode 100644 crates/loopal-agent/src/outstanding_tasks_impl.rs create mode 100644 crates/loopal-runtime/tests/agent_loop/compact_tasks_carryover_e2e_test.rs create mode 100644 crates/loopal-tool-api/src/outstanding_tasks.rs diff --git a/crates/loopal-agent-server/src/agent_loop_params_factory.rs b/crates/loopal-agent-server/src/agent_loop_params_factory.rs index 93aceda4..a36e82a1 100644 --- a/crates/loopal-agent-server/src/agent_loop_params_factory.rs +++ b/crates/loopal-agent-server/src/agent_loop_params_factory.rs @@ -7,7 +7,9 @@ use loopal_runtime::{ InterruptHandle, SessionResumeHook, }; use loopal_storage::Session; -use loopal_tool_api::{FetchRefinerPolicy, MemoryChannel, OneShotChatService}; +use loopal_tool_api::{ + FetchRefinerPolicy, MemoryChannel, OneShotChatService, OutstandingTasksDigest, +}; use loopal_turn::Turn; /// Aggregate inputs for [`assemble_agent_loop_params`] — collapses what @@ -29,6 +31,7 @@ pub(crate) struct AgentLoopAssembly { pub memory_channel: Option>, pub one_shot_chat: Option>, pub fetch_refiner_policy: Option>, + pub outstanding_tasks: Option>, pub goal_session: Option>, pub scheduler: Arc, pub decision_cell: loopal_runtime::frontend::DecisionCell, @@ -62,6 +65,7 @@ pub(crate) fn assemble_agent_loop_params(a: AgentLoopAssembly) -> AgentLoopParam Some(p) => builder.fetch_refiner_policy(p), None => builder, }; + let builder = builder.outstanding_tasks_opt(a.outstanding_tasks); let builder = builder.goal_session_opt(a.goal_session); builder.build() } diff --git a/crates/loopal-agent-server/src/agent_setup.rs b/crates/loopal-agent-server/src/agent_setup.rs index 5e30e2d7..2591a6fa 100644 --- a/crates/loopal-agent-server/src/agent_setup.rs +++ b/crates/loopal-agent-server/src/agent_setup.rs @@ -105,6 +105,7 @@ pub async fn build_with_frontend(ctx: AgentSetupContext<'_>) -> anyhow::Result = Arc::new(agent_shared.clone()); let one_shot_chat: Arc = agent_shared.clone(); let fetch_refiner_policy: Arc = agent_shared.clone(); + let outstanding_tasks: Arc = agent_shared.clone(); let skills: Vec<_> = config.skills.values().map(|e| e.skill.clone()).collect(); let skills_summary = loopal_config::format_skills_summary(&skills); let tool_defs = kernel.tool_definitions(); @@ -184,6 +185,7 @@ pub async fn build_with_frontend(ctx: AgentSetupContext<'_>) -> anyhow::Result Option { + outstanding_digest(&self.task_store.list().await) + } +} + +fn outstanding_digest(tasks: &[Task]) -> Option { + let mut lines = Vec::new(); + for t in tasks { + let status = match t.status { + TaskStatus::InProgress => "in_progress", + TaskStatus::Pending => "pending", + TaskStatus::Completed | TaskStatus::Deleted => continue, + }; + let af = t + .active_form + .as_deref() + .map(|a| format!(" — {a}")) + .unwrap_or_default(); + lines.push(format!("- #{} [{}] {}{}", t.id, status, t.subject, af)); + } + if lines.is_empty() { + return None; + } + Some(format!( + "\n\n## Outstanding tasks (carry-forward — reconcile before starting new work)\n{}", + lines.join("\n") + )) +} + +#[cfg(test)] +mod tests { + use super::outstanding_digest; + use crate::types::{Task, TaskStatus}; + + fn task(id: &str, status: TaskStatus, subject: &str, af: Option<&str>) -> Task { + Task { + id: id.into(), + subject: subject.into(), + description: String::new(), + active_form: af.map(String::from), + status, + owner: None, + blocked_by: vec![], + blocks: vec![], + metadata: serde_json::Value::Null, + created_at: String::new(), + } + } + + #[test] + fn lists_only_non_completed_with_status_and_active_form() { + let tasks = vec![ + task("1", TaskStatus::Completed, "done", None), + task("10", TaskStatus::InProgress, "produce", Some("Producing")), + task("60", TaskStatus::Pending, "outreach", None), + task("99", TaskStatus::Deleted, "gone", None), + ]; + let d = outstanding_digest(&tasks).expect("non-empty"); + assert!(d.contains("Outstanding tasks")); + assert!(d.contains("- #10 [in_progress] produce — Producing")); + assert!(d.contains("- #60 [pending] outreach")); + assert!(!d.contains("done"), "completed excluded"); + assert!(!d.contains("gone"), "deleted excluded"); + } + + #[test] + fn none_when_no_outstanding() { + assert!(outstanding_digest(&[task("1", TaskStatus::Completed, "x", None)]).is_none()); + assert!(outstanding_digest(&[]).is_none()); + } +} diff --git a/crates/loopal-prompt-system/prompts/tools/task-decomposition.md b/crates/loopal-prompt-system/prompts/tools/task-decomposition.md index 9f072cb7..7bc16225 100644 --- a/crates/loopal-prompt-system/prompts/tools/task-decomposition.md +++ b/crates/loopal-prompt-system/prompts/tools/task-decomposition.md @@ -23,6 +23,18 @@ If you do not decompose complex tasks into tracked steps, you **will** forget st - **TaskUpdate**: Mark each task as `in_progress` when you begin it, and `completed` as soon as you finish it. Do not batch — mark tasks completed one at a time as you go. - **TaskList**: Check your task list after completing each task to see what remains. +### Reconcile your task list every turn + +Tasks are durable state that outlives any single turn, cron trigger, or context +compaction. Before starting new work — **especially when you resume, are woken +by a schedule, or see an `Outstanding tasks` list carried in after a +compaction** — reconcile what is already open: + +- **Close what is done.** Mark any `in_progress` task you have actually finished as `completed`. A finished task left `in_progress` is a silent leak. +- **Never strand an `in_progress` task.** If you are switching to different work (a new schedule fired, a new request arrived), do NOT leave a half-done task sitting `in_progress`. Either finish it now, or set it back to `pending` (or `deleted` if it is no longer relevant) before moving on. +- **Drain or prune `pending`.** Pick up the highest-priority `pending` task, or delete it if it is stale/superseded. Do not let pending work pile up untouched across many turns. +- Use **TaskList** at the start of a resumed/scheduled turn to see the real current state — do not rely on memory of earlier turns, which may have been compacted away. + ### Rules - Mark each task as completed as soon as you are done with the task. Do not batch up multiple tasks before marking them as completed. diff --git a/crates/loopal-runtime/src/agent_loop/compaction_run.rs b/crates/loopal-runtime/src/agent_loop/compaction_run.rs index 87c35ec6..e17baa91 100644 --- a/crates/loopal-runtime/src/agent_loop/compaction_run.rs +++ b/crates/loopal-runtime/src/agent_loop/compaction_run.rs @@ -104,8 +104,20 @@ impl AgentLoopRunner { .. } = output; + // Carry the not-yet-completed task list across the compaction boundary + // (TaskCreate/TaskUpdate records are otherwise summarized away, making + // the agent forget its in-progress/pending work). + let mut summary_text = summary_msg.text_content(); + let task_digest = match &self.params.outstanding_tasks { + Some(p) => p.outstanding_tasks_digest().await, + None => None, + }; + if let Some(digest) = task_digest { + summary_text.push_str(&digest); + } + if let Err(e) = self.append_step_record(TurnStep::CompactionSummary(CompactionSummary { - summary_text: summary_msg.text_content(), + summary_text, ack_text: ack_msg.text_content(), kept_turn_count, removed_turn_count, diff --git a/crates/loopal-runtime/src/agent_loop/params.rs b/crates/loopal-runtime/src/agent_loop/params.rs index c5df2c49..c9719f9c 100644 --- a/crates/loopal-runtime/src/agent_loop/params.rs +++ b/crates/loopal-runtime/src/agent_loop/params.rs @@ -7,7 +7,9 @@ use loopal_kernel::Kernel; use loopal_protocol::InterruptSignal; use loopal_provider_api::{SharedModelRouter, ThinkingConfig}; use loopal_storage::Session; -use loopal_tool_api::{FetchRefinerPolicy, MemoryChannel, OneShotChatService, PermissionMode}; +use loopal_tool_api::{ + FetchRefinerPolicy, MemoryChannel, OneShotChatService, OutstandingTasksDigest, PermissionMode, +}; use tokio::sync::watch; use crate::frontend::DecisionCell; @@ -120,6 +122,7 @@ pub struct AgentLoopParams { pub memory_channel: Option>, pub one_shot_chat: Option>, pub fetch_refiner_policy: Option>, + pub outstanding_tasks: Option>, pub goal_session: Option>, pub scheduled_rx: Option>, pub harness: HarnessConfig, diff --git a/crates/loopal-runtime/src/agent_loop/params_builder.rs b/crates/loopal-runtime/src/agent_loop/params_builder.rs index 0fdeaa2a..94dc6f48 100644 --- a/crates/loopal-runtime/src/agent_loop/params_builder.rs +++ b/crates/loopal-runtime/src/agent_loop/params_builder.rs @@ -4,7 +4,9 @@ use loopal_config::HarnessConfig; use loopal_context::ContextBudget; use loopal_provider_api::Message; use loopal_storage::Session; -use loopal_tool_api::{FetchRefinerPolicy, MemoryChannel, OneShotChatService}; +use loopal_tool_api::{ + FetchRefinerPolicy, MemoryChannel, OneShotChatService, OutstandingTasksDigest, +}; use loopal_turn::Turn; use super::params::{AgentConfig, AgentDeps, AgentLoopParams, InterruptHandle}; @@ -22,6 +24,7 @@ pub struct AgentLoopParamsBuilder { memory_channel: Option>, one_shot_chat: Option>, fetch_refiner_policy: Option>, + outstanding_tasks: Option>, goal_session: Option>, scheduled_rx: Option>, harness: HarnessConfig, @@ -51,6 +54,7 @@ impl AgentLoopParamsBuilder { memory_channel: None, one_shot_chat: None, fetch_refiner_policy: None, + outstanding_tasks: None, goal_session: None, scheduled_rx: None, harness: HarnessConfig::default(), @@ -89,6 +93,10 @@ impl AgentLoopParamsBuilder { self.fetch_refiner_policy = Some(p); self } + pub fn outstanding_tasks_opt(mut self, t: Option>) -> Self { + self.outstanding_tasks = t; + self + } pub fn goal_session(mut self, g: Arc) -> Self { self.goal_session = Some(g); self @@ -141,6 +149,7 @@ impl AgentLoopParamsBuilder { memory_channel: self.memory_channel, one_shot_chat: self.one_shot_chat, fetch_refiner_policy: self.fetch_refiner_policy, + outstanding_tasks: self.outstanding_tasks, goal_session: self.goal_session, scheduled_rx: self.scheduled_rx, harness: self.harness, diff --git a/crates/loopal-runtime/tests/agent_loop/compact_tasks_carryover_e2e_test.rs b/crates/loopal-runtime/tests/agent_loop/compact_tasks_carryover_e2e_test.rs new file mode 100644 index 00000000..954aaa4f --- /dev/null +++ b/crates/loopal-runtime/tests/agent_loop/compact_tasks_carryover_e2e_test.rs @@ -0,0 +1,52 @@ +use std::sync::Arc; + +use loopal_provider_api::Message; +use loopal_test_support::{HarnessBuilder, chunks}; +use loopal_tool_api::OutstandingTasksDigest; +use loopal_turn::TurnStep; + +struct FixedTasks; + +#[async_trait::async_trait] +impl OutstandingTasksDigest for FixedTasks { + async fn outstanding_tasks_digest(&self) -> Option { + Some("\n\n## Outstanding tasks\n- #42 [in_progress] sentinel task".into()) + } +} + +#[tokio::test] +async fn compaction_summary_carries_outstanding_tasks_forward() { + let mut h = HarnessBuilder::new() + .calls(vec![chunks::text_turn("compacted body")]) + .outstanding_tasks(Arc::new(FixedTasks)) + .messages( + (1..=5) + .map(|i| Message::user(&format!("turn {i}"))) + .collect(), + ) + .build() + .await; + + let result = h.runner.force_compact(None).await; + assert!( + matches!(result, Ok(true)), + "compaction should succeed: {result:?}" + ); + + let summary = h + .runner + .turns + .store() + .turns() + .iter() + .flat_map(|t| &t.body.steps) + .find_map(|s| match s { + TurnStep::CompactionSummary(cs) => Some(cs.summary_text.clone()), + _ => None, + }) + .expect("a CompactionSummary step"); + assert!( + summary.contains("#42 [in_progress] sentinel task"), + "compaction summary must carry the outstanding task list forward; got: {summary}" + ); +} diff --git a/crates/loopal-runtime/tests/agent_loop/mod.rs b/crates/loopal-runtime/tests/agent_loop/mod.rs index e9643a7a..a1f37f6f 100644 --- a/crates/loopal-runtime/tests/agent_loop/mod.rs +++ b/crates/loopal-runtime/tests/agent_loop/mod.rs @@ -63,6 +63,7 @@ mod compact_hooks_e2e_test; mod compact_instructions_e2e_test; mod compact_phases_e2e_test; mod compact_provider_error_e2e_test; +mod compact_tasks_carryover_e2e_test; mod compact_token_sync_test; mod compaction_run_e2e_test; mod cron_e2e_test; diff --git a/crates/loopal-test-support/src/harness.rs b/crates/loopal-test-support/src/harness.rs index 606272f1..709a4822 100644 --- a/crates/loopal-test-support/src/harness.rs +++ b/crates/loopal-test-support/src/harness.rs @@ -14,7 +14,7 @@ use loopal_runtime::AgentMode; use loopal_runtime::agent_loop::AgentLoopRunner; use loopal_runtime::goal::GoalRuntimeSession; use loopal_session::SessionController; -use loopal_tool_api::PermissionMode; +use loopal_tool_api::{OutstandingTasksDigest, PermissionMode}; use crate::fixture::TestFixture; @@ -35,6 +35,7 @@ pub struct HarnessBuilder { pub(crate) kernel_setup: Option>, pub(crate) scheduler: Option>, pub(crate) goal_session: Option>, + pub(crate) outstanding_tasks: Option>, pub(crate) llm_chunk_delay: Option, } @@ -62,10 +63,16 @@ impl HarnessBuilder { kernel_setup: None, scheduler: None, goal_session: None, + outstanding_tasks: None, llm_chunk_delay: None, } } + pub fn outstanding_tasks(mut self, p: Arc) -> Self { + self.outstanding_tasks = Some(p); + self + } + pub fn calls(mut self, c: Vec>>) -> Self { self.calls = c; self diff --git a/crates/loopal-test-support/src/wiring.rs b/crates/loopal-test-support/src/wiring.rs index ab89868a..7de463cc 100644 --- a/crates/loopal-test-support/src/wiring.rs +++ b/crates/loopal-test-support/src/wiring.rs @@ -170,6 +170,7 @@ pub(crate) async fn wire(builder: HarnessBuilder) -> (SpawnedHarness, AgentLoopR .scheduled_rx(scheduled_rx) .scheduler(scheduler_for_params) .goal_session_opt(builder.goal_session) + .outstanding_tasks_opt(builder.outstanding_tasks) .build(); let harness = SpawnedHarness { diff --git a/crates/loopal-tool-api/src/lib.rs b/crates/loopal-tool-api/src/lib.rs index b8a9fdde..98cbcdc9 100644 --- a/crates/loopal-tool-api/src/lib.rs +++ b/crates/loopal-tool-api/src/lib.rs @@ -7,6 +7,7 @@ pub mod head_tail; pub mod input_normalize; pub mod memory_channel; pub mod output_tail; +pub mod outstanding_tasks; pub mod path; pub mod permission; pub mod provider_resolver; @@ -31,6 +32,7 @@ pub use goal_session::{GoalSession, GoalSessionError}; pub use head_tail::HeadTail; pub use memory_channel::MemoryChannel; pub use output_tail::OutputTail; +pub use outstanding_tasks::OutstandingTasksDigest; pub use path::ResolvedPath; pub use permission::{PermissionDecision, PermissionLevel, PermissionMode}; pub use provider_resolver::{FetchRefinerPolicy, OneShotChatError, OneShotChatService}; diff --git a/crates/loopal-tool-api/src/outstanding_tasks.rs b/crates/loopal-tool-api/src/outstanding_tasks.rs new file mode 100644 index 00000000..456d6e91 --- /dev/null +++ b/crates/loopal-tool-api/src/outstanding_tasks.rs @@ -0,0 +1,12 @@ +use async_trait::async_trait; + +/// Surfaces the not-yet-completed task list as a compact markdown digest. +/// +/// The runtime appends this to the compaction summary so the agent does not +/// lose sight of its `in_progress` / `pending` work when the conversational +/// record of `TaskCreate` / `TaskUpdate` is summarized away. +#[async_trait] +pub trait OutstandingTasksDigest: Send + Sync { + /// `None` when there are no outstanding tasks. + async fn outstanding_tasks_digest(&self) -> Option; +}