Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions rust/crates/mpp/src/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ impl ActiveSession {

/// Record a prepared voucher as accepted by the server.
pub fn record_voucher(&mut self, voucher: &SignedVoucher) -> Result<()> {
if voucher.data.channel_id != self.channel_id_str() {
return Err(Error::Other(format!(
"voucher channel {} does not match active session {}",
voucher.data.channel_id,
self.channel_id_str()
)));
}
let cumulative = voucher
.data
.cumulative
Expand All @@ -169,6 +176,22 @@ impl ActiveSession {
Ok(())
}

/// Reconcile the local watermark to a server-settled cumulative, e.g. the
/// `cumulative` of a `replayed` commit receipt. Advances to `settled` when
/// it is ahead of the current watermark and never regresses, so retrying a
/// delivery the server already accepted (lost-response case) catches the
/// client up without recording the freshly prepared higher voucher.
///
/// When it advances, the request nonce also advances by one, mirroring the
/// `record_voucher` accounting for the delivery the server settled, so the
/// next prepared voucher does not reuse the already-settled nonce.
pub fn reconcile_settled(&mut self, settled: u64) {
if settled > self.cumulative {
self.cumulative = settled;
self.nonce += 1;
}
}
Comment on lines +188 to +193

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 reconcile_settled leaves the nonce counter behind

record_voucher (the fresh-commit path) advances self.nonce to max(current, voucher.nonce). reconcile_settled advances only self.cumulative. In the canonical lost-response scenario this creates an immediate nonce collision: the client starts at nonce=0, prepare_increment(X) emits a voucher with nonce=1, the server settles it at nonce=1, the network drops the response, the client retries, gets Replayed at the correct cumulative, calls reconcile_settled — and is now at cumulative=X, nonce=0. The very next prepare_increment produces another voucher with nonce=1, which the server has already accepted. If the channel program or the session server validates nonce monotonicity (which is the whole point of tracking it client-side), that delivery will be rejected.

reconcile_settled should also advance the nonce, either by accepting it as a second parameter or by having commit_directive call a separate reconcile_nonce(payload.voucher.data.nonce.unwrap_or(0)) helper on the replay branch. The existing regression tests do not exercise a second delivery after a replay, so the failure mode is invisible today.


/// Sign a voucher adding `amount` to the current cumulative.
pub async fn sign_increment(&mut self, amount: u64) -> Result<SignedVoucher> {
self.sign_voucher(self.cumulative + amount).await
Expand Down Expand Up @@ -389,6 +412,53 @@ mod tests {
assert_eq!(s.nonce, 1);
}

#[test]
fn record_voucher_rejects_foreign_channel() {
let mut s = make_session();
let foreign = SignedVoucher {
data: VoucherData {
channel_id: Pubkey::new_unique().to_string(),
cumulative: "100".to_string(),
expires_at: DEFAULT_VOUCHER_EXPIRES_AT,
nonce: Some(1),
},
signature: "sig".to_string(),
};
let err = s.record_voucher(&foreign).unwrap_err();
assert!(err.to_string().contains("does not match active session"));
assert_eq!(s.cumulative, 0);
}

#[test]
fn reconcile_settled_advances_but_never_regresses() {
let mut s = make_session();
// Advancing the watermark also bumps the request nonce.
s.reconcile_settled(100);
assert_eq!(s.cumulative, 100);
assert_eq!(s.nonce, 1);
// A lower settled value (e.g. a stale replayed receipt) does not regress
// and does not touch the nonce.
s.reconcile_settled(40);
assert_eq!(s.cumulative, 100);
assert_eq!(s.nonce, 1);
s.reconcile_settled(250);
assert_eq!(s.cumulative, 250);
assert_eq!(s.nonce, 2);
}

#[tokio::test]
async fn delivery_after_replay_does_not_reuse_settled_nonce() {
// After a lost-response replay reconciles to the settled cumulative, the
// next prepared voucher must carry a fresh nonce, not the one already
// settled by the server.
let mut s = make_session();
let replayed = s.prepare_increment(100).await.unwrap();
let replayed_nonce = replayed.data.nonce.unwrap();
s.reconcile_settled(100); // server settled the lost delivery at 100
let next = s.prepare_increment(50).await.unwrap();
assert!(next.data.nonce.unwrap() > replayed_nonce);
}

#[tokio::test]
async fn sign_voucher_rejects_non_increasing() {
let mut s = make_session();
Expand Down
115 changes: 113 additions & 2 deletions rust/crates/mpp/src/client/session_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::sync::Arc;

use crate::error::{Error, Result};
use crate::protocol::intents::session::{
CommitPayload, CommitReceipt, MeteredEnvelope, MeteringDirective,
CommitPayload, CommitReceipt, CommitStatus, MeteredEnvelope, MeteringDirective,
};

use super::session::ActiveSession;
Expand Down Expand Up @@ -88,7 +88,36 @@ impl<T: CommitTransport> SessionConsumer<T> {
};

let receipt = self.transport.commit(directive, payload.clone()).await?;
self.session.record_voucher(&payload.voucher)?;
// A `replayed` receipt means the server already settled this delivery,
// so its `cumulative` is the authoritative settled position. Recording
// the freshly prepared (higher) voucher would push the local watermark
// past the server's state and let a later close sign for more than was
// agreed; skipping it entirely would instead leave the watermark behind
// the server when the original response was lost, so the next delivery
// signs a non-monotonic cumulative. Reconcile to the receipt cumulative
// on replay (never regressing); record the voucher on a fresh commit.
//
// The server is untrusted: clamp the reported cumulative to the voucher
// just prepared in this call. An honest lost-response replay settles at
// or below it (single-threaded session), so a server reporting a higher
// cumulative cannot push the watermark past what the client actually
// signed — otherwise the next voucher would over-authorize up to the
// on-chain deposit.
if receipt.status == CommitStatus::Replayed {
let settled = receipt
.cumulative
.parse::<u64>()
.map_err(|_| Error::Other("invalid replayed receipt cumulative".to_string()))?;
let prepared = payload
.voucher
.data
.cumulative
.parse::<u64>()
.map_err(|_| Error::Other("invalid prepared voucher cumulative".to_string()))?;
self.session.reconcile_settled(settled.min(prepared));
} else {
self.session.record_voucher(&payload.voucher)?;
}
Ok(receipt)
}

Expand Down Expand Up @@ -318,4 +347,86 @@ mod tests {
assert!(err.to_string().contains("commit failed"));
assert_eq!(consumer.session().cumulative, 0);
}

/// Transport that reports every commit as an idempotent replay settled at a
/// fixed cumulative, regardless of the voucher it was sent.
struct ReplayTransport {
settled_cumulative: String,
}

impl CommitTransport for ReplayTransport {
fn commit<'a>(
&'a self,
directive: &'a MeteringDirective,
_payload: CommitPayload,
) -> Pin<Box<dyn Future<Output = Result<CommitReceipt>> + Send + 'a>> {
Box::pin(async move {
Ok(CommitReceipt {
delivery_id: directive.delivery_id.clone(),
session_id: directive.session_id.clone(),
amount: directive.amount.clone(),
cumulative: self.settled_cumulative.clone(),
status: CommitStatus::Replayed,
})
})
}
}

#[tokio::test]
async fn replayed_receipt_reconciles_to_settled_cumulative() {
let channel_id = Pubkey::new_unique();
let session = ActiveSession::new(channel_id, signer());
let transport = ReplayTransport {
settled_cumulative: "100".to_string(),
};
let mut consumer = SessionConsumer::new(session, transport);
let directive = directive(consumer.session().channel_id_str(), 250);

// Lost-response case: the server already settled this delivery at 100,
// but the client never recorded it (watermark still 0). The freshly
// prepared voucher is for 250; on replay the client must reconcile to
// the server-settled 100, not jump to the prepared 250 and not stay at
// 0 (which would make the next delivery sign a non-monotonic cumulative).
let receipt = consumer.commit_directive(&directive).await.unwrap();
assert_eq!(receipt.status, CommitStatus::Replayed);
assert_eq!(receipt.cumulative, "100");
assert_eq!(consumer.session().cumulative, 100);
}

#[tokio::test]
async fn replayed_receipt_never_regresses_local_watermark() {
let channel_id = Pubkey::new_unique();
let mut session = ActiveSession::new(channel_id, signer());
// Client is already ahead at 300 (later deliveries already settled).
session.reconcile_settled(300);
let transport = ReplayTransport {
settled_cumulative: "100".to_string(),
};
let mut consumer = SessionConsumer::new(session, transport);
let directive = directive(consumer.session().channel_id_str(), 50);

let receipt = consumer.commit_directive(&directive).await.unwrap();
assert_eq!(receipt.status, CommitStatus::Replayed);
assert_eq!(consumer.session().cumulative, 300);
}

#[tokio::test]
async fn replayed_receipt_cumulative_is_clamped_to_prepared_voucher() {
// A malicious/buggy server cannot push the watermark past the voucher
// the client just signed: it reports a replay settled far above the
// prepared cumulative, but the watermark must clamp to the prepared
// value (250), not the inflated server value, so the next voucher does
// not over-authorize.
let channel_id = Pubkey::new_unique();
let session = ActiveSession::new(channel_id, signer());
let transport = ReplayTransport {
settled_cumulative: "1000000".to_string(),
};
let mut consumer = SessionConsumer::new(session, transport);
let directive = directive(consumer.session().channel_id_str(), 250);

let receipt = consumer.commit_directive(&directive).await.unwrap();
assert_eq!(receipt.status, CommitStatus::Replayed);
assert_eq!(consumer.session().cumulative, 250);
}
}
Loading