diff --git a/jetstream-extra/Cargo.toml b/jetstream-extra/Cargo.toml index e10886b..21381ce 100644 --- a/jetstream-extra/Cargo.toml +++ b/jetstream-extra/Cargo.toml @@ -12,7 +12,7 @@ keywords = ["nats", "extra", "extensions", "api", "jetstream"] categories = ["network-programming", "api-bindings"] [dependencies] -async-nats = "0.45.0" +async-nats = "0.48.0" bytes = "1" futures = "0.3" futures-util = "0.3" diff --git a/jetstream-extra/README.md b/jetstream-extra/README.md index 036c4a6..0551bca 100644 --- a/jetstream-extra/README.md +++ b/jetstream-extra/README.md @@ -10,6 +10,7 @@ Set of utilities and extensions for the JetStream NATS of the [async-nats](https ## Features - **Batch Publishing** - Atomic batch publishing ensuring all-or-nothing message storage +- **Fast Ingest Batch Publishing** - High-throughput, non-atomic batch publishing with server-driven flow control (requires nats-server 2.14+) - **Batch Fetching** - Efficient multi-message retrieval using DIRECT.GET API ## Batch Publishing @@ -55,6 +56,59 @@ async fn main() -> Result<(), Box> { } ``` +## Fast Ingest Batch Publishing + +High-throughput, non-atomic batch publishing using JetStream's fast-ingest feature (ADR-50, requires nats-server 2.14 or later). Unlike atomic batch publishing, messages are persisted as they arrive and the server uses a flow-control channel to coordinate throughput across concurrent publishers. + +Use fast ingest when: +- You need to ship millions of messages per batch and don't need all-or-nothing semantics. +- Throughput matters more than atomicity. +- You want the server to dynamically tune ack frequency based on load. + +The stream must have `allow_batched: true`. The publisher owns a dedicated inbox subscription for the duration of the batch and drives ack handling inline — no background task, no locks. + +### Complete example + +```rust +use async_nats::jetstream; +use jetstream_extra::batch_publish_fast::{FastPublishExt, GapMode}; +use std::time::Duration; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let client = async_nats::connect("nats://127.0.0.1:4222").await?; + let jetstream = jetstream::new(client); + + // Stream must have allow_batched: true (not yet exposed in async-nats + // 0.45.0 StreamConfig — create via raw JetStream API until upstream + // adds the field). + + let mut batch = jetstream + .fast_publish() + .flow(100) // ack every 100 messages (ceiling) + .max_outstanding_acks(2) // up to 200 messages in flight + .gap_mode(GapMode::Fail) // abort on any gap (default) + .ack_timeout(Duration::from_secs(10)) + .on_error(|e| eprintln!("fast publish event: {e}")) + .build()?; + + // Stream 10,000 messages. The stall gate transparently waits for flow + // acks and sends pings to recover from any lost acks. + for i in 0..10_000 { + batch.add("metrics.cpu", format!("sample {i}").into()).await?; + } + + // End-of-batch commit — the commit message itself is NOT stored. + // Use `commit(...)` instead if you want a final message persisted. + let ack = batch.close().await?; + println!("committed {} messages as batch {}", ack.batch_size, ack.batch_id); + + Ok(()) +} +``` + +See `examples/fast_publisher.rs` for a runnable example. + ## Batch Fetching Efficient batch fetching of messages from JetStream streams using the DIRECT.GET API, supporting: diff --git a/jetstream-extra/src/batch_publish.rs b/jetstream-extra/src/batch_publish.rs index c14639e..eb6914c 100644 --- a/jetstream-extra/src/batch_publish.rs +++ b/jetstream-extra/src/batch_publish.rs @@ -862,6 +862,8 @@ pub enum BatchPublishErrorKind { BatchPublishNotEnabled, /// Batch publish is incomplete and was abandoned BatchPublishIncomplete, + /// Server has too many inflight batches (server limit: 50) + BatchPublishTooManyInflight, /// Batch uses unsupported headers (Nats-Msg-Id or Nats-Expected-Last-Msg-Id) BatchPublishUnsupportedHeader, /// Other unspecified error @@ -875,6 +877,7 @@ impl BatchPublishErrorKind { match error.error_code() { ErrorCode::ATOMIC_PUBLISH_DISABLED => Self::BatchPublishNotEnabled, ErrorCode::ATOMIC_PUBLISH_INCOMPLETE_BATCH => Self::BatchPublishIncomplete, + ErrorCode::ATOMIC_PUBLISH_TOO_MANY_INFLIGHT => Self::BatchPublishTooManyInflight, ErrorCode::ATOMIC_PUBLISH_UNSUPPORTED_HEADER => Self::BatchPublishUnsupportedHeader, ErrorCode::ATOMIC_PUBLISH_TOO_LARGE_BATCH => Self::MaxMessagesExceeded, _ => Self::Other, @@ -895,6 +898,9 @@ impl Display for BatchPublishErrorKind { Self::BatchPublishIncomplete => { write!(f, "batch publish is incomplete and was abandoned") } + Self::BatchPublishTooManyInflight => { + write!(f, "server has too many inflight batches (limit: 50)") + } Self::BatchPublishUnsupportedHeader => write!( f, "batch uses unsupported headers (Nats-Msg-Id or Nats-Expected-Last-Msg-Id)" diff --git a/jetstream-extra/src/lib.rs b/jetstream-extra/src/lib.rs index 8a9bd46..dd01d19 100644 --- a/jetstream-extra/src/lib.rs +++ b/jetstream-extra/src/lib.rs @@ -34,6 +34,23 @@ //! # } //! ``` //! +//! ## Fast Ingest Batch Publishing +//! +//! The [batch_publish_fast] module provides high-throughput, non-atomic batch publishing +//! using JetStream's fast-ingest feature (nats-server 2.14+): +//! +//! ```no_run +//! # use jetstream_extra::batch_publish_fast::FastPublishExt; +//! # async fn example(client: impl FastPublishExt) -> Result<(), Box> { +//! let mut batch = client.fast_publish().build()?; +//! for i in 0..1000 { +//! batch.add("events.data", format!("msg {i}").into()).await?; +//! } +//! let ack = batch.commit("events.done", "final".into()).await?; +//! # Ok(()) +//! # } +//! ``` +//! //! ## Batch Fetching //! //! The [batch_fetch] module provides efficient batch fetching of messages from streams @@ -61,6 +78,7 @@ pub mod batch_fetch; pub mod batch_publish; +pub mod batch_publish_fast; pub use async_nats::Subject; /// Re-exported type returned by Direct Get operation. diff --git a/jetstream-extra/tests/batch_publish_errors.rs b/jetstream-extra/tests/batch_publish_errors.rs index e462c64..f45b560 100644 --- a/jetstream-extra/tests/batch_publish_errors.rs +++ b/jetstream-extra/tests/batch_publish_errors.rs @@ -235,10 +235,10 @@ mod batch_publish_error_tests { let err = batch.add("test_incomplete.1", "data".into()).await; // This might fail either immediately or on commit - if err.is_err() { + if let Err(err) = err { assert_eq!( - err.unwrap_err().kind(), - BatchPublishErrorKind::BatchPublishIncomplete + err.kind(), + BatchPublishErrorKind::BatchPublishTooManyInflight ); } else { // If add succeeded, commit should fail @@ -246,7 +246,10 @@ mod batch_publish_error_tests { .commit("test_incomplete.2", "final".into()) .await .unwrap_err(); - assert_eq!(err.kind(), BatchPublishErrorKind::BatchPublishIncomplete); + assert_eq!( + err.kind(), + BatchPublishErrorKind::BatchPublishTooManyInflight + ); } } diff --git a/nats-counters/Cargo.toml b/nats-counters/Cargo.toml index f242c77..7aca989 100644 --- a/nats-counters/Cargo.toml +++ b/nats-counters/Cargo.toml @@ -12,9 +12,9 @@ keywords = ["nats","extensions", "api", "jetstream", "counters"] categories = ["network-programming", "api-bindings"] [dependencies] -async-nats = "0.45.0" +async-nats = "0.48.0" futures-util = "0.3" -jetstream-extra = "0.2.1" +jetstream-extra = { version = "0.2.1", path = "../jetstream-extra" } num-bigint = "0.4.6" serde = { version = "1", features = ["derive"] } serde_json = "1.0.145"