diff --git a/rust/crates/mpp/src/client/session.rs b/rust/crates/mpp/src/client/session.rs index a0f25466..ab48453b 100644 --- a/rust/crates/mpp/src/client/session.rs +++ b/rust/crates/mpp/src/client/session.rs @@ -153,6 +153,13 @@ impl ActiveSession { /// Record a prepared voucher as accepted by the server. pub fn record_voucher(&mut self, voucher: &SignedVoucher) -> Result<()> { + if voucher.data.channel_id != self.channel_id_str() { + return Err(Error::Other(format!( + "voucher channel {} does not match active session {}", + voucher.data.channel_id, + self.channel_id_str() + ))); + } let cumulative = voucher .data .cumulative @@ -169,6 +176,22 @@ impl ActiveSession { Ok(()) } + /// Reconcile the local watermark to a server-settled cumulative, e.g. the + /// `cumulative` of a `replayed` commit receipt. Advances to `settled` when + /// it is ahead of the current watermark and never regresses, so retrying a + /// delivery the server already accepted (lost-response case) catches the + /// client up without recording the freshly prepared higher voucher. + /// + /// When it advances, the request nonce also advances by one, mirroring the + /// `record_voucher` accounting for the delivery the server settled, so the + /// next prepared voucher does not reuse the already-settled nonce. + pub fn reconcile_settled(&mut self, settled: u64) { + if settled > self.cumulative { + self.cumulative = settled; + self.nonce += 1; + } + } + /// Sign a voucher adding `amount` to the current cumulative. pub async fn sign_increment(&mut self, amount: u64) -> Result { self.sign_voucher(self.cumulative + amount).await @@ -389,6 +412,53 @@ mod tests { assert_eq!(s.nonce, 1); } + #[test] + fn record_voucher_rejects_foreign_channel() { + let mut s = make_session(); + let foreign = SignedVoucher { + data: VoucherData { + channel_id: Pubkey::new_unique().to_string(), + cumulative: "100".to_string(), + expires_at: DEFAULT_VOUCHER_EXPIRES_AT, + nonce: Some(1), + }, + signature: "sig".to_string(), + }; + let err = s.record_voucher(&foreign).unwrap_err(); + assert!(err.to_string().contains("does not match active session")); + assert_eq!(s.cumulative, 0); + } + + #[test] + fn reconcile_settled_advances_but_never_regresses() { + let mut s = make_session(); + // Advancing the watermark also bumps the request nonce. + s.reconcile_settled(100); + assert_eq!(s.cumulative, 100); + assert_eq!(s.nonce, 1); + // A lower settled value (e.g. a stale replayed receipt) does not regress + // and does not touch the nonce. + s.reconcile_settled(40); + assert_eq!(s.cumulative, 100); + assert_eq!(s.nonce, 1); + s.reconcile_settled(250); + assert_eq!(s.cumulative, 250); + assert_eq!(s.nonce, 2); + } + + #[tokio::test] + async fn delivery_after_replay_does_not_reuse_settled_nonce() { + // After a lost-response replay reconciles to the settled cumulative, the + // next prepared voucher must carry a fresh nonce, not the one already + // settled by the server. + let mut s = make_session(); + let replayed = s.prepare_increment(100).await.unwrap(); + let replayed_nonce = replayed.data.nonce.unwrap(); + s.reconcile_settled(100); // server settled the lost delivery at 100 + let next = s.prepare_increment(50).await.unwrap(); + assert!(next.data.nonce.unwrap() > replayed_nonce); + } + #[tokio::test] async fn sign_voucher_rejects_non_increasing() { let mut s = make_session(); diff --git a/rust/crates/mpp/src/client/session_consumer.rs b/rust/crates/mpp/src/client/session_consumer.rs index d318f594..274d9ea3 100644 --- a/rust/crates/mpp/src/client/session_consumer.rs +++ b/rust/crates/mpp/src/client/session_consumer.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use crate::error::{Error, Result}; use crate::protocol::intents::session::{ - CommitPayload, CommitReceipt, MeteredEnvelope, MeteringDirective, + CommitPayload, CommitReceipt, CommitStatus, MeteredEnvelope, MeteringDirective, }; use super::session::ActiveSession; @@ -88,7 +88,36 @@ impl SessionConsumer { }; let receipt = self.transport.commit(directive, payload.clone()).await?; - self.session.record_voucher(&payload.voucher)?; + // A `replayed` receipt means the server already settled this delivery, + // so its `cumulative` is the authoritative settled position. Recording + // the freshly prepared (higher) voucher would push the local watermark + // past the server's state and let a later close sign for more than was + // agreed; skipping it entirely would instead leave the watermark behind + // the server when the original response was lost, so the next delivery + // signs a non-monotonic cumulative. Reconcile to the receipt cumulative + // on replay (never regressing); record the voucher on a fresh commit. + // + // The server is untrusted: clamp the reported cumulative to the voucher + // just prepared in this call. An honest lost-response replay settles at + // or below it (single-threaded session), so a server reporting a higher + // cumulative cannot push the watermark past what the client actually + // signed — otherwise the next voucher would over-authorize up to the + // on-chain deposit. + if receipt.status == CommitStatus::Replayed { + let settled = receipt + .cumulative + .parse::() + .map_err(|_| Error::Other("invalid replayed receipt cumulative".to_string()))?; + let prepared = payload + .voucher + .data + .cumulative + .parse::() + .map_err(|_| Error::Other("invalid prepared voucher cumulative".to_string()))?; + self.session.reconcile_settled(settled.min(prepared)); + } else { + self.session.record_voucher(&payload.voucher)?; + } Ok(receipt) } @@ -318,4 +347,86 @@ mod tests { assert!(err.to_string().contains("commit failed")); assert_eq!(consumer.session().cumulative, 0); } + + /// Transport that reports every commit as an idempotent replay settled at a + /// fixed cumulative, regardless of the voucher it was sent. + struct ReplayTransport { + settled_cumulative: String, + } + + impl CommitTransport for ReplayTransport { + fn commit<'a>( + &'a self, + directive: &'a MeteringDirective, + _payload: CommitPayload, + ) -> Pin> + Send + 'a>> { + Box::pin(async move { + Ok(CommitReceipt { + delivery_id: directive.delivery_id.clone(), + session_id: directive.session_id.clone(), + amount: directive.amount.clone(), + cumulative: self.settled_cumulative.clone(), + status: CommitStatus::Replayed, + }) + }) + } + } + + #[tokio::test] + async fn replayed_receipt_reconciles_to_settled_cumulative() { + let channel_id = Pubkey::new_unique(); + let session = ActiveSession::new(channel_id, signer()); + let transport = ReplayTransport { + settled_cumulative: "100".to_string(), + }; + let mut consumer = SessionConsumer::new(session, transport); + let directive = directive(consumer.session().channel_id_str(), 250); + + // Lost-response case: the server already settled this delivery at 100, + // but the client never recorded it (watermark still 0). The freshly + // prepared voucher is for 250; on replay the client must reconcile to + // the server-settled 100, not jump to the prepared 250 and not stay at + // 0 (which would make the next delivery sign a non-monotonic cumulative). + let receipt = consumer.commit_directive(&directive).await.unwrap(); + assert_eq!(receipt.status, CommitStatus::Replayed); + assert_eq!(receipt.cumulative, "100"); + assert_eq!(consumer.session().cumulative, 100); + } + + #[tokio::test] + async fn replayed_receipt_never_regresses_local_watermark() { + let channel_id = Pubkey::new_unique(); + let mut session = ActiveSession::new(channel_id, signer()); + // Client is already ahead at 300 (later deliveries already settled). + session.reconcile_settled(300); + let transport = ReplayTransport { + settled_cumulative: "100".to_string(), + }; + let mut consumer = SessionConsumer::new(session, transport); + let directive = directive(consumer.session().channel_id_str(), 50); + + let receipt = consumer.commit_directive(&directive).await.unwrap(); + assert_eq!(receipt.status, CommitStatus::Replayed); + assert_eq!(consumer.session().cumulative, 300); + } + + #[tokio::test] + async fn replayed_receipt_cumulative_is_clamped_to_prepared_voucher() { + // A malicious/buggy server cannot push the watermark past the voucher + // the client just signed: it reports a replay settled far above the + // prepared cumulative, but the watermark must clamp to the prepared + // value (250), not the inflated server value, so the next voucher does + // not over-authorize. + let channel_id = Pubkey::new_unique(); + let session = ActiveSession::new(channel_id, signer()); + let transport = ReplayTransport { + settled_cumulative: "1000000".to_string(), + }; + let mut consumer = SessionConsumer::new(session, transport); + let directive = directive(consumer.session().channel_id_str(), 250); + + let receipt = consumer.commit_directive(&directive).await.unwrap(); + assert_eq!(receipt.status, CommitStatus::Replayed); + assert_eq!(consumer.session().cumulative, 250); + } }