Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,18 @@ pub struct ProfileArgs {

#[arg(long, default_value = ",")]
pub delimiter: char,

#[arg(long = "s3-region")]
pub s3_region: Option<String>,

#[arg(long = "s3-profile")]
pub s3_profile: Option<String>,

#[arg(long = "gcs-project")]
pub gcs_project: Option<String>,

#[arg(long = "azure-account")]
pub azure_account: Option<String>,
}

/// Arguments for `dtoo inspect`.
Expand Down Expand Up @@ -635,6 +647,37 @@ mod tests {
assert_eq!(args.format, ProfileFormat::Json);
assert_eq!(args.sample, 100);
assert_eq!(args.delimiter, ',');
assert_eq!(args.s3_region, None);
assert_eq!(args.s3_profile, None);
assert_eq!(args.gcs_project, None);
assert_eq!(args.azure_account, None);
}
_ => panic!("expected profile command"),
}
}

#[test]
fn parses_profile_cloud_options() {
let cli = parse([
"dtoo",
"profile",
"s3://bucket/data.parquet",
"--s3-region",
"eu-west-1",
"--s3-profile",
"analytics",
"--gcs-project",
"proj-1",
"--azure-account",
"acct1",
]);

match cli.command {
Commands::Profile(args) => {
assert_eq!(args.s3_region.as_deref(), Some("eu-west-1"));
assert_eq!(args.s3_profile.as_deref(), Some("analytics"));
assert_eq!(args.gcs_project.as_deref(), Some("proj-1"));
assert_eq!(args.azure_account.as_deref(), Some("acct1"));
}
_ => panic!("expected profile command"),
}
Expand Down
5 changes: 4 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ mod on_error;
#[allow(dead_code)]
mod output_writer;
#[allow(dead_code)]
mod profile_command;
#[allow(dead_code)]
mod profiler;
#[allow(dead_code)]
mod query_pipeline;
Expand All @@ -34,6 +36,7 @@ use cli::{Cli, Commands, QueryArgs};
use error::DtooError;
use fingerprint::fingerprint_file;
use inspect::run as run_inspect;
use profile_command::run as run_profile;
use query_pipeline::QueryPipeline;

fn main() -> ExitCode {
Expand Down Expand Up @@ -65,7 +68,7 @@ fn run() -> ExitCode {
fn dispatch(cli: Cli) -> Result<(), DtooError> {
match cli.command {
Commands::Query(args) => QueryPipeline::run(&args),
Commands::Profile(_args) => Ok(()),
Commands::Profile(args) => run_profile(&args),
Commands::Inspect(args) => run_inspect(&args),
Commands::Fingerprint(args) => {
let hash = fingerprint_file(&args.path)?;
Expand Down
226 changes: 226 additions & 0 deletions src/profile_command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
use std::path::PathBuf;

use crate::{
cli::{OnErrorMode, PipeMode, ProfileArgs, StdinFormat},
engine::{CloudSettings, DuckDbEngine, EngineConfig, InputFormat},
error::DtooError,
file_resolution::{FileFormat, FileResolver, FileResolverConfig},
profiler::{ProfileOptions, Profiler},
};

/// Runs the `profile` subcommand for a single input.
///
/// Resolves `args.path` through [`FileResolver`], registers the resolved file
/// with a new [`DuckDbEngine`], copies its rows into the `_profile_data` table,
/// exposes that table through the `temp_results` view, and finally delegates
/// report generation to [`Profiler::generate`].
///
/// # Errors
///
/// Returns [`DtooError::Config`] when resolution does not yield exactly one
/// file, and propagates any error raised by the resolver, the engine, or the
/// profiler.
pub fn run(args: &ProfileArgs) -> Result<(), DtooError> {
    let input = args.path.to_string_lossy().into_owned();
    let resolver = FileResolver::new(FileResolverConfig {
        glob: None,
        paths: vec![input],
        pipe_mode: None::<PipeMode>,
        stdin_format: None::<StdinFormat>,
        sheet: None,
        exclude: Vec::new(),
        delimiter: args.delimiter,
        on_error: OnErrorMode::Fail,
        s3_region: args.s3_region.clone(),
        s3_profile: args.s3_profile.clone(),
        gcs_project: args.gcs_project.clone(),
        azure_account: args.azure_account.clone(),
    });
    let resolved = resolver.resolve()?;

    // A profile report describes one dataset. Reject an empty resolution *and*
    // one that produced several files, instead of silently profiling only the
    // first match while the error text promises "exactly one".
    let file = match resolved.first() {
        Some(file) if resolved.len() == 1 => file,
        _ => {
            return Err(DtooError::Config {
                message: "profile requires exactly one input file".to_string(),
            })
        }
    };

    let engine = DuckDbEngine::new(EngineConfig {
        cloud: CloudSettings {
            s3_region: args.s3_region.clone(),
            s3_profile: args.s3_profile.clone(),
            s3_access_key_id: None,
            gcs_project_id: args.gcs_project.clone(),
            azure_storage_account_name: args.azure_account.clone(),
        },
        // Extensions are only requested for cloud-scheme paths.
        load_extensions: is_cloud_path(&file.path),
    })?;
    let format = to_input_format(&file.format, file.sheet.as_deref());

    engine.register_magic_table(&file.path, &format)?;
    // Drop the view before the table it selects from, then rebuild both so the
    // statements below start from a clean slate.
    engine.execute("DROP VIEW IF EXISTS temp_results")?;
    engine.execute("DROP TABLE IF EXISTS _profile_data")?;
    engine.execute("CREATE TABLE _profile_data AS SELECT * FROM _")?;
    // NOTE(review): presumably `temp_results` is the relation the profiler
    // reads from — confirm against `Profiler::generate`.
    engine.execute("CREATE VIEW temp_results AS SELECT * FROM _profile_data")?;

    let options = ProfileOptions {
        // "-" is passed when no output path was given; presumably the profiler
        // treats it as stdout — confirm against `ProfileOptions`.
        path: args.output.clone().unwrap_or_else(|| PathBuf::from("-")),
        format: args.format,
        sample_percentage: args.sample,
    };
    Profiler::generate(&engine, &options)
}

/// Maps a resolver [`FileFormat`] onto the engine's [`InputFormat`].
///
/// For Excel inputs the sheet name comes from `sheet`, falling back to
/// `"Sheet1"` when none was supplied; the other variants carry over directly.
fn to_input_format(format: &FileFormat, sheet: Option<&str>) -> InputFormat {
    match *format {
        FileFormat::Excel { .. } => {
            let sheet_name = match sheet {
                Some(name) => name,
                None => "Sheet1",
            };
            InputFormat::Excel {
                sheet: sheet_name.to_string(),
            }
        }
        FileFormat::Csv { delimiter } => InputFormat::Csv { delimiter },
        FileFormat::Ndjson => InputFormat::Ndjson,
        FileFormat::Parquet => InputFormat::Parquet,
    }
}

/// Reports whether `path` begins with one of the recognised cloud URL schemes
/// (`s3://`, `gs://`, `az://`, `abfss://`). The comparison is case-sensitive.
fn is_cloud_path(path: &str) -> bool {
    const CLOUD_SCHEMES: [&str; 4] = ["s3://", "gs://", "az://", "abfss://"];

    CLOUD_SCHEMES
        .iter()
        .any(|scheme| path.starts_with(scheme))
}

#[cfg(test)]
mod tests {
    use std::{
        fs,
        path::{Path, PathBuf},
        process,
        sync::atomic::{AtomicU64, Ordering},
        time::{SystemTime, UNIX_EPOCH},
    };

    use crate::cli::ProfileFormat;

    use super::*;

    /// Per-process sequence number mixed into every temp path so that tests
    /// running in parallel never share a file, even if the clock is coarse.
    static NEXT_TEMP_ID: AtomicU64 = AtomicU64::new(0);

    /// A unique path under the system temp directory that is removed (best
    /// effort) when the guard is dropped, including during a panic unwind.
    struct TempPath(PathBuf);

    impl TempPath {
        /// Reserves a unique path; nothing is created on disk.
        fn new(label: &str, extension: &str) -> Self {
            let nanos = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("clock after epoch")
                .as_nanos();
            // Only uniqueness matters here, so relaxed ordering is sufficient.
            let seq = NEXT_TEMP_ID.fetch_add(1, Ordering::Relaxed);
            let name = format!(
                "dtoo-profile-{label}-{}-{nanos}-{seq}.{extension}",
                process::id()
            );
            Self(std::env::temp_dir().join(name))
        }

        /// Reserves a unique path and writes `contents` to it.
        fn with_contents(label: &str, extension: &str, contents: &str) -> Self {
            let temp = Self::new(label, extension);
            fs::write(temp.path(), contents).expect("write temp file");
            temp
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempPath {
        fn drop(&mut self) {
            // Best effort: the file may never have been created.
            let _ = fs::remove_file(&self.0);
        }
    }

    /// Builds JSON-format profile arguments for a local file with no cloud
    /// options set.
    fn local_args(path: &Path, output: Option<PathBuf>) -> ProfileArgs {
        ProfileArgs {
            path: path.to_path_buf(),
            format: ProfileFormat::Json,
            output,
            sample: 100,
            delimiter: ',',
            s3_region: None,
            s3_profile: None,
            gcs_project: None,
            azure_account: None,
        }
    }

    #[test]
    fn profiles_csv_input_to_json_file() {
        let input = TempPath::with_contents("input", "csv", "id,name\n1,Ada\n2,Turing\n");
        let output = TempPath::new("output", "json");
        let args = local_args(input.path(), Some(output.path().to_path_buf()));

        run(&args).expect("profile command should succeed");
        let report = fs::read_to_string(output.path()).expect("read profile output");
        assert!(report.contains("\"row_count\": 2"));
    }

    #[test]
    fn profiles_empty_csv_with_zero_rows() {
        let input = TempPath::with_contents("empty", "csv", "id,name\n");
        let output = TempPath::new("empty", "json");
        let args = local_args(input.path(), Some(output.path().to_path_buf()));

        run(&args).expect("profile command should succeed");
        let report = fs::read_to_string(output.path()).expect("read profile output");
        assert!(report.contains("\"row_count\": 0"));
    }

    #[test]
    fn returns_file_not_found_for_missing_input() {
        // Reserved but never written, so the path is guaranteed not to exist.
        let missing = TempPath::new("missing", "csv");
        let args = local_args(missing.path(), None);

        let err = run(&args).expect_err("missing input should fail");
        assert!(matches!(err, DtooError::FileNotFound { .. }));
    }

    #[test]
    fn returns_output_error_for_unwritable_destination() {
        let input = TempPath::with_contents("input", "csv", "id,name\n1,Ada\n");
        // The parent directory is never created, so the report cannot be written.
        let missing_dir = TempPath::new("missing-dir", "d");
        let output = missing_dir.path().join("report.json");
        let args = local_args(input.path(), Some(output));

        let err = run(&args).expect_err("missing output directory should fail");
        assert!(matches!(err, DtooError::Output { .. }));
    }

    #[test]
    fn resolver_supports_excel_colon_sheet_syntax() {
        let excel = TempPath::with_contents("sheet", "xlsx", "stub");

        let resolver = FileResolver::new(FileResolverConfig {
            glob: None,
            paths: vec![format!("{}:Sheet2", excel.path().display())],
            pipe_mode: None::<PipeMode>,
            stdin_format: None::<StdinFormat>,
            sheet: None,
            exclude: Vec::new(),
            delimiter: ',',
            on_error: OnErrorMode::Fail,
            s3_region: None,
            s3_profile: None,
            gcs_project: None,
            azure_account: None,
        });

        let files = resolver.resolve().expect("resolve should succeed");
        assert_eq!(files.len(), 1);
        assert_eq!(files[0].sheet.as_deref(), Some("Sheet2"));
    }
}
Loading