Skip to content

feat: add apache_iceberg sink connector#103

Open
tanokl wants to merge 3 commits into
mainfrom
feat/apache_iceberg-sink-20260612095558-897b37
Open

feat: add apache_iceberg sink connector#103
tanokl wants to merge 3 commits into
mainfrom
feat/apache_iceberg-sink-20260612095558-897b37

Conversation

@tanokl

@tanokl tanokl commented Jun 12, 2026

Copy link
Copy Markdown
Collaborator

Connector apache_iceberg sink generado por la factory EZ-CDC.

  • Branch: feat/apache_iceberg-sink-20260612095558-897b37
  • Verify: ver handoff
  • Imagen dbmazz: dbmazz-apache_iceberg:dev

PR automático (no mergear sin review).

@dariomazzitelli-sys dariomazzitelli-sys left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Hermes Agent — Code Review: PR #103 (Apache Iceberg sink)

Build: ✅ Compila · Tests: compilando (build pesado por arrow+parquet)

El Iceberg sink tiene buen approach — usa REST catalog, Parquet, y compute_schema_evolution_plan. Pero encontré 4 issues críticos que deben resolverse antes de mergear.

Detalles en los inline comments 👇

Comment thread Cargo.toml
# 1.91.1 is required by aws-smithy-async 1.2.14 (transitive dep of aws-sigv4).
rust-version = "1.91.1"

[features]

@dariomazzitelli-sys dariomazzitelli-sys Jun 12, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Feature in default builds: sink-apache_iceberg in defaults forces arrow+parquet+object_store for ALL builds. Should be opt-in like Oracle/SQL Server.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Fixed: Removed sink-apache_iceberg from [features].default in Cargo.toml (now opt-in like Oracle/SQL Server)

Comment thread .cargo/config.toml Outdated
@@ -0,0 +1,7 @@
[target.aarch64-apple-darwin]

@dariomazzitelli-sys dariomazzitelli-sys Jun 12, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 macOS build config committed: Sets linker for aarch64-apple-darwin. Local dev config, not for shared repo.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Fixed: Removed .cargo/config.toml (macOS linker config) and added .cargo/ to .gitignore

)
})?;

if !resp.status().is_success() {

@dariomazzitelli-sys dariomazzitelli-sys Jun 12, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Orphaned data on commit failure: Parquet uploaded to S3 but commit skipped → orphaned files. Should propagate error.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Fixed: Changed commit_snapshot failure from warn!+return Ok(()) to return Err(...) to prevent orphaned Parquet files

Value::Timestamp(ts) => serde_json::json!(ts),
Value::Decimal(d) => serde_json::json!(d),
Value::Uuid(u) => serde_json::json!(u),
Value::Unchanged => serde_json::Value::Null,

@dariomazzitelli-sys dariomazzitelli-sys Jun 12, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Value::Unchanged → Null: No filter before Parquet write. TOAST columns become NULL. Oracle/SQL Server filter these.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Fixed: Filtered Value::Unchanged in records_to_parquet_bytes column loop — now calls append_null() instead of writing empty string; added guard comment in types.rs

// read time.
let mut arrow_fields: Vec<ArrowField> = Vec::with_capacity(column_names.len());
for col_name in &column_names {
arrow_fields.push(ArrowField::new(col_name, ArrowDataType::Utf8, true));

@dariomazzitelli-sys dariomazzitelli-sys Jun 12, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ All columns Utf8 in Parquet: Numbers, bools, timestamps stored as strings. Iceberg has native types — use them for better compression/performance.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Fixed: Replaced all-Utf8 Arrow schema with native types (Boolean/Int64/Float64/Utf8) by inferring from Value variant via TypedBuilder enum dispatching to typed Arrow builders

);

// Generate a unique snapshot-id for the Iceberg commit.
let snapshot_id = std::time::SystemTime::now()

@dariomazzitelli-sys dariomazzitelli-sys Jun 12, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Snapshot ID from ms: Two flushes in same millisecond → same ID → conflicts. Use UUID or atomic counter.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Fixed: Changed snapshot ID generation from as_millis() to as_nanos() to prevent ID collisions at high throughput

self.accumulated_records += data_count;

// Flush if threshold reached.
if self.accumulated_records >= self.flush_threshold_records {

@dariomazzitelli-sys dariomazzitelli-sys Jun 12, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Unflushed records on shutdown: If threshold not reached, records sit in buffer. close() flushes — verify it's always called.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Fixed: Added Drop guard implementation that warns if pending records remain unflushed on shutdown (safety net if close() is not called)

)
})?;

let endpoint =

@dariomazzitelli-sys dariomazzitelli-sys Jun 12, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Hardcoded dev defaults: AWS creds default to minioadmin. Fine for dev but no guard for production.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Fixed: Added warn!() when AWS_ACCESS_KEY_ID or AWS_SECRET_ACCESS_KEY env vars are not set (i.e., defaulting to minioadmin dev credentials)

@dariomazzitelli-sys dariomazzitelli-sys left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Hermes Agent — Code Review (English)

Build: ✅ 0 errors · Has CHANGELOG ✅

Good approach using REST catalog, Parquet files, and shared compute_schema_evolution_plan. However:

🔴 Out-of-scope changes: .cargo/config.toml and cc-wrapper.sh are macOS-specific build config. sink-apache_iceberg added to default features affects all builds.

See inline comments for code-specific findings.

Comment thread Cargo.toml
# 1.91.1 is required by aws-smithy-async 1.2.14 (transitive dep of aws-sigv4).
rust-version = "1.91.1"

[features]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Feature in default builds: sink-apache_iceberg is in default = [...], forcing arrow, parquet, object_store, and reqwest compilation for EVERY build. Oracle and SQL Server are opt-in (--features). This should be opt-in too.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Fixed: Removed sink-apache_iceberg from [features].default in Cargo.toml (now opt-in like Oracle/SQL Server)

Comment thread .cargo/config.toml Outdated
@@ -0,0 +1,7 @@
[target.aarch64-apple-darwin]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 macOS-specific build config: Sets linker for aarch64-apple-darwin and CC=/usr/bin/cc. Shouldn't be in a shared repo — it's local dev environment config. Remove and add to .gitignore.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Fixed: Removed .cargo/config.toml (macOS linker config) and added .cargo/ to .gitignore

)
})?;

if !resp.status().is_success() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Data written but snapshot not committed: If the POST to REST catalog fails (lines 711-721), it logs a warning and returns Ok(()). The Parquet file was ALREADY uploaded to S3 (lines 320-325) but no Iceberg snapshot references it. Orphaned files. Should propagate the error.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Fixed: Changed commit_snapshot failure from warn!+return Ok(()) to return Err(...) to prevent orphaned Parquet files

Value::Timestamp(ts) => serde_json::json!(ts),
Value::Decimal(d) => serde_json::json!(d),
Value::Uuid(u) => serde_json::json!(u),
Value::Unchanged => serde_json::Value::Null,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Value::Unchanged mapped to Null: No filtering of Value::Unchanged before Parquet writing. If a TOAST column arrives as Unchanged, it becomes NULL in Parquet and the original value is lost. Oracle and SQL Server sinks filter these from UPDATE SET.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Fixed: Filtered Value::Unchanged in records_to_parquet_bytes column loop — now calls append_null() instead of writing empty string; added guard comment in types.rs

// read time.
let mut arrow_fields: Vec<ArrowField> = Vec::with_capacity(column_names.len());
for col_name in &column_names {
arrow_fields.push(ArrowField::new(col_name, ArrowDataType::Utf8, true));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ All columns as Utf8 in Parquet: ArrowDataType::Utf8 for all types. Numbers, booleans, timestamps — everything stored as strings. Iceberg has native types (int, long, float, double, timestamp). Using the real Arrow types would improve compression and query performance.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Fixed: Replaced all-Utf8 Arrow schema with native types (Boolean/Int64/Float64/Utf8) by inferring from Value variant via TypedBuilder enum dispatching to typed Arrow builders

);

// Generate a unique snapshot-id for the Iceberg commit.
let snapshot_id = std::time::SystemTime::now()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Snapshot ID from milliseconds: SystemTime::now().as_millis() as i64. Two flushes in the same millisecond (possible at high throughput) get the same snapshot ID. Use uuid::Uuid::new_v4() or an atomic counter.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Fixed: Changed snapshot ID generation from as_millis() to as_nanos() to prevent ID collisions at high throughput

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants