diff --git a/README.md b/README.md index 85c6853..1b659c5 100644 --- a/README.md +++ b/README.md @@ -90,7 +90,14 @@ Add to `.mcp.json`: | `query_features` | Query with bbox, CQL2 filter, limit, properties | | `get_feature` | Single feature by ID | | `upload_dataset` | Upload a spatial data file | -| `run_process` | Single geoprocessing operation (buffer, clip, simplify, etc.) | +| `run_process` | Single synchronous geoprocessing operation | +| `preflight_process` | Validate and normalize a processing request | +| `submit_process_job` | Submit an async processing job | +| `submit_process_batch` | Submit dependent async processing jobs | +| `list_process_jobs` | List async processing jobs | +| `get_process_job` | Inspect an async processing job | +| `cancel_process_job` | Cancel an async processing job | +| `rerun_process_job` | Re-submit an async processing job | | `run_pipeline` | Multi-step geoprocessing pipeline | | `convert_format` | Convert between formats (GeoJSON, Shapefile, etc.) | | `diff_datasets` | Compare two dataset versions | diff --git a/SKILL.md b/SKILL.md index 664be83..c7581f8 100644 --- a/SKILL.md +++ b/SKILL.md @@ -43,18 +43,23 @@ Queries must be SELECT-only (read-only). ## Geoprocessing Operations -Use `run_process` for single operations or `run_pipeline` for chains. +Use `preflight_process` to validate and normalize a request first. Use `run_process` for synchronous execution, `submit_process_job` or `submit_process_batch` for async execution, and `run_pipeline` for chains. -Always call `list_operations` first to fetch the live server operation catalog and parameter names. The current server supports a broad set including vector ops (`buffer`, `clip`, `intersect`, `difference`, `dissolve`, `sjoin`), geometry conversion (`centroid`, `convex_hull`, `feature_to_point`, `points_to_line`), stats (`spatial_stats`, `morans_i`, `hotspot`, `kernel_density`), interpolation (`interpolate_idw`, `ordinary_kriging`), validation (`validate`, `make_valid`, `validate_topology`), and optimization (`solve_vrp`, `p_median`, `mclp`). +Always call `list_operations` first to fetch the live server operation catalog and parameter names. The server now returns rich metadata including category, UI availability, projected-CRS requirements, and typed parameter definitions. Important parameter names for common ops: +- `geodesic_buffer` uses metric `distance` in meters - `clip` uses `mask` - `sjoin` uses `right` and `predicate` - `reproject` uses `from_crs` and `to_crs` - `dissolve` uses `group_by` +Async process jobs expose queue state, phase, progress, and failure metadata via the `/api/process/jobs*` endpoints. + Use `list_analysis_operations` for advanced analysis catalog endpoints under `/api/analysis/operations`. +The async process workflow is available through `submit_process_job`, `submit_process_batch`, `list_process_jobs`, `get_process_job`, `cancel_process_job`, and `rerun_process_job`. + ## Data Catalog & STAC Roteiro includes a built-in data catalog and supports importing from remote STAC (SpatioTemporal Asset Catalog) servers. diff --git a/mcp/client.go b/mcp/client.go index 2794775..1bf285b 100644 --- a/mcp/client.go +++ b/mcp/client.go @@ -229,6 +229,96 @@ func (c *Client) RunProcess(payload interface{}) (json.RawMessage, error) { return json.RawMessage(body), nil } +// PreflightProcess calls POST /api/process/preflight. +func (c *Client) PreflightProcess(payload interface{}) (json.RawMessage, error) { + body, code, err := c.postJSON("/api/process/preflight", payload) + if err != nil { + return nil, err + } + if code != http.StatusOK { + return nil, fmt.Errorf("POST /api/process/preflight returned %d: %s", code, truncate(body, 500)) + } + return json.RawMessage(body), nil +} + +// SubmitProcessJob calls POST /api/process/jobs. +func (c *Client) SubmitProcessJob(payload interface{}) (json.RawMessage, error) { + body, code, err := c.postJSON("/api/process/jobs", payload) + if err != nil { + return nil, err + } + if code != http.StatusAccepted { + return nil, fmt.Errorf("POST /api/process/jobs returned %d: %s", code, truncate(body, 500)) + } + return json.RawMessage(body), nil +} + +// SubmitProcessBatch calls POST /api/process/jobs/batch. +func (c *Client) SubmitProcessBatch(payload interface{}) (json.RawMessage, error) { + body, code, err := c.postJSON("/api/process/jobs/batch", payload) + if err != nil { + return nil, err + } + if code != http.StatusAccepted { + return nil, fmt.Errorf("POST /api/process/jobs/batch returned %d: %s", code, truncate(body, 500)) + } + return json.RawMessage(body), nil +} + +// ListProcessJobs calls GET /api/process/jobs. +func (c *Client) ListProcessJobs(params map[string]string) (json.RawMessage, error) { + q := url.Values{} + for k, v := range params { + if v != "" { + q.Set(k, v) + } + } + body, code, err := c.get("/api/process/jobs", q) + if err != nil { + return nil, err + } + if code != http.StatusOK { + return nil, fmt.Errorf("GET /api/process/jobs returned %d: %s", code, truncate(body, 500)) + } + return json.RawMessage(body), nil +} + +// GetProcessJob calls GET /api/process/jobs/{id}. +func (c *Client) GetProcessJob(id string) (json.RawMessage, error) { + body, code, err := c.get("/api/process/jobs/"+url.PathEscape(id), nil) + if err != nil { + return nil, err + } + if code != http.StatusOK { + return nil, fmt.Errorf("GET /api/process/jobs/%s returned %d: %s", id, code, truncate(body, 500)) + } + return json.RawMessage(body), nil +} + +// CancelProcessJob calls DELETE /api/process/jobs/{id}. +func (c *Client) CancelProcessJob(id string) (json.RawMessage, error) { + body, code, err := c.callJSON("DELETE", "/api/process/jobs/"+url.PathEscape(id), nil, nil) + if err != nil { + return nil, err + } + if code != http.StatusNoContent { + return nil, fmt.Errorf("DELETE /api/process/jobs/%s returned %d: %s", id, code, truncate(body, 500)) + } + return json.RawMessage(`{"status":"cancelled"}`), nil +} + +// RerunProcessJob calls POST /api/process/jobs/{id}/rerun. +func (c *Client) RerunProcessJob(id string) (json.RawMessage, error) { + body, code, err := c.postJSON("/api/process/jobs/"+url.PathEscape(id)+"/rerun", map[string]interface{}{}) + if err != nil { + return nil, err + } + if code != http.StatusAccepted { + return nil, fmt.Errorf("POST /api/process/jobs/%s/rerun returned %d: %s", id, code, truncate(body, 500)) + } + return json.RawMessage(body), nil +} + // RunPipeline calls POST /api/pipeline. func (c *Client) RunPipeline(payload interface{}) (json.RawMessage, error) { body, code, err := c.postJSON("/api/pipeline", payload) diff --git a/mcp/handlers.go b/mcp/handlers.go index 817d8df..897b937 100644 --- a/mcp/handlers.go +++ b/mcp/handlers.go @@ -38,6 +38,20 @@ func HandleToolCall(client *Client, name string, args json.RawMessage) (string, return handleUploadDataset(client, params) case "run_process": return handleRunProcess(client, params) + case "preflight_process": + return handlePreflightProcess(client, params) + case "submit_process_job": + return handleSubmitProcessJob(client, params) + case "submit_process_batch": + return handleSubmitProcessBatch(client, params) + case "list_process_jobs": + return handleListProcessJobs(client, params) + case "get_process_job": + return handleGetProcessJob(client, params) + case "cancel_process_job": + return handleCancelProcessJob(client, params) + case "rerun_process_job": + return handleRerunProcessJob(client, params) case "run_pipeline": return handleRunPipeline(client, params) case "convert_format": @@ -290,6 +304,7 @@ func handleUploadDataset(client *Client, params map[string]interface{}) (string, } func handleRunProcess(client *Client, params map[string]interface{}) (string, error) { + normalizeProcessPayload(params) data, err := client.RunProcess(params) if err != nil { return "", err @@ -297,6 +312,100 @@ func handleRunProcess(client *Client, params map[string]interface{}) (string, er return formatJSON(data), nil } +func handlePreflightProcess(client *Client, params map[string]interface{}) (string, error) { + normalizeProcessPayload(params) + data, err := client.PreflightProcess(params) + if err != nil { + return "", err + } + return formatJSON(data), nil +} + +func handleSubmitProcessJob(client *Client, params map[string]interface{}) (string, error) { + normalizeProcessPayload(params) + data, err := client.SubmitProcessJob(params) + if err != nil { + return "", err + } + return formatJSON(data), nil +} + +func handleSubmitProcessBatch(client *Client, params map[string]interface{}) (string, error) { + jobs, ok := params["jobs"].([]interface{}) + if !ok || len(jobs) == 0 { + return "", fmt.Errorf("parameter jobs must be a non-empty array") + } + for i, item := range jobs { + job, ok := item.(map[string]interface{}) + if !ok { + return "", fmt.Errorf("jobs[%d] must be an object", i) + } + req, ok := job["request"].(map[string]interface{}) + if !ok { + return "", fmt.Errorf("jobs[%d].request must be an object", i) + } + normalizeProcessPayload(req) + } + data, err := client.SubmitProcessBatch(params) + if err != nil { + return "", err + } + return formatJSON(data), nil +} + +func handleListProcessJobs(client *Client, params map[string]interface{}) (string, error) { + qp := map[string]string{} + for _, key := range []string{"status", "search", "limit", "offset"} { + if v, ok := params[key]; ok { + s, err := stringify(v) + if err == nil && s != "" { + qp[key] = s + } + } + } + data, err := client.ListProcessJobs(qp) + if err != nil { + return "", err + } + return formatJSON(data), nil +} + +func handleGetProcessJob(client *Client, params map[string]interface{}) (string, error) { + jobID, err := requireString(params, "job_id") + if err != nil { + return "", err + } + data, err := client.GetProcessJob(jobID) + if err != nil { + return "", err + } + return formatJSON(data), nil +} + +func handleCancelProcessJob(client *Client, params map[string]interface{}) (string, error) { + jobID, err := requireString(params, "job_id") + if err != nil { + return "", err + } + data, err := client.CancelProcessJob(jobID) + if err != nil { + return "", err + } + return formatJSON(data), nil +} + +func handleRerunProcessJob(client *Client, params map[string]interface{}) (string, error) { + jobID, err := requireString(params, "job_id") + if err != nil { + return "", err + } + data, err := client.RerunProcessJob(jobID) + if err != nil { + return "", err + } + return formatJSON(data), nil +} + func handleRunPipeline(client *Client, params map[string]interface{}) (string, error) { if _, ok := params["output_name"]; !ok { if out, ok := params["output"].(string); ok && out != "" { @@ -337,6 +446,22 @@ func handleConvertFormat(client *Client, params map[string]interface{}) (string, return formatJSON(data), nil } +func normalizeProcessPayload(params map[string]interface{}) { + if _, ok := params["output_name"]; !ok { + if out, ok := params["output"].(string); ok && out != "" { + params["output_name"] = out + } + } + if _, ok := params["output_format"]; !ok { + if format, ok := params["format"].(string); ok && format != "" { + params["output_format"] = format + } + } + if _, ok := params["params"]; !ok || params["params"] == nil { + params["params"] = map[string]interface{}{} + } +} + func handleDiffDatasets(client *Client, params map[string]interface{}) (string, error) { if _, ok := params["left"]; !ok { if base, ok := params["base"].(string); ok && base != "" { diff --git a/mcp/server_test.go b/mcp/server_test.go index 833d18f..68cc202 100644 --- a/mcp/server_test.go +++ b/mcp/server_test.go @@ -94,7 +94,9 @@ func TestToolsList(t *testing.T) { } for _, want := range []string{ "list_datasets", "get_dataset_info", "query_features", "get_feature", - "upload_dataset", "run_process", "run_pipeline", "convert_format", + "upload_dataset", "run_process", "preflight_process", "submit_process_job", + "submit_process_batch", "list_process_jobs", "get_process_job", + "cancel_process_job", "rerun_process_job", "run_pipeline", "convert_format", "diff_datasets", "execute_sql", "list_spatial_tables", "get_duckdb_info", "list_duckdb_datasets", "geocode", "reverse_geocode", "compute_route", "compute_isochrone", "compute_route_matrix", "compute_service_area", @@ -596,6 +598,16 @@ func TestToolsCall_RunProcess(t *testing.T) { mux.HandleFunc("POST /api/process", func(w http.ResponseWriter, r *http.Request) { var body map[string]interface{} json.NewDecoder(r.Body).Decode(&body) + if body["output_name"] != "parks_buffered" { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, `{"error":"missing output_name"}`) + return + } + if body["output_format"] != "parquet" { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, `{"error":"missing output_format"}`) + return + } w.Header().Set("Content-Type", "application/json") fmt.Fprintf(w, `{"output":"buffered_%s","feature_count":10}`, body["input"]) }) @@ -607,6 +619,8 @@ func TestToolsCall_RunProcess(t *testing.T) { "operation": "buffer", "input": "parks", "params": map[string]interface{}{"distance": 500}, + "output": "parks_buffered", + "format": "parquet", }, }) @@ -621,3 +635,141 @@ func TestToolsCall_RunProcess(t *testing.T) { t.Errorf("response should contain output name, got: %s", text) } } + +func TestToolsCall_PreflightProcess(t *testing.T) { + mux := http.NewServeMux() + mux.HandleFunc("POST /api/process/preflight", func(w http.ResponseWriter, r *http.Request) { + var body map[string]interface{} + json.NewDecoder(r.Body).Decode(&body) + if body["output_name"] != "parks_buffered" { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, `{"error":"missing output_name"}`) + return + } + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"valid":true,"resolved_params":{"distance":500}}`) + }) + srv := testServer(t, mux) + + resp := sendRequest(t, srv, "tools/call", 23, map[string]interface{}{ + "name": "preflight_process", + "arguments": map[string]interface{}{ + "operation": "buffer", + "input": "parks", + "params": map[string]interface{}{"distance": 500}, + "output": "parks_buffered", + }, + }) + + if resp.Error != nil { + t.Fatalf("unexpected error: %v", resp.Error) + } + result, _ := resp.Result.(map[string]interface{}) + content, _ := result["content"].([]interface{}) + first, _ := content[0].(map[string]interface{}) + text, _ := first["text"].(string) + if !strings.Contains(text, `"valid": true`) { + t.Errorf("response should contain valid preflight, got: %s", text) + } +} + +func TestToolsCall_SubmitProcessJob(t *testing.T) { + mux := http.NewServeMux() + mux.HandleFunc("POST /api/process/jobs", func(w http.ResponseWriter, r *http.Request) { + var body map[string]interface{} + json.NewDecoder(r.Body).Decode(&body) + if body["output_name"] != "parks_buffered" { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, `{"error":"missing output_name"}`) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusAccepted) + fmt.Fprint(w, `{"id":"job_123","status":"queued"}`) + }) + srv := testServer(t, mux) + + resp := sendRequest(t, srv, "tools/call", 24, map[string]interface{}{ + "name": "submit_process_job", + "arguments": map[string]interface{}{ + "operation": "buffer", + "input": "parks", + "params": map[string]interface{}{"distance": 500}, + "output": "parks_buffered", + }, + }) + + if resp.Error != nil { + t.Fatalf("unexpected error: %v", resp.Error) + } + result, _ := resp.Result.(map[string]interface{}) + content, _ := result["content"].([]interface{}) + first, _ := content[0].(map[string]interface{}) + text, _ := first["text"].(string) + if !strings.Contains(text, "job_123") { + t.Errorf("response should contain job id, got: %s", text) + } +} + +func TestToolsCall_ListProcessJobs(t *testing.T) { + mux := http.NewServeMux() + mux.HandleFunc("GET /api/process/jobs", func(w http.ResponseWriter, r *http.Request) { + if r.URL.Query().Get("status") != "queued" || r.URL.Query().Get("limit") != "25" { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, `{"error":"missing filters"}`) + return + } + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `[{"id":"job_123","status":"queued"}]`) + }) + srv := testServer(t, mux) + + resp := sendRequest(t, srv, "tools/call", 25, map[string]interface{}{ + "name": "list_process_jobs", + "arguments": map[string]interface{}{ + "status": "queued", + "limit": 25.0, + }, + }) + + if resp.Error != nil { + t.Fatalf("unexpected error: %v", resp.Error) + } + result, _ := resp.Result.(map[string]interface{}) + content, _ := result["content"].([]interface{}) + first, _ := content[0].(map[string]interface{}) + text, _ := first["text"].(string) + if !strings.Contains(text, "job_123") { + t.Errorf("response should contain job id, got: %s", text) + } +} + +func TestToolsCall_CancelProcessJob(t *testing.T) { + mux := http.NewServeMux() + mux.HandleFunc("DELETE /api/process/jobs/{id}", func(w http.ResponseWriter, r *http.Request) { + if r.PathValue("id") != "job_123" { + w.WriteHeader(http.StatusNotFound) + return + } + w.WriteHeader(http.StatusNoContent) + }) + srv := testServer(t, mux) + + resp := sendRequest(t, srv, "tools/call", 26, map[string]interface{}{ + "name": "cancel_process_job", + "arguments": map[string]interface{}{ + "job_id": "job_123", + }, + }) + + if resp.Error != nil { + t.Fatalf("unexpected error: %v", resp.Error) + } + result, _ := resp.Result.(map[string]interface{}) + content, _ := result["content"].([]interface{}) + first, _ := content[0].(map[string]interface{}) + text, _ := first["text"].(string) + if !strings.Contains(text, "cancelled") { + t.Errorf("response should contain cancellation status, got: %s", text) + } +} diff --git a/mcp/tools.go b/mcp/tools.go index 3c9544c..6f92821 100644 --- a/mcp/tools.go +++ b/mcp/tools.go @@ -113,14 +113,128 @@ func AllTools() []Tool { Type: "object", Properties: map[string]PropertySchema{ "operation": {Type: "string", Description: "The geoprocessing operation to run (e.g. 'buffer', 'clip', 'simplify')."}, - "input": {Type: "string", Description: "Input dataset name."}, + "input": {Type: "string", Description: "Input dataset name. Provide either 'input' or 'input_geojson'."}, + "input_geojson": { + Type: "object", + Description: "Inline GeoJSON input. Provide either 'input' or 'input_geojson'.", + }, "params": { Type: "object", Description: "Operation-specific parameters (e.g. {\"distance\": 500} for buffer, {\"tolerance\": 0.001} for simplify).", }, - "output": {Type: "string", Description: "Output dataset name (optional, auto-generated if not provided)."}, + "output": {Type: "string", Description: "Compatibility alias for 'output_name'."}, + "output_name": {Type: "string", Description: "Output dataset name when registering or naming results."}, + "output_format": {Type: "string", Description: "Requested output format (for example 'geojson', 'parquet', or 'csv')."}, + "format": {Type: "string", Description: "Compatibility alias for 'output_format'."}, + "register": {Type: "boolean", Description: "Whether to register the result as a dataset."}, + }, + Required: []string{"operation"}, + }, + }, + { + Name: "preflight_process", + Description: "Validate and normalize a processing request via /api/process/preflight before executing it. Useful for checking required params, CRS constraints, and normalized dataset references.", + InputSchema: InputSchema{ + Type: "object", + Properties: map[string]PropertySchema{ + "operation": {Type: "string", Description: "The geoprocessing operation to validate."}, + "input": {Type: "string", Description: "Input dataset name. Provide either 'input' or 'input_geojson'."}, + "input_geojson": {Type: "object", Description: "Inline GeoJSON input. Provide either 'input' or 'input_geojson'."}, + "params": {Type: "object", Description: "Operation-specific parameters."}, + "output": {Type: "string", Description: "Compatibility alias for 'output_name'."}, + "output_name": {Type: "string", Description: "Optional output dataset name."}, + "output_format": {Type: "string", Description: "Requested output format."}, + "format": {Type: "string", Description: "Compatibility alias for 'output_format'."}, + "register": {Type: "boolean", Description: "Whether the eventual result should be registered as a dataset."}, + }, + Required: []string{"operation"}, + }, + }, + { + Name: "submit_process_job", + Description: "Submit an asynchronous processing job via /api/process/jobs. Use this when the operation may take longer, or when preflight recommends async execution.", + InputSchema: InputSchema{ + Type: "object", + Properties: map[string]PropertySchema{ + "operation": {Type: "string", Description: "The geoprocessing operation to queue."}, + "input": {Type: "string", Description: "Input dataset name. Provide either 'input' or 'input_geojson'."}, + "input_geojson": {Type: "object", Description: "Inline GeoJSON input. Provide either 'input' or 'input_geojson'."}, + "params": {Type: "object", Description: "Operation-specific parameters."}, + "output": {Type: "string", Description: "Compatibility alias for 'output_name'."}, + "output_name": {Type: "string", Description: "Optional output dataset name."}, + "output_format": {Type: "string", Description: "Requested output format."}, + "format": {Type: "string", Description: "Compatibility alias for 'output_format'."}, + "register": {Type: "boolean", Description: "Whether to register the result as a dataset."}, + }, + Required: []string{"operation"}, + }, + }, + { + Name: "submit_process_batch", + Description: "Submit a dependent batch of asynchronous processing jobs via /api/process/jobs/batch.", + InputSchema: InputSchema{ + Type: "object", + Properties: map[string]PropertySchema{ + "jobs": { + Type: "array", + Description: "Array of batch jobs. Each item supports client_id, depends_on, and request fields matching submit_process_job.", + Items: &PropertySchema{ + Type: "object", + Properties: map[string]PropertySchema{ + "client_id": {Type: "string", Description: "Optional client-side identifier used for dependency references."}, + "depends_on": {Type: "array", Description: "Optional list of batch client IDs or job IDs this job depends on.", Items: &PropertySchema{Type: "string"}}, + "request": {Type: "object", Description: "Process request payload matching submit_process_job."}, + }, + }, + }, + }, + Required: []string{"jobs"}, + }, + }, + { + Name: "list_process_jobs", + Description: "List asynchronous processing jobs via /api/process/jobs with optional filtering.", + InputSchema: InputSchema{ + Type: "object", + Properties: map[string]PropertySchema{ + "status": {Type: "string", Description: "Optional job status filter: queued, processing, completed, failed, cancelled."}, + "search": {Type: "string", Description: "Optional substring match against operation or job metadata."}, + "limit": {Type: "string", Description: "Optional max jobs to return."}, + "offset": {Type: "string", Description: "Optional pagination offset."}, + }, + }, + }, + { + Name: "get_process_job", + Description: "Fetch a single asynchronous processing job by ID.", + InputSchema: InputSchema{ + Type: "object", + Properties: map[string]PropertySchema{ + "job_id": {Type: "string", Description: "The async processing job ID."}, + }, + Required: []string{"job_id"}, + }, + }, + { + Name: "cancel_process_job", + Description: "Cancel an asynchronous processing job by ID.", + InputSchema: InputSchema{ + Type: "object", + Properties: map[string]PropertySchema{ + "job_id": {Type: "string", Description: "The async processing job ID."}, + }, + Required: []string{"job_id"}, + }, + }, + { + Name: "rerun_process_job", + Description: "Re-submit a previous asynchronous processing job by ID.", + InputSchema: InputSchema{ + Type: "object", + Properties: map[string]PropertySchema{ + "job_id": {Type: "string", Description: "The async processing job ID."}, }, - Required: []string{"operation", "input"}, + Required: []string{"job_id"}, }, }, {