diff --git a/src/agent/cortex.rs b/src/agent/cortex.rs index 226337dca..f8b942bdb 100644 --- a/src/agent/cortex.rs +++ b/src/agent/cortex.rs @@ -354,6 +354,10 @@ struct BranchTracker { branch_id: BranchId, channel_id: ChannelId, started_at: Instant, + /// Currently set at branch spawn only. To make this truly activity-based, + /// send a ProcessEvent variant from the hook on tool completions and text + /// deltas, then update this field in the cortex event loop. + last_activity_at: Instant, } #[derive(Debug, Clone)] @@ -435,12 +439,14 @@ impl HealthRuntimeState { } fn track_branch_start(&mut self, branch_id: BranchId, channel_id: ChannelId) { + let now = Instant::now(); self.branch_trackers.insert( branch_id, BranchTracker { branch_id, channel_id, - started_at: Instant::now(), + started_at: now, + last_activity_at: now, }, ); } @@ -507,7 +513,7 @@ fn parse_structured_success_flag(result: &str) -> Option { fn kill_target_last_activity(target: &KillTarget) -> Instant { match target { KillTarget::Worker(tracker) => tracker.last_activity_at, - KillTarget::Branch(tracker) => tracker.started_at, + KillTarget::Branch(tracker) => tracker.last_activity_at, } } @@ -985,7 +991,7 @@ impl Cortex { .await; let now = Instant::now(); - let (lagged_control, pending_breaker_trips, overdue_workers, overdue_branches) = { + let (lagged_control, pending_breaker_trips, overdue_workers, overdue_branches, active_branches, active_workers) = { let mut state = self.health_runtime_state.write().await; let lagged_control = take_lagged_control_flag(&mut state); @@ -1011,19 +1017,34 @@ impl Cortex { state .branch_trackers .values() - .filter(|tracker| now.duration_since(tracker.started_at) >= branch_timeout) + .filter(|tracker| now.duration_since(tracker.last_activity_at) >= branch_timeout) .cloned() .collect() }; + let active_branches = state.branch_trackers.len(); + let active_workers = state.worker_trackers.len(); + ( lagged_control, pending_breaker_trips, overdue_workers, overdue_branches, + active_branches, + active_workers, ) }; + if !lagged_control { + tracing::debug!( + active_branches, + active_workers, + overdue_branches = overdue_branches.len(), + overdue_workers = overdue_workers.len(), + "cortex health tick" + ); + } + for trip in pending_breaker_trips { logger.log( "circuit_breaker_tripped", @@ -1045,6 +1066,8 @@ impl Cortex { "kill_skipped_due_to_lag": true, "kill_budget": kill_budget, "pruned_dead_channels": pruned_dead_channels, + "active_branches": active_branches, + "active_workers": active_workers, })), ); return Ok(()); @@ -5044,12 +5067,14 @@ mod tests { .expect("valid uuid"), channel_id: Arc::from("channel-a"), started_at: older, + last_activity_at: older, }; let branch_newest = BranchTracker { branch_id: uuid::Uuid::parse_str("00000000-0000-0000-0000-000000000002") .expect("valid uuid"), channel_id: Arc::from("channel-a"), started_at: newer, + last_activity_at: newer, }; let targets = build_kill_targets( diff --git a/src/hooks/spacebot.rs b/src/hooks/spacebot.rs index 9fbe084d3..084c7bf20 100644 --- a/src/hooks/spacebot.rs +++ b/src/hooks/spacebot.rs @@ -423,6 +423,13 @@ impl SpacebotHook { } } + /// Timeout for a single LLM completion call (non-streaming). + /// + /// Prevents a hung API connection from blocking a branch, compactor, or + /// ingestion process indefinitely. Set to 5 minutes — generous for complex + /// completions but catches genuine connection stalls. + const LLM_CALL_TIMEOUT_SECS: u64 = 300; + /// Prompt once with the hook attached and no retry loop. pub async fn prompt_once( &self, @@ -435,11 +442,22 @@ impl SpacebotHook { { self.reset_tool_nudge_state(); self.set_tool_nudge_request_active(false); - agent - .prompt(prompt) - .with_history(history) - .with_hook(self.clone()) - .await + tokio::time::timeout( + std::time::Duration::from_secs(Self::LLM_CALL_TIMEOUT_SECS), + agent + .prompt(prompt) + .with_history(history) + .with_hook(self.clone()), + ) + .await + .map_err(|_| PromptError::CompletionError( + rig::completion::CompletionError::from( + Box::new(std::io::Error::new( + std::io::ErrorKind::TimedOut, + format!("LLM call timed out after {}s (prompt_once)", Self::LLM_CALL_TIMEOUT_SECS) + )) as Box + ) + ))? } /// Prompt once using Rig's streaming path so text/tool deltas reach the hook. @@ -495,13 +513,23 @@ impl SpacebotHook { }); } - let request = agent - .stream_completion( + let request = tokio::time::timeout( + std::time::Duration::from_secs(Self::LLM_CALL_TIMEOUT_SECS), + agent.stream_completion( current_prompt.clone(), chat_history[..chat_history.len() - 1].to_vec(), + ), + ) + .await + .map_err(|_| PromptError::CompletionError( + rig::completion::CompletionError::from( + Box::new(std::io::Error::new( + std::io::ErrorKind::TimedOut, + format!("LLM stream_completion request timed out after {}s", Self::LLM_CALL_TIMEOUT_SECS) + )) as Box ) - .await - .map_err(PromptError::CompletionError)?; + ))? + .map_err(PromptError::CompletionError)?; let mut stream = request .stream()