Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@ __pycache__/
# kept locally for analysis only, never versioned.
profile*.json.gz
*.json.syms.json

# E2E suite generated run artifacts (regenerated on every run, not versioned)
tests/e2e/report.json
tests/e2e/report.md
4 changes: 4 additions & 0 deletions i18n/en-US/app.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ btn-host = Host
btn-stop = Stop
status-hosting = hosting…
status-stopped = stopped
# Connect sub-phases (issue #45): shown while a Host is establishing.
status-connect-authorizing = authorizing…
status-connect-relay = connecting relay…
status-connect-ports = forwarding ports…

## Health badges
badge-operational = Operational
Expand Down
80 changes: 74 additions & 6 deletions src/devtunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,7 @@ pub fn preflight() -> Preflight {
/// command/parse fails. Best-effort and read-only; safe to call off the UI
/// thread to populate the Settings "Signed in as …" label.
pub fn current_username() -> Option<String> {
let out = command(&bin())
.args(["user", "show", "-j"])
.output()
.ok()?;
let out = command(&bin()).args(["user", "show", "-j"]).output().ok()?;
if !out.status.success() {
return None;
}
Expand Down Expand Up @@ -246,6 +243,25 @@ pub fn is_auth_error(stderr: &str) -> bool {
lower.contains("token") && (lower.contains("invalid") || lower.contains("revoked"))
}

/// Classifies a host connect/port-forward error as **non-recoverable**: retrying
/// with the same inputs can never succeed, so the engine should surface an error
/// and stop instead of looping the reconnect/backoff forever (each cycle re-mints
/// two tokens and re-runs the relay handshake against the service).
///
/// A `400 Bad Request` from the tunnel management API is a request-validation
/// failure — e.g. `add_port` rejected with "the tunnel port protocol cannot be
/// changed" when the forwarded protocol disagrees with the registered one. These
/// are permanent for identical inputs. Auth failures are handled separately by
/// [`is_auth_error`] (they have a recovery path: re-login), so callers should
/// check that first.
#[cfg_attr(not(feature = "hosting"), allow(dead_code))]
pub fn is_fatal_connect_error(stderr: &str) -> bool {
let lower = stderr.to_ascii_lowercase();
lower.contains("400 bad request")
|| lower.contains("cannot be changed")
|| lower.contains("invalid arguments")
}

/// Runs `devtunnel user login` (interactive — opens the system browser and may
/// show a device code) in its own visible console and waits for it to finish.
/// Goes through [`interactive_command`] with inherited stdio — never the silent
Expand Down Expand Up @@ -671,7 +687,11 @@ fn parse_rate_bps(s: &str) -> Option<f64> {

/// Parses a leading integer from a string like `"4 client connections"`.
fn parse_leading_int(s: &str) -> Option<f64> {
let digits: String = s.trim().chars().take_while(|c| c.is_ascii_digit()).collect();
let digits: String = s
.trim()
.chars()
.take_while(|c| c.is_ascii_digit())
.collect();
digits.parse().ok()
}

Expand Down Expand Up @@ -731,14 +751,62 @@ pub fn fetch_rows(loc: &Locale) -> Result<Vec<Row>> {
Ok(rows)
}

/// Fetches the ports of a single tunnel via `devtunnel show <id> -j`, each paired
/// with its configured protocol. Targeted single-subprocess lookup: unlike
/// [`fetch_rows`], it does not enumerate the whole account (`list` + a `show` per
/// tunnel), so hosting one tunnel costs one CLI round-trip regardless of how many
/// tunnels the account holds (issue #44). The protocol is carried through because
/// re-registering a port under a different protocol is rejected by the service
/// and would block hosting (issue #36).
///
/// # Errors
/// Propagates the CLI/JSON failure from the underlying `show` call.
#[cfg_attr(not(feature = "hosting"), allow(dead_code))]
pub fn fetch_tunnel_ports(tunnel_id: &str, loc: &Locale) -> Result<Vec<(u16, String)>> {
let show: ShowResult = run_json(&["show", tunnel_id, "-j"], loc)?;
Ok(tunnel_ports(show))
}

/// Maps a `show -j` result to `(port, protocol)` pairs, dropping ports that are
/// absent (`0`) or outside the valid `u16` range. Pure: split out from
/// [`fetch_tunnel_ports`] so the mapping is unit-tested without the CLI.
#[cfg_attr(not(feature = "hosting"), allow(dead_code))]
fn tunnel_ports(show: ShowResult) -> Vec<(u16, String)> {
show.tunnel
.ports
.into_iter()
.filter(|p| p.port_number > 0)
.filter_map(|p| u16::try_from(p.port_number).ok().map(|n| (n, p.protocol)))
.collect()
}

#[cfg(test)]
mod tests {
use super::{
anonymous_ace_args, classify_anonymous_access, classify_install_result, classify_user_show,
is_auth_error, parse_leading_int, parse_rate_bps, parse_size_bytes, sanitize_tunnel_id,
update_expiration_args, InstallOutcome,
tunnel_ports, update_expiration_args, InstallOutcome, ShowResult,
};

#[test]
fn tunnel_ports_filters_zero_and_preserves_protocol() {
// `show -j` of one tunnel: a plain-http port, an https port, and an
// unconfigured (`0`) entry that must be dropped.
let json = r#"{ "tunnel": { "tunnelId": "x", "ports": [
{ "portNumber": 3000, "protocol": "http" },
{ "portNumber": 8443, "protocol": "https" },
{ "portNumber": 0, "protocol": "auto" }
] } }"#;
let show: ShowResult = serde_json::from_str(json).expect("valid show JSON");
assert_eq!(
tunnel_ports(show),
vec![
(3000u16, "http".to_string()),
(8443u16, "https".to_string())
]
);
}

#[test]
fn parse_size_bytes_handles_units_and_locales() {
assert_eq!(parse_size_bytes("4402 KB"), Some(4402.0 * 1024.0));
Expand Down
195 changes: 195 additions & 0 deletions src/headless.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
//! Headless host runner — a diagnostic/test entrypoint (no GUI, no tray) used by
//! the blackbox E2E resilience harness in `tests/e2e/`.
//!
//! It drives the **production** host engine (`host::spawn` →
//! `engine::host_group` → the keep-alive driver), so the harness exercises the
//! real connect / keep-alive / reconnect path rather than a stand-in. It is
//! activated when `DEVTUNNEL_HEADLESS_HOST=<tunnel-id>[,<tunnel-id>…]` is set;
//! `main` returns through here before building any UI.
//!
//! Observability: every [`host::HostEvent`] is written as one JSON line on
//! stdout (logs stay on stderr via the capturing logger), so an external process
//! can observe state transitions deterministically. Control: it reads simple
//! line commands on stdin — `host <id>` (re-host), `stop <id>`, `stop` (all
//! groups), `drop <id>` (force a relay drop + reconnect without tearing the
//! group down — exercises the reconnect / token-reuse path of issue #47
//! deterministically, no firewall/admin needed), and `quit` (stop all and exit).
//! EOF on stdin is treated as `quit`.
//!
//! Only the `--features hosting` build has a real engine; the default build's
//! `NoopHost` makes this a no-op, which keeps the module compiling everywhere.

use std::io::{BufRead, Write};
use std::time::{Duration, Instant};

use crate::host::{self, HostCommand, HostEvent, HostState};

/// A control command parsed from stdin.
enum Ctl {
/// (Re)start hosting one group by Real Tunnel ID (used to re-host after a
/// `stop`, exercising a clean teardown → reconnect cycle).
Host(String),
/// Stop one group by Real Tunnel ID.
Stop(String),
/// Force one group's relay to drop and reconnect without tearing it down
/// (exercises the real reconnect / token-reuse path; issue #47).
Drop(String),
/// Stop every hosted group.
StopAll,
/// Stop everything and exit.
Quit,
}

/// Runs the headless host loop for the comma-separated `ids_csv`. Returns once a
/// `quit` command (or stdin EOF) is received and the engine has been asked to
/// stop every group.
pub fn run(ids_csv: &str) -> anyhow::Result<()> {
let ids: Vec<String> = ids_csv
.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
.map(str::to_owned)
.collect();
if ids.is_empty() {
anyhow::bail!("DEVTUNNEL_HEADLESS_HOST is set but lists no tunnel ids");
}

let started = Instant::now();
let (evt_tx, evt_rx) = std::sync::mpsc::channel::<HostEvent>();
let host = host::spawn(evt_tx);

for id in &ids {
host.send(HostCommand::Host {
tunnel_id: id.clone(),
});
}
emit_line(&serde_json::json!({
"elapsed_ms": started.elapsed().as_millis() as u64,
"event": "started",
"tunnel_ids": ids,
}));

// Stdin command reader → control channel. A dedicated thread keeps the main
// thread free to drain host events without blocking on a stdin read.
let (ctl_tx, ctl_rx) = std::sync::mpsc::channel::<Ctl>();
std::thread::spawn(move || {
let stdin = std::io::stdin();
for line in stdin.lock().lines() {
let Ok(line) = line else { break };
let line = line.trim();
let cmd = if line == "quit" || line == "exit" {
Ctl::Quit
} else if line == "stop" {
Ctl::StopAll
} else if let Some(rest) = line.strip_prefix("stop ") {
Ctl::Stop(rest.trim().to_owned())
} else if let Some(rest) = line.strip_prefix("drop ") {
Ctl::Drop(rest.trim().to_owned())
} else if let Some(rest) = line.strip_prefix("host ") {
Ctl::Host(rest.trim().to_owned())
} else {
continue;
};
if ctl_tx.send(cmd).is_err() {
return;
}
}
// EOF on stdin → ask the main loop to quit.
let _ = ctl_tx.send(Ctl::Quit);
});

// Main loop: interleave host events (printed as JSON) with control commands.
// Poll the control channel with a short timeout so host events never starve.
loop {
loop {
match evt_rx.try_recv() {
Ok(evt) => emit_line(&event_json(started, &evt)),
Err(std::sync::mpsc::TryRecvError::Empty) => break,
// The engine thread is gone; nothing more will arrive.
Err(std::sync::mpsc::TryRecvError::Disconnected) => return Ok(()),
}
}
match ctl_rx.recv_timeout(Duration::from_millis(100)) {
Ok(Ctl::Host(id)) => host.send(HostCommand::Host { tunnel_id: id }),
Ok(Ctl::Stop(id)) => host.send(HostCommand::Stop { tunnel_id: id }),
Ok(Ctl::Drop(id)) => host.send(HostCommand::DropRelay { tunnel_id: id }),
Ok(Ctl::StopAll) => stop_all(host.as_ref(), &ids),
Ok(Ctl::Quit) => {
stop_all(host.as_ref(), &ids);
// Give the engine a moment to emit the trailing `Stopped` events
// before exiting, so the harness sees a clean teardown.
std::thread::sleep(Duration::from_millis(300));
while let Ok(evt) = evt_rx.try_recv() {
emit_line(&event_json(started, &evt));
}
return Ok(());
}
// No control input this tick: loop back and drain events again.
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
// The reader thread exited without a final Quit (should not happen);
// keep draining events until the engine disconnects.
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {}
}
}
}

/// Sends `Stop` for every group id.
fn stop_all(host: &dyn host::TunnelHost, ids: &[String]) {
for id in ids {
host.send(HostCommand::Stop {
tunnel_id: id.clone(),
});
}
}

/// Renders one [`HostEvent`] as the JSON line emitted on stdout.
fn event_json(started: Instant, evt: &HostEvent) -> serde_json::Value {
let elapsed_ms = started.elapsed().as_millis() as u64;
match evt {
HostEvent::State { tunnel_id, state } => {
let (name, message) = match state {
HostState::Idle => ("Idle", None),
HostState::Connecting => ("Connecting", None),
HostState::Hosting => ("Hosting", None),
HostState::Reconnecting => ("Reconnecting", None),
HostState::Stopped => ("Stopped", None),
HostState::Error(m) => ("Error", Some(m.clone())),
};
serde_json::json!({
"elapsed_ms": elapsed_ms,
"event": "state",
"tunnel_id": tunnel_id,
"state": name,
"message": message,
})
}
HostEvent::Progress { tunnel_id, phase } => {
// Additive to the `state` stream (issue #45): the coarse Connecting /
// Hosting transitions still fire, so a harness keyed on those is
// unaffected; this just exposes the sub-phase for finer diagnostics.
let phase = match phase {
host::ConnectPhase::Authorizing => "authorizing",
host::ConnectPhase::ConnectingRelay => "connecting_relay",
host::ConnectPhase::ForwardingPorts => "forwarding_ports",
};
serde_json::json!({
"elapsed_ms": elapsed_ms,
"event": "progress",
"tunnel_id": tunnel_id,
"phase": phase,
})
}
HostEvent::ReloginRequired { tunnel_id } => serde_json::json!({
"elapsed_ms": elapsed_ms,
"event": "relogin_required",
"tunnel_id": tunnel_id,
}),
}
}

/// Writes one JSON value as a line on stdout and flushes immediately so the
/// harness observes events in real time.
fn emit_line(v: &serde_json::Value) {
println!("{v}");
let _ = std::io::stdout().flush();
}
Loading