Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion crates/loopal-agent-server/src/agent_loop_params_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use loopal_runtime::{
InterruptHandle, SessionResumeHook,
};
use loopal_storage::Session;
use loopal_tool_api::{FetchRefinerPolicy, MemoryChannel, OneShotChatService};
use loopal_tool_api::{
FetchRefinerPolicy, MemoryChannel, OneShotChatService, OutstandingTasksDigest,
};
use loopal_turn::Turn;

/// Aggregate inputs for [`assemble_agent_loop_params`] — collapses what
Expand All @@ -29,6 +31,7 @@ pub(crate) struct AgentLoopAssembly {
pub memory_channel: Option<Arc<dyn MemoryChannel>>,
pub one_shot_chat: Option<Arc<dyn OneShotChatService>>,
pub fetch_refiner_policy: Option<Arc<dyn FetchRefinerPolicy>>,
pub outstanding_tasks: Option<Arc<dyn OutstandingTasksDigest>>,
pub goal_session: Option<Arc<GoalRuntimeSession>>,
pub scheduler: Arc<loopal_scheduler::CronScheduler>,
pub decision_cell: loopal_runtime::frontend::DecisionCell,
Expand Down Expand Up @@ -62,6 +65,7 @@ pub(crate) fn assemble_agent_loop_params(a: AgentLoopAssembly) -> AgentLoopParam
Some(p) => builder.fetch_refiner_policy(p),
None => builder,
};
let builder = builder.outstanding_tasks_opt(a.outstanding_tasks);
let builder = builder.goal_session_opt(a.goal_session);
builder.build()
}
2 changes: 2 additions & 0 deletions crates/loopal-agent-server/src/agent_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ pub async fn build_with_frontend(ctx: AgentSetupContext<'_>) -> anyhow::Result<A
let shared_any: Arc<dyn std::any::Any + Send + Sync> = Arc::new(agent_shared.clone());
let one_shot_chat: Arc<dyn loopal_tool_api::OneShotChatService> = agent_shared.clone();
let fetch_refiner_policy: Arc<dyn loopal_tool_api::FetchRefinerPolicy> = agent_shared.clone();
let outstanding_tasks: Arc<dyn loopal_tool_api::OutstandingTasksDigest> = agent_shared.clone();
let skills: Vec<_> = config.skills.values().map(|e| e.skill.clone()).collect();
let skills_summary = loopal_config::format_skills_summary(&skills);
let tool_defs = kernel.tool_definitions();
Expand Down Expand Up @@ -184,6 +185,7 @@ pub async fn build_with_frontend(ctx: AgentSetupContext<'_>) -> anyhow::Result<A
memory_channel,
one_shot_chat: Some(one_shot_chat),
fetch_refiner_policy: Some(fetch_refiner_policy),
outstanding_tasks: Some(outstanding_tasks),
goal_session,
scheduler: scheduler.clone(),
decision_cell,
Expand Down
1 change: 1 addition & 0 deletions crates/loopal-agent/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod bridge;
pub mod config;
pub mod outstanding_tasks_impl;
pub mod provider_resolver_impl;
pub mod session_resume_adapters;
pub mod shared;
Expand Down
79 changes: 79 additions & 0 deletions crates/loopal-agent/src/outstanding_tasks_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use async_trait::async_trait;
use loopal_tool_api::OutstandingTasksDigest;

use crate::shared::AgentShared;
use crate::types::{Task, TaskStatus};

#[async_trait]
impl OutstandingTasksDigest for AgentShared {
async fn outstanding_tasks_digest(&self) -> Option<String> {
outstanding_digest(&self.task_store.list().await)
}
}

fn outstanding_digest(tasks: &[Task]) -> Option<String> {
let mut lines = Vec::new();
for t in tasks {
let status = match t.status {
TaskStatus::InProgress => "in_progress",
TaskStatus::Pending => "pending",
TaskStatus::Completed | TaskStatus::Deleted => continue,
};
let af = t
.active_form
.as_deref()
.map(|a| format!(" — {a}"))
.unwrap_or_default();
lines.push(format!("- #{} [{}] {}{}", t.id, status, t.subject, af));
}
if lines.is_empty() {
return None;
}
Some(format!(
"\n\n## Outstanding tasks (carry-forward — reconcile before starting new work)\n{}",
lines.join("\n")
))
}

#[cfg(test)]
mod tests {
use super::outstanding_digest;
use crate::types::{Task, TaskStatus};

fn task(id: &str, status: TaskStatus, subject: &str, af: Option<&str>) -> Task {
Task {
id: id.into(),
subject: subject.into(),
description: String::new(),
active_form: af.map(String::from),
status,
owner: None,
blocked_by: vec![],
blocks: vec![],
metadata: serde_json::Value::Null,
created_at: String::new(),
}
}

#[test]
fn lists_only_non_completed_with_status_and_active_form() {
let tasks = vec![
task("1", TaskStatus::Completed, "done", None),
task("10", TaskStatus::InProgress, "produce", Some("Producing")),
task("60", TaskStatus::Pending, "outreach", None),
task("99", TaskStatus::Deleted, "gone", None),
];
let d = outstanding_digest(&tasks).expect("non-empty");
assert!(d.contains("Outstanding tasks"));
assert!(d.contains("- #10 [in_progress] produce — Producing"));
assert!(d.contains("- #60 [pending] outreach"));
assert!(!d.contains("done"), "completed excluded");
assert!(!d.contains("gone"), "deleted excluded");
}

#[test]
fn none_when_no_outstanding() {
assert!(outstanding_digest(&[task("1", TaskStatus::Completed, "x", None)]).is_none());
assert!(outstanding_digest(&[]).is_none());
}
}
12 changes: 12 additions & 0 deletions crates/loopal-prompt-system/prompts/tools/task-decomposition.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ If you do not decompose complex tasks into tracked steps, you **will** forget st
- **TaskUpdate**: Mark each task as `in_progress` when you begin it, and `completed` as soon as you finish it. Do not batch — mark tasks completed one at a time as you go.
- **TaskList**: Check your task list after completing each task to see what remains.

### Reconcile your task list every turn

Tasks are durable state that outlives any single turn, cron trigger, or context
compaction. Before starting new work — **especially when you resume, are woken
by a schedule, or see an `Outstanding tasks` list carried in after a
compaction** — reconcile what is already open:

- **Close what is done.** Mark any `in_progress` task you have actually finished as `completed`. A finished task left `in_progress` is a silent leak.
- **Never strand an `in_progress` task.** If you are switching to different work (a new schedule fired, a new request arrived), do NOT leave a half-done task sitting `in_progress`. Either finish it now, or set it back to `pending` (or `deleted` if it is no longer relevant) before moving on.
- **Drain or prune `pending`.** Pick up the highest-priority `pending` task, or delete it if it is stale/superseded. Do not let pending work pile up untouched across many turns.
- Use **TaskList** at the start of a resumed/scheduled turn to see the real current state — do not rely on memory of earlier turns, which may have been compacted away.

### Rules

- Mark each task as completed as soon as you are done with the task. Do not batch up multiple tasks before marking them as completed.
Expand Down
14 changes: 13 additions & 1 deletion crates/loopal-runtime/src/agent_loop/compaction_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,20 @@ impl AgentLoopRunner {
..
} = output;

// Carry the not-yet-completed task list across the compaction boundary
// (TaskCreate/TaskUpdate records are otherwise summarized away, making
// the agent forget its in-progress/pending work).
let mut summary_text = summary_msg.text_content();
let task_digest = match &self.params.outstanding_tasks {
Some(p) => p.outstanding_tasks_digest().await,
None => None,
};
if let Some(digest) = task_digest {
summary_text.push_str(&digest);
}

if let Err(e) = self.append_step_record(TurnStep::CompactionSummary(CompactionSummary {
summary_text: summary_msg.text_content(),
summary_text,
ack_text: ack_msg.text_content(),
kept_turn_count,
removed_turn_count,
Expand Down
5 changes: 4 additions & 1 deletion crates/loopal-runtime/src/agent_loop/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use loopal_kernel::Kernel;
use loopal_protocol::InterruptSignal;
use loopal_provider_api::{SharedModelRouter, ThinkingConfig};
use loopal_storage::Session;
use loopal_tool_api::{FetchRefinerPolicy, MemoryChannel, OneShotChatService, PermissionMode};
use loopal_tool_api::{
FetchRefinerPolicy, MemoryChannel, OneShotChatService, OutstandingTasksDigest, PermissionMode,
};
use tokio::sync::watch;

use crate::frontend::DecisionCell;
Expand Down Expand Up @@ -120,6 +122,7 @@ pub struct AgentLoopParams {
pub memory_channel: Option<Arc<dyn MemoryChannel>>,
pub one_shot_chat: Option<Arc<dyn OneShotChatService>>,
pub fetch_refiner_policy: Option<Arc<dyn FetchRefinerPolicy>>,
pub outstanding_tasks: Option<Arc<dyn OutstandingTasksDigest>>,
pub goal_session: Option<Arc<crate::goal::GoalRuntimeSession>>,
pub scheduled_rx: Option<tokio::sync::mpsc::Receiver<loopal_protocol::Envelope>>,
pub harness: HarnessConfig,
Expand Down
11 changes: 10 additions & 1 deletion crates/loopal-runtime/src/agent_loop/params_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use loopal_config::HarnessConfig;
use loopal_context::ContextBudget;
use loopal_provider_api::Message;
use loopal_storage::Session;
use loopal_tool_api::{FetchRefinerPolicy, MemoryChannel, OneShotChatService};
use loopal_tool_api::{
FetchRefinerPolicy, MemoryChannel, OneShotChatService, OutstandingTasksDigest,
};
use loopal_turn::Turn;

use super::params::{AgentConfig, AgentDeps, AgentLoopParams, InterruptHandle};
Expand All @@ -22,6 +24,7 @@ pub struct AgentLoopParamsBuilder {
memory_channel: Option<Arc<dyn MemoryChannel>>,
one_shot_chat: Option<Arc<dyn OneShotChatService>>,
fetch_refiner_policy: Option<Arc<dyn FetchRefinerPolicy>>,
outstanding_tasks: Option<Arc<dyn OutstandingTasksDigest>>,
goal_session: Option<Arc<GoalRuntimeSession>>,
scheduled_rx: Option<tokio::sync::mpsc::Receiver<loopal_protocol::Envelope>>,
harness: HarnessConfig,
Expand Down Expand Up @@ -51,6 +54,7 @@ impl AgentLoopParamsBuilder {
memory_channel: None,
one_shot_chat: None,
fetch_refiner_policy: None,
outstanding_tasks: None,
goal_session: None,
scheduled_rx: None,
harness: HarnessConfig::default(),
Expand Down Expand Up @@ -89,6 +93,10 @@ impl AgentLoopParamsBuilder {
self.fetch_refiner_policy = Some(p);
self
}
pub fn outstanding_tasks_opt(mut self, t: Option<Arc<dyn OutstandingTasksDigest>>) -> Self {
self.outstanding_tasks = t;
self
}
pub fn goal_session(mut self, g: Arc<GoalRuntimeSession>) -> Self {
self.goal_session = Some(g);
self
Expand Down Expand Up @@ -141,6 +149,7 @@ impl AgentLoopParamsBuilder {
memory_channel: self.memory_channel,
one_shot_chat: self.one_shot_chat,
fetch_refiner_policy: self.fetch_refiner_policy,
outstanding_tasks: self.outstanding_tasks,
goal_session: self.goal_session,
scheduled_rx: self.scheduled_rx,
harness: self.harness,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use std::sync::Arc;

use loopal_provider_api::Message;
use loopal_test_support::{HarnessBuilder, chunks};
use loopal_tool_api::OutstandingTasksDigest;
use loopal_turn::TurnStep;

struct FixedTasks;

#[async_trait::async_trait]
impl OutstandingTasksDigest for FixedTasks {
async fn outstanding_tasks_digest(&self) -> Option<String> {
Some("\n\n## Outstanding tasks\n- #42 [in_progress] sentinel task".into())
}
}

#[tokio::test]
async fn compaction_summary_carries_outstanding_tasks_forward() {
let mut h = HarnessBuilder::new()
.calls(vec![chunks::text_turn("<summary>compacted body</summary>")])
.outstanding_tasks(Arc::new(FixedTasks))
.messages(
(1..=5)
.map(|i| Message::user(&format!("turn {i}")))
.collect(),
)
.build()
.await;

let result = h.runner.force_compact(None).await;
assert!(
matches!(result, Ok(true)),
"compaction should succeed: {result:?}"
);

let summary = h
.runner
.turns
.store()
.turns()
.iter()
.flat_map(|t| &t.body.steps)
.find_map(|s| match s {
TurnStep::CompactionSummary(cs) => Some(cs.summary_text.clone()),
_ => None,
})
.expect("a CompactionSummary step");
assert!(
summary.contains("#42 [in_progress] sentinel task"),
"compaction summary must carry the outstanding task list forward; got: {summary}"
);
}
1 change: 1 addition & 0 deletions crates/loopal-runtime/tests/agent_loop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ mod compact_hooks_e2e_test;
mod compact_instructions_e2e_test;
mod compact_phases_e2e_test;
mod compact_provider_error_e2e_test;
mod compact_tasks_carryover_e2e_test;
mod compact_token_sync_test;
mod compaction_run_e2e_test;
mod cron_e2e_test;
Expand Down
9 changes: 8 additions & 1 deletion crates/loopal-test-support/src/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use loopal_runtime::AgentMode;
use loopal_runtime::agent_loop::AgentLoopRunner;
use loopal_runtime::goal::GoalRuntimeSession;
use loopal_session::SessionController;
use loopal_tool_api::PermissionMode;
use loopal_tool_api::{OutstandingTasksDigest, PermissionMode};

use crate::fixture::TestFixture;

Expand All @@ -35,6 +35,7 @@ pub struct HarnessBuilder {
pub(crate) kernel_setup: Option<Box<dyn FnOnce(&mut Kernel)>>,
pub(crate) scheduler: Option<Arc<loopal_scheduler::CronScheduler>>,
pub(crate) goal_session: Option<Arc<GoalRuntimeSession>>,
pub(crate) outstanding_tasks: Option<Arc<dyn OutstandingTasksDigest>>,
pub(crate) llm_chunk_delay: Option<std::time::Duration>,
}

Expand Down Expand Up @@ -62,10 +63,16 @@ impl HarnessBuilder {
kernel_setup: None,
scheduler: None,
goal_session: None,
outstanding_tasks: None,
llm_chunk_delay: None,
}
}

pub fn outstanding_tasks(mut self, p: Arc<dyn OutstandingTasksDigest>) -> Self {
self.outstanding_tasks = Some(p);
self
}

pub fn calls(mut self, c: Vec<Vec<Result<StreamChunk, LoopalError>>>) -> Self {
self.calls = c;
self
Expand Down
1 change: 1 addition & 0 deletions crates/loopal-test-support/src/wiring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ pub(crate) async fn wire(builder: HarnessBuilder) -> (SpawnedHarness, AgentLoopR
.scheduled_rx(scheduled_rx)
.scheduler(scheduler_for_params)
.goal_session_opt(builder.goal_session)
.outstanding_tasks_opt(builder.outstanding_tasks)
.build();

let harness = SpawnedHarness {
Expand Down
2 changes: 2 additions & 0 deletions crates/loopal-tool-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod head_tail;
pub mod input_normalize;
pub mod memory_channel;
pub mod output_tail;
pub mod outstanding_tasks;
pub mod path;
pub mod permission;
pub mod provider_resolver;
Expand All @@ -31,6 +32,7 @@ pub use goal_session::{GoalSession, GoalSessionError};
pub use head_tail::HeadTail;
pub use memory_channel::MemoryChannel;
pub use output_tail::OutputTail;
pub use outstanding_tasks::OutstandingTasksDigest;
pub use path::ResolvedPath;
pub use permission::{PermissionDecision, PermissionLevel, PermissionMode};
pub use provider_resolver::{FetchRefinerPolicy, OneShotChatError, OneShotChatService};
Expand Down
12 changes: 12 additions & 0 deletions crates/loopal-tool-api/src/outstanding_tasks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use async_trait::async_trait;

/// Surfaces the not-yet-completed task list as a compact markdown digest.
///
/// The runtime appends this to the compaction summary so the agent does not
/// lose sight of its `in_progress` / `pending` work when the conversational
/// record of `TaskCreate` / `TaskUpdate` is summarized away.
#[async_trait]
pub trait OutstandingTasksDigest: Send + Sync {
/// `None` when there are no outstanding tasks.
async fn outstanding_tasks_digest(&self) -> Option<String>;
}
Loading