feat(mcp): add validate_batch, apply_batch, and tune_node tools#368
Conversation
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
4426dfc to
f59e298
Compare
Extract shared helpers from WebSocket handlers into server/mod.rs and add three new MCP tools for live pipeline mutation: - validate_batch: dry-run validation of batch graph mutations - apply_batch: atomic application of batch graph mutations - tune_node: send control messages (e.g. UpdateParams) to nodes Refactor WebSocket handlers to delegate to the shared helpers, ensuring a single source of truth for batch and tune logic. Add schemars::JsonSchema derives to BatchOperation, ConnectionMode, and NodeControlMessage for MCP schema generation. Add integration tests covering valid/invalid batch operations, full round-trip (create→apply→get→destroy), tune_node with UpdateParams, and permission denial for all three tools. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
f59e298 to
f76d739
Compare
| // Apply all operations in order. | ||
| let mut engine_operations = Vec::new(); | ||
| { | ||
| let mut pipeline = session.pipeline.lock().await; | ||
| for op in operations { | ||
| match op { | ||
| streamkit_api::BatchOperation::AddNode { node_id, kind, params } => { | ||
| pipeline.nodes.insert( | ||
| node_id.clone(), | ||
| streamkit_api::Node { | ||
| kind: kind.clone(), | ||
| params: params.clone(), | ||
| state: None, | ||
| }, | ||
| ); | ||
| engine_operations.push( | ||
| streamkit_core::control::EngineControlMessage::AddNode { | ||
| node_id, | ||
| kind, | ||
| params, | ||
| }, | ||
| ); | ||
| }, | ||
| streamkit_api::BatchOperation::RemoveNode { node_id } => { | ||
| pipeline.nodes.shift_remove(&node_id); | ||
| pipeline | ||
| .connections | ||
| .retain(|conn| conn.from_node != node_id && conn.to_node != node_id); | ||
| engine_operations.push( | ||
| streamkit_core::control::EngineControlMessage::RemoveNode { node_id }, | ||
| ); | ||
| }, | ||
| streamkit_api::BatchOperation::Connect { | ||
| from_node, | ||
| from_pin, | ||
| to_node, | ||
| to_pin, | ||
| mode, | ||
| } => { | ||
| pipeline.connections.push(streamkit_api::Connection { | ||
| from_node: from_node.clone(), | ||
| from_pin: from_pin.clone(), | ||
| to_node: to_node.clone(), | ||
| to_pin: to_pin.clone(), | ||
| mode, | ||
| }); | ||
| let core_mode = match mode { | ||
| streamkit_api::ConnectionMode::Reliable => { | ||
| streamkit_core::control::ConnectionMode::Reliable | ||
| }, | ||
| streamkit_api::ConnectionMode::BestEffort => { | ||
| streamkit_core::control::ConnectionMode::BestEffort | ||
| }, | ||
| }; | ||
| engine_operations.push( | ||
| streamkit_core::control::EngineControlMessage::Connect { | ||
| from_node, | ||
| from_pin, | ||
| to_node, | ||
| to_pin, | ||
| mode: core_mode, | ||
| }, | ||
| ); | ||
| }, | ||
| streamkit_api::BatchOperation::Disconnect { | ||
| from_node, | ||
| from_pin, | ||
| to_node, | ||
| to_pin, | ||
| } => { | ||
| pipeline.connections.retain(|conn| { | ||
| !(conn.from_node == from_node | ||
| && conn.from_pin == from_pin | ||
| && conn.to_node == to_node | ||
| && conn.to_pin == to_pin) | ||
| }); | ||
| engine_operations.push( | ||
| streamkit_core::control::EngineControlMessage::Disconnect { | ||
| from_node, | ||
| from_pin, | ||
| to_node, | ||
| to_pin, | ||
| }, | ||
| ); | ||
| }, | ||
| } | ||
| } | ||
| drop(pipeline); | ||
| } | ||
|
|
||
| // Send control messages to the engine. | ||
| for msg in engine_operations { | ||
| session.send_control_message(msg).await; | ||
| } | ||
|
|
||
| Ok(()) |
There was a problem hiding this comment.
🚩 Batch operations don't broadcast individual mutation events to WebSocket clients
When apply_batch_operations adds/removes nodes or connections, no NodeAdded/NodeRemoved/ConnectionAdded/ConnectionRemoved events are broadcast to WebSocket clients. This means UI clients watching via WebSocket won't see real-time updates for mutations triggered through apply_batch (whether via MCP or WebSocket). This is a pre-existing design limitation — the old handle_apply_batch also didn't broadcast these events. With the new MCP apply_batch tool, this becomes a more prominent concern since MCP agents can now trigger graph mutations that are invisible to connected UI clients until they re-fetch the pipeline.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
Coordinator Review — Sanity PassSolid refactoring overall. The shared-helper extraction is clean and WS handler delegation works correctly. A few observations: 1. Duplicated pre-validation between
|
* feat(mcp): embed MCP server with 6 MVP tools behind feature flag Add an embedded Model Context Protocol (MCP) server to the skit application, gated behind an optional `mcp` Cargo feature flag. When enabled and configured, the server exposes a Streamable HTTP endpoint at `/api/v1/mcp` (configurable) that reuses existing auth, CORS, tracing, and metrics middleware. Phase 1 provides six read/discover/validate/manage tools aimed at automated testing, debugging, and pipeline design use cases: - list_nodes: permission-filtered node definitions with schemas - validate_pipeline: stateless YAML dry-run returning diagnostics - create_session: create a dynamic session from YAML - list_sessions: list active sessions (permission-filtered) - get_pipeline: runtime pipeline snapshot including node states - destroy_session: stop and remove a session Implementation details: - rmcp 1.5 with Streamable HTTP transport and tool_router macro - McpConfig struct (enabled + endpoint) added to server config - StreamKitMcp service with ServerHandler trait implementation - Auth extracted from HTTP request parts injected by rmcp - Public MCP helpers in server module avoid exposing private types Closes: #358 (Phase 1) Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(mcp): assert mcp.endpoint starts with /api/ at startup Prevent misconfiguration where setting mcp.endpoint to a path outside /api/ would silently bypass auth_guard_middleware and origin_guard_middleware. The server now panics at startup if the endpoint is not under /api/. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(mcp): address PR review findings - Deduplicate validation: refactor validate_pipeline_handler to call validate_pipeline_yaml, eliminating near-identical implementations. - Return typed ValidateResponse instead of serde_json::Value, removing the unnecessary double-serialization pass. - Document StreamableHttpServerConfig defaults (rmcp 1.5) and disable allowed_hosts since auth/origin middleware already guards the endpoint. - Add comment explaining why create_session hard-codes dynamic-mode rules rather than going through check_mode. - Remove racy pre-flight is_name_taken/can_accept_session check in create_session; add_session rechecks under the lock. - Change pub fn wrappers to pub (effectively pub(crate) in this private module) per clippy redundant_pub_crate lint. - Add MCP integration tests: unauthenticated request rejection, authenticated initialization, and validate_pipeline tool call. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(mcp): address security and correctness review findings MUST FIX: - Move endpoint validation from runtime assert! to config::load time, returning a config error instead of panicking the binary - Tighten endpoint validator: only accept /api/v<digits>/mcp(/...), reject /api/v1/auth/ prefix (auth bypass), path traversal (..) SHOULD FIX: - Warn when mcp.enabled=true but binary compiled without mcp feature - Document SessionConfig keep_alive TTL (5 min) that evicts idle MCP sessions, preventing unbounded growth - Add mcp.allowed_hosts config for DNS rebinding protection on the Host header; disabled by default (acceptable behind auth middleware) - Factor inline oneshot-node check to shared is_synthetic_kind() used by both HTTP and MCP create_session paths - Integration test: panic on PermissionDenied instead of silent skip - Add tests: config validation, permission denial, destroy nonexistent NITS: - Propagate add_session error message instead of hardcoded string - check_file_path_security now accumulates all errors like HTTP path - Set explicit ServerInfo name/version (streamkit + CARGO_PKG_VERSION) - Restore debug! traces in validate_pipeline_yaml for parse/compile - Comment rmcp feature flags in Cargo.toml Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(mcp): reject invalid mode values in validate_pipeline The catch-all arm silently mapped misspelled modes (e.g. 'Dynamic') to None, skipping mode-specific validation and producing false-positive results. Explicitly reject unknown mode values with an error. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(mcp): pre-flight session checks, better error codes, remove wrappers, add positive-path tests - Add pre-flight session-limit and name-uniqueness checks before Session::create to avoid wasted engine allocation (Finding 1) - Use McpError::invalid_request instead of internal_error for session limit and name collision errors (Finding 2) - Remove thin mcp_ wrapper functions; make originals pub and call them directly from the MCP module (Findings 3+4) - Add positive-path tests: list_nodes, create→list→get→destroy round trip covering all 6 MCP tools (Finding 5) - Replace flake-prone 100ms startup sleep with /healthz polling in integration tests (Finding 5) - Convert call_tool to async fn to satisfy clippy::manual_async_fn Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * style: apply rustfmt formatting Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * feat(mcp): add generate_oneshot_command tool (#366) * feat(mcp): add generate_oneshot_command tool Add a new MCP tool that generates curl or skit-cli commands for executing oneshot (batch processing) pipelines. The tool validates the pipeline YAML before generating the command, returning diagnostics if validation fails. Key details: - Supports 'curl' (default) and 'skit-cli' output formats - Validates YAML with mode=oneshot before generating commands - Uses 'config' as the multipart field name (matching the server API) - Includes integration tests for both formats, invalid YAML, and permission denial Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(mcp): fix duplicate --input flags and add shell quoting - Fix duplicate --input flags in generate_skit_cli_command when no input has field 'media' (extras and inputs were both iterated) - Shell-quote all interpolated values (paths, URLs) in generated commands to handle spaces and metacharacters safely Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(mcp): shell-quote field names in skit-cli command generator Quote the entire field=path pair as a single shell token in generate_skit_cli_command, consistent with how generate_curl_command handles it. Prevents shell injection via crafted field names. Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(mcp): use dynamic heredoc delimiter to prevent content injection Choose a heredoc delimiter that does not appear in the YAML content, preventing premature termination if YAML contains the literal delimiter string. Applies to both curl and skit-cli generators. Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> --------- Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-authored-by: StreamKit Devin <devin@streamkit.dev> Co-authored-by: Claudio Costa <cstcld91@gmail.com> * feat(mcp): add design_pipeline and debug_pipeline prompts (#367) Add MCP prompt support with two built-in prompts: - design_pipeline: Returns the full node catalogue (permission-filtered), YAML format explanation, connection rules, pipeline modes, and a design workflow. Optional 'description' argument for user requirements. - debug_pipeline: Fetches pipeline state for a session and returns per-node states, error messages, connection summary, diagnostic checklist, and available remediation tools. Required 'session_id' argument. Implementation: - Enable prompts capability in ServerCapabilities - Implement list_prompts() and get_prompt() on ServerHandler - Permission-filter node definitions in design_pipeline - Reuse session lookup and pipeline state logic from get_pipeline tool - Add 6 integration tests covering both prompts Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-authored-by: StreamKit Devin <devin@streamkit.dev> Co-authored-by: Claudio Costa <cstcld91@gmail.com> * feat(mcp): add validate_batch, apply_batch, and tune_node tools (#368) Extract shared helpers from WebSocket handlers into server/mod.rs and add three new MCP tools for live pipeline mutation: - validate_batch: dry-run validation of batch graph mutations - apply_batch: atomic application of batch graph mutations - tune_node: send control messages (e.g. UpdateParams) to nodes Refactor WebSocket handlers to delegate to the shared helpers, ensuring a single source of truth for batch and tune logic. Add schemars::JsonSchema derives to BatchOperation, ConnectionMode, and NodeControlMessage for MCP schema generation. Add integration tests covering valid/invalid batch operations, full round-trip (create→apply→get→destroy), tune_node with UpdateParams, and permission denial for all three tools. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-authored-by: StreamKit Devin <devin@streamkit.dev> Co-authored-by: Claudio Costa <cstcld91@gmail.com> * refactor(mcp): split mcp.rs into mcp/ directory module Split the monolithic mcp.rs (1416 lines) into a directory module: - mcp/mod.rs: StreamKitMcp struct, extract_auth(), #[tool_router] impl with all tool methods, ServerHandler impl, service factory - mcp/prompts.rs: prompt content builder functions (build_design_pipeline_content, build_debug_pipeline_content, prompt_metadata) - mcp/oneshot.rs: shell_quote(), unique_heredoc_delimiter(), generate_curl_command(), generate_skit_cli_command() Extract shared helpers on StreamKitMcp: - resolve_session(): combines auth extraction, permission check, session lookup by name/id, and ownership check - json_tool_result(): serializes to pretty JSON and wraps in CallToolResult::success (replaces ~9 duplicated patterns) - filtered_node_definitions(): permission-filtered node list including synthetic oneshot nodes (was duplicated between list_nodes and build_design_pipeline_prompt) - assemble_pipeline_state(): merges node states, view data, and runtime schemas into cloned pipeline (was duplicated between get_pipeline and build_debug_pipeline_prompt) No behavioral changes. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * refactor(mcp): deduplicate batch validation and expose ValidateResponse.valid - Make ValidateResponse.valid pub(crate) so generate_oneshot_command can check validation.valid directly instead of the fragile serde_json::to_value round-trip. - Extract check_batch_node_id_uniqueness() helper in server/mod.rs to deduplicate the identical node-ID uniqueness checking logic (~20 lines each) between validate_batch_operations and apply_batch_operations. No behavioral changes. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * chore: update Cargo.lock for schemars dependency Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * refactor(mcp): align prompts to idiomatic rmcp #[prompt_router] macros Replace manual ServerHandler method overrides (list_prompts, get_prompt) and hand-built dispatch with idiomatic rmcp 1.5 macros: - Add #[prompt_router] impl block in prompts.rs with #[prompt] methods - Add #[tool_handler] and #[prompt_handler] on ServerHandler impl - Add PromptRouter<Self> field to StreamKitMcp - Move DesignPipelinePromptArgs and DebugPipelinePromptArgs to prompts.rs - Remove prompt_metadata(), manual dispatch, and prompt helper methods - Keep content builder functions as helpers called by prompt methods Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * feat(mcp): add skit mcp CLI subcommand for STDIO transport Add STDIO transport support for the MCP server, enabling local MCP client integration (Devin, Claude Desktop, Cursor). Changes: - Add transport-io feature to rmcp dependency - Add Mcp variant to CLI Commands enum with handle_mcp_command - Extract create_app_state() from create_app() for reuse - Add start_mcp_stdio() function using rmcp STDIO transport - Handle extract_auth() fallback for STDIO (no HTTP parts, local/trusted) - Add init_logging_stderr() to keep stdout clean for JSON-RPC stream - Add just skit-mcp recipe with --features mcp - Add integration test for STDIO MCP server initialize handshake Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * ci(mcp): add MCP feature to lint and test CI steps The mcp feature was not included in clippy or test runs, so all code behind #[cfg(feature = "mcp")] was unchecked by CI. Add separate clippy and cargo-test invocations with --features "mcp" to both the justfile recipes (lint-skit, test-skit) and the GitHub Actions workflow (skit.yml lint + test jobs). Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(mcp): resolve clippy warnings uncovered by MCP lint Fix 25 clippy warnings that were previously hidden because the mcp feature was not included in the lint CI step: - redundant_pub_crate: change pub(crate) to pub(super) in private submodules (oneshot.rs, prompts.rs) - format_push_string: replace push_str(&format!(...)) with write!() in prompt content builders - map_unwrap_or: use map_or_else instead of map().unwrap_or_else() - if_not_else: flip condition in skit-cli input flag logic - single_match_else + option_if_let_else: use map_or_else in extract_auth Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(mcp): use writeln! instead of write! with trailing newline Clippy's write_with_newline lint catches write!() calls that end with a literal \n. Replace with writeln!() for consistency. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(mcp): use struct init in test to satisfy field_reassign_with_default Use struct initializer with ..Default::default() instead of assigning fields individually after creating a default instance. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * refactor(mcp): extract create_dynamic_session shared helper Extract the full session creation flow (YAML parse, compile, permission checks, file-path security, session limit, engine allocation, insert, pipeline population, event broadcast) into a shared create_dynamic_session() function in server/mod.rs. Both the HTTP create_session_handler and MCP create_session tool now delegate to this helper, eliminating ~120 lines of duplicated logic. Each caller maps CreateSessionError variants to protocol-appropriate error types (StatusCode for HTTP, McpError for MCP). Also removes the #[cfg(feature = "mcp")] gate from check_file_path_security so the shared function can call it unconditionally. Addresses Devin Review finding r3141796134. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(mcp): address review findings from final pass 1. STDIO auth doc: document that skit mcp runs unauthenticated with admin-level permissions; only expose stdin to trusted local processes. 2. Host vs Origin doc: clarify that auth_guard_middleware (bearer-token validation) prevents DNS rebinding, not the Origin header alone. Rewrite the allowed_hosts doc to distinguish Host-header checking from Origin-based CORS protection. 3. STDIO test coverage: add test_mcp_stdio_tool_call that initializes over STDIO and invokes list_nodes, verifying the admin-fallback auth path grants actual tool access (not just the handshake). 4. Predictable temp path: replace fixed /tmp/pipeline.yaml with mktemp /tmp/pipeline-XXXXXX.yaml in generated oneshot commands, preventing clobbering on concurrent runs and TOCTOU on multi-user systems. 5. start_mcp_stdio docs: document startup cost (full plugin + gateway init) and shutdown behaviour (no drain of in-flight session shutdowns). 6. McpConfig::validate comment: reframe .. rejection as 'unsafe path segments' rather than 'path traversal', and note the conservative nature of the check. 7. apply_batch description: replace 'atomically' with 'as a single validated batch' and add caveat about engine-side error recovery. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(server): preserve 403 FORBIDDEN for file-path security violations check_file_path_security errors are security policy denials (paths outside allowed_file_paths / allowed_write_paths), not malformed input. Map them to CreateSessionError::Forbidden so the HTTP handler returns StatusCode::FORBIDDEN (403), preserving the pre-refactoring API contract. Previously the shared helper mapped these to InvalidInput → 400, which was a breaking change from the old create_session_handler that mapped AppError::Forbidden → 403. Addresses review finding r3141831117. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> --------- Signed-off-by: StreamKit Devin <devin@streamkit.dev> Signed-off-by: Devin AI <devin@streamkit.dev> Co-authored-by: StreamKit Devin <devin@streamkit.dev> Co-authored-by: Claudio Costa <cstcld91@gmail.com> Co-authored-by: staging-devin-ai-integration[bot] <166158716+staging-devin-ai-integration[bot]@users.noreply.github.com>
Summary
Adds three new MCP tools for live pipeline mutation, building on the Phase 1 MCP server (PR #359):
validate_batch— dry-run validation of batch graph mutations against a running sessionapply_batch— atomic application of batch graph mutations (AddNode, RemoveNode, Connect, Disconnect)tune_node— send control messages (e.g.UpdateParams) to individual nodes at runtimeImplementation approach
Shared helpers extracted from WebSocket handlers into
server/mod.rs:validate_batch_operations()— validates batch ops without applyingapply_batch_operations()— applies ops atomically with pre-validationtune_session_node()— sends control messages with security validation, durable model updates, and event broadcastingWebSocket handlers refactored to delegate to shared helpers, ensuring a single source of truth for batch and tune logic.
schemars::JsonSchemaderived onBatchOperation,ConnectionMode, andNodeControlMessagefor MCP schema generation. Addedschemarsdependency tocrates/api/Cargo.toml.MCP tools follow the established pattern:
extract_auth→ permission check → session lookup + ownership check → delegate to shared helper → return JSON result.Integration tests added for all three tools covering valid/invalid operations, full round-trip, and permission denial.
Review & Testing Checklist for Human
apply_batchatomicity: if any pre-validation fails, no mutations should be appliedtune_nodesecurity validation (file paths forfile_reader/file_writer/scriptnodes) works correctly via MCPmodify_sessionsshould be blocked fromvalidate_batch/apply_batch, roles withouttune_nodesfromtune_nodeNodeParamsChangedfor tune operations)Notes
#[cfg(feature = "mcp")]since the WebSocket handlers also use themapply_batchdoes not broadcast per-operation events (matching the existing WS handler behavior)tune_nodebroadcastsNodeParamsChangedevents via the shared helper, keeping WS clients in syncLink to Devin session: https://staging.itsdev.in/sessions/556047016f2148f6a20fe1e9b1715e6a
Requested by: @streamer45