From faed5748c5a5c9de1176674371f0e291fbeaeaf2 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Tue, 26 May 2026 15:58:56 +0530 Subject: [PATCH 1/4] fix(python): parse timestamp/shipped_at back to datetime in _insert_cb (#36) Row crosses Python -> JSON -> Rust queue -> Python _insert_cb on its way through the drainer. After json.loads, timestamp/shipped_at are ISO strings; both PostgresStore and SQLiteStore call _utc_iso on them which reads dt.tzinfo and crashes with AttributeError on a str. The bare except Exception: return 1 then swallows the cause, the drainer dead-letters the row after retry-exhaustion, and the only on-disk record of the audit event is the stdout NDJSON. Reconstruct the two datetime fields from their ISO strings before constructing AuditRow. Replace the bare except with logger.exception so any future drainer-side failure surfaces in the 'fasten' logger instead of being silently masked. Tested: - New regression in tests/test_engine_init.py exercises the queue-mode drainer end-to-end and asserts the store receives a datetime, not a str. Fails on main, passes on this branch. - 229 of the existing python/tests pass on this branch (17 store_compat parametrized Postgres cases skip without FASTEN_AUDIT_DSN). - E2E verified against a real Postgres in a downstream adopter app: emit -> flush -> SELECT FROM fasten.audit_log returns the row. Signed-off-by: ashish-jabble --- python/fasten/engine.py | 16 ++++++++- python/tests/test_engine_init.py | 56 ++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/python/fasten/engine.py b/python/fasten/engine.py index 577636e..0ec0cac 100644 --- a/python/fasten/engine.py +++ b/python/fasten/engine.py @@ -505,13 +505,27 @@ def _install_drainer( max_attempts: int, ) -> None: # Build an insert callback that delegates to the Python store. + # The row crosses Python → JSON → Rust queue → Python here, so + # `timestamp` / `shipped_at` arrive as ISO strings and have to + # be parsed back to `datetime` before the store's `_utc_iso` + # tries to read `.tzinfo` (issue #36). + cb_logger = logging.getLogger("fasten") + def _insert_cb(row_json: bytes, _userdata: int) -> int: + row_dict: dict[str, Any] = {} try: row_dict = json.loads(row_json.decode("utf-8")) + for fld in ("timestamp", "shipped_at"): + v = row_dict.get(fld) + if isinstance(v, str): + row_dict[fld] = datetime.fromisoformat(v.replace("Z", "+00:00")) row = AuditRow(**{k: row_dict[k] for k in AuditRow.__dataclass_fields__ if k in row_dict}) store.insert(row) return 0 - except Exception: # noqa: BLE001 + except Exception: # noqa: BLE001 — store / drainer reports the dead-letter outcome + cb_logger.exception( + "insert_cb failed for row %s", row_dict.get("id", ""), + ) return 1 cb = _ffi.InsertCallbackFn(_insert_cb) diff --git a/python/tests/test_engine_init.py b/python/tests/test_engine_init.py index 3b414fd..7ee9f1c 100644 --- a/python/tests/test_engine_init.py +++ b/python/tests/test_engine_init.py @@ -186,6 +186,62 @@ def test_log_falls_back_to_stdlib_before_init(caplog): assert any("startup_probe" in r.message for r in caplog.records) +# ── issue #36: datetime round-trip through the drainer ─────────────────── + + +def test_issue_36_drainer_reconstructs_datetime_from_json_roundtrip(): + """Regression for #36. + + fasten.emit -> JSON -> Rust queue -> Python _insert_cb is the + audit row's round-trip through the drainer. After json.loads(), + ``timestamp`` arrives as an ISO string, not a ``datetime``. Any + store implementation that calls ``_utc_iso(row.timestamp)`` (both + PostgresStore and SQLiteStore do) then crashes on + ``AttributeError: 'str' has no attribute 'tzinfo'`` — and because + ``_insert_cb`` swallows exceptions and returns 1, the row is + silently dead-lettered after retry-exhaustion. + + The fix reconstructs the two datetime fields in ``_insert_cb`` + before constructing ``AuditRow``. This test exercises the + queue-mode path end-to-end and asserts the store receives the + row with a real datetime instance (not a str). + """ + from datetime import datetime as _datetime + + received: list = [] + + class _CapturingStore: + def insert(self, row) -> None: + received.append(row) + def count(self, **_) -> int: + return len(received) + def max_monotonic_seq(self) -> int: + return 0 + + fasten.init( + service_id="svc", node_id="n", + audit_store=_CapturingStore(), + audit_store_failure_strategy="queue", + ) + fasten.emit(code="USER_CREATED", target="u-1") + assert fasten.flush(timeout=2.0), "flush should drain the row" + + assert len(received) == 1, ( + "row should land via the drainer's _insert_cb; if zero, the " + "drainer dead-lettered the row after exception (the #36 bug)" + ) + row = received[0] + assert isinstance(row.timestamp, _datetime), ( + f"row.timestamp must be a datetime after the drainer round-trip " + f"(got {type(row.timestamp).__name__!r}); downstream " + "_utc_iso(row.timestamp) reads .tzinfo and crashes on a str" + ) + assert row.timestamp.tzinfo is not None, ( + "reconstructed timestamp must be tz-aware so PostgresStore / " + "SQLiteStore can serialise it without a naive-datetime detour" + ) + + # ── sync fallback with no drainer ───────────────────────────────────────── From 088e4ec2ab9b653963bd5795620af506d7f1662c Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Tue, 26 May 2026 16:06:03 +0530 Subject: [PATCH 2/4] fix(core): report CallbackFailed instead of InvalidTableName for non-zero callback rc (#36) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a host-language insert callback (Python's _insert_cb, the JS / Go / Swift equivalents) returns a non-zero rc, the FFI shim was wrapping that rc into Error::InvalidTableName(format!("insert callback rc={rc}")). The drainer's audit_drain_failed log line then renders the wrong variant's message, so adopters see: invalid table name "insert callback rc=1": must match ^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?$ — an SQL-identifier validation message run against the callback rc. That misled diagnosis of the actual issue (Python-side store failure) in #36 for ~30 min before the bare except in _insert_cb was unswallowed. New variant Error::CallbackFailed(i32) carries the rc unambiguously: insert callback returned non-zero rc=1 Maps to FastenErrorCode::ErrBackend (the host callback's destination is a store backend; the failure category is the same). Existing tests that use Error::InvalidTableName as a stand-in store failure in drainer_conformance.rs are unchanged — they assert drainer behaviour, not the variant kind, and changing them would obscure the narrow scope of this fix. Tests in fasten-core/src/ffi/mod.rs cover both branches: rc=0 → Ok, rc!=0 → CallbackFailed with the carried code. Signed-off-by: ashish-jabble --- fasten-core/src/error.rs | 9 ++++++ fasten-core/src/ffi/mod.rs | 57 +++++++++++++++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/fasten-core/src/error.rs b/fasten-core/src/error.rs index 0234025..a853c1d 100644 --- a/fasten-core/src/error.rs +++ b/fasten-core/src/error.rs @@ -65,6 +65,14 @@ pub enum Error { /// `fasten_drainer_enqueue` called with no drainer installed. #[error("no drainer installed; call fasten_drainer_install first")] DrainerNotInstalled, + + /// A host-language insert callback (installed via + /// `fasten_store_open_callback`) returned a non-zero rc. The actual + /// exception lives on the host side; this variant carries only the + /// rc so log lines aren't misattributed (issue #36 — previously + /// reported as `InvalidTableName("insert callback rc=1")`). + #[error("insert callback returned non-zero rc={0}")] + CallbackFailed(i32), } // When neither sqlite nor postgres feature is active, the #[from] impls above @@ -107,6 +115,7 @@ impl From<&Error> for FastenErrorCode { Error::DomainMismatch { .. } => FastenErrorCode::ErrDomainMismatch, Error::DuplicateCode(_) => FastenErrorCode::ErrDuplicateCode, Error::DrainerNotInstalled => FastenErrorCode::ErrNullArg, + Error::CallbackFailed(_) => FastenErrorCode::ErrBackend, #[cfg(feature = "sqlite")] Error::Sqlite(_) => FastenErrorCode::ErrBackend, #[cfg(feature = "postgres")] diff --git a/fasten-core/src/ffi/mod.rs b/fasten-core/src/ffi/mod.rs index ff73e10..8d33469 100644 --- a/fasten-core/src/ffi/mod.rs +++ b/fasten-core/src/ffi/mod.rs @@ -395,7 +395,7 @@ impl Store for CallbackStore { let json = serde_json::to_string(row)?; let cs = CString::new(json).map_err(|_| Error::NullArg)?; let rc = unsafe { (self.fn_ptr)(cs.as_ptr(), self.ud.0) }; - if rc != 0 { Err(Error::InvalidTableName(format!("insert callback rc={rc}"))) } + if rc != 0 { Err(Error::CallbackFailed(rc)) } else { Ok(()) } } fn ping(&self) -> Result<(), Error> { Ok(()) } @@ -461,3 +461,58 @@ pub extern "C" fn fasten_store_version() -> *const c_char { }) .as_ptr() } + +#[cfg(test)] +mod tests { + use super::*; + use crate::row::Row; + use crate::store::Store; + + /// Regression for issue #36 — Bug 2: when a host-language insert + /// callback returns non-zero, CallbackStore must report + /// `CallbackFailed(rc)`, not `InvalidTableName("insert callback rc=N")`. + /// The old wording ran an SQL-identifier validation regex against + /// what was actually a free-form error message, masking the real + /// host-side exception (compounded with Bug 1 — see python's + /// _insert_cb test for the round-trip half). + #[test] + fn callback_store_failure_reports_callback_failed_variant() { + // Returns a non-zero rc to simulate a host-side callback that + // caught its own exception and surfaced an error code. + extern "C" fn failing_cb( + _json: *const c_char, + _ud: *mut std::ffi::c_void, + ) -> i32 { + 7 // arbitrary non-zero + } + let store = CallbackStore { + fn_ptr: failing_cb, + ud: SendRawPtr(std::ptr::null_mut()), + }; + let row = Row::default(); + let err = store.insert(&row).expect_err("non-zero rc must error"); + assert!( + matches!(err, Error::CallbackFailed(7)), + "expected CallbackFailed(7); got {err:?} \ + (formatted: {err})", + ); + // And the human-readable form is what an adopter actually sees + // in their `audit_drain_failed` log line — no "table name" noise. + assert_eq!(err.to_string(), "insert callback returned non-zero rc=7"); + } + + #[test] + fn callback_store_success_returns_ok() { + extern "C" fn ok_cb( + _json: *const c_char, + _ud: *mut std::ffi::c_void, + ) -> i32 { + 0 + } + let store = CallbackStore { + fn_ptr: ok_cb, + ud: SendRawPtr(std::ptr::null_mut()), + }; + assert!(store.insert(&Row::default()).is_ok()); + } +} From 62596541fc5d91a855ffb9977fbb72071bf58346 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Tue, 26 May 2026 20:13:45 +0530 Subject: [PATCH 3/4] fix(python): drop redundant Z-suffix shim in _insert_cb MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fasten emits timestamps via datetime.isoformat() (attrs.py:to_dict) which always produces '+00:00' format — never 'Z'. The shim only covered hosts that hand-craft Z-suffixed timestamps, which is outside fastens own contract. Drop it; fromisoformat handles '+HH:MM' natively on every supported Python (3.10+). Signed-off-by: ashish-jabble --- python/fasten/engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/fasten/engine.py b/python/fasten/engine.py index 0ec0cac..d75d47c 100644 --- a/python/fasten/engine.py +++ b/python/fasten/engine.py @@ -518,7 +518,7 @@ def _insert_cb(row_json: bytes, _userdata: int) -> int: for fld in ("timestamp", "shipped_at"): v = row_dict.get(fld) if isinstance(v, str): - row_dict[fld] = datetime.fromisoformat(v.replace("Z", "+00:00")) + row_dict[fld] = datetime.fromisoformat(v) row = AuditRow(**{k: row_dict[k] for k in AuditRow.__dataclass_fields__ if k in row_dict}) store.insert(row) return 0 From d03dd27e515ab1f19286a366ff31b11a617dedb3 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Tue, 26 May 2026 20:16:36 +0530 Subject: [PATCH 4/4] Revert "fix(python): drop redundant Z-suffix shim in _insert_cb" This reverts commit 62596541fc5d91a855ffb9977fbb72071bf58346. Signed-off-by: ashish-jabble --- python/fasten/engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/fasten/engine.py b/python/fasten/engine.py index d75d47c..0ec0cac 100644 --- a/python/fasten/engine.py +++ b/python/fasten/engine.py @@ -518,7 +518,7 @@ def _insert_cb(row_json: bytes, _userdata: int) -> int: for fld in ("timestamp", "shipped_at"): v = row_dict.get(fld) if isinstance(v, str): - row_dict[fld] = datetime.fromisoformat(v) + row_dict[fld] = datetime.fromisoformat(v.replace("Z", "+00:00")) row = AuditRow(**{k: row_dict[k] for k in AuditRow.__dataclass_fields__ if k in row_dict}) store.insert(row) return 0