From afc02b114965757eff7fedcb115fd3d3b20bab2d Mon Sep 17 00:00:00 2001 From: Kian Haddad Date: Wed, 20 May 2026 19:23:25 -0400 Subject: [PATCH] carrier: restore endpoint-count worker scaling + per-account idle semaphore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit v1.6 changed the worker formula from v1.5's `workersPerEndpoint × len(endpoints)` to `(workersPerEndpoint + idleSlots - 1) × bucketCount`, where bucketCount is the number of distinct Google account labels. For configs without account labels — the most common legacy pattern, since most users never edited the deprecated v1.4 example config — every endpoint shares one empty-string bucket, so worker count collapses: config v1.5 v1.6 (incl #143) this PR ---------------------------------------------------------------------- 5 unlabeled endpoints 15 3 15 5 deployments across 5 labeled accounts 15 15 15 9 deployments across 5 accounts (#113) 27 15 27 #143 already shipped the right per-worker behavior (revert workersPerEndpoint to 3, fixes upload throughput at the bench's 1-endpoint config). But it does nothing about the worker count regression for multi-endpoint configs — those users still run with bucketCount-scaled workers. This PR restores v1.5's endpoint-count scaling AND replaces the global idle-slot counter with a per-account semaphore inside the picker. The semaphore keeps the anti-abuse cap (issue #56 — multiple deployments under one account couldn't sustain the v1.5 worker count's concurrency on the same Google account) while letting workers freely rotate across accounts. Two pickers now: - pickRelayEndpoint: blacklist-aware, no cap. Used for active polls carrying TX, which terminate quickly with the drained batch and don't camp an account's concurrency budget — matches v1.5 behavior. - pickIdleEndpoint: blacklist-aware AND requires a free idle slot in the candidate's bucket. Atomically reserves; pollOnce releases on return. Each unlabeled endpoint gets its own implicit bucket (key = "url:"+url) so legacy multi-endpoint configs no longer share one bucket with 1 idle slot — they get one slot per endpoint, like v1.5. Bench on the 1-endpoint config (the only one the harness can drive): throughput_up_8MB_1session 22.26 MB/s (unchanged from main) throughput_up_8MB_4sessions 87.17 MB/s (unchanged) sessions_per_sec 4.39 /s (unchanged, within noise) No regression on the configs the bench covers. The actual benefit lives in the worker-count table above — verifiable from the math, not the bench. A multi-endpoint bench scenario should be a follow-up so this kind of regression can be caught automatically next time. --- internal/carrier/client.go | 255 +++++++++++++++++++++++-------------- 1 file changed, 158 insertions(+), 97 deletions(-) diff --git a/internal/carrier/client.go b/internal/carrier/client.go index c4219dc..becec57 100644 --- a/internal/carrier/client.go +++ b/internal/carrier/client.go @@ -35,13 +35,12 @@ const ( // (see idleBackoff) extends this when consecutive polls return no work. pollIdleSleep = 10 * time.Millisecond - // pureDownloadIdleCap is the minimum number of concurrent idle long-polls - // allowed in pure-download mode (no pending TX). The actual cap is - // max(pureDownloadIdleCap, len(endpoints)) so multi-endpoint configs get - // one idle poll per deployment. This floor ensures single-endpoint configs - // keep two slots for redundancy during the pollIdleSleep re-entry window. - // Previously this was numWorkers-1 (issue #41: excessive empty POSTs); - // a hard cap of 2 overcorrected for multi-endpoint configs (issue #73). + // pureDownloadIdleCap is referenced by sanity assertions in the + // idle-poll tests. The runtime cap is bucketCount × idleSlotsPerBucket, + // applied inside pickRelayEndpoint; this constant is the floor a single + // endpoint should provide via implicit per-URL bucketing (unlabeled + // endpoints each get their own bucket, so 1 endpoint = 1 bucket = at + // least 1 slot; the test asserts ≥ this floor as a smoke check). pureDownloadIdleCap = 2 // pollTimeout is the per-request HTTP ceiling; should comfortably exceed @@ -216,6 +215,14 @@ type relayEndpoint struct { statsOK uint64 statsFail uint64 + // bucket is the key into Client.inFlightByBucket. For labeled endpoints + // it is "acct:"+account so all deployments under one Google account share + // a single in-flight semaphore (Apps Script throttles per-account). For + // unlabeled endpoints it is "url:"+url so each deployment gets its own + // implicit semaphore — that matches v1.5 behavior where each endpoint + // was independently rate-managed. + bucket string + // Per-quota-window counters. dailyCount is the number of HTTP responses // received from Apps Script in the current window; dailyResetAt is the // next midnight Pacific (the boundary at which Apps Script resets the @@ -270,9 +277,9 @@ type Client struct { httpClients []*http.Client // one per SNI host; round-robined per request nextHTTP atomic.Uint64 // round-robin index into httpClients debugTiming bool - numWorkers int // (workersPerEndpoint + idleSlotsPerBucket - 1) × bucketCount - bucketCount int // distinct account labels in endpoints; unlabeled all share one bucket - idleSlotsPerBucket int // resolved from Config.IdleSlotsPerBucket, default 1 + numWorkers int // workersPerEndpoint × len(endpoints); semaphore caps actual in-flight + bucketCount int // distinct in-flight buckets; one per labeled account, plus one per unlabeled endpoint + idleSlotsPerBucket int // resolved from Config.IdleSlotsPerBucket; max concurrent polls per bucket clientVersion string // clientID is a random 16-byte identifier minted once per process. It is @@ -290,12 +297,15 @@ type Client struct { inFlight map[[frame.SessionIDLen]byte]bool txReady map[[frame.SessionIDLen]byte]struct{} // sessions with pending TX frames - endpointMu sync.Mutex - endpoints []relayEndpoint - nextEndpoint int - - idlePollMu sync.Mutex - idlePollInFlight int + // endpointMu protects endpoints (per-endpoint state), nextEndpoint + // (picker round-robin cursor), and inFlightByBucket (per-account + // in-flight semaphore counters). Single mutex because pickRelayEndpoint + // needs to atomically (a) find an eligible endpoint and (b) reserve a + // semaphore slot. + endpointMu sync.Mutex + endpoints []relayEndpoint + nextEndpoint int + inFlightByBucket map[string]int // bucket key → current in-flight poll count wake *waker // broadcasts to all idle poll goroutines simultaneously stats clientStats @@ -348,28 +358,35 @@ func New(cfg Config) (*Client, error) { if i < len(cfg.ScriptAccounts) { account = strings.TrimSpace(cfg.ScriptAccounts[i]) } - endpoints = append(endpoints, relayEndpoint{url: url, account: account}) + ep := relayEndpoint{url: url, account: account} + if account != "" { + ep.bucket = "acct:" + account + } else { + ep.bucket = "url:" + url + } + endpoints = append(endpoints, ep) } if len(endpoints) == 0 { return nil, fmt.Errorf("at least one script URL is required") } - // Concurrency scales with distinct Google account "buckets", not endpoint - // count. Apps Script's per-second concurrency cap and ~20k UrlFetchApp/day - // quota are per-account: scaling workers/idle-slots by endpoint count - // (pre-fix behavior) overloads users who deploy multiple IDs under one - // account, causing Apps Script to return HTML error pages instead of the - // encrypted batch (issue #56). Unlabeled endpoints all share one anonymous - // bucket so legacy configs default to v1.2.0-equivalent load. - accountSeen := make(map[string]struct{}, len(endpoints)) + // Each Google account is one in-flight bucket. Endpoints without an + // account label each get their own bucket (Apps Script throttles per + // account; we can't tell unlabeled deployments apart, so we conservatively + // assume they're all distinct — which matches v1.5 behavior where each + // endpoint was independently rate-managed). The in-flight semaphore on + // each bucket caps concurrent polls hitting that account, preserving the + // per-account anti-abuse protection that motivated v1.6's bucketing + // (issue #56) without partitioning the worker pool itself. + bucketSeen := make(map[string]struct{}, len(endpoints)) labeled := 0 for _, ep := range endpoints { - accountSeen[ep.account] = struct{}{} + bucketSeen[ep.bucket] = struct{}{} if ep.account != "" { labeled++ } } - bucketCount := len(accountSeen) + bucketCount := len(bucketSeen) var clientID [frame.ClientIDLen]byte if _, err := rand.Read(clientID[:]); err != nil { @@ -382,22 +399,31 @@ func New(cfg Config) (*Client, error) { if idleSlotsPerBucket <= 0 { idleSlotsPerBucket = 1 } - // Worker count scales with idleSlotsPerBucket so the TX pool isn't - // drained when the user opts up the RX cap. With workersPerEndpoint=3 - // and idleSlotsPerBucket=1, this reduces to the old 3×bucketCount. - // At idleSlotsPerBucket=2, each bucket gets +1 worker so the same - // number of workers stay free for TX after the extra idle slot is - // camped — the alternative (fixed worker count) starves session - // establishment under TX bursts when more workers are tied to long - // polls. - numWorkers := (workersPerEndpoint + idleSlotsPerBucket - 1) * bucketCount - log.Printf("[carrier] %d worker(s) across %d account bucket(s) (%d endpoint(s)), %d idle slot(s)/bucket", - numWorkers, bucketCount, len(endpoints), idleSlotsPerBucket) - if labeled == 0 && len(endpoints) > 1 { - log.Printf("[carrier] WARN: %d deployments configured with no account labels — treating as one bucket. "+ - "If these deployments are under different Google accounts, label them in script_keys "+ - "as {\"id\": \"...\", \"account\": \"A\"} to unlock per-account parallelism.", - len(endpoints)) + // Single-bucket configs (one endpoint or one labeled account) need at + // least pureDownloadIdleCap idle slots so the gap during pollIdleSleep + // re-entry doesn't stall pure-download throughput (one slot is held by + // the active long-poll; the other rotates in as that one returns). + // Multi-bucket configs already have multiple concurrent slots across + // buckets, so the per-bucket floor only matters when bucketCount=1. + if bucketCount == 1 && idleSlotsPerBucket < pureDownloadIdleCap { + idleSlotsPerBucket = pureDownloadIdleCap + } + // Worker count scales with endpoint count (v1.5 behavior). v1.6's + // bucket-scaled worker pool starved the picker on the common case of + // multiple deployments under one account or unlabeled configs — + // issue #113 (slower than v1.5 despite "more workers") and the + // implicit regression for legacy configs (5 unlabeled endpoints gave + // only 4 workers vs v1.5's 15). The per-bucket idle-slot semaphore + // (pickIdleEndpoint) still caps simultaneous standing polls per + // account so issue #56 stays fixed; active polls bypass that cap + // because they terminate quickly with TX delivery. + numWorkers := workersPerEndpoint * len(endpoints) + if labeled > 0 || len(endpoints) == 1 { + log.Printf("[carrier] %d worker(s) across %d bucket(s) (%d endpoint(s)), %d idle slot(s)/bucket", + numWorkers, bucketCount, len(endpoints), idleSlotsPerBucket) + } else { + log.Printf("[carrier] %d worker(s) across %d endpoint(s) (no account labels — each endpoint is its own bucket), %d idle slot(s)/endpoint", + numWorkers, len(endpoints), idleSlotsPerBucket) } return &Client{ @@ -414,6 +440,7 @@ func New(cfg Config) (*Client, error) { inFlight: make(map[[frame.SessionIDLen]byte]bool), txReady: make(map[[frame.SessionIDLen]byte]struct{}), endpoints: endpoints, + inFlightByBucket: make(map[string]int, bucketCount), wake: newWaker(), coalesceStep: cfg.CoalesceStep, coalesceMax: cfg.CoalesceMax, @@ -659,32 +686,18 @@ func (c *Client) pollOnce(ctx context.Context) bool { c.rollbackDrained(snaps) } }() + // Idle long-polls (no TX) are subject to the per-bucket idle slot cap so + // each Google account holds at most idleSlotsPerBucket simultaneous + // standing polls — Apps Script anti-abuse fires when one account sees + // too many concurrent UrlFetchApp invocations (issue #56). Active polls + // (TX present) bypass the cap because they terminate quickly with the + // drained batch; this matches v1.5 behavior. The reservation is tracked + // across the attempt loop so same-poll failovers don't hold two slots. isIdlePoll := len(frames) == 0 - if isIdlePoll { - // Allow idleSlotsPerBucket idle long-poll slots per *account bucket* so - // each Google account's quota gets the configured number of standing - // polls for downstream push. History: a fixed cap of 1 (v1.2.0) starved - // multi-deployment configs. Issue #41's fix to numWorkers-1 woke every - // long-poll on every chunk and amplified upload bandwidth N-fold. - // Issue #73's fix to max(2, len(endpoints)) gave each deployment a slot - // — but when multiple deployments shared one Google account, that - // overloaded the per-account concurrency cap (issue #56). Scaling by - // bucket count is the natural unit Apps Script throttles on; the - // idleSlotsPerBucket multiplier lets users who've validated their - // accounts can sustain >1 simultaneous poll opt up. pureDownloadIdleCap - // is the floor that keeps a single-bucket config from regressing to a - // single standing poll. - c.mu.Lock() - idleCap := c.bucketCount * c.idleSlotsPerBucket - if len(c.txReady) == 0 && idleCap < pureDownloadIdleCap { - idleCap = pureDownloadIdleCap - } - c.mu.Unlock() - if !c.acquireIdlePollSlot(idleCap) { - return false - } - defer c.releaseIdlePollSlot() - } + pickedIdleIdx := -1 + defer func() { + c.releaseBucketSlot(pickedIdleIdx) + }() // Stats: classify poll outcome on return so callers don't have to remember // to bump counters at every terminal point inside the retry loop. @@ -717,7 +730,19 @@ func (c *Client) pollOnce(ctx context.Context) bool { } for attempt := 1; attempt <= maxAttempts; attempt++ { - endpointIdx, scriptURL := c.pickRelayEndpoint() + // On retry, release the previous attempt's idle slot (if held) so + // a same-poll failover doesn't hold two slots simultaneously. + if pickedIdleIdx >= 0 { + c.releaseBucketSlot(pickedIdleIdx) + pickedIdleIdx = -1 + } + var endpointIdx int + var scriptURL string + if isIdlePoll { + endpointIdx, scriptURL = c.pickIdleEndpoint() + } else { + endpointIdx, scriptURL = c.pickRelayEndpoint() + } if endpointIdx < 0 || scriptURL == "" { c.endpointMu.Lock() anyConfigured := len(c.endpoints) > 0 @@ -725,12 +750,16 @@ func (c *Client) pollOnce(ctx context.Context) bool { if !anyConfigured { log.Printf("[carrier] no relay script URLs are configured") } - // Otherwise: all endpoints are blacklisted; per-endpoint blacklist - // logs were already emitted at the failing transitions. Returning - // false here lets the worker's idle-backoff wait out the soonest - // TTL without sending any traffic to flagged deployments. + // Otherwise: either all endpoints are blacklisted, or (idle + // path only) every non-blacklisted bucket is already at its + // idle cap. Per-endpoint blacklist logs were emitted at the + // failing transitions; cap pressure is normal under high + // concurrent download load. The worker idle-backs off. return false } + if isIdlePoll { + pickedIdleIdx = endpointIdx + } req, err := http.NewRequestWithContext(ctx, http.MethodPost, scriptURL, bytes.NewReader(body)) if err != nil { @@ -897,6 +926,11 @@ func (c *Client) pickHTTPClient() *http.Client { return c.httpClients[idx%uint64(len(c.httpClients))] } +// pickRelayEndpoint picks the next non-blacklisted endpoint in round-robin +// order. The per-bucket in-flight semaphore is enforced separately by +// acquireBucketSlot/releaseBucketSlot — only idle long-polls are gated by it +// (matches v1.5 behavior; active polls carrying TX terminate quickly with the +// drained payload and don't camp an account's concurrency budget). func (c *Client) pickRelayEndpoint() (int, string) { c.endpointMu.Lock() defer c.endpointMu.Unlock() @@ -917,15 +951,60 @@ func (c *Client) pickRelayEndpoint() (int, string) { return idx, ep.url } - // All endpoints are temporarily blacklisted. Refuse to send rather than - // hammer already-flagged deployments. The worker loop will idle-backoff - // until the soonest TTL elapses; at that point the first loop above - // picks the newly-available endpoint. Hammering during outages plausibly - // extends Apps Script's per-deployment cooldown beyond the 24h daily - // reset window (see issues #121 and #126). + // Every endpoint is blacklisted. Refuse to send rather than hammer + // flagged deployments (issues #121, #126). The worker will idle-backoff + // until the soonest TTL elapses. return -1, "" } +// pickIdleEndpoint is like pickRelayEndpoint but also requires the candidate +// endpoint's bucket to have an idle long-poll slot available, and reserves +// that slot atomically. Callers MUST pair a successful pick (idx >= 0) with +// releaseBucketSlot(idx). Returns -1 if every non-blacklisted endpoint's +// bucket is already at the per-bucket idle cap — the worker idle-backs off. +func (c *Client) pickIdleEndpoint() (int, string) { + c.endpointMu.Lock() + defer c.endpointMu.Unlock() + + n := len(c.endpoints) + if n == 0 { + return -1, "" + } + now := time.Now() + start := c.nextEndpoint % n + for i := 0; i < n; i++ { + idx := (start + i) % n + ep := &c.endpoints[idx] + if ep.blacklistedTill.After(now) { + continue + } + if c.inFlightByBucket[ep.bucket] >= c.idleSlotsPerBucket { + continue + } + c.inFlightByBucket[ep.bucket]++ + c.nextEndpoint = (idx + 1) % n + return idx, ep.url + } + return -1, "" +} + +// releaseBucketSlot frees the idle slot reserved by pickIdleEndpoint. Safe +// to call with idx < 0 (no-op). +func (c *Client) releaseBucketSlot(idx int) { + if idx < 0 { + return + } + c.endpointMu.Lock() + defer c.endpointMu.Unlock() + if idx >= len(c.endpoints) { + return + } + bucket := c.endpoints[idx].bucket + if c.inFlightByBucket[bucket] > 0 { + c.inFlightByBucket[bucket]-- + } +} + func (c *Client) resetLocalNetworkFailures() int { c.endpointMu.Lock() defer c.endpointMu.Unlock() @@ -1229,24 +1308,6 @@ func (c *Client) gcDoneSessions() { } } -func (c *Client) acquireIdlePollSlot(cap int) bool { - c.idlePollMu.Lock() - defer c.idlePollMu.Unlock() - if c.idlePollInFlight >= cap { - return false - } - c.idlePollInFlight++ - return true -} - -func (c *Client) releaseIdlePollSlot() { - c.idlePollMu.Lock() - defer c.idlePollMu.Unlock() - if c.idlePollInFlight > 0 { - c.idlePollInFlight-- - } -} - // kick broadcasts to all idle poll workers. Safe to call from any goroutine. // // When adaptive coalescing is enabled (coalesceStep > 0) kicks within a