Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 158 additions & 97 deletions internal/carrier/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,12 @@ const (
// (see idleBackoff) extends this when consecutive polls return no work.
pollIdleSleep = 10 * time.Millisecond

// pureDownloadIdleCap is the minimum number of concurrent idle long-polls
// allowed in pure-download mode (no pending TX). The actual cap is
// max(pureDownloadIdleCap, len(endpoints)) so multi-endpoint configs get
// one idle poll per deployment. This floor ensures single-endpoint configs
// keep two slots for redundancy during the pollIdleSleep re-entry window.
// Previously this was numWorkers-1 (issue #41: excessive empty POSTs);
// a hard cap of 2 overcorrected for multi-endpoint configs (issue #73).
// pureDownloadIdleCap is the minimum number of idle long-poll slots a
// single-bucket config gets: New raises idleSlotsPerBucket to this value
// when bucketCount is 1, so one endpoint (or one labeled account) keeps a
// second standing poll during the pollIdleSleep re-entry window. The
// per-bucket cap itself is enforced inside pickIdleEndpoint. The
// idle-poll tests also assert against this floor as a smoke check.
pureDownloadIdleCap = 2

// pollTimeout is the per-request HTTP ceiling; should comfortably exceed
Expand Down Expand Up @@ -216,6 +215,14 @@ type relayEndpoint struct {
statsOK uint64
statsFail uint64

// bucket is the key into Client.inFlightByBucket. For labeled endpoints
// it is "acct:"+account so all deployments under one Google account share
// a single in-flight semaphore (Apps Script throttles per-account). For
// unlabeled endpoints it is "url:"+url so each deployment gets its own
// implicit semaphore — that matches v1.5 behavior where each endpoint
// was independently rate-managed.
bucket string

// Per-quota-window counters. dailyCount is the number of HTTP responses
// received from Apps Script in the current window; dailyResetAt is the
// next midnight Pacific (the boundary at which Apps Script resets the
Expand Down Expand Up @@ -270,9 +277,9 @@ type Client struct {
httpClients []*http.Client // one per SNI host; round-robined per request
nextHTTP atomic.Uint64 // round-robin index into httpClients
debugTiming bool
numWorkers int // (workersPerEndpoint + idleSlotsPerBucket - 1) × bucketCount
bucketCount int // distinct account labels in endpoints; unlabeled all share one bucket
idleSlotsPerBucket int // resolved from Config.IdleSlotsPerBucket, default 1
numWorkers int // workersPerEndpoint × len(endpoints); semaphore caps actual in-flight
bucketCount int // distinct in-flight buckets; one per labeled account, plus one per unlabeled endpoint
idleSlotsPerBucket int // resolved from Config.IdleSlotsPerBucket; max concurrent polls per bucket
clientVersion string

// clientID is a random 16-byte identifier minted once per process. It is
Expand All @@ -290,12 +297,15 @@ type Client struct {
inFlight map[[frame.SessionIDLen]byte]bool
txReady map[[frame.SessionIDLen]byte]struct{} // sessions with pending TX frames

endpointMu sync.Mutex
endpoints []relayEndpoint
nextEndpoint int

idlePollMu sync.Mutex
idlePollInFlight int
// endpointMu protects endpoints (per-endpoint state), nextEndpoint
// (picker round-robin cursor), and inFlightByBucket (per-bucket idle
// long-poll counters). Single mutex because pickIdleEndpoint needs to
// atomically (a) find an eligible endpoint and (b) reserve a slot in
// that endpoint's bucket.
endpointMu sync.Mutex
endpoints []relayEndpoint
nextEndpoint int
inFlightByBucket map[string]int // bucket key → current in-flight poll count

wake *waker // broadcasts to all idle poll goroutines simultaneously
stats clientStats
Expand Down Expand Up @@ -348,28 +358,35 @@ func New(cfg Config) (*Client, error) {
if i < len(cfg.ScriptAccounts) {
account = strings.TrimSpace(cfg.ScriptAccounts[i])
}
endpoints = append(endpoints, relayEndpoint{url: url, account: account})
ep := relayEndpoint{url: url, account: account}
if account != "" {
ep.bucket = "acct:" + account
} else {
ep.bucket = "url:" + url
}
endpoints = append(endpoints, ep)
}
if len(endpoints) == 0 {
return nil, fmt.Errorf("at least one script URL is required")
}

// Concurrency scales with distinct Google account "buckets", not endpoint
// count. Apps Script's per-second concurrency cap and ~20k UrlFetchApp/day
// quota are per-account: scaling workers/idle-slots by endpoint count
// (pre-fix behavior) overloads users who deploy multiple IDs under one
// account, causing Apps Script to return HTML error pages instead of the
// encrypted batch (issue #56). Unlabeled endpoints all share one anonymous
// bucket so legacy configs default to v1.2.0-equivalent load.
accountSeen := make(map[string]struct{}, len(endpoints))
// Each Google account is one in-flight bucket. Endpoints without an
// account label each get their own bucket (Apps Script throttles per
// account; we can't tell unlabeled deployments apart, so we conservatively
// assume they're all distinct — which matches v1.5 behavior where each
// endpoint was independently rate-managed). The in-flight semaphore on
// each bucket caps concurrent polls hitting that account, preserving the
// per-account anti-abuse protection that motivated v1.6's bucketing
// (issue #56) without partitioning the worker pool itself.
bucketSeen := make(map[string]struct{}, len(endpoints))
labeled := 0
for _, ep := range endpoints {
accountSeen[ep.account] = struct{}{}
bucketSeen[ep.bucket] = struct{}{}
if ep.account != "" {
labeled++
}
}
bucketCount := len(accountSeen)
bucketCount := len(bucketSeen)

var clientID [frame.ClientIDLen]byte
if _, err := rand.Read(clientID[:]); err != nil {
Expand All @@ -382,22 +399,31 @@ func New(cfg Config) (*Client, error) {
if idleSlotsPerBucket <= 0 {
idleSlotsPerBucket = 1
}
// Worker count scales with idleSlotsPerBucket so the TX pool isn't
// drained when the user opts up the RX cap. With workersPerEndpoint=3
// and idleSlotsPerBucket=1, this reduces to the old 3×bucketCount.
// At idleSlotsPerBucket=2, each bucket gets +1 worker so the same
// number of workers stay free for TX after the extra idle slot is
// camped — the alternative (fixed worker count) starves session
// establishment under TX bursts when more workers are tied to long
// polls.
numWorkers := (workersPerEndpoint + idleSlotsPerBucket - 1) * bucketCount
log.Printf("[carrier] %d worker(s) across %d account bucket(s) (%d endpoint(s)), %d idle slot(s)/bucket",
numWorkers, bucketCount, len(endpoints), idleSlotsPerBucket)
if labeled == 0 && len(endpoints) > 1 {
log.Printf("[carrier] WARN: %d deployments configured with no account labels — treating as one bucket. "+
"If these deployments are under different Google accounts, label them in script_keys "+
"as {\"id\": \"...\", \"account\": \"A\"} to unlock per-account parallelism.",
len(endpoints))
// Single-bucket configs (one endpoint or one labeled account) need at
// least pureDownloadIdleCap idle slots so the gap during pollIdleSleep
// re-entry doesn't stall pure-download throughput (one slot is held by
// the active long-poll; the other rotates in as that one returns).
// Multi-bucket configs already have multiple concurrent slots across
// buckets, so the per-bucket floor only matters when bucketCount=1.
if bucketCount == 1 && idleSlotsPerBucket < pureDownloadIdleCap {
idleSlotsPerBucket = pureDownloadIdleCap
}
// Worker count scales with endpoint count (v1.5 behavior). v1.6's
// bucket-scaled worker pool starved the picker on the common case of
// multiple deployments under one account or unlabeled configs —
// issue #113 (slower than v1.5 despite "more workers") and the
// implicit regression for legacy configs (5 unlabeled endpoints gave
// only 4 workers vs v1.5's 15). The per-bucket idle-slot semaphore
// (pickIdleEndpoint) still caps simultaneous standing polls per
// account so issue #56 stays fixed; active polls bypass that cap
// because they terminate quickly with TX delivery.
numWorkers := workersPerEndpoint * len(endpoints)
if labeled > 0 || len(endpoints) == 1 {
log.Printf("[carrier] %d worker(s) across %d bucket(s) (%d endpoint(s)), %d idle slot(s)/bucket",
numWorkers, bucketCount, len(endpoints), idleSlotsPerBucket)
} else {
log.Printf("[carrier] %d worker(s) across %d endpoint(s) (no account labels — each endpoint is its own bucket), %d idle slot(s)/endpoint",
numWorkers, len(endpoints), idleSlotsPerBucket)
}

return &Client{
Expand All @@ -414,6 +440,7 @@ func New(cfg Config) (*Client, error) {
inFlight: make(map[[frame.SessionIDLen]byte]bool),
txReady: make(map[[frame.SessionIDLen]byte]struct{}),
endpoints: endpoints,
inFlightByBucket: make(map[string]int, bucketCount),
wake: newWaker(),
coalesceStep: cfg.CoalesceStep,
coalesceMax: cfg.CoalesceMax,
Expand Down Expand Up @@ -659,32 +686,18 @@ func (c *Client) pollOnce(ctx context.Context) bool {
c.rollbackDrained(snaps)
}
}()
// Idle long-polls (no TX) are subject to the per-bucket idle slot cap so
// each Google account holds at most idleSlotsPerBucket simultaneous
// standing polls — Apps Script anti-abuse fires when one account sees
// too many concurrent UrlFetchApp invocations (issue #56). Active polls
// (TX present) bypass the cap because they terminate quickly with the
// drained batch; this matches v1.5 behavior. The reservation is tracked
// across the attempt loop so same-poll failovers don't hold two slots.
isIdlePoll := len(frames) == 0
if isIdlePoll {
// Allow idleSlotsPerBucket idle long-poll slots per *account bucket* so
// each Google account's quota gets the configured number of standing
// polls for downstream push. History: a fixed cap of 1 (v1.2.0) starved
// multi-deployment configs. Issue #41's fix to numWorkers-1 woke every
// long-poll on every chunk and amplified upload bandwidth N-fold.
// Issue #73's fix to max(2, len(endpoints)) gave each deployment a slot
// — but when multiple deployments shared one Google account, that
// overloaded the per-account concurrency cap (issue #56). Scaling by
// bucket count is the natural unit Apps Script throttles on; the
// idleSlotsPerBucket multiplier lets users who've validated that their
// accounts can sustain >1 simultaneous poll opt up. pureDownloadIdleCap
// is the floor that keeps a single-bucket config from regressing to a
// single standing poll.
c.mu.Lock()
idleCap := c.bucketCount * c.idleSlotsPerBucket
if len(c.txReady) == 0 && idleCap < pureDownloadIdleCap {
idleCap = pureDownloadIdleCap
}
c.mu.Unlock()
if !c.acquireIdlePollSlot(idleCap) {
return false
}
defer c.releaseIdlePollSlot()
}
pickedIdleIdx := -1
defer func() {
c.releaseBucketSlot(pickedIdleIdx)
}()

// Stats: classify poll outcome on return so callers don't have to remember
// to bump counters at every terminal point inside the retry loop.
Expand Down Expand Up @@ -717,20 +730,36 @@ func (c *Client) pollOnce(ctx context.Context) bool {
}

for attempt := 1; attempt <= maxAttempts; attempt++ {
endpointIdx, scriptURL := c.pickRelayEndpoint()
// On retry, release the previous attempt's idle slot (if held) so
// a same-poll failover doesn't hold two slots simultaneously.
if pickedIdleIdx >= 0 {
c.releaseBucketSlot(pickedIdleIdx)
pickedIdleIdx = -1
}
var endpointIdx int
var scriptURL string
if isIdlePoll {
endpointIdx, scriptURL = c.pickIdleEndpoint()
} else {
endpointIdx, scriptURL = c.pickRelayEndpoint()
}
if endpointIdx < 0 || scriptURL == "" {
c.endpointMu.Lock()
anyConfigured := len(c.endpoints) > 0
c.endpointMu.Unlock()
if !anyConfigured {
log.Printf("[carrier] no relay script URLs are configured")
}
// Otherwise: all endpoints are blacklisted; per-endpoint blacklist
// logs were already emitted at the failing transitions. Returning
// false here lets the worker's idle-backoff wait out the soonest
// TTL without sending any traffic to flagged deployments.
// Otherwise: either all endpoints are blacklisted, or (idle
// path only) every non-blacklisted bucket is already at its
// idle cap. Per-endpoint blacklist logs were emitted at the
// failing transitions; cap pressure is normal under high
// concurrent download load. The worker idle-backs off.
return false
}
if isIdlePoll {
pickedIdleIdx = endpointIdx
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, scriptURL, bytes.NewReader(body))
if err != nil {
Expand Down Expand Up @@ -897,6 +926,11 @@ func (c *Client) pickHTTPClient() *http.Client {
return c.httpClients[idx%uint64(len(c.httpClients))]
}

// pickRelayEndpoint picks the next non-blacklisted endpoint in round-robin
// order. The per-bucket idle-slot cap is enforced separately by
// pickIdleEndpoint/releaseBucketSlot — only idle long-polls are gated by it
// (matches v1.5 behavior; active polls carrying TX terminate quickly with the
// drained payload and don't camp an account's concurrency budget).
func (c *Client) pickRelayEndpoint() (int, string) {
c.endpointMu.Lock()
defer c.endpointMu.Unlock()
Expand All @@ -917,15 +951,60 @@ func (c *Client) pickRelayEndpoint() (int, string) {
return idx, ep.url
}

// All endpoints are temporarily blacklisted. Refuse to send rather than
// hammer already-flagged deployments. The worker loop will idle-backoff
// until the soonest TTL elapses; at that point the first loop above
// picks the newly-available endpoint. Hammering during outages plausibly
// extends Apps Script's per-deployment cooldown beyond the 24h daily
// reset window (see issues #121 and #126).
// Every endpoint is blacklisted. Refuse to send rather than hammer
// flagged deployments (issues #121, #126). The worker will idle-backoff
// until the soonest TTL elapses.
return -1, ""
}

// pickIdleEndpoint selects an endpoint for an idle long-poll and reserves one
// idle slot in that endpoint's bucket, all under endpointMu so the selection
// and the reservation happen as one step.
//
// The scan starts at the round-robin cursor (nextEndpoint) and visits every
// endpoint at most once. An endpoint is skipped when its blacklistedTill is
// still in the future, or when its bucket already holds idleSlotsPerBucket
// reservations. On success the bucket counter is incremented, the cursor is
// moved just past the chosen endpoint, and the endpoint's index and URL are
// returned.
//
// It returns (-1, "") when no endpoints are configured or when none is
// currently eligible. Every successful pick (index >= 0) must be paired with
// releaseBucketSlot(index).
func (c *Client) pickIdleEndpoint() (int, string) {
	c.endpointMu.Lock()
	defer c.endpointMu.Unlock()

	total := len(c.endpoints)
	if total == 0 {
		return -1, ""
	}

	checkedAt := time.Now()
	cursor := c.nextEndpoint % total
	for scanned := 0; scanned < total; scanned, cursor = scanned+1, (cursor+1)%total {
		candidate := &c.endpoints[cursor]
		// Eligible means: not currently blacklisted AND the bucket still
		// has room under the per-bucket idle cap.
		eligible := !candidate.blacklistedTill.After(checkedAt) &&
			c.inFlightByBucket[candidate.bucket] < c.idleSlotsPerBucket
		if !eligible {
			continue
		}
		c.inFlightByBucket[candidate.bucket]++
		c.nextEndpoint = (cursor + 1) % total
		return cursor, candidate.url
	}
	return -1, ""
}

// releaseBucketSlot gives back the idle slot that pickIdleEndpoint reserved
// for the endpoint at idx by decrementing that endpoint's bucket counter.
//
// A negative idx is a no-op and returns before taking endpointMu, so callers
// can pass their "nothing reserved" sentinel unconditionally. An idx past the
// end of the endpoint list is ignored, and the counter never drops below zero.
func (c *Client) releaseBucketSlot(idx int) {
	if idx < 0 {
		return
	}

	c.endpointMu.Lock()
	defer c.endpointMu.Unlock()

	if idx < len(c.endpoints) {
		key := c.endpoints[idx].bucket
		// Only write when something is actually held; a zero counter is
		// left untouched so no map entry is created for it.
		if held := c.inFlightByBucket[key]; held > 0 {
			c.inFlightByBucket[key] = held - 1
		}
	}
}

func (c *Client) resetLocalNetworkFailures() int {
c.endpointMu.Lock()
defer c.endpointMu.Unlock()
Expand Down Expand Up @@ -1229,24 +1308,6 @@ func (c *Client) gcDoneSessions() {
}
}

// acquireIdlePollSlot tries to reserve one idle long-poll slot against the
// given cap. It reports true and bumps idlePollInFlight when fewer than cap
// idle polls are outstanding; otherwise it reports false and changes nothing.
// The check and the increment happen together under idlePollMu.
func (c *Client) acquireIdlePollSlot(cap int) bool {
	c.idlePollMu.Lock()
	defer c.idlePollMu.Unlock()

	granted := c.idlePollInFlight < cap
	if granted {
		c.idlePollInFlight++
	}
	return granted
}

// releaseIdlePollSlot returns one idle long-poll slot by decrementing
// idlePollInFlight under idlePollMu. Releasing when nothing is held is a
// no-op, so the counter never goes negative.
func (c *Client) releaseIdlePollSlot() {
	c.idlePollMu.Lock()
	defer c.idlePollMu.Unlock()

	if c.idlePollInFlight <= 0 {
		return
	}
	c.idlePollInFlight--
}

// kick broadcasts to all idle poll workers. Safe to call from any goroutine.
//
// When adaptive coalescing is enabled (coalesceStep > 0) kicks within a
Expand Down
Loading