
Epic: ArrowReader performance improvements for split FileScanTasks #2172

@mbutrovich

Description

What feature are you trying to implement?

When running Iceberg workloads via DataFusion Comet, we're seeing significant per-file overhead in iceberg-rust's ArrowReader. Comet is an accelerator for Spark workloads that relies on Iceberg Java for scan planning. Iceberg Java generates FileScanTasks that are split by byte range and bin-packed across partitions for parallelism. Comet serializes these over to a native operator backed by iceberg-rust's ArrowReader.

iceberg-rust's ArrowReader treats each FileScanTask as a fully independent operation, with no reuse of I/O clients, metadata, or operators across tasks. With many thousands of tasks (common for large tables, or degenerate cases of partitioned tables with thousands of small data files), the per-task overhead becomes a significant fraction of total CPU time. Flame graphs show ~30% of executor CPU in per-file overhead: operator creation, stat() calls, TLS handshakes, and credential provider initialization.

I currently have a branch with all of these optimizations and have seen good benefits for our workloads (DataFusion Comet PR). The optimizations below are independent of one another; I will submit and benchmark each as a separate PR.

1. OpenDAL operator caching

Problem: create_operator() builds a new Operator on every call. Every storage operation (metadata(), reader(), etc.) calls create_operator(), so each FileScanTask creates at least 2 operators. Each operator construction involves credential provider initialization, signer creation, and wrapping in a RetryLayer. These are all discarded after a single use.

Fix: Cache operators by bucket/container name within OpenDalStorage (e.g., in a HashMap<String, Operator> behind a Mutex). The operator is stateless with respect to individual files — it's scoped to a bucket — so it's safe to reuse across concurrent tasks.

Scope: crates/iceberg/src/io/storage/opendal/mod.rs — the OpenDalStorage enum variants (S3, Gcs, Oss, Azdls) and create_operator().
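
A minimal sketch of the cache, assuming opendal's Operator remains cheap to clone (it is Clone over shared internals); OperatorCache and get_or_create are illustrative names, not existing iceberg-rust API:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

use opendal::Operator;

/// Hypothetical cache of OpenDAL operators, keyed by bucket/container name.
struct OperatorCache {
    inner: Mutex<HashMap<String, Operator>>,
}

impl OperatorCache {
    fn get_or_create(
        &self,
        bucket: &str,
        build: impl FnOnce() -> opendal::Result<Operator>,
    ) -> opendal::Result<Operator> {
        let mut cache = self.inner.lock().unwrap();
        if let Some(op) = cache.get(bucket) {
            // Cache hit: skip credential provider initialization, signer
            // creation, and RetryLayer wrapping entirely.
            return Ok(op.clone());
        }
        let op = build()?;
        cache.insert(bucket.to_string(), op.clone());
        Ok(op)
    }
}
```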

2. File size passthrough via FileScanTask

Problem: create_parquet_record_batch_stream_builder() calls try_join!(parquet_file.metadata(), parquet_file.reader()). The metadata() call issues a stat() / HEAD request to object storage just to get the file size. This file size is already known from the manifest entry's file_size_in_bytes field, but FileScanTask doesn't carry it.

Fix: Add file_size_in_bytes: u64 to FileScanTask (populated from the manifest entry during planning), and pass it through to skip the stat() call. When present, construct FileMetadata { size } directly instead of issuing a storage round-trip.

Scope: crates/iceberg/src/scan/task.rs (add field), crates/iceberg/src/arrow/reader.rs (use it), and scan planning code that constructs FileScanTasks.
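
A hedged sketch of the passthrough; FileScanTask's other fields are elided, and FileMetadata stands in for whatever metadata type the reader consumes:

```rust
/// Abbreviated FileScanTask showing only the proposed field.
struct FileScanTask {
    data_file_path: String,
    start: u64,
    length: u64,
    /// Proposed: copied from the manifest entry's `file_size_in_bytes`
    /// during scan planning.
    file_size_in_bytes: u64,
}

/// Stand-in for the storage layer's file metadata type.
struct FileMetadata {
    size: u64,
}

/// With the size carried on the task, the reader builds FileMetadata
/// directly instead of issuing a stat()/HEAD round-trip per task.
fn file_metadata_from_task(task: &FileScanTask) -> FileMetadata {
    FileMetadata {
        size: task.file_size_in_bytes,
    }
}
```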

3. Eliminate double-open for migrated tables

Problem: process_file_scan_task() calls create_parquet_record_batch_stream_builder() once to inspect the Parquet schema for embedded field IDs. If the file lacks field IDs (missing_field_ids is true), it calls create_parquet_record_batch_stream_builder() a second time with ArrowReaderOptions to apply name mapping or fallback IDs. Each call issues metadata() + reader() to storage. For migrated tables, this means 4 operator creations and 2 full metadata fetches per task.

Fix: Separate metadata loading from stream builder construction. Load ParquetMetaData once, inspect it in-memory for field IDs using ArrowReaderMetadata::try_new(), then construct a single ParquetRecordBatchStreamBuilder via new_with_metadata() with the correct options.

Scope: crates/iceberg/src/arrow/reader.rs — restructure process_file_scan_task() and create_parquet_record_batch_stream_builder().
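
A sketch of the restructured flow using the arrow-rs APIs named above; the in-memory field-ID probe and the options_with_fallback_ids() helper are illustrative, not the actual iceberg-rust logic:

```rust
use std::sync::Arc;

use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStreamBuilder};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaData;

/// Hypothetical helper building options that apply name mapping /
/// fallback field IDs (e.g. via a schema override); stubbed here.
fn options_with_fallback_ids() -> ArrowReaderOptions {
    ArrowReaderOptions::new()
}

fn build_stream<T: AsyncFileReader + Send + 'static>(
    input: T,
    metadata: Arc<ParquetMetaData>,
) -> Result<ParquetRecordBatchStreamBuilder<T>> {
    // Interpret the already-loaded footer once; no storage I/O here.
    let default_meta =
        ArrowReaderMetadata::try_new(metadata.clone(), ArrowReaderOptions::new())?;

    // Probe the decoded Arrow schema in memory for embedded field IDs.
    let missing_field_ids = default_meta
        .schema()
        .fields()
        .iter()
        .all(|f| !f.metadata().contains_key(PARQUET_FIELD_ID_META_KEY));

    // Re-interpret the same footer with different options if needed;
    // still no second metadata() + reader() pair against storage.
    let reader_metadata = if missing_field_ids {
        ArrowReaderMetadata::try_new(metadata, options_with_fallback_ids())?
    } else {
        default_meta
    };
    Ok(ParquetRecordBatchStreamBuilder::new_with_metadata(
        input,
        reader_metadata,
    ))
}
```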

4. Parquet metadata prefetch with size hint

Problem: Without a metadata size hint, arrow-rs's Parquet reader does two round-trips to read the footer: first an 8-byte read to get the magic bytes and footer length, then a second read for the actual footer. On object storage, each round-trip has significant latency.

Fix: Use ArrowFileReader::with_metadata_size_hint() (which wraps arrow-rs's ParquetMetaDataReader::with_prefetch_hint()) to speculatively read a larger chunk from the end of the file in a single request. DataFusion uses 512KB as the default. The exact value can be discussed in the PR.

Scope: crates/iceberg/src/arrow/reader.rs — in create_parquet_record_batch_stream_builder() when constructing ArrowFileReader.
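
A sketch of the underlying arrow-rs call; 512 * 1024 mirrors DataFusion's default and is a placeholder, not a decided value:

```rust
use parquet::file::metadata::ParquetMetaDataReader;

/// Placeholder mirroring DataFusion's 512KB default; the actual value
/// is up for discussion in the PR.
const METADATA_PREFETCH_HINT: usize = 512 * 1024;

fn metadata_reader_with_hint() -> ParquetMetaDataReader {
    // With a hint, the reader speculatively fetches one tail chunk instead
    // of an 8-byte footer-length probe followed by a second footer read.
    ParquetMetaDataReader::new().with_prefetch_hint(Some(METADATA_PREFETCH_HINT))
}
```

If the hint undershoots the actual footer size, the reader fetches the remainder in a follow-up request, so a too-small value costs an extra round-trip rather than correctness.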

5. Parquet metadata caching across same-file splits

Problem: When Iceberg Java splits a large file into multiple FileScanTasks (different start/length byte ranges of the same file), iceberg-rust re-fetches the Parquet metadata for each split independently. The metadata is identical across splits of the same file.

Fix: Cache ParquetMetaData by (file_path, should_load_page_index) within a read() invocation using a bounded LRU cache (e.g., moka). Subsequent tasks for the same file get a cache hit and skip all metadata I/O. Use ParquetRecordBatchStreamBuilder::new_with_metadata() to pass the cached metadata to the builder.

Scope: crates/iceberg/src/arrow/reader.rs — new ParquetMetadataCache struct shared across tasks via ArrowReader::read().
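
A minimal sketch of the proposed struct, assuming moka's sync Cache as the bounded LRU; the capacity and key shape are illustrative:

```rust
use std::sync::Arc;

use moka::sync::Cache;
use parquet::file::metadata::ParquetMetaData;

/// Proposed cache, keyed by (file_path, should_load_page_index).
/// Hits clone an Arc, so splits of the same file share one footer.
struct ParquetMetadataCache {
    inner: Cache<(String, bool), Arc<ParquetMetaData>>,
}

impl ParquetMetadataCache {
    fn new(max_entries: u64) -> Self {
        Self {
            inner: Cache::new(max_entries),
        }
    }

    fn get(&self, path: &str, page_index: bool) -> Option<Arc<ParquetMetaData>> {
        self.inner.get(&(path.to_string(), page_index))
    }

    fn insert(&self, path: String, page_index: bool, meta: Arc<ParquetMetaData>) {
        self.inner.insert((path, page_index), meta);
    }
}
```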

Note: The benefit of this optimization depends on how the external planner groups tasks. In Spark's default configuration, bin-packing rarely places multiple splits of the same file in the same partition, so cache hits may be infrequent. However, when iceberg-rust plans its own tasks (whole-file), or when external planners explicitly coalesce same-file splits, this provides a measurable reduction in metadata I/O. We tried this already and it didn't make a huge difference on our workload with many small files (#2100).

6. Range coalescing in reads

Problem: We're seeing poor TPC-H SF1000 performance on large table scans. I suspect it's because we don't coalesce nearby byte ranges, so each column-chunk read becomes its own request to object storage.

Fix: Adopt object_store's range coalescing to reduce S3 requests.

Scope: crates/iceberg/src/arrow/reader.rs — new methods to coalesce byte ranges with reasonable defaults.
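
An illustrative gap-based merge (object_store ships a similar utility; the 1 MiB threshold is an assumption, not a tuned default):

```rust
use std::ops::Range;

/// Assumed threshold: merge ranges whose gap is small enough that one
/// larger request beats two round-trips.
const MAX_GAP: u64 = 1024 * 1024;

fn coalesce(mut ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
    ranges.sort_by_key(|r| r.start);
    let mut out: Vec<Range<u64>> = Vec::new();
    for r in ranges {
        match out.last_mut() {
            // Extend the previous range when the gap is small, trading a
            // little over-read for one fewer object-storage request.
            Some(prev) if r.start <= prev.end + MAX_GAP => {
                prev.end = prev.end.max(r.end);
            }
            _ => out.push(r),
        }
    }
    out
}
```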

Commentary

2, 3, and 4 seem the least controversial to me, so I will start with those. For 1, there was some discussion in Iceberg Slack about implementing it after the Storage trait work is done (@CTTY). For 5, I added a note above that it might not be very effective and maybe not worth the complication; I will measure it independently and see how it does.

Willingness to contribute

I can contribute to this feature independently
