Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 207 additions & 3 deletions apps/skit/src/mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,32 @@ pub struct SessionIdArgs {
pub session_id: String,
}

#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct ValidateBatchArgs {
/// Session ID or name.
pub session_id: String,
/// List of batch operations to validate.
pub operations: Vec<streamkit_api::BatchOperation>,
}

#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct ApplyBatchArgs {
/// Session ID or name.
pub session_id: String,
/// List of batch operations to apply atomically.
pub operations: Vec<streamkit_api::BatchOperation>,
}

#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct TuneNodeArgs {
/// Session ID or name.
pub session_id: String,
/// Node ID to send the control message to.
pub node_id: String,
/// The control message (e.g., UpdateParams with a JSON value).
pub message: streamkit_core::control::NodeControlMessage,
}

// ---------------------------------------------------------------------------
// MCP prompt argument structs
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -623,6 +649,181 @@ impl StreamKitMcp {
.map_err(|e| McpError::internal_error(format!("serialization error: {e}"), None))?,
)]))
}

// -- validate_batch ----------------------------------------------------

#[tool(
description = "Validate a batch of graph mutations against a running session without applying them. Returns validation errors for any operations that would fail. Operations: addnode, removenode, connect, disconnect."
)]
async fn validate_batch(
&self,
Parameters(args): Parameters<ValidateBatchArgs>,
ctx: RequestContext<RoleServer>,
) -> Result<CallToolResult, McpError> {
let (role_name, perms) = extract_auth(&ctx, &self.app_state)?;

if !perms.modify_sessions {
return Err(McpError::invalid_request(
"Permission denied: modify_sessions required",
None,
));
}

let session = {
let sm = self.app_state.session_manager.lock().await;
sm.get_session_by_name_or_id(&args.session_id)
};

let Some(session) = session else {
return Err(McpError::invalid_params(
format!("Session '{}' not found", args.session_id),
None,
));
};

if !perms.access_all_sessions
&& session.created_by.as_ref().is_some_and(|c| c != &role_name)
{
return Err(McpError::invalid_request(
"Permission denied: you do not own this session",
None,
));
}

let errors = crate::server::validate_batch_operations(
&session,
&args.operations,
&perms,
&self.app_state.config.security,
)
.await;

info!(
session_id = %args.session_id,
operation_count = args.operations.len(),
error_count = errors.len(),
"MCP validate_batch"
);

let json = serde_json::to_string_pretty(&errors)
.map_err(|e| McpError::internal_error(format!("serialization error: {e}"), None))?;

Ok(CallToolResult::success(vec![Content::text(json)]))
}

// -- apply_batch -------------------------------------------------------

#[tool(
description = "Apply a batch of graph mutations atomically to a running session. All operations succeed or all fail together. Operations: addnode, removenode, connect, disconnect."
)]
async fn apply_batch(
&self,
Parameters(args): Parameters<ApplyBatchArgs>,
ctx: RequestContext<RoleServer>,
) -> Result<CallToolResult, McpError> {
let (role_name, perms) = extract_auth(&ctx, &self.app_state)?;

if !perms.modify_sessions {
return Err(McpError::invalid_request(
"Permission denied: modify_sessions required",
None,
));
}

let session = {
let sm = self.app_state.session_manager.lock().await;
sm.get_session_by_name_or_id(&args.session_id)
};

let Some(session) = session else {
return Err(McpError::invalid_params(
format!("Session '{}' not found", args.session_id),
None,
));
};

if !perms.access_all_sessions
&& session.created_by.as_ref().is_some_and(|c| c != &role_name)
{
return Err(McpError::invalid_request(
"Permission denied: you do not own this session",
None,
));
}

crate::server::apply_batch_operations(
&session,
args.operations,
&perms,
&self.app_state.config.security,
)
.await
.map_err(|e| McpError::invalid_params(e, None))?;

info!(session_id = %args.session_id, "MCP apply_batch");

let result = serde_json::json!({ "success": true });
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&result)
.map_err(|e| McpError::internal_error(format!("serialization error: {e}"), None))?,
)]))
}

// -- tune_node ---------------------------------------------------------

#[tool(
description = "Send a control message to a specific node in a running session. Commonly used with UpdateParams to modify node parameters at runtime."
)]
async fn tune_node(
&self,
Parameters(args): Parameters<TuneNodeArgs>,
ctx: RequestContext<RoleServer>,
) -> Result<CallToolResult, McpError> {
let (role_name, perms) = extract_auth(&ctx, &self.app_state)?;

if !perms.tune_nodes {
return Err(McpError::invalid_request("Permission denied: tune_nodes required", None));
}

let session = {
let sm = self.app_state.session_manager.lock().await;
sm.get_session_by_name_or_id(&args.session_id)
};

let Some(session) = session else {
return Err(McpError::invalid_params(
format!("Session '{}' not found", args.session_id),
None,
));
};

if !perms.access_all_sessions
&& session.created_by.as_ref().is_some_and(|c| c != &role_name)
{
return Err(McpError::invalid_request(
"Permission denied: you do not own this session",
None,
));
}

crate::server::tune_session_node(
&session,
args.node_id.clone(),
args.message,
&self.app_state.config.security,
&self.app_state.event_tx,
)
.await
.map_err(|e| McpError::invalid_params(e, None))?;

info!(session_id = %args.session_id, node_id = %args.node_id, "MCP tune_node");

let result = serde_json::json!({ "success": true });
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&result)
.map_err(|e| McpError::internal_error(format!("serialization error: {e}"), None))?,
)]))
}
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -1082,10 +1283,13 @@ impl ServerHandler for StreamKitMcp {
"StreamKit MCP server. Use list_nodes to discover available \
processing nodes, validate_pipeline to check YAML, \
create_session / list_sessions / get_pipeline / destroy_session \
to manage dynamic pipeline sessions, and \
to manage dynamic pipeline sessions, \
generate_oneshot_command to get a curl or skit-cli command for \
batch processing via the HTTP data plane. Two built-in prompts \
are available: design_pipeline (guided pipeline creation) and \
batch processing via the HTTP data plane, validate_batch and \
apply_batch to atomically mutate a running session's graph, \
and tune_node to send control messages (e.g. UpdateParams) to \
individual nodes at runtime. Two built-in prompts are available: \
design_pipeline (guided pipeline creation) and \
debug_pipeline (session diagnostics).",
);
info.server_info = rmcp::model::Implementation::new("streamkit", env!("CARGO_PKG_VERSION"));
Expand Down
Loading