From 73c4ba0e2f560744570b629996352a1f5c62eeff Mon Sep 17 00:00:00 2001
From: luca spolladore
Date: Fri, 24 Apr 2026 13:01:44 +0200
Subject: [PATCH] feat: add query and head subcommands for agent data
 exploration

Introduces `sentinel query --sql "..."` for ad-hoc SQL over the registered
`data` table and `sentinel head [-n N]` as a thin convenience wrapper, both
streaming rows as JSONL for agent consumption. The row cap is applied via
`DataFrame::limit`, so `WITH`/`UNION`/`ORDER BY` pass through unchanged.

Also factors row-to-JSON conversion out of `runner.rs` into a shared
`arrow_json` helper built on `arrow::json::WriterBuilder`, replacing ~170
lines of hand-rolled per-DataType downcasting and broadening type coverage
(timestamps, dates, decimals) for `--show-violations` samples.

Co-Authored-By: Claude Opus 4.7
---
 CHANGELOG.md      |   5 ++
 README.md         |  51 +++++++++++++-
 src/arrow_json.rs | 117 ++++++++++++++++++++++++++++++++
 src/main.rs       |  87 ++++++++++++++++++++++++
 src/query.rs      | 102 ++++++++++++++++++++++++++++
 src/runner.rs     | 169 ++--------------------------------------------
 6 files changed, 365 insertions(+), 166 deletions(-)
 create mode 100644 src/arrow_json.rs
 create mode 100644 src/query.rs

diff --git a/CHANGELOG.md b/CHANGELOG.md
index dc80dec..90266de 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 ### Added
+- `sentinel query --sql "<SQL>"` command — runs arbitrary SQL against the registered `data` table and streams rows as JSONL. Defaults to capping output at 1000 rows via `--max-rows` for agent token-budget safety; the cap is applied as a `DataFrame::limit` rather than SQL subquery wrapping, so user `WITH`/`UNION`/`ORDER BY` clauses pass through unchanged.
+- `sentinel head [-n N]` command — returns the first N rows of the dataset as JSONL (default 10). Thin wrapper over `query`.
 - `sentinel profile <FILE>` command — prints per-column stats (type, nulls %, unique, min/max/mean) and a ready-to-use `rules.yaml` block with suggested rules inferred from the data
 - `mean` field added to `sentinel schema` output for numeric columns
 - `validate` and `schema` subcommands — CLI now uses `sentinel validate <FILE> --rules <RULES>` (breaking change from flat invocation)
+
+### Changed
+- `--show-violations` sample rows now cover more Arrow types (timestamps, dates, decimals) — the row-to-JSON conversion was rebuilt on `arrow::json::WriterBuilder`.
diff --git a/README.md b/README.md
index 52a915d..e92b49a 100644
--- a/README.md
+++ b/README.md
@@ -24,7 +24,7 @@ sentinel validate examples/data.csv --rules examples/rules.yaml --format table
 
 ## Commands
 
-Sentinel has three subcommands: `validate`, `schema`, and `profile`.
+Sentinel has five subcommands: `validate`, `schema`, `profile`, `query`, and `head`.
 
 ### validate
 
@@ -83,6 +83,51 @@ Rule suggestion logic:
 - **`between`** — suggested for numeric columns using observed min/max as bounds
 - **`unique`** — suggested when all values in the column are distinct
 
+### query
+
+Run arbitrary SQL against the dataset and stream rows as JSONL. The dataset is registered as the table named `data`, so queries must reference `FROM data`.
+
+```bash
+sentinel query <FILE> --sql "<SQL>" [--max-rows <N>]
+```
+
+| Flag | Description |
+|---|---|
+| `-s, --sql <SQL>` | SQL to execute (required) |
+| `--max-rows <N>` | Cap on rows returned (default `1000`) — applied as a plan-level limit on top of the user query, safe with `WITH`/`UNION`/etc. |
+
+One JSON object per row is written to stdout, keyed by column name. Nulls are emitted explicitly.
+
+```bash
+sentinel query examples/data.csv --sql "SELECT * FROM data WHERE age IS NULL OR age > 27"
+```
+
+```json
+{"age":30,"name":"alice"}
+{"age":null,"name":"bob"}
+```
+
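+Because every row is a standalone JSON object, the output composes with line-oriented tools — for example (assuming `jq` is installed), projecting a single column:
+
+```bash
+sentinel query examples/data.csv --sql "SELECT * FROM data" | jq -r '.name'
+```
+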
+### head
+
+Return the first N rows of the dataset as JSONL — a convenience wrapper over `query`.
+
+```bash
+sentinel head <FILE> [-n <N>]
+```
+
+| Flag | Description |
+|---|---|
+| `-n <N>` | Number of rows to return (default `10`) |
+
+```bash
+sentinel head examples/data.csv -n 2
+```
+
+```json
+{"age":30,"name":"alice"}
+{"age":null,"name":"bob"}
+```
+
 ### schema
 
 Inspect the schema and basic stats of a dataset — no rules file needed.
@@ -111,9 +156,11 @@ Outputs JSON with per-column info (type, null count, distinct count, min/max/mean)
 | `0` | All rules passed |
 | `1` | At least one `error`-severity rule failed, or input file is empty |
 | `2` | Only `warning`-severity rules failed (no errors) |
-| `3` | Invalid rules file or schema mismatch |
+| `3` | Invalid rules file or schema mismatch (also: bad SQL for `query`) |
 | `4` | Data file not found or unreadable |
+
+Codes `1` and `2` apply to `validate` only; `query`, `head`, `schema`, and `profile` exit with `0`, `3`, or `4`.
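+
+For example, a malformed query exits with code `3` (assuming the quickstart's example dataset):
+
+```bash
+sentinel query examples/data.csv --sql "SELEC * FROM data"; echo $?
+# prints an error on stderr, then: 3
+```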
 
 ## Output
 
 By default sentinel outputs one JSON object per rule (JSONL), followed by a summary:
diff --git a/src/arrow_json.rs b/src/arrow_json.rs
new file mode 100644
index 0000000..654ec31
--- /dev/null
+++ b/src/arrow_json.rs
@@ -0,0 +1,117 @@
+//! Convert Arrow `RecordBatch`es into JSON rows.
+//!
+//! Used by `runner::fetch_violation_samples` and by agent-facing data-exploration
+//! commands (`query`, `head`) to surface rows as JSONL-friendly
+//! `serde_json::Value` objects keyed by column name.
+//!
+//! Explicit nulls are preserved so downstream consumers can rely on every row
+//! having every column key.
+
+use anyhow::Context;
+use datafusion::arrow::array::RecordBatch;
+use datafusion::arrow::json::writer::JsonArray;
+use datafusion::arrow::json::WriterBuilder;
+use serde_json::Value as JsonValue;
+
+/// Serialize `batches` to a `Vec<JsonValue>` with one object per row.
+///
+/// Empty input (or all-empty batches) returns an empty vec without invoking the writer.
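+///
+/// A minimal usage sketch (with a hypothetical `batches` slice; the tests below
+/// construct concrete ones):
+///
+/// ```ignore
+/// let rows = record_batches_to_json_rows(&batches)?;
+/// for row in &rows {
+///     println!("{}", serde_json::to_string(row)?);
+/// }
+/// ```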
+pub fn record_batches_to_json_rows(batches: &[RecordBatch]) -> anyhow::Result<Vec<JsonValue>> {
+    if batches.iter().all(|b| b.num_rows() == 0) {
+        return Ok(vec![]);
+    }
+    let mut buf: Vec<u8> = Vec::new();
+    let mut writer = WriterBuilder::new()
+        .with_explicit_nulls(true)
+        .build::<_, JsonArray>(&mut buf);
+    let refs: Vec<&RecordBatch> = batches.iter().collect();
+    writer
+        .write_batches(&refs)
+        .context("Failed to serialize RecordBatches to JSON")?;
+    writer.finish().context("Failed to finalize JSON writer")?;
+    serde_json::from_slice(&buf).context("Failed to parse arrow-json output into serde_json::Value")
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use datafusion::arrow::array::{ArrayRef, BooleanArray, Float64Array, Int32Array, StringArray};
+    use datafusion::arrow::datatypes::{DataType, Field, Schema};
+    use serde_json::json;
+    use std::sync::Arc;
+
+    fn make_batch(schema: Schema, columns: Vec<ArrayRef>) -> RecordBatch {
+        RecordBatch::try_new(Arc::new(schema), columns).expect("valid batch")
+    }
+
+    #[test]
+    fn empty_batches_produce_empty_vec() {
+        let rows = record_batches_to_json_rows(&[]).unwrap();
+        assert!(rows.is_empty());
+    }
+
+    #[test]
+    fn single_batch_yields_object_per_row() {
+        let schema = Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]);
+        let ids = Int32Array::from(vec![1, 2, 3]);
+        let names = StringArray::from(vec!["a", "b", "c"]);
+        let batch = make_batch(schema, vec![Arc::new(ids), Arc::new(names)]);
+
+        let rows = record_batches_to_json_rows(&[batch]).unwrap();
+        assert_eq!(rows.len(), 3);
+        assert_eq!(rows[0], json!({"id": 1, "name": "a"}));
+        assert_eq!(rows[1], json!({"id": 2, "name": "b"}));
+        assert_eq!(rows[2], json!({"id": 3, "name": "c"}));
+    }
+
+    #[test]
+    fn nulls_are_preserved_explicitly() {
+        let schema = Schema::new(vec![Field::new("age", DataType::Int32, true)]);
+        let ages = Int32Array::from(vec![Some(30), None, Some(25)]);
+        let batch = make_batch(schema, vec![Arc::new(ages)]);
+
+        let rows = record_batches_to_json_rows(&[batch]).unwrap();
+        assert_eq!(rows.len(), 3);
+        assert_eq!(rows[0]["age"], json!(30));
+        assert!(rows[1]["age"].is_null());
+        assert_eq!(rows[2]["age"], json!(25));
+    }
+
+    #[test]
+    fn mixed_types_round_trip() {
+        let schema = Schema::new(vec![
+            Field::new("flag", DataType::Boolean, true),
+            Field::new("score", DataType::Float64, true),
+            Field::new("label", DataType::Utf8, true),
+        ]);
+        let flags = BooleanArray::from(vec![Some(true), Some(false)]);
+        let scores = Float64Array::from(vec![Some(1.5), None]);
+        let labels = StringArray::from(vec![Some("ok"), Some("")]);
+        let batch = make_batch(
+            schema,
+            vec![Arc::new(flags), Arc::new(scores), Arc::new(labels)],
+        );
+
+        let rows = record_batches_to_json_rows(&[batch]).unwrap();
+        assert_eq!(rows.len(), 2);
+        assert_eq!(rows[0], json!({"flag": true, "score": 1.5, "label": "ok"}));
+        assert_eq!(rows[1]["flag"], json!(false));
+        assert!(rows[1]["score"].is_null());
+        assert_eq!(rows[1]["label"], json!(""));
+    }
+
+    #[test]
+    fn multiple_batches_concatenate() {
+        let schema = Schema::new(vec![Field::new("n", DataType::Int32, false)]);
+        let b1 = make_batch(schema.clone(), vec![Arc::new(Int32Array::from(vec![1, 2]))]);
+        let b2 = make_batch(schema, vec![Arc::new(Int32Array::from(vec![3]))]);
+
+        let rows = record_batches_to_json_rows(&[b1, b2]).unwrap();
+        assert_eq!(rows.len(), 3);
+        assert_eq!(rows[0]["n"], json!(1));
+        assert_eq!(rows[2]["n"], json!(3));
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index 6e2f886..852b524 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,8 +5,10 @@
 use std::sync::Arc;
 use std::time::Instant;
 use tracing_subscriber::EnvFilter;
+mod arrow_json;
 mod output;
 mod profile;
+mod query;
 mod rules;
 mod runner;
 mod schema;
@@ -41,6 +43,10 @@ enum Commands {
     Schema(SchemaArgs),
     /// Profile a dataset and suggest starter rules
     Profile(ProfileArgs),
+    /// Run a SQL query against the dataset (table name: `data`) and return rows as JSONL
+    Query(QueryArgs),
+    /// Return the first N rows of the dataset as JSONL
+    Head(HeadArgs),
 }
 
 #[derive(Args)]
@@ -79,6 +85,27 @@ struct ProfileArgs {
     file: String,
 }
 
+#[derive(Args)]
+struct QueryArgs {
+    /// Path to the dataset file (CSV or Parquet)
+    file: String,
+    /// SQL to execute. The dataset is registered as the table `data`.
+    #[arg(short, long)]
+    sql: String,
+    /// Maximum rows to return (safety cap for agent token budgets)
+    #[arg(long, default_value = "1000")]
+    max_rows: usize,
+}
+
+#[derive(Args)]
+struct HeadArgs {
+    /// Path to the dataset file (CSV or Parquet)
+    file: String,
+    /// Number of rows to return
+    #[arg(short, default_value = "10")]
+    n: usize,
+}
+
 // ---------------------------------------------------------------------------
 // Agent mode helpers
 // ---------------------------------------------------------------------------
@@ -184,7 +211,67 @@ async fn main() {
                 std::process::exit(1);
             }
         }
+        Commands::Query(args) => match run_query_cmd(args).await {
+            Ok(code) => std::process::exit(code),
+            Err(e) => {
+                let code = e
+                    .downcast_ref::<ExitCodeError>()
+                    .map(|ex| ex.code)
+                    .unwrap_or(1);
+                eprintln!("Error: {e}");
+                std::process::exit(code);
+            }
+        },
+        Commands::Head(args) => match run_head_cmd(args).await {
+            Ok(code) => std::process::exit(code),
+            Err(e) => {
+                let code = e
+                    .downcast_ref::<ExitCodeError>()
+                    .map(|ex| ex.code)
+                    .unwrap_or(1);
+                eprintln!("Error: {e}");
+                std::process::exit(code);
+            }
+        },
     }
 }
+
+// ---------------------------------------------------------------------------
+// Query and head subcommands
+// ---------------------------------------------------------------------------
+
+fn print_rows_as_jsonl(rows: &[serde_json::Value]) -> anyhow::Result<()> {
+    let stdout = std::io::stdout();
+    let mut handle = stdout.lock();
+    use std::io::Write;
+    for row in rows {
+        writeln!(handle, "{}", serde_json::to_string(row)?)?;
+    }
+    Ok(())
+}
+
+async fn run_query_cmd(args: QueryArgs) -> anyhow::Result<i32> {
+    let ctx = SessionContext::new();
+    register_data(&ctx, &args.file)
+        .await
+        .map_err(|e| ExitCodeError::new(4, e))?;
+    let rows = query::run_query(&ctx, &args.sql, args.max_rows)
+        .await
+        .map_err(|e| ExitCodeError::new(3, e))?;
+    print_rows_as_jsonl(&rows)?;
+    Ok(0)
+}
+
+async fn run_head_cmd(args: HeadArgs) -> anyhow::Result<i32> {
+    let ctx = SessionContext::new();
+    register_data(&ctx, &args.file)
+        .await
+        .map_err(|e| ExitCodeError::new(4, e))?;
+    let rows = query::run_head(&ctx, args.n)
+        .await
+        .map_err(|e| ExitCodeError::new(3, e))?;
+    print_rows_as_jsonl(&rows)?;
+    Ok(0)
+}
 
 // ---------------------------------------------------------------------------
diff --git a/src/query.rs b/src/query.rs
new file mode 100644
index 0000000..e26616b
--- /dev/null
+++ b/src/query.rs
@@ -0,0 +1,102 @@
+//! Read-only data-exploration commands (`query`, `head`).
+//!
+//! Both return rows as `serde_json::Value` objects so the caller can emit JSONL
+//! for agents or format them otherwise. The dataset is always registered as the
+//! table `data` (see `storage::register_data`), so user SQL must reference that name.
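+//!
+//! A sketch of the intended call pattern (assuming a `SessionContext` whose `data` table is already registered):
+//!
+//! ```ignore
+//! let rows = run_query(&ctx, "SELECT name FROM data WHERE age > 27", 1000).await?;
+//! println!("{} rows", rows.len());
+//! ```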
+
+use crate::arrow_json::record_batches_to_json_rows;
+use anyhow::Context;
+use datafusion::prelude::*;
+use serde_json::Value as JsonValue;
+
+/// Run arbitrary SQL against the session, capping the result at `max_rows` rows.
+///
+/// The cap is applied via `DataFrame::limit` rather than wrapping the user SQL in
+/// a subquery, so clauses like `WITH`, `UNION`, or `ORDER BY` work unchanged.
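+///
+/// For instance, a CTE passes through as written; only the final plan is capped
+/// (a sketch, assuming the registered `data` table):
+///
+/// ```ignore
+/// let sql = "WITH adults AS (SELECT * FROM data WHERE age > 27) \
+///            SELECT name FROM adults ORDER BY name";
+/// let rows = run_query(&ctx, sql, 1000).await?;
+/// ```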
+pub async fn run_query(
+    ctx: &SessionContext,
+    sql: &str,
+    max_rows: usize,
+) -> anyhow::Result<Vec<JsonValue>> {
+    let df = ctx.sql(sql).await.context("SQL query failed")?;
+    let limited = df
+        .limit(0, Some(max_rows))
+        .context("Failed to apply row limit")?;
+    let batches = limited
+        .collect()
+        .await
+        .context("Failed to collect query results")?;
+    record_batches_to_json_rows(&batches)
+}
+
+/// Return the first `n` rows of the registered `data` table.
+pub async fn run_head(ctx: &SessionContext, n: usize) -> anyhow::Result<Vec<JsonValue>> {
+    run_query(ctx, "SELECT * FROM data", n).await
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use serde_json::json;
+
+    async fn numeric_ctx() -> SessionContext {
+        let ctx = SessionContext::new();
+        ctx.sql("CREATE TABLE data AS SELECT * FROM (VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')) AS t(id, name)")
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+        ctx
+    }
+
+    #[tokio::test]
+    async fn query_returns_rows_as_json_objects() {
+        let ctx = numeric_ctx().await;
+        let rows = run_query(&ctx, "SELECT * FROM data WHERE id < 3", 100)
+            .await
+            .unwrap();
+        assert_eq!(rows.len(), 2);
+        assert_eq!(rows[0]["id"], json!(1));
+        assert_eq!(rows[0]["name"], json!("a"));
+    }
+
+    #[tokio::test]
+    async fn query_caps_at_max_rows() {
+        let ctx = numeric_ctx().await;
+        let rows = run_query(&ctx, "SELECT * FROM data", 2).await.unwrap();
+        assert_eq!(rows.len(), 2);
+    }
+
+    #[tokio::test]
+    async fn query_respects_user_limit_when_smaller_than_max() {
+        let ctx = numeric_ctx().await;
+        let rows = run_query(&ctx, "SELECT * FROM data LIMIT 1", 100)
+            .await
+            .unwrap();
+        assert_eq!(rows.len(), 1);
+    }
+
+    #[tokio::test]
+    async fn query_bad_sql_returns_error() {
+        let ctx = numeric_ctx().await;
+        let res = run_query(&ctx, "SELECT * FROM nonexistent_table", 10).await;
+        assert!(res.is_err());
+    }
+
+    #[tokio::test]
+    async fn head_returns_first_n_rows() {
+        let ctx = numeric_ctx().await;
+        let rows = run_head(&ctx, 3).await.unwrap();
+        assert_eq!(rows.len(), 3);
+        assert_eq!(rows[0]["id"], json!(1));
+        assert_eq!(rows[2]["id"], json!(3));
+    }
+
+    #[tokio::test]
+    async fn head_caps_at_table_size() {
+        let ctx = numeric_ctx().await;
+        let rows = run_head(&ctx, 100).await.unwrap();
+        assert_eq!(rows.len(), 5);
+    }
+}
diff --git a/src/runner.rs b/src/runner.rs
index 2553660..f3cb731 100644
--- a/src/runner.rs
+++ b/src/runner.rs
@@ -1,13 +1,10 @@
+use crate::arrow_json::record_batches_to_json_rows;
 use crate::rules::{Check, Rule, Severity};
 use anyhow::Context;
-use datafusion::arrow::array::{
-    Array, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
-    StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
-};
-use datafusion::arrow::datatypes::DataType;
+use datafusion::arrow::array::Int64Array;
 use datafusion::prelude::*;
 use serde::Serialize;
-use serde_json::{json, Value as JsonValue};
+use serde_json::Value as JsonValue;
 use std::sync::Arc;
 
 #[derive(Debug, Serialize)]
@@ -192,164 +189,7 @@ pub async fn fetch_violation_samples(
         .collect()
         .await
         .context("Failed to collect sample results")?;
-
-    let mut rows: Vec<JsonValue> = Vec::new();
-    for batch in &batches {
-        let schema = batch.schema();
-        let num_rows = batch.num_rows();
-        for row_idx in 0..num_rows {
-            let mut obj = serde_json::Map::new();
-            for col_idx in 0..batch.num_columns() {
-                let col = batch.column(col_idx);
-                let field = schema.field(col_idx);
-                let col_name = field.name().clone();
-                let value: JsonValue = match field.data_type() {
-                    DataType::Int8 => {
-                        if col.is_null(row_idx) {
-                            JsonValue::Null
-                        } else {
-                            json!(col
-                                .as_any()
-                                .downcast_ref::<Int8Array>()
-                                .unwrap()
-                                .value(row_idx))
-                        }
-                    }
-                    DataType::Int16 => {
-                        if col.is_null(row_idx) {
-                            JsonValue::Null
-                        } else {
-                            json!(col
-                                .as_any()
-                                .downcast_ref::<Int16Array>()
-                                .unwrap()
-                                .value(row_idx))
-                        }
-                    }
-                    DataType::Int32 => {
-                        if col.is_null(row_idx) {
-                            JsonValue::Null
-                        } else {
-                            json!(col
-                                .as_any()
-                                .downcast_ref::<Int32Array>()
-                                .unwrap()
-                                .value(row_idx))
-                        }
-                    }
-                    DataType::Int64 => {
-                        if col.is_null(row_idx) {
-                            JsonValue::Null
-                        } else {
-                            json!(col
-                                .as_any()
-                                .downcast_ref::<Int64Array>()
-                                .unwrap()
-                                .value(row_idx))
-                        }
-                    }
-                    DataType::Float32 => {
-                        if col.is_null(row_idx) {
-                            JsonValue::Null
-                        } else {
-                            json!(col
-                                .as_any()
-                                .downcast_ref::<Float32Array>()
-                                .unwrap()
-                                .value(row_idx))
-                        }
-                    }
-                    DataType::Float64 => {
-                        if col.is_null(row_idx) {
-                            JsonValue::Null
-                        } else {
-                            json!(col
-                                .as_any()
-                                .downcast_ref::<Float64Array>()
-                                .unwrap()
-                                .value(row_idx))
-                        }
-                    }
-                    DataType::Utf8 => {
-                        if col.is_null(row_idx) {
-                            JsonValue::Null
-                        } else {
-                            json!(col
-                                .as_any()
-                                .downcast_ref::<StringArray>()
-                                .unwrap()
-                                .value(row_idx))
-                        }
-                    }
-                    DataType::UInt8 => {
-                        if col.is_null(row_idx) {
-                            JsonValue::Null
-                        } else {
-                            json!(col
-                                .as_any()
-                                .downcast_ref::<UInt8Array>()
-                                .unwrap()
-                                .value(row_idx))
-                        }
-                    }
-                    DataType::UInt16 => {
-                        if col.is_null(row_idx) {
-                            JsonValue::Null
-                        } else {
-                            json!(col
-                                .as_any()
-                                .downcast_ref::<UInt16Array>()
-                                .unwrap()
-                                .value(row_idx))
-                        }
-                    }
-                    DataType::UInt32 => {
-                        if col.is_null(row_idx) {
-                            JsonValue::Null
-                        } else {
-                            json!(col
-                                .as_any()
-                                .downcast_ref::<UInt32Array>()
-                                .unwrap()
-                                .value(row_idx))
-                        }
-                    }
-                    DataType::UInt64 => {
-                        if col.is_null(row_idx) {
-                            JsonValue::Null
-                        } else {
-                            json!(col
-                                .as_any()
-                                .downcast_ref::<UInt64Array>()
-                                .unwrap()
-                                .value(row_idx))
-                        }
-                    }
-                    DataType::Boolean => {
-                        if col.is_null(row_idx) {
-                            JsonValue::Null
-                        } else {
-                            json!(col
-                                .as_any()
-                                .downcast_ref::<BooleanArray>()
-                                .unwrap()
-                                .value(row_idx))
-                        }
-                    }
-                    _ => {
-                        if col.is_null(row_idx) {
-                            JsonValue::Null
-                        } else {
-                            json!(format!("{:?}", col.slice(row_idx, 1)))
-                        }
-                    }
-                };
-                obj.insert(col_name, value);
-            }
-            rows.push(JsonValue::Object(obj));
-        }
-    }
-    Ok(rows)
+    record_batches_to_json_rows(&batches)
 }
 
 pub async fn run_rule(
@@ -413,6 +253,7 @@ pub async fn run_rules_parallel(
 #[cfg(test)]
 mod test {
     use super::*;
+    use serde_json::json;
 
     async fn make_ctx(sql: &str) -> Arc<SessionContext> {
         let ctx = SessionContext::new();