diff --git a/src/lineage.rs b/src/lineage.rs index e568144..6b098e0 100644 --- a/src/lineage.rs +++ b/src/lineage.rs @@ -140,6 +140,21 @@ impl LineageManager { Ok(()) } + + /// Returns generated batch identifier for this run. + pub fn batch_id(&self) -> &str { + &self.batch_id + } + + /// Returns generated batch hash for this run. + pub fn batch_hash(&self) -> &str { + &self.batch_hash + } + + /// Returns generated batch timestamp for this run. + pub fn batch_timestamp(&self) -> DateTime { + self.batch_timestamp + } } fn parse_requested_columns(lineage: Option<&str>) -> Result, DtooError> { diff --git a/src/main.rs b/src/main.rs index dcedd01..2cde342 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,8 @@ mod fingerprint; #[allow(dead_code)] mod lineage; #[allow(dead_code)] +mod manifest; +#[allow(dead_code)] mod masking; #[allow(dead_code)] mod on_error; diff --git a/src/manifest.rs b/src/manifest.rs new file mode 100644 index 0000000..b2d61a7 --- /dev/null +++ b/src/manifest.rs @@ -0,0 +1,150 @@ +use std::{collections::BTreeMap, path::Path}; + +use chrono::{DateTime, Utc}; +use serde::Serialize; + +use crate::{cli::QueryArgs, error::DtooError}; + +#[derive(Clone, Debug, Serialize)] +pub struct Manifest { + pub batch_id: String, + pub batch_hash: String, + pub batch_timestamp: String, + pub dtoo_version: String, + pub command: ManifestCommand, + pub files: ManifestFiles, + pub output: ManifestOutput, + pub timing: ManifestTiming, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +#[derive(Clone, Debug, Serialize)] +pub struct ManifestCommand { + pub glob: Option, + pub exclude: Vec, + #[serde(rename = "where")] + pub where_clause: Option, + pub filter_sql: Option, + pub post_sql: Option, + #[serde(rename = "ref")] + pub ref_tables: BTreeMap, + pub schema: Option, + pub output_format: String, + pub lineage: Option, + pub mask: Vec, + pub on_error: String, +} + +#[derive(Clone, Debug, Serialize)] +pub struct ManifestFiles { + pub total: usize, + pub processed: usize, + pub skipped: usize, + pub details: Vec, +} + +#[derive(Clone, Debug, Serialize)] +pub struct ManifestFileDetail { + pub path: String, + pub rows_matched: usize, + pub status: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +#[derive(Clone, Debug, Serialize)] +pub struct ManifestOutput { + pub path: Option, + pub format: String, + pub rows: usize, + pub fingerprint: Option, + pub compressed: bool, +} + +#[derive(Clone, Debug, Serialize)] +pub struct ManifestTiming { + pub started: String, + pub finished: String, + pub duration_seconds: f64, +} + +pub struct ManifestWriter; + +impl ManifestWriter { + pub fn write(path: &Path, manifest: &Manifest) -> Result<(), DtooError> { + if let Some(parent) = path.parent() + && !parent.as_os_str().is_empty() + { + std::fs::create_dir_all(parent).map_err(|source| DtooError::Output { + message: format!( + "failed to create manifest directory {}: {source}", + parent.display() + ), + })?; + } + + let contents = serde_yaml::to_string(manifest).map_err(|source| DtooError::Output { + message: format!("failed to serialize manifest YAML: {source}"), + })?; + + std::fs::write(path, contents).map_err(|source| DtooError::Output { + message: format!("failed to write manifest {}: {source}", path.display()), + }) + } +} + +pub fn build_command(args: &QueryArgs) -> ManifestCommand { + let mut ref_tables = BTreeMap::new(); + for reference in &args.refs { + if let Some((name, path)) = reference.split_once('=') { + ref_tables.insert(name.trim().to_string(), path.trim().to_string()); + } + } + + ManifestCommand { + glob: args.glob.clone(), + exclude: args.exclude.clone(), + where_clause: args.where_clause.clone(), + filter_sql: args.filter_sql.clone(), + post_sql: args.post_sql.clone(), + ref_tables, + schema: args.schema.as_ref().map(|path| path.display().to_string()), + output_format: output_format_label(args), + lineage: args.lineage.clone(), + mask: mask_columns(args.mask.as_deref()), + on_error: on_error_label(args), + } +} + +pub fn duration_seconds(started: DateTime, finished: DateTime) -> f64 { + let millis = (finished - started).num_milliseconds(); + (millis as f64) / 1000.0 +} + +fn output_format_label(args: &QueryArgs) -> String { + match args.output_format { + crate::cli::OutputFormat::Csv => "csv".to_string(), + crate::cli::OutputFormat::Parquet => "parquet".to_string(), + crate::cli::OutputFormat::Ndjson => "ndjson".to_string(), + } +} + +fn on_error_label(args: &QueryArgs) -> String { + match args.on_error { + crate::cli::OnErrorMode::Skip => "skip".to_string(), + crate::cli::OnErrorMode::Fail => "fail".to_string(), + } +} + +fn mask_columns(mask: Option<&str>) -> Vec { + mask.map(|value| { + value + .split(',') + .map(str::trim) + .filter(|column| !column.is_empty()) + .map(ToString::to_string) + .collect::>() + }) + .unwrap_or_default() +} diff --git a/src/query_pipeline.rs b/src/query_pipeline.rs index b26f7ab..ae7d6d8 100644 --- a/src/query_pipeline.rs +++ b/src/query_pipeline.rs @@ -1,4 +1,6 @@ +use chrono::Utc; use std::time::Instant; +use uuid::Uuid; use crate::{ cli::{CompressMethod, OnErrorMode, OutputFormat, PipeMode, QueryArgs, StdinFormat}, @@ -9,6 +11,10 @@ use crate::{ file_resolution::{FileFormat, FileResolver, FileResolverConfig, ResolutionReport}, fingerprint::fingerprint_file, lineage::{LineageContext, LineageManager}, + manifest::{ + Manifest, ManifestFileDetail, ManifestFiles, ManifestOutput, ManifestTiming, + ManifestWriter, build_command, duration_seconds, + }, masking::MaskingEngine, output_writer::{OutputWriter, OutputWriterConfig}, profiler::{ProfileOptions, Profiler}, @@ -106,264 +112,302 @@ pub struct PipelineResult { pub fingerprint: Option, pub batch_id: Option, pub batch_hash: Option, + pub batch_timestamp: Option, + pub output_path: Option, pub timing: Timing, } impl QueryPipeline { /// Run the query pipeline from parsed CLI arguments. pub fn run(args: &QueryArgs) -> Result<(), DtooError> { + let started_at = Utc::now(); + let mut summary = PipelineResult { + batch_id: Some(Uuid::new_v4().to_string()), + batch_hash: Some(Uuid::new_v4().to_string()), + batch_timestamp: Some(started_at.to_rfc3339()), + ..PipelineResult::default() + }; + let logger = VerboseLogger::new(args.verbose); logger.log("Starting dtoo query"); - - let resolution = if args.dry_run { - resolve_files_for_dry_run(args)? - } else { - resolve_files(args)? - }; - let files = resolution.files; - logger.log(format!( - "Resolved {} files from {}", - files.len(), - describe_input_source(args) - )); - if resolution.excluded_count > 0 { + let result = (|| -> Result<(), DtooError> { + let resolution = if args.dry_run { + resolve_files_for_dry_run(args)? + } else { + resolve_files(args)? + }; + let files = resolution.files; + summary.files_total = files.len(); logger.log(format!( - "Excluded {} files", - format_count(resolution.excluded_count) + "Resolved {} files from {}", + files.len(), + describe_input_source(args) )); - } - let _temp_cleanup = TempInputCleanup::from_files(&files); - if args.dry_run { - eprintln!("{}", build_dry_run_plan(args, &files)); - return Ok(()); - } - - if files.is_empty() { - return Err(DtooError::Config { - message: "no input files resolved".to_string(), - }); - } + if resolution.excluded_count > 0 { + logger.log(format!( + "Excluded {} files", + format_count(resolution.excluded_count) + )); + } + let _temp_cleanup = TempInputCleanup::from_files(&files); + if args.dry_run { + eprintln!("{}", build_dry_run_plan(args, &files)); + return Ok(()); + } - let engine = DuckDbEngine::new(EngineConfig { - cloud: CloudSettings { - s3_region: args.s3_region.clone(), - s3_access_key_id: None, - gcs_project_id: args.gcs_project.clone(), - azure_storage_account_name: args.azure_account.clone(), - }, - load_extensions: false, - })?; - - if let Some(schema_path) = &args.schema { - logger.log(format!("Using explicit schema: {}", schema_path.display())); - } else { - logger.log("Auto-detecting schema"); - } + if files.is_empty() { + return Err(DtooError::Config { + message: "no input files resolved".to_string(), + }); + } - let refs = parse_reference_tables(&args.refs, args.delimiter, args.sheet.as_deref())?; - let loaded_refs = load_reference_tables(&engine, &refs)?; - for loaded in &loaded_refs { - logger.log(format!( - "Loading ref table: {} ({}) — {} rows", - loaded.name, - loaded.path, - format_count(loaded.row_count) - )); - } + let engine = DuckDbEngine::new(EngineConfig { + cloud: CloudSettings { + s3_region: args.s3_region.clone(), + s3_access_key_id: None, + gcs_project_id: args.gcs_project.clone(), + azure_storage_account_name: args.azure_account.clone(), + }, + load_extensions: false, + })?; + + if let Some(schema_path) = &args.schema { + logger.log(format!("Using explicit schema: {}", schema_path.display())); + } else { + logger.log("Auto-detecting schema"); + } - let schema_manager = SchemaManager::from_schema_path(args.schema.as_deref())?; - let mut lineage_manager = LineageManager::new( - args.lineage.as_deref(), - LineageContext { - where_clause: args.where_clause.clone(), - filter_sql: args.filter_sql.clone(), - post_sql: args.post_sql.clone(), - files: files.iter().map(|f| f.path.clone()).collect(), - schema_path: args - .schema - .as_ref() - .map(|path| path.to_string_lossy().to_string()), - }, - )?; - - let mut processed = 0usize; - let mut skipped = 0usize; - let mut first_insert_done = false; - for (idx, file) in files.iter().enumerate() { - let input_format = to_input_format(file.format.clone(), file.sheet.clone()); - let result = (|| -> Result { - engine.register_magic_table(&file.path, &input_format)?; - let query_sql = prepare_filter_query( - &engine, - args.where_clause.as_deref(), - args.filter_sql.as_deref(), - )?; - - if first_insert_done { - let rows = schema_manager.insert_file_rows(&engine, Some(&query_sql))?; - lineage_manager.tag_rows_with_origin(&engine, &file.path)?; - Ok(rows) - } else { - let rows = schema_manager.initialize_temp_results(&engine, Some(&query_sql))?; - first_insert_done = true; - lineage_manager.tag_rows_with_origin(&engine, &file.path)?; - Ok(rows) - } - })(); + let refs = parse_reference_tables(&args.refs, args.delimiter, args.sheet.as_deref())?; + let loaded_refs = load_reference_tables(&engine, &refs)?; + for loaded in &loaded_refs { + logger.log(format!( + "Loading ref table: {} ({}) — {} rows", + loaded.name, + loaded.path, + format_count(loaded.row_count) + )); + } - match result { - Ok(rows_matched) => { - processed += 1; - logger.log(format!( - "[{}/{}] {} — {} rows matched", - idx + 1, - files.len(), - file.path, - format_count(rows_matched) - )); - } - Err(err) => { - if args.on_error == OnErrorMode::Fail { - return Err(err); + let schema_manager = SchemaManager::from_schema_path(args.schema.as_deref())?; + let mut lineage_manager = LineageManager::new( + args.lineage.as_deref(), + LineageContext { + where_clause: args.where_clause.clone(), + filter_sql: args.filter_sql.clone(), + post_sql: args.post_sql.clone(), + files: files.iter().map(|f| f.path.clone()).collect(), + schema_path: args + .schema + .as_ref() + .map(|path| path.to_string_lossy().to_string()), + }, + )?; + summary.batch_id = Some(lineage_manager.batch_id().to_string()); + summary.batch_hash = Some(lineage_manager.batch_hash().to_string()); + summary.batch_timestamp = Some(lineage_manager.batch_timestamp().to_rfc3339()); + + let mut processed = 0usize; + let mut skipped = 0usize; + let mut first_insert_done = false; + for (idx, file) in files.iter().enumerate() { + let input_format = to_input_format(file.format.clone(), file.sheet.clone()); + let result = (|| -> Result { + engine.register_magic_table(&file.path, &input_format)?; + let query_sql = prepare_filter_query( + &engine, + args.where_clause.as_deref(), + args.filter_sql.as_deref(), + )?; + + if first_insert_done { + let rows = schema_manager.insert_file_rows(&engine, Some(&query_sql))?; + lineage_manager.tag_rows_with_origin(&engine, &file.path)?; + Ok(rows) + } else { + let rows = + schema_manager.initialize_temp_results(&engine, Some(&query_sql))?; + first_insert_done = true; + lineage_manager.tag_rows_with_origin(&engine, &file.path)?; + Ok(rows) + } + })(); + + match result { + Ok(rows_matched) => { + processed += 1; + summary.file_details.push(FileResult { + path: file.path.clone(), + rows: rows_matched, + skipped: false, + message: None, + }); + logger.log(format!( + "[{}/{}] {} — {} rows matched", + idx + 1, + files.len(), + file.path, + format_count(rows_matched) + )); + } + Err(err) => { + if args.on_error == OnErrorMode::Fail { + return Err(err); + } + skipped += 1; + summary.file_details.push(FileResult { + path: file.path.clone(), + rows: 0, + skipped: true, + message: Some(err.to_string()), + }); + logger.log(format!( + "[{}/{}] {} — SKIPPED ({})", + idx + 1, + files.len(), + file.path, + err + )); + eprintln!("Warning: Skipping {}: {err}", file.path); } - skipped += 1; - logger.log(format!( - "[{}/{}] {} — SKIPPED ({})", - idx + 1, - files.len(), - file.path, - err - )); - eprintln!("Warning: Skipping {}: {err}", file.path); } } - } - - if processed == 0 { - return Err(DtooError::PartialFailure { - processed, - total: files.len(), - skipped, - }); - } + summary.files_processed = processed; + summary.files_skipped = skipped; + + if processed == 0 { + return Err(DtooError::PartialFailure { + processed, + total: files.len(), + skipped, + }); + } - if let Some(post_sql) = &args.post_sql { - logger.log("Applying post-sql..."); - engine.execute("CREATE OR REPLACE VIEW _ AS SELECT * FROM temp_results")?; - engine.execute(&format!( - "CREATE OR REPLACE TABLE temp_results AS {post_sql}" - ))?; - let post_count = engine.query_count("SELECT COUNT(*) FROM temp_results")?; - logger.log(format!( - "Post-sql complete: {} rows", - format_count(post_count) - )); - } + if let Some(post_sql) = &args.post_sql { + logger.log("Applying post-sql..."); + engine.execute("CREATE OR REPLACE VIEW _ AS SELECT * FROM temp_results")?; + engine.execute(&format!( + "CREATE OR REPLACE TABLE temp_results AS {post_sql}" + ))?; + let post_count = engine.query_count("SELECT COUNT(*) FROM temp_results")?; + logger.log(format!( + "Post-sql complete: {} rows", + format_count(post_count) + )); + } - if let Some(mask) = &args.mask { - logger.log(format!("Applying masking: [{mask}]")); - } - let masking = MaskingEngine::new(args.mask.as_deref(), &args.mask_salt)?; - masking.apply(&engine)?; + if let Some(mask) = &args.mask { + logger.log(format!("Applying masking: [{mask}]")); + } + let masking = MaskingEngine::new(args.mask.as_deref(), &args.mask_salt)?; + masking.apply(&engine)?; - if let Some(lineage) = &args.lineage { - logger.log(format!("Adding lineage columns: [{lineage}]")); - } - lineage_manager.apply_columns(&engine)?; + if let Some(lineage) = &args.lineage { + logger.log(format!("Adding lineage columns: [{lineage}]")); + } + lineage_manager.apply_columns(&engine)?; - if let Some(limit) = args.limit { - logger.log(format!("Applying limit: {}", format_count(limit))); - engine.execute(&format!( - "CREATE OR REPLACE TABLE temp_results AS SELECT * FROM temp_results LIMIT {limit}" - ))?; - } + if let Some(limit) = args.limit { + logger.log(format!("Applying limit: {}", format_count(limit))); + engine.execute(&format!( + "CREATE OR REPLACE TABLE temp_results AS SELECT * FROM temp_results LIMIT {limit}" + ))?; + } - let row_count = engine.query_count("SELECT COUNT(*) FROM temp_results")?; - logger.log(format!( - "Accumulation complete: {} rows", - format_count(row_count) - )); - if let Some(expected_min) = args.expect_at_least - && row_count < expected_min - { - return Err(DtooError::ExpectAtLeast { - expected: expected_min, - actual: row_count, - }); - } - if let Some(expected_min) = args.expect_at_least { + let row_count = engine.query_count("SELECT COUNT(*) FROM temp_results")?; + summary.rows_output = row_count; logger.log(format!( - "Validation: {} rows (expected at least {})", - format_count(row_count), - format_count(expected_min) + "Accumulation complete: {} rows", + format_count(row_count) )); - } + if let Some(expected_min) = args.expect_at_least + && row_count < expected_min + { + return Err(DtooError::ExpectAtLeast { + expected: expected_min, + actual: row_count, + }); + } + if let Some(expected_min) = args.expect_at_least { + logger.log(format!( + "Validation: {} rows (expected at least {})", + format_count(row_count), + format_count(expected_min) + )); + } - if args.count { - logger.log(format!("Row count: {}", format_count(row_count))); - println!("{row_count}"); - } + if args.count { + logger.log(format!("Row count: {}", format_count(row_count))); + println!("{row_count}"); + } - let should_write_output = args.output.is_some() || !args.count; - let mut written_output_path = None; - if should_write_output { - logger.log(format!("Writing output: {}", summarize_output(args))); - let writer = OutputWriter::new(OutputWriterConfig { - output: args.output.clone(), - format: to_export_format(args.output_format), - header: !args.no_header, - delimiter: args.delimiter, - compression: args.compress.map(to_compression_codec), - }); - written_output_path = writer.write_and_get_destination(&engine)?; - } + let should_write_output = args.output.is_some() || !args.count; + let mut written_output_path = None; + if should_write_output { + logger.log(format!("Writing output: {}", summarize_output(args))); + let writer = OutputWriter::new(OutputWriterConfig { + output: args.output.clone(), + format: to_export_format(args.output_format), + header: !args.no_header, + delimiter: args.delimiter, + compression: args.compress.map(to_compression_codec), + }); + written_output_path = writer.write_and_get_destination(&engine)?; + summary.output_path = written_output_path + .as_ref() + .map(|path| path.display().to_string()); + } - if let Some(profile_path) = &args.profile { - if args.count && args.output.is_none() { - eprintln!("Warning: --profile is skipped when using --count without --output"); - } else { - logger.log(format!( - "Writing profile: {} ({})", - profile_path.display(), - profile_format_to_str(args.profile_format) - )); - Profiler::generate( - &engine, - &ProfileOptions { - path: profile_path.clone(), - format: args.profile_format, - sample_percentage: args.profile_sample, - }, - )?; + if let Some(profile_path) = &args.profile { + if args.count && args.output.is_none() { + eprintln!("Warning: --profile is skipped when using --count without --output"); + } else { + logger.log(format!( + "Writing profile: {} ({})", + profile_path.display(), + profile_format_to_str(args.profile_format) + )); + Profiler::generate( + &engine, + &ProfileOptions { + path: profile_path.clone(), + format: args.profile_format, + sample_percentage: args.profile_sample, + }, + )?; + } } - } - if args.fingerprint { - if let Some(path) = written_output_path { - let hash = fingerprint_file(&path)?; - logger.log(format!("Fingerprint: sha256:{hash}")); - eprintln!("{hash} {}", path.display()); - } else { - eprintln!( - "Warning: --fingerprint requires --output (cannot fingerprint stdout stream)" - ); + if args.fingerprint { + if let Some(path) = written_output_path { + let hash = fingerprint_file(&path)?; + summary.fingerprint = Some(format!("sha256:{hash}")); + logger.log(format!("Fingerprint: sha256:{hash}")); + eprintln!("{hash} {}", path.display()); + } else { + eprintln!( + "Warning: --fingerprint requires --output (cannot fingerprint stdout stream)" + ); + } } - } - if skipped > 0 { - return Err(DtooError::PartialFailure { - processed, - total: files.len(), - skipped, - }); - } + if skipped > 0 { + return Err(DtooError::PartialFailure { + processed, + total: files.len(), + skipped, + }); + } + + logger.log(format!( + "Done. {} rows written in {}", + format_count(row_count), + logger.elapsed_seconds_1dp() + )); + Ok(()) + })(); - logger.log(format!( - "Done. {} rows written in {}", - format_count(row_count), - logger.elapsed_seconds_1dp() - )); - Ok(()) + write_manifest_if_requested(args, &summary, started_at, result.as_ref().err()); + result } } @@ -625,6 +669,71 @@ fn describe_input_source(args: &QueryArgs) -> String { "explicit paths".to_string() } +fn write_manifest_if_requested( + args: &QueryArgs, + summary: &PipelineResult, + started_at: chrono::DateTime, + error: Option<&DtooError>, +) { + let Some(path) = &args.manifest else { + return; + }; + + let finished_at = Utc::now(); + let manifest = Manifest { + batch_id: summary.batch_id.clone().unwrap_or_default(), + batch_hash: summary.batch_hash.clone().unwrap_or_default(), + batch_timestamp: summary + .batch_timestamp + .clone() + .unwrap_or_else(|| started_at.to_rfc3339()), + dtoo_version: env!("CARGO_PKG_VERSION").to_string(), + command: build_command(args), + files: ManifestFiles { + total: summary.files_total, + processed: summary.files_processed, + skipped: summary.files_skipped, + details: summary + .file_details + .iter() + .map(|detail| ManifestFileDetail { + path: detail.path.clone(), + rows_matched: detail.rows, + status: if detail.skipped { + "skipped".to_string() + } else { + "ok".to_string() + }, + error: detail.message.clone(), + }) + .collect(), + }, + output: ManifestOutput { + path: summary + .output_path + .clone() + .or_else(|| args.output.as_ref().map(|p| p.display().to_string())), + format: output_format_to_str(args.output_format).to_string(), + rows: summary.rows_output, + fingerprint: summary.fingerprint.clone(), + compressed: args.compress.is_some(), + }, + timing: ManifestTiming { + started: started_at.to_rfc3339(), + finished: finished_at.to_rfc3339(), + duration_seconds: duration_seconds(started_at, finished_at), + }, + error: error.map(ToString::to_string), + }; + + if let Err(err) = ManifestWriter::write(path, &manifest) { + eprintln!( + "Warning: failed to write manifest {}: {err}", + path.display() + ); + } +} + fn prepare_filter_query( engine: &DuckDbEngine, where_clause: Option<&str>, @@ -1089,6 +1198,122 @@ mod tests { let _ = fs::remove_file(input_csv); } + #[test] + fn pipeline_writes_manifest_on_success() { + let input_csv = temp_path("pipeline-manifest-input", "csv"); + fs::write(&input_csv, "id,value\n1,100\n").expect("write input"); + let output_csv = temp_path("pipeline-manifest-output", "csv"); + let manifest_yaml = temp_path("pipeline-manifest", "yaml"); + + let args = QueryArgs { + paths: vec![input_csv.to_string_lossy().into_owned()], + glob: None, + exclude: Vec::new(), + pipe: None, + stdin_format: None, + sheet: None, + where_clause: None, + filter_sql: None, + post_sql: None, + refs: Vec::new(), + schema: None, + output: Some(output_csv.clone()), + output_format: OutputFormat::Csv, + delimiter: ',', + lineage: None, + mask: None, + mask_salt: String::new(), + profile: None, + profile_format: ProfileFormat::Json, + profile_sample: 100, + limit: None, + no_header: false, + compress: None, + count: false, + expect_at_least: None, + fingerprint: false, + dry_run: false, + verbose: false, + on_error: OnErrorMode::Fail, + manifest: Some(manifest_yaml.clone()), + s3_region: None, + s3_profile: None, + gcs_project: None, + azure_account: None, + config: None, + }; + + QueryPipeline::run(&args).expect("pipeline should run"); + let manifest = fs::read_to_string(&manifest_yaml).expect("read manifest"); + assert!(manifest.contains("batch_id:")); + assert!(manifest.contains("dtoo_version:")); + assert!(manifest.contains("files:")); + assert!(manifest.contains("output:")); + assert!(manifest.contains("rows: 1")); + + let _ = fs::remove_file(input_csv); + let _ = fs::remove_file(output_csv); + let _ = fs::remove_file(manifest_yaml); + } + + #[test] + fn pipeline_writes_manifest_on_expect_at_least_failure() { + let input_csv = temp_path("pipeline-manifest-fail-input", "csv"); + fs::write(&input_csv, "id,value\n1,100\n").expect("write input"); + let output_csv = temp_path("pipeline-manifest-fail-output", "csv"); + let manifest_yaml = temp_path("pipeline-manifest-fail", "yaml"); + + let args = QueryArgs { + paths: vec![input_csv.to_string_lossy().into_owned()], + glob: None, + exclude: Vec::new(), + pipe: None, + stdin_format: None, + sheet: None, + where_clause: None, + filter_sql: None, + post_sql: None, + refs: Vec::new(), + schema: None, + output: Some(output_csv.clone()), + output_format: OutputFormat::Csv, + delimiter: ',', + lineage: None, + mask: None, + mask_salt: String::new(), + profile: None, + profile_format: ProfileFormat::Json, + profile_sample: 100, + limit: None, + no_header: false, + compress: None, + count: false, + expect_at_least: Some(10), + fingerprint: false, + dry_run: false, + verbose: false, + on_error: OnErrorMode::Fail, + manifest: Some(manifest_yaml.clone()), + s3_region: None, + s3_profile: None, + gcs_project: None, + azure_account: None, + config: None, + }; + + let err = QueryPipeline::run(&args).expect_err("expect-at-least should fail"); + assert!(matches!(err, DtooError::ExpectAtLeast { .. })); + + let manifest = fs::read_to_string(&manifest_yaml).expect("read manifest"); + assert!(manifest.contains("error:")); + assert!(manifest.contains("Expected at least")); + assert!(manifest.contains("rows: 1")); + + let _ = fs::remove_file(input_csv); + let _ = fs::remove_file(output_csv); + let _ = fs::remove_file(manifest_yaml); + } + #[test] fn dry_run_plan_truncates_file_list_and_shows_summary_fields() { let args = base_query_args();