diff --git a/Cargo.lock b/Cargo.lock index 0823979dd7e..d035e24b51b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9685,6 +9685,7 @@ dependencies = [ "bytes", "bzip2", "clap", + "flate2", "futures", "get_dir", "glob", @@ -9716,6 +9717,7 @@ dependencies = [ "url", "uuid", "vortex", + "vortex-json", "vortex-tensor", ] diff --git a/Cargo.toml b/Cargo.toml index b8ef2179541..fc5cce72717 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -159,6 +159,7 @@ enum-iterator = "2.0.0" env_logger = "0.11" fastlanes = "0.5.1" flatbuffers = "25.2.10" +flate2 = "1.1" fsst-rs = "0.5.11" futures = { version = "0.3.31", default-features = false } fuzzy-matcher = "0.3" diff --git a/benchmarks/compress-bench/src/main.rs b/benchmarks/compress-bench/src/main.rs index 8d9fa73f915..b273b4aae6a 100644 --- a/benchmarks/compress-bench/src/main.rs +++ b/benchmarks/compress-bench/src/main.rs @@ -25,6 +25,7 @@ use vortex_bench::compress::benchmark_decompress; use vortex_bench::compress::calculate_ratios; use vortex_bench::create_output_writer; use vortex_bench::datasets::Dataset; +use vortex_bench::datasets::jsonbench::JsonBench; use vortex_bench::datasets::struct_list_of_ints::StructListOfInts; use vortex_bench::datasets::taxi_data::TaxiData; use vortex_bench::datasets::tpch_l_comment::TPCHLCommentCanonical; @@ -161,6 +162,7 @@ async fn run_compress( &TPCHLCommentCanonical, &DownloadableDataset::RPlace, &DownloadableDataset::AirQuality, + &JsonBench, ] .into_iter() .chain(structlistofints.iter().map(|d| d as &dyn Dataset)) @@ -246,6 +248,7 @@ async fn run_benchmark_for_dataset( CompressOp::Compress => { let result = benchmark_compress( compressor.as_ref(), + dataset_handle, &parquet_path, iterations, bench_name, @@ -277,6 +280,7 @@ async fn run_benchmark_for_dataset( CompressOp::Decompress => { let result = benchmark_decompress( compressor.as_ref(), + dataset_handle, &parquet_path, iterations, bench_name, diff --git a/benchmarks/compress-bench/src/parquet.rs b/benchmarks/compress-bench/src/parquet.rs 
index cab4f753948..dde37cde6f6 100644 --- a/benchmarks/compress-bench/src/parquet.rs +++ b/benchmarks/compress-bench/src/parquet.rs @@ -21,6 +21,7 @@ use parquet::file::properties::WriterProperties; use vortex_bench::Format; use vortex_bench::compress::Compressor; use vortex_bench::compress::read_projection; +use vortex_bench::datasets::Dataset; /// Compressor implementation for Parquet format with ZSTD compression. pub struct ParquetCompressor { @@ -51,7 +52,11 @@ impl Compressor for ParquetCompressor { Format::Parquet } - async fn compress(&self, parquet_path: &Path) -> anyhow::Result<(u64, Duration)> { + async fn compress( + &self, + _dataset: &dyn Dataset, + parquet_path: &Path, + ) -> anyhow::Result<(u64, Duration)> { // Read the input parquet file let file = File::open(parquet_path)?; let builder = ParquetRecordBatchReaderBuilder::try_new(file)?; @@ -67,7 +72,11 @@ impl Compressor for ParquetCompressor { Ok((size as u64, elapsed)) } - async fn decompress(&self, parquet_path: &Path) -> anyhow::Result<Duration> { + async fn decompress( + &self, + _dataset: &dyn Dataset, + parquet_path: &Path, + ) -> anyhow::Result<Duration> { // First compress to get the bytes we'll decompress let file = File::open(parquet_path)?; let builder = ParquetRecordBatchReaderBuilder::try_new(file)?; diff --git a/benchmarks/compress-bench/src/vortex.rs b/benchmarks/compress-bench/src/vortex.rs index 87416cdf924..0884109ddb4 100644 --- a/benchmarks/compress-bench/src/vortex.rs +++ b/benchmarks/compress-bench/src/vortex.rs @@ -13,6 +13,8 @@ use bytes::Bytes; use futures::StreamExt; use futures::pin_mut; use vortex::array::IntoArray; +use vortex::array::VortexSessionExecute; +use vortex::array::arrow::ArrowSessionExt; use vortex::dtype::FieldNames; use vortex::expr::root; use vortex::expr::select; @@ -22,7 +24,7 @@ use vortex_bench::Format; use vortex_bench::SESSION; use vortex_bench::compress::Compressor; use vortex_bench::compress::read_projection; -use vortex_bench::conversions::parquet_to_vortex_chunks; 
+use vortex_bench::datasets::Dataset; /// Compressor implementation for Vortex format. pub struct VortexCompressor; @@ -33,9 +35,15 @@ impl Compressor for VortexCompressor { Format::OnDiskVortex } - async fn compress(&self, parquet_path: &Path) -> Result<(u64, Duration)> { - // Read the parquet file as an array stream - let uncompressed = parquet_to_vortex_chunks(parquet_path.to_path_buf()).await?; + async fn compress( + &self, + dataset: &dyn Dataset, + parquet_path: &Path, + ) -> Result<(u64, Duration)> { + let mut ctx = SESSION.create_execution_ctx(); + let uncompressed = dataset + .to_vortex_compression_array(&mut ctx, parquet_path) + .await?; let mut buf = Vec::new(); let start = Instant::now(); @@ -49,9 +57,12 @@ impl Compressor for VortexCompressor { Ok((buf.len() as u64, elapsed)) } - async fn decompress(&self, parquet_path: &Path) -> Result<Duration> { + async fn decompress(&self, dataset: &dyn Dataset, parquet_path: &Path) -> Result<Duration> { // First compress to get the bytes we'll decompress - let uncompressed = parquet_to_vortex_chunks(parquet_path.to_path_buf()).await?; + let mut ctx = SESSION.create_execution_ctx(); + let uncompressed = dataset + .to_vortex_compression_array(&mut ctx, parquet_path) + .await?; let mut buf = Vec::new(); let mut cursor = Cursor::new(&mut buf); SESSION @@ -72,7 +83,7 @@ impl Compressor for VortexCompressor { let names: FieldNames = cols.iter().map(|i| i.to_string()).collect(); scan = scan.with_projection(select(names, root())); } - let schema = Arc::new(scan.dtype()?.to_arrow_schema()?); + let schema = Arc::new(SESSION.arrow().to_arrow_schema(&scan.dtype()?)?); let stream = scan.into_record_batch_stream(schema)?; pin_mut!(stream); diff --git a/benchmarks/lance-bench/src/compress.rs b/benchmarks/lance-bench/src/compress.rs index 4d8df8f24dd..2fb897bcb70 100644 --- a/benchmarks/lance-bench/src/compress.rs +++ b/benchmarks/lance-bench/src/compress.rs @@ -21,6 +21,7 @@ use tempfile::TempDir; use vortex_bench::Format; use 
vortex_bench::compress::Compressor; use vortex_bench::compress::read_projection; +use vortex_bench::datasets::Dataset as BenchDataset; use crate::convert::convert_utf8view_batch; use crate::convert::convert_utf8view_schema; @@ -96,7 +97,11 @@ impl Compressor for LanceCompressor { Format::Lance } - async fn compress(&self, parquet_path: &Path) -> anyhow::Result<(u64, Duration)> { + async fn compress( + &self, + _dataset: &dyn BenchDataset, + parquet_path: &Path, + ) -> anyhow::Result<(u64, Duration)> { // Read the input parquet file let file = File::open(parquet_path)?; let builder = ParquetRecordBatchReaderBuilder::try_new(file)?; @@ -135,7 +140,11 @@ impl Compressor for LanceCompressor { Ok((size, elapsed)) } - async fn decompress(&self, parquet_path: &Path) -> anyhow::Result<Duration> { + async fn decompress( + &self, + _dataset: &dyn BenchDataset, + parquet_path: &Path, + ) -> anyhow::Result<Duration> { // First compress to get the Lance dataset let file = File::open(parquet_path)?; let builder = ParquetRecordBatchReaderBuilder::try_new(file)?; diff --git a/vortex-bench/Cargo.toml b/vortex-bench/Cargo.toml index fc1766c599f..c19a5ee10d4 100644 --- a/vortex-bench/Cargo.toml +++ b/vortex-bench/Cargo.toml @@ -23,6 +23,7 @@ vortex = { workspace = true, features = [ "tokio", "zstd", ] } +vortex-json = { workspace = true } vortex-tensor = { workspace = true } # TODO(connor): In the future, this might be inside vortex. 
anyhow = { workspace = true } @@ -33,6 +34,7 @@ async-trait = { workspace = true } bytes = { workspace = true } bzip2 = { workspace = true } clap = { workspace = true, features = ["derive"] } +flate2 = { workspace = true } futures = { workspace = true } get_dir = { workspace = true } glob = { workspace = true } diff --git a/vortex-bench/src/compress/mod.rs b/vortex-bench/src/compress/mod.rs index 6e261b871d2..124c61ec65f 100644 --- a/vortex-bench/src/compress/mod.rs +++ b/vortex-bench/src/compress/mod.rs @@ -13,6 +13,7 @@ use serde::Serialize; use vortex::utils::aliases::hash_map::HashMap; use crate::Format; +use crate::datasets::Dataset; use crate::measurements::CompressionTimingMeasurement; use crate::measurements::CustomUnitMeasurement; @@ -115,11 +116,12 @@ pub trait Compressor: Send + Sync { /// The format this compressor handles. fn format(&self) -> Format; - /// Compress data from a Parquet file, returning the compressed size in bytes and elapsed time. + /// Compress data from a dataset, returning the compressed size in bytes and elapsed time. /// - /// The implementation should read the Parquet file and compress it - /// to the target format. - async fn compress(&self, parquet_path: &Path) -> Result<(u64, Duration)>; + /// Most implementations read the Parquet path. Vortex can instead use dataset-specific + /// Vortex array semantics via [`Dataset::to_vortex_compression_array`]. + async fn compress(&self, dataset: &dyn Dataset, parquet_path: &Path) + -> Result<(u64, Duration)>; /// Decompress data from the Parquet file (after compressing), returning the decompressed size. /// @@ -128,7 +130,7 @@ pub trait Compressor: Send + Sync { /// /// Format implementations apply the fixed wide-table read projection when the input schema /// matches the projection benchmark. 
- async fn decompress(&self, parquet_path: &Path) -> Result<Duration>; + async fn decompress(&self, dataset: &dyn Dataset, parquet_path: &Path) -> Result<Duration>; } /// Run a compression benchmark for the given compressor. @@ -136,6 +138,7 @@ pub trait Compressor: Send + Sync { /// Executes compression `iterations` times and returns timing statistics. pub async fn benchmark_compress( compressor: &dyn Compressor, + dataset: &dyn Dataset, parquet_path: &Path, iterations: usize, bench_name: &str, @@ -146,7 +149,7 @@ pub async fn benchmark_compress( let mut all_runs = Vec::with_capacity(iterations); for _ in 0..iterations { - let (size, elapsed) = compressor.compress(parquet_path).await?; + let (size, elapsed) = compressor.compress(dataset, parquet_path).await?; compressed_size = size; fastest = fastest.min(elapsed); @@ -180,6 +183,7 @@ pub async fn benchmark_compress( /// Benchmarks decompression `iterations` times. pub async fn benchmark_decompress( compressor: &dyn Compressor, + dataset: &dyn Dataset, parquet_path: &Path, iterations: usize, bench_name: &str, @@ -189,7 +193,7 @@ pub async fn benchmark_decompress( let mut all_runs = Vec::with_capacity(iterations); for _ in 0..iterations { - let elapsed = compressor.decompress(parquet_path).await?; + let elapsed = compressor.decompress(dataset, parquet_path).await?; fastest = fastest.min(elapsed); all_runs.push(elapsed); diff --git a/vortex-bench/src/datasets/jsonbench.rs b/vortex-bench/src/datasets/jsonbench.rs new file mode 100644 index 00000000000..047ad219455 --- /dev/null +++ b/vortex-bench/src/datasets/jsonbench.rs @@ -0,0 +1,230 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! [`Dataset`] definition for the [JSONBench] dataset. +//! +//! The dataset has up to 1000 files, each with 1 million JSON lines. This setup only runs it for a single file. +//! +//! 
[JSONBench]: https://jsonbench.com/ + +use std::fs::File; +use std::io::Read; +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_array::StringArray; +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::Schema; +use async_trait::async_trait; +use flate2::read::GzDecoder; +use parquet::arrow::ArrowWriter; +use tokio::fs::File as TokioFile; +use vortex::array::ArrayRef; +use vortex::array::EmptyMetadata; +use vortex::array::ExecutionCtx; +use vortex::array::IntoArray; +use vortex::array::VortexSessionExecute; +use vortex::array::arrays::ChunkedArray; +use vortex::array::arrays::ExtensionArray; +use vortex::array::arrays::Struct; +use vortex::array::arrays::StructArray; +use vortex::array::arrays::chunked::ChunkedArrayExt; +use vortex::array::arrays::struct_::StructArrayExt; +use vortex::array::stream::ArrayStreamExt; +use vortex::array::validity::Validity; +use vortex::dtype::FieldNames; +use vortex::dtype::extension::ExtDType; +use vortex::file::OpenOptionsSessionExt; +use vortex::file::WriteOptionsSessionExt; +use vortex::io::VortexWrite; +use vortex_json::Json; + +use crate::IdempotentPath; +use crate::SESSION; +use crate::conversions::parquet_to_vortex_chunks; +use crate::datasets::Dataset; +use crate::datasets::data_downloads::download_data; +use crate::idempotent_async; + +const JSONBENCH_URL: &str = + "https://clickhouse-public-datasets.s3.amazonaws.com/bluesky/file_0001.json.gz"; +const JSONBENCH_SOURCE_PATH: &str = "json_bench/data.json.gz"; +const JSONBENCH_PARQUET_PATH: &str = "json_bench/data.parquet"; + +pub struct JsonBench; + +#[async_trait] +impl Dataset for JsonBench { + fn name(&self) -> &str { + "jsonbench" + } + + async fn to_vortex_array(&self, _ctx: &mut ExecutionCtx) -> anyhow::Result<ArrayRef> { + let vortex_path = idempotent_async("json_bench/data.vortex", |temp_path| async move { + let mut output_file = TokioFile::create(&temp_path).await?; + let parquet_path = 
self.to_parquet_path().await?; + let mut ctx = SESSION.create_execution_ctx(); + let data = self + .to_vortex_compression_array(&mut ctx, &parquet_path) + .await?; + + SESSION + .write_options() + .write(&mut output_file, data.to_array_stream()) + .await?; + output_file.flush().await?; + Ok(temp_path) + }) + .await?; + + Ok(SESSION + .open_options() + .open_path(vortex_path) + .await? + .scan()? + .into_array_stream()? + .read_all() + .await?) + } + + async fn to_parquet_path(&self) -> anyhow::Result<PathBuf> { + let json_data = download_data(JSONBENCH_SOURCE_PATH.to_data_path(), JSONBENCH_URL).await?; + + idempotent_async(JSONBENCH_PARQUET_PATH, |parquet_path| async move { + write_json_lines_as_parquet(&json_data, &parquet_path).await + }) + .await + } + + async fn to_vortex_compression_array( + &self, + _ctx: &mut ExecutionCtx, + parquet_path: &Path, + ) -> anyhow::Result<ArrayRef> { + json_extension_array_from_parquet(parquet_path).await + } +} + +async fn json_extension_array_from_parquet(parquet_path: &Path) -> anyhow::Result<ArrayRef> { + let data = parquet_to_vortex_chunks(parquet_path.to_path_buf()).await?; + let chunks = data + .iter_chunks() + .map(|chunk| { + let chunk = chunk.as_::<Struct>(); + let storage = chunk.unmasked_field_by_name("data")?.clone(); + let ext_dtype = + ExtDType::<Json>::try_new(EmptyMetadata, storage.dtype().clone())?.erased(); + let data = ExtensionArray::new(ext_dtype, storage).into_array(); + Ok(StructArray::try_new( + FieldNames::from(["data"]), + vec![data], + chunk.len(), + Validity::NonNullable, + )? 
+ .into_array()) + }) + .collect::<anyhow::Result<Vec<_>>>()?; + + Ok(ChunkedArray::from_iter(chunks).into_array()) +} + +async fn write_json_lines_as_parquet(json_path: &Path, parquet_path: &Path) -> anyhow::Result<()> { + let compressed_json = tokio::fs::read(json_path).await?; + let mut json = String::new(); + GzDecoder::new(compressed_json.as_slice()).read_to_string(&mut json)?; + + let schema = Arc::new(Schema::new(vec![Field::new("data", DataType::Utf8, false)])); + let mut writer = ArrowWriter::try_new(File::create(parquet_path)?, Arc::clone(&schema), None)?; + let rows = json.lines().collect::<Vec<_>>(); + let data = StringArray::from_iter_values(rows); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(data)])?; + writer.write(&batch)?; + writer.close()?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::fs::File; + use std::io::Write; + + use arrow_array::RecordBatchReader; + use arrow_array::StringArray; + use arrow_schema::DataType; + use flate2::Compression; + use flate2::write::GzEncoder; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use vortex::array::arrays::Chunked; + + use super::*; + use crate::temp_download_filepath; + + #[tokio::test] + async fn writes_json_lines_as_single_string_column() -> anyhow::Result<()> { + let json_path = temp_download_filepath(); + let parquet_path = temp_download_filepath(); + let mut encoder = GzEncoder::new(File::create(&json_path)?, Compression::default()); + encoder + .write_all(b"{\"id\":1,\"message\":\"hello\"}\n{\"id\":2,\"message\":\"world\"}\n")?; + encoder.finish()?; + + write_json_lines_as_parquet(&json_path, &parquet_path).await?; + + let reader = + ParquetRecordBatchReaderBuilder::try_new(File::open(&parquet_path)?)?.build()?; + let schema = reader.schema(); + assert_eq!(schema.fields().len(), 1); + assert_eq!(schema.field(0).name(), "data"); + assert_eq!(schema.field(0).data_type(), &DataType::Utf8); + assert!(!schema.field(0).is_nullable()); + + let batches = reader.collect::<Result<Vec<_>, _>>()?; + 
assert_eq!(batches.len(), 1); + let data = batches[0].column(0).as_any().downcast_ref::<StringArray>(); + assert_eq!( + data.map(|array| array.value(0)), + Some("{\"id\":1,\"message\":\"hello\"}") + ); + assert_eq!( + data.map(|array| array.value(1)), + Some("{\"id\":2,\"message\":\"world\"}") + ); + + std::fs::remove_file(json_path)?; + std::fs::remove_file(parquet_path)?; + Ok(()) + } + + #[tokio::test] + async fn compression_array_uses_json_extension_dtype() -> anyhow::Result<()> { + let json_path = temp_download_filepath(); + let parquet_path = temp_download_filepath(); + let mut encoder = GzEncoder::new(File::create(&json_path)?, Compression::default()); + encoder.write_all(b"{\"id\":1}\n")?; + encoder.finish()?; + write_json_lines_as_parquet(&json_path, &parquet_path).await?; + + let mut ctx = SESSION.create_execution_ctx(); + let array = JsonBench + .to_vortex_compression_array(&mut ctx, &parquet_path) + .await?; + let chunked = array.as_::<Chunked>(); + let chunk = chunked.chunk(0).as_::<Struct>(); + let ext_dtype = chunk + .unmasked_field_by_name("data")? + .dtype() + .as_extension_opt() + .ok_or_else(|| anyhow::anyhow!("expected JSON extension dtype"))? 
+ .clone(); + ext_dtype + .try_downcast::<Json>() + .map_err(|_| anyhow::anyhow!("expected vortex_json::Json extension dtype"))?; + + std::fs::remove_file(json_path)?; + std::fs::remove_file(parquet_path)?; + Ok(()) + } +} diff --git a/vortex-bench/src/datasets/mod.rs b/vortex-bench/src/datasets/mod.rs index 3e72ba69e7f..dd6d3b6b082 100644 --- a/vortex-bench/src/datasets/mod.rs +++ b/vortex-bench/src/datasets/mod.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::fmt::Display; +use std::path::Path; use anyhow::Result; use async_trait::async_trait; @@ -9,11 +10,14 @@ use serde::Deserialize; use serde::Serialize; use vortex::array::ArrayRef; use vortex::array::ExecutionCtx; +use vortex::array::IntoArray; use crate::clickbench::Flavor; +use crate::conversions::parquet_to_vortex_chunks; pub mod data_downloads; pub mod feature_vectors; +pub mod jsonbench; pub mod nested_lists; pub mod nested_structs; pub mod struct_list_of_ints; @@ -34,7 +38,7 @@ pub(crate) fn normalize_benchmark_runner_id(benchmark_runner: &str) -> String { } #[async_trait] -pub trait Dataset { +pub trait Dataset: Send + Sync { fn name(&self) -> &str; /// Map this dataset to the v3 `(dataset, dataset_variant)` pair emitted @@ -50,6 +54,21 @@ pub trait Dataset { async fn to_vortex_array(&self, ctx: &mut ExecutionCtx) -> Result<ArrayRef>; + /// Build the uncompressed Vortex array used by the compression benchmark. + /// + /// The default keeps the historical compression-benchmark behavior: read the generated + /// Parquet file and convert it to Vortex arrays. Datasets can override this when their + /// Vortex benchmark input has semantics not representable in Parquet alone. + async fn to_vortex_compression_array( + &self, + _ctx: &mut ExecutionCtx, + parquet_path: &Path, + ) -> Result<ArrayRef> { + Ok(parquet_to_vortex_chunks(parquet_path.to_path_buf()) + .await? + .into_array()) + } + /// Get the path to the parquet file for this dataset. 
/// /// This method ensures the parquet file exists (downloading if necessary) diff --git a/vortex-bench/src/lib.rs b/vortex-bench/src/lib.rs index 30ff45c97a8..040639a98de 100644 --- a/vortex-bench/src/lib.rs +++ b/vortex-bench/src/lib.rs @@ -72,8 +72,11 @@ use vortex::session::VortexSession; #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; -pub static SESSION: LazyLock<VortexSession> = - LazyLock::new(|| VortexSession::default().with_tokio()); +pub static SESSION: LazyLock<VortexSession> = LazyLock::new(|| { + let session = VortexSession::default().with_tokio(); + vortex_json::initialize(&session); + session +}); #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)] pub struct Target {