diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 5d0b1da712..dfee7ed87b 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -235,7 +235,7 @@ impl CachingDeleteFileLoader { PosDelLoadAction::Load => Ok(DeleteFileContext::PosDels { file_path: task.file_path.clone(), stream: basic_delete_file_loader - .parquet_to_batch_stream(&task.file_path) + .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) .await?, }), } @@ -254,7 +254,7 @@ impl CachingDeleteFileLoader { let equality_ids_vec = task.equality_ids.clone().unwrap(); let evolved_stream = BasicDeleteFileLoader::evolve_schema( basic_delete_file_loader - .parquet_to_batch_stream(&task.file_path) + .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) .await?, schema, &equality_ids_vec, @@ -614,7 +614,10 @@ mod tests { let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); let record_batch_stream = basic_delete_file_loader - .parquet_to_batch_stream(&eq_delete_file_path) + .parquet_to_batch_stream( + &eq_delete_file_path, + std::fs::metadata(&eq_delete_file_path).unwrap().len(), + ) .await .expect("could not get batch stream"); @@ -811,7 +814,10 @@ mod tests { let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); let batch_stream = basic_delete_file_loader - .parquet_to_batch_stream(&delete_file_path) + .parquet_to_batch_stream( + &delete_file_path, + std::fs::metadata(&delete_file_path).unwrap().len(), + ) .await .unwrap(); @@ -913,7 +919,8 @@ mod tests { // Create FileScanTask with BOTH positional and equality deletes let pos_del = FileScanTaskDeleteFile { - file_path: pos_del_path, + file_path: pos_del_path.clone(), + file_size_in_bytes: std::fs::metadata(&pos_del_path).unwrap().len(), file_type: DataContentType::PositionDeletes, partition_spec_id: 0, equality_ids: None, @@ -921,12 +928,14 @@ mod tests { let eq_del = FileScanTaskDeleteFile { file_path: eq_delete_path.clone(), + file_size_in_bytes: std::fs::metadata(&eq_delete_path).unwrap().len(), file_type: DataContentType::EqualityDeletes, partition_spec_id: 0, equality_ids: Some(vec![2, 3]), // Only use field IDs that exist in both schemas }; let file_scan_task = FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -993,7 +1002,7 @@ mod tests { let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); let record_batch_stream = basic_delete_file_loader - .parquet_to_batch_stream(&path) + .parquet_to_batch_stream(&path, std::fs::metadata(&path).unwrap().len()) .await .expect("could not get batch stream"); diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index fa47076fef..33744d876f 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -54,6 +54,7 @@ impl BasicDeleteFileLoader { pub(crate) async fn parquet_to_batch_stream( &self, data_file_path: &str, + file_size_in_bytes: u64, ) -> Result { /* Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly @@ -65,6 +66,7 @@ impl BasicDeleteFileLoader { false, None, None, + file_size_in_bytes, ) .await? .build()? @@ -102,7 +104,9 @@ impl DeleteFileLoader for BasicDeleteFileLoader { task: &FileScanTaskDeleteFile, schema: SchemaRef, ) -> Result { - let raw_batch_stream = self.parquet_to_batch_stream(&task.file_path).await?; + let raw_batch_stream = self + .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) + .await?; // For equality deletes, only evolve the equality_ids columns. // For positional deletes (equality_ids is None), use all field IDs. diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 4af9f6b6ff..3d074ca32e 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -378,6 +378,12 @@ pub(crate) mod tests { let pos_del_1 = FileScanTaskDeleteFile { file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()), + file_size_in_bytes: std::fs::metadata(format!( + "{}/pos-del-1.parquet", + table_location.to_str().unwrap() + )) + .unwrap() + .len(), file_type: DataContentType::PositionDeletes, partition_spec_id: 0, equality_ids: None, @@ -385,6 +391,12 @@ pub(crate) mod tests { let pos_del_2 = FileScanTaskDeleteFile { file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()), + file_size_in_bytes: std::fs::metadata(format!( + "{}/pos-del-2.parquet", + table_location.to_str().unwrap() + )) + .unwrap() + .len(), file_type: DataContentType::PositionDeletes, partition_spec_id: 0, equality_ids: None, @@ -392,6 +404,12 @@ pub(crate) mod tests { let pos_del_3 = FileScanTaskDeleteFile { file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()), + file_size_in_bytes: std::fs::metadata(format!( + "{}/pos-del-3.parquet", + table_location.to_str().unwrap() + )) + .unwrap() + .len(), file_type: DataContentType::PositionDeletes, partition_spec_id: 0, equality_ids: None, @@ -399,6 +417,7 @@ pub(crate) mod tests { let file_scan_tasks = vec![ FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -414,6 +433,7 @@ pub(crate) mod tests { case_sensitive: false, }, FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -464,6 +484,7 @@ pub(crate) mod tests { // ---------- fake FileScanTask ---------- let task = FileScanTask { + file_size_in_bytes: 0, start: 0, length: 0, record_count: None, @@ -474,6 +495,7 @@ pub(crate) mod tests { predicate: None, deletes: vec![FileScanTaskDeleteFile { file_path: "eq-del.parquet".to_string(), + file_size_in_bytes: 1, // never read; this test fails before opening the file file_type: DataContentType::EqualityDeletes, partition_spec_id: 0, equality_ids: None, diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index c4c2fa0036..93dbdaa35d 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -33,7 +33,7 @@ use arrow_string::like::starts_with; use bytes::Bytes; use fnv::FnvHashSet; use futures::future::BoxFuture; -use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join}; +use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use parquet::arrow::arrow_reader::{ ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector, }; @@ -237,6 +237,7 @@ impl ArrowReader { should_load_page_index, None, metadata_size_hint, + task.file_size_in_bytes, ) .await?; @@ -290,6 +291,7 @@ impl ArrowReader { should_load_page_index, Some(options), metadata_size_hint, + task.file_size_in_bytes, ) .await? } else { @@ -494,16 +496,21 @@ impl ArrowReader { should_load_page_index: bool, arrow_reader_options: Option, metadata_size_hint: Option, + file_size_in_bytes: u64, ) -> Result> { // Get the metadata for the Parquet file we need to read and build // a reader for the data within let parquet_file = file_io.new_input(data_file_path)?; - let (parquet_metadata, parquet_reader) = - try_join!(parquet_file.metadata(), parquet_file.reader())?; - let mut parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader) - .with_preload_column_index(true) - .with_preload_offset_index(true) - .with_preload_page_index(should_load_page_index); + let parquet_reader = parquet_file.reader().await?; + let mut parquet_file_reader = ArrowFileReader::new( + FileMetadata { + size: file_size_in_bytes, + }, + parquet_reader, + ) + .with_preload_column_index(true) + .with_preload_offset_index(true) + .with_preload_page_index(should_load_page_index); if let Some(hint) = metadata_size_hint { parquet_file_reader = parquet_file_reader.with_metadata_size_hint(hint); @@ -2121,6 +2128,9 @@ message schema { ) -> Vec> { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -2443,6 +2453,7 @@ message schema { // Task 1: read only the first row group let task1 = FileScanTask { + file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(), start: rg0_start, length: row_group_0.compressed_size() as u64, record_count: Some(100), @@ -2460,6 +2471,7 @@ message schema { // Task 2: read the second and third row groups let task2 = FileScanTask { + file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(), start: rg1_start, length: file_end - rg1_start, record_count: Some(200), @@ -2588,6 +2600,9 @@ message schema { let reader = ArrowReaderBuilder::new(file_io).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: std::fs::metadata(format!("{table_location}/old_file.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -2755,6 +2770,7 @@ message schema { let reader = ArrowReaderBuilder::new(file_io).build(); let task = FileScanTask { + file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(), start: 0, length: 0, record_count: Some(200), @@ -2764,6 +2780,7 @@ message schema { project_field_ids: vec![1], predicate: None, deletes: vec![FileScanTaskDeleteFile { + file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(), file_path: delete_file_path, file_type: DataContentType::PositionDeletes, partition_spec_id: 0, @@ -2973,6 +2990,7 @@ message schema { // Create FileScanTask that reads ONLY row group 1 via byte range filtering let task = FileScanTask { + file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(), start: rg1_start, length: rg1_length, record_count: Some(100), // Row group 1 has 100 rows @@ -2982,6 +3000,7 @@ message schema { project_field_ids: vec![1], predicate: None, deletes: vec![FileScanTaskDeleteFile { + file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(), file_path: delete_file_path, file_type: DataContentType::PositionDeletes, partition_spec_id: 0, @@ -3184,6 +3203,7 @@ message schema { // Create FileScanTask that reads ONLY row group 1 via byte range filtering let task = FileScanTask { + file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(), start: rg1_start, length: rg1_length, record_count: Some(100), // Row group 1 has 100 rows @@ -3193,6 +3213,7 @@ message schema { project_field_ids: vec![1], predicate: None, deletes: vec![FileScanTaskDeleteFile { + file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(), file_path: delete_file_path, file_type: DataContentType::PositionDeletes, partition_spec_id: 0, @@ -3293,6 +3314,9 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -3391,6 +3415,9 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -3478,6 +3505,9 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -3579,6 +3609,9 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -3709,6 +3742,9 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -3806,6 +3842,9 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -3916,6 +3955,9 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -4007,6 +4049,9 @@ message schema { // Create tasks in a specific order: file_0, file_1, file_2 let tasks = vec![ Ok(FileScanTask { + file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_0.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -4022,6 +4067,9 @@ message schema { case_sensitive: false, }), Ok(FileScanTask { + file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_1.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -4037,6 +4085,9 @@ message schema { case_sensitive: false, }), Ok(FileScanTask { + file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_2.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, @@ -4216,6 +4267,9 @@ message schema { let reader = ArrowReaderBuilder::new(file_io).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { + file_size_in_bytes: std::fs::metadata(format!("{table_location}/data.parquet")) + .unwrap() + .len(), start: 0, length: 0, record_count: None, diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 169d8e6405..aa28ffd5a2 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -117,6 +117,7 @@ impl ManifestEntryContext { .await; Ok(FileScanTask { + file_size_in_bytes: self.manifest_entry.file_size_in_bytes(), start: 0, length: self.manifest_entry.file_size_in_bytes(), record_count: Some(self.manifest_entry.record_count()), diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index c055c12c9a..2a685cd171 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -756,7 +756,9 @@ pub mod tests { let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); let current_partition_spec = self.table.metadata().default_partition_spec(); - // Write data files + // Write the data files first, then use the file size in the manifest entries + let parquet_file_size = self.write_parquet_data_files(); + let mut writer = ManifestWriterBuilder::new( self.next_manifest_file(), Some(current_snapshot.snapshot_id()), @@ -775,7 +777,7 @@ pub mod tests { .content(DataContentType::Data) .file_path(format!("{}/1.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) + .file_size_in_bytes(parquet_file_size) .record_count(1) .partition(Struct::from_iter([Some(Literal::long(100))])) .key_metadata(None) @@ -798,7 +800,7 @@ pub mod tests { .content(DataContentType::Data) .file_path(format!("{}/2.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) + .file_size_in_bytes(parquet_file_size) .record_count(1) .partition(Struct::from_iter([Some(Literal::long(200))])) .build() @@ -820,7 +822,7 @@ pub mod tests { .content(DataContentType::Data) .file_path(format!("{}/3.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) + .file_size_in_bytes(parquet_file_size) .record_count(1) .partition(Struct::from_iter([Some(Literal::long(300))])) .build() @@ -845,8 +847,13 @@ pub mod tests { .add_manifests(vec![data_file_manifest].into_iter()) .unwrap(); manifest_list_write.close().await.unwrap(); + } + + /// Writes identical Parquet data files (1.parquet, 2.parquet, 3.parquet) + /// and returns the file size in bytes. + fn write_parquet_data_files(&self) -> u64 { + std::fs::create_dir_all(&self.table_location).unwrap(); - // prepare data let schema = { let fields = vec![ arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false) @@ -958,6 +965,10 @@ pub mod tests { // writer must be closed to write footer writer.close().unwrap(); } + + std::fs::metadata(format!("{}/1.parquet", &self.table_location)) + .unwrap() + .len() } pub async fn setup_unpartitioned_manifest_files(&mut self) { @@ -968,6 +979,9 @@ pub mod tests { let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec()); + // Write the data files first, then use the file size in the manifest entries + let parquet_file_size = self.write_parquet_data_files(); + // Write data files using an empty partition for unpartitioned tables. let mut writer = ManifestWriterBuilder::new( self.next_manifest_file(), @@ -991,7 +1005,7 @@ pub mod tests { .content(DataContentType::Data) .file_path(format!("{}/1.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) + .file_size_in_bytes(parquet_file_size) .record_count(1) .partition(empty_partition.clone()) .key_metadata(None) @@ -1015,7 +1029,7 @@ pub mod tests { .content(DataContentType::Data) .file_path(format!("{}/2.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) + .file_size_in_bytes(parquet_file_size) .record_count(1) .partition(empty_partition.clone()) .build() @@ -1038,7 +1052,7 @@ pub mod tests { .content(DataContentType::Data) .file_path(format!("{}/3.parquet", &self.table_location)) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) + .file_size_in_bytes(parquet_file_size) .record_count(1) .partition(empty_partition.clone()) .build() @@ -1064,111 +1078,6 @@ pub mod tests { .add_manifests(vec![data_file_manifest].into_iter()) .unwrap(); manifest_list_write.close().await.unwrap(); - - // prepare data for parquet files - let schema = { - let fields = vec![ - arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "1".to_string(), - )])), - arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "2".to_string(), - )])), - arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "3".to_string(), - )])), - arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "4".to_string(), - )])), - arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "5".to_string(), - )])), - arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "6".to_string(), - )])), - arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "7".to_string(), - )])), - arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "8".to_string(), - )])), - ]; - Arc::new(arrow_schema::Schema::new(fields)) - }; - - // Build the arrays for the RecordBatch - let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; - - let mut values = vec![2; 512]; - values.append(vec![3; 200].as_mut()); - values.append(vec![4; 300].as_mut()); - values.append(vec![5; 12].as_mut()); - let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; - - let mut values = vec![3; 512]; - values.append(vec![4; 512].as_mut()); - let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; - - let mut values = vec!["Apache"; 512]; - values.append(vec!["Iceberg"; 512].as_mut()); - let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef; - - let mut values = vec![100.0f64; 512]; - values.append(vec![150.0f64; 12].as_mut()); - values.append(vec![200.0f64; 500].as_mut()); - let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef; - - let mut values = vec![100i32; 512]; - values.append(vec![150i32; 12].as_mut()); - values.append(vec![200i32; 500].as_mut()); - let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef; - - let mut values = vec![100i64; 512]; - values.append(vec![150i64; 12].as_mut()); - values.append(vec![200i64; 500].as_mut()); - let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; - - let mut values = vec![false; 512]; - values.append(vec![true; 512].as_mut()); - let values: BooleanArray = values.into(); - let col8 = Arc::new(values) as ArrayRef; - - let to_write = RecordBatch::try_new(schema.clone(), vec![ - col1, col2, col3, col4, col5, col6, col7, col8, - ]) - .unwrap(); - - // Write the Parquet files - let props = WriterProperties::builder() - .set_compression(Compression::SNAPPY) - .build(); - - for n in 1..=3 { - let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap(); - let mut writer = - ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap(); - - writer.write(&to_write).expect("Writing batch"); - - // writer must be closed to write footer - writer.close().unwrap(); - } } pub async fn setup_deadlock_manifests(&mut self) { @@ -1874,6 +1783,7 @@ pub mod tests { ); let task = FileScanTask { data_file_path: "data_file_path".to_string(), + file_size_in_bytes: 0, start: 0, length: 100, project_field_ids: vec![1, 2, 3], @@ -1892,6 +1802,7 @@ pub mod tests { // with predicate let task = FileScanTask { data_file_path: "data_file_path".to_string(), + file_size_in_bytes: 0, start: 0, length: 100, project_field_ids: vec![1, 2, 3], diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs index 5349a9bdd2..67615c351e 100644 --- a/crates/iceberg/src/scan/task.rs +++ b/crates/iceberg/src/scan/task.rs @@ -51,6 +51,9 @@ where D: serde::Deserializer<'de> { /// A task to scan part of file. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct FileScanTask { + /// The total size of the data file in bytes, from the manifest entry. + /// Used to skip a stat/HEAD request when reading Parquet footers. + pub file_size_in_bytes: u64, /// The start offset of the file to scan. pub start: u64, /// The length of the file to scan. @@ -146,6 +149,7 @@ impl From<&DeleteFileContext> for FileScanTaskDeleteFile { fn from(ctx: &DeleteFileContext) -> Self { FileScanTaskDeleteFile { file_path: ctx.manifest_entry.file_path().to_string(), + file_size_in_bytes: ctx.manifest_entry.file_size_in_bytes(), file_type: ctx.manifest_entry.content_type(), partition_spec_id: ctx.partition_spec_id, equality_ids: ctx.manifest_entry.data_file.equality_ids.clone(), @@ -159,6 +163,9 @@ pub struct FileScanTaskDeleteFile { /// The delete file path pub file_path: String, + /// The total size of the delete file in bytes, from the manifest entry. + pub file_size_in_bytes: u64, + /// delete file type pub file_type: DataContentType,