From de3e543c45a7ab612a7e1e4f2ed7249d092be5e7 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 24 Feb 2026 12:53:08 -0500 Subject: [PATCH 1/5] Extract changes from reader_perf. --- .../src/arrow/caching_delete_file_loader.rs | 21 ++- .../iceberg/src/arrow/delete_file_loader.rs | 4 +- crates/iceberg/src/arrow/delete_filter.rs | 7 + crates/iceberg/src/arrow/reader.rs | 29 +++- crates/iceberg/src/scan/context.rs | 1 + crates/iceberg/src/scan/mod.rs | 156 +++--------------- crates/iceberg/src/scan/task.rs | 7 + 7 files changed, 85 insertions(+), 140 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 5d0b1da712..dfee7ed87b 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -235,7 +235,7 @@ impl CachingDeleteFileLoader { PosDelLoadAction::Load => Ok(DeleteFileContext::PosDels { file_path: task.file_path.clone(), stream: basic_delete_file_loader - .parquet_to_batch_stream(&task.file_path) + .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) .await?, }), } @@ -254,7 +254,7 @@ impl CachingDeleteFileLoader { let equality_ids_vec = task.equality_ids.clone().unwrap(); let evolved_stream = BasicDeleteFileLoader::evolve_schema( basic_delete_file_loader - .parquet_to_batch_stream(&task.file_path) + .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) .await?, schema, &equality_ids_vec, @@ -614,7 +614,10 @@ mod tests { let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); let record_batch_stream = basic_delete_file_loader - .parquet_to_batch_stream(&eq_delete_file_path) + .parquet_to_batch_stream( + &eq_delete_file_path, + std::fs::metadata(&eq_delete_file_path).unwrap().len(), + ) .await .expect("could not get batch stream"); @@ -811,7 +814,10 @@ mod tests { let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); let batch_stream = basic_delete_file_loader - .parquet_to_batch_stream(&delete_file_path) + .parquet_to_batch_stream( + &delete_file_path, + std::fs::metadata(&delete_file_path).unwrap().len(), + ) .await .unwrap(); @@ -913,7 +919,8 @@ mod tests { // Create FileScanTask with BOTH positional and equality deletes let pos_del = FileScanTaskDeleteFile { - file_path: pos_del_path, + file_path: pos_del_path.clone(), + file_size_in_bytes: std::fs::metadata(&pos_del_path).unwrap().len(), file_type: DataContentType::PositionDeletes, partition_spec_id: 0, equality_ids: None, }; let eq_del = FileScanTaskDeleteFile { file_path: eq_delete_path.clone(), + file_size_in_bytes: std::fs::metadata(&eq_delete_path).unwrap().len(), file_type: DataContentType::EqualityDeletes, partition_spec_id: 0, equality_ids: Some(vec![2, 3]), // Only use field IDs that exist in both schemas }; let file_scan_task = FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -993,7 +1002,7 @@ mod tests { let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); let record_batch_stream = basic_delete_file_loader - .parquet_to_batch_stream(&path) + .parquet_to_batch_stream(&path, std::fs::metadata(&path).unwrap().len()) .await .expect("could not get batch stream"); diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index fa47076fef..4998287876 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -54,6 +54,7 @@ impl BasicDeleteFileLoader { pub(crate) async fn parquet_to_batch_stream( &self, data_file_path: &str, + file_size_in_bytes: u64, ) -> Result<ArrowRecordBatchStream> { /* Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly @@ -65,6 +66,7 @@ impl BasicDeleteFileLoader { false, None, None, + file_size_in_bytes, ) .await? .build()? @@ -102,7 +104,7 @@ impl DeleteFileLoader for BasicDeleteFileLoader { task: &FileScanTaskDeleteFile, schema: SchemaRef, ) -> Result<ArrowRecordBatchStream> { - let raw_batch_stream = self.parquet_to_batch_stream(&task.file_path).await?; + let raw_batch_stream = self.parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes).await?; // For equality deletes, only evolve the equality_ids columns. // For positional deletes (equality_ids is None), use all field IDs. diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 4af9f6b6ff..ecc28438f8 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -378,6 +378,7 @@ pub(crate) mod tests { let pos_del_1 = FileScanTaskDeleteFile { file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()), + file_size_in_bytes: std::fs::metadata(format!("{}/pos-del-1.parquet", table_location.to_str().unwrap())).unwrap().len(), file_type: DataContentType::PositionDeletes, partition_spec_id: 0, equality_ids: None, @@ -385,6 +386,7 @@ pub(crate) mod tests { let pos_del_2 = FileScanTaskDeleteFile { file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()), + file_size_in_bytes: std::fs::metadata(format!("{}/pos-del-2.parquet", table_location.to_str().unwrap())).unwrap().len(), file_type: DataContentType::PositionDeletes, partition_spec_id: 0, equality_ids: None, @@ -392,6 +394,7 @@ pub(crate) mod tests { let pos_del_3 = FileScanTaskDeleteFile { file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()), + file_size_in_bytes: std::fs::metadata(format!("{}/pos-del-3.parquet", table_location.to_str().unwrap())).unwrap().len(), file_type: DataContentType::PositionDeletes, partition_spec_id: 0, equality_ids: None, @@ -399,6 +402,7 @@ pub(crate) mod tests { let file_scan_tasks = vec![ FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -414,6 +418,7 @@ pub(crate) mod tests { case_sensitive: false, }, FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -464,6 +469,7 @@ pub(crate) mod tests { // ---------- fake FileScanTask ---------- let task = FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -474,6 +480,7 @@ pub(crate) mod tests { predicate: None, deletes: vec![FileScanTaskDeleteFile { file_path: "eq-del.parquet".to_string(), + file_size_in_bytes: 1, // never read; this test fails before opening the file file_type: DataContentType::EqualityDeletes, partition_spec_id: 0, equality_ids: None, diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index c4c2fa0036..42f0cb61ee 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -33,7 +33,7 @@ use arrow_string::like::starts_with; use bytes::Bytes; use fnv::FnvHashSet; use futures::future::BoxFuture; -use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join}; +use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use parquet::arrow::arrow_reader::{ ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector, }; @@ -237,6 +237,7 @@ impl ArrowReader
{ should_load_page_index, None, metadata_size_hint, + task.file_size_in_bytes, ) .await?; @@ -290,6 +291,7 @@ impl ArrowReader { should_load_page_index, Some(options), metadata_size_hint, + task.file_size_in_bytes, ) .await? } else { @@ -494,13 +496,14 @@ impl ArrowReader { should_load_page_index: bool, arrow_reader_options: Option, metadata_size_hint: Option, + file_size_in_bytes: u64, ) -> Result> { // Get the metadata for the Parquet file we need to read and build // a reader for the data within let parquet_file = file_io.new_input(data_file_path)?; - let (parquet_metadata, parquet_reader) = - try_join!(parquet_file.metadata(), parquet_file.reader())?; - let mut parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader) + let parquet_reader = parquet_file.reader().await?; + let mut parquet_file_reader = + ArrowFileReader::new(FileMetadata { size: file_size_in_bytes }, parquet_reader) .with_preload_column_index(true) .with_preload_offset_index(true) .with_preload_page_index(should_load_page_index); @@ -2121,6 +2124,7 @@ message schema { ) -> Vec> { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -2443,6 +2447,7 @@ message schema { // Task 1: read only the first row group let task1 = FileScanTask { + file_size_in_bytes: 0, start: rg0_start, length: row_group_0.compressed_size() as u64, record_count: Some(100), @@ -2460,6 +2465,7 @@ message schema { // Task 2: read the second and third row groups let task2 = FileScanTask { + file_size_in_bytes: 0, start: rg1_start, length: file_end - rg1_start, record_count: Some(200), @@ -2588,6 +2594,7 @@ message schema { let reader = ArrowReaderBuilder::new(file_io).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -2755,6 +2762,7 @@ message schema { let reader = ArrowReaderBuilder::new(file_io).build(); let task = FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: Some(200), @@ -2973,6 +2981,7 @@ message schema { // Create FileScanTask that reads ONLY row group 1 via byte range filtering let task = FileScanTask { + file_size_in_bytes: 0, start: rg1_start, length: rg1_length, record_count: Some(100), // Row group 1 has 100 rows @@ -3184,6 +3193,7 @@ message schema { // Create FileScanTask that reads ONLY row group 1 via byte range filtering let task = FileScanTask { + file_size_in_bytes: 0, start: rg1_start, length: rg1_length, record_count: Some(100), // Row group 1 has 100 rows @@ -3293,6 +3303,7 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -3391,6 +3402,7 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -3478,6 +3490,7 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -3579,6 +3592,7 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -3709,6 +3723,7 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -3806,6 +3821,7 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + 
file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -3916,6 +3932,7 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -4007,6 +4024,7 @@ message schema { // Create tasks in a specific order: file_0, file_1, file_2 let tasks = vec![ Ok(FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -4022,6 +4040,7 @@ message schema { case_sensitive: false, }), Ok(FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -4037,6 +4056,7 @@ message schema { case_sensitive: false, }), Ok(FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -4216,6 +4236,7 @@ message schema { let reader = ArrowReaderBuilder::new(file_io).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 169d8e6405..aa28ffd5a2 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -117,6 +117,7 @@ impl ManifestEntryContext { .await; Ok(FileScanTask { + file_size_in_bytes: self.manifest_entry.file_size_in_bytes(), start: 0, length: self.manifest_entry.file_size_in_bytes(), record_count: Some(self.manifest_entry.record_count()), diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index c055c12c9a..8fd1b3eedf 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -756,7 +756,10 @@ pub mod tests { let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); let current_partition_spec = self.table.metadata().default_partition_spec(); - // Write data files + // Write the Parquet data files first so we can get the real file size + let parquet_file_size = self.write_parquet_data_files(); + + // Write manifest entries with the real file size let mut writer = ManifestWriterBuilder::new( self.next_manifest_file(), Some(current_snapshot.snapshot_id()), @@ -775,7 +778,7 @@ pub mod tests { .content(DataContentType::Data) .file_path(format!("{}/1.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) + .file_size_in_bytes(parquet_file_size) .record_count(1) .partition(Struct::from_iter([Some(Literal::long(100))])) .key_metadata(None) @@ -798,7 +801,7 @@ pub mod tests { .content(DataContentType::Data) .file_path(format!("{}/2.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) + .file_size_in_bytes(parquet_file_size) .record_count(1) .partition(Struct::from_iter([Some(Literal::long(200))])) .build() @@ -820,7 +823,7 @@ pub mod tests { .content(DataContentType::Data) .file_path(format!("{}/3.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) + .file_size_in_bytes(parquet_file_size) .record_count(1) .partition(Struct::from_iter([Some(Literal::long(300))])) .build() @@ -845,8 +848,13 @@ pub mod tests { .add_manifests(vec![data_file_manifest].into_iter()) .unwrap(); manifest_list_write.close().await.unwrap(); + } + + /// Writes identical Parquet data files (1.parquet, 2.parquet, 3.parquet) + /// and returns the file size in bytes. 
+ fn write_parquet_data_files(&self) -> u64 { + std::fs::create_dir_all(&self.table_location).unwrap(); - // prepare data let schema = { let fields = vec![ arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false) @@ -892,47 +900,38 @@ pub mod tests { ]; Arc::new(arrow_schema::Schema::new(fields)) }; - // x: [1, 1, 1, 1, ...] + let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; let mut values = vec![2; 512]; values.append(vec![3; 200].as_mut()); values.append(vec![4; 300].as_mut()); values.append(vec![5; 12].as_mut()); - - // y: [2, 2, 2, 2, ..., 3, 3, 3, 3, ..., 4, 4, 4, 4, ..., 5, 5, 5, 5] let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; let mut values = vec![3; 512]; values.append(vec![4; 512].as_mut()); - - // z: [3, 3, 3, 3, ..., 4, 4, 4, 4] let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; - // a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg", "Iceberg"] let mut values = vec!["Apache"; 512]; values.append(vec!["Iceberg"; 512].as_mut()); let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef; - // dbl: let mut values = vec![100.0f64; 512]; values.append(vec![150.0f64; 12].as_mut()); values.append(vec![200.0f64; 500].as_mut()); let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef; - // i32: let mut values = vec![100i32; 512]; values.append(vec![150i32; 12].as_mut()); values.append(vec![200i32; 500].as_mut()); let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef; - // i64: let mut values = vec![100i64; 512]; values.append(vec![150i64; 12].as_mut()); values.append(vec![200i64; 500].as_mut()); let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; - // bool: let mut values = vec![false; 512]; values.append(vec![true; 512].as_mut()); let values: BooleanArray = values.into(); @@ -943,7 +942,6 @@ pub mod tests { ]) .unwrap(); - // Write the Parquet files let props = WriterProperties::builder() .set_compression(Compression::SNAPPY) .build(); @@ -952,12 +950,13 @@ pub mod tests { let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap(); let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap(); - writer.write(&to_write).expect("Writing batch"); - - // writer must be closed to write footer writer.close().unwrap(); } + + std::fs::metadata(format!("{}/1.parquet", &self.table_location)) + .unwrap() + .len() } pub async fn setup_unpartitioned_manifest_files(&mut self) { @@ -968,7 +967,10 @@ pub mod tests { let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec()); - // Write data files using an empty partition for unpartitioned tables. + // Write the Parquet data files first so we can get the real file size + let parquet_file_size = self.write_parquet_data_files(); + + // Write manifest entries with the real file size let mut writer = ManifestWriterBuilder::new( self.next_manifest_file(), Some(current_snapshot.snapshot_id()), @@ -978,7 +980,6 @@ pub mod tests { ) .build_v2_data(); - // Create an empty partition value. 
let empty_partition = Struct::empty(); writer @@ -991,7 +992,7 @@ pub mod tests { .content(DataContentType::Data) .file_path(format!("{}/1.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) + .file_size_in_bytes(parquet_file_size) .record_count(1) .partition(empty_partition.clone()) .key_metadata(None) @@ -1015,7 +1016,7 @@ pub mod tests { .content(DataContentType::Data) .file_path(format!("{}/2.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) + .file_size_in_bytes(parquet_file_size) .record_count(1) .partition(empty_partition.clone()) .build() @@ -1038,7 +1039,7 @@ pub mod tests { .content(DataContentType::Data) .file_path(format!("{}/3.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) + .file_size_in_bytes(parquet_file_size) .record_count(1) .partition(empty_partition.clone()) .build() @@ -1064,111 +1065,6 @@ pub mod tests { .add_manifests(vec![data_file_manifest].into_iter()) .unwrap(); manifest_list_write.close().await.unwrap(); - - // prepare data for parquet files - let schema = { - let fields = vec![ - arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "1".to_string(), - )])), - arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "2".to_string(), - )])), - arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "3".to_string(), - )])), - arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "4".to_string(), - )])), - arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "5".to_string(), - )])), - arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "6".to_string(), - )])), - arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "7".to_string(), - )])), - arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "8".to_string(), - )])), - ]; - Arc::new(arrow_schema::Schema::new(fields)) - }; - - // Build the arrays for the RecordBatch - let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; - - let mut values = vec![2; 512]; - values.append(vec![3; 200].as_mut()); - values.append(vec![4; 300].as_mut()); - values.append(vec![5; 12].as_mut()); - let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; - - let mut values = vec![3; 512]; - values.append(vec![4; 512].as_mut()); - let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; - - let mut values = vec!["Apache"; 512]; - values.append(vec!["Iceberg"; 512].as_mut()); - let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef; - - let mut values = vec![100.0f64; 512]; - values.append(vec![150.0f64; 12].as_mut()); - values.append(vec![200.0f64; 500].as_mut()); - let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef; - - let mut values = vec![100i32; 512]; - values.append(vec![150i32; 
12].as_mut()); - values.append(vec![200i32; 500].as_mut()); - let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef; - - let mut values = vec![100i64; 512]; - values.append(vec![150i64; 12].as_mut()); - values.append(vec![200i64; 500].as_mut()); - let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; - - let mut values = vec![false; 512]; - values.append(vec![true; 512].as_mut()); - let values: BooleanArray = values.into(); - let col8 = Arc::new(values) as ArrayRef; - - let to_write = RecordBatch::try_new(schema.clone(), vec![ - col1, col2, col3, col4, col5, col6, col7, col8, - ]) - .unwrap(); - - // Write the Parquet files - let props = WriterProperties::builder() - .set_compression(Compression::SNAPPY) - .build(); - - for n in 1..=3 { - let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap(); - let mut writer = - ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap(); - - writer.write(&to_write).expect("Writing batch"); - - // writer must be closed to write footer - writer.close().unwrap(); - } } pub async fn setup_deadlock_manifests(&mut self) { @@ -1874,6 +1770,7 @@ pub mod tests { ); let task = FileScanTask { data_file_path: "data_file_path".to_string(), + file_size_in_bytes: 0, start: 0, length: 100, project_field_ids: vec![1, 2, 3], @@ -1892,6 +1789,7 @@ pub mod tests { // with predicate let task = FileScanTask { data_file_path: "data_file_path".to_string(), + file_size_in_bytes: 0, start: 0, length: 100, project_field_ids: vec![1, 2, 3], diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs index 5349a9bdd2..67615c351e 100644 --- a/crates/iceberg/src/scan/task.rs +++ b/crates/iceberg/src/scan/task.rs @@ -51,6 +51,9 @@ where D: serde::Deserializer<'de> { /// A task to scan part of file. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct FileScanTask { + /// The total size of the data file in bytes, from the manifest entry. + /// Used to skip a stat/HEAD request when reading Parquet footers. + pub file_size_in_bytes: u64, /// The start offset of the file to scan. pub start: u64, /// The length of the file to scan. @@ -146,6 +149,7 @@ impl From<&DeleteFileContext> for FileScanTaskDeleteFile { fn from(ctx: &DeleteFileContext) -> Self { FileScanTaskDeleteFile { file_path: ctx.manifest_entry.file_path().to_string(), + file_size_in_bytes: ctx.manifest_entry.file_size_in_bytes(), file_type: ctx.manifest_entry.content_type(), partition_spec_id: ctx.partition_spec_id, equality_ids: ctx.manifest_entry.data_file.equality_ids.clone(), @@ -159,6 +163,9 @@ pub struct FileScanTaskDeleteFile { /// The delete file path pub file_path: String, + /// The total size of the delete file in bytes, from the manifest entry. + pub file_size_in_bytes: u64, + /// delete file type pub file_type: DataContentType, From 981ce7c4aaca729ab709d07d8e94583d99e50fb2 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 24 Feb 2026 13:41:40 -0500 Subject: [PATCH 2/5] Restore accidentally removed comments. --- crates/iceberg/src/scan/mod.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 8fd1b3eedf..0ba5981805 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -900,38 +900,47 @@ pub mod tests { ]; Arc::new(arrow_schema::Schema::new(fields)) }; - + // x: [1, 1, 1, 1, ...] 
let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; let mut values = vec![2; 512]; values.append(vec![3; 200].as_mut()); values.append(vec![4; 300].as_mut()); values.append(vec![5; 12].as_mut()); + + // y: [2, 2, 2, 2, ..., 3, 3, 3, 3, ..., 4, 4, 4, 4, ..., 5, 5, 5, 5] let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; let mut values = vec![3; 512]; values.append(vec![4; 512].as_mut()); + + // z: [3, 3, 3, 3, ..., 4, 4, 4, 4] let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; + // a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg", "Iceberg"] let mut values = vec!["Apache"; 512]; values.append(vec!["Iceberg"; 512].as_mut()); let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef; + // dbl: let mut values = vec![100.0f64; 512]; values.append(vec![150.0f64; 12].as_mut()); values.append(vec![200.0f64; 500].as_mut()); let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef; + // i32: let mut values = vec![100i32; 512]; values.append(vec![150i32; 12].as_mut()); values.append(vec![200i32; 500].as_mut()); let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef; + // i64: let mut values = vec![100i64; 512]; values.append(vec![150i64; 12].as_mut()); values.append(vec![200i64; 500].as_mut()); let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; + // bool: let mut values = vec![false; 512]; values.append(vec![true; 512].as_mut()); let values: BooleanArray = values.into(); @@ -942,6 +951,7 @@ pub mod tests { ]) .unwrap(); + // Write the Parquet files let props = WriterProperties::builder() .set_compression(Compression::SNAPPY) .build(); @@ -950,7 +960,10 @@ pub mod tests { let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap(); let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap(); + writer.write(&to_write).expect("Writing batch"); + + // writer must be closed to write footer writer.close().unwrap(); } @@ -970,7 +983,7 @@ pub mod tests { // Write the Parquet data files first so we can get the real file size let parquet_file_size = self.write_parquet_data_files(); - // Write manifest entries with the real file size + // Write manifest entries with the real file size and using an empty partition for unpartitioned tables. let mut writer = ManifestWriterBuilder::new( self.next_manifest_file(), Some(current_snapshot.snapshot_id()), @@ -980,6 +993,7 @@ pub mod tests { ) .build_v2_data(); + // Create an empty partition value. let empty_partition = Struct::empty(); writer From f558510838b3a7b4e9eb3f0b2c3009192caab40f Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 24 Feb 2026 13:44:28 -0500 Subject: [PATCH 3/5] Clean up comments. 
--- crates/iceberg/src/scan/mod.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 0ba5981805..2a685cd171 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -756,10 +756,9 @@ pub mod tests { let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); let current_partition_spec = self.table.metadata().default_partition_spec(); - // Write the Parquet data files first so we can get the real file size + // Write the data files first, then use the file size in the manifest entries let parquet_file_size = self.write_parquet_data_files(); - // Write manifest entries with the real file size let mut writer = ManifestWriterBuilder::new( self.next_manifest_file(), Some(current_snapshot.snapshot_id()), @@ -980,10 +979,10 @@ pub mod tests { let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec()); - // Write the Parquet data files first so we can get the real file size + // Write the data files first, then use the file size in the manifest entries let parquet_file_size = self.write_parquet_data_files(); - // Write manifest entries with the real file size and using an empty partition for unpartitioned tables. + // Write data files using an empty partition for unpartitioned tables. let mut writer = ManifestWriterBuilder::new( self.next_manifest_file(), Some(current_snapshot.snapshot_id()), From 422c9d223f07413bc9ea05e9c5e9f410e457fa7a Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 25 Feb 2026 09:22:34 -0500 Subject: [PATCH 4/5] cargo fmt --- .../iceberg/src/arrow/delete_file_loader.rs | 4 +++- crates/iceberg/src/arrow/delete_filter.rs | 21 ++++++++++++++++--- crates/iceberg/src/arrow/reader.rs | 17 ++++++++++----- 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index 4998287876..33744d876f 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -104,7 +104,9 @@ impl DeleteFileLoader for BasicDeleteFileLoader { task: &FileScanTaskDeleteFile, schema: SchemaRef, ) -> Result { - let raw_batch_stream = self.parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes).await?; + let raw_batch_stream = self + .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) + .await?; // For equality deletes, only evolve the equality_ids columns. // For positional deletes (equality_ids is None), use all field IDs. 
diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index ecc28438f8..3d074ca32e 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -378,7 +378,12 @@ pub(crate) mod tests { let pos_del_1 = FileScanTaskDeleteFile { file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()), - file_size_in_bytes: std::fs::metadata(format!("{}/pos-del-1.parquet", table_location.to_str().unwrap())).unwrap().len(), + file_size_in_bytes: std::fs::metadata(format!( + "{}/pos-del-1.parquet", + table_location.to_str().unwrap() + )) + .unwrap() + .len(), file_type: DataContentType::PositionDeletes, partition_spec_id: 0, equality_ids: None, @@ -386,7 +391,12 @@ pub(crate) mod tests { let pos_del_2 = FileScanTaskDeleteFile { file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()), - file_size_in_bytes: std::fs::metadata(format!("{}/pos-del-2.parquet", table_location.to_str().unwrap())).unwrap().len(), + file_size_in_bytes: std::fs::metadata(format!( + "{}/pos-del-2.parquet", + table_location.to_str().unwrap() + )) + .unwrap() + .len(), file_type: DataContentType::PositionDeletes, partition_spec_id: 0, equality_ids: None, @@ -394,7 +404,12 @@ pub(crate) mod tests { let pos_del_3 = FileScanTaskDeleteFile { file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()), - file_size_in_bytes: std::fs::metadata(format!("{}/pos-del-3.parquet", table_location.to_str().unwrap())).unwrap().len(), + file_size_in_bytes: std::fs::metadata(format!( + "{}/pos-del-3.parquet", + table_location.to_str().unwrap() + )) + .unwrap() + .len(), file_type: DataContentType::PositionDeletes, partition_spec_id: 0, equality_ids: None, diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 42f0cb61ee..2cf37ea35d 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -502,11 +502,15 @@ impl ArrowReader { // a reader for the data within let parquet_file = file_io.new_input(data_file_path)?; let parquet_reader = parquet_file.reader().await?; - let mut parquet_file_reader = - ArrowFileReader::new(FileMetadata { size: file_size_in_bytes }, parquet_reader) - .with_preload_column_index(true) - .with_preload_offset_index(true) - .with_preload_page_index(should_load_page_index); + let mut parquet_file_reader = ArrowFileReader::new( + FileMetadata { + size: file_size_in_bytes, + }, + parquet_reader, + ) + .with_preload_column_index(true) + .with_preload_offset_index(true) + .with_preload_page_index(should_load_page_index); if let Some(hint) = metadata_size_hint { parquet_file_reader = parquet_file_reader.with_metadata_size_hint(hint); @@ -2772,6 +2776,7 @@ message schema { project_field_ids: vec![1], predicate: None, deletes: vec![FileScanTaskDeleteFile { + file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(), file_path: delete_file_path, file_type: DataContentType::PositionDeletes, partition_spec_id: 0, @@ -2991,6 +2996,7 @@ message schema { project_field_ids: vec![1], predicate: None, deletes: vec![FileScanTaskDeleteFile { + file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(), file_path: delete_file_path, file_type: DataContentType::PositionDeletes, partition_spec_id: 0, @@ -3203,6 +3209,7 @@ message schema { project_field_ids: vec![1], predicate: None, deletes: vec![FileScanTaskDeleteFile { + file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(), file_path: 
delete_file_path, file_type: DataContentType::PositionDeletes, partition_spec_id: 0, From 9ac23823a192c2fbaf5018dfa95e84fd8d769b0f Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 25 Feb 2026 09:29:23 -0500 Subject: [PATCH 5/5] update tests to use real file sizes --- crates/iceberg/src/arrow/reader.rs | 62 +++++++++++++++++++++--------- 1 file changed, 44 insertions(+), 18 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 2cf37ea35d..93dbdaa35d 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -2128,7 +2128,9 @@ message schema { ) -> Vec> { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -2451,7 +2453,7 @@ message schema { // Task 1: read only the first row group let task1 = FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(), start: rg0_start, length: row_group_0.compressed_size() as u64, record_count: Some(100), @@ -2469,7 +2471,7 @@ message schema { // Task 2: read the second and third row groups let task2 = FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(), start: rg1_start, length: file_end - rg1_start, record_count: Some(200), @@ -2598,7 +2600,9 @@ message schema { let reader = ArrowReaderBuilder::new(file_io).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(format!("{table_location}/old_file.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -2766,7 +2770,7 @@ message schema { let reader = ArrowReaderBuilder::new(file_io).build(); let task = FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(), start: 0, length: 0, record_count: Some(200), @@ -2986,7 +2990,7 @@ message schema { // Create FileScanTask that reads ONLY row group 1 via byte range filtering let task = FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(), start: rg1_start, length: rg1_length, record_count: Some(100), // Row group 1 has 100 rows @@ -3199,7 +3203,7 @@ message schema { // Create FileScanTask that reads ONLY row group 1 via byte range filtering let task = FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(), start: rg1_start, length: rg1_length, record_count: Some(100), // Row group 1 has 100 rows @@ -3310,7 +3314,9 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -3409,7 +3415,9 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -3497,7 +3505,9 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -3599,7 +3609,9 @@ 
message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -3730,7 +3742,9 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -3828,7 +3842,9 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -3939,7 +3955,9 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -4031,7 +4049,9 @@ message schema { // Create tasks in a specific order: file_0, file_1, file_2 let tasks = vec![ Ok(FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_0.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -4047,7 +4067,9 @@ message schema { case_sensitive: false, }), Ok(FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -4063,7 +4085,9 @@ message schema { case_sensitive: false, }), Ok(FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_2.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -4243,7 +4267,9 @@ message schema { let reader = ArrowReaderBuilder::new(file_io).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - file_size_in_bytes: 0, + file_size_in_bytes: std::fs::metadata(format!("{table_location}/data.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None,