Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion jetstream-extra/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ keywords = ["nats", "extra", "extensions", "api", "jetstream"]
categories = ["network-programming", "api-bindings"]

[dependencies]
async-nats = "0.45.0"
async-nats = "0.48.0"
bytes = "1"
futures = "0.3"
futures-util = "0.3"
Expand Down
54 changes: 54 additions & 0 deletions jetstream-extra/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Set of utilities and extensions for the JetStream NATS of the [async-nats](https
## Features

- **Batch Publishing** - Atomic batch publishing ensuring all-or-nothing message storage
- **Fast Ingest Batch Publishing** - High-throughput, non-atomic batch publishing with server-driven flow control (requires nats-server 2.14+)
- **Batch Fetching** - Efficient multi-message retrieval using DIRECT.GET API

## Batch Publishing
Expand Down Expand Up @@ -55,6 +56,59 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
```

## Fast Ingest Batch Publishing

High-throughput, non-atomic batch publishing using JetStream's fast-ingest feature (ADR-50, requires nats-server 2.14 or later). Unlike atomic batch publishing, messages are persisted as they arrive and the server uses a flow-control channel to coordinate throughput across concurrent publishers.

Use fast ingest when:
- You need to ship millions of messages per batch and don't need all-or-nothing semantics.
- Throughput matters more than atomicity.
- You want the server to dynamically tune ack frequency based on load.

The stream must have `allow_batched: true`. The publisher owns a dedicated inbox subscription for the duration of the batch and drives ack handling inline — no background task, no locks.

### Complete example

```rust
use async_nats::jetstream;
use jetstream_extra::batch_publish_fast::{FastPublishExt, GapMode};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = async_nats::connect("nats://127.0.0.1:4222").await?;
let jetstream = jetstream::new(client);

// Stream must have allow_batched: true (not yet exposed in async-nats
// 0.45.0 StreamConfig — create via raw JetStream API until upstream
// adds the field).

let mut batch = jetstream
.fast_publish()
.flow(100) // ack every 100 messages (ceiling)
.max_outstanding_acks(2) // up to 200 messages in flight
.gap_mode(GapMode::Fail) // abort on any gap (default)
.ack_timeout(Duration::from_secs(10))
.on_error(|e| eprintln!("fast publish event: {e}"))
.build()?;

// Stream 10,000 messages. The stall gate transparently waits for flow
// acks and sends pings to recover from any lost acks.
for i in 0..10_000 {
batch.add("metrics.cpu", format!("sample {i}").into()).await?;
}

// End-of-batch commit — the commit message itself is NOT stored.
// Use `commit(...)` instead if you want a final message persisted.
let ack = batch.close().await?;
println!("committed {} messages as batch {}", ack.batch_size, ack.batch_id);

Ok(())
}
```

See `examples/fast_publisher.rs` for a runnable example.

## Batch Fetching

Efficient batch fetching of messages from JetStream streams using the DIRECT.GET API, supporting:
Expand Down
6 changes: 6 additions & 0 deletions jetstream-extra/src/batch_publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,8 @@ pub enum BatchPublishErrorKind {
BatchPublishNotEnabled,
/// Batch publish is incomplete and was abandoned
BatchPublishIncomplete,
/// Server has too many inflight batches (server limit: 50)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The server limit of 50 is hard-coded in both the doc comment and the Display impl below. If the NATS server changes this limit in a future release the error message will be stale. Consider either omitting the specific number or noting it as the value at the time of the async-nats 0.48 / nats-server 2.14 release:

Suggested change
/// Server has too many inflight batches (server limit: 50)
/// Server has too many inflight batches
BatchPublishTooManyInflight,

BatchPublishTooManyInflight,
/// Batch uses unsupported headers (Nats-Msg-Id or Nats-Expected-Last-Msg-Id)
BatchPublishUnsupportedHeader,
/// Other unspecified error
Expand All @@ -875,6 +877,7 @@ impl BatchPublishErrorKind {
match error.error_code() {
ErrorCode::ATOMIC_PUBLISH_DISABLED => Self::BatchPublishNotEnabled,
ErrorCode::ATOMIC_PUBLISH_INCOMPLETE_BATCH => Self::BatchPublishIncomplete,
ErrorCode::ATOMIC_PUBLISH_TOO_MANY_INFLIGHT => Self::BatchPublishTooManyInflight,
ErrorCode::ATOMIC_PUBLISH_UNSUPPORTED_HEADER => Self::BatchPublishUnsupportedHeader,
ErrorCode::ATOMIC_PUBLISH_TOO_LARGE_BATCH => Self::MaxMessagesExceeded,
_ => Self::Other,
Expand All @@ -895,6 +898,9 @@ impl Display for BatchPublishErrorKind {
Self::BatchPublishIncomplete => {
write!(f, "batch publish is incomplete and was abandoned")
}
Self::BatchPublishTooManyInflight => {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same hard-coded limit concern as the doc comment above. Suggestion:

Suggested change
Self::BatchPublishTooManyInflight => {
Self::BatchPublishTooManyInflight => {
write!(f, "server has too many inflight batches")
}

write!(f, "server has too many inflight batches (limit: 50)")
}
Self::BatchPublishUnsupportedHeader => write!(
f,
"batch uses unsupported headers (Nats-Msg-Id or Nats-Expected-Last-Msg-Id)"
Expand Down
18 changes: 18 additions & 0 deletions jetstream-extra/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,23 @@
//! # }
//! ```
//!
//! ## Fast Ingest Batch Publishing
//!
//! The [batch_publish_fast] module provides high-throughput, non-atomic batch publishing
//! using JetStream's fast-ingest feature (nats-server 2.14+):
//!
//! ```no_run
//! # use jetstream_extra::batch_publish_fast::FastPublishExt;
//! # async fn example(client: impl FastPublishExt) -> Result<(), Box<dyn std::error::Error>> {
//! let mut batch = client.fast_publish().build()?;
//! for i in 0..1000 {
//! batch.add("events.data", format!("msg {i}").into()).await?;
//! }
//! let ack = batch.commit("events.done", "final".into()).await?;
//! # Ok(())
//! # }
//! ```
//!
//! ## Batch Fetching
//!
//! The [batch_fetch] module provides efficient batch fetching of messages from streams
Expand Down Expand Up @@ -61,6 +78,7 @@

pub mod batch_fetch;
pub mod batch_publish;
pub mod batch_publish_fast;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build-breaking: batch_publish_fast.rs is missing.

pub mod batch_publish_fast; is declared here, and the README/crate-level docs reference jetstream_extra::batch_publish_fast::FastPublishExt, but no batch_publish_fast.rs (or batch_publish_fast/mod.rs) file exists in the repository. The crate will not compile as-is. The file needs to be included in this PR, or the module declaration and its documentation should be deferred to the follow-up PR that implements the module.


pub use async_nats::Subject;
/// Re-exported type returned by Direct Get operation.
Expand Down
11 changes: 7 additions & 4 deletions jetstream-extra/tests/batch_publish_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,18 +235,21 @@ mod batch_publish_error_tests {
let err = batch.add("test_incomplete.1", "data".into()).await;

// This might fail either immediately or on commit
if err.is_err() {
if let Err(err) = err {
assert_eq!(
err.unwrap_err().kind(),
BatchPublishErrorKind::BatchPublishIncomplete
err.kind(),
BatchPublishErrorKind::BatchPublishTooManyInflight
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assertion flip from BatchPublishIncomplete to BatchPublishTooManyInflight is correct for this test scenario (51st concurrent batch hitting the server inflight limit), but the test name test_incomplete_batch_error no longer matches what it actually tests. Consider renaming it to test_too_many_inflight_error to avoid confusion.

);
} else {
// If add succeeded, commit should fail
let err = batch
.commit("test_incomplete.2", "final".into())
.await
.unwrap_err();
assert_eq!(err.kind(), BatchPublishErrorKind::BatchPublishIncomplete);
assert_eq!(
err.kind(),
BatchPublishErrorKind::BatchPublishTooManyInflight
);
}
}

Expand Down
4 changes: 2 additions & 2 deletions nats-counters/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ keywords = ["nats","extensions", "api", "jetstream", "counters"]
categories = ["network-programming", "api-bindings"]

[dependencies]
async-nats = "0.45.0"
async-nats = "0.48.0"
futures-util = "0.3"
jetstream-extra = "0.2.1"
jetstream-extra = { version = "0.2.1", path = "../jetstream-extra" }
num-bigint = "0.4.6"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.145"
Expand Down
Loading