Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 102 additions & 10 deletions crates/mcp/src/core/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ impl McpOrchestrator {
"LRU evicted dynamic server '{}' (tenant: {:?}) - clearing tools from inventory",
key.url, key.tenant_id
);
// Tools are registered by URL, so clear by URL
// Tool entries are registered by URL; pool-presence tracking is
// cleared for matching pool identities inside the inventory.
inventory_clone.clear_server_tools(&key.url);
});

Expand Down Expand Up @@ -355,6 +356,10 @@ impl McpOrchestrator {
// Start background refresh task
orchestrator.spawn_refresh_handler(refresh_rx);

// Start background reaper for idle pooled connections so the configured
// `pool.idle_timeout` actually has an effect.
orchestrator.spawn_idle_reaper();

info!(
"McpOrchestrator initialized with {} static servers",
orchestrator.static_servers.len()
Expand Down Expand Up @@ -639,6 +644,53 @@ impl McpOrchestrator {
}
}

/// Spawn a background task that periodically reaps pooled connections idle for
/// longer than `config.pool.idle_timeout`. The reaper fires the pool's eviction
/// callback for every removed entry, which keeps the URL-scoped tool inventory
/// consistent. Disabled (no task spawned) when `idle_timeout == 0`.
fn spawn_idle_reaper(&self) {
let idle_timeout_secs = self.config.pool.idle_timeout;
if idle_timeout_secs == 0 {
debug!("Idle reaper disabled (pool.idle_timeout = 0)");
return;
}

let idle_for = Duration::from_secs(idle_timeout_secs);
// Sweep frequently enough that worst-case eviction lag stays bounded, but
// never below 30s to keep wake-ups cheap on quiet deployments.
let sweep_interval = std::cmp::max(idle_for / 4, Duration::from_secs(30));

let pool = Arc::clone(&self.connection_pool);
let token = self.shutdown_token.clone();

#[expect(
clippy::disallowed_methods,
reason = "idle reaper runs for orchestrator lifetime; shutdown is coordinated via CancellationToken"
)]
tokio::spawn(async move {
let mut ticker = tokio::time::interval(sweep_interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
ticker.tick().await;
loop {
tokio::select! {
() = token.cancelled() => {
debug!("Idle reaper shutting down");
break;
}
_ = ticker.tick() => {
let reaped = pool.evict_idle(idle_for);
if reaped > 0 {
debug!(
"Idle reaper evicted {} pooled MCP connection(s) idle for >= {:?}",
reaped, idle_for
);
}
}
}
}
});
}

/// Spawn background handler for inventory refresh requests.
fn spawn_refresh_handler(&self, mut rx: mpsc::Receiver<RefreshRequest>) {
let token = self.shutdown_token.clone(); //
Expand Down Expand Up @@ -1318,7 +1370,7 @@ impl McpOrchestrator {
pub async fn connect_dynamic_server_with_tenant(
&self,
config: McpServerConfig,
tenant_id: Option<String>,
tenant_id: Option<crate::tenant::TenantId>,
) -> McpResult<String> {
use rmcp::{
transport::{
Expand All @@ -1331,8 +1383,12 @@ impl McpOrchestrator {

let pool_key = PoolKey::from_config(&config, tenant_id);

// Check if already connected with same auth/tenant
if self.connection_pool.contains(&pool_key) {
// Fast path only when this exact pool identity still has inventory.
// Without that invariant, a sibling eviction can clear URL-scoped tools
// while this pool entry still claims discovery completed.
if self.connection_pool.tool_discovery_completed(&pool_key)
&& self.tool_inventory.has_server_tools(&pool_key)
{
Comment on lines +1389 to +1391
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Allow fast path after successful empty tool discovery

mark_tool_discovery_completed is documented to treat empty list_all_tools results as successful discovery, but this fast path also requires has_server_tools(url), so zero-tool servers can never hit the cache path and will re-run discovery on every connect. In environments where a dynamic MCP endpoint intentionally exposes no tools (or temporarily returns none), this causes repeated network calls and avoidable ConnectionFailed errors from transient list_all_tools failures despite a previously successful discovery.

Useful? React with 👍 / 👎.

return Ok(pool_key.url.clone());
}

Expand All @@ -1345,7 +1401,7 @@ impl McpOrchestrator {

let client = self
.connection_pool
.get_or_create(pool_key, config.clone(), |cfg, _proxy| async move {
.get_or_create(pool_key.clone(), config.clone(), |cfg, _proxy| async move {
match &cfg.transport {
McpTransport::Streamable {
url,
Expand Down Expand Up @@ -1398,23 +1454,59 @@ impl McpOrchestrator {
})
.await?;

// Load tools from the server
// Use server_key (URL) as the tool's server identifier so it matches
// what ensure_request_mcp_client adds to server_keys for filtering
// Another caller may have completed discovery while we were connecting.
// Same inventory invariant as the outer fast path.
if self.connection_pool.tool_discovery_completed(&pool_key)
&& inventory_clone.has_server_tools(&pool_key)
{
self.metrics.record_connection_opened();
return Ok(server_key);
}
Comment on lines +1483 to +1493
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 💤 Low value

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if get_or_create returns information about whether it was a cache hit
# and how metrics are expected to be recorded
rg -n -A 10 'fn get_or_create' crates/mcp/src/core/pool.rs

Repository: lightseekorg/smg

Length of output: 437


🏁 Script executed:

#!/bin/bash
# Get the full get_or_create implementation
cat -n crates/mcp/src/core/pool.rs | sed -n '172,250p'

Repository: lightseekorg/smg

Length of output: 3150


🏁 Script executed:

#!/bin/bash
# Get the full context around line 1491 and 1542 in orchestrator.rs
wc -l crates/mcp/src/core/orchestrator.rs

Repository: lightseekorg/smg

Length of output: 101


🏁 Script executed:

#!/bin/bash
# Find where record_connection_opened is defined and called
rg -n 'record_connection_opened' crates/mcp/src/

Repository: lightseekorg/smg

Length of output: 672


🏁 Script executed:

#!/bin/bash
# Get context around line 1491 and 1542
cat -n crates/mcp/src/core/orchestrator.rs | sed -n '1420,1555p'

Repository: lightseekorg/smg

Length of output: 7254


🏁 Script executed:

#!/bin/bash
# Check the metrics definition
cat -n crates/mcp/src/core/metrics.rs | sed -n '93,110p'

Repository: lightseekorg/smg

Length of output: 1025


🏁 Script executed:

#!/bin/bash
# Search for record_connection_closed to understand the full lifecycle
rg -n -B 2 -A 2 'record_connection_closed' crates/mcp/src/core/orchestrator.rs

Repository: lightseekorg/smg

Length of output: 226


Metrics may over-count when cached connections trigger record_connection_opened().

get_or_create returns an Arc<McpClient> without indicating whether it was a cache hit (line 186 in pool.rs) or a newly created connection (lines 190–210). The orchestrator then calls record_connection_opened() at lines 1491, 1528, and 1542 regardless of whether the connection was new or cached. Since record_connection_opened() increments active_connections (fetch_add), a reused cached connection will increment the counter multiple times across different requests, creating an accounting mismatch with record_connection_closed() which is only called once during shutdown.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/mcp/src/core/orchestrator.rs` around lines 1483 - 1493, The
orchestrator is unconditionally calling metrics.record_connection_opened() for
connections returned by connection_pool.get_or_create(), causing over-counting
when cached Arc<McpClient> instances are reused; change get_or_create (in
pool.rs) to return an indicator of whether the connection was newly created
(e.g., return (Arc<McpClient>, bool is_new) or provide a separate creation-aware
API) and update the orchestrator call sites that currently call
record_connection_opened() (the branches that check
connection_pool.tool_discovery_completed(&pool_key) and subsequent
early-returns) to only call record_connection_opened() when is_new is true, or
alternatively move the metrics increment into the creation code path inside
connection_pool.get_or_create so cached hits do not increment
active_connections.


// Load tools from the server. Dynamic servers are not built-in tool hosts.
// If discovery fails, remove the connection so the next request retries cleanly.
match client.peer().list_all_tools().await {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

Consider calling inventory_clone.clear_server_tools(&server_key) before re-listing tools to prevent stale or partial entries. To prevent vulnerabilities from duplicate entries, ensure the inventory uses a HashSet for tool storage. Additionally, ensure tool names are JSON-escaped before interpolation to handle special characters correctly.

        inventory_clone.clear_server_tools(&server_key);
        match client.peer().list_all_tools().await {
References
  1. To prevent vulnerabilities from duplicate entries, use data structures that inherently enforce uniqueness, such as HashSet, instead of manually deduplicating collections like Vec.
  2. Tool names should be JSON-escaped before interpolation to correctly handle special characters like quotes, backslashes, and unicode.

Ok(tools) => {
info!(
"Discovered {} tools from dynamic server '{}'",
tools.len(),
server_key
);
// Drop any stale/partial entries from a prior failed discovery before
// re-populating, so the inventory reflects only the current tool set.
inventory_clone.clear_server_tools(&server_key);
for tool in tools {
let entry = ToolEntry::from_server_tool(&server_key, tool)
.with_category(ToolCategory::Dynamic);
inventory_clone.insert_entry(entry);
inventory_clone.insert_pool_entry(&pool_key, entry);
}
self.connection_pool
.mark_tool_discovery_completed(&pool_key);
}
Err(e) => {
// If a prior discovery already succeeded for this pool entry,
// a transient `list_all_tools` failure should not tear the
// connection down or surface as `ConnectionFailed`: the cached
// client is still valid and the existing inventory state
// (possibly empty after a sibling-tenant wipe) is the best we
// have. We keep the entry so the next request can retry.
if self.connection_pool.tool_discovery_completed(&pool_key) {
warn!(
"Re-list of tools failed for '{}': {}; keeping cached connection (prior discovery still recorded)",
server_key, e
);
self.metrics.record_connection_opened();
return Ok(server_key);
}
warn!(
"Failed to list tools from '{}': {}; removing pooled connection",
server_key, e
);
self.connection_pool.remove(&pool_key);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid evicting pooled client on concurrent discovery error

Concurrent callers can share the same pooled client and race through list_all_tools because discovery is not serialized per pool key; if one caller succeeds and another transiently fails, this unconditional remove evicts the healthy connection and triggers URL-level inventory clearing, undoing the successful discovery and causing temporary tool loss/flaky failures. The eviction should be conditional on discovery still being incomplete (or protected by a per-key discovery lock) before clearing shared state.

Useful? React with 👍 / 👎.

return Err(McpError::ConnectionFailed(format!(
"Tool discovery failed for '{server_key}': {e}"
)));
}
Err(e) => warn!("Failed to list tools from '{}': {}", server_key, e),
}

self.metrics.record_connection_opened();
Expand Down
133 changes: 120 additions & 13 deletions crates/mcp/src/core/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ use std::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
};

use lru::LruCache;
use parking_lot::Mutex;
use rmcp::{service::RunningService, RoleClient};

use super::config::{McpProxyConfig, McpServerConfig, McpTransport};
use crate::error::McpResult;
use crate::{error::McpResult, tenant::TenantId};

type McpClient = RunningService<RoleClient, ()>;
type EvictionCallback = Arc<dyn Fn(&PoolKey) + Send + Sync>;
Expand All @@ -26,19 +27,19 @@ type EvictionCallback = Arc<dyn Fn(&PoolKey) + Send + Sync>;
pub struct PoolKey {
pub url: String,
pub auth_hash: u64,
pub tenant_id: Option<String>,
pub tenant_id: Option<TenantId>,
}

impl PoolKey {
pub fn new(url: impl Into<String>, auth_hash: u64, tenant_id: Option<String>) -> Self {
pub fn new(url: impl Into<String>, auth_hash: u64, tenant_id: Option<TenantId>) -> Self {
Self {
url: url.into(),
auth_hash,
tenant_id,
}
}

pub fn from_config(config: &McpServerConfig, tenant_id: Option<String>) -> Self {
pub fn from_config(config: &McpServerConfig, tenant_id: Option<TenantId>) -> Self {
let (url, auth_hash) = match &config.transport {
McpTransport::Streamable {
url,
Expand Down Expand Up @@ -95,11 +96,23 @@ impl PoolKey {
#[derive(Clone)]
pub(crate) struct CachedConnection {
pub client: Arc<McpClient>,
/// Set after a successful `list_all_tools` for this pool entry. The orchestrator
/// combines this with a non-empty pool-scoped inventory check before
/// short-circuiting.
pub tools_discovered: bool,
/// Wall-clock-ish timestamp of the last access through `get` / `get_or_create` /
/// the fast-path inspection methods. Used by `evict_idle` to reap connections
/// that have not been touched for longer than the configured idle timeout.
pub last_used: Instant,
}

impl CachedConnection {
pub fn new(client: Arc<McpClient>) -> Self {
Self { client }
Self {
client,
tools_discovered: false,
last_used: Instant::now(),
}
}
}

Expand Down Expand Up @@ -158,7 +171,8 @@ impl McpConnectionPool {
{
{
let mut connections = self.connections.lock();
if let Some(cached) = connections.get(&key) {
if let Some(cached) = connections.get_mut(&key) {
cached.last_used = Instant::now();
return Ok(Arc::clone(&cached.client));
}
}
Expand Down Expand Up @@ -215,18 +229,88 @@ impl McpConnectionPool {
.collect()
}

/// Get connection, promoting in LRU.
/// Get connection, promoting in LRU and refreshing the idle timestamp.
pub fn get(&self, key: &PoolKey) -> Option<Arc<McpClient>> {
self.connections
.lock()
.get(key)
.map(|cached| Arc::clone(&cached.client))
self.connections.lock().get_mut(key).map(|cached| {
cached.last_used = Instant::now();
Arc::clone(&cached.client)
})
}

pub fn contains(&self, key: &PoolKey) -> bool {
self.connections.lock().contains(key)
}

/// True when the pool holds this key and tool discovery has completed successfully.
/// Touches `last_used` because callers gate live usage of the connection on this.
pub fn tool_discovery_completed(&self, key: &PoolKey) -> bool {
self.connections.lock().get_mut(key).is_some_and(|cached| {
cached.last_used = Instant::now();
cached.tools_discovered
})
}

/// Mark tool discovery as done for a pooled connection.
pub fn mark_tool_discovery_completed(&self, key: &PoolKey) -> bool {
let mut connections = self.connections.lock();
if let Some(cached) = connections.get_mut(key) {
let now = Instant::now();
cached.tools_discovered = true;
cached.last_used = now;
true
} else {
false
}
}

/// Evict every pooled entry that has been idle for at least `idle_for`. Fires the
/// eviction callback for each removed key so downstream state (tool inventory,
/// metrics) stays consistent. Returns the number of entries reaped.
pub fn evict_idle(&self, idle_for: Duration) -> usize {
let now = Instant::now();
let evicted: Vec<PoolKey> = {
let mut connections = self.connections.lock();
let stale: Vec<PoolKey> = connections
.iter()
.filter(|(_, cached)| now.saturating_duration_since(cached.last_used) >= idle_for)
.map(|(key, _)| key.clone())
.collect();
for key in &stale {
connections.pop(key);
}
stale
};

let count = evicted.len();
if count > 0 {
self.connection_count.fetch_sub(count, Ordering::Relaxed);
if let Some(callback) = &self.eviction_callback {
Comment on lines +284 to +287
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Guard idle-eviction counter update against concurrent clear

evict_idle decrements connection_count after releasing the pool lock, so a concurrent clear() can reset the counter to 0 between removal and fetch_sub(count). In that race, fetch_sub underflows the usize counter and len()/stats() report a huge bogus value. Keep the decrement synchronized with the removal (or use saturating/CAS logic) to avoid counter corruption during shutdown or admin clears.

Useful? React with 👍 / 👎.

for key in &evicted {
callback(key);
}
}
}
count
}

/// Remove a connection from the pool by key. Returns true if it was present.
pub fn remove(&self, key: &PoolKey) -> bool {
let (removed, callback) = {
let mut connections = self.connections.lock();
if connections.pop(key).is_some() {
self.connection_count.fetch_sub(1, Ordering::Relaxed);
(true, self.eviction_callback.clone())
} else {
(false, None)
}
};

if let Some(callback) = callback {
callback(key);
}
Comment on lines +297 to +310
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The remove method should trigger the eviction_callback when a connection is successfully removed. The McpOrchestrator relies on this callback to clear associated tools from the ToolInventory. Additionally, ensure that long-lived bidirectional streams in this pool implement an idle timeout to prevent resource exhaustion. Finally, since PoolKey is used on hot paths, consider using an interned string type like Arc<str> for its fields to improve performance.

Suggested change
pub fn remove(&self, key: &PoolKey) -> bool {
let mut connections = self.connections.lock();
if connections.pop(key).is_some() {
self.connection_count.fetch_sub(1, Ordering::Relaxed);
true
} else {
false
}
pub fn remove(&self, key: &PoolKey) -> bool {
let mut connections = self.connections.lock();
if connections.pop(key).is_some() {
self.connection_count.fetch_sub(1, Ordering::Relaxed);
if let Some(callback) = &self.eviction_callback {
callback(key);
}
true
} else {
false
}
}
References
  1. Implement an idle timeout for long-lived bidirectional streams to prevent inactive clients from indefinitely consuming server-side resources such as tasks and channels.
  2. For types that are frequently cloned on hot paths and represent a small, repeated set of values (e.g., worker IDs or tenant IDs), use an interned string type like Arc to improve performance.

removed
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/// Look up a connection by URL only (backward compatibility).
///
/// **O(n)** — performs a linear scan of all pooled connections under the
Expand Down Expand Up @@ -345,8 +429,31 @@ mod tests {
assert_ne!(key_with_token.auth_hash, 0); // Token hashed

// With tenant
let key_with_tenant = PoolKey::from_config(&config, Some("tenant-123".to_string()));
assert_eq!(key_with_tenant.tenant_id, Some("tenant-123".to_string()));
let key_with_tenant = PoolKey::from_config(&config, Some(TenantId::from("tenant-123")));
assert_eq!(
key_with_tenant.tenant_id,
Some(TenantId::from("tenant-123"))
);
}

#[test]
fn test_tool_discovery_completed_absent_key() {
let pool = McpConnectionPool::new();
let key = PoolKey::from_config(&create_test_config("http://localhost:3000"), None);
assert!(!pool.tool_discovery_completed(&key));
assert!(!pool.mark_tool_discovery_completed(&key));
}

#[test]
fn test_evict_idle_noop_on_empty_pool() {
// We cannot safely fabricate `RunningService<RoleClient, ()>` stubs in unit
// tests (see the absence of get/insert tests above), so behaviour under load
// is covered by the e2e suite. This guards the bookkeeping for empty pools
// and exercises the public surface.
let pool = McpConnectionPool::new();
assert_eq!(pool.evict_idle(Duration::from_secs(0)), 0);
assert_eq!(pool.evict_idle(Duration::from_secs(300)), 0);
assert_eq!(pool.len(), 0);
}

#[test]
Expand Down
Loading
Loading