Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions mcp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,102 @@ func (c *Client) RunPipeline(payload interface{}) (json.RawMessage, error) {
return json.RawMessage(body), nil
}

// ListPipelineTemplates calls GET /api/pipelines/templates.
func (c *Client) ListPipelineTemplates() (json.RawMessage, error) {
body, code, err := c.get("/api/pipelines/templates", nil)
if err != nil {
return nil, err
}
if code != http.StatusOK {
return nil, fmt.Errorf("GET /api/pipelines/templates returned %d: %s", code, truncate(body, 500))
}
return json.RawMessage(body), nil
}

// ListPipelines calls GET /api/pipelines.
func (c *Client) ListPipelines() (json.RawMessage, error) {
body, code, err := c.get("/api/pipelines", nil)
if err != nil {
return nil, err
}
if code != http.StatusOK {
return nil, fmt.Errorf("GET /api/pipelines returned %d: %s", code, truncate(body, 500))
}
return json.RawMessage(body), nil
}

// GetPipeline calls GET /api/pipelines/{id}.
func (c *Client) GetPipeline(id string) (json.RawMessage, error) {
body, code, err := c.get("/api/pipelines/"+url.PathEscape(id), nil)
if err != nil {
return nil, err
}
if code != http.StatusOK {
return nil, fmt.Errorf("GET /api/pipelines/%s returned %d: %s", id, code, truncate(body, 500))
}
return json.RawMessage(body), nil
}

// CreatePipeline calls POST /api/pipelines.
func (c *Client) CreatePipeline(payload interface{}) (json.RawMessage, error) {
body, code, err := c.postJSON("/api/pipelines", payload)
if err != nil {
return nil, err
}
if code != http.StatusCreated {
return nil, fmt.Errorf("POST /api/pipelines returned %d: %s", code, truncate(body, 500))
}
return json.RawMessage(body), nil
}

// UpdatePipeline calls PUT /api/pipelines/{id}.
func (c *Client) UpdatePipeline(id string, payload interface{}) (json.RawMessage, error) {
body, code, err := c.callJSON("PUT", "/api/pipelines/"+url.PathEscape(id), payload, nil)
if err != nil {
return nil, err
}
if code != http.StatusOK {
return nil, fmt.Errorf("PUT /api/pipelines/%s returned %d: %s", id, code, truncate(body, 500))
}
return json.RawMessage(body), nil
}

// DeletePipeline calls DELETE /api/pipelines/{id}.
func (c *Client) DeletePipeline(id string) (json.RawMessage, error) {
body, code, err := c.callJSON("DELETE", "/api/pipelines/"+url.PathEscape(id), nil, nil)
if err != nil {
return nil, err
}
if code != http.StatusNoContent {
return nil, fmt.Errorf("DELETE /api/pipelines/%s returned %d: %s", id, code, truncate(body, 500))
}
return json.RawMessage(`{"status":"deleted"}`), nil
}

// DuplicatePipeline calls POST /api/pipelines/{id}/duplicate.
func (c *Client) DuplicatePipeline(id string) (json.RawMessage, error) {
body, code, err := c.postJSON("/api/pipelines/"+url.PathEscape(id)+"/duplicate", map[string]interface{}{})
if err != nil {
return nil, err
}
if code != http.StatusCreated {
return nil, fmt.Errorf("POST /api/pipelines/%s/duplicate returned %d: %s", id, code, truncate(body, 500))
}
return json.RawMessage(body), nil
}

// ExecutePipeline calls POST /api/pipelines/{id}/execute.
func (c *Client) ExecutePipeline(id string) (json.RawMessage, error) {
body, code, err := c.postJSON("/api/pipelines/"+url.PathEscape(id)+"/execute", map[string]interface{}{})
if err != nil {
return nil, err
}
if code != http.StatusOK {
return nil, fmt.Errorf("POST /api/pipelines/%s/execute returned %d: %s", id, code, truncate(body, 500))
}
return json.RawMessage(body), nil
}

// ConvertFormat calls POST /api/convert.
func (c *Client) ConvertFormat(payload interface{}) (json.RawMessage, error) {
body, code, err := c.postJSON("/api/convert", payload)
Expand Down
149 changes: 149 additions & 0 deletions mcp/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ func HandleToolCall(client *Client, name string, args json.RawMessage) (string,
return handleCancelProcessJob(client, params)
case "rerun_process_job":
return handleRerunProcessJob(client, params)
case "list_pipeline_templates":
return handleListPipelineTemplates(client)
case "list_pipelines":
return handleListPipelines(client)
case "get_pipeline":
return handleGetPipeline(client, params)
case "create_pipeline":
return handleCreatePipeline(client, params)
case "update_pipeline":
return handleUpdatePipeline(client, params)
case "delete_pipeline":
return handleDeletePipeline(client, params)
case "duplicate_pipeline":
return handleDuplicatePipeline(client, params)
case "execute_saved_pipeline":
return handleExecuteSavedPipeline(client, params)
case "run_pipeline":
return handleRunPipeline(client, params)
case "convert_format":
Expand Down Expand Up @@ -368,6 +384,127 @@ func handleRerunProcessJob(client *Client, params map[string]interface{}) (strin
return formatJSON(data), nil
}

func handleListPipelineTemplates(client *Client) (string, error) {
data, err := client.ListPipelineTemplates()
if err != nil {
return "", err
}
return formatJSON(data), nil
}

func handleListPipelines(client *Client) (string, error) {
data, err := client.ListPipelines()
if err != nil {
return "", err
}
return formatJSON(data), nil
}

func handleGetPipeline(client *Client, params map[string]interface{}) (string, error) {
pipelineID, err := requireString(params, "pipeline_id")
if err != nil {
return "", err
}
data, err := client.GetPipeline(pipelineID)
if err != nil {
return "", err
}
return formatJSON(data), nil
}

func handleCreatePipeline(client *Client, params map[string]interface{}) (string, error) {
name, err := requireString(params, "name")
if err != nil {
return "", err
}
payload := map[string]interface{}{
"name": name,
}
if desc, err := optionalStringLike(params, "description"); err == nil && desc != "" {
payload["description"] = desc
}
if graph, ok := params["graph"]; ok {
payload["graph"] = graph
}
if canvas, ok := params["canvas"]; ok {
payload["canvas"] = canvas
}
data, err := client.CreatePipeline(payload)
if err != nil {
return "", err
}
return formatJSON(data), nil
}

func handleUpdatePipeline(client *Client, params map[string]interface{}) (string, error) {
pipelineID, err := requireString(params, "pipeline_id")
if err != nil {
return "", err
}
name, err := requireString(params, "name")
if err != nil {
return "", err
}
version, err := requireIntLike(params, "version")
if err != nil {
return "", err
}
payload := map[string]interface{}{
"name": name,
"version": version,
}
if desc, err := optionalStringLike(params, "description"); err == nil && desc != "" {
payload["description"] = desc
}
if graph, ok := params["graph"]; ok {
payload["graph"] = graph
}
if canvas, ok := params["canvas"]; ok {
payload["canvas"] = canvas
}
data, err := client.UpdatePipeline(pipelineID, payload)
if err != nil {
return "", err
}
return formatJSON(data), nil
}

func handleDeletePipeline(client *Client, params map[string]interface{}) (string, error) {
pipelineID, err := requireString(params, "pipeline_id")
if err != nil {
return "", err
}
data, err := client.DeletePipeline(pipelineID)
if err != nil {
return "", err
}
return formatJSON(data), nil
}

func handleDuplicatePipeline(client *Client, params map[string]interface{}) (string, error) {
pipelineID, err := requireString(params, "pipeline_id")
if err != nil {
return "", err
}
data, err := client.DuplicatePipeline(pipelineID)
if err != nil {
return "", err
}
return formatJSON(data), nil
}

func handleExecuteSavedPipeline(client *Client, params map[string]interface{}) (string, error) {
pipelineID, err := requireString(params, "pipeline_id")
if err != nil {
return "", err
}
data, err := client.ExecutePipeline(pipelineID)
if err != nil {
return "", err
}
return formatJSON(data), nil
}

func handleRunPipeline(client *Client, params map[string]interface{}) (string, error) {
if _, ok := params["output_name"]; !ok {
if out, ok := params["output"].(string); ok && out != "" {
Expand Down Expand Up @@ -652,6 +789,18 @@ func requireStringLike(params map[string]interface{}, key string) (string, error
return s, nil
}

func requireIntLike(params map[string]interface{}, key string) (int, error) {
s, err := requireStringLike(params, key)
if err != nil {
return 0, err
}
value, err := strconv.Atoi(s)
if err != nil {
return 0, fmt.Errorf("parameter %s must be an integer", key)
}
return value, nil
}

func parseRoutePoint(v interface{}) ([2]float64, error) {
if arr, ok := v.([]interface{}); ok {
if len(arr) != 2 {
Expand Down
82 changes: 81 additions & 1 deletion mcp/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ func TestToolsList(t *testing.T) {
"list_datasets", "get_dataset_info", "query_features", "get_feature",
"upload_dataset", "run_process", "run_raster_process", "preflight_process", "submit_process_job",
"submit_process_batch", "list_process_jobs", "get_process_job",
"cancel_process_job", "rerun_process_job", "run_pipeline", "convert_format",
"cancel_process_job", "rerun_process_job", "list_pipeline_templates", "list_pipelines",
"get_pipeline", "create_pipeline", "update_pipeline", "delete_pipeline",
"duplicate_pipeline", "execute_saved_pipeline", "run_pipeline", "convert_format",
"diff_datasets", "execute_sql", "list_spatial_tables", "get_duckdb_info",
"list_duckdb_datasets", "geocode", "reverse_geocode", "compute_route",
"compute_isochrone", "compute_route_matrix", "compute_service_area",
Expand Down Expand Up @@ -1001,6 +1003,84 @@ func TestToolsCall_SubmitProcessJob(t *testing.T) {
}
}

func TestToolsCall_CreatePipeline(t *testing.T) {
mux := http.NewServeMux()
mux.HandleFunc("POST /api/pipelines", func(w http.ResponseWriter, r *http.Request) {
var body map[string]interface{}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
t.Fatalf("decode request: %v", err)
}
if body["name"] != "Suitability model" {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, `{"error":"missing name"}`)
return
}
if _, ok := body["graph"]; !ok {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, `{"error":"missing graph"}`)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
fmt.Fprint(w, `{"id":"pipe_123","name":"Suitability model","version":1}`)
})
srv := testServer(t, mux)

resp := sendRequest(t, srv, "tools/call", 241, map[string]interface{}{
"name": "create_pipeline",
"arguments": map[string]interface{}{
"name": "Suitability model",
"description": "Buffer and clip",
"graph": map[string]interface{}{
"nodes": []interface{}{map[string]interface{}{"id": "n1"}},
"edges": []interface{}{},
},
},
})

if resp.Error != nil {
t.Fatalf("unexpected error: %v", resp.Error)
}
result, _ := resp.Result.(map[string]interface{})
content, _ := result["content"].([]interface{})
first, _ := content[0].(map[string]interface{})
text, _ := first["text"].(string)
if !strings.Contains(text, "pipe_123") {
t.Errorf("response should contain pipeline id, got: %s", text)
}
}

func TestToolsCall_ExecuteSavedPipeline(t *testing.T) {
mux := http.NewServeMux()
mux.HandleFunc("POST /api/pipelines/{id}/execute", func(w http.ResponseWriter, r *http.Request) {
if r.PathValue("id") != "pipe_123" {
w.WriteHeader(http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"pipeline_id":"pipe_123","status":"submitted","node_count":1,"edge_count":0}`)
})
srv := testServer(t, mux)

resp := sendRequest(t, srv, "tools/call", 242, map[string]interface{}{
"name": "execute_saved_pipeline",
"arguments": map[string]interface{}{
"pipeline_id": "pipe_123",
},
})

if resp.Error != nil {
t.Fatalf("unexpected error: %v", resp.Error)
}
result, _ := resp.Result.(map[string]interface{})
content, _ := result["content"].([]interface{})
first, _ := content[0].(map[string]interface{})
text, _ := first["text"].(string)
if !strings.Contains(text, `"status": "submitted"`) {
t.Errorf("response should contain submitted status, got: %s", text)
}
}

func TestToolsCall_ListProcessJobs(t *testing.T) {
mux := http.NewServeMux()
mux.HandleFunc("GET /api/process/jobs", func(w http.ResponseWriter, r *http.Request) {
Expand Down
Loading
Loading