From c75f93c43d24a4d54362575eb0c996ee6d47eda4 Mon Sep 17 00:00:00 2001 From: Evgeny Zotov Date: Fri, 10 Apr 2026 20:06:32 +0200 Subject: [PATCH 1/2] fix: add per-call LLM timeout and activity-based branch tracking Prevents hung API connections from blocking branches, channels, and other LLM processes indefinitely. Two changes: 1. Wrap prompt_once, and the stream_completion request inside prompt_once_streaming, in tokio::time::timeout (fixed at 300s). Consuming the stream itself is not covered by the timeout. On timeout, returns PromptError::CompletionError which the existing retry/error paths handle naturally. 2. Add last_activity_at to BranchTracker (matching WorkerTracker) so the cortex supervisor can detect stalled branches by activity age rather than just wall-clock time since spawn. For now the field is only set at spawn, so it equals started_at until activity updates are wired in. Also adds debug-level health tick logging with active/overdue counts for observability. --- src/agent/cortex.rs | 28 +++++++++++++++++++++++--- src/hooks/spacebot.rs | 46 ++++++++++++++++++++++++++++++++++--------- 2 files changed, 62 insertions(+), 12 deletions(-) diff --git a/src/agent/cortex.rs b/src/agent/cortex.rs index 226337dca..dad8601ca 100644 --- a/src/agent/cortex.rs +++ b/src/agent/cortex.rs @@ -354,6 +354,7 @@ struct BranchTracker { branch_id: BranchId, channel_id: ChannelId, started_at: Instant, + last_activity_at: Instant, } #[derive(Debug, Clone)] @@ -435,12 +436,14 @@ impl HealthRuntimeState { } fn track_branch_start(&mut self, branch_id: BranchId, channel_id: ChannelId) { + let now = Instant::now(); self.branch_trackers.insert( branch_id, BranchTracker { branch_id, channel_id, - started_at: Instant::now(), + started_at: now, + last_activity_at: now, }, ); } @@ -985,7 +988,7 @@ impl Cortex { .await; let now = Instant::now(); - let (lagged_control, pending_breaker_trips, overdue_workers, overdue_branches) = { + let (lagged_control, pending_breaker_trips, overdue_workers, overdue_branches, active_branches, active_workers) = { let mut state = self.health_runtime_state.write().await; let lagged_control = take_lagged_control_flag(&mut state); @@ -1011,19 +1014,34 @@
impl Cortex { state .branch_trackers .values() - .filter(|tracker| now.duration_since(tracker.started_at) >= branch_timeout) + .filter(|tracker| now.duration_since(tracker.last_activity_at) >= branch_timeout) .cloned() .collect() }; + let active_branches = state.branch_trackers.len(); + let active_workers = state.worker_trackers.len(); + ( lagged_control, pending_breaker_trips, overdue_workers, overdue_branches, + active_branches, + active_workers, ) }; + if !lagged_control { + tracing::debug!( + active_branches, + active_workers, + overdue_branches = overdue_branches.len(), + overdue_workers = overdue_workers.len(), + "cortex health tick" + ); + } + for trip in pending_breaker_trips { logger.log( "circuit_breaker_tripped", @@ -1045,6 +1063,8 @@ impl Cortex { "kill_skipped_due_to_lag": true, "kill_budget": kill_budget, "pruned_dead_channels": pruned_dead_channels, + "active_branches": active_branches, + "active_workers": active_workers, })), ); return Ok(()); @@ -5044,12 +5064,14 @@ mod tests { .expect("valid uuid"), channel_id: Arc::from("channel-a"), started_at: older, + last_activity_at: older, }; let branch_newest = BranchTracker { branch_id: uuid::Uuid::parse_str("00000000-0000-0000-0000-000000000002") .expect("valid uuid"), channel_id: Arc::from("channel-a"), started_at: newer, + last_activity_at: newer, }; let targets = build_kill_targets( diff --git a/src/hooks/spacebot.rs b/src/hooks/spacebot.rs index 9fbe084d3..084c7bf20 100644 --- a/src/hooks/spacebot.rs +++ b/src/hooks/spacebot.rs @@ -423,6 +423,13 @@ impl SpacebotHook { } } + /// Timeout for one LLM call: `prompt_once`, or the streaming `stream_completion` request. + /// + /// Prevents a hung API connection from blocking a branch, compactor, or + /// ingestion process indefinitely. Set to 5 minutes — generous for complex + /// completions but catches genuine connection stalls. + const LLM_CALL_TIMEOUT_SECS: u64 = 300; + /// Prompt once with the hook attached and no retry loop. 
pub async fn prompt_once( &self, @@ -435,11 +442,22 @@ impl SpacebotHook { { self.reset_tool_nudge_state(); self.set_tool_nudge_request_active(false); - agent - .prompt(prompt) - .with_history(history) - .with_hook(self.clone()) - .await + tokio::time::timeout( + std::time::Duration::from_secs(Self::LLM_CALL_TIMEOUT_SECS), + agent + .prompt(prompt) + .with_history(history) + .with_hook(self.clone()), + ) + .await + .map_err(|_| PromptError::CompletionError( + rig::completion::CompletionError::from( + Box::new(std::io::Error::new( + std::io::ErrorKind::TimedOut, + format!("LLM call timed out after {}s (prompt_once)", Self::LLM_CALL_TIMEOUT_SECS) + )) as Box + ) + ))? } /// Prompt once using Rig's streaming path so text/tool deltas reach the hook. @@ -495,13 +513,23 @@ impl SpacebotHook { }); } - let request = agent - .stream_completion( + let request = tokio::time::timeout( + std::time::Duration::from_secs(Self::LLM_CALL_TIMEOUT_SECS), + agent.stream_completion( current_prompt.clone(), chat_history[..chat_history.len() - 1].to_vec(), + ), + ) + .await + .map_err(|_| PromptError::CompletionError( + rig::completion::CompletionError::from( + Box::new(std::io::Error::new( + std::io::ErrorKind::TimedOut, + format!("LLM stream_completion request timed out after {}s", Self::LLM_CALL_TIMEOUT_SECS) + )) as Box ) - .await - .map_err(PromptError::CompletionError)?; + ))? + .map_err(PromptError::CompletionError)?; let mut stream = request .stream() From cb4cbfcd3b4618a23201c3b771bcbeeb56878e2c Mon Sep 17 00:00:00 2001 From: Evgeny Zotov Date: Fri, 10 Apr 2026 23:20:06 +0200 Subject: [PATCH 2/2] fix: use last_activity_at in kill ordering + add TODO for activity updates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit kill_target_last_activity was still using started_at for branches while timeout detection had been switched to last_activity_at. 
Also note that last_activity_at is only set at branch spawn — updating it mid-execution requires ProcessEvent plumbing from the hook. --- src/agent/cortex.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/agent/cortex.rs b/src/agent/cortex.rs index dad8601ca..f8b942bdb 100644 --- a/src/agent/cortex.rs +++ b/src/agent/cortex.rs @@ -354,6 +354,9 @@ struct BranchTracker { branch_id: BranchId, channel_id: ChannelId, started_at: Instant, + /// Currently set at branch spawn only. To make this truly activity-based, + /// send a ProcessEvent variant from the hook on tool completions and text + /// deltas, then update this field in the cortex event loop. last_activity_at: Instant, } @@ -510,7 +513,7 @@ fn parse_structured_success_flag(result: &str) -> Option { fn kill_target_last_activity(target: &KillTarget) -> Instant { match target { KillTarget::Worker(tracker) => tracker.last_activity_at, - KillTarget::Branch(tracker) => tracker.started_at, + KillTarget::Branch(tracker) => tracker.last_activity_at, } }