diff --git a/CHANGELOG.md b/CHANGELOG.md index b35915c..ce3b5ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,40 @@ All notable changes to Weavegraph will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] +## [0.6.0] - 2026-05-11 + +### Added + +#### WG-006 — Invocation-scoped state lifecycle + normalization profiles + +- `StateLifecycle` enum (`Durable` / `InvocationScoped`) in `weavegraph::state`. +- `StateKey::invocation_scoped()` const builder — marks a key as invocation-scoped without changing its identity (equality / hash exclude the lifecycle field). +- `StateKey::lifecycle()` getter returning the stored `StateLifecycle`. +- `StateNormalizeProfile` in `weavegraph::runtimes::replay` — fluent builder for specifying which state keys to ignore during replay comparison. Supports both typed (`ignore_key(StateKey)`) and raw-string (`ignore_extra_keys(impl IntoIterator)`) forms. Panics at construction time if two registrations conflict on lifecycle annotation. +- `normalize_state_with(state, profile)` — normalizes a `VersionedState` snapshot to a `serde_json::Value` after dropping ignored keys. +- `compare_final_state_with(left, right, profile)` — variant of `compare_final_state` accepting a `StateNormalizeProfile`. +- `compare_replay_runs_with_profile(left, right, profile, event_normalizer)` — variant of `compare_replay_runs_with` accepting a `StateNormalizeProfile`. +- `NodePartial::clear_extra_keys(keys)` — **deletes** the given raw keys from state on the next barrier application. Uses JSON Merge Patch semantics: `MapMerge` now removes keys whose incoming value is `null` (RFC 7396). No separate cleanup reducer is needed. +- `NodePartial::clear_typed_extra_key(key)` — typed companion to `clear_extra_keys`; uses the `StateKey`'s storage key. + +#### WG-007 — Runtime observability hooks + metrics adapter + +- `RuntimeObserver` trait in `weavegraph::runtimes::observer` — zero-cost (no allocation, no virtual dispatch when unused), always compiled, no feature gate. All methods have default no-op bodies; implementors override only what they need. +- Hook methods: `on_invocation_start`, `on_invocation_finish`, `on_node_finish`, `on_checkpoint_load`, `on_checkpoint_save`, `on_event_bus_emit`. +- Metadata structs (all `#[non_exhaustive]`): `InvocationStartMeta`, `InvocationFinishMeta`, `NodeFinishMeta`, `CheckpointLoadMeta`, `CheckpointSaveMeta`, `EventBusEmitMeta`. +- Outcome enums (all `#[non_exhaustive]`): `InvocationOutcome`, `NodeOutcome`. +- `AppRunnerBuilder::observer(Arc)` — attaches an observer; no-op overhead when not set. +- Observer panics are caught via `std::panic::catch_unwind` and logged as `tracing::warn!` — a misbehaving observer cannot abort a workflow. +- `ObservingEmitter` (private) — wraps the event bus emitter to fire `on_event_bus_emit` for every emitted event when an observer is attached. +- `MetricsObserver` in `weavegraph::runtimes::metrics_observer` — a `RuntimeObserver` impl that emits standard counters and histograms via the `metrics` crate facade. Available under the `metrics` feature flag. + - Counters: `weavegraph.node.invocations` (labels: `node`, `outcome`), `weavegraph.invocation.count` (`outcome`), `weavegraph.checkpoint.saves` (`backend`), `weavegraph.checkpoint.loads` (`backend`), `weavegraph.event_bus.emits` (`scope`). + - Histograms: `weavegraph.node.step_duration_ms` (`node`), `weavegraph.invocation.duration_ms`, `weavegraph.checkpoint.save_duration_ms` (`backend`). + +### Changed (breaking) + +- `RunnerError`, `NodeError`, `CheckpointerError`, `StateSlotError`, and `ReplayConformanceError` are now `#[non_exhaustive]`. Exhaustive `match` arms on these types must add a wildcard `_` arm. + - Migration: replace `_ => unreachable!()` with `_ => { /* handle future variants */ }` where appropriate. +- **`MapMerge` reducer now deletes keys whose incoming value is `null`** (JSON Merge Patch / RFC 7396). Previously a `null` was written into state as-is. Any code that deliberately stored `serde_json::Value::Null` via `with_extra` should use a sentinel value instead (e.g. a JSON object with an `absent: true` field). ## [0.5.0] - 2026-05-08 diff --git a/Cargo.toml b/Cargo.toml index 110f541..92c77b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "weavegraph" -version = "0.5.0" +version = "0.6.0" edition = "2024" description = "Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics." license = "MIT" @@ -100,6 +100,7 @@ reqwest = { version = "0.13", default-features = false, features = [ "rustls", ], optional = true } scraper = { version = "0.25", optional = true } +metrics = { version = "0.24", optional = true } # wg-ragsmith removed from dependencies to avoid circular dependency. # For RAG examples, see the wg-ragsmith crate directly. @@ -120,6 +121,7 @@ postgres = ["sqlx"] rig = ["dep:rig-core", "dep:rmcp"] diagnostics = ["dep:miette"] examples = ["reqwest", "scraper"] +metrics = ["dep:metrics"] petgraph-compat = ["petgraph"] [[example]] diff --git a/docs/MIGRATION.md b/docs/MIGRATION.md index 86964c2..031f2a0 100644 --- a/docs/MIGRATION.md +++ b/docs/MIGRATION.md @@ -5,6 +5,136 @@ migration guidance for upgrading your code. --- +## v0.6.0 + +### Overview + +v0.6.0 adds two major feature groups (WG-006 and WG-007) with only one breaking change: five +public error enums are now `#[non_exhaustive]`. The new APIs are purely additive and backward-compatible. + +### Breaking: `#[non_exhaustive]` error enums + +The following enums are now `#[non_exhaustive]` to allow adding new variants in future minor releases without a breaking change: + +- `RunnerError` +- `NodeError` +- `CheckpointerError` +- `StateSlotError` +- `ReplayConformanceError` + +**Migration**: Any exhaustive `match` on these types will now fail to compile. Add a wildcard arm: + +```rust +// Before (0.5.0) +match err { + RunnerError::SessionNotFound(_) => { /* ... */ } + RunnerError::StepFailed(_) => { /* ... */ } + // compiler accepted this as exhaustive +} + +// After (0.6.0) +match err { + RunnerError::SessionNotFound(_) => { /* ... */ } + RunnerError::StepFailed(_) => { /* ... */ } + _ => { /* handle any future variants */ } +} +``` + +### Breaking: `MapMerge` now deletes keys with null values (RFC 7396) + +`MapMerge`, the built-in reducer for `VersionedState.extra`, previously wrote `serde_json::Value::Null` verbatim into the state map. It now **removes** the key when the incoming value is `null`, following [JSON Merge Patch](https://www.rfc-editor.org/rfc/rfc7396) semantics. + +**Impact**: If your graph stores `serde_json::Value::Null` intentionally as a meaningful value (as opposed to an absence marker), replace it with an explicit sentinel, for example: + +```json +{ "absent": true } +``` + +**Benefit**: `NodePartial::clear_extra_keys` and `clear_typed_extra_key` now fully delete keys — no wrapper reducer or post-processing is required. + +### New: Invocation-scoped state slots (WG-006) + +Mark a `StateKey` as `InvocationScoped` using the new const builder: + +```rust +const SCRATCH: StateKey = StateKey::new("wq", "scratch", 1) + .invocation_scoped(); +``` + +Keys marked `InvocationScoped` compare equal to a `Durable` key with the same `(namespace, name, schema_version)` — lifecycle is intentionally excluded from equality and hashing so that the same slot can be used across invocations without registry conflicts. + +To clear invocation-scoped slots at re-entry (e.g. in iterative sessions), call `clear_typed_extra_key` on the outgoing `NodePartial`: + +```rust +partial.clear_typed_extra_key(SCRATCH) +``` + +This **deletes** the key from `VersionedState.extra` — no separate cleanup reducer is needed. +`MapMerge` (the built-in extra reducer) now follows JSON Merge Patch semantics (RFC 7396): +an incoming `null` removes the key rather than storing a null value. + +### New: Replay normalization profiles (WG-006) + +Use `StateNormalizeProfile` to ignore volatile or invocation-scoped keys during replay comparison: + +```rust +let profile = StateNormalizeProfile::new() + .ignore_key(SCRATCH) // typed — records lifecycle for conflict detection + .ignore_extra_keys(["ts"]); // raw string — no lifecycle + +let comparison = compare_final_state_with(&run_a, &run_b, &profile); +assert!(comparison.is_equivalent()); +``` + +### New: Runtime observer (WG-007) + +Attach a `RuntimeObserver` to an `AppRunner` for structured hook callbacks at key lifecycle points: + +```rust +#[derive(Debug)] +struct MyObserver; + +impl RuntimeObserver for MyObserver { + fn on_invocation_finish(&self, meta: &InvocationFinishMeta<'_>) { + println!("run {} completed in {}ms", meta.session_id, meta.duration_ms); + } +} + +let runner = AppRunner::builder() + .app(app) + .observer(Arc::new(MyObserver)) + .build() + .await?; +``` + +When no observer is attached, there is zero overhead — the observer field is `Option>`. + +Observer panics are caught via `catch_unwind` and logged as warnings; a misbehaving observer +cannot crash or abort a workflow invocation. + +### New: MetricsObserver (WG-007, `metrics` feature) + +Enable the `metrics` feature and attach `MetricsObserver` to export standard Prometheus-compatible metrics via any `metrics`-crate recorder (e.g. `metrics-exporter-prometheus`): + +```toml +weavegraph = { version = "0.6", features = ["metrics"] } +metrics-exporter-prometheus = "0.17" +``` + +```rust +use weavegraph::runtimes::MetricsObserver; + +let runner = AppRunner::builder() + .app(app) + .observer(Arc::new(MetricsObserver)) + .build() + .await?; +``` + +See the `metrics_observer` module docs for the full metric inventory. + +--- + ## v0.5.0 ### Overview diff --git a/src/node.rs b/src/node.rs index 6393d54..e1c3892 100644 --- a/src/node.rs +++ b/src/node.rs @@ -402,6 +402,59 @@ impl NodePartial { self.frontier = Some(command); self } + + /// Remove the given extra keys from state on the next barrier application. + /// + /// Writes `serde_json::Value::Null` markers into the partial. [`MapMerge`](crate::reducers::MapMerge) + /// (the built-in extra reducer) follows JSON Merge Patch semantics (RFC 7396) and + /// **deletes** keys whose incoming value is `null`, so no separate cleanup reducer + /// is needed. + /// + /// # Examples + /// + /// ```rust + /// use weavegraph::node::NodePartial; + /// + /// let partial = NodePartial::new() + /// .clear_extra_keys(["wq:feature_snapshot:v1", "wq:signal_event:v1"]); + /// ``` + #[must_use] + pub fn clear_extra_keys(mut self, keys: I) -> Self + where + I: IntoIterator, + S: Into, + { + let extra = self.extra.get_or_insert_with(FxHashMap::default); + for key in keys { + extra.insert(key.into(), serde_json::Value::Null); + } + self + } + + /// Remove a single typed extra key from state on the next barrier application. + /// + /// Typed companion to [`clear_extra_keys`](Self::clear_extra_keys). The storage key + /// is derived from the `StateKey`'s `(namespace, name, schema_version)` triple so + /// that the same constant used to write a value can be used to delete it. + /// + /// [`MapMerge`](crate::reducers::MapMerge) deletes keys with null values (RFC 7396), + /// so no separate cleanup reducer is needed. + /// + /// # Examples + /// + /// ```rust + /// use weavegraph::node::NodePartial; + /// use weavegraph::state::{StateKey, StateLifecycle}; + /// + /// const CURRENT_EVENT: StateKey = + /// StateKey::new("wq", "event", 1).invocation_scoped(); + /// + /// let partial = NodePartial::new().clear_typed_extra_key(CURRENT_EVENT); + /// ``` + #[must_use] + pub fn clear_typed_extra_key(self, key: crate::state::StateKey) -> Self { + self.clear_extra_keys([key.storage_key()]) + } } // ============================================================================ @@ -431,6 +484,7 @@ pub enum NodeContextError { /// use `NodePartial.errors` instead. #[derive(Debug, Error)] #[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))] +#[non_exhaustive] pub enum NodeError { /// Expected input data is missing from the state snapshot. #[error("missing expected input: {what}")] diff --git a/src/reducers/map_merge.rs b/src/reducers/map_merge.rs index 23a2e8a..71063e4 100644 --- a/src/reducers/map_merge.rs +++ b/src/reducers/map_merge.rs @@ -1,8 +1,20 @@ //! Reducer that shallow-merges incoming extra key-value pairs into the extras channel. +//! +//! Follows the [JSON Merge Patch](https://www.rfc-editor.org/rfc/rfc7396) convention: +//! an incoming `null` value **removes** the key from state rather than setting it to null. +//! This is what makes [`NodePartial::clear_extra_keys`](crate::node::NodePartial::clear_extra_keys) +//! and [`NodePartial::clear_typed_extra_key`](crate::node::NodePartial::clear_typed_extra_key) +//! functional without requiring a separate cleanup reducer. use super::Reducer; use crate::{channels::Channel, node::NodePartial, state::VersionedState}; /// Reducer that merges extra key-value pairs from a [`NodePartial`](crate::node::NodePartial) into the state extras channel. +/// +/// Uses JSON Merge Patch semantics (RFC 7396): an incoming `null` value **removes** the +/// key from state rather than writing a null entry. This means +/// [`NodePartial::clear_extra_keys`](crate::node::NodePartial::clear_extra_keys) and +/// [`NodePartial::clear_typed_extra_key`](crate::node::NodePartial::clear_typed_extra_key) +/// fully delete the key — no separate cleanup reducer is needed. #[derive(Debug, PartialEq, Clone, Hash, Eq)] pub struct MapMerge; impl Reducer for MapMerge { @@ -12,7 +24,11 @@ impl Reducer for MapMerge { { let state_map = state.extra.get_mut(); for (k, v) in extras_update.iter() { - state_map.insert(k.clone(), v.clone()); + if v.is_null() { + state_map.remove(k); + } else { + state_map.insert(k.clone(), v.clone()); + } } } } diff --git a/src/runtimes/checkpointer.rs b/src/runtimes/checkpointer.rs index c1b6fe4..35acc14 100644 --- a/src/runtimes/checkpointer.rs +++ b/src/runtimes/checkpointer.rs @@ -159,6 +159,7 @@ impl Checkpoint { /// Errors from checkpointer operations. #[derive(Debug, thiserror::Error)] #[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))] +#[non_exhaustive] pub enum CheckpointerError { /// Session was not found in the checkpointer. #[error("session not found: {session_id}")] diff --git a/src/runtimes/metrics_observer.rs b/src/runtimes/metrics_observer.rs new file mode 100644 index 0000000..216747f --- /dev/null +++ b/src/runtimes/metrics_observer.rs @@ -0,0 +1,121 @@ +//! Feature-gated [`RuntimeObserver`] implementation using the [`metrics`] crate facade. +//! +//! Enable with `features = ["metrics"]`. This module emits standard counters and +//! histograms that any `metrics`-compatible recorder (e.g. `metrics-exporter-prometheus`) +//! can capture. +//! +//! # Metric inventory +//! +//! | Metric | Kind | Labels | Description | +//! |--------|------|--------|-------------| +//! | `weavegraph.node.invocations` | counter | `node`, `outcome` | Completed node executions | +//! | `weavegraph.node.step_duration_ms` | histogram | `node` | Superstep duration (shared across parallel nodes) | +//! | `weavegraph.invocation.count` | counter | `outcome` | Completed workflow invocations | +//! | `weavegraph.invocation.duration_ms` | histogram | (none) | Invocation wall-clock duration | +//! | `weavegraph.checkpoint.saves` | counter | `backend` | Successful checkpoint saves | +//! | `weavegraph.checkpoint.save_duration_ms` | histogram | `backend` | Checkpoint save duration | +//! | `weavegraph.checkpoint.loads` | counter | `backend` | Sessions resumed from a checkpoint | +//! | `weavegraph.event_bus.emits` | counter | `scope` | Events emitted through the event bus | +//! +//! # Cardinality note +//! +//! Labels are kept conservative by default. `session_id` and `invocation_id` are +//! intentionally **not** included as labels to avoid unbounded cardinality in +//! long-running services. The `node` label uses the node kind's string encoding +//! (e.g. `"features"`, `"strategy"`). +//! +//! # Usage +//! +//! ```rust,no_run +//! use std::sync::Arc; +//! use weavegraph::runtimes::{AppRunner, metrics_observer::MetricsObserver}; +//! # use weavegraph::app::App; +//! +//! # async fn example(app: App) { +//! let runner = AppRunner::builder() +//! .app(app) +//! .observer(Arc::new(MetricsObserver)) +//! .build() +//! .await; +//! # } +//! ``` + +use std::panic::RefUnwindSafe; + +use crate::runtimes::observer::{ + CheckpointLoadMeta, CheckpointSaveMeta, EventBusEmitMeta, InvocationFinishMeta, + InvocationStartMeta, NodeFinishMeta, NodeOutcome, RuntimeObserver, +}; + +/// A [`RuntimeObserver`] that emits metrics via the [`metrics`] crate facade. +/// +/// Install a compatible recorder (e.g. `metrics-exporter-prometheus`) before +/// starting the runner to have these metrics exported to your observability stack. +/// +/// See the [module documentation](self) for the full metric inventory. +#[derive(Debug, Clone, Copy)] +pub struct MetricsObserver; + +// MetricsObserver holds no interior mutability and all metrics calls are +// thread-safe through the global recorder, so RefUnwindSafe is safe to assert. +impl RefUnwindSafe for MetricsObserver {} + +impl RuntimeObserver for MetricsObserver { + fn on_invocation_start(&self, _meta: &InvocationStartMeta<'_>) { + // Nothing to emit at start — counts and durations are emitted on finish. + } + + fn on_invocation_finish(&self, meta: &InvocationFinishMeta<'_>) { + let outcome = match meta.outcome { + crate::runtimes::observer::InvocationOutcome::Completed => "completed", + crate::runtimes::observer::InvocationOutcome::Error => "error", + }; + metrics::counter!("weavegraph.invocation.count", "outcome" => outcome).increment(1); + metrics::histogram!("weavegraph.invocation.duration_ms").record(meta.duration_ms as f64); + } + + fn on_node_finish(&self, meta: &NodeFinishMeta<'_>) { + let node = meta.node_kind.encode().to_string(); + let outcome = match meta.outcome { + NodeOutcome::Completed => "completed", + NodeOutcome::Error => "error", + NodeOutcome::Skipped => "skipped", + }; + metrics::counter!( + "weavegraph.node.invocations", + "node" => node.clone(), + "outcome" => outcome + ) + .increment(1); + if meta.outcome != NodeOutcome::Skipped { + metrics::histogram!("weavegraph.node.step_duration_ms", "node" => node) + .record(meta.step_duration_ms as f64); + } + } + + fn on_checkpoint_load(&self, meta: &CheckpointLoadMeta<'_>) { + metrics::counter!( + "weavegraph.checkpoint.loads", + "backend" => meta.backend.to_string() + ) + .increment(1); + } + + fn on_checkpoint_save(&self, meta: &CheckpointSaveMeta<'_>) { + let backend = meta.backend.to_string(); + metrics::counter!("weavegraph.checkpoint.saves", "backend" => backend.clone()).increment(1); + metrics::histogram!( + "weavegraph.checkpoint.save_duration_ms", + "backend" => backend + ) + .record(meta.duration_ms as f64); + } + + fn on_event_bus_emit(&self, meta: &EventBusEmitMeta<'_>) { + metrics::counter!( + "weavegraph.event_bus.emits", + "scope" => meta.scope.to_string() + ) + .increment(1); + } +} diff --git a/src/runtimes/mod.rs b/src/runtimes/mod.rs index 101b3a2..cca02a8 100644 --- a/src/runtimes/mod.rs +++ b/src/runtimes/mod.rs @@ -53,6 +53,10 @@ pub mod checkpointer_sqlite; #[cfg(feature = "sqlite")] mod checkpointer_sqlite_helpers; pub mod execution; +#[cfg(feature = "metrics")] +#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))] +pub mod metrics_observer; +pub mod observer; pub mod persistence; pub mod replay; pub mod runner; @@ -85,9 +89,17 @@ pub use session::{SessionInit, SessionState, StateVersions}; pub use runner::{AppRunner, AppRunnerBuilder, RunMetadata}; pub use replay::{ - ReplayComparison, ReplayConformanceError, ReplayRun, compare_event_sequences, - compare_event_sequences_with, compare_final_state, compare_replay_runs, - compare_replay_runs_with, normalize_event, normalize_state, + ReplayComparison, ReplayConformanceError, ReplayRun, StateNormalizeProfile, + compare_event_sequences, compare_event_sequences_with, compare_final_state, + compare_final_state_with, compare_replay_runs, compare_replay_runs_with, + compare_replay_runs_with_profile, normalize_event, normalize_state, normalize_state_with, }; pub use runtime_config::{EventBusConfig, RuntimeConfig, SinkConfig}; pub use types::{SessionId, StepNumber}; + +#[cfg(feature = "metrics")] +pub use metrics_observer::MetricsObserver; +pub use observer::{ + CheckpointLoadMeta, CheckpointSaveMeta, EventBusEmitMeta, InvocationFinishMeta, + InvocationOutcome, InvocationStartMeta, NodeFinishMeta, NodeOutcome, RuntimeObserver, +}; diff --git a/src/runtimes/observer.rs b/src/runtimes/observer.rs new file mode 100644 index 0000000..0f3962a --- /dev/null +++ b/src/runtimes/observer.rs @@ -0,0 +1,211 @@ +//! Runtime observer trait and metadata types for workflow telemetry hooks. +//! +//! `RuntimeObserver` is an opt-in interface that receives structured callbacks +//! at key points during graph execution: invocation boundaries, per-node +//! completion, checkpoint operations, and event-bus emissions. All methods +//! have default no-op implementations, so implementors only override the hooks +//! they care about. +//! +//! # Usage +//! +//! Register an observer when building a runner: +//! +//! ```rust,no_run +//! use std::sync::Arc; +//! use weavegraph::runtimes::{AppRunner, observer::{RuntimeObserver, NodeFinishMeta}}; +//! use weavegraph::app::App; +//! +//! #[derive(Debug)] +//! struct CountingObserver { +//! count: std::sync::atomic::AtomicU64, +//! } +//! +//! impl RuntimeObserver for CountingObserver { +//! fn on_node_finish(&self, meta: &NodeFinishMeta<'_>) { +//! self.count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); +//! } +//! } +//! +//! # async fn example(app: App) { +//! let observer = Arc::new(CountingObserver { count: Default::default() }); +//! let runner = AppRunner::builder() +//! .app(app) +//! .observer(observer) +//! .build() +//! .await; +//! # } +//! ``` +//! +//! # Performance contract +//! +//! Observer hooks are called **synchronously** on the execution thread. They must +//! be fast and non-blocking. Panics inside hooks are caught by the runner, which +//! logs a warning via [`tracing`] and continues execution — the graph is never +//! brought down by an observer failure. +//! +//! # Note on per-node timing in 0.6.0 +//! +//! In this release, `step_duration_ms` in [`NodeFinishMeta`] reflects the elapsed +//! time for the **entire superstep** that contained the node, not the per-node +//! wall time. Nodes within the same superstep share the step's duration. Per-node +//! timing would require scheduler-level instrumentation and is planned for a +//! future release. + +use std::fmt; +use std::panic::RefUnwindSafe; + +use crate::types::NodeKind; + +// ============================================================================ +// Outcome enums +// ============================================================================ + +/// Outcome of a completed workflow invocation. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[non_exhaustive] +pub enum InvocationOutcome { + /// The invocation ran to completion successfully. + Completed, + /// The invocation ended with a runtime error. + Error, +} + +/// Outcome of a completed node execution within a superstep. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[non_exhaustive] +pub enum NodeOutcome { + /// The node ran and returned a `NodePartial`. + Completed, + /// The node returned a fatal `NodeError`. + Error, + /// The node was skipped (version-gated or terminal `End` node). + Skipped, +} + +// ============================================================================ +// Metadata structs — all #[non_exhaustive] so fields can be added without +// breaking implementors that destructure them (though &-access is idiomatic). +// ============================================================================ + +/// Metadata supplied to [`RuntimeObserver::on_invocation_start`]. +#[derive(Debug)] +#[non_exhaustive] +pub struct InvocationStartMeta<'a> { + /// The session identifier for this invocation. + pub session_id: &'a str, + /// Stable fingerprint of the compiled graph definition. + /// + /// Computed by [`App::graph_definition_hash`](crate::app::App::graph_definition_hash). + pub graph_id: &'a str, +} + +/// Metadata supplied to [`RuntimeObserver::on_invocation_finish`]. +#[derive(Debug)] +#[non_exhaustive] +pub struct InvocationFinishMeta<'a> { + /// The session identifier. + pub session_id: &'a str, + /// Stable fingerprint of the compiled graph definition. + pub graph_id: &'a str, + /// Wall-clock elapsed time for the full invocation in milliseconds. + pub duration_ms: u64, + /// Outcome of the invocation. + pub outcome: InvocationOutcome, +} + +/// Metadata supplied to [`RuntimeObserver::on_node_finish`]. +/// +/// See [module-level note](self) on per-node timing in 0.6.0. +#[derive(Debug)] +#[non_exhaustive] +pub struct NodeFinishMeta<'a> { + /// The node that completed. + pub node_kind: &'a NodeKind, + /// The session identifier. + pub session_id: &'a str, + /// The step number within which this node executed. + pub step: u64, + /// Elapsed time for the superstep containing this node, in milliseconds. + /// + /// All nodes in the same superstep share this value. Per-node timing + /// is not available in 0.6.0. + pub step_duration_ms: u64, + /// Outcome of this node. + pub outcome: NodeOutcome, +} + +/// Metadata supplied to [`RuntimeObserver::on_checkpoint_load`]. +#[derive(Debug)] +#[non_exhaustive] +pub struct CheckpointLoadMeta<'a> { + /// The session identifier. + pub session_id: &'a str, + /// Human-readable backend name (e.g. `"sqlite"`, `"postgres"`, `"in-memory"`). + pub backend: &'a str, + /// The step number that was loaded from the checkpoint. + pub step: u64, +} + +/// Metadata supplied to [`RuntimeObserver::on_checkpoint_save`]. +#[derive(Debug)] +#[non_exhaustive] +pub struct CheckpointSaveMeta<'a> { + /// The session identifier. + pub session_id: &'a str, + /// Human-readable backend name. + pub backend: &'a str, + /// The step number that was saved. + pub step: u64, + /// Wall-clock duration of the save operation in milliseconds. + pub duration_ms: u64, +} + +/// Metadata supplied to [`RuntimeObserver::on_event_bus_emit`]. +#[derive(Debug)] +#[non_exhaustive] +pub struct EventBusEmitMeta<'a> { + /// The scope label of the emitted event (e.g. `"features"`, `"__weavegraph_stream_end__"`). + pub scope: &'a str, +} + +// ============================================================================ +// RuntimeObserver trait +// ============================================================================ + +/// Observer interface for runtime telemetry hooks. +/// +/// Register an implementation with +/// [`AppRunnerBuilder::observer`](crate::runtimes::runner::AppRunnerBuilder::observer). +/// All methods default to no-ops; implement only the callbacks you need. +/// +/// # Safety contract +/// +/// Implementations **must not panic** — panics are caught by the runner and +/// produce a `tracing::warn!` log entry. The supertrait bound [`RefUnwindSafe`] +/// is required to make this catch-and-continue safe without `AssertUnwindSafe` +/// wrappers at every callsite. +/// +/// Implementations must be `Send + Sync` as the runner can share them across +/// async tasks. +pub trait RuntimeObserver: Send + Sync + fmt::Debug + RefUnwindSafe + 'static { + /// Called immediately before a workflow invocation begins running. + fn on_invocation_start(&self, _meta: &InvocationStartMeta<'_>) {} + + /// Called after a workflow invocation finishes (successfully or with an error). + fn on_invocation_finish(&self, _meta: &InvocationFinishMeta<'_>) {} + + /// Called once for each node after the superstep containing it completes. + /// + /// In 0.6.0, `step_duration_ms` is the superstep duration shared by all + /// nodes in the same parallel step. See the [module note](self). + fn on_node_finish(&self, _meta: &NodeFinishMeta<'_>) {} + + /// Called after a checkpoint is successfully loaded during session creation. + fn on_checkpoint_load(&self, _meta: &CheckpointLoadMeta<'_>) {} + + /// Called after a checkpoint is successfully saved. + fn on_checkpoint_save(&self, _meta: &CheckpointSaveMeta<'_>) {} + + /// Called after each event is emitted through the event bus. + fn on_event_bus_emit(&self, _meta: &EventBusEmitMeta<'_>) {} +} diff --git a/src/runtimes/replay.rs b/src/runtimes/replay.rs index 99ced6a..cdd9c72 100644 --- a/src/runtimes/replay.rs +++ b/src/runtimes/replay.rs @@ -7,7 +7,11 @@ use serde_json::{Value, json}; use thiserror::Error; -use crate::{channels::Channel, event_bus::Event, state::VersionedState}; +use crate::{ + channels::Channel, + event_bus::Event, + state::{StateKey, StateLifecycle, VersionedState}, +}; /// Captured output from one workflow run. #[derive(Debug, Clone)] @@ -78,6 +82,7 @@ impl ReplayComparison { /// Errors returned by replay conformance helpers. #[derive(Debug, Error)] #[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))] +#[non_exhaustive] pub enum ReplayConformanceError { /// The compared runs were not equivalent. #[error("replay conformance mismatch: {differences:?}")] @@ -207,3 +212,189 @@ where ReplayComparison::with_differences(differences) } + +// ============================================================================ +// Normalization profiles (WG-006) +// ============================================================================ + +/// A filter profile for [`normalize_state_with`] and [`compare_final_state_with`]. +/// +/// A profile lists extra-map keys that should be excluded from normalized state +/// output. This is the primary mechanism for separating durable state from +/// per-invocation scratch values during replay comparison and resume assertions. +/// +/// # Conflict detection +/// +/// When a key is added via [`ignore_key`](Self::ignore_key), the profile records +/// the key's [`StateLifecycle`] annotation. If the same storage key is later +/// registered with a **different** lifecycle annotation, the method panics with a +/// clear message. This prevents subtle bugs from defining the same slot constant +/// twice with conflicting policies. +/// +/// Raw-string keys added via [`ignore_extra_keys`](Self::ignore_extra_keys) carry +/// no lifecycle annotation and do not trigger conflict detection. +/// +/// # Examples +/// +/// ```rust +/// use weavegraph::runtimes::replay::{StateNormalizeProfile, normalize_state_with}; +/// use weavegraph::state::{StateKey, StateLifecycle}; +/// use weavegraph::state::VersionedState; +/// +/// const TICK_EVENT: StateKey = StateKey::new("wq", "event", 1).invocation_scoped(); +/// +/// let profile = StateNormalizeProfile::new().ignore_key(TICK_EVENT); +/// +/// let state = VersionedState::new_with_user_message("hello"); +/// let _normalized = normalize_state_with(&state, &profile); +/// ``` +#[derive(Debug, Default, Clone)] +pub struct StateNormalizeProfile { + /// (storage_key, optional lifecycle annotation). + /// `None` = added via raw string; `Some(lc)` = added via typed StateKey. + ignored: Vec<(String, Option)>, +} + +impl StateNormalizeProfile { + /// Create an empty profile (no keys ignored; equivalent to `normalize_state`). + #[must_use] + pub fn new() -> Self { + Self::default() + } + + /// Ignore the given raw storage key strings during normalization. + /// + /// Use this for quick ad-hoc ignores. Prefer [`ignore_key`](Self::ignore_key) + /// when you have a typed `StateKey` constant, as it also validates lifecycle + /// consistency. + #[must_use] + pub fn ignore_extra_keys(mut self, keys: I) -> Self + where + I: IntoIterator, + S: Into, + { + for k in keys { + self.add_raw(k.into(), None); + } + self + } + + /// Ignore the storage slot identified by `key` during normalization. + /// + /// The key's [`StateLifecycle`] annotation is recorded. If the same storage + /// key has previously been registered with a different lifecycle annotation, + /// this method **panics** — this is intentional: it surfaces a configuration + /// mistake at test/startup time rather than silently producing wrong results. + #[must_use] + pub fn ignore_key(mut self, key: StateKey) -> Self { + self.add_raw(key.storage_key(), Some(key.lifecycle())); + self + } + + fn add_raw(&mut self, storage_key: String, lifecycle: Option) { + if let Some((_, existing_lc)) = self.ignored.iter().find(|(k, _)| k == &storage_key) { + match (existing_lc, &lifecycle) { + (Some(a), Some(b)) if a != b => { + panic!( + "StateNormalizeProfile: conflicting lifecycle annotations for key {:?}: \ + already registered as {:?}, attempted to re-register as {:?}. \ + Ensure the same StateKey constant is used throughout.", + storage_key, a, b + ); + } + _ => {} // duplicate or compatible — idempotent + } + return; + } + self.ignored.push((storage_key, lifecycle)); + } + + /// Iterate over the concrete storage key strings this profile ignores. + pub fn ignored_keys(&self) -> impl Iterator { + self.ignored.iter().map(|(k, _)| k.as_str()) + } +} + +/// Normalize a final state into a JSON value, excluding keys listed in `profile`. +/// +/// Identical to [`normalize_state`] except the caller can suppress named keys +/// from the `extra` map. Use this to compare only durable state when some extra +/// entries are per-invocation scratch that should not influence the comparison. +/// +/// # Examples +/// +/// ```rust +/// use weavegraph::runtimes::replay::{StateNormalizeProfile, normalize_state_with}; +/// use weavegraph::state::{StateKey, VersionedState}; +/// +/// const TICK: StateKey = StateKey::new("wq", "tick", 1).invocation_scoped(); +/// +/// let profile = StateNormalizeProfile::new().ignore_key(TICK); +/// let state = VersionedState::new_with_user_message("hello"); +/// let _value = normalize_state_with(&state, &profile); +/// ``` +#[must_use] +pub fn normalize_state_with(state: &VersionedState, profile: &StateNormalizeProfile) -> Value { + let mut extra = state.extra.snapshot(); + for key in profile.ignored_keys() { + extra.remove(key); + } + json!({ + "messages": state.messages.snapshot(), + "messages_version": state.messages.version(), + "extra": extra, + "extra_version": state.extra.version(), + "errors": state.errors.snapshot(), + "errors_version": state.errors.version(), + }) +} + +/// Compare two final states using a caller-provided normalization profile. +/// +/// Equivalent to [`compare_final_state`] but filters the `extra` map through +/// `profile` before comparing. Use this to assert that durable state matches +/// while ignoring known per-invocation scratch keys. +#[must_use] +pub fn compare_final_state_with( + left: &VersionedState, + right: &VersionedState, + profile: &StateNormalizeProfile, +) -> ReplayComparison { + let left_value = normalize_state_with(left, profile); + let right_value = normalize_state_with(right, profile); + if left_value == right_value { + ReplayComparison::matched() + } else { + ReplayComparison::with_differences(vec![format!( + "final state differs: left={left_value} right={right_value}" + )]) + } +} + +/// Compare two captured runs using a state profile and a caller-provided event normalizer. +/// +/// Combines [`compare_final_state_with`] and [`compare_event_sequences_with`] into +/// a single assertion. Use this as the single call in iterative resume tests that +/// need both durable-state filtering and custom event normalization. +#[must_use] +pub fn compare_replay_runs_with_profile( + left: &ReplayRun, + right: &ReplayRun, + state_profile: &StateNormalizeProfile, + event_normalizer: F, +) -> ReplayComparison +where + F: Fn(&Event) -> Value, +{ + let mut differences = Vec::new(); + + let state_comparison = + compare_final_state_with(&left.final_state, &right.final_state, state_profile); + differences.extend(state_comparison.differences().iter().cloned()); + + let event_comparison = + compare_event_sequences_with(&left.events, &right.events, event_normalizer); + differences.extend(event_comparison.differences().iter().cloned()); + + ReplayComparison::with_differences(differences) +} diff --git a/src/runtimes/runner.rs b/src/runtimes/runner.rs index 1266d63..15dc3fe 100644 --- a/src/runtimes/runner.rs +++ b/src/runtimes/runner.rs @@ -13,12 +13,18 @@ use crate::app::{App, BarrierOutcome}; use crate::channels::Channel; use crate::channels::errors::{ErrorEvent, ErrorScope, WeaveError}; use crate::control::{FrontierCommand, NodeRoute}; +use crate::event_bus::emitter::{EmitterError, EventEmitter}; +use crate::event_bus::event::Event; use crate::event_bus::{EventBus, EventStream}; use crate::node::NodePartial; use crate::runtimes::CheckpointerType; use crate::runtimes::execution::{ PausedReason, PausedReport, SchedulerOutcome, StepOptions, StepReport, StepResult, }; +use crate::runtimes::observer::{ + CheckpointLoadMeta, CheckpointSaveMeta, EventBusEmitMeta, InvocationFinishMeta, + InvocationOutcome, InvocationStartMeta, NodeFinishMeta, NodeOutcome, RuntimeObserver, +}; use crate::runtimes::session::{SessionInit, SessionState, StateVersions}; use crate::runtimes::streaming::{StreamEndReason, emit_invocation_end, finalize_event_stream}; use crate::runtimes::{ @@ -29,11 +35,67 @@ use crate::state::VersionedState; use crate::types::NodeKind; use crate::utils::clock::Clock; use rustc_hash::FxHashMap; +use std::fmt; use std::sync::Arc; use thiserror::Error; use tokio::task::JoinError; use tracing::instrument; +// ============================================================================ +// Private helpers +// ============================================================================ + +/// An [`EventEmitter`] wrapper that calls an observer's `on_event_bus_emit` +/// hook after each successful (or failed) emit attempt. +/// +/// Built lazily in `schedule_step` when an observer is present; otherwise the +/// raw emitter is used directly, paying zero overhead. +struct ObservingEmitter { + inner: Arc, + observer: Arc, +} + +impl fmt::Debug for ObservingEmitter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ObservingEmitter") + .field("observer", &self.observer) + .finish_non_exhaustive() + } +} + +impl EventEmitter for ObservingEmitter { + fn emit(&self, event: Event) -> Result<(), EmitterError> { + let scope = event.scope_label().unwrap_or("unknown").to_owned(); + let result = self.inner.emit(event); + let meta = EventBusEmitMeta { scope: &scope }; + // Safety: `Arc` is AssertUnwindSafe-safe + // because we require RefUnwindSafe as a supertrait on RuntimeObserver. + if std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + self.observer.on_event_bus_emit(&meta) + })) + .is_err() + { + tracing::warn!("RuntimeObserver::on_event_bus_emit panicked; execution continues"); + } + result + } +} + +/// Call an observer hook, catching any panic and logging it as a warning. +/// +/// Hooks must not kill graph execution; this helper enforces that contract. +fn call_observer_hook(f: F, hook_name: &'static str) +where + F: FnOnce() + std::panic::UnwindSafe, +{ + if std::panic::catch_unwind(f).is_err() { + tracing::warn!( + hook = hook_name, + "RuntimeObserver hook panicked; execution continues" + ); + } +} + /// Runtime execution engine for workflow graphs with session management and event streaming. /// /// `AppRunner` wraps an [`App`] and manages the runtime execution environment, @@ -139,11 +201,13 @@ pub struct AppRunner { event_stream_taken: bool, clock: Option>, checkpointer_descriptor: String, + observer: Option>, } /// Errors that can occur during workflow execution. #[derive(Debug, Error)] #[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))] +#[non_exhaustive] pub enum RunnerError { /// The requested session was not found. #[error("session not found: {session_id}")] @@ -248,6 +312,7 @@ pub struct RunMetadata { struct RunnerRuntimeMetadata { clock: Option>, checkpointer_descriptor: String, + observer: Option>, } #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -342,6 +407,7 @@ pub struct AppRunnerBuilder { event_bus: Option, start_listener: bool, clock: Option>, + observer: Option>, } impl Default for AppRunnerBuilder { @@ -368,6 +434,7 @@ impl AppRunnerBuilder { event_bus: None, start_listener: true, clock: None, + observer: None, } } @@ -444,6 +511,42 @@ impl AppRunnerBuilder { self } + /// Attach a [`RuntimeObserver`] to receive telemetry hooks during execution. + /// + /// The observer is called synchronously at invocation boundaries, per-node + /// completion, checkpoint operations, and event-bus emissions. It pays zero + /// runtime cost when not set (`None` default). + /// + /// # Examples + /// + /// ```rust,no_run + /// use std::sync::Arc; + /// use weavegraph::runtimes::{AppRunner, observer::{RuntimeObserver, NodeFinishMeta}}; + /// # use weavegraph::app::App; + /// + /// #[derive(Debug)] + /// struct LogObserver; + /// + /// impl RuntimeObserver for LogObserver { + /// fn on_node_finish(&self, meta: &NodeFinishMeta<'_>) { + /// tracing::info!(node = ?meta.node_kind, step = meta.step, "node finished"); + /// } + /// } + /// + /// # async fn example(app: App) { + /// let runner = AppRunner::builder() + /// .app(app) + /// .observer(Arc::new(LogObserver)) + /// .build() + /// .await; + /// # } + /// ``` + #[must_use] + pub fn observer(mut self, observer: Arc) -> Self { + self.observer = Some(observer); + self + } + /// Build the [`AppRunner`]. /// /// # Panics @@ -471,6 +574,7 @@ impl AppRunnerBuilder { let runtime_metadata = RunnerRuntimeMetadata { clock, checkpointer_descriptor, + observer: self.observer, }; Some( @@ -625,6 +729,7 @@ impl AppRunner { event_stream_taken: false, clock: runtime_metadata.clock, checkpointer_descriptor: runtime_metadata.checkpointer_descriptor, + observer: runtime_metadata.observer, } } @@ -659,7 +764,22 @@ impl AppRunner { if let Some(stored) = restored_checkpoint { let restored = restore_session_state(&stored); - self.sessions.insert(session_id, restored); + let restored_step = stored.step; + self.sessions.insert(session_id.clone(), restored); + if let Some(obs) = &self.observer { + let backend = self.checkpointer_descriptor.as_str(); + let sid = session_id.as_str(); + call_observer_hook( + || { + obs.on_checkpoint_load(&CheckpointLoadMeta { + session_id: sid, + backend, + step: restored_step, + }) + }, + "on_checkpoint_load", + ); + } return Ok(SessionInit::Resumed { checkpoint_step: stored.step, }); @@ -1028,6 +1148,15 @@ impl AppRunner { step: u64, ) -> Result { let snapshot = session_state.state.snapshot(); + // If an observer is attached, wrap the emitter to fire on_event_bus_emit for each emit. + let emitter: Arc = if let Some(obs) = &self.observer { + Arc::new(ObservingEmitter { + inner: self.event_bus.get_emitter(), + observer: Arc::clone(obs), + }) + } else { + self.event_bus.get_emitter() + }; let result = session_state .scheduler .superstep( @@ -1037,7 +1166,7 @@ impl AppRunner { snapshot.clone(), step, SchedulerRunContext { - event_emitter: self.event_bus.get_emitter(), + event_emitter: emitter, clock: self.clock.clone(), invocation_id: Some(session_id.to_string()), }, @@ -1197,9 +1326,27 @@ impl AppRunner { && let Some(checkpointer) = &self.checkpointer && let Some(session_state) = self.sessions.get(session_id) { - let _ = checkpointer + let start = std::time::Instant::now(); + let result = checkpointer .save(Checkpoint::from_session(session_id, session_state)) .await; + let duration_ms = start.elapsed().as_millis() as u64; + if result.is_ok() + && let Some(obs) = &self.observer + { + let backend = self.checkpointer_descriptor.as_str(); + call_observer_hook( + || { + obs.on_checkpoint_save(&CheckpointSaveMeta { + session_id, + backend, + step, + duration_ms, + }) + }, + "on_checkpoint_save", + ); + } } }) .await; @@ -1217,6 +1364,7 @@ impl AppRunner { ) -> Result { session_state.step += 1; let step = session_state.step; + let step_start = std::time::Instant::now(); tracing::debug!(step, "starting superstep"); @@ -1285,6 +1433,39 @@ impl AppRunner { extra_version: session_state.state.extra.version(), }; + // Emit per-node finish hooks (step-level timing, shared across all nodes in superstep). + if let Some(obs) = &self.observer { + let step_duration_ms = step_start.elapsed().as_millis() as u64; + for node_kind in &scheduler_outcome.ran_nodes { + call_observer_hook( + || { + obs.on_node_finish(&NodeFinishMeta { + node_kind, + session_id, + step, + step_duration_ms, + outcome: NodeOutcome::Completed, + }) + }, + "on_node_finish", + ); + } + for node_kind in &scheduler_outcome.skipped_nodes { + call_observer_hook( + || { + obs.on_node_finish(&NodeFinishMeta { + node_kind, + session_id, + step, + step_duration_ms, + outcome: NodeOutcome::Skipped, + }) + }, + "on_node_finish", + ); + } + } + Ok(StepReport { step, ran_nodes: scheduler_outcome.ran_nodes, @@ -1296,7 +1477,11 @@ impl AppRunner { }) } - /// Run until completion (End nodes or no frontier) - the canonical execution method + /// Runs the workflow to completion (until End nodes or an empty frontier is reached). + /// + /// This is the canonical single-invocation execution method. For iterative + /// (multi-input) sessions, use [`create_iterative_session`](Self::create_iterative_session) + /// and [`invoke_next`](Self::invoke_next) instead. #[instrument(skip(self, session_id), err)] pub async fn run_until_complete( &mut self, @@ -1313,6 +1498,22 @@ impl AppRunner { ) -> Result { tracing::info!(session = %session_id, "workflow run started"); + let graph_id = self.app.graph_definition_hash(); + if let Some(obs) = &self.observer { + let sid = session_id; + let gid = graph_id.as_str(); + call_observer_hook( + || { + obs.on_invocation_start(&InvocationStartMeta { + session_id: sid, + graph_id: gid, + }) + }, + "on_invocation_start", + ); + } + let invocation_start = std::time::Instant::now(); + loop { // Check if we're done before trying to run let session_state = @@ -1345,6 +1546,23 @@ impl AppRunner { }, completion_policy, ); + if let Some(obs) = &self.observer { + let duration_ms = invocation_start.elapsed().as_millis() as u64; + let graph_id = self.app.graph_definition_hash(); + let sid = session_id; + let gid = graph_id.as_str(); + call_observer_hook( + || { + obs.on_invocation_finish(&InvocationFinishMeta { + session_id: sid, + graph_id: gid, + duration_ms, + outcome: InvocationOutcome::Error, + }) + }, + "on_invocation_finish", + ); + } return Err(err); } }; @@ -1414,6 +1632,21 @@ impl AppRunner { StreamEndReason::Completed { step: final_step }, completion_policy, ); + if let Some(obs) = &self.observer { + let duration_ms = invocation_start.elapsed().as_millis() as u64; + let gid = graph_id.as_str(); + call_observer_hook( + || { + obs.on_invocation_finish(&InvocationFinishMeta { + session_id, + graph_id: gid, + duration_ms, + outcome: InvocationOutcome::Completed, + }) + }, + "on_invocation_finish", + ); + } Ok(final_state) } diff --git a/src/state.rs b/src/state.rs index 43bb91d..c6befb0 100644 --- a/src/state.rs +++ b/src/state.rs @@ -38,6 +38,7 @@ use rustc_hash::FxHashMap; use serde::{Serialize, de::DeserializeOwned}; use serde_json::Value; +use std::hash::{Hash, Hasher}; use std::marker::PhantomData; use thiserror::Error; @@ -46,17 +47,53 @@ use crate::{ message::{Message, Role}, }; +/// Lifecycle classification for a state slot. +/// +/// A slot's lifecycle is **metadata** — it does not change the storage key or +/// affect `PartialEq` / `Hash` comparisons. Two `StateKey` values with the +/// same `(namespace, name, schema_version)` but different lifecycle annotations +/// refer to the same underlying storage slot and compare as equal. +/// +/// Lifecycle is consumed by [`StateNormalizeProfile`](crate::runtimes::replay::StateNormalizeProfile) +/// and by [`NodePartial::clear_typed_extra_key`](crate::node::NodePartial::clear_typed_extra_key) +/// to distinguish durable state from per-invocation scratch values. +/// +/// # Registration-time conflict detection +/// +/// When you register a key with a lifecycle annotation (e.g. via +/// `StateNormalizeProfile::ignore_key`), the profile detects and panics on +/// conflicting annotations for the same storage key. This catches the common +/// mistake of defining the same slot constant twice with different lifecycle +/// policies. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum StateLifecycle { + /// The slot contains durable state that persists across invocations. + /// + /// This is the default. + Durable, + /// The slot contains per-invocation scratch data that should be excluded + /// from durable state comparisons and resume normalization. + InvocationScoped, +} + /// A schema-versioned key for typed values stored in [`VersionedState::extra`]. /// /// `StateKey` is a thin helper over the JSON-compatible `extra` map. Domain /// crates can define constants and use them from nodes, reducers, tests, and /// replay code without repeating string literals. /// +/// # Equality and hashing +/// +/// `PartialEq`, `Eq`, and `Hash` are based solely on `(namespace, name, +/// schema_version)`. The `lifecycle` field is metadata and is **excluded** +/// from equality comparisons, so that two keys for the same slot compare equal +/// regardless of their lifecycle annotation. +/// /// # Examples /// /// ```rust /// use serde::{Deserialize, Serialize}; -/// use weavegraph::state::StateKey; +/// use weavegraph::state::{StateKey, StateLifecycle}; /// /// #[derive(Serialize, Deserialize)] /// struct PortfolioSnapshot { @@ -66,13 +103,23 @@ use crate::{ /// const PORTFOLIO: StateKey = /// StateKey::new("wq", "portfolio_snapshot", 1); /// +/// const CURRENT_EVENT: StateKey = +/// StateKey::new("wq", "event", 1).invocation_scoped(); +/// /// assert_eq!(PORTFOLIO.storage_key(), "wq:portfolio_snapshot:v1"); +/// assert_eq!(CURRENT_EVENT.lifecycle(), StateLifecycle::InvocationScoped); +/// // Same slot identity regardless of lifecycle annotation: +/// assert_eq!( +/// StateKey::::new("wq", "event", 1), +/// StateKey::::new("wq", "event", 1).invocation_scoped(), +/// ); /// ``` -#[derive(Debug, PartialEq, Eq, Hash)] +#[derive(Debug)] pub struct StateKey { namespace: &'static str, name: &'static str, schema_version: u32, + lifecycle: StateLifecycle, _marker: PhantomData T>, } @@ -84,17 +131,65 @@ impl Clone for StateKey { impl Copy for StateKey {} +// Equality and Hash intentionally exclude `lifecycle` — it is metadata, +// not identity. Two keys for the same slot are the same key. +impl PartialEq for StateKey { + fn eq(&self, other: &Self) -> bool { + self.namespace == other.namespace + && self.name == other.name + && self.schema_version == other.schema_version + } +} + +impl Eq for StateKey {} + +impl Hash for StateKey { + fn hash(&self, state: &mut H) { + self.namespace.hash(state); + self.name.hash(state); + self.schema_version.hash(state); + } +} + impl StateKey { - /// Create a typed state key. + /// Create a typed state key with [`StateLifecycle::Durable`] (default). pub const fn new(namespace: &'static str, name: &'static str, schema_version: u32) -> Self { Self { namespace, name, schema_version, + lifecycle: StateLifecycle::Durable, _marker: PhantomData, } } + /// Return a copy of this key annotated as [`StateLifecycle::InvocationScoped`]. + /// + /// The returned key compares equal to the original — lifecycle is metadata, + /// not identity. Use this when defining constants that represent + /// per-invocation scratch slots so that normalization profiles and cleanup + /// helpers can distinguish them from durable state. + /// + /// ```rust + /// use weavegraph::state::{StateKey, StateLifecycle}; + /// + /// const TICK_EVENT: StateKey = + /// StateKey::new("wq", "tick_event", 1).invocation_scoped(); + /// + /// assert_eq!(TICK_EVENT.lifecycle(), StateLifecycle::InvocationScoped); + /// ``` + #[must_use] + pub const fn invocation_scoped(mut self) -> Self { + self.lifecycle = StateLifecycle::InvocationScoped; + self + } + + /// Return the lifecycle classification of this key. + #[must_use] + pub fn lifecycle(&self) -> StateLifecycle { + self.lifecycle + } + /// Return the namespace component. #[must_use] pub fn namespace(&self) -> &'static str { @@ -127,6 +222,7 @@ impl StateKey { /// Errors produced by typed state-slot helpers. #[derive(Debug, Error)] #[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))] +#[non_exhaustive] pub enum StateSlotError { /// The requested typed slot was not present in the state. #[error("state slot not found: {key}")]