Skip to content

Bump dependencies and implement fast ingest#24

Merged
Jarema merged 9 commits into
mainfrom
fast-ingest
May 11, 2026
Merged

Bump dependencies and implement fast ingest#24
Jarema merged 9 commits into
mainfrom
fast-ingest

Conversation

@Jarema
Copy link
Copy Markdown
Collaborator

@Jarema Jarema commented May 8, 2026

Signed-off-by: Tomasz Pietrek tomasz@synadia.com

Jarema and others added 3 commits May 8, 2026 12:29
Signed-off-by: Tomasz Pietrek <tomasz@synadia.com>
Signed-off-by: Tomasz Pietrek <tomasz@synadia.com>
Previous commit 1571c83 declared `pub mod batch_publish_fast;` in lib.rs
but did not include the module file itself, breaking CI with E0583.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Tomasz Pietrek <tomasz@synadia.com>
@synadia-claude-reviewer
Copy link
Copy Markdown

synadia-claude-reviewer Bot commented May 8, 2026

Claude finished @Jarema's task in 5m 18s —— View job


PR Review: Bump dependencies and implement fast ingest

  • Gathering context
  • Reading changed files
  • Analyzing fast ingest implementation
  • Checking for correctness, security, and performance issues
  • Posting inline comments and review summary

Summary

This is a thorough and well-structured implementation of ADR-50 fast ingest. The architecture — inline state machine, lazily-created subscription, no background tasks, no shared locks — is the right call and mirrors the Go reference cleanly. The SharedHandlerState borrow-splitting trick for drain_nonblocking is idiomatic and avoids the Sync requirement correctly. The stall gate formula matches ADR-50 precisely, and the saturation arithmetic prevents overflow on pathological inputs. Test coverage is good: unit tests for all wire-format paths, unit tests for the stall formula, and integration tests that mirror the Go reference suite.

Issues found:

  1. Dead code in validate_inbox_shape (line ~286): The inbox.is_empty() check is unreachable — an empty string has 0 dots and is already caught by matches('.').count() != 1. The subsequent split_once('.').unwrap() is safe but benefits from an expect to document the invariant. Left an inline comment with a simplified alternative. Fix this →

  2. FastPublishErrorKind missing #[non_exhaustive]: Unlike BatchPublishErrorKind (annotated in this same PR), FastPublishErrorKind is not marked #[non_exhaustive]. Adding a new error variant in a future release is currently a breaking change for downstream exhaustive match arms. Fix this →

  3. Timing-sensitive integration tests: flow_err_on_expected_last_sequence_mismatch_fail and flow_err_callback_fires_in_ok_mode use tokio::time::sleep(Duration::from_millis(200)) to wait for async server delivery of BatchFlowErr. These are inherently racy on slow CI. Adding extra messages after the sleep (forcing a drain pass through drain_nonblocking) is a more reliable trigger than a bare sleep.

  4. InvalidState for config validation at build() (line ~537): Returning InvalidState for an out-of-range max_outstanding_acks at build time is misleading — no state exists yet at that point. Consider reusing Other with a descriptive message, or adding an InvalidConfig variant.

  5. Raw error code integers in from_api_error: Codes 10205–10211 are matched as bare integers. Named constants would make the mapping auditable and searchable. Fix this →

No correctness bugs found. The stall gate, reply-subject encoding, gap/flow-error handling, EOB commit, single-message fast path, ping recovery, and subscriber lifecycle are all correct. The nats-counters path dependency is fine — cargo publish strips path and falls back to the version constraint.


// ---------------------------------------------------------------------------
// Errors
// ---------------------------------------------------------------------------
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FastPublishErrorKind is missing #[non_exhaustive], unlike BatchPublishErrorKind which was annotated in this same PR. Without it, adding a new variant in a future minor release is a breaking change for downstream users who exhaustively match on error kinds.

serde_json::from_slice(payload)
.map_err(|e| FastPublishError::with_source(FastPublishErrorKind::Serialization, e))?;

Ok(match resp {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dead code: an empty string has 0 dots and is already caught by matches('.').count() != 1 above — this branch is unreachable. The same logic issue means the function can be simplified:

pub(crate) fn validate_inbox_shape(inbox: &str) -> Result<(), FastPublishError> {
    let mut parts = inbox.splitn(3, '.');
    let a = parts.next().unwrap_or(""); // always has at least one part
    let b = parts.next().unwrap_or("");
    let no_third = parts.next().is_none();
    if a.is_empty() || b.is_empty() || !no_third {
        return Err(FastPublishError::new(FastPublishErrorKind::InvalidInboxShape));
    }
    Ok(())
}

for h in handles {
let ack = h.await.unwrap();
assert_eq!(ack.batch_size, 101);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This 200 ms sleep is a timing dependency that makes the test fragile on slow CI runners. The test depends on the server delivering a BatchFlowErr within that window, but there's no guarantee under load. Consider polling batch.commit() with a short retry loop instead, or structuring the test so the error surfaces synchronously (e.g. by completing the stall gate cycle before committing, which forces a drain).

/// reference implementation and avoids wasted subscriptions when a
/// publisher is built and then dropped unused.
///
/// # Errors
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error returned for max_outstanding_acks outside 1..=3 is InvalidState, but at build() time no state exists yet — this is a configuration validation error. InvalidState implies a runtime-state violation (e.g. calling add on a closed publisher). A more descriptive kind or an adjustment to the error message would make the failure easier to diagnose.

InvalidPattern,
/// 10207 — batch id exceeds 64 characters or is otherwise invalid.
InvalidBatchId,
/// 10208 — server has forgotten this batch (timed out, leader change in
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error codes are matched as raw integer literals. If the server-side error code definitions change (unlikely, but not impossible), this mapping silently diverges. Consider defining these as named constants alongside their mapping, similar to how async-nats exposes ErrorCode::ATOMIC_PUBLISH_DISABLED for the atomic-batch path, so the intent is searchable and auditable.

serde_json::from_slice(payload)
.map_err(|e| FastPublishError::with_source(FastPublishErrorKind::Serialization, e))?;

Ok(match resp {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dead code: an empty string has 0 dots, so it is already rejected by the != 1 count check above. This branch can never be reached.

// remove the unreachable check
if inbox.is_empty() {
    return Err(FastPublishError::new(FastPublishErrorKind::InvalidInboxShape));
}

The same applies to the subsequent split_once('.').unwrap() — it's safe (count==1 guarantees a dot exists), but an expect("exactly one dot confirmed above") would make the invariant explicit and avoid a silent panic if the logic changes.

Jarema added 3 commits May 8, 2026 14:17
Signed-off-by: Tomasz Pietrek <tomasz@synadia.com>
Signed-off-by: Tomasz Pietrek <tomasz@synadia.com>
Signed-off-by: Tomasz Pietrek <tomasz@synadia.com>
@Jarema
Copy link
Copy Markdown
Collaborator Author

Jarema commented May 8, 2026

@claude review again

@synadia-claude-reviewer
Copy link
Copy Markdown

synadia-claude-reviewer Bot commented May 8, 2026

Claude finished @Jarema's task in 3m 39s —— View job


PR Review: Bump dependencies and implement fast ingest

  • Gathering context
  • Reading changed files
  • Analyzing fast ingest implementation
  • Checking for correctness, security, and performance issues
  • Posting review summary

Summary

All five issues from the previous review have been addressed. No new issues found. The implementation is ready to merge.


Previous issues — all resolved ✓

1. Dead code in validate_inbox_shape — ✅ Fixed
The old inbox.is_empty() guard and .unwrap() are gone. The current implementation at line 334 uses splitn(3, '.') to extract exactly three possible parts, then rejects any inbox where the first or second token is empty or a third token exists. This is correct, clean, and covers the empty-string case naturally.

2. FastPublishErrorKind missing #[non_exhaustive] — ✅ Fixed
#[non_exhaustive] is now at line 110, consistent with BatchPublishErrorKind.

3. Timing-sensitive integration tests — ✅ Fixed

  • flow_err_on_expected_last_sequence_mismatch_fail: removed the 200 ms sleep entirely. The test now uses flow(2).max_outstanding_acks(1) to saturate the outstanding-ack window after one message, so the second add() blocks in wait_for_flow_event_with_pings until the BatchFlowErr arrives from the server — no fixed sleep needed.
  • flow_err_callback_fires_in_ok_mode: replaced the sleep with a bounded polling loop of up to 20 iterations + yield_now(). The loop breaks as soon as the callback fires. This is significantly more reliable on slow CI.

4. InvalidState for config validation at build() — ✅ Fixed
build() now returns FastPublishErrorKind::InvalidConfig for an out-of-range max_outstanding_acks.

5. Raw error code integers in from_api_error — ✅ Fixed
All five fast-ingest error codes are now matched via ErrorCode::BATCH_PUBLISH_DISABLED, ::BATCH_PUBLISH_INVALID_PATTERN, ::BATCH_PUBLISH_INVALID_BATCH_ID, ::BATCH_PUBLISH_UNKNOWN_BATCH_ID, and ::BATCH_PUBLISH_TOO_MANY_INFLIGHT. Searchable, auditable, immune to copy/paste drift.


Changes to batch_publish.rs (bonus cleanup in this PR)

These are net improvements:

  • BatchPublishErrorKind is now #[non_exhaustive] and derives Eq + Hash — consistent with FastPublishErrorKind.
  • closed: bool + BatchClosed error variant: calling add after a prior error now returns a clear error instead of silently sending to the server.
  • InvalidAck variant + ack validation in commit_request: the returned ack is validated (stream name non-empty, batch id matches, count matches sequence). Catches server misbehavior early.
  • reject_protocol_headers centralises header validation: now also blocks Nats-Batch-Commit, Nats-Batch-Id, and Nats-Batch-Sequence (protocol headers the user must not set), and enforces that Nats-Expected-Last-Sequence is only allowed on the first message (prior_sequence == 0) per ADR-50.
  • Six new error kinds with complete from_api_error mappings: BatchPublishTooManyInflight, BatchPublishMissingSeq, BatchPublishInvalidId, BatchPublishInvalidCommit, BatchPublishDuplicateMsgId, BatchPublishMirror. All mapped via named ErrorCode::* constants.
  • Removed unreachable BatchFull variant (the old pre-increment > MAX_BATCH_SIZE check was unreachable after the sequence was already incremented).
  • BatchPubAck.value added for counter-stream support (val field, Option<String>, serde(default)).

No issues found

The stall gate, reply-subject encoding, gap/flow-error state machine, EOB commit, single-message immediate-commit fast path, ping recovery, SharedHandlerState borrow-splitting, and subscriber lifecycle are all correct. The #[non_exhaustive] placement, compile-time Send assertion, saturating arithmetic, and lazy subscription design are all good. Test coverage is thorough.

Jarema and others added 3 commits May 8, 2026 16:50
- New example `bench_normal_vs_fast` compares sync, async (bounded
  inflight), core+js, atomic batch, and fast batch publishing on an R1
  stream (100k messages each).
- `build_reply` now returns `Subject` instead of `String`; the publisher
  stamps the reply via `From<String> for Subject` which is zero-copy,
  saving one `Bytes::copy_from_slice` per fast-publish message.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Tomasz Pietrek <tomasz@synadia.com>
Sweeps `flow` × `max_outstanding_acks` × payload-size for fast batch
publishing on a local R1 stream. On this machine `max_outstanding_acks=3`
beats `=2` by 4-7%; flow size has minimal effect within a max_acks group.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Tomasz Pietrek <tomasz@synadia.com>
Signed-off-by: Tomasz Pietrek <tomasz@synadia.com>
/// Per-message error sent when a batch message fails a server-side check
/// (e.g. expected-last-seq mismatch). Wire tag: `"type":"err"`.
#[derive(Debug, Clone, Deserialize)]
pub(crate) struct BatchFlowErr {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super nitpick but, for consistency would be?

Suggested change
pub(crate) struct BatchFlowErr {
pub(crate) struct BatchFlowError {

async fn wait_for_flow_event_with_pings(&mut self) -> Result<(), FastPublishError> {
// Split timeout into 3 intervals for up to 2 pings before giving up.
// Clamp floor to 100ms so tiny ack_timeouts don't spin.
let ping_interval = (self.ack_timeout / 3).max(Duration::from_millis(100));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Magic number? promote toa const maybe?

@Jarema Jarema merged commit f8ce9f3 into main May 11, 2026
26 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants