diff --git a/fasten-core/src/error.rs b/fasten-core/src/error.rs index 0234025..a853c1d 100644 --- a/fasten-core/src/error.rs +++ b/fasten-core/src/error.rs @@ -65,6 +65,14 @@ pub enum Error { /// `fasten_drainer_enqueue` called with no drainer installed. #[error("no drainer installed; call fasten_drainer_install first")] DrainerNotInstalled, + + /// A host-language insert callback (installed via + /// `fasten_store_open_callback`) returned a non-zero rc. The actual + /// exception lives on the host side; this variant carries only the + /// rc so log lines aren't misattributed (issue #36 — previously + /// reported as `InvalidTableName("insert callback rc=1")`). + #[error("insert callback returned non-zero rc={0}")] + CallbackFailed(i32), } // When neither sqlite nor postgres feature is active, the #[from] impls above @@ -107,6 +115,7 @@ impl From<&Error> for FastenErrorCode { Error::DomainMismatch { .. } => FastenErrorCode::ErrDomainMismatch, Error::DuplicateCode(_) => FastenErrorCode::ErrDuplicateCode, Error::DrainerNotInstalled => FastenErrorCode::ErrNullArg, + Error::CallbackFailed(_) => FastenErrorCode::ErrBackend, #[cfg(feature = "sqlite")] Error::Sqlite(_) => FastenErrorCode::ErrBackend, #[cfg(feature = "postgres")] diff --git a/fasten-core/src/ffi/mod.rs b/fasten-core/src/ffi/mod.rs index ff73e10..8d33469 100644 --- a/fasten-core/src/ffi/mod.rs +++ b/fasten-core/src/ffi/mod.rs @@ -395,7 +395,7 @@ impl Store for CallbackStore { let json = serde_json::to_string(row)?; let cs = CString::new(json).map_err(|_| Error::NullArg)?; let rc = unsafe { (self.fn_ptr)(cs.as_ptr(), self.ud.0) }; - if rc != 0 { Err(Error::InvalidTableName(format!("insert callback rc={rc}"))) } + if rc != 0 { Err(Error::CallbackFailed(rc)) } else { Ok(()) } } fn ping(&self) -> Result<(), Error> { Ok(()) } @@ -461,3 +461,58 @@ pub extern "C" fn fasten_store_version() -> *const c_char { }) .as_ptr() } + +#[cfg(test)] +mod tests { + use super::*; + use crate::row::Row; + use crate::store::Store; + + /// Regression for issue #36 — Bug 2: when a host-language insert + /// callback returns non-zero, CallbackStore must report + /// `CallbackFailed(rc)`, not `InvalidTableName("insert callback rc=N")`. + /// The old wording ran an SQL-identifier validation regex against + /// what was actually a free-form error message, masking the real + /// host-side exception (compounded with Bug 1 — see python's + /// _insert_cb test for the round-trip half). + #[test] + fn callback_store_failure_reports_callback_failed_variant() { + // Returns a non-zero rc to simulate a host-side callback that + // caught its own exception and surfaced an error code. + extern "C" fn failing_cb( + _json: *const c_char, + _ud: *mut std::ffi::c_void, + ) -> i32 { + 7 // arbitrary non-zero + } + let store = CallbackStore { + fn_ptr: failing_cb, + ud: SendRawPtr(std::ptr::null_mut()), + }; + let row = Row::default(); + let err = store.insert(&row).expect_err("non-zero rc must error"); + assert!( + matches!(err, Error::CallbackFailed(7)), + "expected CallbackFailed(7); got {err:?} \ + (formatted: {err})", + ); + // And the human-readable form is what an adopter actually sees + // in their `audit_drain_failed` log line — no "table name" noise. + assert_eq!(err.to_string(), "insert callback returned non-zero rc=7"); + } + + #[test] + fn callback_store_success_returns_ok() { + extern "C" fn ok_cb( + _json: *const c_char, + _ud: *mut std::ffi::c_void, + ) -> i32 { + 0 + } + let store = CallbackStore { + fn_ptr: ok_cb, + ud: SendRawPtr(std::ptr::null_mut()), + }; + assert!(store.insert(&Row::default()).is_ok()); + } +} diff --git a/python/fasten/engine.py b/python/fasten/engine.py index 577636e..0ec0cac 100644 --- a/python/fasten/engine.py +++ b/python/fasten/engine.py @@ -505,13 +505,27 @@ def _install_drainer( max_attempts: int, ) -> None: # Build an insert callback that delegates to the Python store. + # The row crosses Python → JSON → Rust queue → Python here, so + # `timestamp` / `shipped_at` arrive as ISO strings and have to + # be parsed back to `datetime` before the store's `_utc_iso` + # tries to read `.tzinfo` (issue #36). + cb_logger = logging.getLogger("fasten") + def _insert_cb(row_json: bytes, _userdata: int) -> int: + row_dict: dict[str, Any] = {} try: row_dict = json.loads(row_json.decode("utf-8")) + for fld in ("timestamp", "shipped_at"): + v = row_dict.get(fld) + if isinstance(v, str): + row_dict[fld] = datetime.fromisoformat(v.replace("Z", "+00:00")) row = AuditRow(**{k: row_dict[k] for k in AuditRow.__dataclass_fields__ if k in row_dict}) store.insert(row) return 0 - except Exception: # noqa: BLE001 + except Exception: # noqa: BLE001 — store / drainer reports the dead-letter outcome + cb_logger.exception( + "insert_cb failed for row %s", row_dict.get("id", ""), + ) return 1 cb = _ffi.InsertCallbackFn(_insert_cb) diff --git a/python/tests/test_engine_init.py b/python/tests/test_engine_init.py index 3b414fd..7ee9f1c 100644 --- a/python/tests/test_engine_init.py +++ b/python/tests/test_engine_init.py @@ -186,6 +186,62 @@ def test_log_falls_back_to_stdlib_before_init(caplog): assert any("startup_probe" in r.message for r in caplog.records) +# ── issue #36: datetime round-trip through the drainer ─────────────────── + + +def test_issue_36_drainer_reconstructs_datetime_from_json_roundtrip(): + """Regression for #36. + + fasten.emit -> JSON -> Rust queue -> Python _insert_cb is the + audit row's round-trip through the drainer. After json.loads(), + ``timestamp`` arrives as an ISO string, not a ``datetime``. Any + store implementation that calls ``_utc_iso(row.timestamp)`` (both + PostgresStore and SQLiteStore do) then crashes on + ``AttributeError: 'str' has no attribute 'tzinfo'`` — and because + ``_insert_cb`` swallows exceptions and returns 1, the row is + silently dead-lettered after retry-exhaustion. + + The fix reconstructs the two datetime fields in ``_insert_cb`` + before constructing ``AuditRow``. This test exercises the + queue-mode path end-to-end and asserts the store receives the + row with a real datetime instance (not a str). + """ + from datetime import datetime as _datetime + + received: list = [] + + class _CapturingStore: + def insert(self, row) -> None: + received.append(row) + def count(self, **_) -> int: + return len(received) + def max_monotonic_seq(self) -> int: + return 0 + + fasten.init( + service_id="svc", node_id="n", + audit_store=_CapturingStore(), + audit_store_failure_strategy="queue", + ) + fasten.emit(code="USER_CREATED", target="u-1") + assert fasten.flush(timeout=2.0), "flush should drain the row" + + assert len(received) == 1, ( + "row should land via the drainer's _insert_cb; if zero, the " + "drainer dead-lettered the row after exception (the #36 bug)" + ) + row = received[0] + assert isinstance(row.timestamp, _datetime), ( + f"row.timestamp must be a datetime after the drainer round-trip " + f"(got {type(row.timestamp).__name__!r}); downstream " + "_utc_iso(row.timestamp) reads .tzinfo and crashes on a str" + ) + assert row.timestamp.tzinfo is not None, ( + "reconstructed timestamp must be tz-aware so PostgresStore / " + "SQLiteStore can serialise it without a naive-datetime detour" + ) + + # ── sync fallback with no drainer ─────────────────────────────────────────