diff --git a/CMakeLists.txt b/CMakeLists.txt index b6e8927..fcbb86f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,6 +32,7 @@ add_library(fastmcpp_core src/server/http_server.cpp src/server/stdio_server.cpp src/server/sse_server.cpp + src/server/streamable_http_server.cpp src/client/client.cpp src/client/transports.cpp src/util/json_schema.cpp @@ -303,6 +304,11 @@ if(FASTMCPP_BUILD_TESTS) target_link_libraries(fastmcpp_server_sse_http_integration PRIVATE fastmcpp_core) add_test(NAME fastmcpp_server_sse_http_integration COMMAND fastmcpp_server_sse_http_integration) + # Streamable HTTP integration (MCP spec 2025-03-26) + add_executable(fastmcpp_server_streamable_http_integration tests/server/streamable_http_integration.cpp) + target_link_libraries(fastmcpp_server_streamable_http_integration PRIVATE fastmcpp_core) + add_test(NAME fastmcpp_server_streamable_http_integration COMMAND fastmcpp_server_streamable_http_integration) + add_executable(fastmcpp_server_auth_cors_security tests/server/auth_cors_security.cpp) target_link_libraries(fastmcpp_server_auth_cors_security PRIVATE fastmcpp_core) add_test(NAME fastmcpp_server_auth_cors_security COMMAND fastmcpp_server_auth_cors_security) diff --git a/README.md b/README.md index a4e7603..bde1227 100644 --- a/README.md +++ b/README.md @@ -15,16 +15,22 @@ fastmcpp is a C++ port of the Python [fastmcp](https://github.com/jlowin/fastmcp **Status:** Beta – core MCP features track the Python `fastmcp` reference. -**Current version:** 2.13.0 +**Current version:** 2.14.0 ## Features - Core MCP protocol implementation (JSON‑RPC). -- Multiple transports: STDIO, HTTP (SSE), WebSocket. +- Multiple transports: STDIO, HTTP (SSE), Streamable HTTP, WebSocket. +- Streamable HTTP transport (MCP spec 2025-03-26) with session management. - Tool management and invocation. - Resources and prompts support. +- Resource templates with URI pattern matching. - JSON Schema validation. -- Middleware for request/response processing. +- McpApp high-level application class. +- ProxyApp for backend server proxying. +- ServerSession for bidirectional communication, sampling, and server-initiated notifications. +- Built-in middleware: Logging, Timing, Caching, RateLimiting, ErrorHandling. +- Tool transforms for input/output processing. - Integration with MCP‑compatible CLI tools. - Cross‑platform: Windows, Linux, macOS. @@ -105,12 +111,6 @@ ctest --test-dir build -C Release -R fastmcp_smoke --output-on-failure ctest --test-dir build -C Release -N ``` -Current status (CI / WSL configuration): - -- 24/24 tests passing (100% success rate). -- 3 streaming tests disabled due to infrastructure dependencies. -- C++ test line count is much smaller than the Python `fastmcp` suite (see CCSDK parity docs). - ## Basic Usage ### STDIO MCP server @@ -177,6 +177,61 @@ int main() { } ``` +### Streamable HTTP server (MCP spec 2025-03-26) + +```cpp +#include +#include +#include + +int main() { + fastmcpp::tools::ToolManager tm; + // register tools on tm... + + auto handler = fastmcpp::mcp::make_mcp_handler( + "myserver", "1.0.0", tm + ); + + // Streamable HTTP server on /mcp endpoint + fastmcpp::server::StreamableHttpServerWrapper server( + handler, "127.0.0.1", 8080, "/mcp" + ); + server.start(); // non-blocking + + std::this_thread::sleep_for(std::chrono::hours(1)); + server.stop(); + return 0; +} +``` + +### Streamable HTTP client + +```cpp +#include + +int main() { + fastmcpp::client::StreamableHttpTransport transport( + "http://localhost:8080", "/mcp" + ); + + // Send initialize request + auto init_response = transport.request("mcp", { + {"jsonrpc", "2.0"}, + {"id", 1}, + {"method", "initialize"}, + {"params", { + {"protocolVersion", "2024-11-05"}, + {"capabilities", {}}, + {"clientInfo", {{"name", "client"}, {"version", "1.0"}}} + }} + }); + + // Session ID is automatically managed via Mcp-Session-Id header + std::cout << "Session: " << transport.session_id() << std::endl; + return 0; +} +``` + ## Examples See the `examples/` directory for complete programs, including: diff --git a/include/fastmcpp/client/transports.hpp b/include/fastmcpp/client/transports.hpp index 98ce5e0..a7eb176 100644 --- a/include/fastmcpp/client/transports.hpp +++ b/include/fastmcpp/client/transports.hpp @@ -137,4 +137,57 @@ class SseClientTransport : public ITransport std::unordered_map> pending_requests_; }; +/// Streamable HTTP client transport for connecting to MCP servers using the +/// Streamable HTTP protocol (MCP spec version 2025-03-26). +/// +/// This transport is simpler than SSE: +/// 1. Client sends POST requests to a single endpoint (default: /mcp) +/// 2. Server responds with JSON or SSE stream +/// 3. Session ID management via Mcp-Session-Id header +/// +/// Reference: https://spec.modelcontextprotocol.io/specification/2025-03-26/basic/transports/ +class StreamableHttpTransport : public ITransport +{ + public: + /// Construct a Streamable HTTP client transport + /// @param base_url The base URL of the MCP server (e.g., "http://127.0.0.1:8080") + /// @param mcp_path Path for the MCP endpoint (default: "/mcp") + /// @param headers Additional headers to include in requests + explicit StreamableHttpTransport(std::string base_url, std::string mcp_path = "/mcp", + std::unordered_map headers = {}); + + ~StreamableHttpTransport(); + + /// Send a JSON-RPC request and wait for response + fastmcpp::Json request(const std::string& route, const fastmcpp::Json& payload) override; + + /// Get the session ID (set after successful initialize) + std::string session_id() const; + + /// Check if a session ID has been set + bool has_session() const; + + /// Set callback for handling server-initiated notifications during streaming responses + void set_notification_callback(std::function callback); + + private: + void parse_session_id_from_response(const std::string& headers); + fastmcpp::Json parse_response(const std::string& body, const std::string& content_type); + void process_sse_line(const std::string& line, std::vector& messages); + + std::string base_url_; + std::string mcp_path_; + std::unordered_map headers_; + + // Session management + mutable std::mutex session_mutex_; + std::string session_id_; + + // Notification handling + std::function notification_callback_; + + // Request ID generation + std::atomic next_id_{1}; +}; + } // namespace fastmcpp::client diff --git a/include/fastmcpp/server/session.hpp b/include/fastmcpp/server/session.hpp index b6f7089..dcb359e 100644 --- a/include/fastmcpp/server/session.hpp +++ b/include/fastmcpp/server/session.hpp @@ -257,6 +257,41 @@ class ServerSession return true; } + /** + * Send a notification to the client (fire-and-forget, no response expected). + * + * @param method The JSON-RPC method name (e.g., "notifications/progress") + * @param params Notification parameters + */ + void send_notification(const std::string& method, const Json& params = Json::object()) + { + Json notification = {{"jsonrpc", "2.0"}, {"method", method}, {"params", params}}; + + if (send_callback_) + send_callback_(notification); + } + + /** + * Send a progress notification to the client. + * + * @param progress_token Token identifying the operation (from request _meta.progressToken) + * @param progress Current progress value + * @param total Total progress value (optional) + * @param message Progress message (optional) + */ + void send_progress(const std::string& progress_token, double progress, double total = 0.0, + const std::string& message = "") + { + Json params = {{"progressToken", progress_token}, {"progress", progress}}; + + if (total > 0.0) + params["total"] = total; + if (!message.empty()) + params["message"] = message; + + send_notification("notifications/progress", params); + } + /** * Check if a JSON message is a response (has id, no method). */ diff --git a/include/fastmcpp/server/streamable_http_server.hpp b/include/fastmcpp/server/streamable_http_server.hpp new file mode 100644 index 0000000..dffa848 --- /dev/null +++ b/include/fastmcpp/server/streamable_http_server.hpp @@ -0,0 +1,164 @@ +#pragma once +#include "fastmcpp/server/session.hpp" +#include "fastmcpp/types.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace fastmcpp::server +{ + +/** + * Streamable HTTP MCP server wrapper. + * + * This transport implements the Streamable HTTP protocol for MCP communication + * per MCP spec version 2025-03-26: + * - Single POST endpoint (default: /mcp) + * - Session ID management via Mcp-Session-Id header + * - Responses can be JSON or SSE stream + * + * This is a simpler transport than SSE with a single endpoint. + * Clients send JSON-RPC requests via POST and receive responses in the + * same HTTP response (either as JSON or SSE stream for long-running operations). + * + * Usage: + * auto handler = fastmcpp::mcp::make_mcp_handler("myserver", "1.0.0", tools); + * StreamableHttpServerWrapper server(handler); + * server.start(); // Non-blocking - runs in background thread + * // ... server runs ... + * server.stop(); // Graceful shutdown + * + * Reference: https://spec.modelcontextprotocol.io/specification/2025-03-26/basic/transports/ + */ +class StreamableHttpServerWrapper +{ + public: + using McpHandler = std::function; + + /** + * Construct a Streamable HTTP server with an MCP handler. + * + * @param handler Function that processes JSON-RPC requests and returns responses + * @param host Host address to bind to (default: "127.0.0.1") + * @param port Port to listen on (default: 18080) + * @param mcp_path Path for the MCP POST endpoint (default: "/mcp") + * @param auth_token Optional auth token for Bearer authentication (empty = no auth required) + * @param cors_origin Optional CORS origin to allow (empty = no CORS header, use "*" for + * wildcard) + */ + explicit StreamableHttpServerWrapper(McpHandler handler, std::string host = "127.0.0.1", + int port = 18080, std::string mcp_path = "/mcp", + std::string auth_token = "", std::string cors_origin = ""); + + ~StreamableHttpServerWrapper(); + + /** + * Start the server in background (non-blocking). + * + * Launches a background thread that runs the HTTP server. + * Use stop() to terminate. + * + * @return true if server started successfully + */ + bool start(); + + /** + * Stop the server. + * + * Signals the server to stop and joins the background thread. + * Safe to call multiple times. + */ + void stop(); + + /** + * Check if server is currently running. + */ + bool running() const + { + return running_.load(); + } + + /** + * Get the port the server is listening on. + */ + int port() const + { + return port_; + } + + /** + * Get the host address the server is bound to. + */ + const std::string& host() const + { + return host_; + } + + /** + * Get the MCP endpoint path. + */ + const std::string& mcp_path() const + { + return mcp_path_; + } + + /** + * Get the ServerSession for a given session ID. + * + * This allows server-initiated requests (sampling, elicitation) via + * the session's bidirectional transport. + * + * @param session_id The session to get + * @return Shared pointer to ServerSession, or nullptr if not found + */ + std::shared_ptr get_session(const std::string& session_id) const + { + std::lock_guard lock(sessions_mutex_); + auto it = sessions_.find(session_id); + if (it == sessions_.end()) + return nullptr; + return it->second; + } + + /** + * Get the number of active sessions. + */ + size_t session_count() const + { + std::lock_guard lock(sessions_mutex_); + return sessions_.size(); + } + + private: + void run_server(); + std::string generate_session_id(); + bool check_auth(const std::string& auth_header) const; + + McpHandler handler_; + std::string host_; + int port_; + std::string mcp_path_; + std::string auth_token_; // Optional Bearer token for authentication + std::string cors_origin_; // Optional CORS origin (empty = no CORS) + + std::unique_ptr svr_; + std::thread thread_; + std::atomic running_{false}; + + // Security limits + static constexpr size_t MAX_SESSIONS = 1000; + + // Active sessions mapped by session ID + std::unordered_map> sessions_; + mutable std::mutex sessions_mutex_; +}; + +} // namespace fastmcpp::server diff --git a/src/client/transports.cpp b/src/client/transports.cpp index 6f7dc40..3e6aba0 100644 --- a/src/client/transports.cpp +++ b/src/client/transports.cpp @@ -810,4 +810,180 @@ fastmcpp::Json SseClientTransport::request(const std::string& route, const fastm return fastmcpp::Json::object(); } +// ============================================================================= +// StreamableHttpTransport implementation +// ============================================================================= + +StreamableHttpTransport::StreamableHttpTransport( + std::string base_url, std::string mcp_path, + std::unordered_map headers) + : base_url_(std::move(base_url)), mcp_path_(std::move(mcp_path)), headers_(std::move(headers)) +{ +} + +StreamableHttpTransport::~StreamableHttpTransport() = default; + +std::string StreamableHttpTransport::session_id() const +{ + std::lock_guard lock(session_mutex_); + return session_id_; +} + +bool StreamableHttpTransport::has_session() const +{ + std::lock_guard lock(session_mutex_); + return !session_id_.empty(); +} + +void StreamableHttpTransport::set_notification_callback( + std::function callback) +{ + notification_callback_ = std::move(callback); +} + +void StreamableHttpTransport::parse_session_id_from_response(const std::string& header_value) +{ + // The header comes as "Mcp-Session-Id: " + // We receive just the value from httplib + if (!header_value.empty()) + { + std::lock_guard lock(session_mutex_); + session_id_ = header_value; + } +} + +void StreamableHttpTransport::process_sse_line(const std::string& line, + std::vector& messages) +{ + // Parse SSE data lines (format: "data: {json}") + if (line.rfind("data:", 0) == 0) + { + std::string data_part = line.substr(5); + if (!data_part.empty() && data_part[0] == ' ') + data_part.erase(0, 1); + if (!data_part.empty()) + { + try + { + auto json = fastmcpp::util::json::parse(data_part); + messages.push_back(std::move(json)); + } + catch (...) + { + // Ignore parse errors + } + } + } +} + +fastmcpp::Json StreamableHttpTransport::parse_response(const std::string& body, + const std::string& content_type) +{ + // Check if response is SSE stream + bool is_sse = content_type.find("text/event-stream") != std::string::npos; + + if (is_sse) + { + // Parse SSE events and collect all JSON messages + std::vector messages; + std::istringstream stream(body); + std::string line; + + while (std::getline(stream, line)) + { + // Strip trailing \r for CRLF + if (!line.empty() && line.back() == '\r') + line.pop_back(); + + process_sse_line(line, messages); + } + + // Process messages - notifications go to callback, find the main response + fastmcpp::Json response; + for (const auto& msg : messages) + { + // Check if this is a notification (has method, no id) + if (msg.contains("method") && !msg.contains("id")) + { + if (notification_callback_) + notification_callback_(msg); + } + else if (msg.contains("id")) + { + // This is the response we're looking for + response = msg; + } + } + + return response; + } + else + { + // Plain JSON response + return fastmcpp::util::json::parse(body); + } +} + +fastmcpp::Json StreamableHttpTransport::request(const std::string& /*route*/, + const fastmcpp::Json& payload) +{ + auto url = parse_url(base_url_); + + // Create client + std::string full_url = url.scheme + "://" + url.host + ":" + std::to_string(url.port); + httplib::Client cli(full_url.c_str()); + cli.set_connection_timeout(10, 0); + cli.set_read_timeout(60, 0); // Longer timeout for streaming + cli.set_keep_alive(true); + + // Build request headers + httplib::Headers request_headers = {{"Accept", "application/json, text/event-stream"}, + {"Content-Type", "application/json"}}; + + // Add custom headers + for (const auto& [key, value] : headers_) + request_headers.emplace(key, value); + + // Add session ID if we have one + { + std::lock_guard lock(session_mutex_); + if (!session_id_.empty()) + request_headers.emplace("Mcp-Session-Id", session_id_); + } + + // Payload is the full JSON-RPC request + // (StreamableHttp transport accepts complete JSON-RPC requests, unlike other transports) + + // Send request + auto res = cli.Post(mcp_path_.c_str(), request_headers, payload.dump(), "application/json"); + + if (!res) + throw fastmcpp::TransportError("StreamableHttp request failed: no response"); + + if (res->status < 200 || res->status >= 300) + throw fastmcpp::TransportError("StreamableHttp error: " + std::to_string(res->status)); + + // Check for session ID in response headers + auto session_header = res->headers.find("Mcp-Session-Id"); + if (session_header != res->headers.end()) + parse_session_id_from_response(session_header->second); + + // Also check lowercase (some servers may use different casing) + session_header = res->headers.find("mcp-session-id"); + if (session_header != res->headers.end()) + parse_session_id_from_response(session_header->second); + + // Get content type + std::string content_type = "application/json"; + auto ct_header = res->headers.find("Content-Type"); + if (ct_header != res->headers.end()) + content_type = ct_header->second; + + // Parse response + auto rpc_response = parse_response(res->body, content_type); + + // Return full JSON-RPC response (caller handles error/result extraction) + return rpc_response; +} + } // namespace fastmcpp::client diff --git a/src/mcp/handler.cpp b/src/mcp/handler.cpp index 1b06071..bd56f7b 100644 --- a/src/mcp/handler.cpp +++ b/src/mcp/handler.cpp @@ -81,6 +81,12 @@ make_mcp_handler(const std::string& server_name, const std::string& version, }}}; } + if (method == "ping") + { + return fastmcpp::Json{ + {"jsonrpc", "2.0"}, {"id", id}, {"result", fastmcpp::Json::object()}}; + } + if (method == "tools/list") { fastmcpp::Json tools_array = fastmcpp::Json::array(); @@ -257,6 +263,12 @@ std::function make_mcp_handler( {"serverInfo", serverInfo}}}}; } + if (method == "ping") + { + return fastmcpp::Json{ + {"jsonrpc", "2.0"}, {"id", id}, {"result", fastmcpp::Json::object()}}; + } + if (method == "tools/list") { // Build base tools list from tools_meta @@ -473,6 +485,12 @@ make_mcp_handler(const std::string& server_name, const std::string& version, {"serverInfo", serverInfo}}}}; } + if (method == "ping") + { + return fastmcpp::Json{ + {"jsonrpc", "2.0"}, {"id", id}, {"result", fastmcpp::Json::object()}}; + } + if (method == "tools/list") { fastmcpp::Json tools_array = fastmcpp::Json::array(); @@ -632,6 +650,12 @@ make_mcp_handler(const std::string& server_name, const std::string& version, {"serverInfo", serverInfo}}}}; } + if (method == "ping") + { + return fastmcpp::Json{ + {"jsonrpc", "2.0"}, {"id", id}, {"result", fastmcpp::Json::object()}}; + } + if (method == "tools/list") { fastmcpp::Json tools_array = fastmcpp::Json::array(); @@ -904,6 +928,12 @@ std::function make_mcp_handler(const McpA {"serverInfo", serverInfo}}}}; } + if (method == "ping") + { + return fastmcpp::Json{ + {"jsonrpc", "2.0"}, {"id", id}, {"result", fastmcpp::Json::object()}}; + } + if (method == "tools/list") { fastmcpp::Json tools_array = fastmcpp::Json::array(); @@ -1158,6 +1188,12 @@ std::function make_mcp_handler(const Prox {"serverInfo", serverInfo}}}}; } + if (method == "ping") + { + return fastmcpp::Json{ + {"jsonrpc", "2.0"}, {"id", id}, {"result", fastmcpp::Json::object()}}; + } + // Tools if (method == "tools/list") { @@ -1545,6 +1581,12 @@ make_mcp_handler_with_sampling(const McpApp& app, SessionAccessor session_access {"serverInfo", serverInfo}}}}; } + if (method == "ping") + { + return fastmcpp::Json{ + {"jsonrpc", "2.0"}, {"id", id}, {"result", fastmcpp::Json::object()}}; + } + if (method == "tools/list") { fastmcpp::Json tools_array = fastmcpp::Json::array(); diff --git a/src/server/streamable_http_server.cpp b/src/server/streamable_http_server.cpp new file mode 100644 index 0000000..62d4251 --- /dev/null +++ b/src/server/streamable_http_server.cpp @@ -0,0 +1,333 @@ +#include "fastmcpp/server/streamable_http_server.hpp" + +#include "fastmcpp/exceptions.hpp" +#include "fastmcpp/util/json.hpp" + +#include +#include +#include +#include +#include +#include + +namespace fastmcpp::server +{ + +StreamableHttpServerWrapper::StreamableHttpServerWrapper(McpHandler handler, std::string host, + int port, std::string mcp_path, + std::string auth_token, + std::string cors_origin) + : handler_(std::move(handler)), host_(std::move(host)), port_(port), + mcp_path_(std::move(mcp_path)), auth_token_(std::move(auth_token)), + cors_origin_(std::move(cors_origin)) +{ +} + +StreamableHttpServerWrapper::~StreamableHttpServerWrapper() +{ + stop(); +} + +bool StreamableHttpServerWrapper::check_auth(const std::string& auth_header) const +{ + // If no auth token configured, allow all requests + if (auth_token_.empty()) + return true; + + // Check for "Bearer " format + if (auth_header.find("Bearer ") != 0) + return false; + + std::string provided_token = auth_header.substr(7); // Skip "Bearer " + return provided_token == auth_token_; +} + +std::string StreamableHttpServerWrapper::generate_session_id() +{ + // Generate cryptographically secure random session ID (128 bits = 32 hex chars) + std::random_device rd; + std::mt19937_64 gen(rd()); + std::uniform_int_distribution dis; + + uint64_t high = dis(gen); + uint64_t low = dis(gen); + + std::ostringstream oss; + oss << std::hex << std::setfill('0') << std::setw(16) << high << std::setw(16) << low; + return oss.str(); +} + +void StreamableHttpServerWrapper::run_server() +{ + svr_->listen(host_.c_str(), port_); + running_ = false; +} + +bool StreamableHttpServerWrapper::start() +{ + if (running_) + return false; + + svr_ = std::make_unique(); + + // Security: Set payload and timeout limits to prevent DoS + svr_->set_payload_max_length(10 * 1024 * 1024); // 10MB max payload + svr_->set_read_timeout(30, 0); // 30 second read timeout + svr_->set_write_timeout(30, 0); // 30 second write timeout + + // Handle OPTIONS for CORS preflight + if (!cors_origin_.empty()) + { + svr_->Options(mcp_path_, + [this](const httplib::Request&, httplib::Response& res) + { + res.set_header("Access-Control-Allow-Origin", cors_origin_); + res.set_header("Access-Control-Allow-Methods", "POST, OPTIONS"); + res.set_header("Access-Control-Allow-Headers", + "Content-Type, Authorization, Mcp-Session-Id"); + res.status = 204; + }); + } + + // Set up MCP endpoint (POST) + svr_->Post( + mcp_path_, + [this](const httplib::Request& req, httplib::Response& res) + { + try + { + // Security: Check authentication if configured + if (!auth_token_.empty()) + { + auto auth_it = req.headers.find("Authorization"); + if (auth_it == req.headers.end() || !check_auth(auth_it->second)) + { + res.status = 401; + res.set_content("{\"error\":\"Unauthorized\"}", "application/json"); + return; + } + } + + // Security: Only set CORS header if explicitly configured + if (!cors_origin_.empty()) + res.set_header("Access-Control-Allow-Origin", cors_origin_); + + // Parse JSON-RPC message + auto message = fastmcpp::util::json::parse(req.body); + + // Check for Mcp-Session-Id header + std::string session_id; + auto session_it = req.headers.find("Mcp-Session-Id"); + if (session_it != req.headers.end()) + session_id = session_it->second; + + // Handle initialize request - creates new session + bool is_initialize = message.contains("method") && + message["method"].get() == "initialize"; + + if (is_initialize) + { + // Security: Check session limit before creating new session + { + std::lock_guard lock(sessions_mutex_); + if (sessions_.size() >= MAX_SESSIONS) + { + res.status = 503; // Service Unavailable + res.set_content("{\"error\":\"Maximum sessions reached\"}", + "application/json"); + return; + } + } + + // Generate new session ID + session_id = generate_session_id(); + + // Create ServerSession for this session + // Note: For streamable HTTP, responses go back in HTTP response, + // so the send callback is not used for normal responses. + // It could be used for server-initiated requests in the future. + auto session = std::make_shared(session_id, nullptr); + + { + std::lock_guard lock(sessions_mutex_); + sessions_[session_id] = session; + } + } + else if (session_id.empty()) + { + // Non-initialize request without session ID + res.status = 400; + res.set_content("{\"error\":\"Mcp-Session-Id header required\"}", + "application/json"); + return; + } + else + { + // Verify session exists + std::lock_guard lock(sessions_mutex_); + if (sessions_.find(session_id) == sessions_.end()) + { + res.status = 404; + res.set_content("{\"error\":\"Invalid or expired session\"}", + "application/json"); + return; + } + } + + // Inject session_id into request meta for handler access + if (!message.contains("params")) + message["params"] = Json::object(); + if (!message["params"].contains("_meta")) + message["params"]["_meta"] = Json::object(); + message["params"]["_meta"]["session_id"] = session_id; + + // Check if this is a response to a server-initiated request + if (ServerSession::is_response(message)) + { + // Get the session and route the response + std::shared_ptr session; + { + std::lock_guard lock(sessions_mutex_); + auto it = sessions_.find(session_id); + if (it != sessions_.end()) + session = it->second; + } + + if (session) + { + bool handled = session->handle_response(message); + if (handled) + { + res.set_header("Mcp-Session-Id", session_id); + res.set_content("{\"status\":\"ok\"}", "application/json"); + res.status = 200; + return; + } + } + + // Response not handled (unknown request ID) + res.status = 400; + res.set_content("{\"error\":\"Unknown response ID\"}", "application/json"); + return; + } + + // Normal request - process with handler + auto response = handler_(message); + + // Set session ID header in response + res.set_header("Mcp-Session-Id", session_id); + + // Return JSON response + res.set_content(response.dump(), "application/json"); + res.status = 200; + } + catch (const fastmcpp::NotFoundError& e) + { + // Method/tool not found → -32601 + fastmcpp::Json error_response; + error_response["jsonrpc"] = "2.0"; + try + { + auto request = fastmcpp::util::json::parse(req.body); + if (request.contains("id")) + error_response["id"] = request["id"]; + } + catch (...) + { + } + error_response["error"] = {{"code", -32601}, {"message", std::string(e.what())}}; + + res.set_content(error_response.dump(), "application/json"); + res.status = 200; // JSON-RPC errors are still 200 OK at HTTP level + } + catch (const fastmcpp::ValidationError& e) + { + // Invalid params → -32602 + fastmcpp::Json error_response; + error_response["jsonrpc"] = "2.0"; + try + { + auto request = fastmcpp::util::json::parse(req.body); + if (request.contains("id")) + error_response["id"] = request["id"]; + } + catch (...) + { + } + error_response["error"] = {{"code", -32602}, {"message", std::string(e.what())}}; + + res.set_content(error_response.dump(), "application/json"); + res.status = 200; + } + catch (const std::exception& e) + { + // Internal error → -32603 + fastmcpp::Json error_response; + error_response["jsonrpc"] = "2.0"; + try + { + auto request = fastmcpp::util::json::parse(req.body); + if (request.contains("id")) + error_response["id"] = request["id"]; + } + catch (...) + { + } + error_response["error"] = {{"code", -32603}, {"message", std::string(e.what())}}; + + res.set_content(error_response.dump(), "application/json"); + res.status = 500; + } + }); + + // Handle GET request to return 405 Method Not Allowed + svr_->Get(mcp_path_, + [](const httplib::Request&, httplib::Response& res) + { + res.status = 405; + res.set_header("Allow", "POST"); + res.set_header("Content-Type", "application/json"); + + fastmcpp::Json error_response = { + {"error", "Method Not Allowed"}, + {"message", "The MCP endpoint only supports POST requests."}}; + + res.set_content(error_response.dump(), "application/json"); + }); + + running_ = true; + + thread_ = std::thread([this]() { run_server(); }); + + // Wait for server to be ready using GET (returns 405, but shows server is up) + for (int attempt = 0; attempt < 20; ++attempt) + { + httplib::Client probe(host_.c_str(), port_); + probe.set_connection_timeout(std::chrono::seconds(2)); + probe.set_read_timeout(std::chrono::seconds(2)); + auto res = probe.Get(mcp_path_.c_str()); + if (res) + break; + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + return true; +} + +void StreamableHttpServerWrapper::stop() +{ + running_ = false; + + // Clear sessions + { + std::lock_guard lock(sessions_mutex_); + sessions_.clear(); + } + + if (svr_) + svr_->stop(); + if (thread_.joinable()) + thread_.join(); +} + +} // namespace fastmcpp::server diff --git a/tests/client/sse_session_client.cpp b/tests/client/sse_session_client.cpp index 2d1ccd3..9c2de92 100644 --- a/tests/client/sse_session_client.cpp +++ b/tests/client/sse_session_client.cpp @@ -37,7 +37,8 @@ int main() return 1; } - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + // Wait for server to be ready + std::this_thread::sleep_for(std::chrono::milliseconds(500)); std::cout << "[1/1] Test: HttpTransport with real HTTP server\n"; diff --git a/tests/server/patterns.cpp b/tests/server/patterns.cpp index b001db6..ddbe1b3 100644 --- a/tests/server/patterns.cpp +++ b/tests/server/patterns.cpp @@ -26,7 +26,7 @@ void test_multiple_routes() server::HttpServerWrapper http{srv, "127.0.0.1", 18400}; http.start(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); client::HttpTransport client{"127.0.0.1:18400"}; @@ -52,7 +52,7 @@ void test_route_override() server::HttpServerWrapper http{srv, "127.0.0.1", 18401}; http.start(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); client::HttpTransport client{"127.0.0.1:18401"}; assert(client.request("test", Json::object())["version"] == 1); @@ -85,7 +85,7 @@ void test_large_response() server::HttpServerWrapper http{srv, "127.0.0.1", 18402}; http.start(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); client::HttpTransport client{"127.0.0.1:18402"}; @@ -115,7 +115,7 @@ void test_large_request() server::HttpServerWrapper http{srv, "127.0.0.1", 18403}; http.start(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); // Create large request Json values = Json::array(); @@ -160,7 +160,7 @@ void test_handler_with_state() server::HttpServerWrapper http{srv, "127.0.0.1", 18404}; http.start(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); client::HttpTransport client{"127.0.0.1:18404"}; @@ -199,7 +199,7 @@ void test_various_return_types() server::HttpServerWrapper http{srv, "127.0.0.1", 18405}; http.start(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); client::HttpTransport client{"127.0.0.1:18405"}; @@ -225,7 +225,7 @@ void test_unknown_route() server::HttpServerWrapper http{srv, "127.0.0.1", 18406}; http.start(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); client::HttpTransport client{"127.0.0.1:18406"}; @@ -264,7 +264,7 @@ void test_unicode_in_response() server::HttpServerWrapper http{srv, "127.0.0.1", 18407}; http.start(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); client::HttpTransport client{"127.0.0.1:18407"}; @@ -293,7 +293,7 @@ void test_nested_json_request() server::HttpServerWrapper http{srv, "127.0.0.1", 18408}; http.start(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); client::HttpTransport client{"127.0.0.1:18408"}; @@ -318,7 +318,7 @@ void test_sequential_requests() server::HttpServerWrapper http{srv, "127.0.0.1", 18409}; http.start(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); client::HttpTransport client{"127.0.0.1:18409"}; diff --git a/tests/server/sse_mcp_format.cpp b/tests/server/sse_mcp_format.cpp index b55d316..f27499b 100644 --- a/tests/server/sse_mcp_format.cpp +++ b/tests/server/sse_mcp_format.cpp @@ -62,7 +62,8 @@ int main() } std::cout << "[OK] Server started on port " << port << "\n"; - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + // Wait for server to be ready - longer delay for compatibility + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); // Storage for captured SSE events std::vector captured_events; diff --git a/tests/server/sse_session_security.cpp b/tests/server/sse_session_security.cpp index 370b46e..887dc52 100644 --- a/tests/server/sse_session_security.cpp +++ b/tests/server/sse_session_security.cpp @@ -38,7 +38,8 @@ int main() return 1; } - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + // Wait for server to be ready - longer delay for compatibility + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); // Test 1: Verify session_id is cryptographically random (not timestamp) { @@ -137,7 +138,7 @@ int main() // Restart server between tests to ensure clean state sse_server.stop(); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); if (!sse_server.start()) { @@ -145,7 +146,8 @@ int main() return 1; } - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + // Wait for server to be ready + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); // Test 2: POST without session_id should be rejected { diff --git a/tests/server/streamable_http_integration.cpp b/tests/server/streamable_http_integration.cpp new file mode 100644 index 0000000..16e1105 --- /dev/null +++ b/tests/server/streamable_http_integration.cpp @@ -0,0 +1,350 @@ +/// @file streamable_http_integration.cpp +/// @brief Integration test for Streamable HTTP transport (MCP spec 2025-03-26) +/// @details Tests real HTTP integration between StreamableHttpServerWrapper and +/// StreamableHttpTransport client. +/// +/// This tests the Streamable HTTP protocol which: +/// - Uses a single POST endpoint (/mcp by default) +/// - Manages sessions via Mcp-Session-Id header +/// - Simpler than SSE (no separate GET endpoint for events) + +#include "fastmcpp/client/transports.hpp" +#include "fastmcpp/mcp/handler.hpp" +#include "fastmcpp/server/streamable_http_server.hpp" +#include "fastmcpp/tools/manager.hpp" + +#include +#include +#include +#include + +using namespace fastmcpp; + +void test_basic_request_response() +{ + std::cout << " test_basic_request_response... " << std::flush; + + const int port = 18350; + const std::string host = "127.0.0.1"; + + // Create MCP handler with simple echo tool + tools::ToolManager tool_mgr; + tools::Tool echo_tool{"echo", + Json{{"type", "object"}, + {"properties", Json{{"message", Json{{"type", "string"}}}}}, + {"required", Json::array({"message"})}}, + Json{{"type", "string"}}, + [](const Json& input) -> Json { return input.at("message"); }}; + tool_mgr.register_tool(echo_tool); + + std::unordered_map descriptions = {{"echo", "Echo the input"}}; + auto handler = mcp::make_mcp_handler("test_server", "1.0.0", tool_mgr, descriptions); + + // Start server + server::StreamableHttpServerWrapper server(handler, host, port, "/mcp"); + bool started = server.start(); + std::cout << "(server.start=" << started << ", running=" << server.running() << ") " + << std::flush; + assert(started && "Server failed to start"); + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + try + { + // First test: direct httplib client + std::cout << "(testing direct client) " << std::flush; + httplib::Client direct_cli(host, port); + direct_cli.set_connection_timeout(5, 0); + direct_cli.set_read_timeout(5, 0); + auto direct_res = direct_cli.Get("/mcp"); + std::cout << "(GET result: " << (direct_res ? std::to_string(direct_res->status) : "null") + << ") " << std::flush; + + // Create client transport + client::StreamableHttpTransport transport("http://" + host + ":" + std::to_string(port)); + + // Initialize should work and create a session + Json init_request = {{"jsonrpc", "2.0"}, + {"id", 1}, + {"method", "initialize"}, + {"params", + {{"protocolVersion", "2024-11-05"}, + {"capabilities", Json::object()}, + {"clientInfo", {{"name", "test_client"}, {"version", "1.0.0"}}}}}}; + + auto init_response = transport.request("mcp", init_request); + + // Should have a valid response + assert(init_response.contains("result") && "Initialize should return result"); + assert(init_response["result"].contains("serverInfo") && "Should have serverInfo"); + assert(transport.has_session() && "Should have session after initialize"); + + // List tools + Json list_request = { + {"jsonrpc", "2.0"}, {"id", 2}, {"method", "tools/list"}, {"params", {}}}; + + auto list_response = transport.request("mcp", list_request); + assert(list_response.contains("result") && "tools/list should return result"); + assert(list_response["result"].contains("tools") && "Should have tools array"); + + auto& tools = list_response["result"]["tools"]; + assert(tools.is_array() && tools.size() == 1 && "Should have one tool"); + assert(tools[0]["name"] == "echo" && "Tool should be echo"); + + // Call the echo tool + Json call_request = { + {"jsonrpc", "2.0"}, + {"id", 3}, + {"method", "tools/call"}, + {"params", {{"name", "echo"}, {"arguments", {{"message", "Hello, World!"}}}}}}; + + auto call_response = transport.request("mcp", call_request); + assert(call_response.contains("result") && "tools/call should return result"); + assert(call_response["result"].contains("content") && "Should have content"); + + auto& content = call_response["result"]["content"]; + assert(content.is_array() && content.size() > 0 && "Should have content array"); + assert(content[0]["type"] == "text" && "Content should be text"); + assert(content[0]["text"] == "Hello, World!" && "Echo should return input"); + + std::cout << "PASSED\n"; + } + catch (const std::exception& e) + { + std::cout << "FAILED: " << e.what() << "\n"; + server.stop(); + throw; + } + + server.stop(); +} + +void test_session_management() +{ + std::cout << " test_session_management... " << std::flush; + + const int port = 18351; + const std::string host = "127.0.0.1"; + + // Create minimal handler + tools::ToolManager tool_mgr; + std::unordered_map descriptions; + auto handler = mcp::make_mcp_handler("session_test", "1.0.0", tool_mgr, descriptions); + + server::StreamableHttpServerWrapper server(handler, host, port, "/mcp"); + bool started = server.start(); + std::cout << "(server.start=" << started << ", running=" << server.running() << ") " + << std::flush; + assert(started && "Server failed to start"); + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + // Test direct HTTP client first + std::cout << "(testing direct client) " << std::flush; + httplib::Client direct_cli(host, port); + direct_cli.set_connection_timeout(5, 0); + direct_cli.set_read_timeout(5, 0); + auto direct_res = direct_cli.Get("/mcp"); + std::cout << "(GET result: " << (direct_res ? std::to_string(direct_res->status) : "null") + << ") " << std::flush; + + try + { + // Create client without session + client::StreamableHttpTransport transport("http://" + host + ":" + std::to_string(port)); + + // Before initialize, should have no session + assert(!transport.has_session() && "Should have no session before initialize"); + + // Initialize + Json init_request = {{"jsonrpc", "2.0"}, + {"id", 1}, + {"method", "initialize"}, + {"params", + {{"protocolVersion", "2024-11-05"}, + {"capabilities", Json::object()}, + {"clientInfo", {{"name", "test"}, {"version", "1.0"}}}}}}; + + transport.request("mcp", init_request); + + // After initialize, should have session + assert(transport.has_session() && "Should have session after initialize"); + std::string session_id = transport.session_id(); + assert(!session_id.empty() && "Session ID should not be empty"); + + // Session count on server should be 1 + assert(server.session_count() == 1 && "Server should have 1 session"); + + // Second request should reuse session + Json list_request = { + {"jsonrpc", "2.0"}, {"id", 2}, {"method", "tools/list"}, {"params", {}}}; + + transport.request("mcp", list_request); + + // Session ID should still be the same + assert(transport.session_id() == session_id && "Session ID should persist"); + + std::cout << "PASSED\n"; + } + catch (const std::exception& e) + { + std::cout << "FAILED: " << e.what() << "\n"; + server.stop(); + throw; + } + + server.stop(); +} + +void test_server_info() +{ + std::cout << " test_server_info... " << std::flush; + + const int port = 18352; + const std::string host = "127.0.0.1"; + + tools::ToolManager tool_mgr; + std::unordered_map descriptions; + auto handler = mcp::make_mcp_handler("MyTestServer", "2.5.0", tool_mgr, descriptions); + + server::StreamableHttpServerWrapper server(handler, host, port, "/mcp"); + bool started = server.start(); + std::cout << "(server.start=" << started << ", running=" << server.running() << ") " + << std::flush; + assert(started && "Server failed to start"); + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + // Test direct HTTP client first (seems to help with connection establishment) + std::cout << "(testing direct client) " << std::flush; + httplib::Client direct_cli(host, port); + direct_cli.set_connection_timeout(5, 0); + direct_cli.set_read_timeout(5, 0); + auto direct_res = direct_cli.Get("/mcp"); + std::cout << "(GET result: " << (direct_res ? std::to_string(direct_res->status) : "null") + << ") " << std::flush; + + try + { + client::StreamableHttpTransport transport("http://" + host + ":" + std::to_string(port)); + + Json init_request = {{"jsonrpc", "2.0"}, + {"id", 1}, + {"method", "initialize"}, + {"params", + {{"protocolVersion", "2024-11-05"}, + {"capabilities", Json::object()}, + {"clientInfo", {{"name", "test"}, {"version", "1.0"}}}}}}; + + auto response = transport.request("mcp", init_request); + + assert(response.contains("result")); + auto& result = response["result"]; + + // Check server info + assert(result.contains("serverInfo")); + auto& server_info = result["serverInfo"]; + assert(server_info["name"] == "MyTestServer"); + assert(server_info["version"] == "2.5.0"); + + std::cout << "PASSED\n"; + } + catch (const std::exception& e) + { + std::cout << "FAILED: " << e.what() << "\n"; + server.stop(); + throw; + } + + server.stop(); +} + +void test_error_handling() +{ + std::cout << " test_error_handling... " << std::flush; + + const int port = 18353; + const std::string host = "127.0.0.1"; + + tools::ToolManager tool_mgr; + std::unordered_map descriptions; + auto handler = mcp::make_mcp_handler("error_test", "1.0.0", tool_mgr, descriptions); + + server::StreamableHttpServerWrapper server(handler, host, port, "/mcp"); + bool started = server.start(); + std::cout << "(server.start=" << started << ", running=" << server.running() << ") " + << std::flush; + assert(started && "Server failed to start"); + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + // Test direct HTTP client first (seems to help with connection establishment) + std::cout << "(testing direct client) " << std::flush; + httplib::Client direct_cli(host, port); + direct_cli.set_connection_timeout(5, 0); + direct_cli.set_read_timeout(5, 0); + auto direct_res = direct_cli.Get("/mcp"); + std::cout << "(GET result: " << (direct_res ? std::to_string(direct_res->status) : "null") + << ") " << std::flush; + + try + { + client::StreamableHttpTransport transport("http://" + host + ":" + std::to_string(port)); + + // Initialize first + Json init_request = {{"jsonrpc", "2.0"}, + {"id", 1}, + {"method", "initialize"}, + {"params", + {{"protocolVersion", "2024-11-05"}, + {"capabilities", Json::object()}, + {"clientInfo", {{"name", "test"}, {"version", "1.0"}}}}}}; + + transport.request("mcp", init_request); + + // Call non-existent tool + Json bad_request = {{"jsonrpc", "2.0"}, + {"id", 2}, + {"method", "tools/call"}, + {"params", {{"name", "nonexistent"}, {"arguments", {}}}}}; + + auto error_response = transport.request("mcp", bad_request); + + // Should have error, not result + assert(error_response.contains("error") && "Should have error response"); + assert(error_response["error"].contains("code") && "Error should have code"); + assert(error_response["error"].contains("message") && "Error should have message"); + + std::cout << "PASSED\n"; + } + catch (const std::exception& e) + { + std::cout << "FAILED: " << e.what() << "\n"; + server.stop(); + throw; + } + + server.stop(); +} + +int main() +{ + std::cout << "Streamable HTTP Integration Tests\n"; + std::cout << "==================================\n"; + + try + { + test_basic_request_response(); + test_session_management(); + test_server_info(); + test_error_handling(); + + std::cout << "\nAll tests passed!\n"; + return 0; + } + catch (const std::exception& e) + { + std::cerr << "\nTest failed with exception: " << e.what() << "\n"; + return 1; + } +} diff --git a/tests/server/test_server_session.cpp b/tests/server/test_server_session.cpp index d2407ae..d868f87 100644 --- a/tests/server/test_server_session.cpp +++ b/tests/server/test_server_session.cpp @@ -319,6 +319,64 @@ void test_request_id_generation() std::cout << "PASSED\n"; } +void test_send_notification() +{ + std::cout << " test_send_notification... " << std::flush; + + std::vector sent; + ServerSession session("sess_1", [&](const Json& msg) { sent.push_back(msg); }); + + // Send a generic notification + session.send_notification("notifications/tools/list_changed", {{"reason", "tool_added"}}); + + assert(sent.size() == 1); + assert(sent[0]["jsonrpc"] == "2.0"); + assert(sent[0]["method"] == "notifications/tools/list_changed"); + assert(sent[0]["params"]["reason"] == "tool_added"); + assert(!sent[0].contains("id")); // Notifications have no id + + std::cout << "PASSED\n"; +} + +void test_send_progress() +{ + std::cout << " test_send_progress... " << std::flush; + + std::vector sent; + ServerSession session("sess_1", [&](const Json& msg) { sent.push_back(msg); }); + + // Send progress without message + session.send_progress("token_123", 50, 100); + + assert(sent.size() == 1); + assert(sent[0]["jsonrpc"] == "2.0"); + assert(sent[0]["method"] == "notifications/progress"); + assert(sent[0]["params"]["progressToken"] == "token_123"); + assert(sent[0]["params"]["progress"] == 50); + assert(sent[0]["params"]["total"] == 100); + assert(!sent[0]["params"].contains("message")); + + // Send progress with message + sent.clear(); + session.send_progress("token_456", 75, 100, "Processing..."); + + assert(sent.size() == 1); + assert(sent[0]["params"]["progressToken"] == "token_456"); + assert(sent[0]["params"]["progress"] == 75); + assert(sent[0]["params"]["total"] == 100); + assert(sent[0]["params"]["message"] == "Processing..."); + + // Send progress without total (omits total field) + sent.clear(); + session.send_progress("token_789", 25); + + assert(sent.size() == 1); + assert(sent[0]["params"]["progress"] == 25); + assert(!sent[0]["params"].contains("total")); + + std::cout << "PASSED\n"; +} + int main() { std::cout << "ServerSession Tests\n"; @@ -336,6 +394,8 @@ int main() test_numeric_request_id(); test_multiple_concurrent_requests(); test_request_id_generation(); + test_send_notification(); + test_send_progress(); std::cout << "\nAll tests passed!\n"; return 0;