Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ enum-iterator = "2.0.0"
env_logger = "0.11"
fastlanes = "0.5.1"
flatbuffers = "25.2.10"
flate2 = "1.1"
fsst-rs = "0.5.11"
futures = { version = "0.3.31", default-features = false }
fuzzy-matcher = "0.3"
Expand Down
4 changes: 4 additions & 0 deletions benchmarks/compress-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use vortex_bench::compress::benchmark_decompress;
use vortex_bench::compress::calculate_ratios;
use vortex_bench::create_output_writer;
use vortex_bench::datasets::Dataset;
use vortex_bench::datasets::jsonbench::JsonBench;
use vortex_bench::datasets::struct_list_of_ints::StructListOfInts;
use vortex_bench::datasets::taxi_data::TaxiData;
use vortex_bench::datasets::tpch_l_comment::TPCHLCommentCanonical;
Expand Down Expand Up @@ -161,6 +162,7 @@ async fn run_compress(
&TPCHLCommentCanonical,
&DownloadableDataset::RPlace,
&DownloadableDataset::AirQuality,
&JsonBench,
]
.into_iter()
.chain(structlistofints.iter().map(|d| d as &dyn Dataset))
Expand Down Expand Up @@ -246,6 +248,7 @@ async fn run_benchmark_for_dataset(
CompressOp::Compress => {
let result = benchmark_compress(
compressor.as_ref(),
dataset_handle,
&parquet_path,
iterations,
bench_name,
Expand Down Expand Up @@ -277,6 +280,7 @@ async fn run_benchmark_for_dataset(
CompressOp::Decompress => {
let result = benchmark_decompress(
compressor.as_ref(),
dataset_handle,
&parquet_path,
iterations,
bench_name,
Expand Down
13 changes: 11 additions & 2 deletions benchmarks/compress-bench/src/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use parquet::file::properties::WriterProperties;
use vortex_bench::Format;
use vortex_bench::compress::Compressor;
use vortex_bench::compress::read_projection;
use vortex_bench::datasets::Dataset;

/// Compressor implementation for Parquet format with ZSTD compression.
pub struct ParquetCompressor {
Expand Down Expand Up @@ -51,7 +52,11 @@ impl Compressor for ParquetCompressor {
Format::Parquet
}

async fn compress(&self, parquet_path: &Path) -> anyhow::Result<(u64, Duration)> {
async fn compress(
&self,
_dataset: &dyn Dataset,
parquet_path: &Path,
) -> anyhow::Result<(u64, Duration)> {
// Read the input parquet file
let file = File::open(parquet_path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
Expand All @@ -67,7 +72,11 @@ impl Compressor for ParquetCompressor {
Ok((size as u64, elapsed))
}

async fn decompress(&self, parquet_path: &Path) -> anyhow::Result<Duration> {
async fn decompress(
&self,
_dataset: &dyn Dataset,
parquet_path: &Path,
) -> anyhow::Result<Duration> {
// First compress to get the bytes we'll decompress
let file = File::open(parquet_path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
Expand Down
25 changes: 18 additions & 7 deletions benchmarks/compress-bench/src/vortex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use bytes::Bytes;
use futures::StreamExt;
use futures::pin_mut;
use vortex::array::IntoArray;
use vortex::array::VortexSessionExecute;
use vortex::array::arrow::ArrowSessionExt;
use vortex::dtype::FieldNames;
use vortex::expr::root;
use vortex::expr::select;
Expand All @@ -22,7 +24,7 @@ use vortex_bench::Format;
use vortex_bench::SESSION;
use vortex_bench::compress::Compressor;
use vortex_bench::compress::read_projection;
use vortex_bench::conversions::parquet_to_vortex_chunks;
use vortex_bench::datasets::Dataset;

/// Compressor implementation for Vortex format.
pub struct VortexCompressor;
Expand All @@ -33,9 +35,15 @@ impl Compressor for VortexCompressor {
Format::OnDiskVortex
}

async fn compress(&self, parquet_path: &Path) -> Result<(u64, Duration)> {
// Read the parquet file as an array stream
let uncompressed = parquet_to_vortex_chunks(parquet_path.to_path_buf()).await?;
async fn compress(
&self,
dataset: &dyn Dataset,
parquet_path: &Path,
) -> Result<(u64, Duration)> {
let mut ctx = SESSION.create_execution_ctx();
let uncompressed = dataset
.to_vortex_compression_array(&mut ctx, parquet_path)
.await?;

let mut buf = Vec::new();
let start = Instant::now();
Expand All @@ -49,9 +57,12 @@ impl Compressor for VortexCompressor {
Ok((buf.len() as u64, elapsed))
}

async fn decompress(&self, parquet_path: &Path) -> Result<Duration> {
async fn decompress(&self, dataset: &dyn Dataset, parquet_path: &Path) -> Result<Duration> {
// First compress to get the bytes we'll decompress
let uncompressed = parquet_to_vortex_chunks(parquet_path.to_path_buf()).await?;
let mut ctx = SESSION.create_execution_ctx();
let uncompressed = dataset
.to_vortex_compression_array(&mut ctx, parquet_path)
.await?;
let mut buf = Vec::new();
let mut cursor = Cursor::new(&mut buf);
SESSION
Expand All @@ -72,7 +83,7 @@ impl Compressor for VortexCompressor {
let names: FieldNames = cols.iter().map(|i| i.to_string()).collect();
scan = scan.with_projection(select(names, root()));
}
let schema = Arc::new(scan.dtype()?.to_arrow_schema()?);
let schema = Arc::new(SESSION.arrow().to_arrow_schema(&scan.dtype()?)?);

let stream = scan.into_record_batch_stream(schema)?;
pin_mut!(stream);
Expand Down
13 changes: 11 additions & 2 deletions benchmarks/lance-bench/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tempfile::TempDir;
use vortex_bench::Format;
use vortex_bench::compress::Compressor;
use vortex_bench::compress::read_projection;
use vortex_bench::datasets::Dataset as BenchDataset;

use crate::convert::convert_utf8view_batch;
use crate::convert::convert_utf8view_schema;
Expand Down Expand Up @@ -96,7 +97,11 @@ impl Compressor for LanceCompressor {
Format::Lance
}

async fn compress(&self, parquet_path: &Path) -> anyhow::Result<(u64, Duration)> {
async fn compress(
&self,
_dataset: &dyn BenchDataset,
parquet_path: &Path,
) -> anyhow::Result<(u64, Duration)> {
// Read the input parquet file
let file = File::open(parquet_path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
Expand Down Expand Up @@ -135,7 +140,11 @@ impl Compressor for LanceCompressor {
Ok((size, elapsed))
}

async fn decompress(&self, parquet_path: &Path) -> anyhow::Result<Duration> {
async fn decompress(
&self,
_dataset: &dyn BenchDataset,
parquet_path: &Path,
) -> anyhow::Result<Duration> {
// First compress to get the Lance dataset
let file = File::open(parquet_path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
Expand Down
2 changes: 2 additions & 0 deletions vortex-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ vortex = { workspace = true, features = [
"tokio",
"zstd",
] }
vortex-json = { workspace = true }
vortex-tensor = { workspace = true } # TODO(connor): In the future, this might be inside vortex.

anyhow = { workspace = true }
Expand All @@ -33,6 +34,7 @@ async-trait = { workspace = true }
bytes = { workspace = true }
bzip2 = { workspace = true }
clap = { workspace = true, features = ["derive"] }
flate2 = { workspace = true }
futures = { workspace = true }
get_dir = { workspace = true }
glob = { workspace = true }
Expand Down
18 changes: 11 additions & 7 deletions vortex-bench/src/compress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use serde::Serialize;
use vortex::utils::aliases::hash_map::HashMap;

use crate::Format;
use crate::datasets::Dataset;
use crate::measurements::CompressionTimingMeasurement;
use crate::measurements::CustomUnitMeasurement;

Expand Down Expand Up @@ -115,11 +116,12 @@ pub trait Compressor: Send + Sync {
/// The format this compressor handles.
fn format(&self) -> Format;

/// Compress data from a Parquet file, returning the compressed size in bytes and elapsed time.
/// Compress data from a dataset, returning the compressed size in bytes and elapsed time.
///
/// The implementation should read the Parquet file and compress it
/// to the target format.
async fn compress(&self, parquet_path: &Path) -> Result<(u64, Duration)>;
/// Most implementations read the Parquet path. Vortex can instead use dataset-specific
/// Vortex array semantics via [`Dataset::to_vortex_compression_array`].
async fn compress(&self, dataset: &dyn Dataset, parquet_path: &Path)
-> Result<(u64, Duration)>;

/// Decompress data from the Parquet file (after compressing), returning the decompressed size.
///
Expand All @@ -128,14 +130,15 @@ pub trait Compressor: Send + Sync {
///
/// Format implementations apply the fixed wide-table read projection when the input schema
/// matches the projection benchmark.
async fn decompress(&self, parquet_path: &Path) -> Result<Duration>;
async fn decompress(&self, dataset: &dyn Dataset, parquet_path: &Path) -> Result<Duration>;
}

/// Run a compression benchmark for the given compressor.
///
/// Executes compression `iterations` times and returns timing statistics.
pub async fn benchmark_compress(
compressor: &dyn Compressor,
dataset: &dyn Dataset,
parquet_path: &Path,
iterations: usize,
bench_name: &str,
Expand All @@ -146,7 +149,7 @@ pub async fn benchmark_compress(
let mut all_runs = Vec::with_capacity(iterations);

for _ in 0..iterations {
let (size, elapsed) = compressor.compress(parquet_path).await?;
let (size, elapsed) = compressor.compress(dataset, parquet_path).await?;

compressed_size = size;
fastest = fastest.min(elapsed);
Expand Down Expand Up @@ -180,6 +183,7 @@ pub async fn benchmark_compress(
/// Benchmarks decompression `iterations` times.
pub async fn benchmark_decompress(
compressor: &dyn Compressor,
dataset: &dyn Dataset,
parquet_path: &Path,
iterations: usize,
bench_name: &str,
Expand All @@ -189,7 +193,7 @@ pub async fn benchmark_decompress(
let mut all_runs = Vec::with_capacity(iterations);

for _ in 0..iterations {
let elapsed = compressor.decompress(parquet_path).await?;
let elapsed = compressor.decompress(dataset, parquet_path).await?;

fastest = fastest.min(elapsed);
all_runs.push(elapsed);
Expand Down
Loading
Loading