Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions crates/loopal-agent-hub/src/agent_registry/completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,16 @@ impl AgentRegistry {
} else {
result.to_string()
};
let content = format!("<agent-result name=\"{child_name}\">\n{body}\n</agent-result>");
// Source carries the child's local view; uplink SNAT stamps the
// origin hub when this completion is delivered to a remote parent.
// Source carries the child's local view (uplink SNAT stamps the origin
// hub on cross-hub delivery). The `<agent-result>` wrapper is rebuilt at
// LLM projection time — the envelope body stays raw so observers render
// it structurally.
let envelope = Envelope::new(
MessageSource::Agent(QualifiedAddress::local(child_name)),
MessageSource::AgentResult {
child: QualifiedAddress::local(child_name),
},
parent.clone(),
content,
body,
);
Some((tx, envelope))
}
Expand Down Expand Up @@ -118,7 +121,8 @@ impl AgentRegistry {
.iter()
.filter(|c| {
self.agents.get(c.as_str()).is_some_and(|a| {
a.info.lifecycle == AgentLifecycle::Running && !a.state.is_shadow() // shadows are remote, can't interrupt locally
a.info.lifecycle == AgentLifecycle::Running && !a.state.is_shadow()
// shadows are remote, can't interrupt locally
})
})
.cloned()
Expand Down
9 changes: 4 additions & 5 deletions crates/loopal-agent-hub/src/finish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,12 @@ pub async fn finish_and_deliver(
&& parent.is_remote()
&& let Some(ul) = uplink
{
let content = format!("<agent-result name=\"{name}\">\n{output_text}\n</agent-result>");
// Use Agent(local(child)) so uplink SNAT stamps the origin hub.
// System("agent-completed") cannot carry hub info — see review #3.
let envelope = Envelope::new(
MessageSource::Agent(QualifiedAddress::local(name)),
MessageSource::AgentResult {
child: QualifiedAddress::local(name),
},
parent.clone(),
content,
output_text,
);
if let Err(e) = ul.route(&envelope).await {
tracing::warn!(agent = %name, parent = %parent, error = %e,
Expand Down
19 changes: 6 additions & 13 deletions crates/loopal-agent-hub/src/uplink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,13 @@ pub async fn handle_reverse_requests(
let ok = if let Ok(env) =
serde_json::from_value::<loopal_protocol::Envelope>(params)
{
// Remote agent completions arrive with the agent-result
// marker in content. Detect by content (not source tag)
// so it works with the typed Agent source after SNAT.
if let Some(child) = extract_agent_result_name(&env) {
// Remote agent completions arrive as a typed AgentResult
// source (set by the origin hub, survives SNAT). The
// child name is the bare agent segment.
if let loopal_protocol::MessageSource::AgentResult { child } = &env.source {
let output = env.content.text.clone();
crate::finish::deliver_cross_hub_completion(&hub, &child, output).await;
crate::finish::deliver_cross_hub_completion(&hub, &child.agent, output)
.await;
}
// Defense in depth: target should be local at this point
// (MetaHub router consumed the next-hop hub via DNAT).
Expand Down Expand Up @@ -169,11 +170,3 @@ pub async fn handle_reverse_requests(
}
tracing::warn!(hub = %hub_name, "MetaHub reverse handler ended");
}

/// Extract child agent name from `<agent-result name="...">` envelope.
fn extract_agent_result_name(env: &loopal_protocol::Envelope) -> Option<String> {
let text = &env.content.text;
let start = text.find("<agent-result name=\"")? + 20;
let end = text[start..].find('"')? + start;
Some(text[start..end].to_string())
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ async fn child_completion_delivered_to_parent_via_bridge() {
assert!(
matches!(
envelope.source,
MessageSource::Agent(ref qa) if qa.agent == "child-a" && qa.is_local()
MessageSource::AgentResult { ref child } if child.agent == "child-a" && child.is_local()
),
"source should be Agent(local('child-a')), got: {:?}",
"source should be AgentResult(local('child-a')), got: {:?}",
envelope.source
);
let text = &envelope.content.text;
assert!(
text.contains("child-a") && text.contains("42 issues"),
"should contain child name and result, got: {text}"
text.contains("42 issues"),
"body should be the raw result, got: {text}"
);
}

Expand Down
11 changes: 7 additions & 4 deletions crates/loopal-meta-hub/tests/suite/nat_routing_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,16 +302,19 @@ async fn cross_hub_completion_carries_origin_hub_in_source() {
};
let env: Envelope = serde_json::from_value(params).unwrap();

// Source: Agent(QA{hub=["hub-b"], agent="child"}) — proves SNAT applied.
// Source: AgentResult{child=QA{hub=["hub-b"], agent="child"}} — proves
// SNAT applied to the typed completion source.
assert_eq!(
env.source,
MessageSource::Agent(QualifiedAddress::remote(["hub-b"], "child")),
MessageSource::AgentResult {
child: QualifiedAddress::remote(["hub-b"], "child")
},
"completion source must carry origin hub"
);
// Target: local("parent") — proves DNAT consumed hub-A from the path.
assert_eq!(env.target, QualifiedAddress::local("parent"));
assert!(env.content.text.contains("<agent-result name=\"child\">"));
assert!(env.content.text.contains("ok"));
// Body is raw now — the <agent-result> wrapper is a projection concern.
assert_eq!(env.content.text, "ok");
}

/// Cross-hub spawn: a child registered with a qualified `hub/agent` parent
Expand Down
16 changes: 10 additions & 6 deletions crates/loopal-meta-hub/tests/suite/spawn_completion_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,16 @@ async fn completion_delivery_to_remote_parent() {
let params = match &msg {
Incoming::Request { params, .. } | Incoming::Notification { params, .. } => params,
};
let text = params
.get("content")
.and_then(|c| c.get("text"))
.and_then(|t| t.as_str())
.unwrap_or("");
if text.contains("agent-result") && text.contains("child-worker") {
// Completion is now a typed AgentResult source carrying the child
// name; the body is the raw output, not a wrapped marker.
let is_result = params
.get("source")
.and_then(|s| s.get("AgentResult"))
.and_then(|r| r.get("child"))
.and_then(|c| c.get("agent"))
.and_then(|a| a.as_str())
.is_some_and(|name| name == "child-worker");
if is_result {
return true;
}
}
Expand Down
12 changes: 11 additions & 1 deletion crates/loopal-protocol/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ use crate::user_content::UserContent;
pub enum MessageSource {
Human,
Agent(QualifiedAddress),
AgentResult {
child: QualifiedAddress,
},
Channel {
channel: String,
from: QualifiedAddress,
Expand All @@ -27,6 +30,7 @@ impl MessageSource {
match self {
Self::Human => "human".to_string(),
Self::Agent(addr) => addr.to_string(),
Self::AgentResult { child } => child.to_string(),
Self::Channel { from, .. } => from.to_string(),
Self::Scheduled => "scheduled".to_string(),
Self::System(kind) => format!("system:{kind}"),
Expand Down Expand Up @@ -67,7 +71,11 @@ impl MessageSource {
pub fn is_task_boundary(&self) -> bool {
matches!(
self,
Self::Human | Self::Scheduled | Self::Agent(_) | Self::Channel { .. }
Self::Human
| Self::Scheduled
| Self::Agent(_)
| Self::AgentResult { .. }
| Self::Channel { .. }
)
}

Expand All @@ -76,6 +84,7 @@ impl MessageSource {
pub fn prepend_hub(&mut self, self_hub: &str) {
match self {
Self::Agent(addr) => addr.prepend_hub(self_hub.to_string()),
Self::AgentResult { child } => child.prepend_hub(self_hub.to_string()),
Self::Channel { from, .. } => from.prepend_hub(self_hub.to_string()),
_ => {}
}
Expand All @@ -87,6 +96,7 @@ impl MessageSource {
pub fn prepend_hub_if_local(&mut self, self_hub: &str) {
match self {
Self::Agent(addr) => addr.prepend_hub_if_local(self_hub.to_string()),
Self::AgentResult { child } => child.prepend_hub_if_local(self_hub.to_string()),
Self::Channel { from, .. } => from.prepend_hub_if_local(self_hub.to_string()),
_ => {}
}
Expand Down
58 changes: 58 additions & 0 deletions crates/loopal-protocol/tests/suite/envelope_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,61 @@ fn test_non_human_sources_are_not_optimistically_rendered() {
.is_optimistically_rendered()
);
}

#[test]
fn test_agent_result_label_uses_child_address() {
let local = MessageSource::AgentResult {
child: QualifiedAddress::local("worker"),
};
assert_eq!(local.label(), "worker");
let remote = MessageSource::AgentResult {
child: QualifiedAddress::remote(["hub-A"], "worker"),
};
assert_eq!(remote.label(), "hub-A/worker");
}

#[test]
fn test_agent_result_is_task_boundary() {
assert!(
MessageSource::AgentResult {
child: QualifiedAddress::local("worker"),
}
.is_task_boundary()
);
}

#[test]
fn test_agent_result_not_optimistically_rendered_nor_ephemeral() {
let src = MessageSource::AgentResult {
child: QualifiedAddress::local("worker"),
};
assert!(!src.is_optimistically_rendered());
assert!(!src.is_ephemeral_in_history());
assert!(!src.wakes_suspended_session());
}

#[test]
fn test_agent_result_snat_stamps_child_hub() {
let mut env = Envelope::new(
MessageSource::AgentResult {
child: QualifiedAddress::local("worker"),
},
"hub-A/parent",
"done",
);
env.apply_snat("hub-B");
let MessageSource::AgentResult { child } = &env.source else {
panic!("expected AgentResult source");
};
assert_eq!(child, &QualifiedAddress::remote(["hub-B"], "worker"));
}

#[test]
fn test_agent_result_serde_roundtrip() {
let src = MessageSource::AgentResult {
child: QualifiedAddress::remote(["hub-A"], "worker"),
};
let json = serde_json::to_string(&src).unwrap();
let back: MessageSource = serde_json::from_str(&json).unwrap();
assert_eq!(src, back);
}
2 changes: 1 addition & 1 deletion crates/loopal-provider-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub use resolver::ProviderResolver;
pub use thinking::*;
pub use wire::{
ContentBlock, ImageSource, Message, MessageOrigin, MessageRole, normalize_messages,
project_turn_to_messages, project_turns_to_messages,
project_turn_to_messages, project_turns_to_messages, trigger_llm_text,
};

// ---------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion crates/loopal-provider-api/src/wire/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ pub mod turn_projection;
pub use message::{ContentBlock, ImageSource, Message, MessageRole};
pub use normalize::normalize_messages;
pub use origin::MessageOrigin;
pub use turn_projection::{project_turn_to_messages, project_turns_to_messages};
pub use turn_projection::{project_turn_to_messages, project_turns_to_messages, trigger_llm_text};
3 changes: 3 additions & 0 deletions crates/loopal-provider-api/src/wire/turn_projection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
mod blocks;
mod compaction;
mod prefix;
mod step;
mod trigger;

pub use self::prefix::trigger_llm_text;

use loopal_turn::{Turn, TurnStep};

use self::step::project_step;
Expand Down
66 changes: 66 additions & 0 deletions crates/loopal-provider-api/src/wire/turn_projection/prefix.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use loopal_turn::TurnTrigger;

/// SSOT for the LLM-facing text of a turn trigger: applies the source prefix
/// (`[scheduled]`, `[from: ...]`) or `<agent-result>` marker. Returns `None`
/// for triggers that produce no user message (`Resume`). Images on
/// `UserInput` are structural and handled by the caller — this is text only.
pub fn trigger_llm_text(trigger: &TurnTrigger) -> Option<String> {
match trigger {
TurnTrigger::UserInput { content, .. } => Some(content.clone()),
TurnTrigger::Cron { content, .. } => Some(format!("[scheduled] {content}")),
TurnTrigger::Agent { from, content, .. } => Some(format!("[from: {from}] {content}")),
TurnTrigger::AgentResult { from, content, .. } => Some(format!(
"<agent-result name=\"{from}\">\n{content}\n</agent-result>"
)),
TurnTrigger::Channel {
channel,
from,
content,
..
} => Some(format!("[from: #{channel}/{from}] {content}")),
TurnTrigger::GoalContinuation { content, .. } => Some(content.clone()),
TurnTrigger::BackgroundHook { content, .. } => Some(content.clone()),
TurnTrigger::Resume => None,
}
}

#[cfg(test)]
mod tests {
use super::*;

fn agent_result(from: &str, content: &str) -> TurnTrigger {
TurnTrigger::AgentResult {
envelope_id: String::new(),
from: from.into(),
content: content.into(),
}
}

#[test]
fn agent_result_wraps_in_marker() {
assert_eq!(
trigger_llm_text(&agent_result("worker", "done")).unwrap(),
"<agent-result name=\"worker\">\ndone\n</agent-result>"
);
}

#[test]
fn cron_and_agent_carry_source_prefix() {
let cron = TurnTrigger::Cron {
envelope_id: String::new(),
content: "tick".into(),
};
assert_eq!(trigger_llm_text(&cron).unwrap(), "[scheduled] tick");
let agent = TurnTrigger::Agent {
envelope_id: String::new(),
from: "hub/w".into(),
content: "hi".into(),
};
assert_eq!(trigger_llm_text(&agent).unwrap(), "[from: hub/w] hi");
}

#[test]
fn resume_produces_no_text() {
assert_eq!(trigger_llm_text(&TurnTrigger::Resume), None);
}
}
Loading
Loading