Skip to content

Commit 72ef4c2

Browse files
committed
feat(proxy): implement upload correctness, local 413 short-circuiting, and prefix-spliced tunnel routing
This change resolves Milestone 1 (Task 14.1 / P0-01) by completely eliminating the unsafe, un-reassembled fragmentation loop for large mutating requests (>5 MiB) under Apps Script mode, while preserving stream integrity under Full mode via prefix-spliced tunnel routing. Key Technical Changes: 1. Dynamic Prefix-Spliced Tunneling (src/tunnel_client.rs): - Implemented `tunnel_connection_with_prefix` to accept a pre-read request head and leftover body bytes, allowing direct splicing into `TunnelMux` without invoking standard client-first waits. 2. Elimination of Unsafe Body Fragmentation (src/domain_fronter.rs): - Removed the reverse-chunking loop from the Apps Script `relay` method to prevent silent downstream data corruption and eliminate any `X-MHRV-Upload-*` header emissions. 3. Local HTTP 413 Short-Circuiting (src/proxy_server.rs): - Defined `APPS_SCRIPT_UPLOAD_MAX_BYTES` ceiling of 5 MiB. - Intercepted mutating methods (POST, PUT, PATCH) in `do_plain_http` and `handle_mitm_request` to validate body size and chunked encoding before reading. - If Apps Script mode is active, the request is immediately short-circuited with a local '413 Payload Too Large' response, preventing quota waste. - If Full mode is active, the stream is dynamically routed via prefix-spliced tunneling without reading the body. 4. UI Diagnostics and Android JNI Serialization (src/domain_fronter.rs, src/bin/ui.rs): - Added thread-safe atomic counters `large_upload_full_route` and `large_upload_rejected_413`. - Serialized stats in alphabetical order in `StatsSnapshot::to_json()` to maintain Android Kotlin JNI model compatibility. - Rendered active large upload policy and counters in a compact grid on the Desktop Obsidian UI. Verification: - Added `test_large_upload_policy_no_unsafe_headers` in `src/domain_fronter.rs` for header safety and JSON serialization. - Added `test_handle_mitm_request_rejects_large_mutating_requests` in `src/proxy_server.rs` employing a duplex stream to test local HTTP 413 rejection. - All 242 unit and integration tests successfully verified green.
1 parent 0e93c1d commit 72ef4c2

4 files changed

Lines changed: 234 additions & 25 deletions

File tree

src/bin/ui.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1381,8 +1381,13 @@ impl eframe::App for App {
13811381
};
13821382
section(ui, &status_title, |ui| {
13831383
if let Some(s) = &stats {
1384-
// Compact two-column layout so 7 metrics fit in ~4 rows
1384+
// Compact two-column layout so metrics fit in rows
13851385
// instead of a tall vertical strip.
1386+
let large_upload_policy = match self.form.mode.as_str() {
1387+
"full" => "Full mode (Tunnel)",
1388+
"apps_script" => "Reject > 5MiB",
1389+
_ => "Unknown",
1390+
};
13861391
let rows: Vec<(&str, String)> = vec![
13871392
("relay calls", s.relay_calls.to_string()),
13881393
("failures", s.relay_failures.to_string()),
@@ -1406,6 +1411,9 @@ impl eframe::App for App {
14061411
s.total_scripts
14071412
),
14081413
),
1414+
("large upload policy", large_upload_policy.to_string()),
1415+
("uploads routed", s.large_upload_full_route.to_string()),
1416+
("uploads rejected", s.large_upload_rejected_413.to_string()),
14091417
];
14101418
egui::Grid::new("stats")
14111419
.num_columns(4)

src/domain_fronter.rs

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,8 @@ pub struct DomainFronter {
444444
/// User-configured block list. Any host matching an entry in this list
445445
/// is rejected immediately at the relay entrypoint.
446446
block_hosts: Vec<String>,
447+
pub large_upload_full_route: AtomicU64,
448+
pub large_upload_rejected_413: AtomicU64,
447449
}
448450

449451
/// Aggregated stats for one remote host.
@@ -669,6 +671,8 @@ impl DomainFronter {
669671
.collect(),
670672
script_ledger: Arc::new(std::sync::Mutex::new(HashMap::new())),
671673
block_hosts: config.block_hosts.clone(),
674+
large_upload_full_route: AtomicU64::new(0),
675+
large_upload_rejected_413: AtomicU64::new(0),
672676
})
673677
}
674678

@@ -788,6 +792,8 @@ impl DomainFronter {
788792
h2_calls: self.h2_calls.load(Ordering::Relaxed),
789793
h2_fallbacks: self.h2_fallbacks.load(Ordering::Relaxed),
790794
h2_disabled: self.h2_disabled.load(Ordering::Relaxed),
795+
large_upload_full_route: self.large_upload_full_route.load(Ordering::Relaxed),
796+
large_upload_rejected_413: self.large_upload_rejected_413.load(Ordering::Relaxed),
791797
}
792798
}
793799

@@ -1805,27 +1811,6 @@ impl DomainFronter {
18051811
}
18061812
}
18071813

1808-
// Upstream Payload Fragmentation Guard: Prevent heavy upload payload frames from crashing serverless instances
1809-
const MAX_UPSTREAM_CHUNK_SIZE: usize = 5 * 1024 * 1024; // Safe 5 MiB processing window threshold
1810-
if body.len() > MAX_UPSTREAM_CHUNK_SIZE {
1811-
tracing::info!("Upstream Fragmentation: Fragmenting large request body payload (Size: {} bytes)", body.len());
1812-
let chunks: Vec<&[u8]> = body.chunks(MAX_UPSTREAM_CHUNK_SIZE).collect();
1813-
let total_chunks = chunks.len();
1814-
let upload_id = format!("ul_{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis());
1815-
1816-
let mut final_response = Vec::new();
1817-
for (idx, chunk) in chunks.iter().enumerate() {
1818-
let mut chunk_headers = headers.to_vec();
1819-
chunk_headers.push(("X-MHRV-Upload-ID".to_string(), upload_id.clone()));
1820-
chunk_headers.push(("X-MHRV-Chunk-Index".to_string(), idx.to_string()));
1821-
chunk_headers.push(("X-MHRV-Chunk-Total".to_string(), total_chunks.to_string()));
1822-
1823-
// Route fragment packets sequentially through standard transmission pipelines
1824-
final_response = self.relay_processed(method, url, &chunk_headers, chunk).await;
1825-
}
1826-
return final_response;
1827-
}
1828-
18291814
self.relay_processed(method, url, headers, body).await
18301815
}
18311816

@@ -4944,6 +4929,8 @@ pub struct StatsSnapshot {
49444929
/// switch set, or peer refused h2 during ALPN). All traffic on the
49454930
/// h1 path.
49464931
pub h2_disabled: bool,
4932+
pub large_upload_full_route: u64,
4933+
pub large_upload_rejected_413: u64,
49474934
}
49484935

49494936
impl StatsSnapshot {
@@ -5001,7 +4988,7 @@ impl StatsSnapshot {
50014988
s.replace('\\', "\\\\").replace('"', "\\\"")
50024989
}
50034990
format!(
5004-
r#"{{"relay_calls":{},"relay_failures":{},"coalesced":{},"bytes_relayed":{},"cache_hits":{},"cache_misses":{},"cache_bytes":{},"blacklisted_scripts":{},"total_scripts":{},"today_calls":{},"today_bytes":{},"today_key":"{}","today_reset_secs":{},"h2_calls":{},"h2_fallbacks":{},"h2_disabled":{}}}"#,
4991+
r#"{{"relay_calls":{},"relay_failures":{},"coalesced":{},"bytes_relayed":{},"cache_hits":{},"cache_misses":{},"cache_bytes":{},"blacklisted_scripts":{},"total_scripts":{},"today_calls":{},"today_bytes":{},"today_key":"{}","today_reset_secs":{},"h2_calls":{},"h2_fallbacks":{},"h2_disabled":{},"large_upload_full_route":{},"large_upload_rejected_413":{}}}"#,
50054992
self.relay_calls,
50064993
self.relay_failures,
50074994
self.coalesced,
@@ -5018,6 +5005,8 @@ impl StatsSnapshot {
50185005
self.h2_calls,
50195006
self.h2_fallbacks,
50205007
self.h2_disabled,
5008+
self.large_upload_full_route,
5009+
self.large_upload_rejected_413,
50215010
)
50225011
}
50235012
}
@@ -7391,4 +7380,21 @@ hello";
73917380
}
73927381
server.await.unwrap();
73937382
}
7383+
7384+
#[tokio::test]
7385+
async fn test_large_upload_policy_no_unsafe_headers() {
7386+
let config_json = r#"{"mode":"apps_script","script_ids":["fake_id"],"auth_key":"fake_key"}"#;
7387+
let config: Config = serde_json::from_str(config_json).unwrap();
7388+
let fronter = DomainFronter::new(&config).unwrap();
7389+
7390+
// Ensure counters are zero initialized
7391+
let stats = fronter.snapshot_stats();
7392+
assert_eq!(stats.large_upload_full_route, 0);
7393+
assert_eq!(stats.large_upload_rejected_413, 0);
7394+
7395+
// Assert serialization includes our fields
7396+
let json_str = stats.to_json();
7397+
assert!(json_str.contains("\"large_upload_full_route\":0"));
7398+
assert!(json_str.contains("\"large_upload_rejected_413\":0"));
7399+
}
73947400
}

src/proxy_server.rs

Lines changed: 147 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ use tokio_rustls::{LazyConfigAcceptor, TlsAcceptor, TlsConnector};
1919
use crate::config::{Config, FrontingGroup, Mode};
2020
use crate::domain_fronter::DomainFronter;
2121
use crate::mitm::MitmCertManager;
22-
use crate::tunnel_client::{decode_udp_packets, TunnelMux};
22+
use crate::tunnel_client::{decode_udp_packets, TunnelMux, tunnel_connection_with_prefix};
23+
24+
pub const APPS_SCRIPT_UPLOAD_MAX_BYTES: usize = 5 * 1024 * 1024;
2325

2426
// Domains that are served from Google's core frontend IP pool and therefore
2527
// respond correctly when we connect to `google_ip` with SNI=`front_domain`
@@ -850,7 +852,7 @@ async fn handle_http_client(
850852
// `http://example.com` URL used to return a 502 here even
851853
// though `https://example.com` (CONNECT) worked fine.
852854
match fronter {
853-
Some(f) => do_plain_http(sock, &head, &leftover, f).await,
855+
Some(f) => do_plain_http(sock, &head, &leftover, f, rewrite_ctx.clone(), tunnel_mux.clone()).await,
854856
None => do_plain_http_passthrough(sock, &head, &leftover, &rewrite_ctx).await,
855857
}
856858
}
@@ -2429,6 +2431,43 @@ where
24292431
None => return Ok(false),
24302432
};
24312433

2434+
let is_mutating = method.eq_ignore_ascii_case("POST")
2435+
|| method.eq_ignore_ascii_case("PUT")
2436+
|| method.eq_ignore_ascii_case("PATCH");
2437+
2438+
if is_mutating {
2439+
let mut is_chunked = false;
2440+
let mut content_length = None;
2441+
for (k, v) in headers.iter() {
2442+
if k.eq_ignore_ascii_case("transfer-encoding") && v.eq_ignore_ascii_case("chunked") {
2443+
is_chunked = true;
2444+
}
2445+
if k.eq_ignore_ascii_case("content-length") {
2446+
if let Ok(len) = v.parse::<usize>() {
2447+
content_length = Some(len);
2448+
}
2449+
}
2450+
}
2451+
if is_chunked || content_length.map_or(true, |len| len > APPS_SCRIPT_UPLOAD_MAX_BYTES) {
2452+
tracing::warn!(
2453+
"Mutating large/chunked upload in AppsScript MITM mode. Rejecting locally with 413. (is_chunked={}, content_length={:?})",
2454+
is_chunked,
2455+
content_length
2456+
);
2457+
fronter.large_upload_rejected_413.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2458+
let _ = stream
2459+
.write_all(
2460+
b"HTTP/1.1 413 Payload Too Large\r\n\
2461+
Connection: close\r\n\
2462+
Content-Length: 47\r\n\r\n\
2463+
Payload Too Large: Upload limit is 5 MiB.\n",
2464+
)
2465+
.await;
2466+
let _ = stream.flush().await;
2467+
return Ok(false);
2468+
}
2469+
}
2470+
24322471
let body = read_body(stream, &leftover, &headers).await?;
24332472

24342473
// ── Per-host URL fix-ups ──────────────────────────────────────────
@@ -2879,10 +2918,78 @@ async fn do_plain_http(
28792918
head: &[u8],
28802919
leftover: &[u8],
28812920
fronter: Arc<DomainFronter>,
2921+
rewrite_ctx: Arc<RewriteCtx>,
2922+
tunnel_mux: Option<Arc<TunnelMux>>,
28822923
) -> std::io::Result<()> {
28832924
let (method, target, _version, headers) = parse_request_head(head)
28842925
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "bad request"))?;
28852926

2927+
let is_mutating = method.eq_ignore_ascii_case("POST")
2928+
|| method.eq_ignore_ascii_case("PUT")
2929+
|| method.eq_ignore_ascii_case("PATCH");
2930+
2931+
if is_mutating {
2932+
let mut is_chunked = false;
2933+
let mut content_length = None;
2934+
for (k, v) in headers.iter() {
2935+
if k.eq_ignore_ascii_case("transfer-encoding") && v.eq_ignore_ascii_case("chunked") {
2936+
is_chunked = true;
2937+
}
2938+
if k.eq_ignore_ascii_case("content-length") {
2939+
if let Ok(len) = v.parse::<usize>() {
2940+
content_length = Some(len);
2941+
}
2942+
}
2943+
}
2944+
if is_chunked || content_length.map_or(true, |len| len > APPS_SCRIPT_UPLOAD_MAX_BYTES) {
2945+
if rewrite_ctx.mode == Mode::Full {
2946+
if let Some(ref mux) = tunnel_mux {
2947+
let host_hdr = headers
2948+
.iter()
2949+
.find(|(k, _)| k.eq_ignore_ascii_case("host"))
2950+
.map(|(_, v)| v.clone())
2951+
.unwrap_or_default();
2952+
let (target_host, target_port) = parse_host_port(&host_hdr);
2953+
let target_port = if host_hdr.contains(':') { target_port } else { 80 };
2954+
2955+
tracing::info!(
2956+
"Mutating large/chunked upload on plain HTTP in Full mode. Routing via Tunnel. (Host: {}:{})",
2957+
target_host,
2958+
target_port
2959+
);
2960+
2961+
fronter.large_upload_full_route.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2962+
2963+
let mut prefix_vec = head.to_vec();
2964+
prefix_vec.extend_from_slice(leftover);
2965+
let prefix_bytes = Bytes::from(prefix_vec);
2966+
2967+
if let Err(e) = tunnel_connection_with_prefix(sock, &target_host, target_port, mux, prefix_bytes).await {
2968+
tracing::error!("Failed to route plain-HTTP large upload through tunnel: {}", e);
2969+
}
2970+
return Ok(());
2971+
}
2972+
}
2973+
2974+
tracing::warn!(
2975+
"Mutating large/chunked upload on plain HTTP. Rejecting locally with 413. (is_chunked={}, content_length={:?})",
2976+
is_chunked,
2977+
content_length
2978+
);
2979+
fronter.large_upload_rejected_413.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2980+
let _ = sock
2981+
.write_all(
2982+
b"HTTP/1.1 413 Payload Too Large\r\n\
2983+
Connection: close\r\n\
2984+
Content-Length: 47\r\n\r\n\
2985+
Payload Too Large: Upload limit is 5 MiB.\n",
2986+
)
2987+
.await;
2988+
let _ = sock.flush().await;
2989+
return Ok(());
2990+
}
2991+
}
2992+
28862993
let body = read_body(&mut sock, leftover, &headers).await?;
28872994

28882995
// Browser sends `GET http://example.com/path HTTP/1.1` on plain proxy.
@@ -3657,4 +3764,42 @@ mod tests {
36573764
};
36583765
assert!(FrontingGroupResolved::from_config(&bad).is_err());
36593766
}
3767+
3768+
#[tokio::test]
3769+
async fn test_handle_mitm_request_rejects_large_mutating_requests() {
3770+
let (mut client, mut server) = duplex(1024);
3771+
3772+
let config_json = r#"{"mode":"apps_script","script_ids":["fake_id"],"auth_key":"fake_key"}"#;
3773+
let config: Config = serde_json::from_str(config_json).unwrap();
3774+
let fronter = std::sync::Arc::new(DomainFronter::new(&config).unwrap());
3775+
3776+
// Write a mutating HTTP POST request that exceeds the 5 MiB ceiling
3777+
// Note: Content-Length: 6000000 (approx 5.7 MiB)
3778+
let request_bytes = b"POST /upload HTTP/1.1\r\n\
3779+
Host: example.com\r\n\
3780+
Content-Length: 6000000\r\n\
3781+
Connection: keep-alive\r\n\r\n";
3782+
client.write_all(request_bytes).await.unwrap();
3783+
3784+
let fronter_clone = fronter.clone();
3785+
let handle_task = tokio::spawn(async move {
3786+
let res = handle_mitm_request(&mut server, "example.com", 443, &fronter_clone, "https").await;
3787+
res
3788+
});
3789+
3790+
// Read the response from the server on the client side
3791+
let mut response_buf = vec![0u8; 1024];
3792+
let n = client.read(&mut response_buf).await.unwrap();
3793+
let response_str = String::from_utf8_lossy(&response_buf[..n]);
3794+
3795+
// It should return 413 Payload Too Large locally
3796+
assert!(response_str.contains("HTTP/1.1 413 Payload Too Large"));
3797+
assert!(response_str.contains("Upload limit is 5 MiB"));
3798+
3799+
let handle_res = handle_task.await.unwrap();
3800+
assert_eq!(handle_res.unwrap(), false); // Connection should be terminated
3801+
3802+
// Verify rejected counter was incremented
3803+
assert_eq!(fronter.snapshot_stats().large_upload_rejected_413, 1);
3804+
}
36603805
}

src/tunnel_client.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,6 +1259,56 @@ pub async fn tunnel_connection(
12591259
result
12601260
}
12611261

1262+
pub async fn tunnel_connection_with_prefix(
1263+
mut sock: TcpStream,
1264+
host: &str,
1265+
port: u16,
1266+
mux: &Arc<TunnelMux>,
1267+
prefix: Bytes,
1268+
) -> std::io::Result<()> {
1269+
let (sid, first_resp, pending_client_data) = if mux.connect_data_unsupported() {
1270+
let sid = connect_plain(host, port, mux).await?;
1271+
(sid, None, Some(prefix))
1272+
} else {
1273+
match connect_with_initial_data(host, port, prefix.clone(), mux).await? {
1274+
ConnectDataOutcome::Opened { sid, response } => (sid, Some(response), None),
1275+
ConnectDataOutcome::Unsupported => {
1276+
mux.mark_connect_data_unsupported();
1277+
let sid = connect_plain(host, port, mux).await?;
1278+
(sid, None, Some(prefix))
1279+
}
1280+
}
1281+
};
1282+
1283+
tracing::info!("tunnel session {} opened for {}:{} (with prefix)", sid, host, port);
1284+
pipeline_debug::session_start(&sid);
1285+
1286+
let result = async {
1287+
if let Some(resp) = first_resp {
1288+
match write_tunnel_response(&mut sock, &resp).await? {
1289+
WriteOutcome::Wrote | WriteOutcome::NoData => {}
1290+
WriteOutcome::BadBase64 => {
1291+
tracing::error!(
1292+
"tunnel session {}: bad base64 in connect_data response",
1293+
sid
1294+
);
1295+
return Ok(());
1296+
}
1297+
}
1298+
if resp.eof.unwrap_or(false) {
1299+
return Ok(());
1300+
}
1301+
}
1302+
tunnel_loop(&mut sock, &sid, mux, pending_client_data).await
1303+
}
1304+
.await;
1305+
1306+
mux.send(MuxMsg::Close { sid: sid.clone() }).await;
1307+
pipeline_debug::session_end(&sid);
1308+
tracing::info!("tunnel session {} closed for {}:{} (with prefix)", sid, host, port);
1309+
result
1310+
}
1311+
12621312
enum ConnectDataOutcome {
12631313
Opened {
12641314
sid: String,

0 commit comments

Comments
 (0)