From ab84411a07246e78ffec95ac30421ea87ea5b703 Mon Sep 17 00:00:00 2001 From: MohanKumar21! Date: Fri, 15 May 2026 16:49:46 +0530 Subject: [PATCH 1/4] fix(mcp): evict pooled connections when tool discovery fails Signed-off-by: MohanKumar21! --- crates/mcp/src/core/orchestrator.rs | 33 +++++++++++++++++++++++------ crates/mcp/src/core/pool.rs | 11 ++++++++++ crates/mcp/src/inventory/index.rs | 7 ++++++ 3 files changed, 44 insertions(+), 7 deletions(-) diff --git a/crates/mcp/src/core/orchestrator.rs b/crates/mcp/src/core/orchestrator.rs index 545350f67..55f50b441 100644 --- a/crates/mcp/src/core/orchestrator.rs +++ b/crates/mcp/src/core/orchestrator.rs @@ -1331,8 +1331,12 @@ impl McpOrchestrator { let pool_key = PoolKey::from_config(&config, tenant_id); - // Check if already connected with same auth/tenant - if self.connection_pool.contains(&pool_key) { + // Fast path: connection exists AND tools are already registered in the inventory. + // If the connection is pooled but tools are missing (e.g. prior list_all_tools failed), + // fall through to re-fetch tools using the cached client. + if self.connection_pool.contains(&pool_key) + && self.tool_inventory.has_server_tools(&pool_key.url) + { return Ok(pool_key.url.clone()); } @@ -1345,7 +1349,7 @@ impl McpOrchestrator { let client = self .connection_pool - .get_or_create(pool_key, config.clone(), |cfg, _proxy| async move { + .get_or_create(pool_key.clone(), config.clone(), |cfg, _proxy| async move { match &cfg.transport { McpTransport::Streamable { url, @@ -1398,9 +1402,15 @@ impl McpOrchestrator { }) .await?; - // Load tools from the server - // Use server_key (URL) as the tool's server identifier so it matches - // what ensure_request_mcp_client adds to server_keys for filtering + // Skip tool listing if tools are already in the inventory (pool hit with existing tools) + if inventory_clone.has_server_tools(&server_key) { + self.metrics.record_connection_opened(); + return Ok(server_key); + } + + // Load tools from the server. + // If this fails, remove the connection from the pool so the next request + // retries from scratch instead of perpetually seeing zero tools. match client.peer().list_all_tools().await { Ok(tools) => { info!( @@ -1414,7 +1424,16 @@ impl McpOrchestrator { inventory_clone.insert_entry(entry); } } - Err(e) => warn!("Failed to list tools from '{}': {}", server_key, e), + Err(e) => { + warn!( + "Failed to list tools from '{}': {}; removing pooled connection", + server_key, e + ); + self.connection_pool.remove(&pool_key); + return Err(McpError::ConnectionFailed(format!( + "Tool discovery failed for '{server_key}': {e}" + ))); + } } self.metrics.record_connection_opened(); diff --git a/crates/mcp/src/core/pool.rs b/crates/mcp/src/core/pool.rs index 449af891a..79737b6fd 100644 --- a/crates/mcp/src/core/pool.rs +++ b/crates/mcp/src/core/pool.rs @@ -227,6 +227,17 @@ impl McpConnectionPool { self.connections.lock().contains(key) } + /// Remove a connection from the pool by key. Returns true if it was present. + pub fn remove(&self, key: &PoolKey) -> bool { + let mut connections = self.connections.lock(); + if connections.pop(key).is_some() { + self.connection_count.fetch_sub(1, Ordering::Relaxed); + true + } else { + false + } + } + /// Look up a connection by URL only (backward compatibility). /// /// **O(n)** — performs a linear scan of all pooled connections under the diff --git a/crates/mcp/src/inventory/index.rs b/crates/mcp/src/inventory/index.rs index 33bfea9ba..b6cffb270 100644 --- a/crates/mcp/src/inventory/index.rs +++ b/crates/mcp/src/inventory/index.rs @@ -328,6 +328,13 @@ impl ToolInventory { .collect() } + /// Check if a server has any registered tools. + pub fn has_server_tools(&self, server_key: &str) -> bool { + self.tools_by_server + .get(server_key) + .is_some_and(|tools| !tools.is_empty()) + } + /// Clear all cached items for a server. Uses server index for O(tools_per_server) removal. pub fn clear_server_tools(&self, server_key: &str) { if let Some((_, tool_names)) = self.tools_by_server.remove(server_key) { From 76e07152b362ea74139f1229a7abe43d7ab89cba Mon Sep 17 00:00:00 2001 From: MohanKumar21! Date: Fri, 15 May 2026 17:25:24 +0530 Subject: [PATCH 2/4] fix(mcp): track tool discovery on pool to fix flaky dynamic MCP tests Signed-off-by: MohanKumar21! --- crates/mcp/src/core/orchestrator.rs | 31 +++++++++++------ crates/mcp/src/core/pool.rs | 52 +++++++++++++++++++++++++---- crates/mcp/src/inventory/index.rs | 40 +++++++++++++--------- 3 files changed, 91 insertions(+), 32 deletions(-) diff --git a/crates/mcp/src/core/orchestrator.rs b/crates/mcp/src/core/orchestrator.rs index 55f50b441..b3b63dc15 100644 --- a/crates/mcp/src/core/orchestrator.rs +++ b/crates/mcp/src/core/orchestrator.rs @@ -1318,7 +1318,7 @@ impl McpOrchestrator { pub async fn connect_dynamic_server_with_tenant( &self, config: McpServerConfig, - tenant_id: Option, + tenant_id: Option, ) -> McpResult { use rmcp::{ transport::{ @@ -1331,10 +1331,14 @@ impl McpOrchestrator { let pool_key = PoolKey::from_config(&config, tenant_id); - // Fast path: connection exists AND tools are already registered in the inventory. - // If the connection is pooled but tools are missing (e.g. prior list_all_tools failed), - // fall through to re-fetch tools using the cached client. - if self.connection_pool.contains(&pool_key) + // Fast path: pool reports discovery completed AND the inventory still holds + // tools for this URL. The two invariants can desync because pool keys + // include (auth, tenant) but the inventory is keyed by URL only — a + // sibling pool entry's eviction can wipe the URL's inventory while this + // entry's `tools_discovered` flag remains true. Re-running discovery is + // the safe recovery; empty-tool servers fall through to the slower path + // and re-list, which is acceptable because that case is rare. + if self.connection_pool.tool_discovery_completed(&pool_key) && self.tool_inventory.has_server_tools(&pool_key.url) { return Ok(pool_key.url.clone()); @@ -1402,15 +1406,17 @@ impl McpOrchestrator { }) .await?; - // Skip tool listing if tools are already in the inventory (pool hit with existing tools) - if inventory_clone.has_server_tools(&server_key) { + // Another caller may have completed discovery while we were connecting. + // Same defensive AND as the outer fast path — see note above. + if self.connection_pool.tool_discovery_completed(&pool_key) + && inventory_clone.has_server_tools(&server_key) + { self.metrics.record_connection_opened(); return Ok(server_key); } - // Load tools from the server. - // If this fails, remove the connection from the pool so the next request - // retries from scratch instead of perpetually seeing zero tools. + // Load tools from the server. Dynamic servers are not built-in tool hosts. + // If discovery fails, remove the connection so the next request retries cleanly. match client.peer().list_all_tools().await { Ok(tools) => { info!( @@ -1418,11 +1424,16 @@ impl McpOrchestrator { tools.len(), server_key ); + // Drop any stale/partial entries from a prior failed discovery before + // re-populating, so the inventory reflects only the current tool set. + inventory_clone.clear_server_tools(&server_key); for tool in tools { let entry = ToolEntry::from_server_tool(&server_key, tool) .with_category(ToolCategory::Dynamic); inventory_clone.insert_entry(entry); } + self.connection_pool + .mark_tool_discovery_completed(&pool_key); } Err(e) => { warn!( diff --git a/crates/mcp/src/core/pool.rs b/crates/mcp/src/core/pool.rs index 79737b6fd..2d82514b5 100644 --- a/crates/mcp/src/core/pool.rs +++ b/crates/mcp/src/core/pool.rs @@ -14,7 +14,7 @@ use parking_lot::Mutex; use rmcp::{service::RunningService, RoleClient}; use super::config::{McpProxyConfig, McpServerConfig, McpTransport}; -use crate::error::McpResult; +use crate::{error::McpResult, tenant::TenantId}; type McpClient = RunningService; type EvictionCallback = Arc; @@ -26,11 +26,11 @@ type EvictionCallback = Arc; pub struct PoolKey { pub url: String, pub auth_hash: u64, - pub tenant_id: Option, + pub tenant_id: Option, } impl PoolKey { - pub fn new(url: impl Into, auth_hash: u64, tenant_id: Option) -> Self { + pub fn new(url: impl Into, auth_hash: u64, tenant_id: Option) -> Self { Self { url: url.into(), auth_hash, @@ -38,7 +38,7 @@ impl PoolKey { } } - pub fn from_config(config: &McpServerConfig, tenant_id: Option) -> Self { + pub fn from_config(config: &McpServerConfig, tenant_id: Option) -> Self { let (url, auth_hash) = match &config.transport { McpTransport::Streamable { url, @@ -95,11 +95,16 @@ impl PoolKey { #[derive(Clone)] pub(crate) struct CachedConnection { pub client: Arc, + /// Set after a successful `list_all_tools` for this pool entry (including empty tool lists). + pub tools_discovered: bool, } impl CachedConnection { pub fn new(client: Arc) -> Self { - Self { client } + Self { + client, + tools_discovered: false, + } } } @@ -227,11 +232,33 @@ impl McpConnectionPool { self.connections.lock().contains(key) } + /// True when the pool holds this key and tool discovery has completed successfully. + pub fn tool_discovery_completed(&self, key: &PoolKey) -> bool { + self.connections + .lock() + .get(key) + .is_some_and(|cached| cached.tools_discovered) + } + + /// Mark tool discovery as done for a pooled connection (including zero-tool servers). + pub fn mark_tool_discovery_completed(&self, key: &PoolKey) -> bool { + let mut connections = self.connections.lock(); + if let Some(cached) = connections.get_mut(key) { + cached.tools_discovered = true; + true + } else { + false + } + } + /// Remove a connection from the pool by key. Returns true if it was present. pub fn remove(&self, key: &PoolKey) -> bool { let mut connections = self.connections.lock(); if connections.pop(key).is_some() { self.connection_count.fetch_sub(1, Ordering::Relaxed); + if let Some(callback) = &self.eviction_callback { + callback(key); + } true } else { false @@ -356,8 +383,19 @@ mod tests { assert_ne!(key_with_token.auth_hash, 0); // Token hashed // With tenant - let key_with_tenant = PoolKey::from_config(&config, Some("tenant-123".to_string())); - assert_eq!(key_with_tenant.tenant_id, Some("tenant-123".to_string())); + let key_with_tenant = PoolKey::from_config(&config, Some(TenantId::from("tenant-123"))); + assert_eq!( + key_with_tenant.tenant_id, + Some(TenantId::from("tenant-123")) + ); + } + + #[test] + fn test_tool_discovery_completed_absent_key() { + let pool = McpConnectionPool::new(); + let key = PoolKey::from_config(&create_test_config("http://localhost:3000"), None); + assert!(!pool.tool_discovery_completed(&key)); + assert!(!pool.mark_tool_discovery_completed(&key)); } #[test] diff --git a/crates/mcp/src/inventory/index.rs b/crates/mcp/src/inventory/index.rs index b6cffb270..c25e9e9ea 100644 --- a/crates/mcp/src/inventory/index.rs +++ b/crates/mcp/src/inventory/index.rs @@ -31,7 +31,11 @@ pub(crate) struct CachedResource { pub struct ToolInventory { tools_by_qualified: DashMap, - tools_by_simple_name: DashMap>, + /// Tools indexed by simple (unqualified) name. Multiple servers may register + /// the same simple name; the `HashSet` enforces structural deduplication so + /// repeated re-discovery cannot create duplicate entries even if a caller + /// bypasses the explicit dedup check in `insert_entry`. + tools_by_simple_name: DashMap>, tools_by_server: DashMap>, tools_by_category: DashMap>, aliases: DashMap, @@ -60,12 +64,13 @@ impl Default for ToolInventory { } impl ToolInventory { - /// Returns first registered tool on collision. Use `get_tool_qualified()` for specific server. + /// Returns an arbitrary registered tool on collision (HashSet iteration order). + /// Use `get_tool_qualified()` to target a specific server deterministically. pub fn get_tool(&self, tool_name: &str) -> Option<(String, Tool)> { let qualified_names = self.tools_by_simple_name.get(tool_name)?; - let qualified = qualified_names.first()?; + let qualified = qualified_names.iter().next()?.clone(); self.tools_by_qualified - .get(qualified) + .get(&qualified) .map(|entry| (entry.server_key().to_string(), entry.tool.clone())) } @@ -109,15 +114,11 @@ impl ToolInventory { // Insert into primary index self.tools_by_qualified.insert(qualified.clone(), entry); - // Update simple name index + // Update simple name index — HashSet guarantees deduplication. self.tools_by_simple_name .entry(tool_name.clone()) - .and_modify(|v| { - if !v.contains(&qualified) { - v.push(qualified.clone()); - } - }) - .or_insert_with(|| vec![qualified]); + .or_default() + .insert(qualified); // Update server index self.tools_by_server @@ -328,7 +329,12 @@ impl ToolInventory { .collect() } - /// Check if a server has any registered tools. + /// True if at least one tool is currently registered under this server key. + /// + /// Used as a defensive invariant by the dynamic-connect fast path: the pool + /// is keyed by `(url, auth_hash, tenant_id)` while the inventory is keyed + /// by URL only, so the inventory for a URL can be wiped by a sibling pool + /// entry's eviction even when this entry still claims `tools_discovered`. pub fn has_server_tools(&self, server_key: &str) -> bool { self.tools_by_server .get(server_key) @@ -351,7 +357,7 @@ impl ToolInventory { // Remove from simple name index if let Some(mut entry) = self.tools_by_simple_name.get_mut(&tool_name) { - entry.retain(|q| q != &qualified); + entry.remove(&qualified); } self.tools_by_simple_name .remove_if(&tool_name, |_, v| v.is_empty()); @@ -694,9 +700,13 @@ mod tests { // Both stored (counts total tools including collisions) assert_eq!(inventory.counts().0, 2); - // Simple lookup returns first registered + // Simple lookup returns one of the registered servers (HashSet iteration + // order is unspecified; callers must use the qualified API for determinism). let (server, _) = inventory.get_tool("read_file").unwrap(); - assert_eq!(server, "server-a"); + assert!( + server == "server-a" || server == "server-b", + "unexpected server returned from simple lookup: {server}" + ); // Qualified lookup can access both assert!(inventory From 9b12527c181777e66caf4e0644729be18fb90a17 Mon Sep 17 00:00:00 2001 From: MohanKumar21! Date: Mon, 18 May 2026 11:02:13 +0530 Subject: [PATCH 3/4] fix(mcp): harden dynamic connection pool and enforce idle timeout Signed-off-by: MohanKumar21! --- crates/mcp/src/core/orchestrator.rs | 116 +++++++++++++++++++++++--- crates/mcp/src/core/pool.rs | 125 +++++++++++++++++++++++----- crates/mcp/src/inventory/index.rs | 61 +++++++++++--- 3 files changed, 259 insertions(+), 43 deletions(-) diff --git a/crates/mcp/src/core/orchestrator.rs b/crates/mcp/src/core/orchestrator.rs index b3b63dc15..501bc5238 100644 --- a/crates/mcp/src/core/orchestrator.rs +++ b/crates/mcp/src/core/orchestrator.rs @@ -67,6 +67,21 @@ use crate::{ tenant::TenantContext, }; +/// How long a successful `list_all_tools` is trusted as the basis for the +/// dynamic-connect fast path when the inventory is empty for that URL. This +/// bounds two competing concerns: +/// +/// * Genuinely zero-tool servers re-list at most once per TTL, not on every +/// request — keeping bandwidth and the chance of a transient +/// `list_all_tools` failure low. +/// * Warm-up races where the server briefly returned no tools self-heal on +/// the next request after the TTL expires, so the empty state is never +/// terminal. +/// +/// 60s is short enough that warm-up issues resolve quickly and long enough +/// that legitimately-empty servers don't generate per-request traffic. +const EMPTY_DISCOVERY_TTL: Duration = Duration::from_secs(60); + /// Build request headers from token and custom headers. fn build_request_headers( token: Option<&str>, @@ -317,7 +332,8 @@ impl McpOrchestrator { "LRU evicted dynamic server '{}' (tenant: {:?}) - clearing tools from inventory", key.url, key.tenant_id ); - // Tools are registered by URL, so clear by URL + // Tool entries are registered by URL; pool-presence tracking is + // cleared for matching pool identities inside the inventory. inventory_clone.clear_server_tools(&key.url); }); @@ -355,6 +371,10 @@ impl McpOrchestrator { // Start background refresh task orchestrator.spawn_refresh_handler(refresh_rx); + // Start background reaper for idle pooled connections so the configured + // `pool.idle_timeout` actually has an effect. + orchestrator.spawn_idle_reaper(); + info!( "McpOrchestrator initialized with {} static servers", orchestrator.static_servers.len() @@ -639,6 +659,53 @@ impl McpOrchestrator { } } + /// Spawn a background task that periodically reaps pooled connections idle for + /// longer than `config.pool.idle_timeout`. The reaper fires the pool's eviction + /// callback for every removed entry, which keeps the URL-scoped tool inventory + /// consistent. Disabled (no task spawned) when `idle_timeout == 0`. + fn spawn_idle_reaper(&self) { + let idle_timeout_secs = self.config.pool.idle_timeout; + if idle_timeout_secs == 0 { + debug!("Idle reaper disabled (pool.idle_timeout = 0)"); + return; + } + + let idle_for = Duration::from_secs(idle_timeout_secs); + // Sweep frequently enough that worst-case eviction lag stays bounded, but + // never below 30s to keep wake-ups cheap on quiet deployments. + let sweep_interval = std::cmp::max(idle_for / 4, Duration::from_secs(30)); + + let pool = Arc::clone(&self.connection_pool); + let token = self.shutdown_token.clone(); + + #[expect( + clippy::disallowed_methods, + reason = "idle reaper runs for orchestrator lifetime; shutdown is coordinated via CancellationToken" + )] + tokio::spawn(async move { + let mut ticker = tokio::time::interval(sweep_interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + ticker.tick().await; + loop { + tokio::select! { + () = token.cancelled() => { + debug!("Idle reaper shutting down"); + break; + } + _ = ticker.tick() => { + let reaped = pool.evict_idle(idle_for); + if reaped > 0 { + debug!( + "Idle reaper evicted {} pooled MCP connection(s) idle for >= {:?}", + reaped, idle_for + ); + } + } + } + } + }); + } + /// Spawn background handler for inventory refresh requests. fn spawn_refresh_handler(&self, mut rx: mpsc::Receiver) { let token = self.shutdown_token.clone(); // @@ -1331,15 +1398,22 @@ impl McpOrchestrator { let pool_key = PoolKey::from_config(&config, tenant_id); - // Fast path: pool reports discovery completed AND the inventory still holds - // tools for this URL. The two invariants can desync because pool keys - // include (auth, tenant) but the inventory is keyed by URL only — a - // sibling pool entry's eviction can wipe the URL's inventory while this - // entry's `tools_discovered` flag remains true. Re-running discovery is - // the safe recovery; empty-tool servers fall through to the slower path - // and re-list, which is acceptable because that case is rare. + // Fast path: pool reports discovery completed AND either + // (a) the inventory still holds tools for this pool identity, OR + // (b) the last successful discovery is fresh within + // `EMPTY_DISCOVERY_TTL`. + // + // The inventory check guards against sibling-tenant state standing in + // for this pool entry after URL-scoped inventory changes. The TTL + // branch lets genuinely-empty servers short-circuit briefly without + // making the empty state terminal: a warm-up race that returned no + // tools naturally re-validates after the TTL expires and tools that + // have since been registered become visible. if self.connection_pool.tool_discovery_completed(&pool_key) - && self.tool_inventory.has_server_tools(&pool_key.url) + && (self.tool_inventory.has_server_tools(&pool_key) + || self + .connection_pool + .discovery_fresh_within(&pool_key, EMPTY_DISCOVERY_TTL)) { return Ok(pool_key.url.clone()); } @@ -1407,9 +1481,12 @@ impl McpOrchestrator { .await?; // Another caller may have completed discovery while we were connecting. - // Same defensive AND as the outer fast path — see note above. + // Same combined check as the outer fast path — see note above. if self.connection_pool.tool_discovery_completed(&pool_key) - && inventory_clone.has_server_tools(&server_key) + && (inventory_clone.has_server_tools(&pool_key) + || self + .connection_pool + .discovery_fresh_within(&pool_key, EMPTY_DISCOVERY_TTL)) { self.metrics.record_connection_opened(); return Ok(server_key); @@ -1430,12 +1507,27 @@ impl McpOrchestrator { for tool in tools { let entry = ToolEntry::from_server_tool(&server_key, tool) .with_category(ToolCategory::Dynamic); - inventory_clone.insert_entry(entry); + inventory_clone.insert_pool_entry(&pool_key, entry); } self.connection_pool .mark_tool_discovery_completed(&pool_key); } Err(e) => { + // If a prior discovery already succeeded for this pool entry, + // a transient `list_all_tools` failure should not tear the + // connection down or surface as `ConnectionFailed`: the cached + // client is still valid and the existing inventory state + // (possibly empty after a sibling-tenant wipe) is the best we + // have. We keep the entry so the next request can retry; the + // TTL'd fast-path check above provides bounded backoff. + if self.connection_pool.tool_discovery_completed(&pool_key) { + warn!( + "Re-list of tools failed for '{}': {}; keeping cached connection (prior discovery still recorded)", + server_key, e + ); + self.metrics.record_connection_opened(); + return Ok(server_key); + } warn!( "Failed to list tools from '{}': {}; removing pooled connection", server_key, e diff --git a/crates/mcp/src/core/pool.rs b/crates/mcp/src/core/pool.rs index 2d82514b5..aabd1f6a5 100644 --- a/crates/mcp/src/core/pool.rs +++ b/crates/mcp/src/core/pool.rs @@ -7,6 +7,7 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, }, + time::{Duration, Instant}, }; use lru::LruCache; @@ -95,8 +96,23 @@ impl PoolKey { #[derive(Clone)] pub(crate) struct CachedConnection { pub client: Arc, - /// Set after a successful `list_all_tools` for this pool entry (including empty tool lists). + /// Set after a successful `list_all_tools` for this pool entry, regardless of how + /// many tools were returned. The orchestrator combines this with a non-empty + /// inventory check before short-circuiting — see the comment above the fast path + /// in `connect_dynamic_server_with_tenant`. We deliberately do NOT cache a + /// separate "discovered empty" terminal shortcut: a single empty response during + /// server warm-up would otherwise poison the fast path until LRU/idle eviction. pub tools_discovered: bool, + /// Timestamp of the most recent successful `list_all_tools` for this pool entry. + /// Combined with a TTL by `discovery_fresh_within`, this gives a bounded grace + /// window in which an empty discovery may be trusted (avoiding re-listing on + /// every request for genuinely zero-tool servers) without making the empty + /// state terminal (a warm-up race re-validates after the TTL expires). + pub last_discovery_at: Option, + /// Wall-clock-ish timestamp of the last access through `get` / `get_or_create` / + /// the fast-path inspection methods. Used by `evict_idle` to reap connections + /// that have not been touched for longer than the configured idle timeout. + pub last_used: Instant, } impl CachedConnection { @@ -104,6 +120,8 @@ impl CachedConnection { Self { client, tools_discovered: false, + last_discovery_at: None, + last_used: Instant::now(), } } } @@ -163,7 +181,8 @@ impl McpConnectionPool { { { let mut connections = self.connections.lock(); - if let Some(cached) = connections.get(&key) { + if let Some(cached) = connections.get_mut(&key) { + cached.last_used = Instant::now(); return Ok(Arc::clone(&cached.client)); } } @@ -220,12 +239,12 @@ impl McpConnectionPool { .collect() } - /// Get connection, promoting in LRU. + /// Get connection, promoting in LRU and refreshing the idle timestamp. pub fn get(&self, key: &PoolKey) -> Option> { - self.connections - .lock() - .get(key) - .map(|cached| Arc::clone(&cached.client)) + self.connections.lock().get_mut(key).map(|cached| { + cached.last_used = Instant::now(); + Arc::clone(&cached.client) + }) } pub fn contains(&self, key: &PoolKey) -> bool { @@ -233,36 +252,90 @@ impl McpConnectionPool { } /// True when the pool holds this key and tool discovery has completed successfully. + /// Touches `last_used` because callers gate live usage of the connection on this. pub fn tool_discovery_completed(&self, key: &PoolKey) -> bool { - self.connections - .lock() - .get(key) - .is_some_and(|cached| cached.tools_discovered) + self.connections.lock().get_mut(key).is_some_and(|cached| { + cached.last_used = Instant::now(); + cached.tools_discovered + }) } - /// Mark tool discovery as done for a pooled connection (including zero-tool servers). + /// Mark tool discovery as done for a pooled connection. Records "ran + /// successfully" without caching "discovered zero tools" as a terminal + /// state — see `CachedConnection::last_discovery_at` for the rationale. pub fn mark_tool_discovery_completed(&self, key: &PoolKey) -> bool { let mut connections = self.connections.lock(); if let Some(cached) = connections.get_mut(key) { + let now = Instant::now(); cached.tools_discovered = true; + cached.last_discovery_at = Some(now); + cached.last_used = now; true } else { false } } + /// True when the most recent successful discovery for `key` happened within + /// `fresh_within`. Lets the orchestrator's fast path short-circuit briefly + /// for genuinely-empty servers without ever caching the empty state as + /// terminal — the cache lapses on TTL and the next request re-validates. + pub fn discovery_fresh_within(&self, key: &PoolKey, fresh_within: Duration) -> bool { + let now = Instant::now(); + self.connections.lock().get_mut(key).is_some_and(|cached| { + cached.last_used = now; + cached + .last_discovery_at + .is_some_and(|at| now.saturating_duration_since(at) < fresh_within) + }) + } + + /// Evict every pooled entry that has been idle for at least `idle_for`. Fires the + /// eviction callback for each removed key so downstream state (tool inventory, + /// metrics) stays consistent. Returns the number of entries reaped. + pub fn evict_idle(&self, idle_for: Duration) -> usize { + let now = Instant::now(); + let evicted: Vec = { + let mut connections = self.connections.lock(); + let stale: Vec = connections + .iter() + .filter(|(_, cached)| now.saturating_duration_since(cached.last_used) >= idle_for) + .map(|(key, _)| key.clone()) + .collect(); + for key in &stale { + connections.pop(key); + } + stale + }; + + let count = evicted.len(); + if count > 0 { + self.connection_count.fetch_sub(count, Ordering::Relaxed); + if let Some(callback) = &self.eviction_callback { + for key in &evicted { + callback(key); + } + } + } + count + } + /// Remove a connection from the pool by key. Returns true if it was present. pub fn remove(&self, key: &PoolKey) -> bool { - let mut connections = self.connections.lock(); - if connections.pop(key).is_some() { - self.connection_count.fetch_sub(1, Ordering::Relaxed); - if let Some(callback) = &self.eviction_callback { - callback(key); + let (removed, callback) = { + let mut connections = self.connections.lock(); + if connections.pop(key).is_some() { + self.connection_count.fetch_sub(1, Ordering::Relaxed); + (true, self.eviction_callback.clone()) + } else { + (false, None) } - true - } else { - false + }; + + if let Some(callback) = callback { + callback(key); } + removed } /// Look up a connection by URL only (backward compatibility). @@ -398,6 +471,18 @@ mod tests { assert!(!pool.mark_tool_discovery_completed(&key)); } + #[test] + fn test_evict_idle_noop_on_empty_pool() { + // We cannot safely fabricate `RunningService` stubs in unit + // tests (see the absence of get/insert tests above), so behaviour under load + // is covered by the e2e suite. This guards the bookkeeping for empty pools + // and exercises the public surface. + let pool = McpConnectionPool::new(); + assert_eq!(pool.evict_idle(Duration::from_secs(0)), 0); + assert_eq!(pool.evict_idle(Duration::from_secs(300)), 0); + assert_eq!(pool.len(), 0); + } + #[test] fn test_pool_key_different_tokens() { let config1 = McpServerConfig { diff --git a/crates/mcp/src/inventory/index.rs b/crates/mcp/src/inventory/index.rs index c25e9e9ea..1d423c2ee 100644 --- a/crates/mcp/src/inventory/index.rs +++ b/crates/mcp/src/inventory/index.rs @@ -13,7 +13,10 @@ use dashmap::DashMap; use tracing::warn; use super::types::{QualifiedToolName, ToolCategory, ToolEntry}; -use crate::core::config::{Prompt, RawResource, Tool}; +use crate::core::{ + config::{Prompt, RawResource, Tool}, + pool::PoolKey, +}; /// Cached prompt with metadata #[derive(Clone)] @@ -37,6 +40,7 @@ pub struct ToolInventory { /// bypasses the explicit dedup check in `insert_entry`. tools_by_simple_name: DashMap>, tools_by_server: DashMap>, + tools_by_pool: DashMap>, tools_by_category: DashMap>, aliases: DashMap, prompts: DashMap, @@ -49,6 +53,7 @@ impl ToolInventory { tools_by_qualified: DashMap::new(), tools_by_simple_name: DashMap::new(), tools_by_server: DashMap::new(), + tools_by_pool: DashMap::new(), tools_by_category: DashMap::new(), aliases: DashMap::new(), prompts: DashMap::new(), @@ -88,6 +93,16 @@ impl ToolInventory { self.insert_entry(entry); } + /// Insert a tool discovered for a specific dynamic pool identity. + pub fn insert_pool_entry(&self, pool_key: &PoolKey, entry: ToolEntry) { + let tool_name = entry.qualified_name.tool_name().to_string(); + self.insert_entry(entry); + self.tools_by_pool + .entry(pool_key.clone()) + .or_default() + .insert(tool_name); + } + /// Insert a full tool entry with all metadata. pub fn insert_entry(&self, entry: ToolEntry) { let qualified = entry.qualified_name.clone(); @@ -329,20 +344,21 @@ impl ToolInventory { .collect() } - /// True if at least one tool is currently registered under this server key. + /// True if at least one tool is currently registered for this exact pool identity. /// /// Used as a defensive invariant by the dynamic-connect fast path: the pool - /// is keyed by `(url, auth_hash, tenant_id)` while the inventory is keyed - /// by URL only, so the inventory for a URL can be wiped by a sibling pool - /// entry's eviction even when this entry still claims `tools_discovered`. - pub fn has_server_tools(&self, server_key: &str) -> bool { - self.tools_by_server - .get(server_key) + /// is keyed by `(url, auth_hash, tenant_id)`, so this check must not let a + /// sibling pool entry's inventory stand in for the current entry. + pub fn has_server_tools(&self, pool_key: &PoolKey) -> bool { + self.tools_by_pool + .get(pool_key) .is_some_and(|tools| !tools.is_empty()) } /// Clear all cached items for a server. Uses server index for O(tools_per_server) removal. pub fn clear_server_tools(&self, server_key: &str) { + self.tools_by_pool.retain(|key, _| key.url != server_key); + if let Some((_, tool_names)) = self.tools_by_server.remove(server_key) { for tool_name in tool_names { let qualified = QualifiedToolName::new(server_key, &tool_name); @@ -392,6 +408,7 @@ impl ToolInventory { self.tools_by_qualified.clear(); self.tools_by_simple_name.clear(); self.tools_by_server.clear(); + self.tools_by_pool.clear(); self.tools_by_category.clear(); self.aliases.clear(); self.prompts.clear(); @@ -552,6 +569,23 @@ mod tests { assert_eq!(tools[0].0, "tool2"); } + #[test] + fn test_has_server_tools_is_pool_scoped() { + let inventory = ToolInventory::new(); + let pool_key = PoolKey::new("server1", 1, None); + let sibling_key = PoolKey::new("server1", 2, None); + let entry = ToolEntry::from_server_tool("server1", create_test_tool("tool1")); + + inventory.insert_pool_entry(&pool_key, entry); + + assert!(inventory.has_server_tools(&pool_key)); + assert!(!inventory.has_server_tools(&sibling_key)); + + inventory.clear_server_tools("server1"); + + assert!(!inventory.has_server_tools(&pool_key)); + } + #[test] fn test_prompt_operations() { let inventory = ToolInventory::new(); @@ -739,15 +773,20 @@ mod tests { create_test_tool("read_file"), ); - // Initial state: 2 tools, simple lookup returns server-a + // Initial state: 2 tools, simple lookup returns one of the registered + // servers (HashSet iteration order is unspecified — use the qualified + // API to assert deterministically). assert_eq!(inventory.counts().0, 2); let (server, _) = inventory.get_tool("read_file").unwrap(); - assert_eq!(server, "server-a"); + assert!( + server == "server-a" || server == "server-b", + "unexpected server before clear: {server}" + ); // Clear server-a inventory.clear_server_tools("server-a"); - // After clear: 1 tool, simple lookup should return server-b + // After clear: 1 tool, simple lookup must return server-b (only survivor). assert_eq!(inventory.counts().0, 1); let (server, _) = inventory.get_tool("read_file").unwrap(); assert_eq!(server, "server-b"); From 08c3731f5585e599f8042b76292ee01d705f14c4 Mon Sep 17 00:00:00 2001 From: MohanKumar21! Date: Mon, 18 May 2026 15:01:53 +0530 Subject: [PATCH 4/4] fix(mcp): Fix MCP dynamic inventory fast-path isolation Signed-off-by: MohanKumar21! --- crates/mcp/src/core/orchestrator.rs | 44 +++++------------------------ crates/mcp/src/core/pool.rs | 35 +++-------------------- 2 files changed, 11 insertions(+), 68 deletions(-) diff --git a/crates/mcp/src/core/orchestrator.rs b/crates/mcp/src/core/orchestrator.rs index 501bc5238..1e956ed0c 100644 --- a/crates/mcp/src/core/orchestrator.rs +++ b/crates/mcp/src/core/orchestrator.rs @@ -67,21 +67,6 @@ use crate::{ tenant::TenantContext, }; -/// How long a successful `list_all_tools` is trusted as the basis for the -/// dynamic-connect fast path when the inventory is empty for that URL. This -/// bounds two competing concerns: -/// -/// * Genuinely zero-tool servers re-list at most once per TTL, not on every -/// request — keeping bandwidth and the chance of a transient -/// `list_all_tools` failure low. -/// * Warm-up races where the server briefly returned no tools self-heal on -/// the next request after the TTL expires, so the empty state is never -/// terminal. -/// -/// 60s is short enough that warm-up issues resolve quickly and long enough -/// that legitimately-empty servers don't generate per-request traffic. -const EMPTY_DISCOVERY_TTL: Duration = Duration::from_secs(60); - /// Build request headers from token and custom headers. fn build_request_headers( token: Option<&str>, @@ -1398,22 +1383,11 @@ impl McpOrchestrator { let pool_key = PoolKey::from_config(&config, tenant_id); - // Fast path: pool reports discovery completed AND either - // (a) the inventory still holds tools for this pool identity, OR - // (b) the last successful discovery is fresh within - // `EMPTY_DISCOVERY_TTL`. - // - // The inventory check guards against sibling-tenant state standing in - // for this pool entry after URL-scoped inventory changes. The TTL - // branch lets genuinely-empty servers short-circuit briefly without - // making the empty state terminal: a warm-up race that returned no - // tools naturally re-validates after the TTL expires and tools that - // have since been registered become visible. + // Fast path only when this exact pool identity still has inventory. + // Without that invariant, a sibling eviction can clear URL-scoped tools + // while this pool entry still claims discovery completed. if self.connection_pool.tool_discovery_completed(&pool_key) - && (self.tool_inventory.has_server_tools(&pool_key) - || self - .connection_pool - .discovery_fresh_within(&pool_key, EMPTY_DISCOVERY_TTL)) + && self.tool_inventory.has_server_tools(&pool_key) { return Ok(pool_key.url.clone()); } @@ -1481,12 +1455,9 @@ impl McpOrchestrator { .await?; // Another caller may have completed discovery while we were connecting. - // Same combined check as the outer fast path — see note above. + // Same inventory invariant as the outer fast path. if self.connection_pool.tool_discovery_completed(&pool_key) - && (inventory_clone.has_server_tools(&pool_key) - || self - .connection_pool - .discovery_fresh_within(&pool_key, EMPTY_DISCOVERY_TTL)) + && inventory_clone.has_server_tools(&pool_key) { self.metrics.record_connection_opened(); return Ok(server_key); @@ -1518,8 +1489,7 @@ impl McpOrchestrator { // connection down or surface as `ConnectionFailed`: the cached // client is still valid and the existing inventory state // (possibly empty after a sibling-tenant wipe) is the best we - // have. We keep the entry so the next request can retry; the - // TTL'd fast-path check above provides bounded backoff. + // have. We keep the entry so the next request can retry. if self.connection_pool.tool_discovery_completed(&pool_key) { warn!( "Re-list of tools failed for '{}': {}; keeping cached connection (prior discovery still recorded)", diff --git a/crates/mcp/src/core/pool.rs b/crates/mcp/src/core/pool.rs index aabd1f6a5..5cac01635 100644 --- a/crates/mcp/src/core/pool.rs +++ b/crates/mcp/src/core/pool.rs @@ -96,19 +96,10 @@ impl PoolKey { #[derive(Clone)] pub(crate) struct CachedConnection { pub client: Arc, - /// Set after a successful `list_all_tools` for this pool entry, regardless of how - /// many tools were returned. The orchestrator combines this with a non-empty - /// inventory check before short-circuiting — see the comment above the fast path - /// in `connect_dynamic_server_with_tenant`. We deliberately do NOT cache a - /// separate "discovered empty" terminal shortcut: a single empty response during - /// server warm-up would otherwise poison the fast path until LRU/idle eviction. + /// Set after a successful `list_all_tools` for this pool entry. The orchestrator + /// combines this with a non-empty pool-scoped inventory check before + /// short-circuiting. pub tools_discovered: bool, - /// Timestamp of the most recent successful `list_all_tools` for this pool entry. - /// Combined with a TTL by `discovery_fresh_within`, this gives a bounded grace - /// window in which an empty discovery may be trusted (avoiding re-listing on - /// every request for genuinely zero-tool servers) without making the empty - /// state terminal (a warm-up race re-validates after the TTL expires). - pub last_discovery_at: Option, /// Wall-clock-ish timestamp of the last access through `get` / `get_or_create` / /// the fast-path inspection methods. Used by `evict_idle` to reap connections /// that have not been touched for longer than the configured idle timeout. @@ -120,7 +111,6 @@ impl CachedConnection { Self { client, tools_discovered: false, - last_discovery_at: None, last_used: Instant::now(), } } @@ -260,15 +250,12 @@ impl McpConnectionPool { }) } - /// Mark tool discovery as done for a pooled connection. Records "ran - /// successfully" without caching "discovered zero tools" as a terminal - /// state — see `CachedConnection::last_discovery_at` for the rationale. + /// Mark tool discovery as done for a pooled connection. pub fn mark_tool_discovery_completed(&self, key: &PoolKey) -> bool { let mut connections = self.connections.lock(); if let Some(cached) = connections.get_mut(key) { let now = Instant::now(); cached.tools_discovered = true; - cached.last_discovery_at = Some(now); cached.last_used = now; true } else { @@ -276,20 +263,6 @@ impl McpConnectionPool { } } - /// True when the most recent successful discovery for `key` happened within - /// `fresh_within`. Lets the orchestrator's fast path short-circuit briefly - /// for genuinely-empty servers without ever caching the empty state as - /// terminal — the cache lapses on TTL and the next request re-validates. - pub fn discovery_fresh_within(&self, key: &PoolKey, fresh_within: Duration) -> bool { - let now = Instant::now(); - self.connections.lock().get_mut(key).is_some_and(|cached| { - cached.last_used = now; - cached - .last_discovery_at - .is_some_and(|at| now.saturating_duration_since(at) < fresh_within) - }) - } - /// Evict every pooled entry that has been idle for at least `idle_for`. Fires the /// eviction callback for each removed key so downstream state (tool inventory, /// metrics) stays consistent. Returns the number of entries reaped.