Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions fasten-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ pub enum Error {
/// `fasten_drainer_enqueue` called with no drainer installed.
#[error("no drainer installed; call fasten_drainer_install first")]
DrainerNotInstalled,

/// A host-language insert callback (installed via
/// `fasten_store_open_callback`) returned a non-zero rc. The actual
/// exception lives on the host side; this variant carries only the
/// rc so log lines aren't misattributed (issue #36 — previously
/// reported as `InvalidTableName("insert callback rc=1")`).
#[error("insert callback returned non-zero rc={0}")]
CallbackFailed(i32),
}

// When neither sqlite nor postgres feature is active, the #[from] impls above
Expand Down Expand Up @@ -107,6 +115,7 @@ impl From<&Error> for FastenErrorCode {
Error::DomainMismatch { .. } => FastenErrorCode::ErrDomainMismatch,
Error::DuplicateCode(_) => FastenErrorCode::ErrDuplicateCode,
Error::DrainerNotInstalled => FastenErrorCode::ErrNullArg,
Error::CallbackFailed(_) => FastenErrorCode::ErrBackend,
#[cfg(feature = "sqlite")]
Error::Sqlite(_) => FastenErrorCode::ErrBackend,
#[cfg(feature = "postgres")]
Expand Down
57 changes: 56 additions & 1 deletion fasten-core/src/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ impl Store for CallbackStore {
let json = serde_json::to_string(row)?;
let cs = CString::new(json).map_err(|_| Error::NullArg)?;
let rc = unsafe { (self.fn_ptr)(cs.as_ptr(), self.ud.0) };
if rc != 0 { Err(Error::InvalidTableName(format!("insert callback rc={rc}"))) }
if rc != 0 { Err(Error::CallbackFailed(rc)) }
else { Ok(()) }
}
fn ping(&self) -> Result<(), Error> { Ok(()) }
Expand Down Expand Up @@ -461,3 +461,58 @@ pub extern "C" fn fasten_store_version() -> *const c_char {
})
.as_ptr()
}

#[cfg(test)]
mod tests {
use super::*;
use crate::row::Row;
use crate::store::Store;

/// Regression for issue #36 — Bug 2: when a host-language insert
/// callback returns non-zero, CallbackStore must report
/// `CallbackFailed(rc)`, not `InvalidTableName("insert callback rc=N")`.
/// The old wording ran an SQL-identifier validation regex against
/// what was actually a free-form error message, masking the real
/// host-side exception (compounded with Bug 1 — see python's
/// _insert_cb test for the round-trip half).
#[test]
fn callback_store_failure_reports_callback_failed_variant() {
// Returns a non-zero rc to simulate a host-side callback that
// caught its own exception and surfaced an error code.
extern "C" fn failing_cb(
_json: *const c_char,
_ud: *mut std::ffi::c_void,
) -> i32 {
7 // arbitrary non-zero
}
let store = CallbackStore {
fn_ptr: failing_cb,
ud: SendRawPtr(std::ptr::null_mut()),
};
let row = Row::default();
let err = store.insert(&row).expect_err("non-zero rc must error");
assert!(
matches!(err, Error::CallbackFailed(7)),
"expected CallbackFailed(7); got {err:?} \
(formatted: {err})",
);
// And the human-readable form is what an adopter actually sees
// in their `audit_drain_failed` log line — no "table name" noise.
assert_eq!(err.to_string(), "insert callback returned non-zero rc=7");
}

#[test]
fn callback_store_success_returns_ok() {
extern "C" fn ok_cb(
_json: *const c_char,
_ud: *mut std::ffi::c_void,
) -> i32 {
0
}
let store = CallbackStore {
fn_ptr: ok_cb,
ud: SendRawPtr(std::ptr::null_mut()),
};
assert!(store.insert(&Row::default()).is_ok());
}
}
16 changes: 15 additions & 1 deletion python/fasten/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,13 +505,27 @@ def _install_drainer(
max_attempts: int,
) -> None:
# Build an insert callback that delegates to the Python store.
# The row crosses Python → JSON → Rust queue → Python here, so
# `timestamp` / `shipped_at` arrive as ISO strings and have to
# be parsed back to `datetime` before the store's `_utc_iso`
# tries to read `.tzinfo` (issue #36).
cb_logger = logging.getLogger("fasten")

def _insert_cb(row_json: bytes, _userdata: int) -> int:
row_dict: dict[str, Any] = {}
try:
row_dict = json.loads(row_json.decode("utf-8"))
for fld in ("timestamp", "shipped_at"):
v = row_dict.get(fld)
if isinstance(v, str):
row_dict[fld] = datetime.fromisoformat(v.replace("Z", "+00:00"))
row = AuditRow(**{k: row_dict[k] for k in AuditRow.__dataclass_fields__ if k in row_dict})
store.insert(row)
return 0
except Exception: # noqa: BLE001
except Exception: # noqa: BLE001 — store / drainer reports the dead-letter outcome
cb_logger.exception(
"insert_cb failed for row %s", row_dict.get("id", "<unparsed>"),
)
return 1

cb = _ffi.InsertCallbackFn(_insert_cb)
Expand Down
56 changes: 56 additions & 0 deletions python/tests/test_engine_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,62 @@ def test_log_falls_back_to_stdlib_before_init(caplog):
assert any("startup_probe" in r.message for r in caplog.records)


# ── issue #36: datetime round-trip through the drainer ───────────────────


def test_issue_36_drainer_reconstructs_datetime_from_json_roundtrip():
"""Regression for #36.

fasten.emit -> JSON -> Rust queue -> Python _insert_cb is the
audit row's round-trip through the drainer. After json.loads(),
``timestamp`` arrives as an ISO string, not a ``datetime``. Any
store implementation that calls ``_utc_iso(row.timestamp)`` (both
PostgresStore and SQLiteStore do) then crashes on
``AttributeError: 'str' has no attribute 'tzinfo'`` — and because
``_insert_cb`` swallows exceptions and returns 1, the row is
silently dead-lettered after retry-exhaustion.

The fix reconstructs the two datetime fields in ``_insert_cb``
before constructing ``AuditRow``. This test exercises the
queue-mode path end-to-end and asserts the store receives the
row with a real datetime instance (not a str).
"""
from datetime import datetime as _datetime

received: list = []

class _CapturingStore:
def insert(self, row) -> None:
received.append(row)
def count(self, **_) -> int:
return len(received)
def max_monotonic_seq(self) -> int:
return 0

fasten.init(
service_id="svc", node_id="n",
audit_store=_CapturingStore(),
audit_store_failure_strategy="queue",
)
fasten.emit(code="USER_CREATED", target="u-1")
assert fasten.flush(timeout=2.0), "flush should drain the row"

assert len(received) == 1, (
"row should land via the drainer's _insert_cb; if zero, the "
"drainer dead-lettered the row after exception (the #36 bug)"
)
row = received[0]
assert isinstance(row.timestamp, _datetime), (
f"row.timestamp must be a datetime after the drainer round-trip "
f"(got {type(row.timestamp).__name__!r}); downstream "
"_utc_iso(row.timestamp) reads .tzinfo and crashes on a str"
)
assert row.timestamp.tzinfo is not None, (
"reconstructed timestamp must be tz-aware so PostgresStore / "
"SQLiteStore can serialise it without a naive-datetime detour"
)


# ── sync fallback with no drainer ─────────────────────────────────────────


Expand Down
Loading