From 5c0dc3cb63081c1eeda0180017f00791bb67d7c3 Mon Sep 17 00:00:00 2001 From: i-norden Date: Sun, 22 Mar 2026 12:33:08 -0400 Subject: [PATCH] Add saved pipeline MCP tools --- mcp/client.go | 96 +++++++++++++++++++++++++++++ mcp/handlers.go | 149 +++++++++++++++++++++++++++++++++++++++++++++ mcp/server_test.go | 82 ++++++++++++++++++++++++- mcp/tools.go | 84 +++++++++++++++++++++++++ 4 files changed, 410 insertions(+), 1 deletion(-) diff --git a/mcp/client.go b/mcp/client.go index acb7634..950305f 100644 --- a/mcp/client.go +++ b/mcp/client.go @@ -382,6 +382,102 @@ func (c *Client) RunPipeline(payload interface{}) (json.RawMessage, error) { return json.RawMessage(body), nil } +// ListPipelineTemplates calls GET /api/pipelines/templates. +func (c *Client) ListPipelineTemplates() (json.RawMessage, error) { + body, code, err := c.get("/api/pipelines/templates", nil) + if err != nil { + return nil, err + } + if code != http.StatusOK { + return nil, fmt.Errorf("GET /api/pipelines/templates returned %d: %s", code, truncate(body, 500)) + } + return json.RawMessage(body), nil +} + +// ListPipelines calls GET /api/pipelines. +func (c *Client) ListPipelines() (json.RawMessage, error) { + body, code, err := c.get("/api/pipelines", nil) + if err != nil { + return nil, err + } + if code != http.StatusOK { + return nil, fmt.Errorf("GET /api/pipelines returned %d: %s", code, truncate(body, 500)) + } + return json.RawMessage(body), nil +} + +// GetPipeline calls GET /api/pipelines/{id}. +func (c *Client) GetPipeline(id string) (json.RawMessage, error) { + body, code, err := c.get("/api/pipelines/"+url.PathEscape(id), nil) + if err != nil { + return nil, err + } + if code != http.StatusOK { + return nil, fmt.Errorf("GET /api/pipelines/%s returned %d: %s", id, code, truncate(body, 500)) + } + return json.RawMessage(body), nil +} + +// CreatePipeline calls POST /api/pipelines. +func (c *Client) CreatePipeline(payload interface{}) (json.RawMessage, error) { + body, code, err := c.postJSON("/api/pipelines", payload) + if err != nil { + return nil, err + } + if code != http.StatusCreated { + return nil, fmt.Errorf("POST /api/pipelines returned %d: %s", code, truncate(body, 500)) + } + return json.RawMessage(body), nil +} + +// UpdatePipeline calls PUT /api/pipelines/{id}. +func (c *Client) UpdatePipeline(id string, payload interface{}) (json.RawMessage, error) { + body, code, err := c.callJSON("PUT", "/api/pipelines/"+url.PathEscape(id), payload, nil) + if err != nil { + return nil, err + } + if code != http.StatusOK { + return nil, fmt.Errorf("PUT /api/pipelines/%s returned %d: %s", id, code, truncate(body, 500)) + } + return json.RawMessage(body), nil +} + +// DeletePipeline calls DELETE /api/pipelines/{id}. +func (c *Client) DeletePipeline(id string) (json.RawMessage, error) { + body, code, err := c.callJSON("DELETE", "/api/pipelines/"+url.PathEscape(id), nil, nil) + if err != nil { + return nil, err + } + if code != http.StatusNoContent { + return nil, fmt.Errorf("DELETE /api/pipelines/%s returned %d: %s", id, code, truncate(body, 500)) + } + return json.RawMessage(`{"status":"deleted"}`), nil +} + +// DuplicatePipeline calls POST /api/pipelines/{id}/duplicate. +func (c *Client) DuplicatePipeline(id string) (json.RawMessage, error) { + body, code, err := c.postJSON("/api/pipelines/"+url.PathEscape(id)+"/duplicate", map[string]interface{}{}) + if err != nil { + return nil, err + } + if code != http.StatusCreated { + return nil, fmt.Errorf("POST /api/pipelines/%s/duplicate returned %d: %s", id, code, truncate(body, 500)) + } + return json.RawMessage(body), nil +} + +// ExecutePipeline calls POST /api/pipelines/{id}/execute. +func (c *Client) ExecutePipeline(id string) (json.RawMessage, error) { + body, code, err := c.postJSON("/api/pipelines/"+url.PathEscape(id)+"/execute", map[string]interface{}{}) + if err != nil { + return nil, err + } + if code != http.StatusOK { + return nil, fmt.Errorf("POST /api/pipelines/%s/execute returned %d: %s", id, code, truncate(body, 500)) + } + return json.RawMessage(body), nil +} + // ConvertFormat calls POST /api/convert. func (c *Client) ConvertFormat(payload interface{}) (json.RawMessage, error) { body, code, err := c.postJSON("/api/convert", payload) diff --git a/mcp/handlers.go b/mcp/handlers.go index 072f24d..3e5a73b 100644 --- a/mcp/handlers.go +++ b/mcp/handlers.go @@ -60,6 +60,22 @@ func HandleToolCall(client *Client, name string, args json.RawMessage) (string, return handleCancelProcessJob(client, params) case "rerun_process_job": return handleRerunProcessJob(client, params) + case "list_pipeline_templates": + return handleListPipelineTemplates(client) + case "list_pipelines": + return handleListPipelines(client) + case "get_pipeline": + return handleGetPipeline(client, params) + case "create_pipeline": + return handleCreatePipeline(client, params) + case "update_pipeline": + return handleUpdatePipeline(client, params) + case "delete_pipeline": + return handleDeletePipeline(client, params) + case "duplicate_pipeline": + return handleDuplicatePipeline(client, params) + case "execute_saved_pipeline": + return handleExecuteSavedPipeline(client, params) case "run_pipeline": return handleRunPipeline(client, params) case "convert_format": @@ -368,6 +384,127 @@ func handleRerunProcessJob(client *Client, params map[string]interface{}) (strin return formatJSON(data), nil } +func handleListPipelineTemplates(client *Client) (string, error) { + data, err := client.ListPipelineTemplates() + if err != nil { + return "", err + } + return formatJSON(data), nil +} + +func handleListPipelines(client *Client) (string, error) { + data, err := client.ListPipelines() + if err != nil { + return "", err + } + return formatJSON(data), nil +} + +func handleGetPipeline(client *Client, params map[string]interface{}) (string, error) { + pipelineID, err := requireString(params, "pipeline_id") + if err != nil { + return "", err + } + data, err := client.GetPipeline(pipelineID) + if err != nil { + return "", err + } + return formatJSON(data), nil +} + +func handleCreatePipeline(client *Client, params map[string]interface{}) (string, error) { + name, err := requireString(params, "name") + if err != nil { + return "", err + } + payload := map[string]interface{}{ + "name": name, + } + if desc, err := optionalStringLike(params, "description"); err == nil && desc != "" { + payload["description"] = desc + } + if graph, ok := params["graph"]; ok { + payload["graph"] = graph + } + if canvas, ok := params["canvas"]; ok { + payload["canvas"] = canvas + } + data, err := client.CreatePipeline(payload) + if err != nil { + return "", err + } + return formatJSON(data), nil +} + +func handleUpdatePipeline(client *Client, params map[string]interface{}) (string, error) { + pipelineID, err := requireString(params, "pipeline_id") + if err != nil { + return "", err + } + name, err := requireString(params, "name") + if err != nil { + return "", err + } + version, err := requireIntLike(params, "version") + if err != nil { + return "", err + } + payload := map[string]interface{}{ + "name": name, + "version": version, + } + if desc, err := optionalStringLike(params, "description"); err == nil && desc != "" { + payload["description"] = desc + } + if graph, ok := params["graph"]; ok { + payload["graph"] = graph + } + if canvas, ok := params["canvas"]; ok { + payload["canvas"] = canvas + } + data, err := client.UpdatePipeline(pipelineID, payload) + if err != nil { + return "", err + } + return formatJSON(data), nil +} + +func handleDeletePipeline(client *Client, params map[string]interface{}) (string, error) { + pipelineID, err := requireString(params, "pipeline_id") + if err != nil { + return "", err + } + data, err := client.DeletePipeline(pipelineID) + if err != nil { + return "", err + } + return formatJSON(data), nil +} + +func handleDuplicatePipeline(client *Client, params map[string]interface{}) (string, error) { + pipelineID, err := requireString(params, "pipeline_id") + if err != nil { + return "", err + } + data, err := client.DuplicatePipeline(pipelineID) + if err != nil { + return "", err + } + return formatJSON(data), nil +} + +func handleExecuteSavedPipeline(client *Client, params map[string]interface{}) (string, error) { + pipelineID, err := requireString(params, "pipeline_id") + if err != nil { + return "", err + } + data, err := client.ExecutePipeline(pipelineID) + if err != nil { + return "", err + } + return formatJSON(data), nil +} + func handleRunPipeline(client *Client, params map[string]interface{}) (string, error) { if _, ok := params["output_name"]; !ok { if out, ok := params["output"].(string); ok && out != "" { @@ -652,6 +789,18 @@ func requireStringLike(params map[string]interface{}, key string) (string, error return s, nil } +func requireIntLike(params map[string]interface{}, key string) (int, error) { + s, err := requireStringLike(params, key) + if err != nil { + return 0, err + } + value, err := strconv.Atoi(s) + if err != nil { + return 0, fmt.Errorf("parameter %s must be an integer", key) + } + return value, nil +} + func parseRoutePoint(v interface{}) ([2]float64, error) { if arr, ok := v.([]interface{}); ok { if len(arr) != 2 { diff --git a/mcp/server_test.go b/mcp/server_test.go index 0ff2fe4..7863180 100644 --- a/mcp/server_test.go +++ b/mcp/server_test.go @@ -98,7 +98,9 @@ func TestToolsList(t *testing.T) { "list_datasets", "get_dataset_info", "query_features", "get_feature", "upload_dataset", "run_process", "run_raster_process", "preflight_process", "submit_process_job", "submit_process_batch", "list_process_jobs", "get_process_job", - "cancel_process_job", "rerun_process_job", "run_pipeline", "convert_format", + "cancel_process_job", "rerun_process_job", "list_pipeline_templates", "list_pipelines", + "get_pipeline", "create_pipeline", "update_pipeline", "delete_pipeline", + "duplicate_pipeline", "execute_saved_pipeline", "run_pipeline", "convert_format", "diff_datasets", "execute_sql", "list_spatial_tables", "get_duckdb_info", "list_duckdb_datasets", "geocode", "reverse_geocode", "compute_route", "compute_isochrone", "compute_route_matrix", "compute_service_area", @@ -1001,6 +1003,84 @@ func TestToolsCall_SubmitProcessJob(t *testing.T) { } } +func TestToolsCall_CreatePipeline(t *testing.T) { + mux := http.NewServeMux() + mux.HandleFunc("POST /api/pipelines", func(w http.ResponseWriter, r *http.Request) { + var body map[string]interface{} + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + t.Fatalf("decode request: %v", err) + } + if body["name"] != "Suitability model" { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, `{"error":"missing name"}`) + return + } + if _, ok := body["graph"]; !ok { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, `{"error":"missing graph"}`) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + fmt.Fprint(w, `{"id":"pipe_123","name":"Suitability model","version":1}`) + }) + srv := testServer(t, mux) + + resp := sendRequest(t, srv, "tools/call", 241, map[string]interface{}{ + "name": "create_pipeline", + "arguments": map[string]interface{}{ + "name": "Suitability model", + "description": "Buffer and clip", + "graph": map[string]interface{}{ + "nodes": []interface{}{map[string]interface{}{"id": "n1"}}, + "edges": []interface{}{}, + }, + }, + }) + + if resp.Error != nil { + t.Fatalf("unexpected error: %v", resp.Error) + } + result, _ := resp.Result.(map[string]interface{}) + content, _ := result["content"].([]interface{}) + first, _ := content[0].(map[string]interface{}) + text, _ := first["text"].(string) + if !strings.Contains(text, "pipe_123") { + t.Errorf("response should contain pipeline id, got: %s", text) + } +} + +func TestToolsCall_ExecuteSavedPipeline(t *testing.T) { + mux := http.NewServeMux() + mux.HandleFunc("POST /api/pipelines/{id}/execute", func(w http.ResponseWriter, r *http.Request) { + if r.PathValue("id") != "pipe_123" { + w.WriteHeader(http.StatusNotFound) + return + } + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"pipeline_id":"pipe_123","status":"submitted","node_count":1,"edge_count":0}`) + }) + srv := testServer(t, mux) + + resp := sendRequest(t, srv, "tools/call", 242, map[string]interface{}{ + "name": "execute_saved_pipeline", + "arguments": map[string]interface{}{ + "pipeline_id": "pipe_123", + }, + }) + + if resp.Error != nil { + t.Fatalf("unexpected error: %v", resp.Error) + } + result, _ := resp.Result.(map[string]interface{}) + content, _ := result["content"].([]interface{}) + first, _ := content[0].(map[string]interface{}) + text, _ := first["text"].(string) + if !strings.Contains(text, `"status": "submitted"`) { + t.Errorf("response should contain submitted status, got: %s", text) + } +} + func TestToolsCall_ListProcessJobs(t *testing.T) { mux := http.NewServeMux() mux.HandleFunc("GET /api/process/jobs", func(w http.ResponseWriter, r *http.Request) { diff --git a/mcp/tools.go b/mcp/tools.go index 34516c7..d7107b3 100644 --- a/mcp/tools.go +++ b/mcp/tools.go @@ -266,6 +266,90 @@ func AllTools() []Tool { Required: []string{"job_id"}, }, }, + { + Name: "list_pipeline_templates", + Description: "List persisted pipeline templates from Cairn's visual pipeline builder.", + InputSchema: InputSchema{Type: "object"}, + }, + { + Name: "list_pipelines", + Description: "List persisted pipelines for the current tenant.", + InputSchema: InputSchema{Type: "object"}, + }, + { + Name: "get_pipeline", + Description: "Fetch a persisted pipeline definition by ID.", + InputSchema: InputSchema{ + Type: "object", + Properties: map[string]PropertySchema{ + "pipeline_id": {Type: "string", Description: "The persisted pipeline ID."}, + }, + Required: []string{"pipeline_id"}, + }, + }, + { + Name: "create_pipeline", + Description: "Create a persisted pipeline definition for the visual pipeline builder.", + InputSchema: InputSchema{ + Type: "object", + Properties: map[string]PropertySchema{ + "name": {Type: "string", Description: "Pipeline name."}, + "description": {Type: "string", Description: "Optional pipeline description."}, + "graph": {Type: "object", Description: "Pipeline graph JSON payload."}, + "canvas": {Type: "object", Description: "Pipeline canvas layout JSON payload."}, + }, + Required: []string{"name"}, + }, + }, + { + Name: "update_pipeline", + Description: "Update a persisted pipeline definition. Requires the current version for optimistic concurrency.", + InputSchema: InputSchema{ + Type: "object", + Properties: map[string]PropertySchema{ + "pipeline_id": {Type: "string", Description: "The persisted pipeline ID."}, + "name": {Type: "string", Description: "Pipeline name."}, + "description": {Type: "string", Description: "Optional pipeline description."}, + "graph": {Type: "object", Description: "Pipeline graph JSON payload."}, + "canvas": {Type: "object", Description: "Pipeline canvas layout JSON payload."}, + "version": {Type: "number", Description: "Current pipeline version from the latest read."}, + }, + Required: []string{"pipeline_id", "name", "version"}, + }, + }, + { + Name: "delete_pipeline", + Description: "Delete a persisted pipeline definition by ID.", + InputSchema: InputSchema{ + Type: "object", + Properties: map[string]PropertySchema{ + "pipeline_id": {Type: "string", Description: "The persisted pipeline ID."}, + }, + Required: []string{"pipeline_id"}, + }, + }, + { + Name: "duplicate_pipeline", + Description: "Duplicate a persisted pipeline definition by ID.", + InputSchema: InputSchema{ + Type: "object", + Properties: map[string]PropertySchema{ + "pipeline_id": {Type: "string", Description: "The persisted pipeline ID."}, + }, + Required: []string{"pipeline_id"}, + }, + }, + { + Name: "execute_saved_pipeline", + Description: "Submit a persisted pipeline definition for execution.", + InputSchema: InputSchema{ + Type: "object", + Properties: map[string]PropertySchema{ + "pipeline_id": {Type: "string", Description: "The persisted pipeline ID."}, + }, + Required: []string{"pipeline_id"}, + }, + }, { Name: "run_pipeline", Description: "Run a multi-step geoprocessing pipeline. Each step's output feeds into the next step. Useful for chaining operations like buffer → clip → simplify.",