Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions src/agent/cortex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ struct BranchTracker {
branch_id: BranchId,
channel_id: ChannelId,
started_at: Instant,
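/// Instant of the most recent recorded activity for this branch.
///
/// Currently set at branch spawn only, so it holds the same instant as
/// `started_at` and timeouts based on it behave like timeouts measured from
/// spawn. To make this truly activity-based, send a ProcessEvent variant from
/// the hook on tool completions and text deltas, then update this field in
/// the cortex event loop.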
last_activity_at: Instant,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -435,12 +439,14 @@ impl HealthRuntimeState {
}

fn track_branch_start(&mut self, branch_id: BranchId, channel_id: ChannelId) {
let now = Instant::now();
self.branch_trackers.insert(
branch_id,
BranchTracker {
branch_id,
channel_id,
started_at: Instant::now(),
started_at: now,
last_activity_at: now,
},
);
}
Expand Down Expand Up @@ -507,7 +513,7 @@ fn parse_structured_success_flag(result: &str) -> Option<bool> {
fn kill_target_last_activity(target: &KillTarget) -> Instant {
match target {
KillTarget::Worker(tracker) => tracker.last_activity_at,
KillTarget::Branch(tracker) => tracker.started_at,
KillTarget::Branch(tracker) => tracker.last_activity_at,
}
}

Expand Down Expand Up @@ -985,7 +991,7 @@ impl Cortex {
.await;

let now = Instant::now();
let (lagged_control, pending_breaker_trips, overdue_workers, overdue_branches) = {
let (lagged_control, pending_breaker_trips, overdue_workers, overdue_branches, active_branches, active_workers) = {
let mut state = self.health_runtime_state.write().await;
let lagged_control = take_lagged_control_flag(&mut state);

Expand All @@ -1011,19 +1017,34 @@ impl Cortex {
state
.branch_trackers
.values()
.filter(|tracker| now.duration_since(tracker.started_at) >= branch_timeout)
.filter(|tracker| now.duration_since(tracker.last_activity_at) >= branch_timeout)
.cloned()
.collect()
Comment on lines 1017 to 1022
};

let active_branches = state.branch_trackers.len();
let active_workers = state.worker_trackers.len();

(
lagged_control,
pending_breaker_trips,
overdue_workers,
overdue_branches,
active_branches,
active_workers,
)
};

if !lagged_control {
tracing::debug!(
active_branches,
active_workers,
overdue_branches = overdue_branches.len(),
overdue_workers = overdue_workers.len(),
"cortex health tick"
);
}

for trip in pending_breaker_trips {
logger.log(
"circuit_breaker_tripped",
Expand All @@ -1045,6 +1066,8 @@ impl Cortex {
"kill_skipped_due_to_lag": true,
"kill_budget": kill_budget,
"pruned_dead_channels": pruned_dead_channels,
"active_branches": active_branches,
"active_workers": active_workers,
})),
);
return Ok(());
Expand Down Expand Up @@ -5044,12 +5067,14 @@ mod tests {
.expect("valid uuid"),
channel_id: Arc::from("channel-a"),
started_at: older,
last_activity_at: older,
};
let branch_newest = BranchTracker {
branch_id: uuid::Uuid::parse_str("00000000-0000-0000-0000-000000000002")
.expect("valid uuid"),
channel_id: Arc::from("channel-a"),
started_at: newer,
last_activity_at: newer,
};

let targets = build_kill_targets(
Expand Down
46 changes: 37 additions & 9 deletions src/hooks/spacebot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,13 @@ impl SpacebotHook {
}
}

/// Upper bound, in seconds, on a single awaited LLM call.
///
/// Prevents a hung API connection from blocking a branch, compactor, or
/// ingestion process indefinitely. Set to 5 minutes — generous for complex
/// completions but catches genuine connection stalls.
///
/// Used for the non-streaming prompt in `prompt_once` and for the
/// `agent.stream_completion(...)` request on the streaming path, so it is
/// not limited to non-streaming calls.
const LLM_CALL_TIMEOUT_SECS: u64 = 300;
Comment on lines +426 to +431

/// Prompt once with the hook attached and no retry loop.
pub async fn prompt_once<M>(
&self,
Expand All @@ -435,11 +442,22 @@ impl SpacebotHook {
{
self.reset_tool_nudge_state();
self.set_tool_nudge_request_active(false);
agent
.prompt(prompt)
.with_history(history)
.with_hook(self.clone())
.await
tokio::time::timeout(
std::time::Duration::from_secs(Self::LLM_CALL_TIMEOUT_SECS),
agent
.prompt(prompt)
.with_history(history)
.with_hook(self.clone()),
)
.await
.map_err(|_| PromptError::CompletionError(
rig::completion::CompletionError::from(
Box::new(std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!("LLM call timed out after {}s (prompt_once)", Self::LLM_CALL_TIMEOUT_SECS)
)) as Box<dyn std::error::Error + Send + Sync>
)
))?
}

/// Prompt once using Rig's streaming path so text/tool deltas reach the hook.
Expand Down Expand Up @@ -495,13 +513,23 @@ impl SpacebotHook {
});
}

let request = agent
.stream_completion(
let request = tokio::time::timeout(
std::time::Duration::from_secs(Self::LLM_CALL_TIMEOUT_SECS),
agent.stream_completion(
current_prompt.clone(),
chat_history[..chat_history.len() - 1].to_vec(),
),
)
.await
.map_err(|_| PromptError::CompletionError(
rig::completion::CompletionError::from(
Box::new(std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!("LLM stream_completion request timed out after {}s", Self::LLM_CALL_TIMEOUT_SECS)
)) as Box<dyn std::error::Error + Send + Sync>
)
.await
.map_err(PromptError::CompletionError)?;
))?
.map_err(PromptError::CompletionError)?;

Comment on lines +516 to 533
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
rg -n -C3 'prompt_once_streaming|tokio::time::timeout|request\.stream\(\)\.await|stream\.next\(\)\.await' src/hooks/spacebot.rs

Repository: spacedriveapp/spacebot

Length of output: 1232


🏁 Script executed:

sed -n '516,560p' src/hooks/spacebot.rs

Repository: spacedriveapp/spacebot

Length of output: 2103


Streaming timeout only covers request creation; stream establishment and reads can still hang indefinitely.

Lines 516–535 wrap agent.stream_completion(...) with a timeout, but lines 536–541 (request.stream().await) and line 543 (stream.next().await) have no timeout protection. A stalled stream after request creation or during reads blocks the entire channel processing loop indefinitely, defeating hung-connection mitigation.

Proposed fix (apply timeout to stream establishment + each stream read)
-            let mut stream = request
-                .stream()
-                .await
-                .map_err(PromptError::CompletionError)?;
+            let mut stream = tokio::time::timeout(
+                std::time::Duration::from_secs(Self::LLM_CALL_TIMEOUT_SECS),
+                request.stream(),
+            )
+            .await
+            .map_err(|_| PromptError::CompletionError(
+                rig::completion::CompletionError::from(
+                    Box::new(std::io::Error::new(
+                        std::io::ErrorKind::TimedOut,
+                        format!(
+                            "LLM stream start timed out after {}s",
+                            Self::LLM_CALL_TIMEOUT_SECS
+                        ),
+                    )) as Box<dyn std::error::Error + Send + Sync>
+                )
+            ))?
+            .map_err(PromptError::CompletionError)?;
@@
-            while let Some(content) = stream.next().await {
+            while let Some(content) = tokio::time::timeout(
+                std::time::Duration::from_secs(Self::LLM_CALL_TIMEOUT_SECS),
+                stream.next(),
+            )
+            .await
+            .map_err(|_| PromptError::CompletionError(
+                rig::completion::CompletionError::from(
+                    Box::new(std::io::Error::new(
+                        std::io::ErrorKind::TimedOut,
+                        format!(
+                            "LLM stream read timed out after {}s",
+                            Self::LLM_CALL_TIMEOUT_SECS
+                        ),
+                    )) as Box<dyn std::error::Error + Send + Sync>
+                )
+            ))? {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/hooks/spacebot.rs` around lines 516 - 533, The current timeout only wraps
agent.stream_completion(...) so a stalled stream or blocked reads still hang;
update the code to apply tokio::time::timeout using Self::LLM_CALL_TIMEOUT_SECS
around both the stream establishment (the await on request.stream().await) and
each read from the stream (the await on stream.next().await), mapping timeout
errors to PromptError::CompletionError just like the initial request timeout;
ensure you cancel/close the underlying request on timeout and reuse the same
CompletionError creation pattern (rig::completion::CompletionError via Box<dyn
std::error::Error + Send + Sync>) so all timeouts are handled consistently for
agent.stream_completion, request.stream().await, and stream.next().await.

let mut stream = request
.stream()
Expand Down