From 87b8960057cb7091421eab3b38b29ba78119186d Mon Sep 17 00:00:00 2001 From: Kenny Bergquist Date: Wed, 13 May 2026 22:52:56 -0400 Subject: [PATCH] remove rmcp-actix-web dep, expand mcp test suite --- .github/workflows/test.yml | 7 + Cargo.lock | 57 +- crates/atomic-server/Cargo.toml | 5 +- crates/atomic-server/src/main.rs | 36 +- crates/atomic-server/src/mcp/mod.rs | 2 + crates/atomic-server/src/mcp/transport.rs | 733 ++++++++++++++++++++ crates/atomic-server/tests/mcp_inspector.rs | 154 ++++ package.json | 1 + scripts/check-mcp-inspector.js | 49 ++ 9 files changed, 959 insertions(+), 85 deletions(-) create mode 100644 crates/atomic-server/src/mcp/transport.rs create mode 100644 crates/atomic-server/tests/mcp_inspector.rs create mode 100644 scripts/check-mcp-inspector.js diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 651096f5..84ac34bc 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -21,6 +21,11 @@ jobs: - name: Install Rust uses: dtolnay/rust-toolchain@stable + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version-file: .nvmrc + # .cargo/config.toml pins the mold linker for x86_64-linux, so it must # be available on the runner or every link step fails with "cannot find 'ld'". - name: Install mold linker @@ -44,6 +49,8 @@ jobs: # need webkit/gtk — those are only required by src-tauri, which we skip # here via explicit `-p` flags. This keeps the Linux test job lightweight. - name: Run unit + integration tests + env: + ATOMIC_RUN_MCP_INSPECTOR: "1" run: | cargo test \ -p atomic-core \ diff --git a/Cargo.lock b/Cargo.lock index 0b531deb..bf960ba9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -684,6 +684,7 @@ dependencies = [ "actix-files", "actix-web", "actix-ws", + "async-stream", "atomic-core", "base64 0.22.1", "chrono", @@ -692,13 +693,13 @@ dependencies = [ "rand 0.8.5", "reqwest 0.12.26", "rmcp", - "rmcp-actix-web", "schemars 1.1.0", "serde", "serde_json", "sha2", "tempfile", "tokio", + "tokio-stream", "tracing", "tracing-subscriber", "utoipa", @@ -864,31 +865,6 @@ dependencies = [ "piper", ] -[[package]] -name = "bon" -version = "3.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebeb9aaf9329dff6ceb65c689ca3db33dbf15f324909c60e4e5eef5701ce31b1" -dependencies = [ - "bon-macros", - "rustversion", -] - -[[package]] -name = "bon-macros" -version = "3.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77e9d642a7e3a318e37c2c9427b5a6a48aa1ad55dcd986f3034ab2239045a645" -dependencies = [ - "darling 0.21.3", - "ident_case", - "prettyplease", - "proc-macro2", - "quote", - "rustversion", - "syn 2.0.111", -] - [[package]] name = "brotli" version = "8.0.2" @@ -4160,16 +4136,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" -[[package]] -name = "prettyplease" -version = "0.2.37" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" -dependencies = [ - "proc-macro2", - "syn 2.0.111", -] - [[package]] name = "proc-macro-crate" version = "1.3.1" @@ -4639,24 +4605,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "rmcp-actix-web" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f88703b727576959ccb3abc4b818b16881aaa79c57c6b225cfca1f710fa38e7a" -dependencies = [ - "actix-web", - "async-stream", - "bon", - "futures", - "rmcp", - "serde", - "serde_json", - "tokio", - "tokio-stream", - "tracing", -] - [[package]] name = "rmcp-macros" version = "0.15.0" @@ -5002,7 +4950,6 @@ version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" dependencies = [ - "indexmap 2.12.1", "itoa", "memchr", "ryu", diff --git a/crates/atomic-server/Cargo.toml b/crates/atomic-server/Cargo.toml index 53c9d565..e6535984 100644 --- a/crates/atomic-server/Cargo.toml +++ b/crates/atomic-server/Cargo.toml @@ -29,8 +29,10 @@ actix-ws = "0.3" actix-files = "0.6" # Async -tokio = { version = "1", features = ["rt-multi-thread", "sync", "time", "macros", "signal"] } +tokio = { version = "1", features = ["rt-multi-thread", "sync", "time", "macros", "signal", "process"] } futures = "0.3" +async-stream = "0.3" +tokio-stream = "0.1" # Serialization serde = { version = "1", features = ["derive"] } @@ -52,7 +54,6 @@ reqwest = { version = "0.12", features = ["json"] } # MCP server rmcp = { version = "0.15", features = ["server", "macros", "transport-streamable-http-server"] } -rmcp-actix-web = "0.11" schemars = "1.0" # OpenAPI diff --git a/crates/atomic-server/src/main.rs b/crates/atomic-server/src/main.rs index 5dd9f56a..3d9e2c38 100644 --- a/crates/atomic-server/src/main.rs +++ b/crates/atomic-server/src/main.rs @@ -17,8 +17,6 @@ use atomic_server::{ }; use clap::Parser; use config::{Cli, Command, TokenAction}; -use rmcp::transport::streamable_http_server::session::local::LocalSessionManager; -use rmcp_actix_web::transport::StreamableHttpService; use std::sync::Arc; use std::time::Duration; use utoipa::OpenApi; @@ -271,31 +269,13 @@ async fn run_server( setup_claim_limiter: SetupClaimLimiter::new(), }); - // Create MCP service with multi-database support via ?db= query param - let mcp_manager = Arc::clone(&manager); - let mcp_tx = event_tx.clone(); - let mcp_service = StreamableHttpService::builder() - .service_factory(Arc::new(move || { - Ok(mcp::AtomicMcpServer::new( - Arc::clone(&mcp_manager), - mcp_tx.clone(), - )) - })) - .on_request_fn(|http_req, ext| { - let db_id = http_req.query_string().split('&').find_map(|pair| { - let mut parts = pair.splitn(2, '='); - if parts.next()? == "db" { - parts.next().map(String::from) - } else { - None - } - }); - ext.insert(mcp::DbSelection(db_id)); - }) - .session_manager(Arc::new(LocalSessionManager::default())) - .stateful_mode(true) - .sse_keep_alive(Duration::from_secs(30)) - .build(); + // Create MCP transport outside HttpServer::new() so all Actix workers share + // one session manager. + let mcp_transport = mcp::AtomicMcpTransport::new( + Arc::clone(&manager), + event_tx.clone(), + Duration::from_secs(30), + ); tracing::info!("Atomic Server starting..."); tracing::info!(data_dir = data_dir, "data directory"); @@ -590,7 +570,7 @@ async fn run_server( .wrap(mcp_auth::McpAuth { state: app_state.clone(), }) - .service(mcp_service.clone().scope()), + .service(mcp_transport.clone().scope()), ) // Authenticated API routes .service( diff --git a/crates/atomic-server/src/mcp/mod.rs b/crates/atomic-server/src/mcp/mod.rs index 89cc9107..b6440c2c 100644 --- a/crates/atomic-server/src/mcp/mod.rs +++ b/crates/atomic-server/src/mcp/mod.rs @@ -1,5 +1,7 @@ mod server; +mod transport; mod types; pub use server::AtomicMcpServer; pub use server::DbSelection; +pub use transport::AtomicMcpTransport; diff --git a/crates/atomic-server/src/mcp/transport.rs b/crates/atomic-server/src/mcp/transport.rs new file mode 100644 index 00000000..f88bb998 --- /dev/null +++ b/crates/atomic-server/src/mcp/transport.rs @@ -0,0 +1,733 @@ +//! Atomic-owned Actix transport for MCP Streamable HTTP. +//! +//! This keeps Atomic on the official `rmcp` protocol/service layer while +//! owning the HTTP boundary where clients are strict about status codes and +//! content types. + +use super::{AtomicMcpServer, DbSelection}; +use crate::state::ServerEvent; +use actix_web::{ + error::InternalError, + http::{ + header::{self, CACHE_CONTROL}, + StatusCode, + }, + middleware, + web::{self, Bytes, Data}, + HttpRequest, HttpResponse, Result, Scope, +}; +use atomic_core::manager::DatabaseManager; +use futures::{Stream, StreamExt}; +use rmcp::{ + model::{ClientJsonRpcMessage, ClientRequest, GetExtensions}, + serve_server, + transport::{ + common::http_header::{ + EVENT_STREAM_MIME_TYPE, HEADER_LAST_EVENT_ID, HEADER_SESSION_ID, JSON_MIME_TYPE, + }, + streamable_http_server::session::{local::LocalSessionManager, SessionManager}, + TransportAdapterIdentity, + }, +}; +use std::{sync::Arc, time::Duration}; +use tokio::sync::broadcast; + +const HEADER_X_ACCEL_BUFFERING: &str = "X-Accel-Buffering"; +const TEXT_MIME_TYPE: &str = "text/plain; charset=utf-8"; + +#[derive(Clone)] +pub struct AtomicMcpTransport { + state: Data, +} + +impl AtomicMcpTransport { + pub fn new( + manager: Arc, + event_tx: broadcast::Sender, + sse_keep_alive: Duration, + ) -> Self { + Self { + state: Data::new(TransportState { + manager, + event_tx, + session_manager: Arc::new(LocalSessionManager::default()), + sse_keep_alive, + }), + } + } + + pub fn scope( + self, + ) -> Scope< + impl actix_web::dev::ServiceFactory< + actix_web::dev::ServiceRequest, + Config = (), + Response = actix_web::dev::ServiceResponse, + Error = actix_web::Error, + InitError = (), + >, + > { + web::scope("") + .app_data(self.state.clone()) + .wrap(middleware::NormalizePath::trim()) + .route("", web::get().to(handle_get)) + .route("", web::post().to(handle_post)) + .route("", web::delete().to(handle_delete)) + } +} + +struct TransportState { + manager: Arc, + event_tx: broadcast::Sender, + session_manager: Arc, + sse_keep_alive: Duration, +} + +impl TransportState { + fn server(&self) -> AtomicMcpServer { + AtomicMcpServer::new(Arc::clone(&self.manager), self.event_tx.clone()) + } +} + +fn text_error(status: StatusCode, message: &'static str) -> HttpResponse { + HttpResponse::build(status) + .content_type(TEXT_MIME_TYPE) + .body(message) +} + +fn accepts(req: &HttpRequest, media_type: &str) -> bool { + req.headers() + .get(header::ACCEPT) + .and_then(|h| h.to_str().ok()) + .is_some_and(|value| { + value.split(',').any(|part| { + let item = part + .split(';') + .next() + .unwrap_or_default() + .trim() + .to_ascii_lowercase(); + item == "*/*" || item == media_type || item == media_type.replace("text/", "*/") + }) + }) +} + +fn accepts_all(req: &HttpRequest, media_types: &[&str]) -> bool { + media_types + .iter() + .all(|media_type| accepts(req, media_type)) +} + +fn has_json_content_type(req: &HttpRequest) -> bool { + req.headers() + .get(header::CONTENT_TYPE) + .and_then(|h| h.to_str().ok()) + .is_some_and(|value| { + value + .split(';') + .next() + .unwrap_or_default() + .trim() + .eq_ignore_ascii_case(JSON_MIME_TYPE) + }) +} + +fn request_session_id(req: &HttpRequest) -> Option> { + req.headers() + .get(HEADER_SESSION_ID) + .and_then(|v| v.to_str().ok()) + .map(|s| Arc::::from(s.to_owned())) +} + +fn db_selection(req: &HttpRequest) -> DbSelection { + let db_id = req.query_string().split('&').find_map(|pair| { + let mut parts = pair.splitn(2, '='); + if parts.next()? == "db" { + parts.next().map(String::from) + } else { + None + } + }); + DbSelection(db_id) +} + +fn attach_request_extensions(req: &HttpRequest, message: &mut ClientJsonRpcMessage) { + if let ClientJsonRpcMessage::Request(request_msg) = message { + request_msg + .request + .extensions_mut() + .insert(db_selection(req)); + } +} + +fn wrap_with_sse_keepalive( + stream: S, + keep_alive: Duration, +) -> impl Stream> +where + S: Stream> + Send + 'static, +{ + async_stream::stream! { + let mut stream = Box::pin(stream); + let mut keep_alive_timer = tokio::time::interval(keep_alive); + keep_alive_timer.tick().await; + + loop { + tokio::select! { + result = stream.next() => { + match result { + Some(msg) => yield msg, + None => break, + } + } + _ = keep_alive_timer.tick() => { + yield Ok(Bytes::from(":ping\n\n")); + } + } + } + } +} + +async fn handle_get(req: HttpRequest, state: Data) -> Result { + if !accepts(&req, EVENT_STREAM_MIME_TYPE) { + return Ok(text_error( + StatusCode::NOT_ACCEPTABLE, + "Not Acceptable: Client must accept text/event-stream", + )); + } + + let Some(session_id) = request_session_id(&req) else { + return Ok(text_error( + StatusCode::BAD_REQUEST, + "Bad Request: Session ID is required", + )); + }; + + let has_session = state + .session_manager + .has_session(&session_id) + .await + .map_err(|e| InternalError::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; + + if !has_session { + return Ok(text_error( + StatusCode::NOT_FOUND, + "Not Found: Session not found", + )); + } + + let last_event_id = req + .headers() + .get(HEADER_LAST_EVENT_ID) + .and_then(|v| v.to_str().ok()) + .map(String::from); + + let sse_stream: std::pin::Pin + Send>> = + if let Some(last_event_id) = last_event_id { + Box::pin( + state + .session_manager + .resume(&session_id, last_event_id) + .await + .map_err(|e| InternalError::new(e, StatusCode::INTERNAL_SERVER_ERROR))?, + ) + } else { + Box::pin( + state + .session_manager + .create_standalone_stream(&session_id) + .await + .map_err(|e| InternalError::new(e, StatusCode::INTERNAL_SERVER_ERROR))?, + ) + }; + + let formatted_stream = sse_stream.map(|msg| { + let mut output = String::new(); + if let Some(id) = msg.event_id { + output.push_str(&format!("id: {id}\n")); + } + match msg.message { + Some(message) => { + let data = serde_json::to_string(message.as_ref()).unwrap_or_else(|_| "{}".into()); + output.push_str(&format!("data: {data}\n\n")); + } + None => output.push_str("data:\n\n"), + } + Ok::<_, actix_web::Error>(Bytes::from(output)) + }); + + Ok(HttpResponse::Ok() + .content_type(EVENT_STREAM_MIME_TYPE) + .append_header((CACHE_CONTROL, "no-cache")) + .append_header((HEADER_X_ACCEL_BUFFERING, "no")) + .streaming(wrap_with_sse_keepalive( + formatted_stream, + state.sse_keep_alive, + ))) +} + +async fn handle_post( + req: HttpRequest, + body: Bytes, + state: Data, +) -> Result { + if !accepts_all(&req, &[JSON_MIME_TYPE, EVENT_STREAM_MIME_TYPE]) { + return Ok(text_error( + StatusCode::NOT_ACCEPTABLE, + "Not Acceptable: Client must accept both application/json and text/event-stream", + )); + } + + if !has_json_content_type(&req) { + return Ok(text_error( + StatusCode::UNSUPPORTED_MEDIA_TYPE, + "Unsupported Media Type: Content-Type must be application/json", + )); + } + + let mut message: ClientJsonRpcMessage = match serde_json::from_slice(&body) { + Ok(message) => message, + Err(_) => { + return Ok(text_error( + StatusCode::BAD_REQUEST, + "Bad Request: Body must be a valid MCP JSON-RPC message", + )); + } + }; + + attach_request_extensions(&req, &mut message); + + if let Some(session_id) = request_session_id(&req) { + let has_session = state + .session_manager + .has_session(&session_id) + .await + .map_err(|e| InternalError::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; + + if !has_session { + return Ok(text_error( + StatusCode::NOT_FOUND, + "Not Found: Session not found", + )); + } + + return match message { + ClientJsonRpcMessage::Request(request_msg) => { + let stream = state + .session_manager + .create_stream(&session_id, ClientJsonRpcMessage::Request(request_msg)) + .await + .map_err(|e| InternalError::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; + + let formatted_stream = stream.map(|msg| { + let mut output = String::new(); + if let Some(id) = msg.event_id { + output.push_str(&format!("id: {id}\n")); + } + match msg.message { + Some(message) => { + let data = serde_json::to_string(message.as_ref()) + .unwrap_or_else(|_| "{}".into()); + output.push_str(&format!("data: {data}\n\n")); + } + None => output.push_str("data:\n\n"), + } + Ok::<_, actix_web::Error>(Bytes::from(output)) + }); + + Ok(HttpResponse::Ok() + .content_type(EVENT_STREAM_MIME_TYPE) + .append_header((CACHE_CONTROL, "no-cache")) + .append_header((HEADER_X_ACCEL_BUFFERING, "no")) + .streaming(wrap_with_sse_keepalive( + formatted_stream, + state.sse_keep_alive, + ))) + } + ClientJsonRpcMessage::Notification(_) + | ClientJsonRpcMessage::Response(_) + | ClientJsonRpcMessage::Error(_) => { + state + .session_manager + .accept_message(&session_id, message) + .await + .map_err(|e| InternalError::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; + + Ok(HttpResponse::Accepted().finish()) + } + }; + } + + let is_initialize = matches!( + &message, + ClientJsonRpcMessage::Request(request_msg) + if matches!(request_msg.request, ClientRequest::InitializeRequest(_)) + ); + + if !is_initialize { + return Ok(text_error( + StatusCode::UNPROCESSABLE_ENTITY, + "Unprocessable Entity: Expected initialize request", + )); + } + + let (session_id, transport) = state + .session_manager + .create_session() + .await + .map_err(|e| InternalError::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; + + tokio::spawn({ + let session_manager = Arc::clone(&state.session_manager); + let session_id = Arc::clone(&session_id); + let service_instance = state.server(); + async move { + let service = serve_server::( + service_instance, + transport, + ) + .await; + + match service { + Ok(service) => { + let _ = service.waiting().await; + } + Err(e) => tracing::error!("Failed to create MCP service: {e}"), + } + + let _ = session_manager + .close_session(&session_id) + .await + .inspect_err(|e| { + tracing::error!("Failed to close MCP session {session_id}: {e}"); + }); + } + }); + + let response = state + .session_manager + .initialize_session(&session_id, message) + .await + .map_err(|e| InternalError::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; + + let sse_stream = async_stream::stream! { + let data = serde_json::to_string(&response).unwrap_or_else(|_| "{}".into()); + yield Ok::<_, actix_web::Error>(Bytes::from(format!("data: {data}\n\n"))); + }; + + Ok(HttpResponse::Ok() + .content_type(EVENT_STREAM_MIME_TYPE) + .append_header((CACHE_CONTROL, "no-cache")) + .append_header((HEADER_X_ACCEL_BUFFERING, "no")) + .append_header((HEADER_SESSION_ID, session_id.as_ref())) + .streaming(sse_stream)) +} + +async fn handle_delete(req: HttpRequest, state: Data) -> Result { + let Some(session_id) = request_session_id(&req) else { + return Ok(text_error( + StatusCode::BAD_REQUEST, + "Bad Request: Session ID is required", + )); + }; + + let has_session = state + .session_manager + .has_session(&session_id) + .await + .map_err(|e| InternalError::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; + + if !has_session { + return Ok(text_error( + StatusCode::NOT_FOUND, + "Not Found: Session not found", + )); + } + + state + .session_manager + .close_session(&session_id) + .await + .map_err(|e| InternalError::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; + + Ok(HttpResponse::NoContent().finish()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::state::ServerEvent; + use actix_web::{body::to_bytes, http::header, test as actix_test, App}; + use serde_json::json; + + fn test_transport() -> (AtomicMcpTransport, tempfile::TempDir) { + let temp = tempfile::TempDir::new().unwrap(); + let manager = Arc::new(DatabaseManager::new(temp.path()).unwrap()); + let (event_tx, _) = broadcast::channel::(16); + let transport = AtomicMcpTransport::new(manager, event_tx, Duration::from_secs(30)); + + (transport, temp) + } + + fn initialize_request() -> serde_json::Value { + json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": { + "protocolVersion": "2025-06-18", + "capabilities": {}, + "clientInfo": { + "name": "atomic-test", + "version": "0.0.0" + } + } + }) + } + + fn tools_list_request(id: i32) -> serde_json::Value { + json!({ + "jsonrpc": "2.0", + "id": id, + "method": "tools/list", + "params": {} + }) + } + + macro_rules! initialize_session { + ($app:expr) => {{ + let req = actix_test::TestRequest::post() + .uri("/mcp") + .insert_header(( + header::ACCEPT, + format!("{JSON_MIME_TYPE}, {EVENT_STREAM_MIME_TYPE}"), + )) + .insert_header((header::CONTENT_TYPE, JSON_MIME_TYPE)) + .set_payload(initialize_request().to_string()) + .to_request(); + let resp = actix_test::call_service(&$app, req).await; + assert_eq!(resp.status(), StatusCode::OK); + assert_content_type(&resp, EVENT_STREAM_MIME_TYPE); + let session_id = resp + .headers() + .get(HEADER_SESSION_ID) + .and_then(|value| value.to_str().ok()) + .expect("initialize response should include session id") + .to_string(); + let body = to_bytes(resp.into_body()).await.unwrap(); + let body = String::from_utf8(body.to_vec()).unwrap(); + assert!( + body.contains("\"result\""), + "initialize should return a JSON-RPC result: {body}" + ); + session_id + }}; + } + + macro_rules! send_initialized { + ($app:expr, $session_id:expr) => {{ + let req = actix_test::TestRequest::post() + .uri("/mcp") + .insert_header(( + header::ACCEPT, + format!("{JSON_MIME_TYPE}, {EVENT_STREAM_MIME_TYPE}"), + )) + .insert_header((header::CONTENT_TYPE, JSON_MIME_TYPE)) + .insert_header((HEADER_SESSION_ID, $session_id.clone())) + .set_payload( + json!({ + "jsonrpc": "2.0", + "method": "notifications/initialized" + }) + .to_string(), + ) + .to_request(); + let resp = actix_test::call_service(&$app, req).await; + assert_eq!(resp.status(), StatusCode::ACCEPTED); + }}; + } + + fn assert_content_type(resp: &actix_web::dev::ServiceResponse, expected: &str) { + let content_type = resp + .headers() + .get(header::CONTENT_TYPE) + .and_then(|value| value.to_str().ok()) + .expect("response should include content-type"); + assert!( + content_type.starts_with(expected), + "expected content-type {expected}, got {content_type}" + ); + } + + #[actix_web::test] + async fn get_missing_session_returns_400_with_content_type() { + let (transport, _temp) = test_transport(); + let app = actix_test::init_service( + App::new().service(web::scope("/mcp").service(transport.scope())), + ) + .await; + + let req = actix_test::TestRequest::get() + .uri("/mcp") + .insert_header((header::ACCEPT, EVENT_STREAM_MIME_TYPE)) + .to_request(); + let resp = actix_test::call_service(&app, req).await; + + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + assert_content_type(&resp, "text/plain"); + } + + #[actix_web::test] + async fn get_unknown_session_returns_404_with_content_type() { + let (transport, _temp) = test_transport(); + let app = actix_test::init_service( + App::new().service(web::scope("/mcp").service(transport.scope())), + ) + .await; + + let req = actix_test::TestRequest::get() + .uri("/mcp") + .insert_header((header::ACCEPT, EVENT_STREAM_MIME_TYPE)) + .insert_header((HEADER_SESSION_ID, "stale-session")) + .to_request(); + let resp = actix_test::call_service(&app, req).await; + + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + assert_content_type(&resp, "text/plain"); + } + + #[actix_web::test] + async fn post_unknown_session_returns_404_with_content_type() { + let (transport, _temp) = test_transport(); + let app = actix_test::init_service( + App::new().service(web::scope("/mcp").service(transport.scope())), + ) + .await; + + let req = actix_test::TestRequest::post() + .uri("/mcp") + .insert_header(( + header::ACCEPT, + format!("{JSON_MIME_TYPE}, {EVENT_STREAM_MIME_TYPE}"), + )) + .insert_header((header::CONTENT_TYPE, JSON_MIME_TYPE)) + .insert_header((HEADER_SESSION_ID, "stale-session")) + .set_payload(tools_list_request(2).to_string()) + .to_request(); + let resp = actix_test::call_service(&app, req).await; + + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + assert_content_type(&resp, "text/plain"); + } + + #[actix_web::test] + async fn post_without_session_requires_initialize_with_content_type() { + let (transport, _temp) = test_transport(); + let app = actix_test::init_service( + App::new().service(web::scope("/mcp").service(transport.scope())), + ) + .await; + + let req = actix_test::TestRequest::post() + .uri("/mcp") + .insert_header(( + header::ACCEPT, + format!("{JSON_MIME_TYPE}, {EVENT_STREAM_MIME_TYPE}"), + )) + .insert_header((header::CONTENT_TYPE, JSON_MIME_TYPE)) + .set_payload(tools_list_request(2).to_string()) + .to_request(); + let resp = actix_test::call_service(&app, req).await; + + assert_eq!(resp.status(), StatusCode::UNPROCESSABLE_ENTITY); + assert_content_type(&resp, "text/plain"); + } + + #[actix_web::test] + async fn initialize_returns_sse_and_session_header() { + let (transport, _temp) = test_transport(); + let app = actix_test::init_service( + App::new().service(web::scope("/mcp").service(transport.scope())), + ) + .await; + let session_id = initialize_session!(app); + + assert!(!session_id.is_empty()); + } + + #[actix_web::test] + async fn notification_with_session_returns_accepted() { + let (transport, _temp) = test_transport(); + let app = actix_test::init_service( + App::new().service(web::scope("/mcp").service(transport.scope())), + ) + .await; + let session_id = initialize_session!(app); + send_initialized!(app, session_id); + + let req = actix_test::TestRequest::post() + .uri("/mcp") + .insert_header(( + header::ACCEPT, + format!("{JSON_MIME_TYPE}, {EVENT_STREAM_MIME_TYPE}"), + )) + .insert_header((header::CONTENT_TYPE, JSON_MIME_TYPE)) + .insert_header((HEADER_SESSION_ID, session_id)) + .set_payload( + json!({ + "jsonrpc": "2.0", + "method": "notifications/initialized" + }) + .to_string(), + ) + .to_request(); + let resp = actix_test::call_service(&app, req).await; + + assert_eq!(resp.status(), StatusCode::ACCEPTED); + } + + #[actix_web::test] + async fn request_with_session_returns_sse_stream() { + let (transport, _temp) = test_transport(); + let app = actix_test::init_service( + App::new().service(web::scope("/mcp").service(transport.scope())), + ) + .await; + let session_id = initialize_session!(app); + + let req = actix_test::TestRequest::post() + .uri("/mcp") + .insert_header(( + header::ACCEPT, + format!("{JSON_MIME_TYPE}, {EVENT_STREAM_MIME_TYPE}"), + )) + .insert_header((header::CONTENT_TYPE, JSON_MIME_TYPE)) + .insert_header((HEADER_SESSION_ID, session_id)) + .set_payload(tools_list_request(2).to_string()) + .to_request(); + let resp = actix_test::call_service(&app, req).await; + + assert_eq!(resp.status(), StatusCode::OK); + assert_content_type(&resp, EVENT_STREAM_MIME_TYPE); + + let _ = resp.into_body(); + } + + #[actix_web::test] + async fn delete_unknown_session_returns_404_with_content_type() { + let (transport, _temp) = test_transport(); + let app = actix_test::init_service( + App::new().service(web::scope("/mcp").service(transport.scope())), + ) + .await; + + let req = actix_test::TestRequest::delete() + .uri("/mcp") + .insert_header((HEADER_SESSION_ID, "stale-session")) + .to_request(); + let resp = actix_test::call_service(&app, req).await; + + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + assert_content_type(&resp, "text/plain"); + } +} diff --git a/crates/atomic-server/tests/mcp_inspector.rs b/crates/atomic-server/tests/mcp_inspector.rs new file mode 100644 index 00000000..1d0e90d3 --- /dev/null +++ b/crates/atomic-server/tests/mcp_inspector.rs @@ -0,0 +1,154 @@ +//! Compatibility smoke test for the official MCP Inspector CLI. +//! +//! The test is part of `cargo test`, but runs opportunistically by default: +//! if Inspector is not already available through `npx --no-install`, it skips. +//! Set `ATOMIC_RUN_MCP_INSPECTOR=1` to require the check and allow `npx -y` +//! to install the package if needed. + +use actix_web::{web, App, HttpServer}; +use atomic_server::{ + export_jobs::ExportJobManager, + log_buffer::LogBuffer, + mcp::AtomicMcpTransport, + mcp_auth::McpAuth, + state::{AppState, ServerEvent, SetupClaimLimiter}, +}; +use std::{net::TcpListener, process::Output, sync::Arc, time::Duration}; +use tokio::{process::Command, sync::broadcast}; + +const INSPECTOR_PACKAGE: &str = "@modelcontextprotocol/inspector"; + +struct TestServer { + url: String, + token: String, + handle: actix_web::dev::ServerHandle, + _temp: tempfile::TempDir, +} + +impl TestServer { + async fn start() -> Self { + let temp = tempfile::TempDir::new().unwrap(); + let manager = Arc::new(atomic_core::DatabaseManager::new(temp.path()).unwrap()); + let (_info, token) = manager + .active_core() + .await + .unwrap() + .create_api_token("mcp-inspector-test") + .await + .unwrap(); + let (event_tx, _) = broadcast::channel::(16); + let state = web::Data::new(AppState { + manager: Arc::clone(&manager), + event_tx: event_tx.clone(), + public_url: None, + log_buffer: LogBuffer::new(16), + export_jobs: ExportJobManager::for_tests(temp.path().join("exports")), + setup_token: None, + dangerously_skip_setup_token: false, + setup_claim_lock: tokio::sync::Mutex::new(()), + setup_claim_limiter: SetupClaimLimiter::new(), + }); + let transport = AtomicMcpTransport::new(manager, event_tx, Duration::from_secs(30)); + let listener = TcpListener::bind(("127.0.0.1", 0)).unwrap(); + let addr = listener.local_addr().unwrap(); + let server = HttpServer::new(move || { + App::new().service( + web::scope("/mcp") + .wrap(McpAuth { + state: state.clone(), + }) + .service(transport.clone().scope()), + ) + }) + .listen(listener) + .unwrap() + .run(); + let handle = server.handle(); + tokio::spawn(server); + + Self { + url: format!("http://{addr}/mcp"), + token, + handle, + _temp: temp, + } + } + + async fn stop(self) { + self.handle.stop(true).await; + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn mcp_inspector_cli_can_list_atomic_tools() { + let require_inspector = std::env::var_os("ATOMIC_RUN_MCP_INSPECTOR").is_some(); + let npx_args = if require_inspector { + vec!["-y", INSPECTOR_PACKAGE] + } else if inspector_available_without_install().await { + vec!["--no-install", INSPECTOR_PACKAGE] + } else { + eprintln!( + "skipping MCP Inspector smoke test; set ATOMIC_RUN_MCP_INSPECTOR=1 to require it" + ); + return; + }; + + let server = TestServer::start().await; + let output = run_inspector(&npx_args, &server.url, &server.token).await; + server.stop().await; + + let output = output.unwrap_or_else(|error| panic!("failed to run MCP Inspector: {error}")); + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!( + output.status.success(), + "MCP Inspector failed with status {}\nstdout:\n{}\nstderr:\n{}", + output.status, + stdout, + stderr + ); + + let combined = format!("{stdout}\n{stderr}"); + assert!( + combined.contains("semantic_search"), + "Inspector tools/list output did not include Atomic tools\nstdout:\n{}\nstderr:\n{}", + stdout, + stderr + ); +} + +async fn inspector_available_without_install() -> bool { + let mut command = Command::new("npx"); + command + .arg("--no-install") + .arg(INSPECTOR_PACKAGE) + .arg("--version"); + + matches!( + tokio::time::timeout(Duration::from_secs(3), command.output()).await, + Ok(Ok(output)) if output.status.success() + ) +} + +async fn run_inspector(args: &[&str], url: &str, token: &str) -> std::io::Result { + let mut command = Command::new("npx"); + command + .args(args) + .arg("--cli") + .arg(url) + .arg("--transport") + .arg("http") + .arg("--method") + .arg("tools/list") + .arg("--header") + .arg(format!("Authorization: Bearer {token}")) + .env("MCP_AUTO_OPEN_ENABLED", "false"); + + match tokio::time::timeout(Duration::from_secs(60), command.output()).await { + Ok(result) => result, + Err(_) => Err(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "MCP Inspector timed out", + )), + } +} diff --git a/package.json b/package.json index 0292a0f1..ad436213 100644 --- a/package.json +++ b/package.json @@ -27,6 +27,7 @@ "preview": "vite preview", "test": "vitest run", "test:watch": "vitest", + "test:mcp-inspector": "node scripts/check-mcp-inspector.js", "tauri": "tauri", "openapi:generate": "cargo run -p atomic-server --bin export-openapi -- public/openapi.json", "build:mcp-bridge": "node scripts/build-mcp-bridge.js", diff --git a/scripts/check-mcp-inspector.js b/scripts/check-mcp-inspector.js new file mode 100644 index 00000000..f380b9a6 --- /dev/null +++ b/scripts/check-mcp-inspector.js @@ -0,0 +1,49 @@ +#!/usr/bin/env node + +import { spawnSync } from 'node:child_process'; + +const url = process.env.ATOMIC_MCP_URL ?? process.argv[2]; +const token = process.env.ATOMIC_API_TOKEN ?? process.argv[3]; + +if (!url || !token) { + console.error( + [ + 'Usage:', + ' ATOMIC_MCP_URL=http://localhost:8080/mcp ATOMIC_API_TOKEN= npm run test:mcp-inspector', + ' npm run test:mcp-inspector -- http://localhost:8080/mcp ', + '', + 'This check expects an Atomic server to already be running.', + ].join('\n'), + ); + process.exit(2); +} + +const result = spawnSync( + 'npx', + [ + '-y', + '@modelcontextprotocol/inspector', + '--cli', + url, + '--transport', + 'http', + '--method', + 'tools/list', + '--header', + `Authorization: Bearer ${token}`, + ], + { + stdio: 'inherit', + env: { + ...process.env, + MCP_AUTO_OPEN_ENABLED: 'false', + }, + }, +); + +if (result.error) { + console.error(result.error.message); + process.exit(1); +} + +process.exit(result.status ?? 1);