5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- `sentinel query <file> --sql "<SQL>"` command — runs arbitrary SQL against the registered `data` table and streams rows as JSONL. Defaults to capping output at 1000 rows via `--max-rows` for agent token-budget safety; the cap is applied as a `DataFrame::limit` rather than SQL subquery wrapping, so user `WITH`/`UNION`/`ORDER BY` clauses pass through unchanged.
- `sentinel head <file> [-n N]` command — returns the first N rows of the dataset as JSONL (default 10). Thin wrapper over `query`.
- `sentinel profile <file>` command — prints per-column stats (type, nulls %, unique, min/max/mean) and a ready-to-use `rules.yaml` block with suggested rules inferred from the data
- `mean` field added to `sentinel schema` output for numeric columns
- `validate` and `schema` subcommands — CLI now uses `sentinel validate <file> --rules <rules>` (breaking change from flat invocation)

### Changed
- `--show-violations` sample rows now cover more Arrow types (timestamps, dates, decimals) — the row-to-JSON conversion was rebuilt on `arrow::json::WriterBuilder`.
51 changes: 49 additions & 2 deletions README.md
@@ -24,7 +24,7 @@ sentinel validate examples/data.csv --rules examples/rules.yaml --format table

## Commands

Sentinel has five subcommands: `validate`, `schema`, `profile`, `query`, and `head`.

### validate

@@ -83,6 +83,51 @@ Rule suggestion logic:
- **`between`** — suggested for numeric columns using observed min/max as bounds
- **`unique`** — suggested when all values in the column are distinct

### query

Run arbitrary SQL against the dataset and stream rows as JSONL. The dataset is registered as the table named `data`, so queries must reference `FROM data`.

```bash
sentinel query <data-file> --sql "<SQL>" [--max-rows <N>]
```

| Flag | Description |
|---|---|
| `-s, --sql <sql>` | SQL to execute (required) |
| `--max-rows <N>` | Cap on rows returned (default `1000`) — applied as a `DataFrame::limit` on the planned query rather than by rewriting the SQL, so `WITH`/`UNION`/`ORDER BY` clauses pass through unchanged |

One JSON object per row is written to stdout, keyed by column name. Nulls are emitted explicitly.

```bash
sentinel query examples/data.csv --sql "SELECT * FROM data WHERE age IS NULL OR age > 27"
```

```json
{"age":30,"name":"alice"}
{"age":null,"name":"bob"}
```

### head

Return the first N rows of the dataset as JSONL — a convenience wrapper over `query`.

```bash
sentinel head <data-file> [-n <N>]
```

| Flag | Description |
|---|---|
| `-n <N>` | Number of rows to return (default `10`) |

```bash
sentinel head examples/data.csv -n 2
```

```json
{"age":30,"name":"alice"}
{"age":null,"name":"bob"}
```

### schema

Inspect the schema and basic stats of a dataset — no rules file needed.
@@ -111,9 +156,11 @@ Outputs JSON with per-column info (type, null count, distinct count, min/max/mean
| `0` | All rules passed |
| `1` | At least one `error`-severity rule failed, or input file is empty |
| `2` | Only `warning`-severity rules failed (no errors) |
| `3` | Invalid rules file or schema mismatch (also: bad SQL for `query`) |
| `4` | Data file not found or unreadable |

Codes `1` and `2` apply to `validate` only; `query`, `head`, `schema`, and `profile` exit with `0`, `3`, or `4`.

## Output

By default sentinel outputs one JSON object per rule (JSONL), followed by a summary:
117 changes: 117 additions & 0 deletions src/arrow_json.rs
@@ -0,0 +1,117 @@
//! Convert Arrow `RecordBatch`es into JSON rows.
//!
//! Used by `runner::fetch_violation_samples` and by agent-facing data-exploration
//! commands (`query`, `head`) to surface rows as JSONL-friendly
//! `serde_json::Value` objects keyed by column name.
//!
//! Explicit nulls are preserved so downstream consumers can rely on every row
//! having every column key.

use anyhow::Context;
use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::json::writer::JsonArray;
use datafusion::arrow::json::WriterBuilder;
use serde_json::Value as JsonValue;

/// Serialize `batches` to a `Vec<JsonValue>` with one object per row.
///
/// Empty input (or all-empty batches) returns an empty vec without invoking the writer.
pub fn record_batches_to_json_rows(batches: &[RecordBatch]) -> anyhow::Result<Vec<JsonValue>> {
    if batches.iter().all(|b| b.num_rows() == 0) {
        return Ok(vec![]);
    }
    let mut buf: Vec<u8> = Vec::new();
    let mut writer = WriterBuilder::new()
        .with_explicit_nulls(true)
        .build::<_, JsonArray>(&mut buf);
    let refs: Vec<&RecordBatch> = batches.iter().collect();
    writer
        .write_batches(&refs)
        .context("Failed to serialize RecordBatches to JSON")?;
    writer.finish().context("Failed to finalize JSON writer")?;
    serde_json::from_slice(&buf).context("Failed to parse arrow-json output into serde_json::Value")
}

#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::arrow::array::{BooleanArray, Float64Array, Int32Array, StringArray};
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use serde_json::json;
    use std::sync::Arc;

    fn make_batch(schema: Schema, columns: Vec<datafusion::arrow::array::ArrayRef>) -> RecordBatch {
        RecordBatch::try_new(Arc::new(schema), columns).expect("valid batch")
    }

    #[test]
    fn empty_batches_produce_empty_vec() {
        let rows = record_batches_to_json_rows(&[]).unwrap();
        assert!(rows.is_empty());
    }

    #[test]
    fn single_batch_yields_object_per_row() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]);
        let ids = Int32Array::from(vec![1, 2, 3]);
        let names = StringArray::from(vec!["a", "b", "c"]);
        let batch = make_batch(schema, vec![Arc::new(ids), Arc::new(names)]);

        let rows = record_batches_to_json_rows(&[batch]).unwrap();
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0], json!({"id": 1, "name": "a"}));
        assert_eq!(rows[1], json!({"id": 2, "name": "b"}));
        assert_eq!(rows[2], json!({"id": 3, "name": "c"}));
    }

    #[test]
    fn nulls_are_preserved_explicitly() {
        let schema = Schema::new(vec![Field::new("age", DataType::Int32, true)]);
        let ages = Int32Array::from(vec![Some(30), None, Some(25)]);
        let batch = make_batch(schema, vec![Arc::new(ages)]);

        let rows = record_batches_to_json_rows(&[batch]).unwrap();
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0]["age"], json!(30));
        assert!(rows[1]["age"].is_null());
        assert_eq!(rows[2]["age"], json!(25));
    }

    #[test]
    fn mixed_types_round_trip() {
        let schema = Schema::new(vec![
            Field::new("flag", DataType::Boolean, true),
            Field::new("score", DataType::Float64, true),
            Field::new("label", DataType::Utf8, true),
        ]);
        let flags = BooleanArray::from(vec![Some(true), Some(false)]);
        let scores = Float64Array::from(vec![Some(1.5), None]);
        let labels = StringArray::from(vec![Some("ok"), Some("")]);
        let batch = make_batch(
            schema,
            vec![Arc::new(flags), Arc::new(scores), Arc::new(labels)],
        );

        let rows = record_batches_to_json_rows(&[batch]).unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0], json!({"flag": true, "score": 1.5, "label": "ok"}));
        assert_eq!(rows[1]["flag"], json!(false));
        assert!(rows[1]["score"].is_null());
        assert_eq!(rows[1]["label"], json!(""));
    }

    #[test]
    fn multiple_batches_concatenate() {
        let schema = Schema::new(vec![Field::new("n", DataType::Int32, false)]);
        let b1 = make_batch(schema.clone(), vec![Arc::new(Int32Array::from(vec![1, 2]))]);
        let b2 = make_batch(schema, vec![Arc::new(Int32Array::from(vec![3]))]);

        let rows = record_batches_to_json_rows(&[b1, b2]).unwrap();
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0]["n"], json!(1));
        assert_eq!(rows[2]["n"], json!(3));
    }
}
87 changes: 87 additions & 0 deletions src/main.rs
@@ -5,8 +5,10 @@ use std::sync::Arc;
use std::time::Instant;
use tracing_subscriber::EnvFilter;

mod arrow_json;
mod output;
mod profile;
mod query;
mod rules;
mod runner;
mod schema;
@@ -41,6 +43,10 @@ enum Commands {
    Schema(SchemaArgs),
    /// Profile a dataset and suggest starter rules
    Profile(ProfileArgs),
    /// Run a SQL query against the dataset (table name: `data`) and return rows as JSONL
    Query(QueryArgs),
    /// Return the first N rows of the dataset as JSONL
    Head(HeadArgs),
}

#[derive(Args)]
@@ -79,6 +85,27 @@ struct ProfileArgs {
    file: String,
}

#[derive(Args)]
struct QueryArgs {
    /// Path to the dataset file (CSV or Parquet)
    file: String,
    /// SQL to execute. The dataset is registered as the table `data`.
    #[arg(short, long)]
    sql: String,
    /// Maximum rows to return (safety cap for agent token budgets)
    #[arg(long, default_value = "1000")]
    max_rows: usize,
}

#[derive(Args)]
struct HeadArgs {
    /// Path to the dataset file (CSV or Parquet)
    file: String,
    /// Number of rows to return
    #[arg(short, default_value = "10")]
    n: usize,
}

// ---------------------------------------------------------------------------
// Agent mode helpers
// ---------------------------------------------------------------------------
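
// Editorial sketch (not part of this diff): `register_data` is called by the
// new subcommands below, but its definition is elided from this PR view.
// Assuming it dispatches on file extension, it plausibly looks like the
// following — the name is real (it is referenced below); the body is a guess.
async fn register_data(ctx: &SessionContext, path: &str) -> anyhow::Result<()> {
    use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
    if path.ends_with(".parquet") {
        // Parquet carries its own schema; defaults are enough.
        ctx.register_parquet("data", path, ParquetReadOptions::default())
            .await?;
    } else {
        // CSV: infer the schema with default read options.
        ctx.register_csv("data", path, CsvReadOptions::new()).await?;
    }
    Ok(())
}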
@@ -184,7 +211,67 @@ async fn main() {
                std::process::exit(1);
            }
        }
        Commands::Query(args) => match run_query_cmd(args).await {
            Ok(code) => std::process::exit(code),
            Err(e) => {
                let code = e
                    .downcast_ref::<ExitCodeError>()
                    .map(|ex| ex.code)
                    .unwrap_or(1);
                eprintln!("Error: {e}");
                std::process::exit(code);
            }
        },
        Commands::Head(args) => match run_head_cmd(args).await {
            Ok(code) => std::process::exit(code),
            Err(e) => {
                let code = e
                    .downcast_ref::<ExitCodeError>()
                    .map(|ex| ex.code)
                    .unwrap_or(1);
                eprintln!("Error: {e}");
                std::process::exit(code);
            }
        },
    }
}
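
// Editorial sketch (not part of this diff): `ExitCodeError` is defined
// elsewhere in the file. Given that it is built with `ExitCodeError::new(code, e)`,
// converted by `?` into `anyhow::Error`, and recovered with
// `downcast_ref::<ExitCodeError>()`, a minimal shape consistent with that
// usage would be:
#[derive(Debug)]
struct ExitCodeError {
    code: i32,
    inner: anyhow::Error,
}

impl ExitCodeError {
    fn new(code: i32, inner: anyhow::Error) -> Self {
        Self { code, inner }
    }
}

impl std::fmt::Display for ExitCodeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Show the underlying error; the exit code travels alongside it.
        write!(f, "{}", self.inner)
    }
}

// `std::error::Error` is what lets `?` wrap this in `anyhow::Error` and lets
// `main` downcast it back out to pick the process exit code.
impl std::error::Error for ExitCodeError {}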

// ---------------------------------------------------------------------------
// Query and head subcommands
// ---------------------------------------------------------------------------

fn print_rows_as_jsonl(rows: &[serde_json::Value]) -> anyhow::Result<()> {
    use std::io::Write;

    let stdout = std::io::stdout();
    let mut handle = stdout.lock();
    for row in rows {
        writeln!(handle, "{}", serde_json::to_string(row)?)?;
    }
    Ok(())
}

async fn run_query_cmd(args: QueryArgs) -> anyhow::Result<i32> {
    let ctx = SessionContext::new();
    register_data(&ctx, &args.file)
        .await
        .map_err(|e| ExitCodeError::new(4, e))?;
    let rows = query::run_query(&ctx, &args.sql, args.max_rows)
        .await
        .map_err(|e| ExitCodeError::new(3, e))?;
    print_rows_as_jsonl(&rows)?;
    Ok(0)
}

async fn run_head_cmd(args: HeadArgs) -> anyhow::Result<i32> {
    let ctx = SessionContext::new();
    register_data(&ctx, &args.file)
        .await
        .map_err(|e| ExitCodeError::new(4, e))?;
    let rows = query::run_head(&ctx, args.n)
        .await
        .map_err(|e| ExitCodeError::new(3, e))?;
    print_rows_as_jsonl(&rows)?;
    Ok(0)
}
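
// Editorial sketch (not part of this diff): `src/query.rs` is declared via
// `mod query;` above but its contents are not shown in this PR view. Based on
// the changelog note that the cap is applied as a `DataFrame::limit` rather
// than SQL subquery wrapping, the module plausibly looks like this (inlined
// here for illustration; `run_query`/`run_head` are real names called above,
// the bodies are assumptions):
mod query_sketch {
    use datafusion::prelude::SessionContext;
    use serde_json::Value as JsonValue;

    use crate::arrow_json::record_batches_to_json_rows;

    /// Run `sql` against the registered `data` table, returning at most `max_rows` rows.
    pub async fn run_query(
        ctx: &SessionContext,
        sql: &str,
        max_rows: usize,
    ) -> anyhow::Result<Vec<JsonValue>> {
        // Plan the user's SQL exactly as written (`WITH`/`UNION`/`ORDER BY` intact)...
        let df = ctx.sql(sql).await?;
        // ...then cap the result on the logical plan instead of editing the SQL text.
        let df = df.limit(0, Some(max_rows))?;
        let batches = df.collect().await?;
        record_batches_to_json_rows(&batches)
    }

    /// First `n` rows of the `data` table — the backend for `head`.
    pub async fn run_head(ctx: &SessionContext, n: usize) -> anyhow::Result<Vec<JsonValue>> {
        let batches = ctx.table("data").await?.limit(0, Some(n))?.collect().await?;
        record_batches_to_json_rows(&batches)
    }
}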

// ---------------------------------------------------------------------------