Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 203 additions & 8 deletions internal/carrier/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"sort"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/kianmhz/GooseRelayVPN/internal/frame"
Expand Down Expand Up @@ -65,8 +69,87 @@
// or tail-latency events without changing protocol behavior.
endpointBlacklistBaseTTL = 3 * time.Second
endpointBlacklistMaxTTL = 1 * time.Hour

// Local offline failures should not ramp a mobile client into the 30m/1h
// endpoint penalty box. Keep the pause long enough to avoid a tight retry
// loop while airplane mode is on, but short enough that new sessions recover
// quickly when the network returns.
localNetworkOfflineBlacklistTTL = 15 * time.Second
localNetworkRecoveryProbeEvery = 5 * time.Second
localNetworkRecoveryProbeTO = 2 * time.Second
)

// isLocalNetworkOffline reports whether err looks like a failure of the
// client's own connectivity rather than a problem with the relay endpoint.
//
// The checks run from most to least structured: typed DNS errors, dial
// timeouts, syscall errnos, and finally a substring scan of the error text.
// A nil error is never treated as offline.
func isLocalNetworkOffline(err error) bool {
	if err == nil {
		return false
	}

	var (
		resolveErr *net.DNSError
		netOpErr   *net.OpError
		sysErr     *os.SyscallError
	)
	switch {
	// Resolver timed out, failed temporarily, or could not find the name.
	case errors.As(err, &resolveErr) &&
		(resolveErr.IsTimeout || resolveErr.IsTemporary || resolveErr.IsNotFound):
		return true
	// The connection attempt itself (Op "dial") ran out of time.
	case errors.As(err, &netOpErr) && strings.EqualFold(netOpErr.Op, "dial") &&
		(netOpErr.Timeout() || errors.Is(netOpErr.Err, context.DeadlineExceeded)):
		return true
	// An errno wrapped in os.SyscallError, or anywhere else in the chain.
	case errors.As(err, &sysErr) && isLocalOfflineSyscall(sysErr.Err):
		return true
	case isLocalOfflineSyscall(err):
		return true
	}

	// Last-resort fallback for platform-specific wrapped messages, especially
	// Windows WSA errors whose Errno values do not always compare cleanly after
	// net/http wraps them in url.Error/net.OpError.
	offlinePhrases := [...]string{
		"network is unreachable",
		"unreachable network",
		"no route to host",
		"network is down",
		"host is down",
		"host is unreachable",
		"temporary failure in name resolution",
		"no such host",
	}
	lowered := strings.ToLower(err.Error())
	for i := range offlinePhrases {
		if strings.Contains(lowered, offlinePhrases[i]) {
			return true
		}
	}
	return false
}

// isLocalOfflineSyscall reports whether err is, or wraps, an errno that means
// the local machine currently has no usable route to the network.
//
// Only errno constants that exist in package syscall on every supported
// platform are referenced by name. syscall.ENONET is deliberately NOT named
// here: it is undefined on darwin, so referencing it breaks the macOS build.
// It is matched by its message text instead, which keeps this file portable
// while still recognising the errno on platforms that report it.
func isLocalOfflineSyscall(err error) bool {
	for _, target := range []error{
		syscall.ENETUNREACH,
		syscall.EHOSTUNREACH,
		syscall.ENETDOWN,
		syscall.EHOSTDOWN,
	} {
		// errors.Is walks the whole wrap chain, so errnos nested inside
		// url.Error / net.OpError / os.SyscallError are found too.
		if errors.Is(err, target) {
			return true
		}
	}

	// Portable stand-in for errors.Is(err, syscall.ENONET).
	var errno syscall.Errno
	return errors.As(err, &errno) && errno.Error() == "machine is not on the network"
}

// recoveryProbeAddress builds the "host:port" dial target for the
// local-network recovery probe from cfg.Fronting.GoogleIP.
//
// It returns "" when no address is configured. An address that already
// carries a non-empty port is returned unchanged; otherwise port 443 is
// appended. Two malformed-but-plausible inputs are normalised rather than
// turned into undialable targets:
//   - "host:" (empty port) gets the default port instead of being kept as is;
//   - "[v6addr]" (bracketed literal, no port) has its brackets removed before
//     net.JoinHostPort adds its own, avoiding "[[v6addr]]:443".
func recoveryProbeAddress(cfg Config) string {
	addr := strings.TrimSpace(cfg.Fronting.GoogleIP)
	if addr == "" {
		return ""
	}
	if host, port, err := net.SplitHostPort(addr); err == nil {
		if port != "" {
			return addr
		}
		// SplitHostPort accepts an empty port; fall through with the bare host
		// so the default port is applied.
		addr = host
	}
	if len(addr) >= 2 && strings.HasPrefix(addr, "[") && strings.HasSuffix(addr, "]") {
		addr = addr[1 : len(addr)-1]
	}
	return net.JoinHostPort(addr, "443")
}

// Config bundles everything the carrier needs to talk to the relay.
type Config struct {
ScriptURLs []string // one or more full https://script.google.com/macros/s/.../exec URLs
Expand Down Expand Up @@ -99,12 +182,13 @@
}

type relayEndpoint struct {
url string
account string // optional human-readable Google account label, "" = unlabeled
blacklistedTill time.Time
failCount int
statsOK uint64
statsFail uint64
url string
account string // optional human-readable Google account label, "" = unlabeled
blacklistedTill time.Time
localNetworkOffline bool
failCount int
statsOK uint64
statsFail uint64

// Per-quota-window counters. dailyCount is the number of HTTP responses
// received from Apps Script in the current window; dailyResetAt is the
Expand Down Expand Up @@ -163,7 +247,7 @@
numWorkers int // (workersPerEndpoint + idleSlotsPerBucket - 1) × bucketCount
bucketCount int // distinct account labels in endpoints; unlabeled all share one bucket
idleSlotsPerBucket int // resolved from Config.IdleSlotsPerBucket, default 1
clientVersion string
clientVersion string

// clientID is a random 16-byte identifier minted once per process. It is
// embedded in every encrypted batch so the server can route downstream
Expand Down Expand Up @@ -197,6 +281,8 @@
coalesceMu sync.Mutex
coalesceTimer *time.Timer // armed during a coalesce window; nil otherwise
coalesceDeadline time.Time // hard cap for the in-flight window

recoveryProbeAddr string
}

// clientStats holds atomic counters surfaced periodically by statsLoop.
Expand Down Expand Up @@ -305,6 +391,7 @@
wake: newWaker(),
coalesceStep: cfg.CoalesceStep,
coalesceMax: cfg.CoalesceMax,
recoveryProbeAddr: recoveryProbeAddress(cfg),
}, nil
}

Expand Down Expand Up @@ -412,6 +499,11 @@
defer wg.Done()
c.runScriptStatsLoop(ctx)
}()
wg.Add(1)
go func() {
defer wg.Done()
c.runEndpointRecoveryLoop(ctx)
}()
wg.Wait()
return ctx.Err()
}
Expand Down Expand Up @@ -446,6 +538,65 @@
}
}

// runEndpointRecoveryLoop periodically runs the local-network recovery probe
// until ctx is cancelled. Each time a probe reports that it cleared at least
// one local-offline backoff, c.wake is broadcast. The loop exits immediately
// when no probe address is configured.
func (c *Client) runEndpointRecoveryLoop(ctx context.Context) {
	if c.recoveryProbeAddr == "" {
		return
	}

	ticker := time.NewTicker(localNetworkRecoveryProbeEvery)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		if !c.runEndpointRecoveryProbeOnce(ctx) {
			continue
		}
		c.wake.Broadcast()
	}
}

// runEndpointRecoveryProbeOnce makes a single TCP dial to the recovery probe
// address, but only when shouldRunLocalNetworkRecoveryProbe says a probe is
// warranted. If the dial succeeds, every local-offline endpoint backoff is
// reset. It returns true only when at least one backoff was cleared.
func (c *Client) runEndpointRecoveryProbeOnce(ctx context.Context) bool {
	if c.recoveryProbeAddr == "" {
		return false
	}
	if !c.shouldRunLocalNetworkRecoveryProbe() {
		return false
	}

	dialCtx, stop := context.WithTimeout(ctx, localNetworkRecoveryProbeTO)
	defer stop()

	probeDialer := net.Dialer{Timeout: localNetworkRecoveryProbeTO}
	probeConn, dialErr := probeDialer.DialContext(dialCtx, "tcp", c.recoveryProbeAddr)
	if dialErr != nil {
		return false
	}
	// Only reachability matters; the connection carries no data, so a close
	// error is deliberately ignored.
	_ = probeConn.Close()

	restored := c.resetLocalNetworkFailures()
	if restored <= 0 {
		return false
	}
	log.Printf("[carrier] local network appears reachable again; cleared %d local-offline endpoint backoff(s)", restored)
	return true
}

// shouldRunLocalNetworkRecoveryProbe reports whether a recovery probe is worth
// running: every endpoint must currently be blacklisted, and at least one of
// them must be blacklisted because of a local-network failure. With no
// endpoints at all it returns false.
func (c *Client) shouldRunLocalNetworkRecoveryProbe() bool {
	c.endpointMu.Lock()
	defer c.endpointMu.Unlock()

	checkedAt := time.Now()
	sawLocalOffline := false
	for idx := range c.endpoints {
		candidate := &c.endpoints[idx]
		if !candidate.blacklistedTill.After(checkedAt) {
			// At least one endpoint is still usable, so no probe is needed.
			return false
		}
		if candidate.localNetworkOffline {
			sawLocalOffline = true
		}
	}
	return sawLocalOffline
}

// idleBackoff returns how long a worker should sleep after n consecutive
// no-work polls. The wake channel is selected against this timer so any
// new TX (kick) cancels the sleep immediately and any held server-side
Expand Down Expand Up @@ -577,7 +728,11 @@
if ctx.Err() != nil {
return false
}
c.markEndpointFailure(endpointIdx)
if isLocalNetworkOffline(err) {
c.markEndpointLocalNetworkFailure(endpointIdx)
} else {
c.markEndpointFailure(endpointIdx)
}
if attempt < maxAttempts {
log.Printf("[carrier] relay request failed via %s (attempt %d/%d): %v; retrying alternate script", shortScriptKey(scriptURL), attempt, maxAttempts, err)
continue
Expand Down Expand Up @@ -745,6 +900,23 @@
return -1, ""
}

// resetLocalNetworkFailures lifts the backoff on every endpoint currently
// flagged as local-network-offline: the blacklist deadline and fail count are
// zeroed and the flag is cleared. Endpoints without the flag are untouched.
// It returns the number of endpoints that were reset.
func (c *Client) resetLocalNetworkFailures() int {
	c.endpointMu.Lock()
	defer c.endpointMu.Unlock()

	var restored int
	for idx := range c.endpoints {
		if target := &c.endpoints[idx]; target.localNetworkOffline {
			target.localNetworkOffline = false
			target.failCount = 0
			target.blacklistedTill = time.Time{}
			restored++
		}
	}
	return restored
}

func (c *Client) markEndpointSuccess(endpointIdx int) {
c.endpointMu.Lock()
if endpointIdx < 0 || endpointIdx >= len(c.endpoints) {
Expand All @@ -757,6 +929,7 @@
url := ep.url
ep.failCount = 0
ep.blacklistedTill = time.Time{}
ep.localNetworkOffline = false
c.endpointMu.Unlock()
if wasFailing {
log.Printf("[carrier] endpoint %s recovered (back in rotation)", shortScriptKey(url))
Expand All @@ -769,6 +942,27 @@
c.markEndpointFailureWith(endpointIdx, 0)
}

// markEndpointLocalNetworkFailure records a failure attributed to the client's
// own connectivity. The endpoint is blacklisted for the fixed
// localNetworkOfflineBlacklistTTL, its failCount is reset to zero and it is
// flagged localNetworkOffline. A log line is emitted only when the endpoint
// was healthy beforehand. Out-of-range indexes are ignored.
func (c *Client) markEndpointLocalNetworkFailure(endpointIdx int) {
	scriptURL, others, firstFailure, found := func() (string, int, bool, bool) {
		c.endpointMu.Lock()
		defer c.endpointMu.Unlock()

		if endpointIdx < 0 || endpointIdx >= len(c.endpoints) {
			return "", 0, false, false
		}
		target := &c.endpoints[endpointIdx]
		healthyBefore := target.failCount == 0 && !target.blacklistedTill.After(time.Now())

		target.failCount = 0
		target.statsFail++
		target.localNetworkOffline = true
		target.blacklistedTill = time.Now().Add(localNetworkOfflineBlacklistTTL)

		return target.url, len(c.endpoints) - 1, healthyBefore, true
	}()

	// Log outside the lock, and only on the healthy -> offline transition.
	if !found || !firstFailure {
		return
	}
	log.Printf("[carrier] endpoint %s local network offline; retrying in %s (still rotating across %d others)",
		shortScriptKey(scriptURL), localNetworkOfflineBlacklistTTL.Round(time.Second), others)
}

// markEndpoint403 handles HTTP 403 (quota exhausted or deployment misconfigured).
// Quota walls don't self-heal in seconds; they persist until midnight Pacific.
// Jump straight to the 5-minute tier (failCount floor = 5 → next hit → 6 → 5 min)
Expand Down Expand Up @@ -807,6 +1001,7 @@
}
ep.failCount++
ep.statsFail++
ep.localNetworkOffline = false
ttl := endpointBlacklistTTL(ep.failCount)
ep.blacklistedTill = time.Now().Add(ttl)
url := ep.url
Expand Down
Loading
Loading