diff --git a/crates/mcp/src/core/orchestrator.rs b/crates/mcp/src/core/orchestrator.rs index 545350f67..1e956ed0c 100644 --- a/crates/mcp/src/core/orchestrator.rs +++ b/crates/mcp/src/core/orchestrator.rs @@ -317,7 +317,8 @@ impl McpOrchestrator { "LRU evicted dynamic server '{}' (tenant: {:?}) - clearing tools from inventory", key.url, key.tenant_id ); - // Tools are registered by URL, so clear by URL + // Tool entries are registered by URL; pool-presence tracking is + // cleared for matching pool identities inside the inventory. inventory_clone.clear_server_tools(&key.url); }); @@ -355,6 +356,10 @@ impl McpOrchestrator { // Start background refresh task orchestrator.spawn_refresh_handler(refresh_rx); + // Start background reaper for idle pooled connections so the configured + // `pool.idle_timeout` actually has an effect. + orchestrator.spawn_idle_reaper(); + info!( "McpOrchestrator initialized with {} static servers", orchestrator.static_servers.len() @@ -639,6 +644,53 @@ impl McpOrchestrator { } } + /// Spawn a background task that periodically reaps pooled connections idle for + /// longer than `config.pool.idle_timeout`. The reaper fires the pool's eviction + /// callback for every removed entry, which keeps the URL-scoped tool inventory + /// consistent. Disabled (no task spawned) when `idle_timeout == 0`. + fn spawn_idle_reaper(&self) { + let idle_timeout_secs = self.config.pool.idle_timeout; + if idle_timeout_secs == 0 { + debug!("Idle reaper disabled (pool.idle_timeout = 0)"); + return; + } + + let idle_for = Duration::from_secs(idle_timeout_secs); + // Sweep frequently enough that worst-case eviction lag stays bounded, but + // never below 30s to keep wake-ups cheap on quiet deployments. + let sweep_interval = std::cmp::max(idle_for / 4, Duration::from_secs(30)); + + let pool = Arc::clone(&self.connection_pool); + let token = self.shutdown_token.clone(); + + #[expect( + clippy::disallowed_methods, + reason = "idle reaper runs for orchestrator lifetime; shutdown is coordinated via CancellationToken" + )] + tokio::spawn(async move { + let mut ticker = tokio::time::interval(sweep_interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + ticker.tick().await; + loop { + tokio::select! { + () = token.cancelled() => { + debug!("Idle reaper shutting down"); + break; + } + _ = ticker.tick() => { + let reaped = pool.evict_idle(idle_for); + if reaped > 0 { + debug!( + "Idle reaper evicted {} pooled MCP connection(s) idle for >= {:?}", + reaped, idle_for + ); + } + } + } + } + }); + } + /// Spawn background handler for inventory refresh requests. fn spawn_refresh_handler(&self, mut rx: mpsc::Receiver) { let token = self.shutdown_token.clone(); // @@ -1318,7 +1370,7 @@ impl McpOrchestrator { pub async fn connect_dynamic_server_with_tenant( &self, config: McpServerConfig, - tenant_id: Option, + tenant_id: Option, ) -> McpResult { use rmcp::{ transport::{ @@ -1331,8 +1383,12 @@ impl McpOrchestrator { let pool_key = PoolKey::from_config(&config, tenant_id); - // Check if already connected with same auth/tenant - if self.connection_pool.contains(&pool_key) { + // Fast path only when this exact pool identity still has inventory. + // Without that invariant, a sibling eviction can clear URL-scoped tools + // while this pool entry still claims discovery completed. + if self.connection_pool.tool_discovery_completed(&pool_key) + && self.tool_inventory.has_server_tools(&pool_key) + { return Ok(pool_key.url.clone()); } @@ -1345,7 +1401,7 @@ impl McpOrchestrator { let client = self .connection_pool - .get_or_create(pool_key, config.clone(), |cfg, _proxy| async move { + .get_or_create(pool_key.clone(), config.clone(), |cfg, _proxy| async move { match &cfg.transport { McpTransport::Streamable { url, @@ -1398,9 +1454,17 @@ impl McpOrchestrator { }) .await?; - // Load tools from the server - // Use server_key (URL) as the tool's server identifier so it matches - // what ensure_request_mcp_client adds to server_keys for filtering + // Another caller may have completed discovery while we were connecting. + // Same inventory invariant as the outer fast path. + if self.connection_pool.tool_discovery_completed(&pool_key) + && inventory_clone.has_server_tools(&pool_key) + { + self.metrics.record_connection_opened(); + return Ok(server_key); + } + + // Load tools from the server. Dynamic servers are not built-in tool hosts. + // If discovery fails, remove the connection so the next request retries cleanly. match client.peer().list_all_tools().await { Ok(tools) => { info!( @@ -1408,13 +1472,41 @@ impl McpOrchestrator { tools.len(), server_key ); + // Drop any stale/partial entries from a prior failed discovery before + // re-populating, so the inventory reflects only the current tool set. + inventory_clone.clear_server_tools(&server_key); for tool in tools { let entry = ToolEntry::from_server_tool(&server_key, tool) .with_category(ToolCategory::Dynamic); - inventory_clone.insert_entry(entry); + inventory_clone.insert_pool_entry(&pool_key, entry); } + self.connection_pool + .mark_tool_discovery_completed(&pool_key); + } + Err(e) => { + // If a prior discovery already succeeded for this pool entry, + // a transient `list_all_tools` failure should not tear the + // connection down or surface as `ConnectionFailed`: the cached + // client is still valid and the existing inventory state + // (possibly empty after a sibling-tenant wipe) is the best we + // have. We keep the entry so the next request can retry. + if self.connection_pool.tool_discovery_completed(&pool_key) { + warn!( + "Re-list of tools failed for '{}': {}; keeping cached connection (prior discovery still recorded)", + server_key, e + ); + self.metrics.record_connection_opened(); + return Ok(server_key); + } + warn!( + "Failed to list tools from '{}': {}; removing pooled connection", + server_key, e + ); + self.connection_pool.remove(&pool_key); + return Err(McpError::ConnectionFailed(format!( + "Tool discovery failed for '{server_key}': {e}" + ))); } - Err(e) => warn!("Failed to list tools from '{}': {}", server_key, e), } self.metrics.record_connection_opened(); diff --git a/crates/mcp/src/core/pool.rs b/crates/mcp/src/core/pool.rs index 449af891a..5cac01635 100644 --- a/crates/mcp/src/core/pool.rs +++ b/crates/mcp/src/core/pool.rs @@ -7,6 +7,7 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, }, + time::{Duration, Instant}, }; use lru::LruCache; @@ -14,7 +15,7 @@ use parking_lot::Mutex; use rmcp::{service::RunningService, RoleClient}; use super::config::{McpProxyConfig, McpServerConfig, McpTransport}; -use crate::error::McpResult; +use crate::{error::McpResult, tenant::TenantId}; type McpClient = RunningService; type EvictionCallback = Arc; @@ -26,11 +27,11 @@ type EvictionCallback = Arc; pub struct PoolKey { pub url: String, pub auth_hash: u64, - pub tenant_id: Option, + pub tenant_id: Option, } impl PoolKey { - pub fn new(url: impl Into, auth_hash: u64, tenant_id: Option) -> Self { + pub fn new(url: impl Into, auth_hash: u64, tenant_id: Option) -> Self { Self { url: url.into(), auth_hash, @@ -38,7 +39,7 @@ impl PoolKey { } } - pub fn from_config(config: &McpServerConfig, tenant_id: Option) -> Self { + pub fn from_config(config: &McpServerConfig, tenant_id: Option) -> Self { let (url, auth_hash) = match &config.transport { McpTransport::Streamable { url, @@ -95,11 +96,23 @@ impl PoolKey { #[derive(Clone)] pub(crate) struct CachedConnection { pub client: Arc, + /// Set after a successful `list_all_tools` for this pool entry. The orchestrator + /// combines this with a non-empty pool-scoped inventory check before + /// short-circuiting. + pub tools_discovered: bool, + /// Wall-clock-ish timestamp of the last access through `get` / `get_or_create` / + /// the fast-path inspection methods. Used by `evict_idle` to reap connections + /// that have not been touched for longer than the configured idle timeout. + pub last_used: Instant, } impl CachedConnection { pub fn new(client: Arc) -> Self { - Self { client } + Self { + client, + tools_discovered: false, + last_used: Instant::now(), + } } } @@ -158,7 +171,8 @@ impl McpConnectionPool { { { let mut connections = self.connections.lock(); - if let Some(cached) = connections.get(&key) { + if let Some(cached) = connections.get_mut(&key) { + cached.last_used = Instant::now(); return Ok(Arc::clone(&cached.client)); } } @@ -215,18 +229,88 @@ impl McpConnectionPool { .collect() } - /// Get connection, promoting in LRU. + /// Get connection, promoting in LRU and refreshing the idle timestamp. pub fn get(&self, key: &PoolKey) -> Option> { - self.connections - .lock() - .get(key) - .map(|cached| Arc::clone(&cached.client)) + self.connections.lock().get_mut(key).map(|cached| { + cached.last_used = Instant::now(); + Arc::clone(&cached.client) + }) } pub fn contains(&self, key: &PoolKey) -> bool { self.connections.lock().contains(key) } + /// True when the pool holds this key and tool discovery has completed successfully. + /// Touches `last_used` because callers gate live usage of the connection on this. + pub fn tool_discovery_completed(&self, key: &PoolKey) -> bool { + self.connections.lock().get_mut(key).is_some_and(|cached| { + cached.last_used = Instant::now(); + cached.tools_discovered + }) + } + + /// Mark tool discovery as done for a pooled connection. + pub fn mark_tool_discovery_completed(&self, key: &PoolKey) -> bool { + let mut connections = self.connections.lock(); + if let Some(cached) = connections.get_mut(key) { + let now = Instant::now(); + cached.tools_discovered = true; + cached.last_used = now; + true + } else { + false + } + } + + /// Evict every pooled entry that has been idle for at least `idle_for`. Fires the + /// eviction callback for each removed key so downstream state (tool inventory, + /// metrics) stays consistent. Returns the number of entries reaped. + pub fn evict_idle(&self, idle_for: Duration) -> usize { + let now = Instant::now(); + let evicted: Vec = { + let mut connections = self.connections.lock(); + let stale: Vec = connections + .iter() + .filter(|(_, cached)| now.saturating_duration_since(cached.last_used) >= idle_for) + .map(|(key, _)| key.clone()) + .collect(); + for key in &stale { + connections.pop(key); + } + stale + }; + + let count = evicted.len(); + if count > 0 { + self.connection_count.fetch_sub(count, Ordering::Relaxed); + if let Some(callback) = &self.eviction_callback { + for key in &evicted { + callback(key); + } + } + } + count + } + + /// Remove a connection from the pool by key. Returns true if it was present. + pub fn remove(&self, key: &PoolKey) -> bool { + let (removed, callback) = { + let mut connections = self.connections.lock(); + if connections.pop(key).is_some() { + self.connection_count.fetch_sub(1, Ordering::Relaxed); + (true, self.eviction_callback.clone()) + } else { + (false, None) + } + }; + + if let Some(callback) = callback { + callback(key); + } + removed + } + /// Look up a connection by URL only (backward compatibility). /// /// **O(n)** — performs a linear scan of all pooled connections under the @@ -345,8 +429,31 @@ mod tests { assert_ne!(key_with_token.auth_hash, 0); // Token hashed // With tenant - let key_with_tenant = PoolKey::from_config(&config, Some("tenant-123".to_string())); - assert_eq!(key_with_tenant.tenant_id, Some("tenant-123".to_string())); + let key_with_tenant = PoolKey::from_config(&config, Some(TenantId::from("tenant-123"))); + assert_eq!( + key_with_tenant.tenant_id, + Some(TenantId::from("tenant-123")) + ); + } + + #[test] + fn test_tool_discovery_completed_absent_key() { + let pool = McpConnectionPool::new(); + let key = PoolKey::from_config(&create_test_config("http://localhost:3000"), None); + assert!(!pool.tool_discovery_completed(&key)); + assert!(!pool.mark_tool_discovery_completed(&key)); + } + + #[test] + fn test_evict_idle_noop_on_empty_pool() { + // We cannot safely fabricate `RunningService` stubs in unit + // tests (see the absence of get/insert tests above), so behaviour under load + // is covered by the e2e suite. This guards the bookkeeping for empty pools + // and exercises the public surface. + let pool = McpConnectionPool::new(); + assert_eq!(pool.evict_idle(Duration::from_secs(0)), 0); + assert_eq!(pool.evict_idle(Duration::from_secs(300)), 0); + assert_eq!(pool.len(), 0); } #[test] diff --git a/crates/mcp/src/inventory/index.rs b/crates/mcp/src/inventory/index.rs index 33bfea9ba..1d423c2ee 100644 --- a/crates/mcp/src/inventory/index.rs +++ b/crates/mcp/src/inventory/index.rs @@ -13,7 +13,10 @@ use dashmap::DashMap; use tracing::warn; use super::types::{QualifiedToolName, ToolCategory, ToolEntry}; -use crate::core::config::{Prompt, RawResource, Tool}; +use crate::core::{ + config::{Prompt, RawResource, Tool}, + pool::PoolKey, +}; /// Cached prompt with metadata #[derive(Clone)] @@ -31,8 +34,13 @@ pub(crate) struct CachedResource { pub struct ToolInventory { tools_by_qualified: DashMap, - tools_by_simple_name: DashMap>, + /// Tools indexed by simple (unqualified) name. Multiple servers may register + /// the same simple name; the `HashSet` enforces structural deduplication so + /// repeated re-discovery cannot create duplicate entries even if a caller + /// bypasses the explicit dedup check in `insert_entry`. + tools_by_simple_name: DashMap>, tools_by_server: DashMap>, + tools_by_pool: DashMap>, tools_by_category: DashMap>, aliases: DashMap, prompts: DashMap, @@ -45,6 +53,7 @@ impl ToolInventory { tools_by_qualified: DashMap::new(), tools_by_simple_name: DashMap::new(), tools_by_server: DashMap::new(), + tools_by_pool: DashMap::new(), tools_by_category: DashMap::new(), aliases: DashMap::new(), prompts: DashMap::new(), @@ -60,12 +69,13 @@ impl Default for ToolInventory { } impl ToolInventory { - /// Returns first registered tool on collision. Use `get_tool_qualified()` for specific server. + /// Returns an arbitrary registered tool on collision (HashSet iteration order). + /// Use `get_tool_qualified()` to target a specific server deterministically. pub fn get_tool(&self, tool_name: &str) -> Option<(String, Tool)> { let qualified_names = self.tools_by_simple_name.get(tool_name)?; - let qualified = qualified_names.first()?; + let qualified = qualified_names.iter().next()?.clone(); self.tools_by_qualified - .get(qualified) + .get(&qualified) .map(|entry| (entry.server_key().to_string(), entry.tool.clone())) } @@ -83,6 +93,16 @@ impl ToolInventory { self.insert_entry(entry); } + /// Insert a tool discovered for a specific dynamic pool identity. + pub fn insert_pool_entry(&self, pool_key: &PoolKey, entry: ToolEntry) { + let tool_name = entry.qualified_name.tool_name().to_string(); + self.insert_entry(entry); + self.tools_by_pool + .entry(pool_key.clone()) + .or_default() + .insert(tool_name); + } + /// Insert a full tool entry with all metadata. pub fn insert_entry(&self, entry: ToolEntry) { let qualified = entry.qualified_name.clone(); @@ -109,15 +129,11 @@ impl ToolInventory { // Insert into primary index self.tools_by_qualified.insert(qualified.clone(), entry); - // Update simple name index + // Update simple name index — HashSet guarantees deduplication. self.tools_by_simple_name .entry(tool_name.clone()) - .and_modify(|v| { - if !v.contains(&qualified) { - v.push(qualified.clone()); - } - }) - .or_insert_with(|| vec![qualified]); + .or_default() + .insert(qualified); // Update server index self.tools_by_server @@ -328,8 +344,21 @@ impl ToolInventory { .collect() } + /// True if at least one tool is currently registered for this exact pool identity. + /// + /// Used as a defensive invariant by the dynamic-connect fast path: the pool + /// is keyed by `(url, auth_hash, tenant_id)`, so this check must not let a + /// sibling pool entry's inventory stand in for the current entry. + pub fn has_server_tools(&self, pool_key: &PoolKey) -> bool { + self.tools_by_pool + .get(pool_key) + .is_some_and(|tools| !tools.is_empty()) + } + /// Clear all cached items for a server. Uses server index for O(tools_per_server) removal. pub fn clear_server_tools(&self, server_key: &str) { + self.tools_by_pool.retain(|key, _| key.url != server_key); + if let Some((_, tool_names)) = self.tools_by_server.remove(server_key) { for tool_name in tool_names { let qualified = QualifiedToolName::new(server_key, &tool_name); @@ -344,7 +373,7 @@ impl ToolInventory { // Remove from simple name index if let Some(mut entry) = self.tools_by_simple_name.get_mut(&tool_name) { - entry.retain(|q| q != &qualified); + entry.remove(&qualified); } self.tools_by_simple_name .remove_if(&tool_name, |_, v| v.is_empty()); @@ -379,6 +408,7 @@ impl ToolInventory { self.tools_by_qualified.clear(); self.tools_by_simple_name.clear(); self.tools_by_server.clear(); + self.tools_by_pool.clear(); self.tools_by_category.clear(); self.aliases.clear(); self.prompts.clear(); @@ -539,6 +569,23 @@ mod tests { assert_eq!(tools[0].0, "tool2"); } + #[test] + fn test_has_server_tools_is_pool_scoped() { + let inventory = ToolInventory::new(); + let pool_key = PoolKey::new("server1", 1, None); + let sibling_key = PoolKey::new("server1", 2, None); + let entry = ToolEntry::from_server_tool("server1", create_test_tool("tool1")); + + inventory.insert_pool_entry(&pool_key, entry); + + assert!(inventory.has_server_tools(&pool_key)); + assert!(!inventory.has_server_tools(&sibling_key)); + + inventory.clear_server_tools("server1"); + + assert!(!inventory.has_server_tools(&pool_key)); + } + #[test] fn test_prompt_operations() { let inventory = ToolInventory::new(); @@ -687,9 +734,13 @@ mod tests { // Both stored (counts total tools including collisions) assert_eq!(inventory.counts().0, 2); - // Simple lookup returns first registered + // Simple lookup returns one of the registered servers (HashSet iteration + // order is unspecified; callers must use the qualified API for determinism). let (server, _) = inventory.get_tool("read_file").unwrap(); - assert_eq!(server, "server-a"); + assert!( + server == "server-a" || server == "server-b", + "unexpected server returned from simple lookup: {server}" + ); // Qualified lookup can access both assert!(inventory @@ -722,15 +773,20 @@ mod tests { create_test_tool("read_file"), ); - // Initial state: 2 tools, simple lookup returns server-a + // Initial state: 2 tools, simple lookup returns one of the registered + // servers (HashSet iteration order is unspecified — use the qualified + // API to assert deterministically). assert_eq!(inventory.counts().0, 2); let (server, _) = inventory.get_tool("read_file").unwrap(); - assert_eq!(server, "server-a"); + assert!( + server == "server-a" || server == "server-b", + "unexpected server before clear: {server}" + ); // Clear server-a inventory.clear_server_tools("server-a"); - // After clear: 1 tool, simple lookup should return server-b + // After clear: 1 tool, simple lookup must return server-b (only survivor). assert_eq!(inventory.counts().0, 1); let (server, _) = inventory.get_tool("read_file").unwrap(); assert_eq!(server, "server-b");