Skip to content

feat: add S3/Iceberg sink (append-only changelog)#100

Open
dariomazzitellireplik-coder wants to merge 2 commits into
mainfrom
feat/iceberg-sink
Open

feat: add S3/Iceberg sink (append-only changelog)#100
dariomazzitellireplik-coder wants to merge 2 commits into
mainfrom
feat/iceberg-sink

Conversation

@dariomazzitellireplik-coder

Copy link
Copy Markdown
Collaborator

Summary

New IcebergSink behind the opt-in cargo feature sink-iceberg (not in default builds). Streams CDC into Apache Iceberg tables on S3-compatible stores via an Iceberg REST catalog (Lakekeeper, Polaris, Nessie, Tabular, Glue REST endpoint).

Replaces the abandoned feat/s3-iceberg-sink branch, rebuilt from scratch after review found its write path unsalvageable (no-op catalog, deferred commits past LSN confirmation, deletes written as live rows, Parquet without Iceberg field-ids).

Design

  • Append-only changelog: every INSERT/UPDATE/DELETE becomes one row with the source columns plus _cdc_op / _cdc_position / _cdc_ts metadata columns. Consumers materialize current state (dedup by PK + max _cdc_position, drop D).
  • Durability before confirmation: each write_batch() writes Parquet via iceberg-rust's DataFileWriter (spec field-ids + column stats) and commits a FastAppend snapshot per touched table before returning — the pipeline's LSN confirmation never outruns durable data. No staging window; uncommitted files from a crash are simply unreferenced.
  • Strict, exhaustive type mapping: UInt64decimal(20,0) (full range, no wrap), NUMERICdecimal(p,s), UUIDuuid, temporal types to their Iceberg counterparts. Text-encoded values (snapshot path) are parsed strictly; unparseable/mismatched values fail the batch — never coerced to defaults.
  • Honest capabilities: supports_upsert: false, supports_schema_evolution: false (iceberg-rust 0.9 transactions cannot update schemas — SchemaChange events feed the existing schema_evolution_skipped metric), optimal_flush_interval_ms: 60s to bound snapshot count.
  • Tables auto-created (V2, unpartitioned, flat schema__table naming inside the SINK_DATABASE namespace); existing tables validated for column compatibility.
  • REST catalog wrapper avoids the spec's HEAD endpoints (some servers, e.g. tabulario/iceberg-rest, reject them with 400) and handles this client version's 404 error mapping.

Also in this PR

  • arrow/parquet 53 → 57 (required by iceberg 0.9). The Snowflake sink — the other arrow consumer — compiles and passes its test suite against v57.
  • docs/configuration.md: Iceberg env var reference + operator requirements (snapshot expiry/compaction is operator-owned).
  • Per-sink README: src/connectors/sinks/iceberg/README.md.
  • Version 2.5.0 → 2.6.0, CHANGELOG under [Unreleased].

Verification

  • cargo test --features sink-iceberg: 183 passed (includes Snowflake regression on arrow 57).
  • cargo check on default, --features sink-iceberg, and --no-default-features --features 'sink-postgres sink-iceberg' — default build never references iceberg crates.
  • cargo fmt --check + clippy --all-targets -- -D warnings clean.
  • End-to-end against MinIO + tabulario/iceberg-rest + PG16: initial snapshot (3 rows) + live INSERT/UPDATE/DELETE each committed as Iceberg snapshots; read back exactly with pyiceberg 0.11 (field-ids, decimals, UUIDs, NULLs, timestamptz, op ordering by _cdc_position).
  • Crash semantics: SIGKILL + DML while down + restart → resumed from checkpoint, zero loss, only at-least-once duplicate I rows from the engine's re-snapshot; materialized state matched the source exactly.

v1 non-goals (documented)

Merge-on-read/equality deletes, schema evolution auto-apply, partitioned tables, non-REST catalogs, table maintenance (expiry/compaction).

New IcebergSink behind the opt-in cargo feature sink-iceberg (not in
default builds). Streams CDC into Apache Iceberg tables on S3 via a
REST catalog (Lakekeeper, Polaris, Nessie, Tabular, Glue REST).

Model: append-only changelog — every INSERT/UPDATE/DELETE becomes one
row carrying the source columns plus _cdc_op / _cdc_position / _cdc_ts
metadata columns. Consumers materialize current state downstream.

Durability: each write_batch() writes Parquet through iceberg-rust's
DataFileWriter (spec field-ids + stats) and commits a FastAppend
snapshot per touched table before returning, so the pipeline's LSN
confirmation never outruns durable data. No staging window; commit
retries with table refresh on transient catalog failures.

Type mapping is exhaustive and strict: UInt64 -> decimal(20,0),
NUMERIC -> decimal(p,s), UUID -> uuid, etc. Text-encoded values from
the snapshot path are parsed strictly; unparseable or mismatched
values fail the batch instead of being coerced to defaults. TOAST
Unchanged becomes NULL with a WARN counter.

Schema evolution is not auto-applied (iceberg-rust 0.9 transactions
cannot update schemas): SchemaChange events increment the existing
schema_evolution_skipped metric; capabilities report it honestly.

Also bumps arrow/parquet 53 -> 57 (required by iceberg 0.9); the
Snowflake sink passes its suite against v57.

Verified end-to-end against MinIO + tabulario/iceberg-rest + PG16:
snapshot + live I/U/D committed and read back exactly via pyiceberg,
including crash/restart recovery with zero loss.
iceberg-catalog-rest 0.9.1 requires rustc 1.92; default-feature builds
still compile on 1.91.1.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants