Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/apk/apk/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ func (f *flightCache[K, V]) Forget(key K) {
delete(f.cache, key)
}

// ForgetFunc removes all keys for which fn returns true.
func (f *flightCache[K, V]) ForgetFunc(fn func(K) bool) {
f.mux.Lock()
defer f.mux.Unlock()
for k := range f.cache {
if fn(k) {
delete(f.cache, k)
}
}
}

type Cache struct {
etagCache *sync.Map
headFlight *singleflight.Group
Expand Down
29 changes: 29 additions & 0 deletions pkg/apk/apk/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package apk

import (
"strings"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -100,3 +101,31 @@ func TestFlightCacheCoalescesCalls(t *testing.T) {

require.EqualValues(t, 1, called.Load(), "Function should only be called once")
}

func TestFlightCacheForgetFunc(t *testing.T) {
s := newFlightCache[string, int]()

for k, v := range map[string]int{"a-1": 1, "a-2": 2, "b-1": 3} {
_, err := s.Do(k, func() (int, error) { return v, nil })
require.NoError(t, err)
}

// Forget all keys starting with "a-".
s.ForgetFunc(func(k string) bool {
return strings.HasPrefix(k, "a-")
})

// "a-*" keys should be evicted, so new values are computed.
r, err := s.Do("a-1", func() (int, error) { return 100, nil })
require.NoError(t, err)
require.Equal(t, 100, r)

r, err = s.Do("a-2", func() (int, error) { return 200, nil })
require.NoError(t, err)
require.Equal(t, 200, r)

// "b-1" should still be cached.
r, err = s.Do("b-1", func() (int, error) { return 999, nil })
require.NoError(t, err)
require.Equal(t, 3, r, "b-1 should still return the cached value")
}
95 changes: 31 additions & 64 deletions pkg/apk/apk/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,49 +55,21 @@ type Signature struct {
// We just hold the parsed index in memory rather than re-parsing it every time,
// which requires gunzipping, which is (somewhat) expensive.
var globalIndexCache = &indexCache{
modtimes: map[string]time.Time{},
urlToEtag: map[string]string{},
indexes: newFlightCache[cacheKey, NamedIndex](),
modtimes: map[string]time.Time{},
}

type indexResult struct {
idx NamedIndex
err error
type cacheKey struct {
url string
etag string // Only used for remote indexes.
}

type indexCache struct {
// For remote indexes.
onces sync.Map
urlToEtag map[string]string
etagMu sync.Mutex // guards urlToEtag
indexes *flightCache[cacheKey, NamedIndex]

// For local indexes.
sync.Mutex
modtimes map[string]time.Time

// etag|filename -> indexResult
indexes sync.Map
}

func (i *indexCache) forget(key string) {
i.onces.Delete(key)
i.indexes.Delete(key)
}

func (i *indexCache) store(key string, idx NamedIndex, err error) {
i.indexes.Store(key, indexResult{
idx: idx,
err: err,
})
}

func (i *indexCache) load(key string) (NamedIndex, error) {
v, ok := i.indexes.Load(key)
if !ok {
return nil, fmt.Errorf("indexCache did not see key %q after writing it", key)
}
result := v.(indexResult)

return result.idx, result.err
}

func (i *indexCache) get(ctx context.Context, repoName, repoURL string, keys map[string][]byte, arch string, opts *indexOpts) (NamedIndex, error) {
Expand Down Expand Up @@ -159,30 +131,21 @@ func (i *indexCache) get(ctx context.Context, repoName, repoURL string, keys map
return fetchAndParse(etag)
}

key := fmt.Sprintf("%s@%s", u, etag)

once, _ := i.onces.LoadOrStore(key, &sync.Once{})
once.(*sync.Once).Do(func() {
// If we've seen this URL before, delete any references to old indexes so we can GC them.
// Lock reads/writes to the map, without blocking the fetchAndParse goroutine.
i.etagMu.Lock()
prev, ok := i.urlToEtag[u]
if ok {
prevKey := fmt.Sprintf("%s@%s", u, prev)
i.forget(prevKey)
}
i.etagMu.Unlock()

idx, err := fetchAndParse(etag)
i.store(key, idx, err)
key := cacheKey{url: u, etag: etag}
idx, err := i.indexes.Do(key, func() (NamedIndex, error) {
return fetchAndParse(etag)
})

// Record the current etag for this URL so we can GC it later.
i.etagMu.Lock()
i.urlToEtag[u] = etag
i.etagMu.Unlock()
// Remove any stale entries with the same URL but a different etag.
// This races with concurrent callers that may have a different etag: a
// slower goroutine with an older etag could evict a newer entry. This is
// harmless — the next call will see the current etag via HEAD, re-fetch,
// and repopulate the cache.
i.indexes.ForgetFunc(func(k cacheKey) bool {
return k.url == u && k.etag != etag
})

return i.load(key)
return idx, err
} else {
i.Lock()
defer i.Unlock()
Expand All @@ -193,24 +156,28 @@ func (i *indexCache) get(ctx context.Context, repoName, repoURL string, keys map
return nil, fmt.Errorf("stat: %w", err)
}

key := cacheKey{url: u}

// Evict the cached entry if the file has changed since we last saw it.
// On first access, ok is false and Do below handles the cache miss.
mod := stat.ModTime()
before, ok := i.modtimes[u]
if !ok || mod.After(before) {
if ok && mod.After(before) {
i.indexes.Forget(key)
}
i.modtimes[u] = mod

return i.indexes.Do(key, func() (NamedIndex, error) {
b, err := os.ReadFile(u)
if err != nil {
return nil, fmt.Errorf("reading file: %w", err)
}
// If this is the first time or it has changed since the last time...
idx, err := parseRepositoryIndex(ctx, u, keys, arch, b, opts)
if err != nil {
i.store(u, nil, err)
} else {
i.store(u, NewNamedRepositoryWithIndex(repoName, repoRef.WithIndex(idx)), nil)
return nil, err
}
i.modtimes[u] = mod
}

return i.load(u)
return NewNamedRepositoryWithIndex(repoName, repoRef.WithIndex(idx)), nil
})
}
}

Expand Down
Loading