Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion examples/stdio_mcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ int main()
using Json = nlohmann::json;

fastmcpp::tools::ToolManager tm;
int counter_value = 0;
fastmcpp::tools::Tool add{
"add",
Json{{"type", "object"},
Expand All @@ -29,8 +30,27 @@ int main()
}};
tm.register_tool(add);

fastmcpp::tools::Tool counter{
"counter",
Json{{"type", "object"}, {"properties", Json::object()}},
Json{{"type", "array"},
{"items",
Json::array({Json{{"type", "object"},
{"properties", Json{{"type", Json{{"type", "string"}}},
{"text", Json{{"type", "string"}}}}},
{"required", Json::array({"type", "text"})}}})}},
[&counter_value](const Json&) -> Json
{
counter_value += 1;
return Json{{"content",
Json::array({Json{{"type", "text"}, {"text", std::to_string(counter_value)}}})}};
}};
tm.register_tool(counter);

auto handler =
fastmcpp::mcp::make_mcp_handler("demo_stdio", "0.1.0", tm, {{"add", "Add two numbers"}});
fastmcpp::mcp::make_mcp_handler("demo_stdio", "0.1.0", tm,
{{"add", "Add two numbers"},
{"counter", "Increment and return an in-process counter"}});
fastmcpp::server::StdioServerWrapper server(handler);
server.run();
return 0;
Expand Down
36 changes: 26 additions & 10 deletions include/fastmcpp/client/transports.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ class WebSocketTransport : public ITransport
std::string url_;
};

// Launches an MCP stdio server as a subprocess and performs
// a single JSON-RPC request/response per call.
// Launches an MCP stdio server as a subprocess and performs JSON-RPC requests
// over its stdin/stdout. By default, the subprocess is kept alive between calls
// to better match Python fastmcp behavior; pass keep_alive=false to spawn per call.
class StdioTransport : public ITransport
{
public:
Expand All @@ -64,29 +65,44 @@ class StdioTransport : public ITransport
/// @param log_file Optional path where subprocess stderr will be written.
/// If provided, stderr is redirected to this file in append mode.
/// If not provided, stderr is captured and included in error messages.
/// @param keep_alive Whether to keep the subprocess alive between calls. Defaults to true.
explicit StdioTransport(std::string command, std::vector<std::string> args = {},
std::optional<std::filesystem::path> log_file = std::nullopt)
: command_(std::move(command)), args_(std::move(args)), log_file_(std::move(log_file))
{
}
std::optional<std::filesystem::path> log_file = std::nullopt,
bool keep_alive = true);

/// Construct with ostream pointer for stderr (v2.13.0+)
/// @param command The command to execute
/// @param args Command-line arguments
/// @param log_stream Stream pointer where subprocess stderr will be written
/// Caller retains ownership; must remain valid during request()
StdioTransport(std::string command, std::vector<std::string> args, std::ostream* log_stream)
: command_(std::move(command)), args_(std::move(args)), log_stream_(log_stream)
/// @param keep_alive Whether to keep the subprocess alive between calls. Defaults to true.
StdioTransport(std::string command, std::vector<std::string> args, std::ostream* log_stream,
bool keep_alive = true);

StdioTransport(const StdioTransport&) = delete;
StdioTransport& operator=(const StdioTransport&) = delete;
StdioTransport(StdioTransport&&) noexcept;
StdioTransport& operator=(StdioTransport&&) noexcept;

~StdioTransport();

fastmcpp::Json request(const std::string& route, const fastmcpp::Json& payload) override;

bool keep_alive() const noexcept
{
return keep_alive_;
}

fastmcpp::Json request(const std::string& route, const fastmcpp::Json& payload);

private:
std::string command_;
std::vector<std::string> args_;
std::optional<std::filesystem::path> log_file_;
std::ostream* log_stream_ = nullptr;
bool keep_alive_{true};
int64_t next_id_{1};

struct State;
std::unique_ptr<State> state_;
};

/// SSE client transport for connecting to MCP servers using Server-Sent Events protocol.
Expand Down
7 changes: 6 additions & 1 deletion src/cli/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ static int tasks_usage(int exit_code = 1)
std::cout << " --ws <url> WebSocket URL (e.g. ws://127.0.0.1:8765)\n";
std::cout << " --stdio <command> Spawn an MCP stdio server\n";
std::cout << " --stdio-arg <arg> Repeatable args for --stdio\n";
std::cout << " --stdio-one-shot Spawn a fresh process per request (disables keep-alive)\n";
std::cout << "\n";
std::cout << "Notes:\n";
std::cout << " - Python fastmcp's `tasks` CLI is for Docket (distributed workers/Redis).\n";
Expand All @@ -75,6 +76,7 @@ struct TasksConnection
std::string url_or_command;
std::string mcp_path = "/mcp";
std::vector<std::string> stdio_args;
bool stdio_keep_alive = true;
};

static bool is_flag(const std::string& s)
Expand Down Expand Up @@ -158,6 +160,8 @@ static std::optional<TasksConnection> parse_tasks_connection(std::vector<std::st
conn.url_or_command = *stdio;
saw_any = true;
}
if (consume_flag(args, "--stdio-one-shot"))
conn.stdio_keep_alive = false;

while (true)
{
Expand Down Expand Up @@ -185,7 +189,8 @@ static fastmcpp::client::Client make_client_from_connection(const TasksConnectio
case TasksConnection::Kind::WebSocket:
return Client(std::make_unique<WebSocketTransport>(conn.url_or_command));
case TasksConnection::Kind::Stdio:
return Client(std::make_unique<StdioTransport>(conn.url_or_command, conn.stdio_args));
return Client(std::make_unique<StdioTransport>(conn.url_or_command, conn.stdio_args,
std::nullopt, conn.stdio_keep_alive));
}
throw std::runtime_error("Unsupported transport kind");
}
Expand Down
193 changes: 187 additions & 6 deletions src/client/transports.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
#include "fastmcpp/client/transports.hpp"

#include "fastmcpp/exceptions.hpp"
#include "fastmcpp/util/json.hpp"
#include "fastmcpp/util/json.hpp"

#include <chrono>
#include <easywsclient.hpp>
#include <condition_variable>
#include <deque>
#include <easywsclient.hpp>
#include <fstream>
#include <httplib.h>
#include <mutex>
#include <sstream>
#include <thread>
#ifdef FASTMCPP_POST_STREAMING
Expand All @@ -16,12 +19,28 @@
#include <process.hpp>
#endif

namespace fastmcpp::client
{
namespace fastmcpp::client
{

namespace
struct StdioTransport::State
{
struct ParsedUrl
#ifdef TINY_PROCESS_LIB_AVAILABLE
std::unique_ptr<TinyProcessLib::Process> process;
std::ofstream log_file_stream;
std::ostream* stderr_target{nullptr};

std::mutex request_mutex;
std::mutex mutex;
std::condition_variable cv;
std::string stdout_partial;
std::deque<std::string> stdout_lines;
std::string stderr_data;
#endif
};

namespace
{
struct ParsedUrl
{
std::string scheme; // "http" or "https"
std::string host;
Expand Down Expand Up @@ -508,6 +527,20 @@ void WebSocketTransport::request_stream(const std::string& route, const fastmcpp
ws->close();
}

StdioTransport::StdioTransport(std::string command, std::vector<std::string> args,
std::optional<std::filesystem::path> log_file, bool keep_alive)
: command_(std::move(command)), args_(std::move(args)), log_file_(std::move(log_file)),
keep_alive_(keep_alive)
{
}

StdioTransport::StdioTransport(std::string command, std::vector<std::string> args,
std::ostream* log_stream, bool keep_alive)
: command_(std::move(command)), args_(std::move(args)), log_stream_(log_stream),
keep_alive_(keep_alive)
{
}

fastmcpp::Json StdioTransport::request(const std::string& route, const fastmcpp::Json& payload)
{
// Use TinyProcessLibrary (fetched via CMake) for cross-platform subprocess handling
Expand All @@ -519,6 +552,131 @@ fastmcpp::Json StdioTransport::request(const std::string& route, const fastmcpp:

#ifdef TINY_PROCESS_LIB_AVAILABLE
using namespace TinyProcessLib;

if (keep_alive_)
{
if (!state_)
{
state_ = std::make_unique<State>();

if (log_file_.has_value())
{
state_->log_file_stream.open(log_file_.value(), std::ios::app);
if (state_->log_file_stream.is_open())
state_->stderr_target = &state_->log_file_stream;
}
else if (log_stream_ != nullptr)
{
state_->stderr_target = log_stream_;
}

auto stdout_callback = [st_ptr = state_.get()](const char* bytes, size_t n)
{
std::lock_guard<std::mutex> lock(st_ptr->mutex);
st_ptr->stdout_partial.append(bytes, n);

for (;;)
{
auto pos = st_ptr->stdout_partial.find('\n');
if (pos == std::string::npos)
break;

std::string line = st_ptr->stdout_partial.substr(0, pos);
if (!line.empty() && line.back() == '\r')
line.pop_back();
st_ptr->stdout_lines.push_back(std::move(line));
st_ptr->stdout_partial.erase(0, pos + 1);
}

st_ptr->cv.notify_all();
};

auto stderr_callback = [st_ptr = state_.get()](const char* bytes, size_t n)
{
std::lock_guard<std::mutex> lock(st_ptr->mutex);
if (st_ptr->stderr_target != nullptr)
{
st_ptr->stderr_target->write(bytes, n);
st_ptr->stderr_target->flush();
}
st_ptr->stderr_data.append(bytes, n);
};

state_->process = std::make_unique<Process>(cmd.str(), "", stdout_callback,
stderr_callback, /*open_stdin*/ true);
}

auto* st = state_.get();
std::lock_guard<std::mutex> request_lock(st->request_mutex);

const int64_t id = next_id_++;
fastmcpp::Json request = {
{"jsonrpc", "2.0"},
{"id", id},
{"method", route},
{"params", payload},
};

{
std::lock_guard<std::mutex> lock(st->mutex);
st->stderr_data.clear();
}

if (!st->process->write(request.dump() + "\n"))
throw fastmcpp::TransportError("StdioTransport: failed to write request");

// Wait for a response matching this ID.
// Note: stdio servers may emit notifications or logs; ignore non-matching lines.
for (;;)
{
int exit_status = 0;
if (st->process->try_get_exit_status(exit_status))
{
std::lock_guard<std::mutex> lock(st->mutex);
throw fastmcpp::TransportError(
"StdioTransport process exited with code: " +
std::to_string(exit_status) +
(st->stderr_data.empty() ? std::string("")
: ("; stderr: ") + st->stderr_data));
}

std::unique_lock<std::mutex> lock(st->mutex);
if (!st->cv.wait_for(lock, std::chrono::seconds(30),
[&]() { return !st->stdout_lines.empty(); }))
{
throw fastmcpp::TransportError("StdioTransport: timed out waiting for response");
}

while (!st->stdout_lines.empty())
{
auto line = std::move(st->stdout_lines.front());
st->stdout_lines.pop_front();
lock.unlock();

if (line.empty())
{
lock.lock();
continue;
}

try
{
auto parsed = fastmcpp::util::json::parse(line);
if (parsed.contains("id") && parsed["id"].is_number_integer() &&
parsed["id"].get<int64_t>() == id)
{
return parsed;
}
}
catch (...)
{
// Ignore non-JSON stdout lines (e.g., server logs).
}

lock.lock();
}
}
}
std::string stdout_data;
std::string stderr_data;

Expand Down Expand Up @@ -582,6 +740,29 @@ fastmcpp::Json StdioTransport::request(const std::string& route, const fastmcpp:
#endif
}

StdioTransport::StdioTransport(StdioTransport&&) noexcept = default;
StdioTransport& StdioTransport::operator=(StdioTransport&&) noexcept = default;

StdioTransport::~StdioTransport()
{
#ifdef TINY_PROCESS_LIB_AVAILABLE
if (state_ && state_->process)
{
state_->process->close_stdin();

int exit_status = 0;
for (int i = 0; i < 10; i++)
{
if (state_->process->try_get_exit_status(exit_status))
return;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

state_->process->kill(false);
}
#endif
}

// =============================================================================
// SseClientTransport implementation
// =============================================================================
Expand Down
Loading
Loading