Add Apache Iceberg format support #8148
lhoestq
left a comment
Awesome! I just have one comment:
    splits.append(
        datasets.SplitGenerator(
            name=split_name,
            gen_kwargs={"scan": scan},
Do you think we could pass a list here instead? This would enable parallel processing and streaming, e.g. with one scan object per file.
Thanks for the suggestion! I've refactored the implementation to support num_proc > 1 parallel processing:
Changes:
- List-based gen_kwargs: _split_generators now passes tasks = list(scan.plan_files()) as a list in gen_kwargs, which allows _split_gen_kwargs to distribute FileScanTask objects across workers automatically.
- Picklable scan_context: Instead of passing the unpicklable scan object, I extract a tuple of individually-serializable components (table_metadata, io, projected_schema, row_filter, case_sensitive, limit) that can reconstruct an ArrowScan reader in each worker.
- Drop catalog after use: At the end of _split_generators, both self.config.catalog and self.config_kwargs["catalog"] are set to None so the builder itself can be pickled when sent to child processes. The catalog is no longer needed after planning; all reading state lives in scan_context.
- Per-task reading: _generate_tables now iterates over its assigned tasks list and uses ArrowScan.to_record_batches([task]) for each one, yielding Key(task_idx, batch_idx) for proper shard-level parallelism.
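The list-based approach in the changes above can be illustrated with a small, self-contained sketch. This is not the `datasets` implementation: `split_list_kwargs` and `generate_keys` are hypothetical helpers, plain strings stand in for `FileScanTask` objects, and the sketch assumes that list-valued `gen_kwargs` entries are cut into contiguous chunks (one per worker) while non-list values such as `scan_context` are copied to every worker.

```python
from typing import Any, Iterator


def split_list_kwargs(gen_kwargs: dict[str, Any], num_proc: int) -> list[dict[str, Any]]:
    """Split every list-valued entry into contiguous chunks, one per worker (hypothetical helper)."""
    list_lengths = {len(v) for v in gen_kwargs.values() if isinstance(v, list)}
    if len(list_lengths) != 1:
        raise ValueError("expected list entries of equal length")
    num_shards = list_lengths.pop()
    # Never create more jobs than there are shards to hand out.
    num_jobs = max(1, min(num_proc, num_shards))
    base, extra = divmod(num_shards, num_jobs)
    jobs, start = [], 0
    for job_idx in range(num_jobs):
        stop = start + base + (1 if job_idx < extra else 0)
        jobs.append({k: v[start:stop] if isinstance(v, list) else v for k, v in gen_kwargs.items()})
        start = stop
    return jobs


def generate_keys(tasks: list[str], batches_per_task: int = 2) -> Iterator[tuple[tuple[int, int], str]]:
    """Yield one (task_idx, batch_idx) key per batch, mirroring a per-task reading loop."""
    for task_idx, task in enumerate(tasks):
        for batch_idx in range(batches_per_task):
            yield (task_idx, batch_idx), f"{task}:batch{batch_idx}"


# Strings stand in for FileScanTask objects; the tuple stands in for scan_context.
gen_kwargs = {"tasks": ["file-a", "file-b", "file-c", "file-d", "file-e"], "scan_context": ("metadata", "io")}
jobs = split_list_kwargs(gen_kwargs, num_proc=2)
```

With five tasks and two workers, the first worker receives three tasks and the second receives two; each worker then numbers its own tasks and batches independently.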
@lhoestq
Thanks for the review! I’ve addressed this comment in the latest commit.
Could you please take another look when you have a chance?
The docs for this PR live here. All of your documentation changes will be reflected on that endpoint. The docs are available until 30 days after the last update.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
lhoestq
left a comment
Great! I have one last comment before we merge:
    from pyiceberg.catalog.sql import SqlCatalog
    from pyiceberg.schema import Schema
    from pyiceberg.types import DoubleType, FloatType, ListType, LongType, NestedField, StringType
Can you move the imports inside the test functions and decorate them with @require_pyiceberg? This way people can run the test suite even if they don't have all the test dependencies.
Thanks! I have added @require_pyiceberg and @require_not_windows to the Iceberg test cases, since the Iceberg tests are not supported on Windows.
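For readers following the thread, the pattern requested in this review can be sketched roughly as below. The decorator names come from the comments above, but their bodies here are assumptions written for illustration (built on the standard `unittest.skip`), not the helpers actually added in this PR; the test function and its schema are likewise hypothetical.

```python
import importlib.util
import sys
import unittest


def require_pyiceberg(test_case):
    """Skip the test when pyiceberg is not installed (illustrative body, an assumption)."""
    if importlib.util.find_spec("pyiceberg") is None:
        test_case = unittest.skip("test requires pyiceberg")(test_case)
    return test_case


def require_not_windows(test_case):
    """Skip the test on Windows (illustrative body, an assumption)."""
    if sys.platform == "win32":
        test_case = unittest.skip("test should not be run on Windows")(test_case)
    return test_case


@require_pyiceberg
@require_not_windows
def test_iceberg_schema_roundtrip():
    # Imports live inside the test, so importing this module never requires pyiceberg.
    from pyiceberg.schema import Schema
    from pyiceberg.types import LongType, NestedField, StringType

    schema = Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=True),
        NestedField(field_id=2, name="text", field_type=StringType(), required=False),
    )
    assert [field.name for field in schema.fields] == ["id", "text"]
```

Because the optional dependency is only imported inside the test body, the module can be imported on machines without pyiceberg, and the decorated test is marked as skipped instead of failing at import time.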
Add Apache Iceberg format support
Motivation
Apache Iceberg is one of the most widely adopted open table formats for data lakes, supported by Databricks,
Snowflake, AWS Glue, Dremio, and others. A large amount of ML training data lives in Iceberg tables.
Currently, users must manually export Iceberg data to Parquet before loading it into Hugging Face Datasets;
this PR removes that friction.
Fixes #7863.
Usage
Users pass a pre-configured pyiceberg Catalog object and a table identifier:
    from pyiceberg.catalog.sql import SqlCatalog
    from datasets import load_dataset

    catalog = SqlCatalog("my_catalog", uri="sqlite:///catalog.db", warehouse="/tmp/warehouse")

    # Basic loading
    ds = load_dataset("iceberg", catalog=catalog, table="db.my_table")

    # Column selection + row filtering (predicate pushdown)
    ds = load_dataset("iceberg", catalog=catalog, table="db.my_table",
                      columns=["text", "label"],
                      filters=[("label", ">", 0)])

    # Multiple splits from different tables
    ds = load_dataset("iceberg", catalog=catalog,
                      table={"train": "db.train", "test": "db.test"})

    # Time travel via snapshot_id
    ds = load_dataset("iceberg", catalog=catalog, table="db.my_table",
                      snapshot_id=7051729674881785648)

    # Streaming
    ds = load_dataset("iceberg", catalog=catalog, table="db.my_table", streaming=True)

Works with any pyiceberg-supported catalog backend (REST, Hive, Glue, SQL, etc.) — the builder is agnostic
to how the catalog is configured.
Design decisions
- […] backends (REST, Hive, Glue, SQL each have different auth/connection params). Rather than re-implementing a "catalog factory" inside the builder, users bring their own catalog, similar to how the sql builder accepts an existing SQLAlchemy connection. This keeps the builder simple and forward-compatible with new catalog types.
- […] addressed via catalog + table identifier, not file extensions. Users must specify "iceberg" explicitly as the path argument.
- […] pools, etc.) are not picklable by dill. The override replaces the catalog with a stable string representation ("{ClassName}_{name}") before hashing.
- […] reading data files.
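
The hashing workaround mentioned above (replacing the catalog with a "{ClassName}_{name}" string) can be sketched as follows. Everything here is an assumption made for illustration: `FakeCatalog` is a stand-in class, `catalog_fingerprint` and `is_picklable` are hypothetical helpers, and the standard-library `pickle` module is used in place of dill, so this shows the idea only and is not the override in this PR.

```python
import pickle
import threading


class FakeCatalog:
    """Hypothetical stand-in for a catalog that holds an unpicklable resource."""

    def __init__(self, name: str):
        self.name = name
        # A lock cannot be pickled by the stdlib pickle module, much like a live connection.
        self._lock = threading.Lock()


def catalog_fingerprint(catalog) -> str:
    """Build a stable "{ClassName}_{name}" string to hash in place of the catalog object."""
    return f"{type(catalog).__name__}_{catalog.name}"


def is_picklable(obj) -> bool:
    """Return True if the stdlib pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


catalog = FakeCatalog("my_catalog")
config_kwargs = {"catalog": catalog, "table": "db.my_table"}

# Swap the catalog for its string form before hashing the configuration.
hashable_kwargs = {**config_kwargs, "catalog": catalog_fingerprint(catalog)}
```

One consequence of this design is that the fingerprint depends only on the catalog's class and name, so two catalogs with the same class and name but different connection settings would produce the same string.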