feat(google): fetch groups in parallel#4777
Conversation
nabokihms
left a comment
There was a problem hiding this comment.
Thanks for tackling this. Direction is right, but I'd like a few things addressed before merging.
| } | ||
|
|
||
| var traverse func(string) | ||
| traverse = func(groupEmail string) { |
There was a problem hiding this comment.
This whole block - WaitGroup + sync.Once + atomic.Bool + chan semaphore - is golang.org/x/sync/errgroup with SetLimit(n). It's already a transitive dep. Switching cuts ~40 lines, removes hand-rolled first-error logic, and gives us context cancellation for free. Please rewrite using errgroup.
| const ( | ||
| issuerURL = "https://accounts.google.com" | ||
| wildcardDomainToAdminEmail = "*" | ||
| maxConcurrentGroupLookups = 10 |
There was a problem hiding this comment.
Please move this to Config (keeping 10 as the default). At 200ms per call this already pushes 3000 req/min, which is over the default Directory API per-user quota. Tenants with stricter quotas will see 429s and there's no retry path.
| @@ -300,50 +304,121 @@ func (c *googleConnector) createIdentity(ctx context.Context, identity connector | |||
| // getGroups creates a connection to the admin directory service and lists | |||
| // all groups the user is a member of | |||
| func (c *googleConnector) getGroups(email string, fetchTransitiveGroupMembership bool, checkedGroups map[string]struct{}) ([]string, error) { | |||
There was a problem hiding this comment.
Once we're issuing parallel API calls, we need a context.Context here. createIdentity already has one - please thread it through getGroups - listGroupEmails - adminSrv.Groups.List().Context(ctx).Do(). Without it, a cancelled or timed-out login keeps burning Google Directory API quota until every in-flight goroutine returns.
| groupsList := &admin.Groups{} | ||
| domain := c.extractDomainFromEmail(email) | ||
| adminSrv, err := c.findAdminService(domain) | ||
| if checkedGroups == nil { |
There was a problem hiding this comment.
checkedGroups is now dead - the function is no longer recursive, the closure inside traverse owns the dedup map, and the only call-site passes nil. Please drop the parameter.
|
I implemented the suggestions. I also deployed them in my environment to test the changes. Thanks for the feedback. |
nabokihms
left a comment
There was a problem hiding this comment.
My feedback is mostly addressed - errgroup, ctx, config knob, dead param gone. Nice.
One blocker: recursive g.Go under SetLimit deadlocks. Need a regression test for it.
Other minor stuff inline. Also please squash to one commit and rename to feat(google): - sort order changed and there's a new config field, not really a refactor anymore (and do not forget to sign the commit, otherwise we cannot merge this).
| // TODO (joelspeed): Make desired group key configurable | ||
| userGroups = append(userGroups, group.Email) | ||
| g, gctx := errgroup.WithContext(ctx) | ||
| g.SetLimit(c.maxConcurrentGroupLookups) |
There was a problem hiding this comment.
errgroup.Group.Go() under SetLimit(N) blocks the caller on g.sem <- token{} when N goroutines are already active. Since enqueue calls g.Go recursively from inside an already-running worker, the workers can hold all N slots while simultaneously trying to enqueue more - none can return to release a slot.
Minimal repro: MaxConcurrentGroupLookups=1, user -> A -> B. Worker for A fetches parents, calls enqueue(B) -> g.Go blocks on the full sem -> worker A can't return -> hang. Existing tests don't catch it because the graphs are small enough that workers find leaves and return before N concurrent enqueues happen.
The fundamental issue: SetLimit bounds goroutines, but what we actually need to bound is concurrent API calls.
| if _, exists := checkedGroups[group.Email]; exists { | ||
| continue | ||
| } | ||
| if !fetchTransitiveGroupMembership || len(seeds) == 0 { |
There was a problem hiding this comment.
Sort is applied below for the transitive branch but not here - so the two paths return groups in different orders depending on FetchTransitiveGroupMembership. Move sort.Strings(userGroups) above this early-return so both branches always return alphabetically sorted output.
| "groups_0@dexidp.com": {}, | ||
| } | ||
| callCounter = make(map[string]int) | ||
| callCounterMu sync.Mutex |
There was a problem hiding this comment.
Package-level globals here will silently break the day someone adds t.Parallel() to these tests - counts will leak between tests, mutex or not. Not blocking this PR, but worth a follow-up: have testSetup() allocate and return its own counter, pass it through.
| groupEmails := []string{} | ||
| groupsList := &admin.Groups{} | ||
| for { | ||
| if err := ctx.Err(); err != nil { |
There was a problem hiding this comment.
NIT: redundant — Do() already respects the context attached via .Context(ctx), and any cancellation will surface as the Do() error on the next iteration. Can drop these three lines for readability. Not blocking.
Signed-off-by: Codrut Panea <codrut@flowx.ai>
84d941d to
ac189b6
Compare
|
I think i addressed all the feedback, thank you |
Overview
What this PR does / why we need it
Fetches nested (transitive) group membership in parallel (with a concurrency limit) instead of strictly sequential API calls, so large/deep group graphs complete faster.
In my tests the loading times went form 10-15 seconds down to 2-3 seconds. I tested the changes with 20 nested groups. And I'm using this fork for over a month with no problems.
Closes #3929
Special notes for your reviewer