From b556ad562ae499311da1e7322dc3cb370f733835 Mon Sep 17 00:00:00 2001
From: Zach Smith
Date: Fri, 6 Mar 2026 12:23:19 -0800
Subject: [PATCH] feat: add rate-limited retry and observability for
 per-project GC partition setup

---
 .../controller-manager/controllermanager.go |  17 +-
 cmd/milo/controller-manager/core.go         |   4 +-
 .../garbagecollector/garbagecollector.go    | 158 ++++--
 .../garbagecollector_sync_test.go           | 530 ++++++++++++++++++
 .../garbagecollector/partition_metrics.go   |  69 +++
 .../projectprovider/metrics/metrics.go      |  68 +++
 .../controllers/projectprovider/provider.go | 120 +++-
 .../projectprovider/provider_test.go        | 146 +++++
 8 files changed, 1048 insertions(+), 64 deletions(-)
 create mode 100644 internal/controllers/garbagecollector/garbagecollector_sync_test.go
 create mode 100644 internal/controllers/garbagecollector/partition_metrics.go
 create mode 100644 internal/controllers/projectprovider/metrics/metrics.go
 create mode 100644 internal/controllers/projectprovider/provider_test.go

diff --git a/cmd/milo/controller-manager/controllermanager.go b/cmd/milo/controller-manager/controllermanager.go
index 20793e0f..1ccffc90 100644
--- a/cmd/milo/controller-manager/controllermanager.go
+++ b/cmd/milo/controller-manager/controllermanager.go
@@ -98,6 +98,7 @@ import (
 	quotav1alpha1 "go.miloapis.com/milo/pkg/apis/quota/v1alpha1"
 	resourcemanagerv1alpha1 "go.miloapis.com/milo/pkg/apis/resourcemanager/v1alpha1"
 	miloprovider "go.miloapis.com/milo/pkg/multicluster-runtime/milo"
+	"go.miloapis.com/milo/internal/controllers/projectprovider"
 	milowebhook "go.miloapis.com/milo/pkg/webhook"
 	apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
 )
@@ -304,6 +305,11 @@ func NewCommand() *cobra.Command {
 	fs.IntVar(&s.ControllerRuntimeWebhookPort, "controller-runtime-webhook-port", 9443, "The port to use for the controller-runtime webhook server.")
 
+	fs.IntVar(&s.ProjectProvider.Workers, "project-provider-workers", s.ProjectProvider.Workers, "Number of concurrent workers that process project additions.")
+	fs.IntVar(&s.ProjectProvider.MaxRetries, "project-provider-max-retries", s.ProjectProvider.MaxRetries, "Maximum retry attempts for a failed project addition before giving up.")
+	fs.Float64Var(&s.ProjectProvider.RateLimit, "project-provider-rate-limit", s.ProjectProvider.RateLimit, "Sustained per-second rate at which projects are added.")
+	fs.IntVar(&s.ProjectProvider.RateBurst, "project-provider-rate-burst", s.ProjectProvider.RateBurst, "Burst allowance for the project addition rate limiter.")
+
 	fs.StringVar(&AssignableRolesNamespace, "assignable-roles-namespace", "datum-cloud", "An extra namespace that the system allows to be used for assignable roles.")
 
 	s.InfraCluster.AddFlags(namedFlagSets.FlagSet("Infrastructure Cluster"))
@@ -341,6 +347,10 @@ type Options struct {
 	// The port to use for the controller-runtime webhook server.
 	ControllerRuntimeWebhookPort int
+
+	// ProjectProvider holds tunable parameters for the project provider's
+	// rate-limiting and retry behaviour during bulk project onboarding.
+	ProjectProvider projectprovider.Config
 }
 
 // NewOptions creates a new Options object with default values.
@@ -355,7 +365,8 @@ func NewOptions() (*Options, error) {
 		InfraCluster: &infracluster.Options{
 			KubeconfigFile: baseOpts.Generic.ClientConnection.Kubeconfig,
 		},
-		ControlPlane: &controlplane.Options{},
+		ControlPlane:    &controlplane.Options{},
+		ProjectProvider: projectprovider.DefaultConfig(),
 	}
 
 	return opts, nil
@@ -415,6 +426,7 @@ func Run(ctx context.Context, c *config.CompletedConfig, opts *Options) error {
 		logger.Error(err, "Error building controller context")
 		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
 	}
+	controllerContext.ProjectProviderConfig = opts.ProjectProvider
 
 	// Create a controller manager for the core control plane.
 	//
@@ -953,6 +965,9 @@ type ControllerContext struct {
 	// GraphBuilder gives an access to dependencyGraphBuilder which keeps tracks of resources in the cluster
 	GraphBuilder *garbagecollector.GraphBuilder
+
+	// ProjectProviderConfig carries tunable parameters for the project provider.
+	ProjectProviderConfig projectprovider.Config
 }
 
 // IsControllerEnabled checks if the context's controllers enabled or not
diff --git a/cmd/milo/controller-manager/core.go b/cmd/milo/controller-manager/core.go
index ee3775af..e6688aad 100644
--- a/cmd/milo/controller-manager/core.go
+++ b/cmd/milo/controller-manager/core.go
@@ -64,7 +64,7 @@ func startModifiedNamespaceController(ctx context.Context, controllerContext Con
 		Finalizer: v1.FinalizerKubernetes,
 	}
 
-	prov, err := projectprovider.New(nsKubeconfig, sink)
+	prov, err := projectprovider.New(nsKubeconfig, sink, controllerContext.ProjectProviderConfig)
 	if err != nil {
 		return nil, true, err
 	}
@@ -134,7 +134,7 @@ func startGarbageCollectorController(ctx context.Context, controllerContext Cont
 		InformersStarted:  controllerContext.InformersStarted,
 		InitialSyncPeriod: 30 * time.Second,
 	}
-	prov, err := projectprovider.New(cfg, gcSink)
+	prov, err := projectprovider.New(cfg, gcSink, controllerContext.ProjectProviderConfig)
 	if err != nil {
 		return nil, true, fmt.Errorf("failed to start project provider for GC: %w", err)
 	}
diff --git a/internal/controllers/garbagecollector/garbagecollector.go b/internal/controllers/garbagecollector/garbagecollector.go
index 5de17e25..2207fd5f 100644
--- a/internal/controllers/garbagecollector/garbagecollector.go
+++ b/internal/controllers/garbagecollector/garbagecollector.go
@@ -77,6 +77,12 @@ type GarbageCollector struct {
 	dependencyGraphBuilders []*GraphBuilder
 	cancels                 map[string]context.CancelFunc
+
+	// resyncNeeded is set when a new per-project builder is added via
+	// AddProject. It forces the next Sync tick to resync all builders even
+	// if root discovery reports the same resource set, covering the case
+	// where the new builder was seeded with incomplete monitors.
+	resyncNeeded bool
 }
 
 var _ controller.Interface = (*GarbageCollector)(nil)
@@ -93,17 +99,22 @@ func (gc *GarbageCollector) AddProject(
 	discover discovery.ServerResourcesInterface,
 	initialSyncTimeout time.Duration,
 ) error {
-	// ensure maps
 	gc.mu.Lock()
 	if gc.cancels == nil {
 		gc.cancels = make(map[string]context.CancelFunc)
 	}
+	// Skip projects that are already registered (idempotent for retries).
+	if _, exists := gc.cancels[project]; exists {
+		gc.mu.Unlock()
+		return nil
+	}
 	gc.mu.Unlock()
 
+	logger := klog.FromContext(parent)
+
+	// Reuse shared queues/cache from GC (created from the root GB).
 	atd, ato, absent := gc.attemptToDelete, gc.attemptToOrphan, gc.absentOwnerCache
-	// Build per-partition GraphBuilder using shared plumbing + shared broadcaster.
 	gb := NewDependencyGraphBuilderWithShared(
 		parent,
 		md,
@@ -114,29 +125,38 @@ func (gc *GarbageCollector) AddProject(
 		atd,
 		ato,
 		absent,
-		gc.eventBroadcaster, // share events across partitions
+		gc.eventBroadcaster,
 	)
 	gb.SetProject(project)
 
-	// Track and start
+	// Seed monitors BEFORE registering the builder. If discovery or monitor
+	// creation fails (e.g. apiserver throttling during startup burst), the
+	// builder is never registered and the caller can retry cleanly.
+	newResources, err := GetDeletableResources(logger, discover)
+	if err != nil {
+		logger.V(2).Info("GC: partial discovery for project", "project", project, "error", err)
+	}
+	if len(newResources) == 0 {
+		return fmt.Errorf("gc(%s): discovery returned no resources", project)
+	}
+	if err := gb.syncMonitors(logger, newResources); err != nil {
+		return fmt.Errorf("gc(%s): syncMonitors: %w", project, err)
+	}
+
+	// Monitors created successfully — register the builder and start it.
 	ctx, cancel := context.WithCancel(parent)
+
+	gc.mu.Lock()
 	gc.dependencyGraphBuilders = append(gc.dependencyGraphBuilders, gb)
 	gc.cancels[project] = cancel
+	gc.resyncNeeded = true
 	gc.mu.Unlock()
-	go gb.Run(ctx)
 
+	partitionCount.Inc()
+	partitionMonitorCount.WithLabelValues(project).Set(float64(monitorCountForBuilder(gb)))
 
-	// Seed monitors for this partition immediately
-	logger := klog.FromContext(parent)
-	newResources, _ := GetDeletableResources(logger, discover)
-	if err := gb.syncMonitors(logger, newResources); err != nil {
-		cancel()
-		return fmt.Errorf("gc(%s): syncMonitors: %w", project, err)
-	}
-	gb.startMonitors(logger)
+	go gb.Run(ctx)
 
-	// Optional: wait briefly for first sync to reduce initial GC latency
 	ok := cache.WaitForNamedCacheSync(
 		"gc-"+project,
 		waitForStopOrTimeout(ctx.Done(), initialSyncTimeout),
@@ -146,6 +166,8 @@
 		logger.Info("GC: partition monitors not fully synced; continuing", "project", project)
 	}
 
+	partitionSynced.WithLabelValues(project).Set(boolToFloat(ok))
+
 	return nil
 }
 
@@ -155,7 +177,6 @@ func (gc *GarbageCollector) RemoveProject(project string) {
 		cancel()
 		delete(gc.cancels, project)
 	}
-	// drop from slice
 	dst := gc.dependencyGraphBuilders[:0]
 	for _, gb := range gc.dependencyGraphBuilders {
 		if gb.project != project {
@@ -164,6 +185,10 @@
 	}
 	gc.dependencyGraphBuilders = dst
 	gc.mu.Unlock()
+
+	partitionCount.Dec()
+	partitionMonitorCount.Delete(map[string]string{"project": project})
+	partitionSynced.Delete(map[string]string{"project": project})
 }
 
 // NewGarbageCollector creates a new GarbageCollector.
@@ -217,6 +242,7 @@ func NewComposedGarbageCollectorMulti(
 	}
 
 	metrics.Register()
+	registerPartitionMetricsOnce()
 
 	return gc, nil
 }
@@ -234,6 +260,9 @@ func (gc *GarbageCollector) resyncMonitors(
 			return err
 		}
 		gb.startMonitors(logger)
+		if gb.project != "" {
+			partitionMonitorCount.WithLabelValues(gb.project).Set(float64(monitorCountForBuilder(gb)))
+		}
 	}
 	return nil
 }
@@ -321,48 +350,75 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
 	oldResources := make(map[schema.GroupVersionResource]struct{})
 	wait.UntilWithContext(ctx, func(ctx context.Context) {
-		logger := klog.FromContext(ctx)
-
-		// 1) Discover deletable resources
-		newResources, err := GetDeletableResources(logger, discoveryClient)
-		if len(newResources) == 0 {
-			logger.V(2).Info("no resources reported by discovery, skipping garbage collector sync")
-			metrics.GarbageCollectorResourcesSyncError.Inc()
-			return
+		newOld, ok := gc.syncOnce(ctx, discoveryClient, oldResources, period)
+		if ok {
+			oldResources = newOld
 		}
+	}, period)
+}
 
-		// 2) Handle partial discovery: keep already-synced monitors for failed groups
-		if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure {
-			for k, v := range oldResources {
-				if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.anyBuilderResourceSynced(k) {
-					newResources[k] = v
-				}
+// syncOnce performs a single sync cycle. It returns the resource set to
+// remember and true when a resync was performed successfully. If the tick
+// was skipped or the resync failed, it returns nil/false so the caller
+// retains the previous oldResources.
+func (gc *GarbageCollector) syncOnce(
+	ctx context.Context,
+	discoveryClient discovery.ServerResourcesInterface,
+	oldResources map[schema.GroupVersionResource]struct{},
+	waitPeriod time.Duration,
+) (map[schema.GroupVersionResource]struct{}, bool) {
+	logger := klog.FromContext(ctx)
+
+	// 1) Discover deletable resources
+	newResources, err := GetDeletableResources(logger, discoveryClient)
+	if len(newResources) == 0 {
+		logger.V(2).Info("no resources reported by discovery, skipping garbage collector sync")
+		metrics.GarbageCollectorResourcesSyncError.Inc()
+		return nil, false
+	}
+
+	// 2) Handle partial discovery: keep already-synced monitors for failed groups
+	if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure {
+		for k, v := range oldResources {
+			if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.anyBuilderResourceSynced(k) {
+				newResources[k] = v
 			}
 		}
+	}
 
-		// 3) Short-circuit if nothing changed
-		if reflect.DeepEqual(oldResources, newResources) {
-			logger.V(5).Info("no resource updates from discovery, skipping garbage collector sync")
-			return
-		}
+	// 3) Short-circuit if nothing changed AND no new builders need resyncing
+	gc.mu.RLock()
+	forceResync := gc.resyncNeeded
+	gc.mu.RUnlock()
 
-		logger.V(2).Info("syncing garbage collector with updated resources from discovery",
-			"diff", printDiff(oldResources, newResources))
+	if !forceResync && reflect.DeepEqual(oldResources, newResources) {
+		logger.V(5).Info("no resource updates from discovery, skipping garbage collector sync")
+		return nil, false
+	}
 
-		// 4) Reset REST mapper (invalidates its underlying discovery cache)
-		gc.restMapper.Reset()
-		logger.V(4).Info("reset restmapper")
+	logger.V(2).Info("syncing garbage collector with updated resources from discovery",
+		"diff", printDiff(oldResources, newResources),
+		"forceResync", forceResync)
 
-		// 5) Resync monitors across ALL builders
-		if err := gc.resyncMonitors(logger, newResources); err != nil {
-			utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %w", err))
-			metrics.GarbageCollectorResourcesSyncError.Inc()
-			return
-		}
-		logger.V(4).Info("resynced monitors")
+	// 4) Reset REST mapper (invalidates its underlying discovery cache)
+	gc.restMapper.Reset()
+	logger.V(4).Info("reset restmapper")
+
+	// 5) Resync monitors across ALL builders
+	if err := gc.resyncMonitors(logger, newResources); err != nil {
+		utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %w", err))
+		metrics.GarbageCollectorResourcesSyncError.Inc()
+		return nil, false
+	}
+	logger.V(4).Info("resynced monitors")
 
-		// 6) Periodically check that ALL builders report cache synced (for logs/metrics)
-		cacheSynced := cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool {
+	gc.mu.Lock()
+	gc.resyncNeeded = false
+	gc.mu.Unlock()
+
+	// 6) Periodically check that ALL builders report cache synced (for logs/metrics)
+	if waitPeriod > 0 {
+		cacheSynced := cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), waitPeriod), func() bool {
 			for _, gb := range gc.dependencyGraphBuilders {
 				if !gb.IsSynced(logger) {
 					return false
@@ -376,10 +432,10 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
 		utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync"))
 		metrics.GarbageCollectorResourcesSyncError.Inc()
 	}
+	}
 
-		// 7) Remember current resource set
-		oldResources = newResources
-	}, period)
+	// 7) Remember current resource set
+	return newResources, true
 }
 
 // printDiff returns a human-readable summary of what resources were added and removed
diff --git a/internal/controllers/garbagecollector/garbagecollector_sync_test.go b/internal/controllers/garbagecollector/garbagecollector_sync_test.go
new file mode 100644
index 00000000..83aec3c0
--- /dev/null
+++ b/internal/controllers/garbagecollector/garbagecollector_sync_test.go
@@ -0,0 +1,530 @@
+package garbagecollector
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/controller-manager/pkg/informerfactory"
+)
+
+// ---------------------------------------------------------------------------
+// Test resources used across test cases
+// ---------------------------------------------------------------------------
+
+var (
+	gvrPods        = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
+	gvrServices    = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
+	gvrDeployments = schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
+	gvrSecrets     = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
+	gvrConfigMaps  = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"}
+
+	allResources = map[schema.GroupVersionResource]struct{}{
+		gvrPods:        {},
+		gvrServices:    {},
+		gvrDeployments: {},
+		gvrSecrets:     {},
+		gvrConfigMaps:  {},
+	}
+
+	gvkPods        = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
+	gvkServices    = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Service"}
+	gvkDeployments = schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}
+	gvkSecrets     = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Secret"}
+	gvkConfigMaps  = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"}
+
+	gvrToGVK = map[schema.GroupVersionResource]schema.GroupVersionKind{
+		gvrPods:        gvkPods,
+		gvrServices:    gvkServices,
+		gvrDeployments: gvkDeployments,
+		gvrSecrets:     gvkSecrets,
+		gvrConfigMaps:  gvkConfigMaps,
+	}
+)
+
+// ---------------------------------------------------------------------------
+// Fakes
+// ---------------------------------------------------------------------------
+
+// fakeServerResources implements discovery.ServerResourcesInterface.
+type fakeServerResources struct {
+	resources []*metav1.APIResourceList
+	err       error
+}
+
+func (f *fakeServerResources) ServerResourcesForGroupVersion(string) (*metav1.APIResourceList, error) {
+	return nil, nil
+}
+func (f *fakeServerResources) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
+	return nil, f.resources, f.err
+}
+func (f *fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
+	return f.resources, f.err
+}
+func (f *fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
+	return f.resources, f.err
+}
+
+func discoveryForResources(resources map[schema.GroupVersionResource]struct{}) *fakeServerResources {
+	byGV := map[schema.GroupVersion]*metav1.APIResourceList{}
+	for gvr := range resources {
+		gv := gvr.GroupVersion()
+		rl, ok := byGV[gv]
+		if !ok {
+			rl = &metav1.APIResourceList{GroupVersion: gv.String()}
+			byGV[gv] = rl
+		}
+		rl.APIResources = append(rl.APIResources, metav1.APIResource{
+			Name:    gvr.Resource,
+			Verbs:   metav1.Verbs{"delete", "list", "watch", "get"},
+			Group:   gvr.Group,
+			Version: gvr.Version,
+		})
+	}
+	var lists []*metav1.APIResourceList
+	for _, rl := range byGV {
+		lists = append(lists, rl)
+	}
+	return &fakeServerResources{resources: lists}
+}
+
+// fakeResettableRESTMapper satisfies meta.ResettableRESTMapper using a static
+// GVR→GVK map. Only KindFor and Reset are needed by the GC sync path.
+type fakeResettableRESTMapper struct {
+	kinds map[schema.GroupVersionResource]schema.GroupVersionKind
+}
+
+func (f *fakeResettableRESTMapper) Reset() {}
+func (f *fakeResettableRESTMapper) KindFor(r schema.GroupVersionResource) (schema.GroupVersionKind, error) {
+	if gvk, ok := f.kinds[r]; ok {
+		return gvk, nil
+	}
+	return schema.GroupVersionKind{}, fmt.Errorf("no mapping for %v", r)
+}
+func (f *fakeResettableRESTMapper) KindsFor(r schema.GroupVersionResource) ([]schema.GroupVersionKind, error) {
+	gvk, err := f.KindFor(r)
+	if err != nil {
+		return nil, err
+	}
+	return []schema.GroupVersionKind{gvk}, nil
+}
+func (f *fakeResettableRESTMapper) ResourceFor(schema.GroupVersionResource) (schema.GroupVersionResource, error) {
+	return schema.GroupVersionResource{}, fmt.Errorf("not implemented")
+}
+func (f *fakeResettableRESTMapper) ResourcesFor(schema.GroupVersionResource) ([]schema.GroupVersionResource, error) {
+	return nil, fmt.Errorf("not implemented")
+}
+func (f *fakeResettableRESTMapper) RESTMapping(schema.GroupKind, ...string) (*meta.RESTMapping, error) {
+	return nil, fmt.Errorf("not implemented")
+}
+func (f *fakeResettableRESTMapper) RESTMappings(schema.GroupKind, ...string) ([]*meta.RESTMapping, error) {
+	return nil, fmt.Errorf("not implemented")
+}
+func (f *fakeResettableRESTMapper) ResourceSingularizer(string) (string, error) {
+	return "", fmt.Errorf("not implemented")
+}
+
+var _ meta.ResettableRESTMapper = (*fakeResettableRESTMapper)(nil)
+
+// fakeInformerFactory returns stub informers for any resource.
+type fakeInformerFactory struct{}
+
+func (f *fakeInformerFactory) ForResource(schema.GroupVersionResource) (informers.GenericInformer, error) {
+	return &fakeGenericInformer{}, nil
+}
+func (f *fakeInformerFactory) Start(<-chan struct{}) {}
+
+var _ informerfactory.InformerFactory = (*fakeInformerFactory)(nil)
+
+type fakeGenericInformer struct {
+	inf *fakeSharedIndexInformer
+}
+
+func (f *fakeGenericInformer) Informer() cache.SharedIndexInformer {
+	if f.inf == nil {
+		f.inf = newFakeSharedIndexInformer()
+	}
+	return f.inf
+}
+func (f *fakeGenericInformer) Lister() cache.GenericLister { return &fakeGenericLister{} }
+
+type fakeGenericLister struct{}
+
+func (f *fakeGenericLister) List(labels.Selector) ([]runtime.Object, error)   { return nil, nil }
+func (f *fakeGenericLister) Get(string) (runtime.Object, error)               { return nil, nil }
+func (f *fakeGenericLister) ByNamespace(string) cache.GenericNamespaceLister  { return nil }
+
+// fakeSharedIndexInformer satisfies cache.SharedIndexInformer.
+type fakeSharedIndexInformer struct {
+	store cache.Store
+	ctrl  *fakeController
+}
+
+func newFakeSharedIndexInformer() *fakeSharedIndexInformer {
+	return &fakeSharedIndexInformer{
+		store: cache.NewStore(cache.MetaNamespaceKeyFunc),
+		ctrl:  &fakeController{synced: true},
+	}
+}
+
+type fakeHandlerRegistration struct{ synced bool }
+
+func (r *fakeHandlerRegistration) HasSynced() bool { return r.synced }
+
+func (f *fakeSharedIndexInformer) AddEventHandler(cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
+	return &fakeHandlerRegistration{synced: true}, nil
+}
+func (f *fakeSharedIndexInformer) AddEventHandlerWithResyncPeriod(cache.ResourceEventHandler, time.Duration) (cache.ResourceEventHandlerRegistration, error) {
+	return &fakeHandlerRegistration{synced: true}, nil
+}
+func (f *fakeSharedIndexInformer) RemoveEventHandler(cache.ResourceEventHandlerRegistration) error {
+	return nil
+}
+func (f *fakeSharedIndexInformer) GetStore() cache.Store           { return f.store }
+func (f *fakeSharedIndexInformer) GetController() cache.Controller { return f.ctrl }
+func (f *fakeSharedIndexInformer) Run(<-chan struct{})             {}
+func (f *fakeSharedIndexInformer) HasSynced() bool                 { return f.ctrl.synced }
+func (f *fakeSharedIndexInformer) LastSyncResourceVersion() string { return "" }
+func (f *fakeSharedIndexInformer) SetWatchErrorHandler(cache.WatchErrorHandler) error {
+	return nil
+}
+func (f *fakeSharedIndexInformer) SetTransform(cache.TransformFunc) error { return nil }
+func (f *fakeSharedIndexInformer) IsStopped() bool                        { return false }
+func (f *fakeSharedIndexInformer) AddIndexers(cache.Indexers) error       { return nil }
+func (f *fakeSharedIndexInformer) GetIndexer() cache.Indexer {
+	return cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
+}
+
+type fakeController struct {
+	synced bool
+}
+
+func (f *fakeController) Run(<-chan struct{})             {}
+func (f *fakeController) HasSynced() bool                 { return f.synced }
+func (f *fakeController) LastSyncResourceVersion() string { return "" }
+
+// ---------------------------------------------------------------------------
+// Helpers
+// ---------------------------------------------------------------------------
+
+// newTestGC creates a minimal GarbageCollector suitable for sync tests.
+func newTestGC(mapper *fakeResettableRESTMapper) *GarbageCollector {
+	atd := workqueue.NewTypedRateLimitingQueueWithConfig(
+		workqueue.DefaultTypedControllerRateLimiter[*node](),
+		workqueue.TypedRateLimitingQueueConfig[*node]{Name: "test_atd"},
+	)
+	ato := workqueue.NewTypedRateLimitingQueueWithConfig(
+		workqueue.DefaultTypedControllerRateLimiter[*node](),
+		workqueue.TypedRateLimitingQueueConfig[*node]{Name: "test_ato"},
+	)
+	return &GarbageCollector{
+		restMapper:       mapper,
+		attemptToDelete:  atd,
+		attemptToOrphan:  ato,
+		absentOwnerCache: NewReferenceCache(100),
+		eventBroadcaster: record.NewBroadcaster(record.WithContext(context.Background())),
+	}
+}
+
+// newTestGraphBuilder creates a GraphBuilder with pre-populated monitors for
+// the given resources. Monitors are created as already-started (non-nil stopCh)
+// so startMonitors is a no-op.
+func newTestGraphBuilder(
+	project string,
+	resources map[schema.GroupVersionResource]struct{},
+	mapper meta.RESTMapper,
+	factory informerfactory.InformerFactory,
+	atd workqueue.TypedRateLimitingInterface[*node],
+	ato workqueue.TypedRateLimitingInterface[*node],
+	absent *ReferenceCache,
+	broadcaster record.EventBroadcaster,
+) *GraphBuilder {
+	informersStarted := make(chan struct{})
+	close(informersStarted)
+
+	gb := &GraphBuilder{
+		restMapper:       mapper,
+		project:          project,
+		monitors:         monitors{},
+		informersStarted: informersStarted,
+		running:          true,
+		metadataClient:   nil,
+		graphChanges: workqueue.NewTypedRateLimitingQueueWithConfig(
+			workqueue.DefaultTypedControllerRateLimiter[*event](),
+			workqueue.TypedRateLimitingQueueConfig[*event]{Name: "test_gc_" + project},
+		),
+		uidToNode:        &concurrentUIDToNode{uidToNode: make(map[types.UID]*node)},
+		attemptToDelete:  atd,
+		attemptToOrphan:  ato,
+		absentOwnerCache: absent,
+		sharedInformers:  factory,
+		ignoredResources: map[schema.GroupResource]struct{}{},
+		eventRecorder:    broadcaster.NewRecorder(runtime.NewScheme(), v1.EventSource{Component: "test"}),
+		eventBroadcaster: broadcaster,
+	}
+
+	// Pre-populate monitors with already-started dummy monitors
+	for gvr := range resources {
+		stopCh := make(chan struct{})
+		gb.monitors[gvr] = &monitor{
+			store:      cache.NewStore(cache.MetaNamespaceKeyFunc),
+			controller: &fakeController{synced: true},
+			stopCh:     stopCh,
+		}
+	}
+
+	return gb
+}
+
+func monitorCount(gb *GraphBuilder) int {
+	gb.monitorLock.RLock()
+	defer gb.monitorLock.RUnlock()
+	return len(gb.monitors)
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+// TestSyncOnce_SkipsResyncWhenResourcesUnchanged verifies that the Sync loop
+// short-circuits (does not call resyncMonitors) when root discovery reports the
+// same resources as the previous tick and no new builders have been added.
+func TestSyncOnce_SkipsResyncWhenResourcesUnchanged(t *testing.T) {
+	mapper := &fakeResettableRESTMapper{kinds: gvrToGVK}
+	gc := newTestGC(mapper)
+
+	disc := discoveryForResources(allResources)
+	factory := &fakeInformerFactory{}
+
+	rootGB := newTestGraphBuilder("", allResources, mapper, factory,
+		gc.attemptToDelete, gc.attemptToOrphan, gc.absentOwnerCache, gc.eventBroadcaster)
+	gc.dependencyGraphBuilders = []*GraphBuilder{rootGB}
+
+	ctx := context.Background()
+
+	// First syncOnce: oldResources is empty, so it resyncs
+	newOld, ok := gc.syncOnce(ctx, disc, map[schema.GroupVersionResource]struct{}{}, 0)
+	if !ok {
+		t.Fatal("first syncOnce should have performed a resync")
+	}
+	if len(newOld) != len(allResources) {
+		t.Fatalf("expected %d resources, got %d", len(allResources), len(newOld))
+	}
+
+	// Second syncOnce: oldResources == newResources, should short-circuit
+	_, ok = gc.syncOnce(ctx, disc, newOld, 0)
+	if ok {
+		t.Fatal("second syncOnce should have short-circuited (resources unchanged, no new builders)")
+	}
+}
+
+// TestSyncOnce_ResyncsWhenResyncNeeded_BugRepro verifies the original bug:
+// a per-project builder added with incomplete monitors is never resynced
+// because the Sync loop short-circuits on unchanged root resources.
+//
+// Without the fix (resyncNeeded flag), the second syncOnce returns false
+// and the project builder keeps its incomplete monitor set.
+func TestSyncOnce_ResyncsWhenResyncNeeded_BugRepro(t *testing.T) {
+	mapper := &fakeResettableRESTMapper{kinds: gvrToGVK}
+	gc := newTestGC(mapper)
+
+	disc := discoveryForResources(allResources)
+	factory := &fakeInformerFactory{}
+
+	rootGB := newTestGraphBuilder("", allResources, mapper, factory,
+		gc.attemptToDelete, gc.attemptToOrphan, gc.absentOwnerCache, gc.eventBroadcaster)
+
+	// Simulate a project builder that was seeded with only 2 of 5 resources
+	// (PCP API wasn't fully ready when AddProject ran).
+	incompleteResources := map[schema.GroupVersionResource]struct{}{
+		gvrPods:     {},
+		gvrServices: {},
+	}
+	projectGB := newTestGraphBuilder("zachs-project", incompleteResources, mapper, factory,
+		gc.attemptToDelete, gc.attemptToOrphan, gc.absentOwnerCache, gc.eventBroadcaster)
+
+	gc.dependencyGraphBuilders = []*GraphBuilder{rootGB, projectGB}
+
+	ctx := context.Background()
+
+	// First syncOnce: seeds oldResources
+	newOld, ok := gc.syncOnce(ctx, disc, map[schema.GroupVersionResource]struct{}{}, 0)
+	if !ok {
+		t.Fatal("first syncOnce should have performed a resync")
+	}
+
+	// At this point both builders have been resynced with allResources.
+	// Reset project builder to incomplete state to simulate what would
+	// happen if AddProject ran AFTER the first sync tick.
+	projectGB.monitorLock.Lock()
+	// Close excess monitors' stopCh
+	for gvr, m := range projectGB.monitors {
+		if _, inOriginal := incompleteResources[gvr]; !inOriginal {
+			if m.stopCh != nil {
+				close(m.stopCh)
+			}
+			delete(projectGB.monitors, gvr)
+		}
+	}
+	projectGB.monitorLock.Unlock()
+
+	if monitorCount(projectGB) != 2 {
+		t.Fatalf("project builder should have 2 monitors, got %d", monitorCount(projectGB))
+	}
+
+	// Simulate AddProject setting the flag (as our fix does)
+	gc.mu.Lock()
+	gc.resyncNeeded = true
+	gc.mu.Unlock()
+
+	// Second syncOnce: root resources haven't changed, but resyncNeeded is true.
+	// With the fix, this should resync and bring the project builder up to date.
+	_, ok = gc.syncOnce(ctx, disc, newOld, 0)
+	if !ok {
+		t.Fatal("syncOnce should have resynced because resyncNeeded was true")
+	}
+
+	if got := monitorCount(projectGB); got != len(allResources) {
+		t.Fatalf("after resync, project builder should have %d monitors, got %d", len(allResources), got)
+	}
+}
+
+// TestSyncOnce_ClearsResyncNeeded verifies that after a successful resync,
+// the resyncNeeded flag is cleared so subsequent ticks short-circuit normally.
+func TestSyncOnce_ClearsResyncNeeded(t *testing.T) {
+	mapper := &fakeResettableRESTMapper{kinds: gvrToGVK}
+	gc := newTestGC(mapper)
+
+	disc := discoveryForResources(allResources)
+	factory := &fakeInformerFactory{}
+
+	rootGB := newTestGraphBuilder("", allResources, mapper, factory,
+		gc.attemptToDelete, gc.attemptToOrphan, gc.absentOwnerCache, gc.eventBroadcaster)
+	gc.dependencyGraphBuilders = []*GraphBuilder{rootGB}
+
+	gc.mu.Lock()
+	gc.resyncNeeded = true
+	gc.mu.Unlock()
+
+	ctx := context.Background()
+
+	// First syncOnce: resyncNeeded forces resync
+	newOld, ok := gc.syncOnce(ctx, disc, map[schema.GroupVersionResource]struct{}{}, 0)
+	if !ok {
+		t.Fatal("first syncOnce should have resynced")
+	}
+
+	gc.mu.RLock()
+	if gc.resyncNeeded {
+		t.Fatal("resyncNeeded should be false after successful resync")
+	}
+	gc.mu.RUnlock()
+
+	// Second syncOnce: resources unchanged, resyncNeeded cleared → should skip
+	_, ok = gc.syncOnce(ctx, disc, newOld, 0)
+	if ok {
+		t.Fatal("second syncOnce should have short-circuited after resyncNeeded was cleared")
+	}
+}
+
+// TestSyncOnce_WithoutFix_BugDemo demonstrates the pre-fix behavior: without
+// the resyncNeeded flag, a project builder with incomplete monitors is never
+// resynced when root resources remain stable.
+func TestSyncOnce_WithoutFix_BugDemo(t *testing.T) {
+	mapper := &fakeResettableRESTMapper{kinds: gvrToGVK}
+	gc := newTestGC(mapper)
+
+	disc := discoveryForResources(allResources)
+	factory := &fakeInformerFactory{}
+
+	rootGB := newTestGraphBuilder("", allResources, mapper, factory,
+		gc.attemptToDelete, gc.attemptToOrphan, gc.absentOwnerCache, gc.eventBroadcaster)
+	gc.dependencyGraphBuilders = []*GraphBuilder{rootGB}
+
+	ctx := context.Background()
+
+	// First syncOnce: seeds oldResources with the full set
+	newOld, ok := gc.syncOnce(ctx, disc, map[schema.GroupVersionResource]struct{}{}, 0)
+	if !ok {
+		t.Fatal("first syncOnce should have performed a resync")
+	}
+
+	// A new project arrives with only 2 monitors (PCP API was incomplete)
+	incompleteResources := map[schema.GroupVersionResource]struct{}{
+		gvrPods:     {},
+		gvrServices: {},
+	}
+	projectGB := newTestGraphBuilder("zachs-project", incompleteResources, mapper, factory,
+		gc.attemptToDelete, gc.attemptToOrphan, gc.absentOwnerCache, gc.eventBroadcaster)
+
+	gc.mu.Lock()
+	gc.dependencyGraphBuilders = append(gc.dependencyGraphBuilders, projectGB)
+	// NOTE: NOT setting gc.resyncNeeded - simulating old code without the fix
+	gc.mu.Unlock()
+
+	// Second syncOnce: root resources unchanged, resyncNeeded is false
+	// → short-circuits, project builder stays incomplete
+	_, ok = gc.syncOnce(ctx, disc, newOld, 0)
+	if ok {
+		t.Fatal("without fix: syncOnce should short-circuit (resources unchanged)")
+	}
+
+	// Verify the project builder still only has incomplete monitors
+	if got := monitorCount(projectGB); got != 2 {
+		t.Fatalf("without fix: project builder should still have 2 monitors, got %d", got)
+	}
+}
+
+// TestConcurrentAddProjectAndSync verifies that concurrent AddProject calls
+// and Sync ticks don't race on the resyncNeeded flag.
+func TestConcurrentAddProjectAndSync(t *testing.T) {
+	mapper := &fakeResettableRESTMapper{kinds: gvrToGVK}
+	gc := newTestGC(mapper)
+
+	disc := discoveryForResources(allResources)
+	factory := &fakeInformerFactory{}
+
+	rootGB := newTestGraphBuilder("", allResources, mapper, factory,
+		gc.attemptToDelete, gc.attemptToOrphan, gc.absentOwnerCache, gc.eventBroadcaster)
+	gc.dependencyGraphBuilders = []*GraphBuilder{rootGB}
+
+	ctx := context.Background()
+
+	// Seed oldResources
+	newOld, _ := gc.syncOnce(ctx, disc, map[schema.GroupVersionResource]struct{}{}, 0)
+
+	var wg sync.WaitGroup
+	const workers = 10
+
+	for i := 0; i < workers; i++ {
+		wg.Add(1)
+		go func(idx int) {
+			defer wg.Done()
+			// Simulate setting resyncNeeded (what AddProject does)
+			gc.mu.Lock()
+			gc.resyncNeeded = true
+			gc.mu.Unlock()
+		}(i)
+	}
+
+	for i := 0; i < workers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			gc.syncOnce(ctx, disc, newOld, 0)
+		}()
+	}
+
+	wg.Wait()
+}
diff --git a/internal/controllers/garbagecollector/partition_metrics.go b/internal/controllers/garbagecollector/partition_metrics.go
new file mode 100644
index 00000000..38736ef7
--- /dev/null
+++ b/internal/controllers/garbagecollector/partition_metrics.go
@@ -0,0 +1,69 @@
+package garbagecollector
+
+import (
+	"sync"
+
+	"k8s.io/component-base/metrics"
+	"k8s.io/component-base/metrics/legacyregistry"
+)
+
+const partitionSubsystem = "gc_partition"
+
+var (
+	// partitionCount tracks the total number of registered per-project GC partitions.
+	partitionCount = metrics.NewGauge(
+		&metrics.GaugeOpts{
+			Subsystem:      partitionSubsystem,
+			Name:           "total",
+			Help:           "Total number of registered per-project GC partitions",
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
+
+	// partitionMonitorCount tracks the number of resource monitors per partition.
+	partitionMonitorCount = metrics.NewGaugeVec(
+		&metrics.GaugeOpts{
+			Subsystem:      partitionSubsystem,
+			Name:           "monitor_count",
+			Help:           "Number of resource monitors in a GC partition",
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{"project"},
+	)
+
+	// partitionSynced tracks whether each partition's monitors are all synced.
+	partitionSynced = metrics.NewGaugeVec(
+		&metrics.GaugeOpts{
+			Subsystem:      partitionSubsystem,
+			Name:           "synced",
+			Help:           "Whether a GC partition's monitors are all synced (1) or not (0)",
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{"project"},
+	)
+)
+
+var registerPartitionMetrics sync.Once
+
+func registerPartitionMetricsOnce() {
+	registerPartitionMetrics.Do(func() {
+		legacyregistry.MustRegister(
+			partitionCount,
+			partitionMonitorCount,
+			partitionSynced,
+		)
+	})
+}
+
+func monitorCountForBuilder(gb *GraphBuilder) int {
+	gb.monitorLock.RLock()
+	defer gb.monitorLock.RUnlock()
+	return len(gb.monitors)
+}
+
+func boolToFloat(b bool) float64 {
+	if b {
+		return 1
+	}
+	return 0
+}
diff --git a/internal/controllers/projectprovider/metrics/metrics.go b/internal/controllers/projectprovider/metrics/metrics.go
new file mode 100644
index 00000000..1597b411
--- /dev/null
+++ b/internal/controllers/projectprovider/metrics/metrics.go
@@ -0,0 +1,68 @@
+package metrics
+
+import (
+	"sync"
+
+	"k8s.io/component-base/metrics"
+	"k8s.io/component-base/metrics/legacyregistry"
+)
+
+const subsystem = "project_provider"
+
+var (
+	// ProjectAddTotal counts AddProject calls by outcome.
+	// Labels: status = "success" | "error" | "abandoned"
+	ProjectAddTotal = metrics.NewCounterVec(
+		&metrics.CounterOpts{
+			Subsystem:      subsystem,
+			Name:           "add_total",
+			Help:           "Total number of AddProject attempts by outcome",
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{"status"},
+	)
+
+	// ProjectAddRetriesTotal counts the total number of retried AddProject calls.
+	ProjectAddRetriesTotal = metrics.NewCounter(
+		&metrics.CounterOpts{
+			Subsystem:      subsystem,
+			Name:           "add_retries_total",
+			Help:           "Total number of AddProject retries due to transient errors",
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
+
+	// ProjectAddDurationSeconds measures how long AddProject calls take.
+	ProjectAddDurationSeconds = metrics.NewHistogram(
+		&metrics.HistogramOpts{
+			Subsystem:      subsystem,
+			Name:           "add_duration_seconds",
+			Help:           "Duration of AddProject calls in seconds",
+			Buckets:        []float64{0.1, 0.5, 1, 2, 5, 10, 30},
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
+
+	// QueueDepth reports the current number of projects waiting to be added.
+	QueueDepth = metrics.NewGauge(
+		&metrics.GaugeOpts{
+			Subsystem:      subsystem,
+			Name:           "queue_depth",
+			Help:           "Current number of projects waiting in the add queue",
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
+)
+
+var registerOnce sync.Once
+
+func Register() {
+	registerOnce.Do(func() {
+		legacyregistry.MustRegister(
+			ProjectAddTotal,
+			ProjectAddRetriesTotal,
+			ProjectAddDurationSeconds,
+			QueueDepth,
+		)
+	})
+}
diff --git a/internal/controllers/projectprovider/provider.go b/internal/controllers/projectprovider/provider.go
index 32db8bf7..2eb2c657 100644
--- a/internal/controllers/projectprovider/provider.go
+++ b/internal/controllers/projectprovider/provider.go
@@ -3,6 +3,7 @@ package projectprovider
 import (
 	"context"
 	"strings"
+	"time"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -11,14 +12,45 @@ import (
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/discovery/cached/memory"
-	"k8s.io/client-go/restmapper"
-
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/restmapper"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
+
+	"golang.org/x/time/rate"
+
+	ppmetrics "go.miloapis.com/milo/internal/controllers/projectprovider/metrics"
 )
 
+// Config holds tunable parameters for the project provider. All fields have
+// sensible defaults via DefaultConfig().
+type Config struct {
+	// Workers is the number of concurrent goroutines processing the add queue.
+	Workers int
+	// MaxRetries is the maximum number of times a failed AddProject is retried.
+	MaxRetries int
+	// RateLimit is the sustained per-second rate at which projects are added.
+	RateLimit float64
+	// RateBurst is the burst allowance for project additions.
+	RateBurst int
+	// BaseBackoff is the initial backoff duration after a failed AddProject.
+	BaseBackoff time.Duration
+	// MaxBackoff is the upper bound for exponential backoff between retries.
+	MaxBackoff time.Duration
+}
+
+func DefaultConfig() Config {
+	return Config{
+		Workers:     5,
+		MaxRetries:  10,
+		RateLimit:   10,
+		RateBurst:   15,
+		BaseBackoff: 5 * time.Second,
+		MaxBackoff:  60 * time.Second,
+	}
+}
+
 type Sink interface {
 	AddProject(ctx context.Context, id string, cfg *rest.Config) error
 	RemoveProject(id string)
@@ -29,9 +61,10 @@ type Provider struct {
 	dyn        dynamic.Interface
 	sink       Sink
 	projectGVR schema.GroupVersionResource
+	cfg        Config
 }
 
-func New(root *rest.Config, sink Sink) (*Provider, error) {
+func New(root *rest.Config, sink Sink, cfg Config) (*Provider, error) {
 	dyn, err := dynamic.NewForConfig(root)
 	if err != nil {
 		return nil, err
@@ -40,7 +73,10 @@ func New(root *rest.Config, sink Sink) (*Provider, error) {
 	if err != nil {
 		return nil, err
 	}
-	return &Provider{root: root, dyn: dyn, sink: sink, projectGVR: gvr}, nil
+
+	ppmetrics.Register()
+
+	return &Provider{root: root, dyn: dyn, sink: sink, projectGVR: gvr, cfg: cfg}, nil
 }
 
 func (p *Provider) cfgForProject(id string) *rest.Config {
@@ -50,6 +86,15 @@
 }
 
 func (p *Provider) Run(ctx context.Context) error {
+	queue := workqueue.NewTypedRateLimitingQueueWithConfig(
+		workqueue.NewTypedMaxOfRateLimiter(
+			workqueue.NewTypedItemExponentialFailureRateLimiter[string](p.cfg.BaseBackoff, p.cfg.MaxBackoff),
+			&workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(p.cfg.RateLimit), p.cfg.RateBurst)},
+		),
+		workqueue.TypedRateLimitingQueueConfig[string]{Name: "project_provider"},
+	)
+	defer queue.ShutDown()
+
 	lw := &cache.ListWatch{
 		ListFunc: func(lo metav1.ListOptions) (runtime.Object, error) {
 			return p.dyn.Resource(p.projectGVR).List(ctx, lo)
@@ -63,23 +108,79 @@ func (p *Provider) Run(ctx context.Context) error {
 	inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(o interface{}) {
 			id := o.(*unstructured.Unstructured).GetName()
-			err := p.sink.AddProject(ctx, id, p.cfgForProject(id))
-			if err != nil {
-				// Log the error but continue processing other projects
-				klog.Errorf("Failed to add project %q: %v", id, err)
-			}
+			queue.Add(id)
 		},
 		DeleteFunc: func(o interface{}) {
+			if d, ok := o.(cache.DeletedFinalStateUnknown); ok {
+				o = d.Obj
+			}
 			id := o.(*unstructured.Unstructured).GetName()
 			p.sink.RemoveProject(id)
 		},
 	})
 
 	go inf.Run(ctx.Done())
+
+	// Periodically report queue depth.
+	go func() {
+		ticker := time.NewTicker(5 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				ppmetrics.QueueDepth.Set(float64(queue.Len()))
+			}
+		}
+	}()
+
+	for i := 0; i < p.cfg.Workers; i++ {
+		go p.runWorker(ctx, queue)
+	}
+
 	<-ctx.Done()
 	return nil
 }
 
+func (p *Provider) runWorker(ctx context.Context, queue workqueue.TypedRateLimitingInterface[string]) {
+	for {
+		id, quit := queue.Get()
+		if quit {
+			return
+		}
+		p.processProject(ctx, queue, id)
+		queue.Done(id)
+	}
+}
+
+func (p *Provider) processProject(ctx context.Context, queue workqueue.TypedRateLimitingInterface[string], id string) {
+	start := time.Now()
+	err := p.sink.AddProject(ctx, id, p.cfgForProject(id))
+	ppmetrics.ProjectAddDurationSeconds.Observe(time.Since(start).Seconds())
+
+	if err == nil {
+		ppmetrics.ProjectAddTotal.WithLabelValues("success").Inc()
+		queue.Forget(id)
+		return
+	}
+
+	retries := queue.NumRequeues(id)
+	if retries < p.cfg.MaxRetries {
+		ppmetrics.ProjectAddTotal.WithLabelValues("error").Inc()
+		ppmetrics.ProjectAddRetriesTotal.Inc()
+		klog.V(2).Infof("Failed to add project %q (attempt %d/%d, will retry): %v",
+			id, retries+1, p.cfg.MaxRetries, err)
+		queue.AddRateLimited(id)
+		return
+	}
+
+	ppmetrics.ProjectAddTotal.WithLabelValues("abandoned").Inc()
+	klog.Errorf("Failed to add project %q after %d attempts, giving up: %v",
+		id, p.cfg.MaxRetries, err)
+	queue.Forget(id)
+}
+
 func resolveProjectGVR(cfg *rest.Config, group, preferredVersion string) (schema.GroupVersionResource, error) {
 	disc, err := discovery.NewDiscoveryClientForConfig(cfg)
 	if err != nil {
@@ -87,7 +188,6 @@ func resolveProjectGVR(cfg *rest.Config, group, preferredVersion string) (schema
 	}
 
 	rm := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(disc))
-	// If preferredVersion == "", RESTMapping will pick the preferred version
 	mapping, err := rm.RESTMapping(schema.GroupKind{Group: group, Kind: "Project"}, preferredVersion)
 	if err != nil {
 		return schema.GroupVersionResource{}, err
diff --git a/internal/controllers/projectprovider/provider_test.go b/internal/controllers/projectprovider/provider_test.go
new file mode 100644
index 00000000..d6b25279
--- /dev/null
+++ b/internal/controllers/projectprovider/provider_test.go
@@ -0,0 +1,146 @@
+package projectprovider
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"k8s.io/client-go/rest"
+)
+
+// fakeSink records AddProject/RemoveProject calls and can simulate failures.
+type fakeSink struct {
+	mu       sync.Mutex
+	added    map[string]int // project → number of AddProject calls
+	removed  []string
+	failNext int32 // atomic: number of remaining AddProject failures
+}
+
+func newFakeSink() *fakeSink {
+	return &fakeSink{added: make(map[string]int)}
+}
+
+func (s *fakeSink) AddProject(_ context.Context, id string, _ *rest.Config) error {
+	if atomic.AddInt32(&s.failNext, -1) >= 0 {
+		return fmt.Errorf("simulated failure for %s", id)
+	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.added[id]++
+	return nil
+}
+
+func (s *fakeSink) RemoveProject(id string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.removed = append(s.removed, id)
+}
+
+func (s *fakeSink) addCount(id string) int {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.added[id]
+}
+
+func testProvider(sink Sink) *Provider {
+	return &Provider{
+		root: &rest.Config{Host: "https://example.com"},
+		sink: sink,
+		cfg:  DefaultConfig(),
+	}
+}
+
+func TestProcessProject_RetriesOnFailure(t *testing.T) {
+	sink := newFakeSink()
+	atomic.StoreInt32(&sink.failNext, 3)
+
+	p := testProvider(sink)
+	queue := newTestQueue()
+	ctx := context.Background()
+
+	for i := 0; i < 3; i++ {
+		p.processProject(ctx, queue, "test-project")
+		if queue.NumRequeues("test-project") != i+1 {
+			t.Fatalf("attempt %d: expected %d requeues, got %d",
+				i, i+1, queue.NumRequeues("test-project"))
+		}
+	}
+
+	// 4th attempt should succeed
+	p.processProject(ctx, queue, "test-project")
+	if sink.addCount("test-project") != 1 {
+		t.Fatalf("expected 1 successful add, got %d", sink.addCount("test-project"))
+	}
+}
+
+func TestProcessProject_GivesUpAfterMaxRetries(t *testing.T) {
+	sink := newFakeSink()
+	p := testProvider(sink)
+	atomic.StoreInt32(&sink.failNext, int32(p.cfg.MaxRetries)+10)
+
+	queue := newTestQueue()
+	ctx := context.Background()
+
+	for i := 0; i <= p.cfg.MaxRetries; i++ {
+		p.processProject(ctx, queue, "test-project")
+	}
+
+	if queue.forgotten["test-project"] != 1 {
+		t.Fatalf("expected project to be forgotten after %d retries", p.cfg.MaxRetries)
+	}
+	if sink.addCount("test-project") != 0 {
+		t.Fatal("project should never have been successfully added")
+	}
+}
+
+func TestProcessProject_RespectsCustomConfig(t *testing.T) {
+	sink := newFakeSink()
+	atomic.StoreInt32(&sink.failNext, 100)
+
+	p := &Provider{
+		root: &rest.Config{Host: "https://example.com"},
+		sink: sink,
+		cfg:  Config{MaxRetries: 3},
+	}
+
+	queue := newTestQueue()
+	ctx := context.Background()
+
+	// 3 retries + 1 final = 4 total calls to give up
+	for i := 0; i <= 3; i++ {
+		p.processProject(ctx, queue, "test-project")
+	}
+
+	if queue.forgotten["test-project"] != 1 {
+		t.Fatal("expected project to be forgotten after custom MaxRetries=3")
+	}
+}
+
+// testQueue is a minimal mock of workqueue.TypedRateLimitingInterface for
+// unit testing processProject without the async queue machinery.
+type testQueue struct {
+	requeues  map[string]int
+	forgotten map[string]int
+}
+
+func newTestQueue() *testQueue {
+	return &testQueue{
+		requeues:  make(map[string]int),
+		forgotten: make(map[string]int),
+	}
+}
+
+func (q *testQueue) Add(string)                                    {}
+func (q *testQueue) Len() int                                      { return 0 }
+func (q *testQueue) Get() (string, bool)                           { return "", true }
+func (q *testQueue) Done(string)                                   {}
+func (q *testQueue) ShutDown()                                     {}
+func (q *testQueue) ShutDownWithDrain()                            {}
+func (q *testQueue) ShuttingDown() bool                            { return false }
+func (q *testQueue) AddAfter(item string, duration time.Duration)  {}
+func (q *testQueue) AddRateLimited(item string)                    { q.requeues[item]++ }
+func (q *testQueue) Forget(item string)                            { q.forgotten[item]++ }
+func (q *testQueue) NumRequeues(item string) int                   { return q.requeues[item] }
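
Reviewer note (not part of the patch): a minimal sketch of how the Config, New, and Run surface introduced above fits together. The noopSink type, the main function, and the placeholder rest.Config host are illustrative assumptions only; in the controller manager the real sinks and client config are wired in cmd/milo/controller-manager/core.go as shown in the diff.

package main

import (
	"context"

	"k8s.io/client-go/rest"

	"go.miloapis.com/milo/internal/controllers/projectprovider"
)

// noopSink is an illustrative Sink that ignores project events; real callers
// pass the namespace or garbage-collector sinks from core.go.
type noopSink struct{}

func (noopSink) AddProject(context.Context, string, *rest.Config) error { return nil }
func (noopSink) RemoveProject(string)                                   {}

func main() {
	// Placeholder config; callers normally reuse the control-plane rest.Config
	// already held by the controller manager.
	root := &rest.Config{Host: "https://example.invalid"}

	// Start from the patch defaults (5 workers, 10 retries, 10 adds/s with a
	// burst of 15, 5s-60s exponential backoff) and tighten the add rate.
	cfg := projectprovider.DefaultConfig()
	cfg.RateLimit = 2
	cfg.RateBurst = 5

	prov, err := projectprovider.New(root, noopSink{}, cfg)
	if err != nil {
		panic(err)
	}
	_ = prov.Run(context.Background()) // blocks until the context is cancelled
}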