diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 409a9ed..c513dda 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -100,7 +100,7 @@ jobs: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable with: - toolchain: ${{ env.RUST_VERSION }} + toolchain: stable - uses: taiki-e/install-action@v2 with: tool: cargo-semver-checks @@ -108,7 +108,8 @@ jobs: with: prefix-key: semver-checks - name: Check semver - run: cargo semver-checks check-release --workspace + # cargo-semver-checks requires rustc >= 1.91.0; run on stable, not pinned MSRV + run: cargo +stable semver-checks check-release --workspace deny: name: cargo deny diff --git a/CHANGELOG.md b/CHANGELOG.md index cbe340a..887a06e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.4.0] - 2026-04-01 + +### Added +- `DIAGNOSTIC_SCOPE` constant exported from `weavegraph::event_bus` for identifying internal diagnostic events +- `examples/production_streaming.rs` — golden-path reference for Axum + SSE + Postgres checkpointing +- `[[example]]` entry with `required-features = ["postgres", "examples"]` for `production_streaming` +- `#![warn(missing_docs)]` now enforced — all 228 previously undocumented public API items are documented + +### Changed +- `RuntimeConfig::new()` signature changed: removed middle `checkpointer: Option` parameter; now takes `(session_id: Option, sqlite_db_name: Option)` +- Feature flags table in crate-level docs updated to remove the removed `llm` alias +- `docs/MIGRATION.md` updated with v0.3.0 → v0.4.0 migration guide + +### Removed +- **BREAKING**: `Message::new(role: &str, content: &str)` removed (deprecated since v0.3.0) — use `Message::with_role(Role::..., ...)` or convenience constructors +- **BREAKING**: `RuntimeConfig.checkpointer` field removed — configure checkpointer via `AppRunner::builder().checkpointer(...)` +- **BREAKING**: `RuntimeConfig::with_checkpointer()` and `RuntimeConfig::checkpointer_type()` removed +- **BREAKING**: `AppRunner::new()`, `from_arc()`, `with_options()`, `with_options_arc()`, `with_options_and_bus()`, `with_options_arc_and_bus()` removed (deprecated since v0.2.0) — use `AppRunner::builder()` +- **BREAKING**: `LadderError` type alias removed (deprecated since v0.3.0) — use `WeaveError` directly +- **BREAKING**: `llm` feature flag alias removed (deprecated since v0.3.0) — use `features = ["rig"]` + ## [0.3.0] - 2026-03-07 ### Added diff --git a/Cargo.toml b/Cargo.toml index 893f979..d0b8a1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "weavegraph" -version = "0.3.0" +version = "0.4.0" edition = "2024" description = "Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics." license = "MIT" @@ -118,11 +118,14 @@ sqlite = ["sqlx"] postgres-migrations = ["postgres"] postgres = ["sqlx"] rig = ["dep:rig-core", "dep:rmcp"] -llm = ["rig"] diagnostics = ["dep:miette"] examples = ["reqwest", "scraper"] petgraph-compat = ["petgraph"] +[[example]] +name = "production_streaming" +required-features = ["postgres", "examples"] + [[bench]] name = "event_bus_throughput" harness = false diff --git a/docs/MIGRATION.md b/docs/MIGRATION.md index 8d0b912..94dc828 100644 --- a/docs/MIGRATION.md +++ b/docs/MIGRATION.md @@ -5,6 +5,167 @@ migration guidance for upgrading your code. --- +## v0.4.0 + +### Overview + +v0.4.0 is the **API freeze** release. All items deprecated in v0.2.0 and v0.3.0 +have been removed. No new public APIs were added. If you are already on v0.3.0 +with no deprecation warnings, upgrading requires only the signature change to +`RuntimeConfig::new()`. + +### Breaking Changes + +#### 1. `Message::new(role: &str, content: &str)` removed + +**Removed in:** v0.4.0 (deprecated since v0.3.0) + +Use the typed constructors instead: + +```rust +// Before +let m = Message::new("user", "hello"); + +// After — typed Role enum +let m = Message::with_role(Role::User, "hello"); + +// Or use the convenience constructors +let m = Message::user("hello"); +let m = Message::assistant("reply"); +let m = Message::system("you are a helpful assistant"); +``` + +--- + +#### 2. `RuntimeConfig::new()` signature changed + +**Removed in:** v0.4.0 + +The `checkpointer: Option` middle parameter is removed. + +```rust +// Before (v0.3.0) +let config = RuntimeConfig::new( + Some("session-id".into()), + Some(CheckpointerType::InMemory), + None, +); + +// After (v0.4.0) — two parameters only +let config = RuntimeConfig::new( + Some("session-id".into()), + None, // sqlite_db_name +); +``` + +Set the checkpointer type via `AppRunner::builder()`: + +```rust +AppRunner::builder() + .app_arc(app) + .checkpointer(CheckpointerType::SQLite) + .build() + .await?; +``` + +--- + +#### 3. `RuntimeConfig.checkpointer` field, `with_checkpointer()`, and `checkpointer_type()` removed + +**Removed in:** v0.4.0 + +Configure the checkpointer exclusively through `AppRunner::builder()`: + +```rust +// Before — field on RuntimeConfig +let config = RuntimeConfig { checkpointer: Some(CheckpointerType::Postgres), ..Default::default() }; +// or +let config = RuntimeConfig::default().with_checkpointer(CheckpointerType::Postgres); + +// After — builder method on AppRunner +AppRunner::builder() + .app_arc(app) + .checkpointer(CheckpointerType::Postgres) + .build() + .await?; + +// For a fully custom checkpointer — still on RuntimeConfig +let config = RuntimeConfig::new(None, None) + .checkpointer_custom(Arc::new(my_checkpointer)); +``` + +--- + +#### 4. Legacy `AppRunner` constructors removed + +**Removed in:** v0.4.0 (deprecated since v0.2.0) + +All free-standing constructors have been removed. Use `AppRunner::builder()` exclusively: + +| Removed | Replacement | +|---------|-------------| +| `AppRunner::new(app)` | `AppRunner::builder().app(app).build().await` | +| `AppRunner::from_arc(app)` | `AppRunner::builder().app_arc(app).build().await` | +| `AppRunner::with_options(app, config)` | `AppRunner::builder().app(app)` + config methods | +| `AppRunner::with_options_arc(app, config)` | `AppRunner::builder().app_arc(app)` + config methods | +| `AppRunner::with_options_and_bus(app, config, bus)` | `AppRunner::builder().app(app).event_bus(bus)` | +| `AppRunner::with_options_arc_and_bus(app, config, bus)` | `AppRunner::builder().app_arc(app).event_bus(bus)` | + +```rust +// Before +let runner = AppRunner::with_options_and_bus(app, config, bus).await?; + +// After +let runner = AppRunner::builder() + .app(app) + .checkpointer(CheckpointerType::InMemory) + .event_bus(bus) + .build() + .await?; +``` + +--- + +#### 5. `LadderError` type alias removed + +**Removed in:** v0.4.0 (deprecated since v0.3.0) + +```rust +// Before +use weavegraph::channels::errors::LadderError; +fn my_fn() -> Result<(), LadderError> { ... } + +// After +use weavegraph::channels::errors::WeaveError; +fn my_fn() -> Result<(), WeaveError> { ... } +``` + +--- + +#### 6. `llm` feature flag alias removed + +**Removed in:** v0.4.0 (deprecated since v0.3.0) + +```toml +# Before +weavegraph = { version = "0.3", features = ["llm"] } + +# After +weavegraph = { version = "0.4", features = ["rig"] } +``` + +--- + +### New in v0.4.0 + +- `DIAGNOSTIC_SCOPE` constant exported from `weavegraph::event_bus` — use to + identify internal diagnostic events when filtering the event stream. +- `#![warn(missing_docs)]` is now enforced — all public API items are documented. +- `examples/production_streaming.rs` — golden-path reference for Axum + SSE + + Postgres checkpointing (requires `--features postgres,examples`). + +--- + ## v0.3.0 (Upcoming) ### Breaking Changes diff --git a/examples/convenience_streaming.rs b/examples/convenience_streaming.rs index d8ca112..bc25c0a 100644 --- a/examples/convenience_streaming.rs +++ b/examples/convenience_streaming.rs @@ -4,7 +4,7 @@ //! - `App::invoke_with_channel()` - Simple streaming with a channel //! - `App::invoke_with_sinks()` - Multiple custom sinks //! -//! These methods simplify the common case while the full `AppRunner::with_options_and_bus()` +//! These methods simplify the common case while the `AppRunner::builder()` //! pattern remains available for advanced use cases like web servers. //! //! ## When to Use Each Pattern @@ -19,7 +19,7 @@ //! - Single execution with custom event routing //! - More control than `invoke_with_channel()` //! -//! ### `AppRunner::with_options_and_bus()` - Web Servers +//! ### `AppRunner::builder()` - Web Servers //! - Per-request event isolation required //! - SSE or WebSocket streaming //! - Multiple concurrent clients @@ -215,7 +215,7 @@ async fn main() -> ExampleResult<()> { info!(" • More flexible than channel-only\n"); info!("💡 For web servers with per-request isolation:"); - info!(" Use AppRunner::with_options_and_bus() instead"); + info!(" Use AppRunner::builder() with .event_bus() instead"); info!(" (See examples/streaming_events.rs)\n"); Ok(()) diff --git a/examples/production_streaming.rs b/examples/production_streaming.rs new file mode 100644 index 0000000..5c26bab --- /dev/null +++ b/examples/production_streaming.rs @@ -0,0 +1,392 @@ +//! # Production Streaming: Axum SSE + Postgres Checkpointing +//! +//! The **golden path** reference example for production web-server consumers. +//! +//! Demonstrates the complete pattern for a production web server that: +//! +//! - Compiles a [`GraphBuilder`] with [`RuntimeConfig`] once at startup +//! - Shares the compiled [`App`] across concurrent requests via [`Arc`] +//! - Checkpoints state to Postgres via [`PostgresCheckpointer`] for durable resumption +//! - Streams workflow events to HTTP clients via Server-Sent Events (SSE) +//! - Terminates the SSE stream cleanly on [`STREAM_END_SCOPE`] +//! - Supports per-request cancellation via [`InvocationHandle::abort`] +//! - Handles node errors uniformly with [`NodeError::Other`] +//! +//! ## Architecture +//! +//! ```text +//! HTTP Client GET /run?prompt=hello +//! │ +//! ▼ +//! ┌──────────────────────────────────────────────────────┐ +//! │ Axum Handler run_handler() │ +//! │ ┌─ app.invoke_streaming(state) ──────────────────┐ │ +//! │ │ Returns (InvocationHandle, EventStream) │ │ +//! │ │ Workflow runs in background tokio task │ │ +//! │ └────────────────────────────────────────────────┘ │ +//! │ Returns Sse> │ +//! └──────────────────────────────────────────────────────┘ +//! │ +//! │ data: {"kind":"llm","message":"token1",...} +//! │ data: {"kind":"diagnostic","scope":"__weavegraph_stream_end__",...} +//! │ [stream closed by server] +//! ▼ +//! HTTP Client +//! ``` +//! +//! ## Per-Request Isolation +//! +//! Each request gets its own [`AppRunner`] (and therefore its own [`EventBus`]) +//! via [`App::invoke_streaming`]. The [`App`] itself is a cheap [`Arc`] clone. +//! This is the canonical concurrency pattern for streaming workflows. +//! +//! ## Feature Requirements +//! +//! ```bash +//! cargo run --example production_streaming --features postgres,examples +//! ``` +//! +//! Set `DATABASE_URL` before running: +//! +//! ```bash +//! export DATABASE_URL="postgres://postgres:postgres@localhost/weavegraph" +//! cargo run --example production_streaming --features postgres,examples +//! ``` +//! +//! ## Testing +//! +//! ```bash +//! curl -N "http://localhost:3000/run?prompt=hello+world" +//! ``` + +use std::{convert::Infallible, sync::Arc, time::Duration}; + +use async_trait::async_trait; +use axum::{ + Router, + extract::{Query, State}, + response::{ + IntoResponse, + sse::{Event as SseEvent, KeepAlive, Sse}, + }, + routing::get, +}; +use futures_util::{Stream, StreamExt}; +use serde::{Deserialize, Serialize}; +use tracing::{error, info, warn}; +use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; + +use weavegraph::{ + app::{App, InvocationHandle}, + channels::Channel, + event_bus::{Event, EventStream, STREAM_END_SCOPE}, + graphs::GraphBuilder, + message::{Message, Role}, + node::{Node, NodeContext, NodeError, NodePartial, NodeResultExt}, + runtimes::{EventBusConfig, PostgresCheckpointer, RuntimeConfig}, + state::{StateSnapshot, VersionedState}, + types::NodeKind, +}; + +type BoxError = Box; + +// ============================================================================ +// Node definitions +// ============================================================================ + +/// Simulates an LLM node that streams a response token by token. +/// +/// In a real application this would call an LLM provider and emit each +/// chunk via [`NodeContext::emit`] so clients receive tokens as they arrive. +#[derive(Clone)] +struct LlmNode; + +#[async_trait] +impl Node for LlmNode { + async fn run( + &self, + snapshot: StateSnapshot, + ctx: NodeContext, + ) -> Result { + let prompt = snapshot + .messages + .last() + .map(|m| m.content.as_str()) + .unwrap_or("(no input)"); + + // Simulate token streaming — in production, replace with your LLM call. + // Note: ctx.emit() produces a NodeEvent (SSE kind="node"). Real LLM + // provider streaming via the `rig` feature produces Event::LLM (kind="llm"). + let tokens = [ + "Hello", + ", ", + "I", + " am", + " a", + " streaming", + " assistant", + "!", + ]; + for token in tokens { + ctx.emit("llm.token", format!("Response to '{}': {}", prompt, token))?; + // Simulate token generation latency. + tokio::time::sleep(Duration::from_millis(150)).await; + } + + Ok(NodePartial::new().with_messages(vec![Message::with_role( + Role::Assistant, + &format!("Response to '{}'", prompt), + )])) + } +} + +/// A validation node demonstrating `NodeError::Other` for recoverable failures. +/// +/// Input validation belongs at the node boundary where the error context is +/// richest. Use [`NodeResultExt::node_err`] to lift arbitrary errors into +/// [`NodeError::Other`] without losing the original message. +#[derive(Clone)] +struct ValidateNode; + +#[async_trait] +impl Node for ValidateNode { + async fn run( + &self, + snapshot: StateSnapshot, + _ctx: NodeContext, + ) -> Result { + let prompt = snapshot + .messages + .last() + .map(|m| m.content.as_str()) + .unwrap_or(""); + + if prompt.trim().is_empty() { + return Err(NodeError::Other("prompt must not be empty".into())); + } + + if prompt.len() > 4096 { + return Err(NodeError::Other( + format!("prompt too long: {} chars (max 4096)", prompt.len()).into(), + )); + } + + // Use NodeResultExt for fallible stdlib operations. + let _validated = std::str::from_utf8(prompt.as_bytes()).node_err()?; + + Ok(NodePartial::new()) + } +} + +// ============================================================================ +// Application state +// ============================================================================ + +/// Shared application state injected into every Axum handler. +#[derive(Clone)] +struct AppState { + app: Arc, +} + +// ============================================================================ +// HTTP handlers +// ============================================================================ + +#[derive(Debug, Deserialize)] +struct RunQuery { + #[serde(default = "default_prompt")] + prompt: String, +} + +fn default_prompt() -> String { + "Hello, weavegraph!".to_string() +} + +/// `GET /run?prompt=...` +/// +/// Starts a workflow invocation and returns an SSE stream of events. +/// +/// Each event is a JSON-serialized [`weavegraph::event_bus::Event`]. +/// The stream terminates with a special diagnostic event whose scope is +/// [`STREAM_END_SCOPE`]; consumers should close the connection on receipt. +/// +/// ## Per-Request Isolation +/// +/// Each request gets its own [`AppRunner`] (via `App::invoke_streaming`). +/// The shared [`App`] is a cheap [`Arc`] clone; only the runner (with its +/// own [`EventBus`]) is created per request. This is the canonical pattern +/// for concurrent SSE in production. +async fn run_handler( + State(state): State, + Query(query): Query, +) -> impl IntoResponse { + info!(prompt = %query.prompt, "starting workflow invocation"); + + let initial_state = VersionedState::new_with_user_message(&query.prompt); + + // invoke_streaming returns immediately; the workflow runs in a background task. + let (handle, event_stream) = state.app.invoke_streaming(initial_state).await; + + // Convert the EventStream into an SSE-compatible futures Stream. + let sse_stream = build_sse_stream(handle, event_stream); + + Sse::new(sse_stream).keep_alive( + KeepAlive::new() + .interval(Duration::from_secs(15)) + .text("keep-alive"), + ) +} + +/// Wraps the weavegraph [`EventStream`] as a futures [`Stream`] of SSE frames. +/// +/// - Serializes each event to JSON and wraps it in an `SseEvent`. +/// - Watches for [`STREAM_END_SCOPE`] to terminate the stream gracefully. +/// - Aborts the workflow task via [`InvocationHandle`] if the client disconnects. +fn build_sse_stream( + handle: InvocationHandle, + event_stream: EventStream, +) -> impl Stream> { + let handle = Arc::new(tokio::sync::Mutex::new(Some(handle))); + + // Convert EventStream into an async stream of SseEvent. + let stream = event_stream.into_async_stream().map(move |event| { + let is_end = event + .scope_label() + .map(|s| s == STREAM_END_SCOPE) + .unwrap_or(false); + + let payload = serde_json::to_string(&SsePayload::from(&event)) + .unwrap_or_else(|_| r#"{"error":"serialization failed"}"#.to_string()); + + let sse = SseEvent::default().data(payload); + + (sse, is_end) + }); + + // Take-while inclusive: emit the STREAM_END event, then stop. + futures_util::stream::unfold( + (stream.boxed(), false, handle), + move |(mut stream, done, handle)| async move { + if done { + // Join the workflow handle so its task is properly reaped. + if let Some(h) = handle.lock().await.take() { + match h.join().await { + Ok(state) => info!( + messages = state.messages.len(), + "workflow completed successfully" + ), + Err(e) => warn!(error = %e, "workflow ended with error"), + } + } + return None; + } + + match stream.next().await { + Some((sse, is_end)) => Some((Ok(sse), (stream, is_end, handle))), + None => { + // Stream closed unexpectedly (e.g., workflow panicked). + error!("event stream closed without STREAM_END_SCOPE"); + None + } + } + }, + ) +} + +/// Lightweight SSE payload wrapping the weavegraph event. +/// +/// In production you may want to normalise the shape further — this keeps +/// the full event detail available while adding a top-level discriminant. +#[derive(Debug, Serialize)] +struct SsePayload { + kind: &'static str, + message: String, + scope: Option, +} + +impl From<&Event> for SsePayload { + fn from(event: &Event) -> Self { + Self { + kind: match event { + Event::Node(_) => "node", + Event::Diagnostic(_) => "diagnostic", + Event::LLM(_) => "llm", + }, + message: event.message().to_string(), + scope: event.scope_label().map(str::to_string), + } + } +} + +/// `GET /healthz` — liveness probe for container orchestration. +async fn healthz() -> &'static str { + "ok" +} + +// ============================================================================ +// Startup and graph compilation +// ============================================================================ + +/// Build and compile the workflow graph with Postgres checkpointing. +/// +/// This runs **once** at startup. The compiled [`App`] is wrapped in [`Arc`] +/// and shared across all handlers for the lifetime of the server. Graph +/// compilation is O(V+E) and negligible relative to request handling. +async fn build_app() -> Result { + dotenvy::dotenv().ok(); + let db_url = std::env::var("DATABASE_URL") + .unwrap_or_else(|_| "postgres://postgres:postgres@localhost/weavegraph".to_string()); + + // Connect to Postgres. When the `postgres-migrations` feature is enabled, + // schema migrations are run automatically on connect. + let pg = PostgresCheckpointer::connect(&db_url).await?; + + // Attach the postgres checkpointer via checkpointer_custom(). + // This takes precedence over any CheckpointerType enum variant. + let runtime_config = RuntimeConfig::new(None, None) + .checkpointer_custom(Arc::new(pg)) + .with_event_bus(EventBusConfig::with_stdout_only()); + + let app = GraphBuilder::new() + .add_node(NodeKind::Custom("validate".into()), ValidateNode) + .add_node(NodeKind::Custom("llm".into()), LlmNode) + .add_edge(NodeKind::Start, NodeKind::Custom("validate".into())) + .add_edge( + NodeKind::Custom("validate".into()), + NodeKind::Custom("llm".into()), + ) + .add_edge(NodeKind::Custom("llm".into()), NodeKind::End) + .with_runtime_config(runtime_config) + .compile()?; + + info!(db_url = %db_url, "graph compiled with postgres checkpointing"); + Ok(app) +} + +// ============================================================================ +// Main entry point +// ============================================================================ + +#[tokio::main] +async fn main() -> Result<(), BoxError> { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::from_default_env().add_directive("info".parse().unwrap())) + .init(); + + let app = build_app().await?; + let state = AppState { app: Arc::new(app) }; + + let router = Router::new() + .route("/run", get(run_handler)) + .route("/healthz", get(healthz)) + .with_state(state); + + let addr = "0.0.0.0:3000"; + info!(addr, "production_streaming server listening"); + + let listener = tokio::net::TcpListener::bind(addr).await?; + axum::serve(listener, router).await?; + + Ok(()) +} diff --git a/scripts/ci-local.sh b/scripts/ci-local.sh index 8b11786..0983cf7 100755 --- a/scripts/ci-local.sh +++ b/scripts/ci-local.sh @@ -85,7 +85,8 @@ fi run_check "cargo doc (nightly)" "RUSTDOCFLAGS='--cfg docsrs -D warnings' cargo +nightly doc --workspace --all-features --no-deps" # 5. Cargo semver-checks (blocking) -run_check "cargo semver-checks" "cargo semver-checks check-release --workspace" +# Note: cargo-semver-checks requires rustc >= 1.91.0; run on stable, not pinned MSRV +run_check "cargo semver-checks" "cargo +stable semver-checks check-release --workspace" # 6. Cargo deny (blocking) run_check "cargo deny" "cargo deny check" diff --git a/src/app.rs b/src/app.rs index a05daba..1fa7dce 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,3 +1,7 @@ +//! Application layer providing the high-level [`App`] entry point for workflow invocation. +//! +//! `App` manages node registration, graph compilation, and dispatches execution to +//! an [`AppRunner`]. use rustc_hash::FxHashMap; use std::sync::Arc; @@ -80,6 +84,7 @@ pub struct AppEventStream { #[derive(Debug, Error)] #[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))] pub enum AppEventStreamError { + /// The event stream has already been taken from this invocation handle. #[error("event stream has already been taken")] #[cfg_attr( feature = "diagnostics", @@ -316,13 +321,13 @@ impl App { /// .split() /// .expect("fresh event stream handle should still own the stream"); /// - /// let mut runner = AppRunner::with_options_and_bus( - /// app.clone(), - /// CheckpointerType::InMemory, - /// false, - /// event_bus, - /// true, - /// ).await; + /// let mut runner = AppRunner::builder() + /// .app(app.clone()) + /// .checkpointer(CheckpointerType::InMemory) + /// .autosave(false) + /// .event_bus(event_bus) + /// .build() + /// .await; /// /// tokio::spawn(async move { /// let mut stream = event_stream.into_async_stream(); @@ -349,11 +354,7 @@ impl App { &self, override_config: Option, ) -> (CheckpointerType, Option>) { - let checkpointer_type = override_config - .or_else(|| self.runtime_config.checkpointer_type()) - .unwrap_or(CheckpointerType::InMemory); - // Precedence rule: custom checkpointer always wins when provided. - // The enum-based factory (checkpointer_type) is only used if custom is None. + let checkpointer_type = override_config.unwrap_or(CheckpointerType::InMemory); let custom_checkpointer = self.runtime_config.custom_checkpointer(); (checkpointer_type, custom_checkpointer) } @@ -512,10 +513,10 @@ impl App { /// For streaming-first scenarios consider [`invoke_streaming`](Self::invoke_streaming), /// [`invoke_with_channel`](Self::invoke_with_channel), or /// [`invoke_with_sinks`](Self::invoke_with_sinks). Drop down to - /// [`AppRunner::with_options_and_bus`](crate::runtimes::runner::AppRunner::with_options_and_bus) + /// [`AppRunner::builder()`](crate::runtimes::runner::AppRunner::builder) /// when you need per-request isolation or bespoke runner lifecycle management. /// - /// See [`AppRunner::with_options_and_bus()`](crate::runtimes::runner::AppRunner::with_options_and_bus) + /// See [`AppRunner::builder()`](crate::runtimes::runner::AppRunner::builder) /// for streaming events to custom sinks. /// /// # Parameters @@ -561,13 +562,13 @@ impl App { /// ]); /// /// // Use AppRunner with custom EventBus - /// let mut runner = AppRunner::with_options_and_bus( - /// app, - /// CheckpointerType::InMemory, - /// false, - /// bus, - /// true, - /// ).await; + /// let mut runner = AppRunner::builder() + /// .app(app) + /// .checkpointer(CheckpointerType::InMemory) + /// .autosave(false) + /// .event_bus(bus) + /// .build() + /// .await; /// /// let session_id = "my-session".to_string(); /// let initial = VersionedState::new_with_user_message("Process this"); @@ -604,7 +605,7 @@ impl App { /// Execute workflow with event streaming to a channel. /// - /// This is a convenience method that combines `AppRunner::with_options_and_bus()` + /// This is a convenience method that combines `AppRunner::builder()` /// with channel creation and management. It's ideal for simple use cases where /// you want to stream events without manually managing the EventBus. /// @@ -616,7 +617,7 @@ impl App { /// /// # When NOT to Use This /// - /// - Web servers with per-request streaming (use `AppRunner::with_options_and_bus()`) + /// - Web servers with per-request streaming (use `AppRunner::builder()`) /// - Need multiple EventSinks beyond ChannelSink (use `invoke_with_sinks()`) /// - Need fine-grained control over EventBus lifecycle /// @@ -704,14 +705,14 @@ impl App { /// This method internally: /// 1. Creates a `flume::unbounded()` channel /// 2. Builds an EventBus from the runtime configuration and appends a `ChannelSink` - /// 3. Uses `AppRunner::with_options_and_bus()` with the custom EventBus + /// 3. Uses `AppRunner::builder()` with the custom EventBus /// 4. Returns both the execution result and receiver /// /// # See Also /// /// - [`invoke_with_sinks()`](Self::invoke_with_sinks) - For multiple EventSinks /// - [`invoke_streaming()`](Self::invoke_streaming) - Async `EventStream` helper - /// - [`AppRunner::with_options_and_bus()`](crate::runtimes::runner::AppRunner::with_options_and_bus) - For web servers + /// - [`AppRunner::builder()`](crate::runtimes::runner::AppRunner::builder) - For web servers /// - [`invoke()`](Self::invoke) - Simple execution without streaming #[instrument(skip(self, initial_state))] pub async fn invoke_with_channel( @@ -745,7 +746,7 @@ impl App { /// /// # When NOT to Use This /// - /// - Web servers with per-request streaming (use `AppRunner::with_options_and_bus()`) + /// - Web servers with per-request streaming (use `AppRunner::builder()`) /// - Need to create EventBus instances per HTTP request /// - Require fine-grained control over runner lifecycle /// @@ -821,7 +822,7 @@ impl App { /// /// - [`invoke_with_channel()`](Self::invoke_with_channel) - Simpler channel-only variant /// - [`invoke_streaming()`](Self::invoke_streaming) - Async `EventStream` without channels - /// - [`AppRunner::with_options_and_bus()`](crate::runtimes::runner::AppRunner::with_options_and_bus) - Full control + /// - [`AppRunner::builder()`](crate::runtimes::runner::AppRunner::builder) - Full control #[instrument(skip(self, initial_state, sinks), err)] pub async fn invoke_with_sinks( &self, diff --git a/src/channels/errors.rs b/src/channels/errors.rs index 8d4a6fa..b35edc6 100644 --- a/src/channels/errors.rs +++ b/src/channels/errors.rs @@ -1,3 +1,4 @@ +//! Error event types used to capture and propagate structured errors through the workflow. use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; @@ -62,14 +63,19 @@ use crate::telemetry::{FormatterMode, PlainFormatter, TelemetryFormatter}; /// ``` #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] pub struct ErrorEvent { + /// Timestamp at which the error occurred. #[serde(default = "chrono::Utc::now")] pub when: DateTime, + /// Scope identifying where in the workflow the error originated. #[serde(default)] pub scope: ErrorScope, + /// Structured error payload describing the failure. #[serde(default)] pub error: WeaveError, + /// Arbitrary string tags for filtering and categorization. #[serde(default)] pub tags: Vec, + /// Optional additional context data as a JSON value. #[serde(default)] pub context: serde_json::Value, } @@ -201,17 +207,26 @@ impl ErrorEvent { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] #[serde(tag = "scope", rename_all = "snake_case")] pub enum ErrorScope { + /// Error originated in a node execution. Node { + /// Node kind identifier. kind: String, + /// Workflow step at which the error occurred. step: u64, }, + /// Error originated in the scheduler. Scheduler { + /// Workflow step at which the error occurred. step: u64, }, + /// Error originated in the runner. Runner { + /// Session identifier associated with the error. session: String, + /// Workflow step at which the error occurred. step: u64, }, + /// Error originated at the application level (default). #[default] App, } @@ -276,13 +291,6 @@ impl WeaveError { } } -/// Deprecated compatibility alias retained in 0.3.x. -#[deprecated( - since = "0.3.0", - note = "Use WeaveError instead; this alias is removed in 0.4.0" -)] -pub type LadderError = WeaveError; - /// Format error events with explicit color mode control. /// /// This function allows you to control whether ANSI color codes are included in the output: diff --git a/src/channels/errors_channel.rs b/src/channels/errors_channel.rs index ab6e9ac..3bac2f4 100644 --- a/src/channels/errors_channel.rs +++ b/src/channels/errors_channel.rs @@ -2,6 +2,7 @@ use super::Channel; use super::errors::ErrorEvent; use serde::{Deserialize, Serialize}; +/// Channel that accumulates [`ErrorEvent`] entries for the current workflow execution. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct ErrorsChannel { value: Vec, @@ -9,6 +10,7 @@ pub struct ErrorsChannel { } impl ErrorsChannel { + /// Create a new `ErrorsChannel` with the given events and version counter. pub fn new(events: Vec, version: u32) -> Self { Self { value: events, diff --git a/src/channels/extras.rs b/src/channels/extras.rs index da4c168..e2af80f 100644 --- a/src/channels/extras.rs +++ b/src/channels/extras.rs @@ -4,6 +4,7 @@ use super::Channel; use crate::types::ChannelType; type ChannelValue = FxHashMap; +/// Channel that stores arbitrary key-value extra data for the workflow state. #[derive(Clone, Debug, PartialEq, Eq)] pub struct ExtrasChannel { value: ChannelValue, @@ -11,6 +12,7 @@ pub struct ExtrasChannel { } impl ExtrasChannel { + /// Create a new `ExtrasChannel` with the given map and version counter. pub fn new(extras: ChannelValue, version: u32) -> Self { Self { value: extras, diff --git a/src/channels/messages.rs b/src/channels/messages.rs index 0943399..3c89905 100644 --- a/src/channels/messages.rs +++ b/src/channels/messages.rs @@ -2,6 +2,7 @@ use super::Channel; use crate::{message::Message, types::ChannelType}; type ChannelValue = Vec; +/// Channel that stores the ordered list of conversation messages. #[derive(Clone, Debug, PartialEq, Eq)] pub struct MessagesChannel { value: ChannelValue, @@ -9,6 +10,7 @@ pub struct MessagesChannel { } impl MessagesChannel { + /// Create a new `MessagesChannel` with the given messages and version counter. pub fn new(messages: ChannelValue, version: u32) -> Self { Self { value: messages, diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 8b300e8..e71d184 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -1,5 +1,7 @@ +//! Channel types that form the typed state slots of a workflow's [`VersionedState`](crate::state::VersionedState). use crate::types::ChannelType; +/// Error event and scope types for structured workflow error capture. pub mod errors; mod errors_channel; mod extras; @@ -10,13 +12,25 @@ pub use errors_channel::ErrorsChannel; pub use extras::ExtrasChannel; pub use messages::MessagesChannel; +/// Core trait for a typed, versioned workflow state channel. +/// +/// Each implementing type wraps a value of type `T` with a version counter +/// used by the scheduler for change-detection gating. pub trait Channel: Sync + Send { + /// Returns the [`ChannelType`] discriminant for this channel. fn get_channel_type(&self) -> ChannelType; + /// Returns a clone of the current channel value. fn snapshot(&self) -> T; + /// Returns the number of items in the channel. fn len(&self) -> usize; + /// Returns `true` if the channel contains no items. fn is_empty(&self) -> bool; + /// Returns the current version counter. fn version(&self) -> u32; + /// Sets the version counter to the given value. fn set_version(&mut self, version: u32) -> (); + /// Returns a mutable reference to the underlying value. fn get_mut(&mut self) -> &mut T; + /// Returns `true` if this channel's data should be persisted across steps. fn persistent(&self) -> bool; } diff --git a/src/event_bus/bus.rs b/src/event_bus/bus.rs index 8382a3d..a4fabf9 100644 --- a/src/event_bus/bus.rs +++ b/src/event_bus/bus.rs @@ -1,3 +1,4 @@ +//! [`EventBus`] implementation: fan-out broadcast to registered [`EventSink`] workers. use std::io; use std::sync::Arc; use std::sync::Mutex; @@ -76,13 +77,13 @@ use chrono::Utc; /// ]); /// /// // Pass EventBus to AppRunner -/// let mut runner = AppRunner::with_options_and_bus( -/// app, -/// CheckpointerType::InMemory, -/// false, -/// bus, // Custom EventBus -/// true, -/// ).await; +/// let mut runner = AppRunner::builder() +/// .app(app) +/// .checkpointer(CheckpointerType::InMemory) +/// .autosave(false) +/// .event_bus(bus) +/// .build() +/// .await; /// /// let session_id = "client-123".to_string(); /// runner.create_session( @@ -120,13 +121,13 @@ use chrono::Utc; /// let bus = EventBus::with_sinks(vec![Box::new(ChannelSink::new(tx))]); /// /// // Reuse the App, create new runner with isolated EventBus -/// let mut runner = AppRunner::with_options_and_bus( -/// Arc::try_unwrap(app).unwrap_or_else(|arc| (*arc).clone()), -/// CheckpointerType::InMemory, -/// false, -/// bus, // Isolated EventBus for this request -/// true, -/// ).await; +/// let mut runner = AppRunner::builder() +/// .app(Arc::try_unwrap(app).unwrap_or_else(|arc| (*arc).clone())) +/// .checkpointer(CheckpointerType::InMemory) +/// .autosave(false) +/// .event_bus(bus) +/// .build() +/// .await; /// /// // Run workflow - events are isolated to this request /// let session_id = uuid::Uuid::new_v4().to_string(); @@ -148,11 +149,15 @@ use chrono::Utc; /// /// # See Also /// -/// - [`AppRunner::with_options_and_bus()`](crate::runtimes::runner::AppRunner::with_options_and_bus) - How to use custom EventBus +/// - [`AppRunner::builder()`](crate::runtimes::runner::AppRunner::builder) - How to use custom EventBus /// - [`ChannelSink`](crate::event_bus::ChannelSink) - For streaming events /// - Example: `examples/streaming_events.rs` - Complete streaming demonstration const DEFAULT_BUFFER_CAPACITY: usize = 1024; +/// Central event broadcasting system that fans out workflow events to registered sinks. +/// +/// Create with [`EventBus::with_sink`] or [`EventBus::with_sinks`] and pass to +/// [`AppRunner`](crate::runtimes::runner::AppRunner) for per-request event isolation. pub struct EventBus { sinks: Arc>>, hub: Arc, @@ -174,6 +179,7 @@ impl Default for EventBus { } impl EventBus { + /// Create an `EventBus` with a single sink. pub fn with_sink(sink: T) -> Self where T: EventSink + 'static, @@ -181,6 +187,7 @@ impl EventBus { Self::with_sinks(vec![Box::new(sink)]) } + /// Create an `EventBus` backed by the provided collection of sinks. pub fn with_sinks(sinks: Vec>) -> Self { Self::with_capacity(sinks, DEFAULT_BUFFER_CAPACITY) } @@ -217,6 +224,7 @@ impl EventBus { } } + /// Add a typed sink to this bus, starting a worker if the bus is already live. pub fn add_sink(&self, sink: T) { self.add_boxed_sink(Box::new(sink)); } @@ -240,6 +248,7 @@ impl EventBus { sinks_guard.push(entry); } + /// Return an [`EventEmitter`] handle for publishing events to this bus. pub fn get_emitter(&self) -> Arc { Arc::new(self.hub.emitter()) } @@ -249,6 +258,7 @@ impl EventBus { self.hub.metrics() } + /// Subscribe to the event stream, starting workers if not yet started. pub fn subscribe(&self) -> EventStream { self.listen_for_events(); self.hub.subscribe() @@ -320,6 +330,7 @@ impl EventBus { } } + /// Close the underlying hub channel, signalling all subscribers that the stream has ended. pub fn close_channel(&self) { self.hub.close(); } diff --git a/src/event_bus/diagnostics.rs b/src/event_bus/diagnostics.rs index 86c4a0a..66bac15 100644 --- a/src/event_bus/diagnostics.rs +++ b/src/event_bus/diagnostics.rs @@ -1,3 +1,4 @@ +//! Sink health diagnostics: per-sink error tracking and the diagnostics broadcast stream. use std::time::Duration; use chrono::{DateTime, Utc}; @@ -22,19 +23,26 @@ pub struct SinkDiagnostic { /// Public snapshot type representing per-sink health. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct SinkHealth { + /// Name of the sink this health snapshot belongs to. pub sink: String, + /// Total number of errors encountered by this sink. pub error_count: u64, #[serde(skip_serializing_if = "Option::is_none")] + /// Description of the most recent error, if any. pub last_error: Option, #[serde(skip_serializing_if = "Option::is_none")] + /// Timestamp of the most recent error, if any. pub last_error_at: Option>, } /// Internal accumulator for health tracking. #[derive(Debug, Default, Clone)] pub struct HealthState { + /// Running count of errors recorded for the sink. pub error_count: u64, + /// Description of the most recent error, if any. pub last_error: Option, + /// Timestamp of the most recent error, if any. pub last_error_at: Option>, } @@ -45,6 +53,7 @@ pub struct DiagnosticsStream { } impl DiagnosticsStream { + /// Create a new `DiagnosticsStream` from a broadcast receiver. pub fn new(receiver: Receiver) -> Self { Self { receiver } } diff --git a/src/event_bus/emitter.rs b/src/event_bus/emitter.rs index 3460ed9..d4aef33 100644 --- a/src/event_bus/emitter.rs +++ b/src/event_bus/emitter.rs @@ -1,3 +1,4 @@ +//! [`EventEmitter`] trait and [`EmitterError`] for publishing events to the bus. use std::fmt; use thiserror::Error; @@ -13,6 +14,7 @@ pub trait EventEmitter: Send + Sync + fmt::Debug { #[derive(Debug, Error)] #[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))] pub enum EmitterError { + /// The event hub has been shut down and no longer accepts events. #[error("event hub closed")] #[cfg_attr( feature = "diagnostics", @@ -22,6 +24,7 @@ pub enum EmitterError { ) )] Closed, + /// Event emission failed for a reason other than hub closure. #[error("event emission failed: {0}")] #[cfg_attr( feature = "diagnostics", @@ -34,6 +37,7 @@ pub enum EmitterError { } impl EmitterError { + /// Construct an [`EmitterError::Other`] from any string-convertible error message. pub fn other(error: impl Into) -> Self { Self::Other(error.into()) } diff --git a/src/event_bus/event.rs b/src/event_bus/event.rs index a0dd5cf..fb997d8 100644 --- a/src/event_bus/event.rs +++ b/src/event_bus/event.rs @@ -1,3 +1,4 @@ +//! Core event types emitted by workflow nodes and the framework itself. use std::fmt; use chrono::{DateTime, Utc}; @@ -5,20 +6,38 @@ use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; use serde_json::Value; +/// Scope constant marking the end of a streaming invocation. +/// +/// An event with this scope is emitted by the framework when the event stream closes +/// so that consumers can detect clean stream termination. pub const STREAM_END_SCOPE: &str = "__weavegraph_stream_end__"; +/// Scope constant for diagnostic events emitted by the framework. +/// +/// Use this scope when emitting internal diagnostic information +/// to distinguish framework diagnostics from user node events. +/// Consumers can filter on this scope to capture framework-level +/// telemetry without polluting the main event stream. +pub const DIAGNOSTIC_SCOPE: &str = "__weavegraph_diagnostic__"; + +/// A workflow event that can be emitted by nodes or the framework itself. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum Event { + /// A structured event emitted by a workflow node. Node(NodeEvent), + /// A framework-internal diagnostic event. Diagnostic(DiagnosticEvent), + /// An LLM streaming chunk or final/error marker. LLM(LLMStreamingEvent), } impl Event { + /// Create a node event with only a scope and message (no node ID or step). pub fn node_message(scope: impl Into, message: impl Into) -> Self { Event::Node(NodeEvent::new(None, None, scope.into(), message.into())) } + /// Create a node event with full metadata (node ID, step, scope, message). pub fn node_message_with_meta( node_id: impl Into, step: u64, @@ -33,6 +52,7 @@ impl Event { )) } + /// Create a diagnostic event with the given scope and message. pub fn diagnostic(scope: impl Into, message: impl Into) -> Self { Event::Diagnostic(DiagnosticEvent { scope: scope.into(), @@ -40,6 +60,7 @@ impl Event { }) } + /// Return the scope label string if the event carries one. pub fn scope_label(&self) -> Option<&str> { match self { Event::Node(node) => Some(node.scope()), @@ -48,6 +69,7 @@ impl Event { } } + /// Return the primary message text for this event. pub fn message(&self) -> &str { match self { Event::Node(node) => node.message(), @@ -193,6 +215,7 @@ impl fmt::Display for Event { } } +/// A structured event emitted by a workflow node during execution. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct NodeEvent { node_id: Option, @@ -202,6 +225,7 @@ pub struct NodeEvent { } impl NodeEvent { + /// Create a new `NodeEvent` with optional node ID and step number. pub fn new(node_id: Option, step: Option, scope: String, message: String) -> Self { Self { node_id, @@ -211,23 +235,28 @@ impl NodeEvent { } } + /// Returns the node identifier, if set. pub fn node_id(&self) -> Option<&str> { self.node_id.as_deref() } + /// Returns the step number at which this event was emitted, if set. pub fn step(&self) -> Option { self.step } + /// Returns the scope label for this event. pub fn scope(&self) -> &str { &self.scope } + /// Returns the event message text. pub fn message(&self) -> &str { &self.message } } +/// A framework-internal diagnostic event emitted outside normal node execution. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct DiagnosticEvent { scope: String, @@ -235,20 +264,27 @@ pub struct DiagnosticEvent { } impl DiagnosticEvent { + /// Returns the scope label for this diagnostic event. pub fn scope(&self) -> &str { &self.scope } + /// Returns the diagnostic message text. pub fn message(&self) -> &str { &self.message } } +/// Scope discriminant for LLM streaming events. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum LLMStreamingEventScope { + /// An in-progress streaming session (default scope). Streaming, + /// A single text chunk within a streaming response. Chunk, + /// The final chunk marking the end of the stream. Final, + /// An error event terminating the stream. Error, } @@ -263,6 +299,7 @@ impl AsRef for LLMStreamingEventScope { } } +/// An event carrying an LLM response chunk, final marker, or error from a streaming session. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct LLMStreamingEvent { session_id: Option, @@ -277,6 +314,7 @@ pub struct LLMStreamingEvent { impl LLMStreamingEvent { #[allow(clippy::too_many_arguments)] + /// Create a new `LLMStreamingEvent` with full field control. pub fn new( session_id: Option, node_id: Option, @@ -299,6 +337,7 @@ impl LLMStreamingEvent { } } + /// Create a chunk event representing a partial LLM response. pub fn chunk_event( session_id: Option, node_id: Option, @@ -318,6 +357,7 @@ impl LLMStreamingEvent { ) } + /// Create a final event marking the end of an LLM streaming session. pub fn final_event( session_id: Option, node_id: Option, @@ -337,6 +377,7 @@ impl LLMStreamingEvent { ) } + /// Create an error event marking a failed LLM streaming session. pub fn error_event( session_id: Option, node_id: Option, @@ -357,43 +398,53 @@ impl LLMStreamingEvent { ) } + /// Returns the session identifier, if set. pub fn session_id(&self) -> Option<&str> { self.session_id.as_deref() } + /// Returns the node identifier, if set. pub fn node_id(&self) -> Option<&str> { self.node_id.as_deref() } + /// Returns the stream identifier, if set. pub fn stream_id(&self) -> Option<&str> { self.stream_id.as_deref() } + /// Returns the text chunk carried by this event. pub fn chunk(&self) -> &str { &self.chunk } + /// Returns `true` if this event marks the final chunk of the stream. pub fn is_final(&self) -> bool { self.is_final } + /// Returns the scope of this streaming event. pub fn scope(&self) -> &LLMStreamingEventScope { &self.scope } + /// Returns the metadata map attached to this event. pub fn metadata(&self) -> &FxHashMap { &self.metadata } + /// Returns the timestamp at which this event was created. pub fn timestamp(&self) -> DateTime { self.timestamp } + /// Return a new event with the given metadata map replacing the existing one. pub fn with_metadata(mut self, metadata: FxHashMap) -> Self { self.metadata = metadata; self } + /// Return a new event with the given timestamp replacing the existing one. pub fn with_timestamp(mut self, timestamp: DateTime) -> Self { self.timestamp = timestamp; self diff --git a/src/event_bus/hub.rs b/src/event_bus/hub.rs index 4873692..89e0eac 100644 --- a/src/event_bus/hub.rs +++ b/src/event_bus/hub.rs @@ -1,3 +1,4 @@ +//! [`EventHub`] broadcast channel, [`EventStream`] receiver, and blocking iterator. use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; @@ -22,6 +23,7 @@ pub struct EventHubMetrics { pub dropped: usize, } +/// Broadcast hub that owns the Tokio broadcast channel used by [`EventBus`](crate::event_bus::EventBus). #[derive(Debug)] pub struct EventHub { sender: RwLock>>, @@ -79,14 +81,17 @@ impl EventHub { } } + /// Returns the configured buffer capacity of the underlying broadcast channel. pub fn capacity(&self) -> usize { self.capacity } + /// Returns the total count of events dropped due to slow subscribers. pub fn dropped(&self) -> usize { self.dropped_events.load(Ordering::Relaxed) } + /// Returns a snapshot of current hub health metrics. pub fn metrics(&self) -> EventHubMetrics { EventHubMetrics { capacity: self.capacity(), @@ -94,6 +99,7 @@ impl EventHub { } } + /// Create a [`HubEmitter`] that publishes events to this hub. pub fn emitter(self: &Arc) -> HubEmitter { HubEmitter { hub: Arc::clone(self), @@ -134,6 +140,7 @@ impl EventHub { } } +/// [`EventEmitter`] implementation backed by an [`EventHub`] broadcast channel. #[derive(Clone, Debug)] pub struct HubEmitter { hub: Arc, @@ -145,6 +152,7 @@ impl EventEmitter for HubEmitter { } } +/// Async receive handle for a subscription to an [`EventHub`]. #[derive(Debug)] pub struct EventStream { receiver: Receiver, @@ -153,6 +161,7 @@ pub struct EventStream { } impl EventStream { + /// Receive the next event, awaiting if the channel is empty. pub async fn recv(&mut self) -> Result { match self.receiver.recv().await { Ok(event) => Ok(event), @@ -164,6 +173,7 @@ impl EventStream { } } + /// Try to receive an event without blocking; returns immediately if none is available. pub fn try_recv(&mut self) -> Result { match self.receiver.try_recv() { Ok(event) => Ok(event), @@ -175,10 +185,12 @@ impl EventStream { } } + /// Consume the stream and return the raw broadcast receiver. pub fn into_inner(self) -> Receiver { self.receiver } + /// Convert this stream into a synchronous blocking iterator. pub fn into_blocking_iter(self) -> BlockingEventIter { BlockingEventIter { receiver: self.receiver, @@ -186,6 +198,7 @@ impl EventStream { } } + /// Attach a shutdown watch channel; the stream ends when the watch value becomes `true`. pub fn with_shutdown(mut self, shutdown: watch::Receiver) -> Self { // Consumers can share a `watch` channel to terminate the stream early when // the producer side shuts down (e.g. HTTP connection dropped). @@ -193,6 +206,7 @@ impl EventStream { self } + /// Convert this stream into a pinned `BoxStream` for use with async combinators. pub fn into_async_stream(self) -> BoxStream<'static, Event> { // Convert the broadcast receiver into a boxed stream so callers can plug it into // combinators without worrying about pinning or generics at the call site. @@ -238,6 +252,7 @@ impl EventStream { .boxed() } + /// Receive the next event, waiting at most `duration`; returns `None` on timeout or close. pub async fn next_timeout(&mut self, duration: Duration) -> Option { // Keep polling until we either obtain an event, the channel closes, or the // deadline elapses. Lagged notifications simply increment drop metrics and retry. @@ -252,6 +267,7 @@ impl EventStream { } } +/// Synchronous blocking iterator over events from an [`EventHub`]. pub struct BlockingEventIter { receiver: Receiver, hub: Arc, diff --git a/src/event_bus/mod.rs b/src/event_bus/mod.rs index c770261..e47e39b 100644 --- a/src/event_bus/mod.rs +++ b/src/event_bus/mod.rs @@ -23,6 +23,6 @@ pub mod sink; pub use bus::EventBus; pub use diagnostics::{DiagnosticsStream, SinkDiagnostic}; pub use emitter::{EmitterError, EventEmitter}; -pub use event::{Event, LLMStreamingEvent, NodeEvent, STREAM_END_SCOPE}; +pub use event::{DIAGNOSTIC_SCOPE, Event, LLMStreamingEvent, NodeEvent, STREAM_END_SCOPE}; pub use hub::{BlockingEventIter, EventHub, EventHubMetrics, EventStream, HubEmitter}; pub use sink::{ChannelSink, EventSink, JsonLinesSink, MemorySink, StdOutSink}; diff --git a/src/event_bus/sink.rs b/src/event_bus/sink.rs index 6893483..3786e78 100644 --- a/src/event_bus/sink.rs +++ b/src/event_bus/sink.rs @@ -1,3 +1,4 @@ +//! [`EventSink`] trait and built-in sink implementations: stdout, in-memory, channel, and JSON lines. use flume; use std::any::type_name; use std::fs::File; @@ -41,6 +42,7 @@ impl Default for StdOutSink { } impl StdOutSink { + /// Create a `StdOutSink` that formats events using the given `TelemetryFormatter`. pub fn with_formatter(formatter: F) -> Self { Self { handle: io::stdout(), @@ -64,6 +66,7 @@ pub struct MemorySink { } impl MemorySink { + /// Create a new, empty `MemorySink`. pub fn new() -> Self { Self::default() } @@ -276,7 +279,7 @@ impl EventSink for JsonLinesSink { /// /// ✅ CORRECT: /// let bus = EventBus::with_sinks(vec![Box::new(ChannelSink::new(tx))]); -/// let runner = AppRunner::with_options_and_bus(app, ..., bus, true).await; +/// let runner = AppRunner::builder().app(app).event_bus(bus).build().await; /// runner.run_until_complete(&session_id).await; /// ``` /// @@ -298,13 +301,13 @@ impl EventSink for JsonLinesSink { /// let bus = EventBus::with_sinks(vec![Box::new(ChannelSink::new(tx))]); /// /// // Use AppRunner with custom EventBus -/// let mut runner = AppRunner::with_options_and_bus( -/// app, -/// CheckpointerType::InMemory, -/// false, -/// bus, -/// true, -/// ).await; +/// let mut runner = AppRunner::builder() +/// .app(app) +/// .checkpointer(CheckpointerType::InMemory) +/// .autosave(false) +/// .event_bus(bus) +/// .build() +/// .await; /// /// let session_id = "my-session".to_string(); /// runner.create_session( @@ -339,13 +342,13 @@ impl EventSink for JsonLinesSink { /// let bus = EventBus::with_sinks(vec![Box::new(ChannelSink::new(tx))]); /// /// // Create isolated runner for this request -/// let mut runner = AppRunner::with_options_and_bus( -/// Arc::try_unwrap(app).unwrap_or_else(|arc| (*arc).clone()), -/// CheckpointerType::InMemory, -/// false, -/// bus, -/// true, -/// ).await; +/// let mut runner = AppRunner::builder() +/// .app(Arc::try_unwrap(app).unwrap_or_else(|arc| (*arc).clone())) +/// .checkpointer(CheckpointerType::InMemory) +/// .autosave(false) +/// .event_bus(bus) +/// .build() +/// .await; /// /// let session_id = format!("request-{}", request_id); /// runner.create_session( @@ -372,13 +375,13 @@ impl EventSink for JsonLinesSink { /// let bus = EventBus::with_sinks(vec![Box::new(ChannelSink::new(tx))]); /// /// tokio::spawn(async move { -/// let mut runner = AppRunner::with_options_and_bus( -/// Arc::try_unwrap(app).unwrap_or_else(|arc| (*arc).clone()), -/// CheckpointerType::InMemory, -/// false, -/// bus, -/// true, -/// ).await; +/// let mut runner = AppRunner::builder() +/// .app(Arc::try_unwrap(app).unwrap_or_else(|arc| (*arc).clone())) +/// .checkpointer(CheckpointerType::InMemory) +/// .autosave(false) +/// .event_bus(bus) +/// .build() +/// .await; /// /// let session_id = uuid::Uuid::new_v4().to_string(); /// runner.create_session( @@ -404,7 +407,7 @@ impl EventSink for JsonLinesSink { /// /// # See Also /// -/// - [`AppRunner::with_options_and_bus()`](crate::runtimes::runner::AppRunner::with_options_and_bus) - How to inject custom EventBus +/// - [`AppRunner::builder()`](crate::runtimes::runner::AppRunner::builder) - How to inject custom EventBus /// - [`EventBus::with_sinks()`](crate::event_bus::EventBus::with_sinks) - Create EventBus with sinks /// - Example: `examples/streaming_events.rs` - Complete working example pub struct ChannelSink { diff --git a/src/lib.rs b/src/lib.rs index d602fec..c992fc8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -67,7 +67,6 @@ //! | `postgres-migrations` | no | Enables Postgres migration support for checkpointer setup. | //! | `postgres` | no | Enables PostgreSQL checkpointer APIs and runtime backend. | //! | `rig` | no | Enables Rig-based LLM interop and adapters. | -//! | `llm` | no | Compatibility alias to `rig` for 0.3.x (planned removal in 0.4.0). | //! | `diagnostics` | no | Adds `miette` diagnostic metadata to error types. | //! | `examples` | no | Pulls additional deps used by selected examples. | //! | `petgraph-compat` | no | Exposes petgraph conversion helpers for graph analysis and visualization. | @@ -78,9 +77,71 @@ //! - `docs/OPERATIONS.md` for runtime operations, persistence, and deployment concerns. //! - `docs/STREAMING.md` for event streaming patterns and production guidance. //! - `docs/ARCHITECTURE.md` for internal architecture and execution model details. +//! +//! # Common Patterns +//! +//! ## Graph lifecycle +//! +//! ```rust,no_run +//! # use weavegraph::graphs::GraphBuilder; +//! # use weavegraph::types::NodeKind; +//! # use weavegraph::state::VersionedState; +//! # use weavegraph::runtimes::RuntimeConfig; +//! # async fn example() -> Result<(), Box> { +//! // 1. Build — declare nodes and edges. +//! // 2. Compile — validate topology, attach runtime config. +//! // 3. Invoke — run once or stream events to clients. +//! let app = GraphBuilder::new() +//! /* .add_node(...).add_edge(...) */ +//! .compile()?; +//! +//! let state = VersionedState::new_with_user_message("hello"); +//! let final_state = app.invoke(state).await?; +//! # Ok(()) +//! # } +//! ``` +//! +//! See `examples/graph_execution.rs` for a runnable graph lifecycle example. +//! +//! ## Streaming events via SSE +//! +//! ```rust,no_run +//! # use std::sync::Arc; +//! # use weavegraph::app::App; +//! # use weavegraph::event_bus::STREAM_END_SCOPE; +//! # use weavegraph::state::VersionedState; +//! # async fn example(app: Arc) { +//! // Each call gets an isolated runner + event bus. +//! let state = VersionedState::new_with_user_message("hello"); +//! let (handle, event_stream) = app.invoke_streaming(state).await; +//! +//! // Convert to an async stream and forward to your SSE layer. +//! // Terminate when STREAM_END_SCOPE is observed. +//! let _ = event_stream.into_async_stream(); // futures::Stream +//! let _ = handle; // join or abort the background task +//! # } +//! ``` +//! +//! See `examples/production_streaming.rs` for the full Axum + Postgres reference. +//! +//! ## Error handling in nodes +//! +//! ```rust,no_run +//! # use weavegraph::node::{NodeError, NodeResultExt}; +//! // Return a domain error from any node: +//! fn validate(input: &str) -> Result<(), NodeError> { +//! if input.is_empty() { +//! return Err(NodeError::Other("input must not be empty".into())); +//! } +//! // Lift arbitrary std::error::Error with ?: +//! std::str::from_utf8(input.as_bytes()).node_err()?; +//! Ok(()) +//! } +//! ``` +//! +//! See `examples/errors_pretty.rs` for error display patterns. -// TODO(pre-v0.3.0): Re-enable once public API is documented (~232 items) -// #![warn(missing_docs)] +#![warn(missing_docs)] pub mod app; pub mod channels; diff --git a/src/llm/rig_adapter.rs b/src/llm/rig_adapter.rs index 5b39dc2..42c7d38 100644 --- a/src/llm/rig_adapter.rs +++ b/src/llm/rig_adapter.rs @@ -1,3 +1,4 @@ +//! Adapter implementing the weavegraph LLM traits for the [Rig](https://github.com/0xPlaygrounds/rig) framework. use crate::message::{Message, Role}; use rig::completion::message::{ AssistantContent, Message as RigMessage, ToolResultContent, UserContent, diff --git a/src/llm/traits.rs b/src/llm/traits.rs index ec52833..bfdcd83 100644 --- a/src/llm/traits.rs +++ b/src/llm/traits.rs @@ -1,3 +1,4 @@ +//! Framework-agnostic traits for LLM providers (non-streaming and streaming). use crate::message::Message; use async_trait::async_trait; use futures_util::stream::BoxStream; @@ -8,7 +9,9 @@ pub type LlmError = Box; /// Completed response from an LLM provider. #[derive(Clone, Debug, Default)] pub struct LlmResponse { + /// The generated text content returned by the LLM. pub content: String, + /// Optional provider-specific metadata (token counts, finish reason, etc.). pub metadata: serde_json::Value, } diff --git a/src/message.rs b/src/message.rs index c11ec42..eb82e6f 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,3 +1,4 @@ +//! Message types representing chat turns and content in a workflow conversation. use serde::{Deserialize, Serialize}; use std::fmt; @@ -152,21 +153,6 @@ mod role_serde { } impl Message { - /// Creates a new message with the specified role string and content. - /// - /// For type-safe role handling, prefer [`with_role()`](Self::with_role). - #[must_use] - #[deprecated( - since = "0.3.0", - note = "Use Message::with_role(Role::..., ...) or Message::user()/assistant()/system()/tool()" - )] - pub fn new(role: &str, content: &str) -> Self { - Self { - role: Role::from(role), - content: content.to_string(), - } - } - /// Creates a new message with a typed [`Role`] and content. /// /// This is the recommended way to create messages with standard roles. @@ -246,13 +232,6 @@ mod tests { assert_eq!(msg.role, Role::Custom("custom".into())); } - #[test] - #[allow(deprecated)] - fn test_message_new_deprecated_compat() { - let msg = Message::new("custom", "data"); - assert_eq!(msg.role, Role::Custom("custom".into())); - } - #[test] fn test_message_with_role() { let msg = Message::with_role(Role::Tool, "result"); diff --git a/src/node.rs b/src/node.rs index 9263f37..70c2dfd 100644 --- a/src/node.rs +++ b/src/node.rs @@ -265,6 +265,7 @@ pub struct NodePartial { } impl NodePartial { + /// Create an empty `NodePartial` with all fields set to `None`. pub fn new() -> Self { Self { ..Default::default() @@ -364,13 +365,18 @@ pub enum NodeError { help("Check that the previous node produced the required data: {what}.") ) )] - MissingInput { what: &'static str }, + MissingInput { + /// Description of the missing input data. + what: &'static str, + }, /// External provider or service error. #[error("provider error ({provider}): {message}")] #[cfg_attr(feature = "diagnostics", diagnostic(code(weavegraph::node::provider)))] Provider { + /// Name of the external provider that produced the error. provider: &'static str, + /// Human-readable description of the error. message: String, }, diff --git a/src/reducers/add_errors.rs b/src/reducers/add_errors.rs index 7ba8e47..1ac3cac 100644 --- a/src/reducers/add_errors.rs +++ b/src/reducers/add_errors.rs @@ -1,6 +1,8 @@ +//! Reducer that appends incoming [`ErrorEvent`](crate::channels::errors::ErrorEvent) entries to the errors channel. use super::Reducer; use crate::{channels::Channel, node::NodePartial, state::VersionedState}; +/// Reducer that appends error events from a [`NodePartial`](crate::node::NodePartial) to the state errors channel. #[derive(Debug, PartialEq, Clone, Hash, Eq)] pub struct AddErrors; diff --git a/src/reducers/add_messages.rs b/src/reducers/add_messages.rs index 938a294..4569470 100644 --- a/src/reducers/add_messages.rs +++ b/src/reducers/add_messages.rs @@ -1,6 +1,8 @@ +//! Reducer that appends incoming messages to the messages channel. use super::Reducer; use crate::{channels::Channel, node::NodePartial, state::VersionedState}; +/// Reducer that appends messages from a [`NodePartial`](crate::node::NodePartial) to the state messages channel. #[derive(Debug, PartialEq, Clone, Hash, Eq)] pub struct AddMessages; diff --git a/src/reducers/map_merge.rs b/src/reducers/map_merge.rs index 62c69f9..23a2e8a 100644 --- a/src/reducers/map_merge.rs +++ b/src/reducers/map_merge.rs @@ -1,6 +1,8 @@ +//! Reducer that shallow-merges incoming extra key-value pairs into the extras channel. use super::Reducer; use crate::{channels::Channel, node::NodePartial, state::VersionedState}; +/// Reducer that merges extra key-value pairs from a [`NodePartial`](crate::node::NodePartial) into the state extras channel. #[derive(Debug, PartialEq, Clone, Hash, Eq)] pub struct MapMerge; impl Reducer for MapMerge { diff --git a/src/reducers/mod.rs b/src/reducers/mod.rs index 1bd9c20..c6513cc 100644 --- a/src/reducers/mod.rs +++ b/src/reducers/mod.rs @@ -1,3 +1,4 @@ +//! State reducers that apply [`NodePartial`] updates to [`VersionedState`]. mod add_errors; mod add_messages; mod map_merge; @@ -16,12 +17,15 @@ use thiserror::Error; /// Unified reducer trait: every reducer mutates VersionedState using a NodePartial delta. /// Channels currently implemented: messages (append) and extra (shallow JSON map merge). pub trait Reducer: Send + Sync { + /// Apply the partial update `update` to `state`, mutating it in place. fn apply(&self, state: &mut VersionedState, update: &NodePartial); } +/// Errors that can occur when applying reducers to workflow state. #[derive(Debug, Error)] #[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))] pub enum ReducerError { + /// No reducer is registered for the specified channel type. #[error("no reducers registered for channel: {0:?}")] #[cfg_attr( feature = "diagnostics", @@ -29,13 +33,16 @@ pub enum ReducerError { )] UnknownChannel(ChannelType), + /// A reducer failed while applying an update to a channel. #[error("reducer apply failed for channel {channel:?}: {message}")] #[cfg_attr( feature = "diagnostics", diagnostic(code(weavegraph::reducers::apply_failed)) )] Apply { + /// The channel type for which the reducer failed. channel: ChannelType, + /// Human-readable description of the failure. message: String, }, } diff --git a/src/reducers/reducer_registry.rs b/src/reducers/reducer_registry.rs index 431fef2..e50b618 100644 --- a/src/reducers/reducer_registry.rs +++ b/src/reducers/reducer_registry.rs @@ -9,6 +9,7 @@ use crate::{ }; use tracing::instrument; +/// Registry mapping channel types to ordered lists of reducers. #[derive(Clone)] pub struct ReducerRegistry { reducer_map: FxHashMap>>, @@ -100,6 +101,7 @@ impl ReducerRegistry { } #[instrument(skip(self, state, to_update), err)] + /// Apply all reducers for `channel_type` to `state` using `to_update` as the delta. pub fn try_update( &self, channel_type: ChannelType, @@ -122,6 +124,7 @@ impl ReducerRegistry { } #[instrument(skip(self, state, merged_updates), err)] + /// Apply all registered reducers across all channels to `state`. pub fn apply_all( &self, state: &mut VersionedState, diff --git a/src/runtimes/checkpointer.rs b/src/runtimes/checkpointer.rs index 330c592..c1b6fe4 100644 --- a/src/runtimes/checkpointer.rs +++ b/src/runtimes/checkpointer.rs @@ -41,12 +41,19 @@ use crate::{ /// to enable full session resumption and audit trails. #[derive(Debug, Clone)] pub struct Checkpoint { + /// Unique identifier of the workflow session this checkpoint belongs to. pub session_id: String, + /// Execution step number at the time of this checkpoint. pub step: u64, + /// Full versioned state snapshot captured at this step. pub state: VersionedState, + /// Node frontier to resume from when restoring this checkpoint. pub frontier: Vec, + /// Scheduler version-gating state for change detection. pub versions_seen: FxHashMap>, // scheduler gating + /// Maximum concurrent nodes configured for this session. pub concurrency_limit: usize, + /// Timestamp at which this checkpoint was created. pub created_at: DateTime, /// Nodes that executed in this step (empty for step 0) pub ran_nodes: Vec, @@ -164,7 +171,10 @@ pub enum CheckpointerError { ) ) )] - NotFound { session_id: String }, + NotFound { + /// The session ID that was not found. + session_id: String, + }, /// Backend storage error (database, filesystem, etc.). #[error("backend error: {message}")] @@ -175,7 +185,10 @@ pub enum CheckpointerError { help("Check backend connectivity and permissions; backend message: {message}.") ) )] - Backend { message: String }, + Backend { + /// Description of the backend storage error. + message: String, + }, /// Other checkpointer errors. #[error("checkpointer error: {message}")] @@ -183,7 +196,10 @@ pub enum CheckpointerError { feature = "diagnostics", diagnostic(code(weavegraph::checkpointer::other)) )] - Other { message: String }, + Other { + /// Human-readable description of the error. + message: String, + }, } /// Selects the backing implementation of the `Checkpointer` trait. @@ -213,6 +229,7 @@ pub enum CheckpointerType { Postgres, } +/// Convenience alias for checkpointer operation results. pub type Result = std::result::Result; /// Trait for persistent storage and retrieval of workflow execution state. diff --git a/src/runtimes/checkpointer_sqlite.rs b/src/runtimes/checkpointer_sqlite.rs index f4e3f91..ae26ef4 100644 --- a/src/runtimes/checkpointer_sqlite.rs +++ b/src/runtimes/checkpointer_sqlite.rs @@ -107,9 +107,11 @@ pub struct StepQueryResult { pub page_info: PageInfo, } +/// Errors that can occur within the SQLite-backed checkpointer. #[derive(Debug, Error)] #[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))] pub enum SQLiteCheckpointerError { + /// An underlying SQLx database error. #[error("SQLx error: {0}")] #[cfg_attr( feature = "diagnostics", @@ -120,6 +122,7 @@ pub enum SQLiteCheckpointerError { )] Sqlx(#[from] sqlx::Error), + /// A JSON serialization or deserialization error. #[error("JSON serialization error: {0}")] #[cfg_attr( feature = "diagnostics", @@ -130,6 +133,7 @@ pub enum SQLiteCheckpointerError { )] Serde(#[from] serde_json::Error), + /// A required field was missing from a persisted row. #[error("Missing persisted field: {0}")] #[cfg_attr( feature = "diagnostics", @@ -140,10 +144,12 @@ pub enum SQLiteCheckpointerError { )] Missing(&'static str), + /// A generic backend error. #[error("Backend error: {0}")] #[cfg_attr(feature = "diagnostics", diagnostic(code(weavegraph::sqlite::backend)))] Backend(String), + /// Any other error not covered by the above variants. #[error("Other error: {0}")] #[cfg_attr(feature = "diagnostics", diagnostic(code(weavegraph::sqlite::other)))] Other(String), diff --git a/src/runtimes/persistence.rs b/src/runtimes/persistence.rs index e8a4163..6743ce1 100644 --- a/src/runtimes/persistence.rs +++ b/src/runtimes/persistence.rs @@ -46,7 +46,9 @@ where /// Channel that stores a vector collection (e.g., messages) with version metadata. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct PersistedVecChannel { + /// Version counter for change-detection. pub version: u32, + /// The stored items. #[serde(default)] pub items: Vec, } @@ -63,7 +65,9 @@ impl Default for PersistedVecChannel { /// Channel that stores a map collection (e.g., extra) with version metadata. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct PersistedMapChannel { + /// Version counter for change-detection. pub version: u32, + /// The stored key-value map. #[serde(default)] pub map: FxHashMap, } @@ -80,8 +84,11 @@ impl Default for PersistedMapChannel { /// Complete persisted shape of the in‑memory VersionedState. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct PersistedState { + /// Persisted messages channel. pub messages: PersistedVecChannel, + /// Persisted extra key-value channel. pub extra: PersistedMapChannel, + /// Persisted errors channel. #[serde(default)] pub errors: PersistedVecChannel, } @@ -94,12 +101,17 @@ pub struct PersistedVersionsSeen(pub FxHashMap>); /// (Step history tables may store multiple instances of this shape.) #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct PersistedCheckpoint { + /// Unique session identifier. pub session_id: String, + /// Workflow step number for this checkpoint. pub step: u64, + /// Full state snapshot at this step. pub state: PersistedState, /// Frontier encoded as string vector using NodeKind::encode(). pub frontier: Vec, + /// Scheduler version-gating state. pub versions_seen: PersistedVersionsSeen, + /// Maximum concurrent nodes for this session. pub concurrency_limit: usize, /// RFC3339 string form of creation time (keeps chrono::DateTime out of serialized shape). pub created_at: String, @@ -120,6 +132,7 @@ use thiserror::Error; #[derive(Debug, Error)] #[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))] pub enum PersistenceError { + /// A required field was absent from the persisted data. #[error("missing field: {0}")] #[cfg_attr( feature = "diagnostics", @@ -130,6 +143,7 @@ pub enum PersistenceError { )] MissingField(&'static str), + /// A JSON serialization or deserialization error. #[error("JSON serialization/deserialization failed: {source}")] #[cfg_attr( feature = "diagnostics", @@ -139,10 +153,12 @@ pub enum PersistenceError { ) )] Serde { + /// The underlying serde_json error. #[source] source: serde_json::Error, }, + /// Any other persistence error. #[error("persistence error: {0}")] #[cfg_attr( feature = "diagnostics", @@ -151,6 +167,7 @@ pub enum PersistenceError { Other(String), } +/// Convenience alias for persistence operation results. pub type Result = std::result::Result; /* ---------- VersionedState <-> PersistedState Conversions ---------- */ diff --git a/src/runtimes/runner.rs b/src/runtimes/runner.rs index 1bc5759..0726cd7 100644 --- a/src/runtimes/runner.rs +++ b/src/runtimes/runner.rs @@ -59,7 +59,7 @@ use tracing::instrument; /// /// ```text /// ❌ WRONG: App.invoke() → Uses default EventBus (stdout only) -/// ✅ RIGHT: AppRunner::with_options_and_bus() → Custom EventBus with your sinks +/// ✅ RIGHT: AppRunner::builder() with .event_bus(bus) → Custom EventBus with your sinks /// ``` /// /// # Usage Patterns @@ -98,13 +98,13 @@ use tracing::instrument; /// let bus = EventBus::with_sinks(vec![Box::new(ChannelSink::new(tx))]); /// /// // Create runner with custom EventBus -/// let mut runner = AppRunner::with_options_and_bus( -/// app, -/// CheckpointerType::InMemory, -/// false, -/// bus, -/// true, -/// ).await; +/// let mut runner = AppRunner::builder() +/// .app(app) +/// .checkpointer(CheckpointerType::InMemory) +/// .autosave(false) +/// .event_bus(bus) +/// .build() +/// .await; /// /// let session_id = "my-session".to_string(); /// runner.create_session( @@ -126,7 +126,7 @@ use tracing::instrument; /// /// # See Also /// -/// - [`with_options_and_bus()`](Self::with_options_and_bus) - Recommended for custom event handling +/// - [`builder()`](Self::builder) - Recommended for custom event handling /// - [`App::invoke()`](crate::app::App::invoke) - Simple execution with defaults /// - Example: `examples/streaming_events.rs` - Complete streaming demonstration pub struct AppRunner { @@ -148,7 +148,10 @@ pub enum RunnerError { feature = "diagnostics", diagnostic(code(weavegraph::runner::session_not_found)) )] - SessionNotFound { session_id: String }, + SessionNotFound { + /// The session ID that was not found. + session_id: String, + }, /// No nodes are reachable from the Start node. #[error("no nodes to run from START (empty frontier)")] @@ -213,8 +216,8 @@ pub enum RunnerError { /// Builder for constructing [`AppRunner`] instances with a fluent API. /// -/// This builder consolidates all the various constructors (`new`, `with_options`, -/// `with_options_and_bus`, etc.) into a single, discoverable interface. +/// This builder is the canonical way to construct `AppRunner` instances. +/// It provides a single, discoverable interface for all configuration options. /// /// # Examples /// @@ -445,70 +448,6 @@ impl AppRunner { AppRunnerBuilder::new() } - /// Create a new AppRunner with default EventBus (stdout only). - /// - /// This is the simplest constructor, used internally by [`App::invoke()`](crate::app::App::invoke). - /// For custom event handling (streaming to web clients, etc.), use - /// [`with_options_and_bus()`](Self::with_options_and_bus) instead. - /// - /// # Parameters - /// - /// * `app` - The compiled workflow graph - /// * `checkpointer_type` - Persistence strategy (InMemory or SQLite) - /// - /// # Returns - /// - /// An AppRunner with: - /// - Default EventBus (stdout sink only) - /// - Autosave enabled - /// - Event listener started - /// - /// # Example - /// - /// ```rust,no_run - /// # use weavegraph::app::App; - /// use weavegraph::runtimes::{AppRunner, CheckpointerType}; - /// use weavegraph::state::VersionedState; - /// # async fn example(app: App) -> Result<(), Box> { - /// - /// let mut runner = AppRunner::new(app, CheckpointerType::InMemory).await; - /// - /// let session_id = "my-session".to_string(); - /// runner.create_session( - /// session_id.clone(), - /// VersionedState::new_with_user_message("Hello") - /// ).await?; - /// - /// runner.run_until_complete(&session_id).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// # See Also - /// - /// - [`builder()`](Self::builder) - **Preferred**: Fluent builder API - /// - [`with_options_and_bus()`](Self::with_options_and_bus) - For custom EventBus - /// - [`App::invoke()`](crate::app::App::invoke) - Higher-level API using this internally - #[deprecated( - since = "0.2.0", - note = "Use AppRunner::builder().app(app).checkpointer(type).build().await instead" - )] - #[must_use] - #[allow(deprecated)] - pub async fn new(app: App, checkpointer_type: CheckpointerType) -> Self { - Self::with_options(app, checkpointer_type, true).await - } - - #[deprecated( - since = "0.2.0", - note = "Use AppRunner::builder().app_arc(app).checkpointer(type).build().await instead" - )] - #[must_use] - #[allow(deprecated)] - pub async fn from_arc(app: Arc, checkpointer_type: CheckpointerType) -> Self { - Self::with_options_arc(app, checkpointer_type, true).await - } - async fn create_checkpointer( checkpointer_type: CheckpointerType, sqlite_db_name: Option, @@ -581,217 +520,6 @@ impl AppRunner { } } - /// Create with explicit checkpointer + autosave toggle - #[deprecated( - since = "0.2.0", - note = "Use AppRunner::builder().app(app).checkpointer(type).autosave(bool).build().await instead" - )] - pub async fn with_options( - app: App, - checkpointer_type: CheckpointerType, - autosave: bool, - ) -> Self { - let bus = app.runtime_config().event_bus.build_event_bus(); - let app = Arc::new(app); - Self::with_arc_and_bus(app, checkpointer_type, None, autosave, bus, true).await - } - - #[deprecated( - since = "0.2.0", - note = "Use AppRunner::builder().app_arc(app).checkpointer(type).autosave(bool).build().await instead" - )] - pub async fn with_options_arc( - app: Arc, - checkpointer_type: CheckpointerType, - autosave: bool, - ) -> Self { - let bus = app.runtime_config().event_bus.build_event_bus(); - Self::with_arc_and_bus(app, checkpointer_type, None, autosave, bus, true).await - } - - /// Create an AppRunner with a custom EventBus for advanced event handling. - /// - /// Use this method when you need to stream events to custom sinks (e.g., web clients, - /// logging systems, monitoring dashboards). This is the **preferred method** for - /// production applications that need real-time event streaming. - /// - /// # Why Use This Instead of `App.invoke()`? - /// - /// - `App.invoke()` uses a **default EventBus** (stdout only) - /// - This method lets you **inject a custom EventBus** with multiple sinks - /// - Essential for streaming events to web clients via SSE, WebSocket, etc. - /// - Allows per-request event isolation in web servers - /// - /// # Architecture - /// - /// The EventBus is a **runtime concern** managed by `AppRunner`, not `App`: - /// - /// ```text - /// GraphBuilder → App (graph structure) - /// ↓ - /// AppRunner::with_options_and_bus(app, ..., custom_bus) - /// ↓ - /// AppRunner { app, event_bus: custom_bus } - /// ↓ - /// NodeContext gets event_emitter - /// ↓ - /// Events → EventBus → Your custom sinks - /// ``` - /// - /// This design allows multiple AppRunners to share the same App with different - /// EventBus configurations (e.g., one EventBus per HTTP client connection). - /// - /// # Parameters - /// - /// * `app` - The compiled workflow graph - /// * `checkpointer_type` - Persistence strategy (InMemory or SQLite) - /// * `autosave` - Whether to automatically save checkpoints after each step - /// * `event_bus` - Your custom EventBus with desired sinks - /// * `start_listener` - Whether to start the EventBus listener immediately - /// - /// # Returns - /// - /// A configured `AppRunner` ready to execute workflows with custom event handling. - /// - /// # Examples - /// - /// ## Streaming Events to Web Clients (SSE) - /// - /// ```rust,no_run - /// use weavegraph::event_bus::{EventBus, ChannelSink, StdOutSink}; - /// use weavegraph::runtimes::{AppRunner, CheckpointerType}; - /// use weavegraph::state::VersionedState; - /// # use weavegraph::app::App; - /// # async fn example(app: App) -> Result<(), Box> { - /// - /// // Create a streaming channel (one per client in production) - /// let (tx, rx) = flume::unbounded(); - /// - /// // Create EventBus with both stdout and channel sinks - /// let bus = EventBus::with_sinks(vec![ - /// Box::new(StdOutSink::default()), // For server logs - /// Box::new(ChannelSink::new(tx)), // For client streaming - /// ]); - /// - /// // Create runner with custom EventBus - /// let mut runner = AppRunner::with_options_and_bus( - /// app, - /// CheckpointerType::InMemory, - /// false, // Don't autosave - /// bus, // Our custom EventBus - /// true, // Start listener - /// ).await; - /// - /// // Run workflow - events stream to the channel - /// let session_id = "client-123".to_string(); - /// let initial_state = VersionedState::new_with_user_message("Process this"); - /// runner.create_session(session_id.clone(), initial_state).await?; - /// - /// // Consume events in parallel - /// tokio::spawn(async move { - /// while let Ok(event) = rx.recv_async().await { - /// // Send to web client via SSE, WebSocket, etc. - /// println!("Stream to client: {:?}", event); - /// } - /// }); - /// - /// runner.run_until_complete(&session_id).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## Per-Request Event Isolation (Web Server Pattern) - /// - /// ```rust,no_run - /// use std::sync::Arc; - /// use weavegraph::event_bus::{EventBus, ChannelSink}; - /// use weavegraph::runtimes::{AppRunner, CheckpointerType}; - /// use weavegraph::state::VersionedState; - /// # use weavegraph::app::App; - /// # async fn handle_request(app: Arc, request_id: String) -> Result<(), Box> { - /// - /// // Each request gets its own EventBus and channel - /// let (tx, rx) = flume::unbounded(); - /// let bus = EventBus::with_sinks(vec![Box::new(ChannelSink::new(tx))]); - /// - /// // Clone the app (cheap Arc clone), create isolated runner - /// let mut runner = AppRunner::with_options_and_bus( - /// Arc::try_unwrap(app.clone()).unwrap_or_else(|arc| (*arc).clone()), - /// CheckpointerType::InMemory, - /// false, - /// bus, - /// true, - /// ).await; - /// - /// let session_id = format!("request-{}", request_id); - /// let initial = VersionedState::new_with_user_message("User request"); - /// runner.create_session(session_id.clone(), initial).await?; - /// - /// // Events are isolated to this request's channel - /// runner.run_until_complete(&session_id).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// # See Also - /// - /// - [`App::invoke()`](crate::app::App::invoke) - Simple execution with default EventBus - /// - [`EventBus::with_sinks()`](crate::event_bus::EventBus::with_sinks) - Create EventBus with custom sinks - /// - [`ChannelSink`](crate::event_bus::ChannelSink) - Stream events to async channels - /// - Example: `examples/streaming_events.rs` - Complete streaming demonstration - #[deprecated( - since = "0.2.0", - note = "Use AppRunner::builder().app(app).checkpointer(type).autosave(bool).event_bus(bus).start_listener(bool).build().await instead" - )] - pub async fn with_options_and_bus( - app: App, - checkpointer_type: CheckpointerType, - autosave: bool, - event_bus: EventBus, - start_listener: bool, - ) -> Self { - let app = Arc::new(app); - Self::with_arc_and_bus( - app, - checkpointer_type, - None, - autosave, - event_bus, - start_listener, - ) - .await - } - - /// Variant that accepts a preconfigured EventBus for an existing `Arc`. - /// - /// Same as [`with_options_and_bus()`](Self::with_options_and_bus) but accepts - /// an `Arc` to avoid unnecessary cloning when you already have the app - /// wrapped in an Arc. - /// - /// See [`with_options_and_bus()`](Self::with_options_and_bus) for detailed - /// documentation and examples. - #[deprecated( - since = "0.2.0", - note = "Use AppRunner::builder().app_arc(app).checkpointer(type).autosave(bool).event_bus(bus).start_listener(bool).build().await instead" - )] - pub async fn with_options_arc_and_bus( - app: Arc, - checkpointer_type: CheckpointerType, - autosave: bool, - event_bus: EventBus, - start_listener: bool, - ) -> Self { - Self::with_arc_and_bus( - app, - checkpointer_type, - None, - autosave, - event_bus, - start_listener, - ) - .await - } - async fn with_arc_and_bus( app: Arc, checkpointer_type: CheckpointerType, diff --git a/src/runtimes/runtime_config.rs b/src/runtimes/runtime_config.rs index e3c236f..e592cbc 100644 --- a/src/runtimes/runtime_config.rs +++ b/src/runtimes/runtime_config.rs @@ -1,19 +1,20 @@ +//! Runtime configuration types for controlling event bus, sinks, and diagnostics. use std::sync::Arc; use crate::event_bus::{EventBus, EventSink, MemorySink, StdOutSink}; -use super::{Checkpointer, CheckpointerType}; +use super::Checkpointer; +/// Configuration for a single [`AppRunner`](crate::runtimes::runner::AppRunner) instance. #[derive(Clone)] pub struct RuntimeConfig { + /// Optional session ID to use; a new UUID is generated if `None`. pub session_id: Option, - #[deprecated( - since = "0.3.4", - note = "Use RuntimeConfig::with_checkpointer(...) for enum convenience or RuntimeConfig::checkpointer_custom(...) for custom backends; field will be removed in 0.4.0" - )] - pub checkpointer: Option, + /// Custom [`Checkpointer`] to use instead of the built-in types. pub checkpointer_custom: Option>, + /// SQLite database file name; defaults to `SQLITE_DB_NAME` env var or `weavegraph.db`. pub sqlite_db_name: Option, + /// Event bus configuration used to build the [`EventBus`]. pub event_bus: EventBusConfig, } @@ -21,7 +22,6 @@ impl std::fmt::Debug for RuntimeConfig { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("RuntimeConfig") .field("session_id", &self.session_id) - .field("checkpointer", &self.checkpointer_type()) .field("checkpointer_custom", &self.checkpointer_custom.is_some()) .field("sqlite_db_name", &self.sqlite_db_name) .field("event_bus", &self.event_bus) @@ -30,12 +30,9 @@ impl std::fmt::Debug for RuntimeConfig { } impl Default for RuntimeConfig { - #[allow(deprecated)] fn default() -> Self { Self { - // Generate session identifiers lazily so helpers can pick a fresh id per run. session_id: None, - checkpointer: Some(CheckpointerType::InMemory), checkpointer_custom: None, sqlite_db_name: Self::resolve_sqlite_db_name(None), event_bus: EventBusConfig::default(), @@ -52,81 +49,74 @@ impl RuntimeConfig { Some(std::env::var("SQLITE_DB_NAME").unwrap_or_else(|_| "weavegraph.db".to_string())) } - #[allow(deprecated)] - pub fn new( - session_id: Option, - checkpointer: Option, - sqlite_db_name: Option, - ) -> Self { + /// Create a new `RuntimeConfig` with the given session ID and optional SQLite DB name. + pub fn new(session_id: Option, sqlite_db_name: Option) -> Self { Self { session_id, - checkpointer, checkpointer_custom: None, sqlite_db_name: Self::resolve_sqlite_db_name(sqlite_db_name), event_bus: EventBusConfig::default(), } } - #[allow(deprecated)] - #[must_use] - pub fn with_checkpointer(mut self, checkpointer: Option) -> Self { - self.checkpointer = checkpointer; - self - } - - #[must_use] - pub fn checkpointer_type(&self) -> Option { - #[allow(deprecated)] - { - self.checkpointer.clone() - } - } - #[must_use] + /// Set a custom [`Checkpointer`] for this configuration. pub fn checkpointer_custom(mut self, checkpointer: Arc) -> Self { self.checkpointer_custom = Some(checkpointer); self } #[must_use] + /// Return the custom checkpointer if one has been set. pub fn custom_checkpointer(&self) -> Option> { self.checkpointer_custom.clone() } #[must_use] + /// Replace the event bus configuration for this runtime. pub fn with_event_bus(mut self, event_bus: EventBusConfig) -> Self { self.event_bus = event_bus; self } #[must_use] + /// Configure the runtime with a stdout-only event bus. pub fn with_stdout_event_bus(self) -> Self { self.with_event_bus(EventBusConfig::with_stdout_only()) } #[must_use] + /// Configure the runtime with an in-memory event bus (useful for testing). pub fn with_memory_event_bus(self) -> Self { self.with_event_bus(EventBusConfig::with_memory_sink()) } } +/// Selects the output target for an [`EventBusConfig`] sink entry. #[derive(Clone, Debug, PartialEq, Eq)] pub enum SinkConfig { + /// Write events to standard output. StdOut, + /// Capture events in memory (useful for testing). Memory, } +/// Configuration for building the [`EventBus`] used by a runtime. #[derive(Clone, Debug)] pub struct EventBusConfig { + /// Broadcast channel capacity; events are dropped when the buffer is full. pub buffer_capacity: usize, + /// Ordered list of sink targets that will receive events. pub sinks: Vec, diagnostics: DiagnosticsConfig, } impl EventBusConfig { + /// Default broadcast channel capacity. pub const DEFAULT_BUFFER_CAPACITY: usize = 1024; #[must_use] + /// Create an `EventBusConfig` with the given capacity and sinks. pub fn new(buffer_capacity: usize, sinks: Vec) -> Self { Self { buffer_capacity: if buffer_capacity == 0 { @@ -140,17 +130,20 @@ impl EventBusConfig { } #[must_use] + /// Create an `EventBusConfig` with a single stdout sink at the default capacity. pub fn with_stdout_only() -> Self { Self::new(Self::DEFAULT_BUFFER_CAPACITY, vec![SinkConfig::StdOut]) } #[must_use] + /// Create an `EventBusConfig` with a single in-memory sink (silent stdout) at the default capacity. pub fn with_memory_sink() -> Self { // Memory sink intentionally omits stdout so callers get a silent capture by default. Self::new(Self::DEFAULT_BUFFER_CAPACITY, vec![SinkConfig::Memory]) } #[must_use] + /// Add a sink to this configuration, ignoring duplicates. pub fn add_sink(mut self, sink: SinkConfig) -> Self { if !self.sinks.contains(&sink) { self.sinks.push(sink); @@ -158,21 +151,25 @@ impl EventBusConfig { self } + /// Returns the configured broadcast buffer capacity. pub fn buffer_capacity(&self) -> usize { self.buffer_capacity } + /// Returns the configured sink list. pub fn sinks(&self) -> &[SinkConfig] { &self.sinks } #[must_use] + /// Override the diagnostics configuration for this event bus. pub fn with_diagnostics(mut self, diagnostics: DiagnosticsConfig) -> Self { self.diagnostics = diagnostics.with_default_capacity(self.buffer_capacity); self } #[must_use] + /// Build and return the configured [`EventBus`]. pub fn build_event_bus(&self) -> EventBus { let mut sinks: Vec> = if self.sinks.is_empty() { vec![Box::new(StdOutSink::default())] @@ -204,10 +201,14 @@ impl Default for EventBusConfig { } } +/// Configuration controlling the diagnostics (sink health) broadcast channel. #[derive(Clone, Debug, PartialEq, Eq)] pub struct DiagnosticsConfig { + /// Whether sink diagnostics are enabled. pub enabled: bool, + /// Optional override for the diagnostics channel capacity; falls back to the event bus capacity. pub buffer_capacity: Option, + /// Whether diagnostics should also be forwarded into the main event stream. pub emit_to_events: bool, } @@ -216,6 +217,7 @@ impl DiagnosticsConfig { capacity.max(1) } + /// Create a default `DiagnosticsConfig` with the given event bus capacity. pub fn default_with_capacity(event_bus_capacity: usize) -> Self { Self { enabled: true, @@ -224,6 +226,7 @@ impl DiagnosticsConfig { } } + /// Fill in the buffer capacity from `event_bus_capacity` if not already set. pub fn with_default_capacity(mut self, event_bus_capacity: usize) -> Self { if self.buffer_capacity.is_none() { self.buffer_capacity = Some(Self::normalize_capacity(event_bus_capacity)); @@ -231,6 +234,7 @@ impl DiagnosticsConfig { self } + /// Return the effective diagnostics channel capacity, falling back to `event_bus_capacity`. pub fn effective_capacity(&self, event_bus_capacity: usize) -> usize { self.buffer_capacity .unwrap_or_else(|| Self::normalize_capacity(event_bus_capacity)) diff --git a/src/schedulers/mod.rs b/src/schedulers/mod.rs index 222ad83..53f2dae 100644 --- a/src/schedulers/mod.rs +++ b/src/schedulers/mod.rs @@ -1,3 +1,4 @@ +//! Frontier-based workflow scheduler with version gating and bounded concurrency. pub mod scheduler; pub use scheduler::{Scheduler, SchedulerError, SchedulerState, StepRunResult}; diff --git a/src/schedulers/scheduler.rs b/src/schedulers/scheduler.rs index c12a8df..da8714c 100644 --- a/src/schedulers/scheduler.rs +++ b/src/schedulers/scheduler.rs @@ -167,6 +167,7 @@ pub struct SchedulerState { /// ``` #[derive(Debug, Default, Clone)] pub struct Scheduler { + /// Maximum number of nodes that may execute concurrently in a single superstep. pub concurrency_limit: usize, } @@ -223,7 +224,12 @@ pub enum SchedulerError { help("Ensure all nodes in the graph are registered before execution.") ) )] - NodeNotFound { kind: NodeKind, step: u64 }, + NodeNotFound { + /// The node kind that was expected in the registry. + kind: NodeKind, + /// The workflow step at which the lookup failed. + step: u64, + }, /// A node failed during execution. /// @@ -238,9 +244,12 @@ pub enum SchedulerError { #[error("node run error at step {step} for {kind:?}: {source}")] #[cfg_attr(feature = "diagnostics", diagnostic(code(weavegraph::scheduler::node)))] NodeRun { + /// The node kind that encountered an error. kind: NodeKind, + /// The workflow step at which the node failed. step: u64, #[source] + /// The underlying node error. source: NodeError, }, diff --git a/src/telemetry/mod.rs b/src/telemetry/mod.rs index 189efd4..b362e1c 100644 --- a/src/telemetry/mod.rs +++ b/src/telemetry/mod.rs @@ -1,10 +1,14 @@ +//! Telemetry formatting utilities for rendering workflow events as human-readable or machine-readable output. use crate::channels::errors::ErrorEvent; use crate::event_bus::Event; use std::io::IsTerminal; use std::sync::OnceLock; +/// ANSI escape code for green context text in telemetry output. pub const CONTEXT_COLOR: &str = "\x1b[32m"; // green +/// ANSI escape code for magenta line text in telemetry output. pub const LINE_COLOR: &str = "\x1b[35m"; // magenta / dark pink +/// ANSI escape code to reset terminal color after colored output. pub const RESET_COLOR: &str = "\x1b[0m"; static IS_STDERR_TERMINAL: OnceLock = OnceLock::new(); @@ -73,18 +77,24 @@ impl FormatterMode { /// Rendered output for a telemetry item that can be consumed by sinks. #[derive(Clone, Debug, Default)] pub struct EventRender { + /// Optional context prefix shown before the event lines. pub context: Option, + /// One or more formatted lines for this event. pub lines: Vec, } impl EventRender { + /// Concatenate all lines into a single string. pub fn join_lines(&self) -> String { self.lines.join("") } } +/// Trait for formatting workflow events and errors into rendered output. pub trait TelemetryFormatter: Send + Sync { + /// Render a single [`Event`] into an [`EventRender`]. fn render_event(&self, event: &Event) -> EventRender; + /// Render a slice of [`ErrorEvent`]s, one [`EventRender`] per error. fn render_errors(&self, errors: &[ErrorEvent]) -> Vec; } diff --git a/src/utils/id_generator.rs b/src/utils/id_generator.rs index 937584c..bc7c53a 100644 --- a/src/utils/id_generator.rs +++ b/src/utils/id_generator.rs @@ -19,7 +19,10 @@ pub enum IdError { feature = "diagnostics", diagnostic(code(weavegraph::id::invalid_format)) )] - InvalidFormat { format: String }, + InvalidFormat { + /// The invalid format string that caused the error. + format: String, + }, /// ID generation failed due to system constraints. #[error("ID generation failed: {reason}")] @@ -27,7 +30,10 @@ pub enum IdError { feature = "diagnostics", diagnostic(code(weavegraph::id::generation_failed)) )] - GenerationFailed { reason: String }, + GenerationFailed { + /// Human-readable description of why generation failed. + reason: String, + }, } /// Configuration for ID generation behavior. diff --git a/src/utils/json_ext.rs b/src/utils/json_ext.rs index be36704..608b0dd 100644 --- a/src/utils/json_ext.rs +++ b/src/utils/json_ext.rs @@ -17,7 +17,10 @@ pub enum JsonError { feature = "diagnostics", diagnostic(code(weavegraph::json::invalid_pointer)) )] - InvalidPointer { pointer: String }, + InvalidPointer { + /// The invalid JSON pointer string. + pointer: String, + }, /// JSON merge conflict that cannot be resolved. #[error("Merge conflict at path '{path}': cannot merge {left_type} with {right_type}")] @@ -26,8 +29,11 @@ pub enum JsonError { diagnostic(code(weavegraph::json::merge_conflict)) )] MergeConflict { + /// JSON path where the conflict occurred. path: String, + /// Type of the left operand at the conflict point. left_type: String, + /// Type of the right operand at the conflict point. right_type: String, }, @@ -35,6 +41,7 @@ pub enum JsonError { #[error("JSON serialization error: {source}")] #[cfg_attr(feature = "diagnostics", diagnostic(code(weavegraph::json::serde)))] Serde { + /// The underlying serde_json error. #[from] source: serde_json::Error, }, diff --git a/tests/graphs.rs b/tests/graphs.rs index cc6f924..91aebce 100644 --- a/tests/graphs.rs +++ b/tests/graphs.rs @@ -125,7 +125,7 @@ fn test_builder_fluent_api() { fn test_runtime_config_integration() { use weavegraph::runtimes::RuntimeConfig; - let config = RuntimeConfig::new(Some("test_session".into()), None, None); + let config = RuntimeConfig::new(Some("test_session".into()), None); let builder = GraphBuilder::new() .add_edge(NodeKind::Start, NodeKind::End) diff --git a/tests/runtimes_runner.rs b/tests/runtimes_runner.rs index 03196ae..a0a176e 100644 --- a/tests/runtimes_runner.rs +++ b/tests/runtimes_runner.rs @@ -250,13 +250,8 @@ async fn test_runtime_config_custom_checkpointer_takes_precedence() { ); let probe = Arc::new(ProbeCheckpointer::with_checkpoint(checkpoint)); - let runtime_config = RuntimeConfig::new( - Some(session_id.to_string()), - Some(CheckpointerType::InMemory), - None, - ) - .checkpointer_custom(probe.clone()) - .with_checkpointer(Some(CheckpointerType::InMemory)); + let runtime_config = + RuntimeConfig::new(Some(session_id.to_string()), None).checkpointer_custom(probe.clone()); let app = GraphBuilder::new() .add_node(NodeKind::Custom("test".into()), TestNode { name: "test" }) @@ -519,7 +514,6 @@ async fn test_resume_from_checkpoint() { .add_edge(NodeKind::Custom("test".into()), NodeKind::End) .with_runtime_config(RuntimeConfig::new( None, - Some(CheckpointerType::SQLite), Some(db_path.display().to_string()), )) .compile()