diff --git a/pkg/apk/apk/cache.go b/pkg/apk/apk/cache.go index c25ac84ea..c2091788c 100644 --- a/pkg/apk/apk/cache.go +++ b/pkg/apk/apk/cache.go @@ -82,6 +82,17 @@ func (f *flightCache[K, V]) Forget(key K) { delete(f.cache, key) } +// ForgetFunc removes all keys for which fn returns true. +func (f *flightCache[K, V]) ForgetFunc(fn func(K) bool) { + f.mux.Lock() + defer f.mux.Unlock() + for k := range f.cache { + if fn(k) { + delete(f.cache, k) + } + } +} + type Cache struct { etagCache *sync.Map headFlight *singleflight.Group diff --git a/pkg/apk/apk/cache_test.go b/pkg/apk/apk/cache_test.go index 135c32a30..f4d5661ae 100644 --- a/pkg/apk/apk/cache_test.go +++ b/pkg/apk/apk/cache_test.go @@ -15,6 +15,7 @@ package apk import ( + "strings" "sync" "sync/atomic" "testing" @@ -100,3 +101,31 @@ func TestFlightCacheCoalescesCalls(t *testing.T) { require.EqualValues(t, 1, called.Load(), "Function should only be called once") } + +func TestFlightCacheForgetFunc(t *testing.T) { + s := newFlightCache[string, int]() + + for k, v := range map[string]int{"a-1": 1, "a-2": 2, "b-1": 3} { + _, err := s.Do(k, func() (int, error) { return v, nil }) + require.NoError(t, err) + } + + // Forget all keys starting with "a-". + s.ForgetFunc(func(k string) bool { + return strings.HasPrefix(k, "a-") + }) + + // "a-*" keys should be evicted, so new values are computed. + r, err := s.Do("a-1", func() (int, error) { return 100, nil }) + require.NoError(t, err) + require.Equal(t, 100, r) + + r, err = s.Do("a-2", func() (int, error) { return 200, nil }) + require.NoError(t, err) + require.Equal(t, 200, r) + + // "b-1" should still be cached. + r, err = s.Do("b-1", func() (int, error) { return 999, nil }) + require.NoError(t, err) + require.Equal(t, 3, r, "b-1 should still return the cached value") +} diff --git a/pkg/apk/apk/index.go b/pkg/apk/apk/index.go index ccdc806ac..c68f1bd51 100644 --- a/pkg/apk/apk/index.go +++ b/pkg/apk/apk/index.go @@ -55,49 +55,21 @@ type Signature struct { // We just hold the parsed index in memory rather than re-parsing it every time, // which requires gunzipping, which is (somewhat) expensive. var globalIndexCache = &indexCache{ - modtimes: map[string]time.Time{}, - urlToEtag: map[string]string{}, + indexes: newFlightCache[cacheKey, NamedIndex](), + modtimes: map[string]time.Time{}, } -type indexResult struct { - idx NamedIndex - err error +type cacheKey struct { + url string + etag string // Only used for remote indexes. } type indexCache struct { - // For remote indexes. - onces sync.Map - urlToEtag map[string]string - etagMu sync.Mutex // guards urlToEtag + indexes *flightCache[cacheKey, NamedIndex] // For local indexes. sync.Mutex modtimes map[string]time.Time - - // etag|filename -> indexResult - indexes sync.Map -} - -func (i *indexCache) forget(key string) { - i.onces.Delete(key) - i.indexes.Delete(key) -} - -func (i *indexCache) store(key string, idx NamedIndex, err error) { - i.indexes.Store(key, indexResult{ - idx: idx, - err: err, - }) -} - -func (i *indexCache) load(key string) (NamedIndex, error) { - v, ok := i.indexes.Load(key) - if !ok { - return nil, fmt.Errorf("indexCache did not see key %q after writing it", key) - } - result := v.(indexResult) - - return result.idx, result.err } func (i *indexCache) get(ctx context.Context, repoName, repoURL string, keys map[string][]byte, arch string, opts *indexOpts) (NamedIndex, error) { @@ -159,30 +131,21 @@ func (i *indexCache) get(ctx context.Context, repoName, repoURL string, keys map return fetchAndParse(etag) } - key := fmt.Sprintf("%s@%s", u, etag) - - once, _ := i.onces.LoadOrStore(key, &sync.Once{}) - once.(*sync.Once).Do(func() { - // If we've seen this URL before, delete any references to old indexes so we can GC them. - // Lock reads/writes to the map, without blocking the fetchAndParse goroutine. - i.etagMu.Lock() - prev, ok := i.urlToEtag[u] - if ok { - prevKey := fmt.Sprintf("%s@%s", u, prev) - i.forget(prevKey) - } - i.etagMu.Unlock() - - idx, err := fetchAndParse(etag) - i.store(key, idx, err) + key := cacheKey{url: u, etag: etag} + idx, err := i.indexes.Do(key, func() (NamedIndex, error) { + return fetchAndParse(etag) + }) - // Record the current etag for this URL so we can GC it later. - i.etagMu.Lock() - i.urlToEtag[u] = etag - i.etagMu.Unlock() + // Remove any stale entries with the same URL but a different etag. + // This races with concurrent callers that may have a different etag: a + // slower goroutine with an older etag could evict a newer entry. This is + // harmless — the next call will see the current etag via HEAD, re-fetch, + // and repopulate the cache. + i.indexes.ForgetFunc(func(k cacheKey) bool { + return k.url == u && k.etag != etag }) - return i.load(key) + return idx, err } else { i.Lock() defer i.Unlock() @@ -193,24 +156,28 @@ func (i *indexCache) get(ctx context.Context, repoName, repoURL string, keys map return nil, fmt.Errorf("stat: %w", err) } + key := cacheKey{url: u} + + // Evict the cached entry if the file has changed since we last saw it. + // On first access, ok is false and Do below handles the cache miss. mod := stat.ModTime() before, ok := i.modtimes[u] - if !ok || mod.After(before) { + if ok && mod.After(before) { + i.indexes.Forget(key) + } + i.modtimes[u] = mod + + return i.indexes.Do(key, func() (NamedIndex, error) { b, err := os.ReadFile(u) if err != nil { return nil, fmt.Errorf("reading file: %w", err) } - // If this is the first time or it has changed since the last time... idx, err := parseRepositoryIndex(ctx, u, keys, arch, b, opts) if err != nil { - i.store(u, nil, err) - } else { - i.store(u, NewNamedRepositoryWithIndex(repoName, repoRef.WithIndex(idx)), nil) + return nil, err } - i.modtimes[u] = mod - } - - return i.load(u) + return NewNamedRepositoryWithIndex(repoName, repoRef.WithIndex(idx)), nil + }) } }