Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ wire a backend):
via the instrumentation `Tracer` SPI and injects a W3C `traceparent` header.
- `WithMetrics(meter)` — installs a metrics policy recording request duration and
in-flight requests via the instrumentation `Meter` SPI.
- `WithTokenCache(cache)` — shares a bearer-token cache (an `auth.TokenCache`, in-memory
by default) across clients so a cached token is reused.
- `WithBasicAuth(username, password)` — authenticates requests with HTTP Basic auth (HTTPS-only).
- `WithAPIKey(header, key)` — sets an API-key header on every request (HTTPS-only).
- `WithRedactionAllowlist(params...)` — preserves the listed query-param values in
Expand Down
57 changes: 37 additions & 20 deletions auth/bearer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"

Expand All @@ -24,23 +25,39 @@ const expiryWindow = 5 * time.Minute
var ErrInsecureTransport = errors.New("auth: refusing to send credentials over an insecure (non-HTTPS) connection")

// BearerTokenPolicy attaches an "Authorization: Bearer <token>" header to every
// request, fetching and caching the token from a [TokenCredential]. The cached
// token is refreshed once it is within five minutes of expiry.
// request, fetching the token from a [TokenCredential] and storing it in a
// [TokenCache]. The cached token is refreshed once it is within five minutes of
// expiry.
//
// The policy requires HTTPS and returns [ErrInsecureTransport] otherwise. It
// implements pipeline.Policy and is safe for concurrent use.
type BearerTokenPolicy struct {
cred TokenCredential
scopes []string
cred TokenCredential
scope []string
key string
cache TokenCache

mu sync.Mutex
cached AccessToken
mu sync.Mutex
}

// NewBearerTokenPolicy returns a policy that authenticates requests using cred
// for the given scopes.
// for the given scopes, caching the token in a private in-memory cache.
func NewBearerTokenPolicy(cred TokenCredential, scopes ...string) *BearerTokenPolicy {
return &BearerTokenPolicy{cred: cred, scopes: scopes}
return NewBearerTokenPolicyWithCache(cred, NewInMemoryTokenCache(), scopes...)
}

// NewBearerTokenPolicyWithCache is like [NewBearerTokenPolicy] but stores tokens
// in cache, which may be shared across policies so multiple clients reuse a
// cached token. Each policy serializes its own refresh; when the cached token is
// stale, two policies sharing a cache may refresh independently (last write wins).
// Cross-policy single-flight is not performed.
func NewBearerTokenPolicyWithCache(cred TokenCredential, cache TokenCache, scopes ...string) *BearerTokenPolicy {
return &BearerTokenPolicy{
cred: cred,
scope: scopes,
key: strings.Join(scopes, " "),
cache: cache,
}
}

// Do implements pipeline.Policy.
Expand All @@ -57,32 +74,32 @@ func (p *BearerTokenPolicy) Do(req *pipeline.Request) (*http.Response, error) {
return req.Next()
}

// token returns a cached token when fresh, otherwise acquires a new one. The
// lock is held across the credential call so concurrent requests share a single
// token returns a cached token when fresh, otherwise acquires a new one. The lock
// is held across the credential call so concurrent requests share a single
// refresh rather than stampeding the token endpoint.
func (p *BearerTokenPolicy) token(req *pipeline.Request) (string, error) {
p.mu.Lock()
defer p.mu.Unlock()

if p.fresh() {
return p.cached.Token, nil
if tok, ok := p.cache.Get(p.key); ok && fresh(tok) {
return tok.Token, nil
}
tok, err := p.cred.GetToken(req.Raw().Context(), TokenRequestOptions{Scopes: p.scopes})
tok, err := p.cred.GetToken(req.Raw().Context(), TokenRequestOptions{Scopes: p.scope})
if err != nil {
return "", fmt.Errorf("auth: acquire token: %w", err)
}
p.cached = tok
p.cache.Set(p.key, tok)
return tok.Token, nil
}

// fresh reports whether the cached token is present and not near expiry. A zero
// ExpiresOn means the token never expires.
func (p *BearerTokenPolicy) fresh() bool {
if p.cached.Token == "" {
// fresh reports whether tok is present and not near expiry. A zero ExpiresOn means
// the token never expires.
func fresh(tok AccessToken) bool {
if tok.Token == "" {
return false
}
if p.cached.ExpiresOn.IsZero() {
if tok.ExpiresOn.IsZero() {
return true
}
return time.Until(p.cached.ExpiresOn) > expiryWindow
return time.Until(tok.ExpiresOn) > expiryWindow
}
57 changes: 57 additions & 0 deletions auth/bearer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"net/http"
"strings"
"sync"
"testing"
"time"

Expand All @@ -25,11 +26,15 @@ func (f transporterFunc) Do(req *http.Request) (*http.Response, error) { return
type countingCredential struct {
token string
exp time.Time

mu sync.Mutex
calls int
}

func (c *countingCredential) GetToken(context.Context, auth.TokenRequestOptions) (auth.AccessToken, error) {
c.mu.Lock()
c.calls++
c.mu.Unlock()
return auth.AccessToken{Token: c.token, ExpiresOn: c.exp}, nil
}

Expand Down Expand Up @@ -96,8 +101,60 @@ func TestBearerPropagatesCredentialError(t *testing.T) {
}
}

func TestBearerSharedCacheReusesToken(t *testing.T) {
t.Parallel()

cred := &countingCredential{token: "tok", exp: time.Now().Add(time.Hour)}
cache := auth.NewInMemoryTokenCache()

run := func(p *auth.BearerTokenPolicy) {
transport := transporterFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader("")), Request: req}, nil
})
pl := pipeline.New(transport, p)
req, _ := http.NewRequest(http.MethodGet, "https://api.example.test/", nil)
resp, err := pl.Do(req)
if err != nil {
t.Fatalf("Do: %v", err)
}
_ = resp.Body.Close()
}

run(auth.NewBearerTokenPolicyWithCache(cred, cache, "scope/.default"))
run(auth.NewBearerTokenPolicyWithCache(cred, cache, "scope/.default"))

if cred.calls != 1 {
t.Fatalf("GetToken calls = %d, want 1 (shared cache reuses the token)", cred.calls)
}
}

type errCredential struct{ err error }

func (e errCredential) GetToken(context.Context, auth.TokenRequestOptions) (auth.AccessToken, error) {
return auth.AccessToken{}, e.err
}

func TestBearerRefetchesNearExpiryToken(t *testing.T) {
t.Parallel()

// Token expires in one minute — inside the five-minute freshness window, so it
// is never considered fresh and must be re-fetched on every request.
cred := &countingCredential{token: "tok", exp: time.Now().Add(time.Minute)}
pl := pipeline.New(
transporterFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader("")), Request: req}, nil
}),
auth.NewBearerTokenPolicy(cred, "scope"),
)
for range 2 {
req, _ := http.NewRequest(http.MethodGet, "https://api.example.test/", nil)
resp, err := pl.Do(req)
if err != nil {
t.Fatalf("Do: %v", err)
}
_ = resp.Body.Close()
}
if cred.calls != 2 {
t.Fatalf("GetToken calls = %d, want 2 (near-expiry token re-fetched each request)", cred.calls)
}
}
41 changes: 41 additions & 0 deletions auth/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) 2026 dexpace and Omar Aljarrah.
// Licensed under the MIT License. See LICENSE in the repository root for details.

package auth

import "sync"

// TokenCache stores access tokens keyed by an opaque key (the SDK uses the
// space-joined scope set). Implementations must be safe for concurrent use.
type TokenCache interface {
// Get returns the cached token for key and whether one was present.
Get(key string) (AccessToken, bool)
// Set stores token under key.
Set(key string, token AccessToken)
}

// InMemoryTokenCache is a concurrency-safe in-memory [TokenCache].
type InMemoryTokenCache struct {
mu sync.Mutex
tokens map[string]AccessToken
}

// NewInMemoryTokenCache returns an empty in-memory cache.
func NewInMemoryTokenCache() *InMemoryTokenCache {
return &InMemoryTokenCache{tokens: make(map[string]AccessToken)}
}

// Get implements [TokenCache].
func (c *InMemoryTokenCache) Get(key string) (AccessToken, bool) {
c.mu.Lock()
defer c.mu.Unlock()
t, ok := c.tokens[key]
return t, ok
}

// Set implements [TokenCache].
func (c *InMemoryTokenCache) Set(key string, token AccessToken) {
c.mu.Lock()
defer c.mu.Unlock()
c.tokens[key] = token
}
59 changes: 59 additions & 0 deletions auth/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2026 dexpace and Omar Aljarrah.
// Licensed under the MIT License. See LICENSE in the repository root for details.

package auth_test

import (
"sync"
"testing"

"github.com/dexpace/go-sdk/auth"
)

func TestInMemoryTokenCacheGetSet(t *testing.T) {
t.Parallel()

c := auth.NewInMemoryTokenCache()
if _, ok := c.Get("k"); ok {
t.Fatal("missing key should report not found")
}
c.Set("k", auth.AccessToken{Token: "t"})
got, ok := c.Get("k")
if !ok || got.Token != "t" {
t.Fatalf("Get = (%v, %v), want token t / true", got, ok)
}
}

func TestInMemoryTokenCacheConcurrent(t *testing.T) {
t.Parallel()

c := auth.NewInMemoryTokenCache()
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
c.Set("k", auth.AccessToken{Token: "t"})
_, _ = c.Get("k")
}()
}
wg.Wait()
}

func TestInMemoryTokenCacheKeysAreIsolated(t *testing.T) {
t.Parallel()

c := auth.NewInMemoryTokenCache()
c.Set("a", auth.AccessToken{Token: "ta"})
c.Set("a b", auth.AccessToken{Token: "tab"})

if got, ok := c.Get("a"); !ok || got.Token != "ta" {
t.Fatalf("Get(\"a\") = (%v, %v), want ta/true", got, ok)
}
if got, ok := c.Get("a b"); !ok || got.Token != "tab" {
t.Fatalf("Get(\"a b\") = (%v, %v), want tab/true", got, ok)
}
if _, ok := c.Get("b"); ok {
t.Fatal("Get(\"b\") should be a miss (distinct key)")
}
}
6 changes: 5 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,12 @@ func New(opts ...Option) *Client {
}
switch {
case cfg.credential != nil:
cache := cfg.tokenCache
if cache == nil {
cache = auth.NewInMemoryTokenCache()
}
placements = append(placements,
pipeline.At(pipeline.StageAuth, auth.NewBearerTokenPolicy(cfg.credential, cfg.scopes...)))
pipeline.At(pipeline.StageAuth, auth.NewBearerTokenPolicyWithCache(cfg.credential, cache, cfg.scopes...)))
case cfg.basicAuth != nil:
placements = append(placements,
pipeline.At(pipeline.StageAuth, auth.NewBasicAuthPolicy(*cfg.basicAuth)))
Expand Down
33 changes: 33 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,39 @@ func TestAuthPrecedenceBearerBeatsBasic(t *testing.T) {
}
}

type countingCred struct{ calls int }

func (c *countingCred) GetToken(context.Context, auth.TokenRequestOptions) (auth.AccessToken, error) {
c.calls++
return auth.AccessToken{Token: "t", ExpiresOn: time.Now().Add(time.Hour)}, nil
}

func TestWithTokenCacheSharedAcrossClients(t *testing.T) {
t.Parallel()

cred := &countingCred{}
cache := auth.NewInMemoryTokenCache()

for range 2 {
var captured *http.Request
c := dexpace.New(
dexpace.WithTransport(captureTransport(&captured)),
dexpace.WithCredential(cred, "scope"),
dexpace.WithTokenCache(cache),
)
req, _ := http.NewRequest(http.MethodGet, "https://api.example.test/", nil)
resp, err := c.Do(req)
if err != nil {
t.Fatalf("Do: %v", err)
}
_ = resp.Body.Close()
}

if cred.calls != 1 {
t.Fatalf("GetToken calls = %d, want 1 (token cache shared across clients)", cred.calls)
}
}

func TestWithConfigAppliesHTTPTimeout(t *testing.T) {
t.Setenv("DEXPACE_HTTP_TIMEOUT", "30ms")

Expand Down
3 changes: 3 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
// Beyond bearer tokens (WithCredential), WithBasicAuth and WithAPIKey authenticate
// requests with HTTP Basic auth or an API-key header; both require HTTPS.
//
// WithTokenCache shares a bearer-token cache across clients (auth.TokenCache, with
// an in-memory default).
//
// WithConfig sources client defaults (User-Agent, retry settings, transport
// timeout) from the environment via the config package, for any setting not set
// explicitly.
Expand Down
Loading
Loading