Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ func main() {
}

srv, err := exit.New(exit.Config{
ListenAddr: cfg.ListenAddr,
AESKeyHex: cfg.AESKeyHex,
DebugTiming: cfg.DebugTiming,
UpstreamProxy: cfg.UpstreamProxy,
Version: version,
ListenAddr: cfg.ListenAddr,
AESKeyHex: cfg.AESKeyHex,
DebugTiming: cfg.DebugTiming,
UpstreamProxy: cfg.UpstreamProxy,
InitialResponseBytesPreEncode: cfg.InitialResponseBytesPreEncode,
Version: version,
})
if err != nil {
log.Fatalf("exit: %v", err)
Expand Down
25 changes: 17 additions & 8 deletions internal/config/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (

// Server is the VPS exit server config.
type Server struct {
ListenAddr string
AESKeyHex string
DebugTiming bool
UpstreamProxy string // optional socks5://host:port; when set, all outbound dials go through this proxy
ListenAddr string
AESKeyHex string
DebugTiming bool
UpstreamProxy string // optional socks5://host:port; when set, all outbound dials go through this proxy
InitialResponseBytesPreEncode int
}

type serverFile struct {
Expand All @@ -36,6 +37,10 @@ type serverFile struct {
// datacenter IP is blocked by certain sites.
UpstreamProxy string `json:"upstream_proxy"`

// Optional: cap the first downstream response for a newly opened session.
// 0 uses the server default.
InitialResponseBytesPreEncode int `json:"initial_response_bytes_pre_encode"`

// Legacy keys kept as fallback for existing deployments.
ListenAddr string `json:"listen_addr"`
AESKeyHex string `json:"aes_key_hex"`
Expand Down Expand Up @@ -104,12 +109,16 @@ func LoadServer(path string) (*Server, error) {
}
upstreamProxy = u.Host
}
if f.InitialResponseBytesPreEncode < 0 {
return nil, fmt.Errorf("initial_response_bytes_pre_encode must be >= 0")
}

c := Server{
ListenAddr: net.JoinHostPort(listenHost, strconv.Itoa(listenPort)),
AESKeyHex: key,
DebugTiming: f.DebugTiming,
UpstreamProxy: upstreamProxy,
ListenAddr: net.JoinHostPort(listenHost, strconv.Itoa(listenPort)),
AESKeyHex: key,
DebugTiming: f.DebugTiming,
UpstreamProxy: upstreamProxy,
InitialResponseBytesPreEncode: f.InitialResponseBytesPreEncode,
}
return &c, nil
}
50 changes: 50 additions & 0 deletions internal/config/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package config

import (
"os"
"path/filepath"
"strings"
"testing"
)

const testServerKeyHex = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"

func TestLoadServerInitialResponseBytesPreEncode(t *testing.T) {
path := filepath.Join(t.TempDir(), "server.json")
body := `{
"server_host": "127.0.0.1",
"server_port": 8443,
"tunnel_key": "` + testServerKeyHex + `",
"initial_response_bytes_pre_encode": 131072
}`
if err := os.WriteFile(path, []byte(body), 0o600); err != nil {
t.Fatalf("write config: %v", err)
}
cfg, err := LoadServer(path)
if err != nil {
t.Fatalf("LoadServer: %v", err)
}
if cfg.InitialResponseBytesPreEncode != 131072 {
t.Fatalf("InitialResponseBytesPreEncode = %d, want 131072", cfg.InitialResponseBytesPreEncode)
}
}

func TestLoadServerRejectsNegativeInitialResponseBytes(t *testing.T) {
path := filepath.Join(t.TempDir(), "server.json")
body := `{
"server_host": "127.0.0.1",
"server_port": 8443,
"tunnel_key": "` + testServerKeyHex + `",
"initial_response_bytes_pre_encode": -1
}`
if err := os.WriteFile(path, []byte(body), 0o600); err != nil {
t.Fatalf("write config: %v", err)
}
_, err := LoadServer(path)
if err == nil {
t.Fatal("LoadServer succeeded with negative initial_response_bytes_pre_encode")
}
if !strings.Contains(err.Error(), "initial_response_bytes_pre_encode") {
t.Fatalf("LoadServer err = %v, want initial_response_bytes_pre_encode validation", err)
}
}
76 changes: 48 additions & 28 deletions internal/exit/exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ const (
// last drained session.
maxResponseBytesPreEncode = 22 * 1024 * 1024

// initialResponseBytesPreEncode caps the first downstream response for a
// newly-opened session. Apps Script buffers full HTTP responses, so keeping
// the first file-download/header burst small improves browser-visible start
// time without reducing later bulk throughput.
initialResponseBytesPreEncode = 512 * 1024

// dialFailureBackoff is how long we suppress repeated SYN dial attempts to a
// target after a structural network/DNS failure.
dialFailureBackoff = 2 * time.Second
Expand All @@ -116,21 +122,23 @@ const (

// Config is the VPS server's configuration.
type Config struct {
ListenAddr string // "0.0.0.0:8443"
AESKeyHex string // 64-char hex
DebugTiming bool // when true, log per-session dial breakdown and first-read latency
UpstreamProxy string // optional "host:port" of a local SOCKS5 proxy (e.g. WARP on 127.0.0.1:40000)
Version string // build version string (exposed in /healthz and version probe)
ListenAddr string // "0.0.0.0:8443"
AESKeyHex string // 64-char hex
DebugTiming bool // when true, log per-session dial breakdown and first-read latency
UpstreamProxy string // optional "host:port" of a local SOCKS5 proxy (e.g. WARP on 127.0.0.1:40000)
InitialResponseBytesPreEncode int // optional cap for first downstream response; <=0 uses default
Version string // build version string (exposed in /healthz and version probe)
}

// Server holds the per-process session state.
type Server struct {
cfg Config
aead *frame.Crypto
dial func(network, address string, timeout time.Duration) (net.Conn, error)
dns *dnsCache
debugTiming bool
version string
cfg Config
aead *frame.Crypto
dial func(network, address string, timeout time.Duration) (net.Conn, error)
dns *dnsCache
debugTiming bool
version string
initialResponseBytesPreEncode int

mu sync.Mutex
sessions map[[frame.SessionIDLen]byte]*session.Session
Expand Down Expand Up @@ -178,23 +186,28 @@ func New(cfg Config) (*Server, error) {
return nil, err
}
dialFn := dialFunc(cfg.UpstreamProxy)
initialResponseCap := cfg.InitialResponseBytesPreEncode
if initialResponseCap <= 0 {
initialResponseCap = initialResponseBytesPreEncode
}
s := &Server{
cfg: cfg,
aead: aead,
dial: dialFn,
dns: newDNSCache(),
debugTiming: cfg.DebugTiming,
version: cfg.Version,
sessions: make(map[[frame.SessionIDLen]byte]*session.Session),
sessionOwners: make(map[[frame.SessionIDLen]byte][frame.ClientIDLen]byte),
txReady: make(map[[frame.SessionIDLen]byte]struct{}),
firstReply: make(map[[frame.SessionIDLen]byte]struct{}),
upstreams: make(map[[frame.SessionIDLen]byte]net.Conn),
lastActivity: make(map[[frame.SessionIDLen]byte]time.Time),
dialFail: make(map[string]time.Time),
pendingRSTs: make(map[[frame.ClientIDLen]byte][]*frame.Frame),
pendingCtrl: make(map[[frame.ClientIDLen]byte][]*frame.Frame),
activity: make(map[[frame.ClientIDLen]byte]chan struct{}),
cfg: cfg,
aead: aead,
dial: dialFn,
dns: newDNSCache(),
debugTiming: cfg.DebugTiming,
version: cfg.Version,
initialResponseBytesPreEncode: initialResponseCap,
sessions: make(map[[frame.SessionIDLen]byte]*session.Session),
sessionOwners: make(map[[frame.SessionIDLen]byte][frame.ClientIDLen]byte),
txReady: make(map[[frame.SessionIDLen]byte]struct{}),
firstReply: make(map[[frame.SessionIDLen]byte]struct{}),
upstreams: make(map[[frame.SessionIDLen]byte]net.Conn),
lastActivity: make(map[[frame.SessionIDLen]byte]time.Time),
dialFail: make(map[string]time.Time),
pendingRSTs: make(map[[frame.ClientIDLen]byte][]*frame.Frame),
pendingCtrl: make(map[[frame.ClientIDLen]byte][]*frame.Frame),
activity: make(map[[frame.ClientIDLen]byte]chan struct{}),
}
s.upstreamReadPool.New = func() interface{} {
buf := make([]byte, upstreamReadBuf)
Expand Down Expand Up @@ -695,7 +708,14 @@ func (s *Server) drainAll(owner [frame.ClientIDLen]byte, byteBudget int) ([]*fra
if remaining < perSessionCap {
perSessionCap = remaining
}
frames := sess.DrainTxLimited(MaxFramePayload, perSessionCap)
maxPayload := MaxFramePayload
if _, isFirst := s.firstReply[id]; isFirst && perSessionCap > 0 {
firstPayload := (s.initialResponseBytesPreEncode + perSessionCap - 1) / perSessionCap
if firstPayload > 0 && firstPayload < maxPayload {
maxPayload = firstPayload
}
}
frames := sess.DrainTxLimited(maxPayload, perSessionCap)
// Only clear from txReady when fully drained. A partial drain (cap
// hit before all data + a trailing FIN could be emitted) needs to
// stay queued, otherwise the session is stranded with no path back
Expand Down
50 changes: 50 additions & 0 deletions internal/exit/exit_timing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,56 @@ func TestDrainAll_RespectsByteBudget(t *testing.T) {
}
}

func TestDrainAll_CapsInitialResponseOnly(t *testing.T) {
s := mustExitTimingServer(t)
id := benchSessionID(777)
var owner [frame.ClientIDLen]byte
owner[0] = 0x77

payload := bytes.Repeat([]byte("x"), maxResponseBytesPreEncode)
sess := session.New(id, "x:1", false)
sess.EnqueueTx(payload)
s.sessions[id] = sess
s.sessionOwners[id] = owner
s.firstReply[id] = struct{}{}
s.txReady[id] = struct{}{}

frames, urgent := s.drainAll(owner, maxResponseBytesPreEncode)
if !urgent {
t.Fatal("first downstream response should be urgent")
}
firstBytes := sumFramePayloadBytes(frames)
if firstBytes == 0 {
t.Fatal("first drain returned no payload")
}
if firstBytes > s.initialResponseBytesPreEncode {
t.Fatalf("first response bytes = %d, want <= %d", firstBytes, s.initialResponseBytesPreEncode)
}
if _, stillFirst := s.firstReply[id]; stillFirst {
t.Fatal("firstReply marker was not cleared after first downstream drain")
}
if !sess.HasPendingTx() {
t.Fatal("test setup expected payload to remain after capped first response")
}

frames, urgent = s.drainAll(owner, maxResponseBytesPreEncode)
if urgent {
t.Fatal("second downstream response should not be urgent after firstReply is cleared")
}
secondBytes := sumFramePayloadBytes(frames)
if secondBytes <= s.initialResponseBytesPreEncode {
t.Fatalf("second response bytes = %d, want normal larger drain above %d", secondBytes, s.initialResponseBytesPreEncode)
}
}

func sumFramePayloadBytes(frames []*frame.Frame) int {
var total int
for _, f := range frames {
total += len(f.Payload)
}
return total
}

// BenchmarkExitRouteIncoming_NSessions measures the cost of routing a data
// frame to one of N already-open sessions on the server. This surfaces any
// regression in lock contention or per-frame routing work as session fan-out
Expand Down
Loading