From a9c6a50ab9686254e9f62f9f3ec2bd03be6c995e Mon Sep 17 00:00:00 2001 From: EfeDurmaz16 Date: Tue, 9 Jun 2026 01:36:40 +0300 Subject: [PATCH 1/3] fix(rust): reconcile session watermark on a replayed commit; guard foreign vouchers On a replayed CommitReceipt the server has already settled the delivery, so its cumulative is authoritative. SessionConsumer::commit_directive now reconciles the local watermark to that cumulative (advancing when behind, e.g. a lost response, and never regressing) instead of recording the freshly prepared higher voucher, which would let a later channel close sign for more than was settled. ActiveSession::record_voucher also rejects a voucher whose channel does not match the active session. Adds reconcile_settled plus regression tests for reconcile, no-regress, and the foreign-channel guard. Surfaced by Greptile/Codex on the Go and Python session ports (#160, #161). --- rust/crates/mpp/src/client/session.rs | 47 +++++++++++ .../crates/mpp/src/client/session_consumer.rs | 82 ++++++++++++++++++- 2 files changed, 127 insertions(+), 2 deletions(-) diff --git a/rust/crates/mpp/src/client/session.rs b/rust/crates/mpp/src/client/session.rs index a0f254667..12b9e220c 100644 --- a/rust/crates/mpp/src/client/session.rs +++ b/rust/crates/mpp/src/client/session.rs @@ -153,6 +153,13 @@ impl ActiveSession { /// Record a prepared voucher as accepted by the server. pub fn record_voucher(&mut self, voucher: &SignedVoucher) -> Result<()> { + if voucher.data.channel_id != self.channel_id_str() { + return Err(Error::Other(format!( + "voucher channel {} does not match active session {}", + voucher.data.channel_id, + self.channel_id_str() + ))); + } let cumulative = voucher .data .cumulative @@ -169,6 +176,17 @@ impl ActiveSession { Ok(()) } + /// Reconcile the local watermark to a server-settled cumulative, e.g. the + /// `cumulative` of a `replayed` commit receipt. Advances to `settled` when + /// it is ahead of the current watermark and never regresses, so retrying a + /// delivery the server already accepted (lost-response case) catches the + /// client up without recording the freshly prepared higher voucher. + pub fn reconcile_settled(&mut self, settled: u64) { + if settled > self.cumulative { + self.cumulative = settled; + } + } + /// Sign a voucher adding `amount` to the current cumulative. pub async fn sign_increment(&mut self, amount: u64) -> Result { self.sign_voucher(self.cumulative + amount).await @@ -389,6 +407,35 @@ mod tests { assert_eq!(s.nonce, 1); } + #[test] + fn record_voucher_rejects_foreign_channel() { + let mut s = make_session(); + let foreign = SignedVoucher { + data: VoucherData { + channel_id: Pubkey::new_unique().to_string(), + cumulative: "100".to_string(), + expires_at: DEFAULT_VOUCHER_EXPIRES_AT, + nonce: Some(1), + }, + signature: "sig".to_string(), + }; + let err = s.record_voucher(&foreign).unwrap_err(); + assert!(err.to_string().contains("does not match active session")); + assert_eq!(s.cumulative, 0); + } + + #[test] + fn reconcile_settled_advances_but_never_regresses() { + let mut s = make_session(); + s.reconcile_settled(100); + assert_eq!(s.cumulative, 100); + // A lower settled value (e.g. a stale replayed receipt) does not regress. + s.reconcile_settled(40); + assert_eq!(s.cumulative, 100); + s.reconcile_settled(250); + assert_eq!(s.cumulative, 250); + } + #[tokio::test] async fn sign_voucher_rejects_non_increasing() { let mut s = make_session(); diff --git a/rust/crates/mpp/src/client/session_consumer.rs b/rust/crates/mpp/src/client/session_consumer.rs index d318f5948..b0c7dfd1e 100644 --- a/rust/crates/mpp/src/client/session_consumer.rs +++ b/rust/crates/mpp/src/client/session_consumer.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use crate::error::{Error, Result}; use crate::protocol::intents::session::{ - CommitPayload, CommitReceipt, MeteredEnvelope, MeteringDirective, + CommitPayload, CommitReceipt, CommitStatus, MeteredEnvelope, MeteringDirective, }; use super::session::ActiveSession; @@ -88,7 +88,23 @@ impl SessionConsumer { }; let receipt = self.transport.commit(directive, payload.clone()).await?; - self.session.record_voucher(&payload.voucher)?; + // A `replayed` receipt means the server already settled this delivery, + // so its `cumulative` is the authoritative settled position. Recording + // the freshly prepared (higher) voucher would push the local watermark + // past the server's state and let a later close sign for more than was + // agreed; skipping it entirely would instead leave the watermark behind + // the server when the original response was lost, so the next delivery + // signs a non-monotonic cumulative. Reconcile to the receipt cumulative + // on replay (never regressing); record the voucher on a fresh commit. + if receipt.status == CommitStatus::Replayed { + let settled = receipt + .cumulative + .parse::() + .map_err(|_| Error::Other("invalid replayed receipt cumulative".to_string()))?; + self.session.reconcile_settled(settled); + } else { + self.session.record_voucher(&payload.voucher)?; + } Ok(receipt) } @@ -318,4 +334,66 @@ mod tests { assert!(err.to_string().contains("commit failed")); assert_eq!(consumer.session().cumulative, 0); } + + /// Transport that reports every commit as an idempotent replay settled at a + /// fixed cumulative, regardless of the voucher it was sent. + struct ReplayTransport { + settled_cumulative: String, + } + + impl CommitTransport for ReplayTransport { + fn commit<'a>( + &'a self, + directive: &'a MeteringDirective, + _payload: CommitPayload, + ) -> Pin> + Send + 'a>> { + Box::pin(async move { + Ok(CommitReceipt { + delivery_id: directive.delivery_id.clone(), + session_id: directive.session_id.clone(), + amount: directive.amount.clone(), + cumulative: self.settled_cumulative.clone(), + status: CommitStatus::Replayed, + }) + }) + } + } + + #[tokio::test] + async fn replayed_receipt_reconciles_to_settled_cumulative() { + let channel_id = Pubkey::new_unique(); + let session = ActiveSession::new(channel_id, signer()); + let transport = ReplayTransport { + settled_cumulative: "100".to_string(), + }; + let mut consumer = SessionConsumer::new(session, transport); + let directive = directive(consumer.session().channel_id_str(), 250); + + // Lost-response case: the server already settled this delivery at 100, + // but the client never recorded it (watermark still 0). The freshly + // prepared voucher is for 250; on replay the client must reconcile to + // the server-settled 100, not jump to the prepared 250 and not stay at + // 0 (which would make the next delivery sign a non-monotonic cumulative). + let receipt = consumer.commit_directive(&directive).await.unwrap(); + assert_eq!(receipt.status, CommitStatus::Replayed); + assert_eq!(receipt.cumulative, "100"); + assert_eq!(consumer.session().cumulative, 100); + } + + #[tokio::test] + async fn replayed_receipt_never_regresses_local_watermark() { + let channel_id = Pubkey::new_unique(); + let mut session = ActiveSession::new(channel_id, signer()); + // Client is already ahead at 300 (later deliveries already settled). + session.reconcile_settled(300); + let transport = ReplayTransport { + settled_cumulative: "100".to_string(), + }; + let mut consumer = SessionConsumer::new(session, transport); + let directive = directive(consumer.session().channel_id_str(), 50); + + let receipt = consumer.commit_directive(&directive).await.unwrap(); + assert_eq!(receipt.status, CommitStatus::Replayed); + assert_eq!(consumer.session().cumulative, 300); + } } From 76fb2c86b03122fe8a61cc467bba38e403fea1c3 Mon Sep 17 00:00:00 2001 From: EfeDurmaz16 Date: Tue, 9 Jun 2026 02:29:39 +0300 Subject: [PATCH 2/3] fix(rust): advance request nonce when reconciling a replayed settled cumulative Greptile #162 follow-up: reconcile_settled advanced cumulative but left the nonce unchanged, so the first delivery after a lost-response replay reused the nonce the server already settled. Bump the nonce by one whenever reconcile advances (mirroring record_voucher's accounting for that delivery). The nonce is client request-counter metadata (not in the signed 48-byte preimage), so this is a consistency fix, not a fund-safety one. Adds a delivery-after-replay regression test. --- rust/crates/mpp/src/client/session.rs | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/rust/crates/mpp/src/client/session.rs b/rust/crates/mpp/src/client/session.rs index 12b9e220c..ab48453b7 100644 --- a/rust/crates/mpp/src/client/session.rs +++ b/rust/crates/mpp/src/client/session.rs @@ -181,9 +181,14 @@ impl ActiveSession { /// it is ahead of the current watermark and never regresses, so retrying a /// delivery the server already accepted (lost-response case) catches the /// client up without recording the freshly prepared higher voucher. + /// + /// When it advances, the request nonce also advances by one, mirroring the + /// `record_voucher` accounting for the delivery the server settled, so the + /// next prepared voucher does not reuse the already-settled nonce. pub fn reconcile_settled(&mut self, settled: u64) { if settled > self.cumulative { self.cumulative = settled; + self.nonce += 1; } } @@ -427,13 +432,31 @@ mod tests { #[test] fn reconcile_settled_advances_but_never_regresses() { let mut s = make_session(); + // Advancing the watermark also bumps the request nonce. s.reconcile_settled(100); assert_eq!(s.cumulative, 100); - // A lower settled value (e.g. a stale replayed receipt) does not regress. + assert_eq!(s.nonce, 1); + // A lower settled value (e.g. a stale replayed receipt) does not regress + // and does not touch the nonce. s.reconcile_settled(40); assert_eq!(s.cumulative, 100); + assert_eq!(s.nonce, 1); s.reconcile_settled(250); assert_eq!(s.cumulative, 250); + assert_eq!(s.nonce, 2); + } + + #[tokio::test] + async fn delivery_after_replay_does_not_reuse_settled_nonce() { + // After a lost-response replay reconciles to the settled cumulative, the + // next prepared voucher must carry a fresh nonce, not the one already + // settled by the server. + let mut s = make_session(); + let replayed = s.prepare_increment(100).await.unwrap(); + let replayed_nonce = replayed.data.nonce.unwrap(); + s.reconcile_settled(100); // server settled the lost delivery at 100 + let next = s.prepare_increment(50).await.unwrap(); + assert!(next.data.nonce.unwrap() > replayed_nonce); } #[tokio::test] From d19638d0ce00acf1b5a662ceab6c122a9896fc3b Mon Sep 17 00:00:00 2001 From: EfeDurmaz16 Date: Tue, 9 Jun 2026 22:36:39 +0300 Subject: [PATCH 3/3] fix(rust): clamp replayed-receipt reconcile to the prepared voucher cumulative The replayed-receipt path reconciled the local watermark to the server-reported cumulative with only a never-regress lower bound and no upper bound. The server is untrusted, so a malicious or buggy server could report a replay settled far above the voucher the client just signed, pushing the watermark up; the next voucher would then over-authorize (capped only by the on-chain deposit). Clamp the reported cumulative to the just-prepared voucher's cumulative: an honest lost-response replay settles at or below it (single-threaded session), so this preserves recovery while closing the over-authorization. Adds a regression test. --- .../crates/mpp/src/client/session_consumer.rs | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/rust/crates/mpp/src/client/session_consumer.rs b/rust/crates/mpp/src/client/session_consumer.rs index b0c7dfd1e..274d9ea37 100644 --- a/rust/crates/mpp/src/client/session_consumer.rs +++ b/rust/crates/mpp/src/client/session_consumer.rs @@ -96,12 +96,25 @@ impl SessionConsumer { // the server when the original response was lost, so the next delivery // signs a non-monotonic cumulative. Reconcile to the receipt cumulative // on replay (never regressing); record the voucher on a fresh commit. + // + // The server is untrusted: clamp the reported cumulative to the voucher + // just prepared in this call. An honest lost-response replay settles at + // or below it (single-threaded session), so a server reporting a higher + // cumulative cannot push the watermark past what the client actually + // signed — otherwise the next voucher would over-authorize up to the + // on-chain deposit. if receipt.status == CommitStatus::Replayed { let settled = receipt .cumulative .parse::() .map_err(|_| Error::Other("invalid replayed receipt cumulative".to_string()))?; - self.session.reconcile_settled(settled); + let prepared = payload + .voucher + .data + .cumulative + .parse::() + .map_err(|_| Error::Other("invalid prepared voucher cumulative".to_string()))?; + self.session.reconcile_settled(settled.min(prepared)); } else { self.session.record_voucher(&payload.voucher)?; } @@ -396,4 +409,24 @@ mod tests { assert_eq!(receipt.status, CommitStatus::Replayed); assert_eq!(consumer.session().cumulative, 300); } + + #[tokio::test] + async fn replayed_receipt_cumulative_is_clamped_to_prepared_voucher() { + // A malicious/buggy server cannot push the watermark past the voucher + // the client just signed: it reports a replay settled far above the + // prepared cumulative, but the watermark must clamp to the prepared + // value (250), not the inflated server value, so the next voucher does + // not over-authorize. + let channel_id = Pubkey::new_unique(); + let session = ActiveSession::new(channel_id, signer()); + let transport = ReplayTransport { + settled_cumulative: "1000000".to_string(), + }; + let mut consumer = SessionConsumer::new(session, transport); + let directive = directive(consumer.session().channel_id_str(), 250); + + let receipt = consumer.commit_directive(&directive).await.unwrap(); + assert_eq!(receipt.status, CommitStatus::Replayed); + assert_eq!(consumer.session().cumulative, 250); + } }