From bb79743f9895a2a62d63cd899d83739bde7cef0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20Th=C3=B6mmes?= Date: Tue, 14 Apr 2026 13:27:28 +0200 Subject: [PATCH] Use flightCache for index cache implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Currently, there's a race between store, load and forget if indexes get looked up concurrently and change mid-way through the process. The race goes like this: 1. A: HEAD request → gets etag1, key = "url@etag1" 2. A: Once.Do runs: fetches, parses, calls store("url@etag1", idx), then sets urlToEtag[url] = etag1 3. B: HEAD request → gets etag2 (index was updated server-side), key = "url@etag2" 4. B: Once.Do runs: sees prev = etag1, calls forget("url@etag1") — which deletes indexes["url@etag1"] 5. A: calls load("url@etag1") → key not found → consistency error Using the flightCache fixes that as values are not stored and loaded separately. Separately, there's another race where we'd leak indexes with outdated etags if two HEAD requests raced outside of the critical section of adjusting the etag pointers. That's fixed through forgetting all "old" etag values for a given index URL as well. --- pkg/apk/apk/cache.go | 11 +++++ pkg/apk/apk/cache_test.go | 29 ++++++++++++ pkg/apk/apk/index.go | 95 +++++++++++++-------------------------- 3 files changed, 71 insertions(+), 64 deletions(-) diff --git a/pkg/apk/apk/cache.go b/pkg/apk/apk/cache.go index c25ac84ea..c2091788c 100644 --- a/pkg/apk/apk/cache.go +++ b/pkg/apk/apk/cache.go @@ -82,6 +82,17 @@ func (f *flightCache[K, V]) Forget(key K) { delete(f.cache, key) } +// ForgetFunc removes all keys for which fn returns true. 
+func (f *flightCache[K, V]) ForgetFunc(fn func(K) bool) { + f.mux.Lock() + defer f.mux.Unlock() + for k := range f.cache { + if fn(k) { + delete(f.cache, k) + } + } +} + type Cache struct { etagCache *sync.Map headFlight *singleflight.Group diff --git a/pkg/apk/apk/cache_test.go b/pkg/apk/apk/cache_test.go index 135c32a30..f4d5661ae 100644 --- a/pkg/apk/apk/cache_test.go +++ b/pkg/apk/apk/cache_test.go @@ -15,6 +15,7 @@ package apk import ( + "strings" "sync" "sync/atomic" "testing" @@ -100,3 +101,31 @@ func TestFlightCacheCoalescesCalls(t *testing.T) { require.EqualValues(t, 1, called.Load(), "Function should only be called once") } + +func TestFlightCacheForgetFunc(t *testing.T) { + s := newFlightCache[string, int]() + + for k, v := range map[string]int{"a-1": 1, "a-2": 2, "b-1": 3} { + _, err := s.Do(k, func() (int, error) { return v, nil }) + require.NoError(t, err) + } + + // Forget all keys starting with "a-". + s.ForgetFunc(func(k string) bool { + return strings.HasPrefix(k, "a-") + }) + + // "a-*" keys should be evicted, so new values are computed. + r, err := s.Do("a-1", func() (int, error) { return 100, nil }) + require.NoError(t, err) + require.Equal(t, 100, r) + + r, err = s.Do("a-2", func() (int, error) { return 200, nil }) + require.NoError(t, err) + require.Equal(t, 200, r) + + // "b-1" should still be cached. + r, err = s.Do("b-1", func() (int, error) { return 999, nil }) + require.NoError(t, err) + require.Equal(t, 3, r, "b-1 should still return the cached value") +} diff --git a/pkg/apk/apk/index.go b/pkg/apk/apk/index.go index ccdc806ac..c68f1bd51 100644 --- a/pkg/apk/apk/index.go +++ b/pkg/apk/apk/index.go @@ -55,49 +55,21 @@ type Signature struct { // We just hold the parsed index in memory rather than re-parsing it every time, // which requires gunzipping, which is (somewhat) expensive. 
var globalIndexCache = &indexCache{ - modtimes: map[string]time.Time{}, - urlToEtag: map[string]string{}, + indexes: newFlightCache[cacheKey, NamedIndex](), + modtimes: map[string]time.Time{}, } -type indexResult struct { - idx NamedIndex - err error +type cacheKey struct { + url string + etag string // Only used for remote indexes. } type indexCache struct { - // For remote indexes. - onces sync.Map - urlToEtag map[string]string - etagMu sync.Mutex // guards urlToEtag + indexes *flightCache[cacheKey, NamedIndex] // For local indexes. sync.Mutex modtimes map[string]time.Time - - // etag|filename -> indexResult - indexes sync.Map -} - -func (i *indexCache) forget(key string) { - i.onces.Delete(key) - i.indexes.Delete(key) -} - -func (i *indexCache) store(key string, idx NamedIndex, err error) { - i.indexes.Store(key, indexResult{ - idx: idx, - err: err, - }) -} - -func (i *indexCache) load(key string) (NamedIndex, error) { - v, ok := i.indexes.Load(key) - if !ok { - return nil, fmt.Errorf("indexCache did not see key %q after writing it", key) - } - result := v.(indexResult) - - return result.idx, result.err } func (i *indexCache) get(ctx context.Context, repoName, repoURL string, keys map[string][]byte, arch string, opts *indexOpts) (NamedIndex, error) { @@ -159,30 +131,21 @@ func (i *indexCache) get(ctx context.Context, repoName, repoURL string, keys map return fetchAndParse(etag) } - key := fmt.Sprintf("%s@%s", u, etag) - - once, _ := i.onces.LoadOrStore(key, &sync.Once{}) - once.(*sync.Once).Do(func() { - // If we've seen this URL before, delete any references to old indexes so we can GC them. - // Lock reads/writes to the map, without blocking the fetchAndParse goroutine. 
- i.etagMu.Lock() - prev, ok := i.urlToEtag[u] - if ok { - prevKey := fmt.Sprintf("%s@%s", u, prev) - i.forget(prevKey) - } - i.etagMu.Unlock() - - idx, err := fetchAndParse(etag) - i.store(key, idx, err) + key := cacheKey{url: u, etag: etag} + idx, err := i.indexes.Do(key, func() (NamedIndex, error) { + return fetchAndParse(etag) + }) - // Record the current etag for this URL so we can GC it later. - i.etagMu.Lock() - i.urlToEtag[u] = etag - i.etagMu.Unlock() + // Remove any stale entries with the same URL but a different etag. + // This races with concurrent callers that may have a different etag: a + // slower goroutine with an older etag could evict a newer entry. This is + // harmless — the next call will see the current etag via HEAD, re-fetch, + // and repopulate the cache. + i.indexes.ForgetFunc(func(k cacheKey) bool { + return k.url == u && k.etag != etag }) - return i.load(key) + return idx, err } else { i.Lock() defer i.Unlock() @@ -193,24 +156,28 @@ func (i *indexCache) get(ctx context.Context, repoName, repoURL string, keys map return nil, fmt.Errorf("stat: %w", err) } + key := cacheKey{url: u} + + // Evict the cached entry if the file has changed since we last saw it. + // On first access, ok is false and Do below handles the cache miss. mod := stat.ModTime() before, ok := i.modtimes[u] - if !ok || mod.After(before) { + if ok && mod.After(before) { + i.indexes.Forget(key) + } + i.modtimes[u] = mod + + return i.indexes.Do(key, func() (NamedIndex, error) { b, err := os.ReadFile(u) if err != nil { return nil, fmt.Errorf("reading file: %w", err) } - // If this is the first time or it has changed since the last time... idx, err := parseRepositoryIndex(ctx, u, keys, arch, b, opts) if err != nil { - i.store(u, nil, err) - } else { - i.store(u, NewNamedRepositoryWithIndex(repoName, repoRef.WithIndex(idx)), nil) + return nil, err } - i.modtimes[u] = mod - } - - return i.load(u) + return NewNamedRepositoryWithIndex(repoName, repoRef.WithIndex(idx)), nil + }) } }