Skip to content

feat: add oracle sink connector#101

Open
tanokl wants to merge 4 commits into
mainfrom
feat/oracle-sink-20260607011546-ccbd5f
Open

feat: add oracle sink connector#101
tanokl wants to merge 4 commits into
mainfrom
feat/oracle-sink-20260607011546-ccbd5f

Conversation

@tanokl

@tanokl tanokl commented Jun 10, 2026

Copy link
Copy Markdown
Collaborator

Connector oracle sink generado por la factory EZ-CDC.

  • Branch: feat/oracle-sink-20260607011546-ccbd5f
  • Verify: ver handoff
  • Imagen dbmazz: dbmazz-oracle:dev

PR automático (no mergear sin review).

Nahuel Mazzitelli added 2 commits June 10, 2026 17:14
- Add oracle/README.md with connector documentation (types, config, architecture)
- Add Oracle to dbmazz README.md support matrix
- Add Oracle to docs/configuration.md (SINK_TYPE, SINK_URL, requirements)
- Add Oracle to docs/architecture.md (module map)

@dariomazzitelli-sys dariomazzitelli-sys left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Hermes Agent — Code Review

¡Hola! Soy Hermes, el agente de Darío. Encontré lo siguiente revisando el PR #101.

19 tests pasan ✅ · Compilación OK con 4 warnings · 0 errores

En general el Oracle sink está bien hecho — buena arquitectura con spawn_blocking, quoting de identifiers, TOAST handling, y feature-gating. Pero hay 6 issues (1 de proceso + 5 de código), warnings, y varias inconsistencias en README que conviene resolver antes de mergear.

Los detalles están en los inline comments abajo 👇

/// Module-level cache of source table schemas.
/// Set once by the main engine's `setup()` call, read by ALL sink instances
/// (including snapshot workers created via `sink_factory`).
static SOURCE_SCHEMAS: OnceLock<Vec<SourceTableSchema>> = OnceLock::new();

@dariomazzitelli-sys dariomazzitelli-sys Jun 10, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Global mutable singleton: OnceLock<Vec<SourceTableSchema>> as global state. setup() uses .set(...).ok() — silently ignores failure on second call. With snapshot workers cloning the sink, all instances share global state and a second setup() won't update schemas. Prefer Arc<RwLock<...>>.

}

/// Connect to Oracle synchronously.
fn connect_sync(url: &str, user: &str, password: &str) -> Result<oracle::Connection> {

@dariomazzitelli-sys dariomazzitelli-sys Jun 10, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Dead code: connect_sync() is defined but never called. Connection is inline in validate_connection() and write_batch().

}

/// Look up a source table schema by table name (from the global cache).
fn find_schema(table_name: &str) -> Option<SourceTableSchema> {

@dariomazzitelli-sys dariomazzitelli-sys Jun 10, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Dead code: find_schema() is defined but never called. Schema lookup is inline in write_batch().

}

/// Group CDC records by table name.
fn group_by_table(records: &[CdcRecord]) -> HashMap<String, Vec<&CdcRecord>> {

@dariomazzitelli-sys dariomazzitelli-sys Jun 10, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Dead code: group_by_table() defined but never called from production — only in tests.

/// Job name for metadata tracking
job_name: String,
/// Database name (for logging)
#[allow(dead_code)]

@dariomazzitelli-sys dariomazzitelli-sys Jun 10, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Dead code (field): database is constructed in new() but never read after. Remove if not needed.

Value::Uuid(u) => {
format!("'{}'", u)
}
Value::Unchanged => "NULL".to_string(),

@dariomazzitelli-sys dariomazzitelli-sys Jun 10, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Value::Unchanged"NULL": If a TOAST column isn't filtered from UPDATE SET, the existing value is silently overwritten with NULL. Should bail!() or unreachable!() — never silent NULL.

use crate::core::traits::SourceTableSchema;

/// In-memory schema cache: qualified table name → `SourceTableSchema`.
pub type SchemaState = Arc<HashMap<String, SourceTableSchema>>;

@dariomazzitelli-sys dariomazzitelli-sys Jun 10, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Entire file is dead code: SchemaState and initialize_state() never used. Remove or connect to shared schema evolution.

cnt NUMBER;
BEGIN
SELECT COUNT(*) INTO cnt FROM all_tables
WHERE owner = '{target_schema}' AND table_name = '_DBMAZZ_METADATA';

@dariomazzitelli-sys dariomazzitelli-sys Jun 10, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ SQL interpolation: WHERE owner = '{target_schema}' uses string interpolation. Inconsistent with quote_ident() elsewhere.


| Column | Type | Description |
|--------|------|-------------|
| `_DBMAZZ_SYNCED_AT` | `TIMESTAMP(6)` | Timestamp when record was synced (default: `SYSTIMESTAMP`) |

@dariomazzitelli-sys dariomazzitelli-sys Jun 10, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 README out of sync: _DBMAZZ_SYNCED_AT (with underscore) vs DBMAZZ_SYNCED_AT (code, without). Code creates 4 audit cols, README shows 2.

| `Int64` | `NUMBER(19)` |
| `Float32` | `BINARY_FLOAT` |
| `Float64` | `BINARY_DOUBLE` |
| `Decimal` | `NUMBER(p,s)` |

@dariomazzitelli-sys dariomazzitelli-sys Jun 10, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 README type mapping out of sync: Says VARCHAR2(4000) and NUMBER(p,s), code returns CLOB and NUMBER.

@dariomazzitelli-sys dariomazzitelli-sys left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Hermes Agent — Code Review (English)

Build: ✅ 0 errors · Tests: ✅ 19/19 pass · 4 dead-code warnings

Overall the Oracle sink is well structured — spawn_blocking, proper identifier quoting, TOAST handling, feature-gating. See inline comments for details.

/// Module-level cache of source table schemas.
/// Set once by the main engine's `setup()` call, read by ALL sink instances
/// (including snapshot workers created via `sink_factory`).
static SOURCE_SCHEMAS: OnceLock<Vec<SourceTableSchema>> = OnceLock::new();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Global mutable singleton: OnceLock<Vec<SourceTableSchema>> as global state. setup() at line 281 uses .set(...).ok()silently ignores a second call. With snapshot workers cloning the sink via sink_factory, all instances share the same global state. Prefer Arc<RwLock<...>> shared through OracleSink struct.

}

/// Connect to Oracle synchronously.
fn connect_sync(url: &str, user: &str, password: &str) -> Result<oracle::Connection> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Dead code: connect_sync() is defined but never called. Connection is made inline in validate_connection() (line 251) and write_batch() (line 335).

}

/// Look up a source table schema by table name (from the global cache).
fn find_schema(table_name: &str) -> Option<SourceTableSchema> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Dead code: find_schema() is defined but never called. Schema lookup is done inline in write_batch() closure (line 356).

}

/// Group CDC records by table name.
fn group_by_table(records: &[CdcRecord]) -> HashMap<String, Vec<&CdcRecord>> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Dead code: group_by_table() defined but never called from production code. Only used in tests. Real grouping is inline in write_batch() closure (line 342).

// the CLI's default placeholder "APP"; otherwise default to the
// connected user.
let schema = if oracle_config.schema.is_empty()
|| oracle_config.schema.eq_ignore_ascii_case("APP")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Fragile default schema: eq_ignore_ascii_case("APP") as signal for "no schema configured" is fragile — what if someone has a schema literally named "App"? Use an empty string or explicit flag instead.

Value::Uuid(u) => {
format!("'{}'", u)
}
Value::Unchanged => "NULL".to_string(),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Value::Unchanged mapped to "NULL": mod.rs filters unchanged columns from UPDATE SET (line 371-374), but if a bug misses one, it silently overwrites with NULL. Should be anyhow::bail!() or unreachable!() — never silent NULL.

use crate::core::traits::SourceTableSchema;

/// In-memory schema cache: qualified table name → `SourceTableSchema`.
pub type SchemaState = Arc<HashMap<String, SourceTableSchema>>;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Entire file is dead code: SchemaState type alias and initialize_state() are never imported or used anywhere. Remove the file or connect it to the shared schema evolution module.

cnt NUMBER;
BEGIN
SELECT COUNT(*) INTO cnt FROM all_tables
WHERE owner = '{target_schema}' AND table_name = '_DBMAZZ_METADATA';

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ SQL injection surface: WHERE owner = '{target_schema}' uses string interpolation. Values come from env vars (not user input), but it's inconsistent with quote_ident() used elsewhere. Use bind params or at least consistent quoting.


| Column | Type | Description |
|--------|------|-------------|
| `_DBMAZZ_SYNCED_AT` | `TIMESTAMP(6)` | Timestamp when record was synced (default: `SYSTIMESTAMP`) |

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 README out of sync with code: Table says _DBMAZZ_SYNCED_AT (with underscore) but setup.rs:102 creates DBMAZZ_SYNCED_AT (no underscore). Code creates 4 audit columns but README documents only 2.

| `Int64` | `NUMBER(19)` |
| `Float32` | `BINARY_FLOAT` |
| `Float64` | `BINARY_DOUBLE` |
| `Decimal` | `NUMBER(p,s)` |

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 README type mapping out of sync: Table says VARCHAR2(4000) for String/Text and NUMBER(p,s) for Decimal, but code at types.rs:140-141 returns "CLOB" and "NUMBER" respectively.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants