Unified observability module — single entry point for all telemetry (write + read).
Migration note:
moralstack.persistenceis now a deprecated alias. See Migration from persistence below.
moralstack/observability/
├── __init__.py # exposes: obs (lazy proxy → get_obs()), EventEnvelope, factory helpers
├── events.py # EventEnvelope dataclass + EVENT_* constants + make_envelope()
├── context.py # run_id / request_id / cycle contextvars
├── config.py # ObservabilityConfig: mode, db_path, jsonl_dir; env loader
├── service.py # ObservabilityService: emit(), emit_batch(), flush(), shutdown()
├── router.py # EventRouter: envelope → active sinks (db_only/dual/file_only)
├── read_store.py # ReadStore Protocol + SqliteReadStore
├── write_queue.py # ObservabilityWriteQueue (daemon thread, FIFO)
└── sinks/
├── base.py # EventSink Protocol
├── sqlite_sink.py # Schema SQLite + lifecycle writes + batch inserts
└── jsonl_sink.py # Per-event-type JSONL under logs/observability/
The obs export resolves each attribute access on the current get_obs() instance (tests and tooling may replace that singleton). Use get_obs() when you need the concrete ObservabilityService (for example isinstance checks).
| Module | Responsibility |
|---|---|
events.py |
EventEnvelope (frozen dataclass), 10 EVENT_* constants, make_envelope() factory |
service.py |
get_obs() thread-safe singleton ObservabilityService; emit() (async via queue), emit_batch(), flush(), shutdown(), read_store property |
router.py |
Reads mode from config, dispatches envelope to sqlite_sink and/or jsonl_sink |
sqlite_sink.py |
SQLite schema, SqliteUnitOfWork, all lifecycle writes (create_run, end_run, upsert_request, …) + batch insert helpers |
jsonl_sink.py |
Per-event-type JSONL under logs/observability/; thread-safe per-file locks |
read_store.py |
ReadStore Protocol + SqliteReadStore; unique read contract for UI and reports |
config.py |
Env var loader with backwards-compat fallback to legacy vars |
context.py |
ContextVars for run_id, request_id, cycle |
from moralstack.observability import obs
from moralstack.observability.events import EVENT_LLM_CALL, make_envelope
# Emit a telemetry event (non-blocking)
env = make_envelope(
EVENT_LLM_CALL,
run_id="my-run",
request_id="req-1",
cycle=1,
payload={"module": "risk_estimator", "phase": "risk_estimation", ...},
)
obs.emit(env)
# Flush all pending writes (e.g. before shutdown)
obs.flush(timeout=30.0)
# Read back
run = obs.read_store.get_run("my-run")
calls = obs.read_store.get_llm_calls_for_request("my-run", "req-1")@dataclass(frozen=True)
class EventEnvelope:
event_id: str # UUID4, auto-generated by make_envelope()
event_type: str # one of the EVENT_* constants
timestamp_ms: int # int(time.time() * 1000), auto-set
run_id: str | None
request_id: str | None
cycle: int | None
# Multi-turn foundation (mapped to existing DB columns; no schema change)
session_id: str | None # -> requests.conversation_id
turn_number: int | None # -> requests.turn_index
parent_event_id: str | None # -> requests.parent_request_id
audit_level: str # "turn" | "session" | "export"
payload: Mapping[str, Any] # event-specific data| Constant | String | Description |
|---|---|---|
EVENT_RUN_STARTED |
run.started |
Run lifecycle start |
EVENT_RUN_ENDED |
run.ended |
Run lifecycle end |
EVENT_REQUEST_UPSERTED |
request.upserted |
Request lifecycle |
EVENT_REQUEST_DOMAIN_UPDATED |
request.domain_updated |
Domain update |
EVENT_REQUEST_RESPONSE_UPDATED |
request.response_updated |
Response update |
EVENT_REQUEST_META_UPDATED |
request.meta_updated |
Per-request governance metadata merge (Step 13) |
EVENT_LLM_CALL |
llm.call |
LLM call telemetry |
EVENT_ORCHESTRATION_EVENT |
orchestration.event |
Orchestration lifecycle |
EVENT_DECISION_TRACE |
decision.trace |
Decision trace audit |
EVENT_DEBUG_EVENT |
debug.event |
Diagnostics / debug |
EVENT_CONVERSATION_STATE_UPDATED |
conversation.state_updated |
Multi-turn governance state snapshot (Step 13) |
EVENT_LEDGER_LOOKUP |
ledger.lookup |
Semantic decision ledger lookup (Step 13) |
EVENT_LEDGER_STORE |
ledger.store |
Semantic decision ledger store (Step 13) |
EVENT_SESSION_STORE_GET |
session_store.get |
SessionStore get (Step 13) |
EVENT_SESSION_STORE_PUT |
session_store.put |
SessionStore put (Step 13) |
EVENT_PROXY_REQUEST_FINALIZED |
proxy.request.finalized |
Per-request proxy finalisation summary (Step 13) |
| Variable | Default | Description |
|---|---|---|
MORALSTACK_OBSERVABILITY_MODE |
db_only if DB_PATH set, else file_only |
db_only | dual | file_only |
MORALSTACK_OBSERVABILITY_DB_PATH |
— | Path to SQLite database |
MORALSTACK_OBSERVABILITY_JSONL_DIR |
logs/observability |
Directory for JSONL output |
| Variable | Default | Description |
|---|---|---|
MORALSTACK_LEDGER_ENABLED |
true |
Enable the SemanticDecisionLedger semantic cache. When false, every turn runs the full deliberation. |
MORALSTACK_LEDGER_SIMILARITY_THRESHOLD |
0.92 |
Cosine similarity threshold for cache hit. Higher = stricter match. |
MORALSTACK_LEDGER_MAX_ENTRIES |
1000 |
Maximum cached entries per process (LRU eviction beyond this). |
MORALSTACK_LEDGER_EMBEDDING_MODEL |
(uses OPENAI_EMBEDDING_MODEL or text-embedding-3-small) |
Override the embedding model used for similarity. |
Skip rules (by design, not configurable):
- The ledger never caches turns with governance posture
ESCALATED(hard-signal refusals). - The ledger never caches the very first turn (
turn_index < 1). - These are safety invariants from the multi-turn design v1.3 section 5.8.
Deprecated aliases (still work, emit a logging.warning):
| Old variable | New variable |
|---|---|
MORALSTACK_PERSIST_MODE |
MORALSTACK_OBSERVABILITY_MODE |
MORALSTACK_DB_PATH |
MORALSTACK_OBSERVABILITY_DB_PATH |
All telemetry is persisted to SQLite. Used by moralstack-ui.
All telemetry is written to logs/observability/ as JSONL, one file per event type:
logs/observability/
├── llm.call.jsonl
├── orchestration.event.jsonl
├── decision.trace.jsonl
└── debug.event.jsonl
Both SQLite and JSONL are written simultaneously. Useful for debugging or migration.
from moralstack.observability import obs
rs = obs.read_store # SqliteReadStore instance
run = rs.get_run(run_id)
all_runs = rs.get_all_runs()
requests = rs.get_requests_for_run(run_id)
request = rs.get_request(run_id, request_id)
llm_calls = rs.get_llm_calls_for_request(run_id, request_id)
traces = rs.get_decision_traces_for_request(run_id, request_id)
orch = rs.get_orchestration_events_for_request(run_id, request_id)
debug = rs.get_debug_events_for_request(run_id, request_id)
models = rs.get_models_used_for_run(run_id)- The rollup includes only effective
llm_callsrows (call_outcomenot inskipped,cancelled,discarded). - This prevents report headers (for example, "Models used") from showing models that were configured but never actually consumed in the selected run path.
Runtime/export builders now include a minimal metric contract (metric_contract) generated from:
execution_strategyorchestration_event_countruntime_decisions_rowscycle_cards
Validated invariants include:
- Speculative timing coherence:
0 <= skip_elapsed_ms <= total_duration_ms(when total duration is available) - Activated signals coherence across traces (
RISK_ASSESSMENT,PRE_POLICY,FINAL) - Row cardinality coherence (
runtime_decisions_rows == orchestration_event_count) - Path cardinality guardrails (for example,
FAST_PATHmust not emit cycle cards)
The contract is produced by moralstack.reports.runtime_decisions.build_runtime_observability_contract() and embedded in markdown export structured JSON.
When using the Python SDK (govern() / GovernedClient), observability context is initialised automatically — no manual setup is required.
GovernedClient.__init__generates a session-scopedrun_id(UUID4) and registers it viaset_current_run_id().- If the active mode is
db_onlyordual,init_db()andcreate_run()are called to satisfy FK constraints for subsequent event inserts. - At the end of every
chat.completions.create()call,obs.flush()is invoked inside atry/finallyblock to guarantee that all enqueued events are written to disk before the call returns — critical for short-lived scripts.
# .env
MORALSTACK_OBSERVABILITY_MODE=file_only
MORALSTACK_OBSERVABILITY_JSONL_DIR=./traces/jsonlfrom moralstack import govern
from openai import OpenAI
client = govern(OpenAI())
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "Hello"}],
)
# ./traces/jsonl/llm.call.jsonl, decision.trace.jsonl, etc. are now written.| Mode | DB init called | JSONL written | When to use |
|---|---|---|---|
file_only |
No | Yes | Lightweight audit trail, no UI |
db_only |
Yes (if MORALSTACK_OBSERVABILITY_DB_PATH set) |
No | Full UI + query access |
dual |
Yes | Yes | Debugging / migration |
Note: the
GovernanceConfigfieldsobservability_mode,db_path, andjsonl_dirare reserved for a future wiring layer. Today the active routing is driven exclusively by env vars (MORALSTACK_OBSERVABILITY_*).
from moralstack.observability.context import (
get_current_run_id, set_current_run_id,
get_current_request_id, set_current_request_id,
get_current_cycle, set_current_cycle,
)These are propagated across thread boundaries via contextvars.copy_context() inside ObservabilityWriteQueue.submit().
In the SDK path,
run_idis set automatically byGovernedClient._init_run_context(). Direct calls toset_current_run_id()are only needed when using the orchestrator API directly (e.g. CLI or custom integrations).
MoralStack treats every chat as a sequence of governed turns. Step 13 adds first-class persistence and read APIs for the conversation-level signals that previously lived only in transient runtime state.
| Table | Written by | Purpose |
|---|---|---|
conversation_states |
EVENT_CONVERSATION_STATE_UPDATED |
Full state_in / state_out snapshots + summary per turn |
ledger_events |
EVENT_LEDGER_LOOKUP, EVENT_LEDGER_STORE |
Every semantic decision ledger lookup/store with similarity, outcome, reason |
session_store_events |
EVENT_SESSION_STORE_GET, EVENT_SESSION_STORE_PUT |
Every SessionStore access with TTL / eviction metadata |
proxy_request_events |
EVENT_PROXY_REQUEST_FINALIZED |
Per-request proxy finalisation summary (state_provided, state_updated, postures, was_cached, response length, headers) |
The existing requests table is extended with a meta_json column merged
turn-by-turn via EVENT_REQUEST_META_UPDATED (final_action, risk_score,
path, posture, reason codes, triggered principles, …).
from moralstack.observability.conversation_events import (
emit_request_meta_updated,
emit_conversation_state_updated,
emit_ledger_lookup,
emit_ledger_store,
emit_session_store_get,
emit_session_store_put,
emit_proxy_request_finalized,
)All emitters are best-effort: they swallow failures and never block the
hot governance path. They write in both db_only and dual modes; in
file_only mode the same envelopes are appended under
logs/observability/<event_type>.jsonl.
rs = obs.read_store
rs.get_requests_for_conversation(conversation_id)
rs.get_conversation_states(conversation_id)
rs.get_ledger_events_for_conversation(conversation_id)
rs.get_session_store_events_for_conversation(conversation_id)
rs.get_proxy_request_events_for_conversation(conversation_id)
rs.get_conversation_ids_for_run(run_id)
rs.get_conversation_overview(conversation_id)get_conversation_overview() returns aggregated metrics keyed by
turn_count, first_created_at, last_created_at, final_actions,
max_risk_score, last_posture, ledger_hits, ledger_misses,
session_store_hits, session_store_misses, any_turn_cached,
cached_turn_count.
Each orchestration.event envelope carries an internal event_type field
(distinct from the envelope-level event_type) that identifies the kind of
orchestration step. The full taxonomy is in
moralstack/orchestration/orchestration_event_taxonomy.py. Notable types
relevant to multi-turn observability:
| Sub-type | Stage | Component | Meaning |
|---|---|---|---|
CONVERSATION_CONTEXT_ATTACHED |
orchestration | conversation | Conversation context was attached to the request (session id, turn index, parent request). |
CONVERSATION_STATE_UPDATED |
orchestration | conversation | The ConversationGovernanceState was advanced after this turn. |
LEDGER_FAST_PATH_APPLIED |
fast_path | ledger_fast_path_runner | The SemanticDecisionLedger returned a hit AND the safety gate accepted it; deliberation was skipped. Payload: from_turn, similarity, cached_action, forced_route, modules_skipped. |
LEDGER_FAST_PATH_NOT_APPLIED |
fast_path | ledger_fast_path_runner | The SemanticDecisionLedger returned a hit but the safety gate refused to apply it (typically because the current route requires deliberation and the cached decision is not REFUSE). Payload: from_turn, similarity, cached_action, current_route, gate_reason. |
The SemanticDecisionLedger cache is NOT applied unconditionally on a hit. The
ConversationalFastPathRunner.is_safe_to_apply gate enforces a safety
invariant: a cached decision is applied only when applying it would not
weaken safety relative to the current run.
Rules:
- Cached REFUSE → always applied. A cached refusal is always safe to reuse: the system already decided to deny on a semantically-equivalent query.
- Non-REFUSE on non-deliberative current route → applied. When the
current run's path router returns one of
benign,safe_complete,refuse, orfast_path, the current run has not requested deliberation. Reusing the cached non-REFUSE decision is safe. - Non-REFUSE on deliberative current route → REJECTED. When the current
run's path router returns
deliberativeordeliberative_loop, the current turn has a risk profile that triggers full deliberation. Reusing the cached (less strict) decision would downgrade safety — the gate refuses and the deliberation runs in full.
Observability:
- When the gate applies the hit, an
orchestration.eventwithevent_type='LEDGER_FAST_PATH_APPLIED'is emitted. - When the gate rejects the hit, an
orchestration.eventwithevent_type='LEDGER_FAST_PATH_NOT_APPLIED'is emitted, withreason_codes=['current_route_requires_deliberation']andpayload.gate_reasonindicating the rejection cause.
Demo: examples/multiturn_quickstart_gate_rejected.py reproduces a
gate-rejected scenario (best-effort, depends on the risk estimator
classifying the turn as deliberation-worthy).
GET /conversations/{conversation_id}— full timeline with per-turn governance decision, state transitions, ledger and session-store activity, and proxy finalisation.GET /conversations/{conversation_id}/export.md— Markdown audit trail (moralstack.reports.conversation_export.export_conversation_to_markdown).GET /conversations?q=<id>— direct lookup redirect.- The run detail page now lists every conversation seen in the run; the request detail page surfaces the parent conversation context.
python scripts/inspect_multiturn_trace.py <conversation_id> # human summary
python scripts/inspect_multiturn_trace.py --list-run <run_id> # list conversations
python scripts/inspect_multiturn_trace.py <conversation_id> --json # raw rows
python scripts/inspect_multiturn_trace.py <conversation_id> --export trace.mdRequires MORALSTACK_OBSERVABILITY_DB_PATH (or the legacy
MORALSTACK_DB_PATH) to point at a populated SQLite database.
moralstack.persistence is kept as a backwards-compatible alias. All symbols re-export from moralstack.observability. It will emit a DeprecationWarning on first import.
| Old (persistence) | New (observability) |
|---|---|
from moralstack.persistence.config import get_db_path |
from moralstack.observability.config import get_db_path |
from moralstack.persistence.config import get_persist_mode |
from moralstack.observability.config import get_observability_mode |
from moralstack.persistence.context import set_current_run_id |
from moralstack.observability.context import set_current_run_id |
from moralstack.persistence.db import init_db, create_run, … |
from moralstack.observability.sinks.sqlite_sink import init_db, create_run, … |
from moralstack.persistence.db import get_run, get_llm_calls_for_request, … |
obs.read_store.get_run(…) |
from moralstack.persistence.sink import persist_llm_call |
from moralstack.persistence.sink import persist_llm_call (wrapper still works) |
from moralstack.persistence.write_queue import async_persist_llm_call |
obs.emit(make_envelope(EVENT_LLM_CALL, …)) |
| Old path | New path |
|---|---|
logs/decision_trace.jsonl |
logs/observability/decision.trace.jsonl |
.debug/debug.log |
logs/observability/debug.event.jsonl |
The JSONL stream is append-only: every envelope produces one new line in
the matching <event_type>.jsonl file. The SQLite sink, in contrast, applies
different write strategies depending on the event_type. Consumers that read
the JSONL offline must understand which event types are deltas and which are
self-contained snapshots, otherwise they will misinterpret the data.
| Category | Event types | JSONL contains | SQLite write strategy |
|---|---|---|---|
| Atomic insert | llm.call, orchestration.event, decision.trace, debug.event, ledger.lookup, ledger.store, session_store.get, session_store.put, proxy.request_finalized |
one independent record per envelope; never merged with previous entries | one new row per envelope (INSERT INTO ...) |
| Merge-update | request.meta_updated, request.domain_updated, request.response_updated |
a delta: the envelope's payload.meta (or domain, or final_response) is a partial update, not the final state |
the existing row is updated with a JSON merge (update_request_meta(merge=True)); the SQLite column always holds the consolidated state |
| Upsert | request.upserted, conversation.state_updated, run.started, run.ended |
a complete snapshot of the entity at envelope time; later snapshots override earlier ones | INSERT OR REPLACE keyed by (run_id, request_id) or equivalent |
If you read logs/observability/request.meta_updated.jsonl with a script that
takes the last line for a given (run_id, request_id) pair, you will see only
the last delta — not the consolidated meta_json that SQLite holds. To
reconstruct the consolidated state from the JSONL alone, you must:
- read all envelopes for the target
(run_id, request_id), - apply them in
created_atorder, - merge each
payload.metainto an accumulator (last-write-wins per key).
Atomic-insert events do not have this problem: every envelope is already self-contained.
Upsert events follow the same accumulation pattern as merge-update, but the merge unit is the entire payload (last snapshot wins) rather than a per-key JSON merge.
A single request goes through five state transitions, producing five envelopes in the JSONL:
{"event_type":"request.meta_updated","run_id":"r1","request_id":"req1","payload":{"meta":{"final_action":"NORMAL_COMPLETE","path":"FAST_PATH"}}}
{"event_type":"request.meta_updated","run_id":"r1","request_id":"req1","payload":{"meta":{"risk_score":0.10}}}
{"event_type":"request.meta_updated","run_id":"r1","request_id":"req1","payload":{"meta":{"intent_clarity":"HIGH"}}}
{"event_type":"request.meta_updated","run_id":"r1","request_id":"req1","payload":{"meta":{"was_cached":true,"cached_from_turn":1}}}
{"event_type":"request.meta_updated","run_id":"r1","request_id":"req1","payload":{"meta":{"governance_posture":"ELEVATED"}}}The SQLite row for that request, after the writer has processed all five:
{
"final_action": "NORMAL_COMPLETE",
"path": "FAST_PATH",
"risk_score": 0.10,
"intent_clarity": "HIGH",
"was_cached": true,
"cached_from_turn": 1,
"governance_posture": "ELEVATED"
}That consolidated form is what obs.read_store.get_request(run_id, request_id)
returns and what moralstack-ui displays. The five JSONL lines, taken
individually, never contain the full picture.
scripts/consolidate_jsonl_meta.py performs the JSONL→consolidated merge
offline. Use it when you need the SQLite-equivalent state but only have the
JSONL files (for example: when copying logs between machines, when auditing
after the DB has been rotated, or when feeding offline analytics that work
with self-contained JSON).
python scripts/consolidate_jsonl_meta.py \
--input logs/observability/request.meta_updated.jsonl \
--output requests_meta_consolidated.jsonThe output is a JSON file mapping (run_id, request_id) → consolidated meta
dict, with the same semantics as requests.meta_json in SQLite.