From f4d1627c747f351582cc55662ade11928b7aa8d2 Mon Sep 17 00:00:00 2001 From: SpollaL Date: Thu, 23 Apr 2026 21:09:14 +0200 Subject: [PATCH] feat: add --show-violations flag for violation row export - Adds --show-violations [N] flag (default N=5) to fetch the first N failing rows per rule - JSON output gains a sample_rows array on each failing rule result (omitted when flag not used) - Table output gains a SAMPLE VIOLATIONS column showing compact JSON of violating rows - fetch_violation_samples() in runner.rs executes SELECT * for each check type and converts Arrow record batches to JSON - Custom check returns empty samples - 10 new tests covering all check types, limit behavior, no-violation case, and custom check Co-Authored-By: Claude Sonnet 4.6 --- src/main.rs | 15 ++- src/output.rs | 65 ++++++++-- src/runner.rs | 322 +++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 391 insertions(+), 11 deletions(-) diff --git a/src/main.rs b/src/main.rs index fa02f08..8c52c6c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,7 @@ mod storage; use output::OutputFormat; use rules::RulesFile; -use runner::run_rule; +use runner::{fetch_violation_samples, run_rule}; use storage::register_data; use crate::{ @@ -35,6 +35,9 @@ struct Cli { /// Print full error chain on failure #[arg(long)] verbose: bool, + /// Show first N violating rows per failed rule (default 5) + #[arg(long, value_name = "N", default_missing_value = "5", num_args = 0..=1)] + show_violations: Option, } #[tokio::main] @@ -102,11 +105,19 @@ async fn run(args: Cli) -> anyhow::Result<()> { } let mut results: Vec = Vec::new(); for rule in &rules.rules { - let result = run_rule(&ctx, rule, total_rows) + let mut result = run_rule(&ctx, rule, total_rows) .await .with_context(|| format!("Rule '{}' failed to execute", rule.name))?; if matches!(result.status, RuleStatus::Fail) { any_failed = true; + if let Some(limit) = args.show_violations { + let samples = fetch_violation_samples(&ctx, rule, limit) + .await + .with_context(|| { + format!("Failed to fetch violation samples for rule '{}'", rule.name) + })?; + result.sample_rows = Some(samples); + } } results.push(result); } diff --git a/src/output.rs b/src/output.rs index a0fff82..4a88ade 100644 --- a/src/output.rs +++ b/src/output.rs @@ -45,16 +45,65 @@ pub fn build_json(results: &[RuleResult]) -> String { } pub fn build_table(results: &[RuleResult]) -> String { + let has_samples = results + .iter() + .any(|r| r.sample_rows.as_ref().is_some_and(|s| !s.is_empty())); + let mut table = Table::new(); - table.set_header(["RULE", "STATUS", "VIOLATIONS", "TOTAL", "RATE"]); - results.iter().for_each(|res| { - table.add_row([ - res.name.clone(), - format!("{}", res.status), - res.violations.to_string(), - res.total_rows.to_string(), - format!("{:.1}%", res.violation_rate * 100.0), + if has_samples { + table.set_header([ + "RULE", + "STATUS", + "VIOLATIONS", + "TOTAL", + "RATE", + "SAMPLE VIOLATIONS", ]); + } else { + table.set_header(["RULE", "STATUS", "VIOLATIONS", "TOTAL", "RATE"]); + } + results.iter().for_each(|res| { + if has_samples { + let sample_str = match &res.sample_rows { + Some(rows) if !rows.is_empty() => { + let parts: Vec = rows + .iter() + .map(|row| { + // Render each row as compact JSON; truncate long values + let s = serde_json::to_string(row).unwrap_or_default(); + if s.len() > 60 { + format!("{}…", &s[..60]) + } else { + s + } + }) + .collect(); + let joined = parts.join(", "); + if joined.len() > 120 { + format!("{}…", &joined[..120]) + } else { + joined + } + } + _ => String::new(), + }; + table.add_row([ + res.name.clone(), + format!("{}", res.status), + res.violations.to_string(), + res.total_rows.to_string(), + format!("{:.1}%", res.violation_rate * 100.0), + sample_str, + ]); + } else { + table.add_row([ + res.name.clone(), + format!("{}", res.status), + res.violations.to_string(), + res.total_rows.to_string(), + format!("{:.1}%", res.violation_rate * 100.0), + ]); + } }); table.to_string() } diff --git a/src/runner.rs b/src/runner.rs index 2e5ffcc..caa876c 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -1,8 +1,13 @@ use crate::rules::{Check, Rule}; use anyhow::Context; -use datafusion::arrow::array::Int64Array; +use datafusion::arrow::array::{ + Array, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, + StringArray, +}; +use datafusion::arrow::datatypes::DataType; use datafusion::prelude::*; use serde::Serialize; +use serde_json::{json, Value as JsonValue}; #[derive(Debug, Serialize)] #[serde(rename_all = "snake_case")] @@ -27,6 +32,8 @@ pub struct RuleResult { pub violations: u64, pub total_rows: u64, pub violation_rate: f64, + #[serde(skip_serializing_if = "Option::is_none")] + pub sample_rows: Option>, } fn build_sql(rule: &Rule) -> anyhow::Result { @@ -83,6 +90,59 @@ fn build_sql(rule: &Rule) -> anyhow::Result { } } +/// Build a SQL query that SELECTs the violating rows (not a COUNT). +/// Returns `Ok(None)` for `Custom` checks, which cannot produce sample rows. +pub fn build_violation_sample_sql(rule: &Rule, limit: u64) -> anyhow::Result> { + match &rule.check { + Check::NotNull => Ok(Some(format!( + "SELECT * FROM data WHERE \"{}\" IS NULL LIMIT {}", + rule.column, limit + ))), + Check::NotEmpty => Ok(Some(format!( + "SELECT * FROM data WHERE \"{}\" = '' LIMIT {}", + rule.column, limit + ))), + Check::Min => { + let min = rule.min.context("Min check requires a min value")?; + Ok(Some(format!( + "SELECT * FROM data WHERE \"{}\" < {} LIMIT {}", + rule.column, min, limit + ))) + } + Check::Max => { + let max = rule.max.context("Max check requires a max value")?; + Ok(Some(format!( + "SELECT * FROM data WHERE \"{}\" > {} LIMIT {}", + rule.column, max, limit + ))) + } + Check::Between => { + let min = rule.min.context("Between check requires a min value")?; + let max = rule.max.context("Between check requires a max value")?; + Ok(Some(format!( + "SELECT * FROM data WHERE \"{}\" < {} OR \"{}\" > {} LIMIT {}", + rule.column, min, rule.column, max, limit + ))) + } + Check::Unique => Ok(Some(format!( + "SELECT * FROM data WHERE \"{}\" IN (SELECT \"{}\" FROM data GROUP BY \"{}\" HAVING COUNT(*) > 1) LIMIT {}", + rule.column, rule.column, rule.column, limit + ))), + Check::Regex => { + let pattern = rule + .pattern + .clone() + .context("Regex check requires a pattern value")?; + let escaped = pattern.replace('\'', "''"); + Ok(Some(format!( + "SELECT * FROM data WHERE REGEXP_MATCH(\"{}\", '{}') IS NULL LIMIT {}", + rule.column, escaped, limit + ))) + } + Check::Custom => Ok(None), + } +} + pub fn validate_rule(rule: &Rule) -> anyhow::Result<()> { build_sql(rule)?; Ok(()) @@ -115,6 +175,137 @@ pub async fn run_sql(ctx: &SessionContext, sql: String) -> anyhow::Result { Ok(values) } +/// Fetch up to `limit` rows that violate the rule, as JSON objects. +/// Returns an empty vec for Custom checks or when there are no violations. +pub async fn fetch_violation_samples( + ctx: &SessionContext, + rule: &Rule, + limit: u64, +) -> anyhow::Result> { + let Some(sql) = build_violation_sample_sql(rule, limit)? else { + return Ok(vec![]); + }; + let df = ctx.sql(&sql).await.context("Sample SQL query failed")?; + let batches = df + .collect() + .await + .context("Failed to collect sample results")?; + + let mut rows: Vec = Vec::new(); + for batch in &batches { + let schema = batch.schema(); + let num_rows = batch.num_rows(); + for row_idx in 0..num_rows { + let mut obj = serde_json::Map::new(); + for col_idx in 0..batch.num_columns() { + let col = batch.column(col_idx); + let field = schema.field(col_idx); + let col_name = field.name().clone(); + let value: JsonValue = match field.data_type() { + DataType::Int8 => { + if col.is_null(row_idx) { + JsonValue::Null + } else { + json!(col + .as_any() + .downcast_ref::() + .unwrap() + .value(row_idx)) + } + } + DataType::Int16 => { + if col.is_null(row_idx) { + JsonValue::Null + } else { + json!(col + .as_any() + .downcast_ref::() + .unwrap() + .value(row_idx)) + } + } + DataType::Int32 => { + if col.is_null(row_idx) { + JsonValue::Null + } else { + json!(col + .as_any() + .downcast_ref::() + .unwrap() + .value(row_idx)) + } + } + DataType::Int64 => { + if col.is_null(row_idx) { + JsonValue::Null + } else { + json!(col + .as_any() + .downcast_ref::() + .unwrap() + .value(row_idx)) + } + } + DataType::Float32 => { + if col.is_null(row_idx) { + JsonValue::Null + } else { + json!(col + .as_any() + .downcast_ref::() + .unwrap() + .value(row_idx)) + } + } + DataType::Float64 => { + if col.is_null(row_idx) { + JsonValue::Null + } else { + json!(col + .as_any() + .downcast_ref::() + .unwrap() + .value(row_idx)) + } + } + DataType::Utf8 => { + if col.is_null(row_idx) { + JsonValue::Null + } else { + json!(col + .as_any() + .downcast_ref::() + .unwrap() + .value(row_idx)) + } + } + DataType::Boolean => { + if col.is_null(row_idx) { + JsonValue::Null + } else { + json!(col + .as_any() + .downcast_ref::() + .unwrap() + .value(row_idx)) + } + } + _ => { + if col.is_null(row_idx) { + JsonValue::Null + } else { + json!(format!("{:?}", col.slice(row_idx, 1))) + } + } + }; + obj.insert(col_name, value); + } + rows.push(JsonValue::Object(obj)); + } + } + Ok(rows) +} + pub async fn run_rule( ctx: &SessionContext, rule: &Rule, @@ -136,6 +327,7 @@ pub async fn run_rule( violations, total_rows, violation_rate, + sample_rows: None, }) } @@ -441,4 +633,132 @@ mod test { ]; assert!(validate_threshold(&rules).is_ok()); } + + // ------------------------------------------------------------------ + // fetch_violation_samples tests + // ------------------------------------------------------------------ + + #[tokio::test] + async fn test_fetch_samples_not_null() { + let ctx = + make_ctx("CREATE TABLE data AS SELECT * FROM (VALUES (1), (NULL), (3)) AS t(age)") + .await; + let rule = make_rule("age_not_null", "age", Check::NotNull); + let samples = fetch_violation_samples(&ctx, &rule, 5).await.unwrap(); + assert_eq!(samples.len(), 1); + assert!(samples[0]["age"].is_null()); + } + + #[tokio::test] + async fn test_fetch_samples_min() { + let ctx = + make_ctx("CREATE TABLE data AS SELECT * FROM (VALUES (1), (5), (10)) AS t(age)").await; + let rule = Rule { + min: Some(3.0), + ..make_rule("age_min", "age", Check::Min) + }; + let samples = fetch_violation_samples(&ctx, &rule, 5).await.unwrap(); + assert_eq!(samples.len(), 1); + // value 1 is below min 3 + assert_eq!(samples[0]["age"], json!(1_i64)); + } + + #[tokio::test] + async fn test_fetch_samples_max() { + let ctx = + make_ctx("CREATE TABLE data AS SELECT * FROM (VALUES (1), (5), (10)) AS t(age)").await; + let rule = Rule { + max: Some(4.0), + ..make_rule("age_max", "age", Check::Max) + }; + let samples = fetch_violation_samples(&ctx, &rule, 5).await.unwrap(); + // values 5 and 10 are above max 4 + assert_eq!(samples.len(), 2); + } + + #[tokio::test] + async fn test_fetch_samples_between() { + let ctx = + make_ctx("CREATE TABLE data AS SELECT * FROM (VALUES (1), (5), (10)) AS t(age)").await; + let rule = Rule { + min: Some(2.0), + max: Some(8.0), + ..make_rule("age_between", "age", Check::Between) + }; + let samples = fetch_violation_samples(&ctx, &rule, 5).await.unwrap(); + // values 1 and 10 are out of range + assert_eq!(samples.len(), 2); + } + + #[tokio::test] + async fn test_fetch_samples_not_empty() { + let ctx = make_ctx( + "CREATE TABLE data AS SELECT * FROM (VALUES ('hello'), (''), ('world')) AS t(name)", + ) + .await; + let rule = make_rule("name_not_empty", "name", Check::NotEmpty); + let samples = fetch_violation_samples(&ctx, &rule, 5).await.unwrap(); + assert_eq!(samples.len(), 1); + assert_eq!(samples[0]["name"], json!("")); + } + + #[tokio::test] + async fn test_fetch_samples_unique() { + let ctx = + make_ctx("CREATE TABLE data AS SELECT * FROM (VALUES ('a'), ('b'), ('a')) AS t(name)") + .await; + let rule = make_rule("name_unique", "name", Check::Unique); + let samples = fetch_violation_samples(&ctx, &rule, 5).await.unwrap(); + // both 'a' rows are duplicates + assert_eq!(samples.len(), 2); + assert!(samples.iter().all(|r| r["name"] == json!("a"))); + } + + #[tokio::test] + async fn test_fetch_samples_regex() { + let ctx = make_ctx("CREATE TABLE data AS SELECT * FROM (VALUES ('foo@bar.com'), ('notanemail')) AS t(email)").await; + let rule = Rule { + pattern: Some("^[^@]+@[^@]+$".to_string()), + ..make_rule("email_regex", "email", Check::Regex) + }; + let samples = fetch_violation_samples(&ctx, &rule, 5).await.unwrap(); + assert_eq!(samples.len(), 1); + assert_eq!(samples[0]["email"], json!("notanemail")); + } + + #[tokio::test] + async fn test_fetch_samples_limit() { + // 10 violating rows, request only 3 + let ctx = make_ctx( + "CREATE TABLE data AS SELECT * FROM (VALUES \ + (NULL),(NULL),(NULL),(NULL),(NULL),(NULL),(NULL),(NULL),(NULL),(NULL)\ + ) AS t(age)", + ) + .await; + let rule = make_rule("age_not_null", "age", Check::NotNull); + let samples = fetch_violation_samples(&ctx, &rule, 3).await.unwrap(); + assert_eq!(samples.len(), 3); + } + + #[tokio::test] + async fn test_fetch_samples_no_violations() { + let ctx = + make_ctx("CREATE TABLE data AS SELECT * FROM (VALUES (1), (2), (3)) AS t(age)").await; + let rule = make_rule("age_not_null", "age", Check::NotNull); + let samples = fetch_violation_samples(&ctx, &rule, 5).await.unwrap(); + assert!(samples.is_empty()); + } + + #[tokio::test] + async fn test_fetch_samples_custom_returns_empty() { + let ctx = + make_ctx("CREATE TABLE data AS SELECT * FROM (VALUES (1), (NULL)) AS t(age)").await; + let rule = Rule { + sql: Some("SELECT COUNT(*) FROM data WHERE age IS NULL".into()), + ..make_rule("age_not_null", "age", Check::Custom) + }; + // Custom check cannot produce sample rows — must return empty vec + let samples = fetch_violation_samples(&ctx, &rule, 5).await.unwrap(); + assert!(samples.is_empty()); + } }