Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,40 @@ All notable changes to Weavegraph will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
## [0.6.0] - 2026-05-11

### Added

#### WG-006 — Invocation-scoped state lifecycle + normalization profiles

- `StateLifecycle` enum (`Durable` / `InvocationScoped`) in `weavegraph::state`.
- `StateKey<T>::invocation_scoped()` const builder — marks a key as invocation-scoped without changing its identity (equality / hash exclude the lifecycle field).
- `StateKey<T>::lifecycle()` getter returning the stored `StateLifecycle`.
- `StateNormalizeProfile` in `weavegraph::runtimes::replay` — fluent builder for specifying which state keys to ignore during replay comparison. Supports both typed (`ignore_key<T>(StateKey<T>)`) and raw-string (`ignore_extra_keys(impl IntoIterator<Item = &str>)`) forms. Panics at construction time if two registrations conflict on lifecycle annotation.
- `normalize_state_with(state, profile)` — normalizes a `VersionedState` snapshot to a `serde_json::Value` after dropping ignored keys.
- `compare_final_state_with(left, right, profile)` — variant of `compare_final_state` accepting a `StateNormalizeProfile`.
- `compare_replay_runs_with_profile(left, right, profile, event_normalizer)` — variant of `compare_replay_runs_with` accepting a `StateNormalizeProfile`.
- `NodePartial::clear_extra_keys(keys)` — **deletes** the given raw keys from state on the next barrier application. Uses JSON Merge Patch semantics: `MapMerge` now removes keys whose incoming value is `null` (RFC 7396). No separate cleanup reducer is needed.
- `NodePartial::clear_typed_extra_key(key)` — typed companion to `clear_extra_keys`; uses the `StateKey`'s storage key.

#### WG-007 — Runtime observability hooks + metrics adapter

- `RuntimeObserver` trait in `weavegraph::runtimes::observer` — zero-cost (no allocation, no virtual dispatch when unused), always compiled, no feature gate. All methods have default no-op bodies; implementors override only what they need.
- Hook methods: `on_invocation_start`, `on_invocation_finish`, `on_node_finish`, `on_checkpoint_load`, `on_checkpoint_save`, `on_event_bus_emit`.
- Metadata structs (all `#[non_exhaustive]`): `InvocationStartMeta`, `InvocationFinishMeta`, `NodeFinishMeta`, `CheckpointLoadMeta`, `CheckpointSaveMeta`, `EventBusEmitMeta`.
- Outcome enums (all `#[non_exhaustive]`): `InvocationOutcome`, `NodeOutcome`.
- `AppRunnerBuilder::observer(Arc<dyn RuntimeObserver>)` — attaches an observer; no-op overhead when not set.
- Observer panics are caught via `std::panic::catch_unwind` and logged as `tracing::warn!` — a misbehaving observer cannot abort a workflow.
- `ObservingEmitter` (private) — wraps the event bus emitter to fire `on_event_bus_emit` for every emitted event when an observer is attached.
- `MetricsObserver` in `weavegraph::runtimes::metrics_observer` — a `RuntimeObserver` impl that emits standard counters and histograms via the `metrics` crate facade. Available under the `metrics` feature flag.
- Counters: `weavegraph.node.invocations` (labels: `node`, `outcome`), `weavegraph.invocation.count` (`outcome`), `weavegraph.checkpoint.saves` (`backend`), `weavegraph.checkpoint.loads` (`backend`), `weavegraph.event_bus.emits` (`scope`).
- Histograms: `weavegraph.node.step_duration_ms` (`node`), `weavegraph.invocation.duration_ms`, `weavegraph.checkpoint.save_duration_ms` (`backend`).

### Changed (breaking)

- `RunnerError`, `NodeError`, `CheckpointerError`, `StateSlotError`, and `ReplayConformanceError` are now `#[non_exhaustive]`. Exhaustive `match` arms on these types must add a wildcard `_` arm.
- Migration: replace `_ => unreachable!()` with `_ => { /* handle future variants */ }` where appropriate.
- **`MapMerge` reducer now deletes keys whose incoming value is `null`** (JSON Merge Patch / RFC 7396). Previously a `null` was written into state as-is. Any code that deliberately stored `serde_json::Value::Null` via `with_extra` should use a sentinel value instead (e.g. a JSON object with an `absent: true` field).

## [0.5.0] - 2026-05-08

Expand Down
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "weavegraph"
version = "0.5.0"
version = "0.6.0"
edition = "2024"
description = "Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics."
license = "MIT"
Expand Down Expand Up @@ -100,6 +100,7 @@ reqwest = { version = "0.13", default-features = false, features = [
"rustls",
], optional = true }
scraper = { version = "0.25", optional = true }
metrics = { version = "0.24", optional = true }
# wg-ragsmith removed from dependencies to avoid circular dependency.
# For RAG examples, see the wg-ragsmith crate directly.

Expand All @@ -120,6 +121,7 @@ postgres = ["sqlx"]
rig = ["dep:rig-core", "dep:rmcp"]
diagnostics = ["dep:miette"]
examples = ["reqwest", "scraper"]
metrics = ["dep:metrics"]
petgraph-compat = ["petgraph"]

[[example]]
Expand Down
130 changes: 130 additions & 0 deletions docs/MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,136 @@ migration guidance for upgrading your code.

---

## v0.6.0

### Overview

v0.6.0 adds two major feature groups (WG-006 and WG-007) with only one breaking change: five
public error enums are now `#[non_exhaustive]`. The new APIs are purely additive and backward-compatible.

### Breaking: `#[non_exhaustive]` error enums

The following enums are now `#[non_exhaustive]` to allow adding new variants in future minor releases without a breaking change:

- `RunnerError`
- `NodeError`
- `CheckpointerError`
- `StateSlotError`
- `ReplayConformanceError`

**Migration**: Any exhaustive `match` on these types will now fail to compile. Add a wildcard arm:

```rust
// Before (0.5.0)
match err {
RunnerError::SessionNotFound(_) => { /* ... */ }
RunnerError::StepFailed(_) => { /* ... */ }
// compiler accepted this as exhaustive
}

// After (0.6.0)
match err {
RunnerError::SessionNotFound(_) => { /* ... */ }
RunnerError::StepFailed(_) => { /* ... */ }
_ => { /* handle any future variants */ }
}
```

### Breaking: `MapMerge` now deletes keys with null values (RFC 7396)

`MapMerge`, the built-in reducer for `VersionedState.extra`, previously wrote `serde_json::Value::Null` verbatim into the state map. It now **removes** the key when the incoming value is `null`, following [JSON Merge Patch](https://www.rfc-editor.org/rfc/rfc7396) semantics.

**Impact**: If your graph stores `serde_json::Value::Null` intentionally as a meaningful value (as opposed to an absence marker), replace it with an explicit sentinel, for example:

```json
{ "absent": true }
```

**Benefit**: `NodePartial::clear_extra_keys` and `clear_typed_extra_key` now fully delete keys — no wrapper reducer or post-processing is required.

### New: Invocation-scoped state slots (WG-006)

Mark a `StateKey<T>` as `InvocationScoped` using the new const builder:

```rust
const SCRATCH: StateKey<ScratchPad> = StateKey::new("wq", "scratch", 1)
.invocation_scoped();
```

Keys marked `InvocationScoped` compare equal to a `Durable` key with the same `(namespace, name, schema_version)` — lifecycle is intentionally excluded from equality and hashing so that the same slot can be used across invocations without registry conflicts.

To clear invocation-scoped slots at re-entry (e.g. in iterative sessions), call `clear_typed_extra_key` on the outgoing `NodePartial`:

```rust
partial.clear_typed_extra_key(SCRATCH)
```

This **deletes** the key from `VersionedState.extra` — no separate cleanup reducer is needed.
`MapMerge` (the built-in extra reducer) now follows JSON Merge Patch semantics (RFC 7396):
an incoming `null` removes the key rather than storing a null value.

### New: Replay normalization profiles (WG-006)

Use `StateNormalizeProfile` to ignore volatile or invocation-scoped keys during replay comparison:

```rust
let profile = StateNormalizeProfile::new()
.ignore_key(SCRATCH) // typed — records lifecycle for conflict detection
.ignore_extra_keys(["ts"]); // raw string — no lifecycle

let comparison = compare_final_state_with(&run_a, &run_b, &profile);
assert!(comparison.is_equivalent());
```

### New: Runtime observer (WG-007)

Attach a `RuntimeObserver` to an `AppRunner` for structured hook callbacks at key lifecycle points:

```rust
#[derive(Debug)]
struct MyObserver;

impl RuntimeObserver for MyObserver {
fn on_invocation_finish(&self, meta: &InvocationFinishMeta<'_>) {
println!("run {} completed in {}ms", meta.session_id, meta.duration_ms);
}
}

let runner = AppRunner::builder()
.app(app)
.observer(Arc::new(MyObserver))
.build()
.await?;
```

When no observer is attached, there is zero overhead — the observer field is `Option<Arc<...>>`.

Observer panics are caught via `catch_unwind` and logged as warnings; a misbehaving observer
cannot crash or abort a workflow invocation.

### New: MetricsObserver (WG-007, `metrics` feature)

Enable the `metrics` feature and attach `MetricsObserver` to export standard Prometheus-compatible metrics via any `metrics`-crate recorder (e.g. `metrics-exporter-prometheus`):

```toml
weavegraph = { version = "0.6", features = ["metrics"] }
metrics-exporter-prometheus = "0.17"
```

```rust
use weavegraph::runtimes::MetricsObserver;

let runner = AppRunner::builder()
.app(app)
.observer(Arc::new(MetricsObserver))
.build()
.await?;
```

See the `metrics_observer` module docs for the full metric inventory.

---

## v0.5.0

### Overview
Expand Down
54 changes: 54 additions & 0 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,59 @@ impl NodePartial {
self.frontier = Some(command);
self
}

/// Remove the given extra keys from state on the next barrier application.
///
/// Writes `serde_json::Value::Null` markers into the partial. [`MapMerge`](crate::reducers::MapMerge)
/// (the built-in extra reducer) follows JSON Merge Patch semantics (RFC 7396) and
/// **deletes** keys whose incoming value is `null`, so no separate cleanup reducer
/// is needed.
///
/// # Examples
///
/// ```rust
/// use weavegraph::node::NodePartial;
///
/// let partial = NodePartial::new()
/// .clear_extra_keys(["wq:feature_snapshot:v1", "wq:signal_event:v1"]);
/// ```
#[must_use]
pub fn clear_extra_keys<I, S>(mut self, keys: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
let extra = self.extra.get_or_insert_with(FxHashMap::default);
for key in keys {
extra.insert(key.into(), serde_json::Value::Null);
}
self
}

/// Remove a single typed extra key from state on the next barrier application.
///
/// Typed companion to [`clear_extra_keys`](Self::clear_extra_keys). The storage key
/// is derived from the `StateKey`'s `(namespace, name, schema_version)` triple so
/// that the same constant used to write a value can be used to delete it.
///
/// [`MapMerge`](crate::reducers::MapMerge) deletes keys with null values (RFC 7396),
/// so no separate cleanup reducer is needed.
///
/// # Examples
///
/// ```rust
/// use weavegraph::node::NodePartial;
/// use weavegraph::state::{StateKey, StateLifecycle};
///
/// const CURRENT_EVENT: StateKey<u64> =
/// StateKey::new("wq", "event", 1).invocation_scoped();
///
/// let partial = NodePartial::new().clear_typed_extra_key(CURRENT_EVENT);
/// ```
#[must_use]
pub fn clear_typed_extra_key<T>(self, key: crate::state::StateKey<T>) -> Self {
self.clear_extra_keys([key.storage_key()])
}
}

// ============================================================================
Expand Down Expand Up @@ -431,6 +484,7 @@ pub enum NodeContextError {
/// use `NodePartial.errors` instead.
#[derive(Debug, Error)]
#[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))]
#[non_exhaustive]
pub enum NodeError {
/// Expected input data is missing from the state snapshot.
#[error("missing expected input: {what}")]
Expand Down
18 changes: 17 additions & 1 deletion src/reducers/map_merge.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
//! Reducer that shallow-merges incoming extra key-value pairs into the extras channel.
//!
//! Follows the [JSON Merge Patch](https://www.rfc-editor.org/rfc/rfc7396) convention:
//! an incoming `null` value **removes** the key from state rather than setting it to null.
//! This is what makes [`NodePartial::clear_extra_keys`](crate::node::NodePartial::clear_extra_keys)
//! and [`NodePartial::clear_typed_extra_key`](crate::node::NodePartial::clear_typed_extra_key)
//! functional without requiring a separate cleanup reducer.
use super::Reducer;
use crate::{channels::Channel, node::NodePartial, state::VersionedState};

/// Reducer that merges extra key-value pairs from a [`NodePartial`](crate::node::NodePartial) into the state extras channel.
///
/// Uses JSON Merge Patch semantics (RFC 7396): an incoming `null` value **removes** the
/// key from state rather than writing a null entry. This means
/// [`NodePartial::clear_extra_keys`](crate::node::NodePartial::clear_extra_keys) and
/// [`NodePartial::clear_typed_extra_key`](crate::node::NodePartial::clear_typed_extra_key)
/// fully delete the key — no separate cleanup reducer is needed.
#[derive(Debug, PartialEq, Clone, Hash, Eq)]
pub struct MapMerge;
impl Reducer for MapMerge {
Expand All @@ -12,7 +24,11 @@ impl Reducer for MapMerge {
{
let state_map = state.extra.get_mut();
for (k, v) in extras_update.iter() {
state_map.insert(k.clone(), v.clone());
if v.is_null() {
state_map.remove(k);
} else {
state_map.insert(k.clone(), v.clone());
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/runtimes/checkpointer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ impl Checkpoint {
/// Errors from checkpointer operations.
#[derive(Debug, thiserror::Error)]
#[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))]
#[non_exhaustive]
pub enum CheckpointerError {
/// Session was not found in the checkpointer.
#[error("session not found: {session_id}")]
Expand Down
Loading
Loading