From c706846ae01fe274bba1cfe03d664e484c37c89e Mon Sep 17 00:00:00 2001 From: Fabian Wiesel Date: Wed, 22 Oct 2025 12:35:05 +0200 Subject: [PATCH] Aggregates: Use GenerationChangedPredicate and join errors Prior to this change, we would have to exit the reconcile function on each status update, and continue there. Now with the filter, we only retry on either an error or RequeueAfter. Joining the errors allows us to complete at least some of the changes, and not always abort on the first. Return kubernetes errors directly, and return a RequeueAfter on an error from Openstack for now, until the logging situation has been cleared. --- internal/controller/aggregates_controller.go | 64 +++++++++++++------- 1 file changed, 42 insertions(+), 22 deletions(-) diff --git a/internal/controller/aggregates_controller.go b/internal/controller/aggregates_controller.go index 63ae2756..b42e5249 100644 --- a/internal/controller/aggregates_controller.go +++ b/internal/controller/aggregates_controller.go @@ -19,6 +19,7 @@ package controller import ( "context" + "errors" "fmt" "slices" @@ -26,8 +27,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" logger "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/gophercloud/gophercloud/v2" "github.com/gophercloud/gophercloud/v2/openstack/compute/v2/aggregates" @@ -73,30 +76,24 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) aggs, err := aggregatesByName(ctx, ac.computeClient) if err != nil { - meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ - Type: ConditionTypeAggregatesUpdated, - Status: metav1.ConditionFalse, - Reason: ConditionAggregatesFailed, - Message: err.Error(), - }) - return ctrl.Result{}, ac.Status().Update(ctx, hv) + return ctrl.Result{}, ac.trackError(ctx, hv, "failed fetching aggregates", err) } toAdd := Difference(hv.Status.Aggregates, hv.Spec.Aggregates) toRemove := Difference(hv.Spec.Aggregates, hv.Status.Aggregates) + // We need to add first the host to the aggregates, because if we first drop + // an aggregate with a filter criterion and then add a new one, we leave the host + // open for period of time. Still, this may fail due to a conflict of aggregates + // with different availability zones, so we collect all the errors and return them + // so it hopefully will converge eventually. + var errs []error if len(toAdd) > 0 { log.Info("Adding", "aggregates", toAdd) for item := range slices.Values(toAdd) { err = addToAggregate(ctx, ac.computeClient, aggs, hv.Name, item, "") if err != nil { - meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ - Type: ConditionTypeAggregatesUpdated, - Status: metav1.ConditionFalse, - Reason: ConditionAggregatesFailed, - Message: err.Error(), - }) - return ctrl.Result{}, ac.Status().Update(ctx, hv) + errs = append(errs, err) } } } @@ -106,17 +103,15 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) for item := range slices.Values(toRemove) { err = removeFromAggregate(ctx, ac.computeClient, aggs, hv.Name, item) if err != nil { - meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ - Type: ConditionTypeAggregatesUpdated, - Status: metav1.ConditionFalse, - Reason: ConditionAggregatesFailed, - Message: err.Error(), - }) - return ctrl.Result{}, ac.Status().Update(ctx, hv) + errs = append(errs, err) } } } + if errs != nil { + return ctrl.Result{}, ac.trackError(ctx, hv, "failed updating aggregates", errs...) + } + hv.Status.Aggregates = hv.Spec.Aggregates meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ Type: ConditionTypeAggregatesUpdated, @@ -127,6 +122,31 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, ac.Status().Update(ctx, hv) } +func (ac *AggregatesController) trackError(ctx context.Context, hv *kvmv1.Hypervisor, msg string, errs ...error) error { + err := errors.Join(errs...) + if err == nil { + return nil + } + + condition := metav1.Condition{ + Type: ConditionTypeAggregatesUpdated, + Status: metav1.ConditionFalse, + Reason: ConditionAggregatesFailed, + Message: err.Error(), + } + + if meta.SetStatusCondition(&hv.Status.Conditions, condition) { + if err2 := ac.Status().Update(ctx, hv); err2 != nil { + return errors.Join(err, err2) + } + logger.FromContext(ctx). + WithCallDepth(1). // Where did we call trackError() from? + Error(err, msg) + } + + return err +} + // SetupWithManager sets up the controller with the Manager. func (ac *AggregatesController) SetupWithManager(mgr ctrl.Manager) error { ctx := context.Background() @@ -140,7 +160,7 @@ func (ac *AggregatesController) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). Named(AggregatesControllerName). - For(&kvmv1.Hypervisor{}). + For(&kvmv1.Hypervisor{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). Complete(ac) }