Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,14 @@ Add to `.mcp.json`:
| `query_features` | Query with bbox, CQL2 filter, limit, properties |
| `get_feature` | Single feature by ID |
| `upload_dataset` | Upload a spatial data file |
| `run_process` | Single geoprocessing operation (buffer, clip, simplify, etc.) |
| `run_process` | Single synchronous geoprocessing operation |
| `preflight_process` | Validate and normalize a processing request |
| `submit_process_job` | Submit an async processing job |
| `submit_process_batch` | Submit dependent async processing jobs |
| `list_process_jobs` | List async processing jobs |
| `get_process_job` | Inspect an async processing job |
| `cancel_process_job` | Cancel an async processing job |
| `rerun_process_job` | Re-submit an async processing job |
| `run_pipeline` | Multi-step geoprocessing pipeline |
| `convert_format` | Convert between formats (GeoJSON, Shapefile, etc.) |
| `diff_datasets` | Compare two dataset versions |
Expand Down
9 changes: 7 additions & 2 deletions SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,23 @@ Queries must be SELECT-only (read-only).

## Geoprocessing Operations

Use `run_process` for single operations or `run_pipeline` for chains.
Use `preflight_process` to validate and normalize a request first. Use `run_process` for synchronous execution, `submit_process_job` or `submit_process_batch` for async execution, and `run_pipeline` for chains.

Always call `list_operations` first to fetch the live server operation catalog and parameter names. The current server supports a broad set including vector ops (`buffer`, `clip`, `intersect`, `difference`, `dissolve`, `sjoin`), geometry conversion (`centroid`, `convex_hull`, `feature_to_point`, `points_to_line`), stats (`spatial_stats`, `morans_i`, `hotspot`, `kernel_density`), interpolation (`interpolate_idw`, `ordinary_kriging`), validation (`validate`, `make_valid`, `validate_topology`), and optimization (`solve_vrp`, `p_median`, `mclp`).
Always call `list_operations` first to fetch the live server operation catalog and parameter names. The server now returns rich metadata including category, UI availability, projected-CRS requirements, and typed parameter definitions.

Important parameter names for common ops:
- `geodesic_buffer` uses metric `distance` in meters
- `clip` uses `mask`
- `sjoin` uses `right` and `predicate`
- `reproject` uses `from_crs` and `to_crs`
- `dissolve` uses `group_by`

Async process jobs expose queue state, phase, progress, and failure metadata via the `/api/process/jobs*` endpoints.

Use `list_analysis_operations` for advanced analysis catalog endpoints under `/api/analysis/operations`.

The async process workflow is available through `submit_process_job`, `submit_process_batch`, `list_process_jobs`, `get_process_job`, `cancel_process_job`, and `rerun_process_job`.

## Data Catalog & STAC

Roteiro includes a built-in data catalog and supports importing from remote STAC (SpatioTemporal Asset Catalog) servers.
Expand Down
90 changes: 90 additions & 0 deletions mcp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,96 @@ func (c *Client) RunProcess(payload interface{}) (json.RawMessage, error) {
return json.RawMessage(body), nil
}

// PreflightProcess asks the server to validate and normalize a processing
// request via POST /api/process/preflight and returns the raw JSON reply.
func (c *Client) PreflightProcess(payload interface{}) (json.RawMessage, error) {
	resp, status, err := c.postJSON("/api/process/preflight", payload)
	if err != nil {
		return nil, err
	}
	if status == http.StatusOK {
		return json.RawMessage(resp), nil
	}
	return nil, fmt.Errorf("POST /api/process/preflight returned %d: %s", status, truncate(resp, 500))
}

// SubmitProcessJob enqueues one async processing job via POST /api/process/jobs.
// The server acknowledges acceptance with 202; any other status is an error.
func (c *Client) SubmitProcessJob(payload interface{}) (json.RawMessage, error) {
	resp, status, err := c.postJSON("/api/process/jobs", payload)
	if err != nil {
		return nil, err
	}
	if status == http.StatusAccepted {
		return json.RawMessage(resp), nil
	}
	return nil, fmt.Errorf("POST /api/process/jobs returned %d: %s", status, truncate(resp, 500))
}

// SubmitProcessBatch enqueues a set of dependent async jobs via
// POST /api/process/jobs/batch. A 202 response carries the created jobs.
func (c *Client) SubmitProcessBatch(payload interface{}) (json.RawMessage, error) {
	resp, status, err := c.postJSON("/api/process/jobs/batch", payload)
	if err != nil {
		return nil, err
	}
	if status == http.StatusAccepted {
		return json.RawMessage(resp), nil
	}
	return nil, fmt.Errorf("POST /api/process/jobs/batch returned %d: %s", status, truncate(resp, 500))
}

// ListProcessJobs fetches async processing jobs via GET /api/process/jobs,
// forwarding only the non-empty entries of params as query parameters.
func (c *Client) ListProcessJobs(params map[string]string) (json.RawMessage, error) {
	query := url.Values{}
	for key, value := range params {
		if value == "" {
			continue
		}
		query.Set(key, value)
	}
	resp, status, err := c.get("/api/process/jobs", query)
	if err != nil {
		return nil, err
	}
	if status != http.StatusOK {
		return nil, fmt.Errorf("GET /api/process/jobs returned %d: %s", status, truncate(resp, 500))
	}
	return json.RawMessage(resp), nil
}

// GetProcessJob retrieves a single async job via GET /api/process/jobs/{id}.
// The id is path-escaped before being placed in the URL.
func (c *Client) GetProcessJob(id string) (json.RawMessage, error) {
	path := "/api/process/jobs/" + url.PathEscape(id)
	resp, status, err := c.get(path, nil)
	if err != nil {
		return nil, err
	}
	if status == http.StatusOK {
		return json.RawMessage(resp), nil
	}
	return nil, fmt.Errorf("GET /api/process/jobs/%s returned %d: %s", id, status, truncate(resp, 500))
}

// CancelProcessJob aborts an async job via DELETE /api/process/jobs/{id}.
// The server replies 204 with no body on success, so a small synthetic
// status object is returned to the caller instead.
func (c *Client) CancelProcessJob(id string) (json.RawMessage, error) {
	path := "/api/process/jobs/" + url.PathEscape(id)
	resp, status, err := c.callJSON("DELETE", path, nil, nil)
	if err != nil {
		return nil, err
	}
	if status != http.StatusNoContent {
		return nil, fmt.Errorf("DELETE /api/process/jobs/%s returned %d: %s", id, status, truncate(resp, 500))
	}
	return json.RawMessage(`{"status":"cancelled"}`), nil
}

// RerunProcessJob re-submits an existing job via POST /api/process/jobs/{id}/rerun.
// An empty JSON object is sent as the body; a 202 response is expected.
func (c *Client) RerunProcessJob(id string) (json.RawMessage, error) {
	path := "/api/process/jobs/" + url.PathEscape(id) + "/rerun"
	resp, status, err := c.postJSON(path, map[string]interface{}{})
	if err != nil {
		return nil, err
	}
	if status == http.StatusAccepted {
		return json.RawMessage(resp), nil
	}
	return nil, fmt.Errorf("POST /api/process/jobs/%s/rerun returned %d: %s", id, status, truncate(resp, 500))
}

// RunPipeline calls POST /api/pipeline.
func (c *Client) RunPipeline(payload interface{}) (json.RawMessage, error) {
body, code, err := c.postJSON("/api/pipeline", payload)
Expand Down
125 changes: 125 additions & 0 deletions mcp/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ func HandleToolCall(client *Client, name string, args json.RawMessage) (string,
return handleUploadDataset(client, params)
case "run_process":
return handleRunProcess(client, params)
case "preflight_process":
return handlePreflightProcess(client, params)
case "submit_process_job":
return handleSubmitProcessJob(client, params)
case "submit_process_batch":
return handleSubmitProcessBatch(client, params)
case "list_process_jobs":
return handleListProcessJobs(client, params)
case "get_process_job":
return handleGetProcessJob(client, params)
case "cancel_process_job":
return handleCancelProcessJob(client, params)
case "rerun_process_job":
return handleRerunProcessJob(client, params)
case "run_pipeline":
return handleRunPipeline(client, params)
case "convert_format":
Expand Down Expand Up @@ -290,13 +304,108 @@ func handleUploadDataset(client *Client, params map[string]interface{}) (string,
}

// handleRunProcess normalizes the tool arguments in place and runs a single
// synchronous processing operation, returning the formatted JSON result.
func handleRunProcess(client *Client, params map[string]interface{}) (string, error) {
	normalizeProcessPayload(params)
	raw, err := client.RunProcess(params)
	if err != nil {
		return "", err
	}
	return formatJSON(raw), nil
}

// handlePreflightProcess normalizes the tool arguments and asks the server to
// validate them via the preflight endpoint, returning the formatted response.
func handlePreflightProcess(client *Client, params map[string]interface{}) (string, error) {
	normalizeProcessPayload(params)
	raw, err := client.PreflightProcess(params)
	if err != nil {
		return "", err
	}
	return formatJSON(raw), nil
}

// handleSubmitProcessJob normalizes the tool arguments and submits one async
// processing job, returning the formatted job record from the server.
func handleSubmitProcessJob(client *Client, params map[string]interface{}) (string, error) {
	normalizeProcessPayload(params)
	raw, err := client.SubmitProcessJob(params)
	if err != nil {
		return "", err
	}
	return formatJSON(raw), nil
}

// handleSubmitProcessBatch validates the shape of the batch payload
// (a non-empty "jobs" array of objects, each with a "request" object),
// normalizes every per-job request, and forwards the batch to the server.
func handleSubmitProcessBatch(client *Client, params map[string]interface{}) (string, error) {
	jobs, ok := params["jobs"].([]interface{})
	if !ok || len(jobs) == 0 {
		return "", fmt.Errorf("parameter jobs must be a non-empty array")
	}
	for i := range jobs {
		job, isObject := jobs[i].(map[string]interface{})
		if !isObject {
			return "", fmt.Errorf("jobs[%d] must be an object", i)
		}
		req, hasRequest := job["request"].(map[string]interface{})
		if !hasRequest {
			return "", fmt.Errorf("jobs[%d].request must be an object", i)
		}
		normalizeProcessPayload(req)
	}
	raw, err := client.SubmitProcessBatch(params)
	if err != nil {
		return "", err
	}
	return formatJSON(raw), nil
}

// handleListProcessJobs projects the supported filter arguments (status,
// search, limit, offset) into string query parameters and lists async jobs.
// Arguments that are absent, unconvertible, or empty are silently dropped.
func handleListProcessJobs(client *Client, params map[string]interface{}) (string, error) {
	query := make(map[string]string)
	for _, key := range []string{"status", "search", "limit", "offset"} {
		value, present := params[key]
		if !present {
			continue
		}
		if s, err := stringify(value); err == nil && s != "" {
			query[key] = s
		}
	}
	raw, err := client.ListProcessJobs(query)
	if err != nil {
		return "", err
	}
	return formatJSON(raw), nil
}

// handleGetProcessJob looks up one async job by its required job_id argument.
func handleGetProcessJob(client *Client, params map[string]interface{}) (string, error) {
	id, err := requireString(params, "job_id")
	if err != nil {
		return "", err
	}
	raw, err := client.GetProcessJob(id)
	if err != nil {
		return "", err
	}
	return formatJSON(raw), nil
}

// handleCancelProcessJob cancels the async job named by the required job_id
// argument and returns the client's synthetic cancellation status.
func handleCancelProcessJob(client *Client, params map[string]interface{}) (string, error) {
	id, err := requireString(params, "job_id")
	if err != nil {
		return "", err
	}
	raw, err := client.CancelProcessJob(id)
	if err != nil {
		return "", err
	}
	return formatJSON(raw), nil
}

// handleRerunProcessJob re-submits the async job named by the required job_id
// argument and returns the newly created job record.
func handleRerunProcessJob(client *Client, params map[string]interface{}) (string, error) {
	id, err := requireString(params, "job_id")
	if err != nil {
		return "", err
	}
	raw, err := client.RerunProcessJob(id)
	if err != nil {
		return "", err
	}
	return formatJSON(raw), nil
}

func handleRunPipeline(client *Client, params map[string]interface{}) (string, error) {
if _, ok := params["output_name"]; !ok {
if out, ok := params["output"].(string); ok && out != "" {
Expand Down Expand Up @@ -337,6 +446,22 @@ func handleConvertFormat(client *Client, params map[string]interface{}) (string,
return formatJSON(data), nil
}

// normalizeProcessPayload rewrites a process request in place so it matches
// the server's expected field names:
//   - "output" is copied to "output_name" (only when output_name is absent)
//   - "format" is copied to "output_format" (only when output_format is absent)
//   - "params" is defaulted to an empty object when missing or nil
//
// Existing output_name/output_format values are never overwritten, and the
// legacy "output"/"format" keys are left in the map untouched.
func normalizeProcessPayload(params map[string]interface{}) {
	if _, ok := params["output_name"]; !ok {
		if out, ok := params["output"].(string); ok && out != "" {
			params["output_name"] = out
		}
	}
	if _, ok := params["output_format"]; !ok {
		if format, ok := params["format"].(string); ok && format != "" {
			params["output_format"] = format
		}
	}
	// Single map lookup instead of the original's double access
	// (`params["params"]` evaluated twice in one condition).
	if v, ok := params["params"]; !ok || v == nil {
		params["params"] = map[string]interface{}{}
	}
}

func handleDiffDatasets(client *Client, params map[string]interface{}) (string, error) {
if _, ok := params["left"]; !ok {
if base, ok := params["base"].(string); ok && base != "" {
Expand Down
Loading
Loading