From 1dd318fe1dca72242df156ed05d2a4d4e4b32b20 Mon Sep 17 00:00:00 2001 From: Fabian Wiesel Date: Wed, 22 Oct 2025 12:35:05 +0200 Subject: [PATCH 1/2] Aggregates: Use GenerationChangedPredicate and join errors Prior to this change, we would have to exit the reconcile function on each status update, and continue there. Now with the filter, we only retry on either an error or RequeueAfter. Joining the errors allows us to complete at least some of the changes, and not always abort on the first. Return kubernetes errors directly, and return a RequeueAfter on an error from Openstack for now, until the logging situation has been cleared. --- internal/controller/aggregates_controller.go | 64 +++++++++++++------- 1 file changed, 42 insertions(+), 22 deletions(-) diff --git a/internal/controller/aggregates_controller.go b/internal/controller/aggregates_controller.go index 63ae2756..b42e5249 100644 --- a/internal/controller/aggregates_controller.go +++ b/internal/controller/aggregates_controller.go @@ -19,6 +19,7 @@ package controller import ( "context" + "errors" "fmt" "slices" @@ -26,8 +27,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" logger "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/gophercloud/gophercloud/v2" "github.com/gophercloud/gophercloud/v2/openstack/compute/v2/aggregates" @@ -73,30 +76,24 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) aggs, err := aggregatesByName(ctx, ac.computeClient) if err != nil { - meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ - Type: ConditionTypeAggregatesUpdated, - Status: metav1.ConditionFalse, - Reason: ConditionAggregatesFailed, - Message: err.Error(), - }) - return ctrl.Result{}, ac.Status().Update(ctx, hv) + return ctrl.Result{}, 
ac.trackError(ctx, hv, "failed fetching aggregates", err) } toAdd := Difference(hv.Status.Aggregates, hv.Spec.Aggregates) toRemove := Difference(hv.Spec.Aggregates, hv.Status.Aggregates) + // We need to add first the host to the aggregates, because if we first drop + // an aggregate with a filter criterion and then add a new one, we leave the host + // open for period of time. Still, this may fail due to a conflict of aggregates + // with different availability zones, so we collect all the errors and return them + // so it hopefully will converge eventually. + var errs []error if len(toAdd) > 0 { log.Info("Adding", "aggregates", toAdd) for item := range slices.Values(toAdd) { err = addToAggregate(ctx, ac.computeClient, aggs, hv.Name, item, "") if err != nil { - meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ - Type: ConditionTypeAggregatesUpdated, - Status: metav1.ConditionFalse, - Reason: ConditionAggregatesFailed, - Message: err.Error(), - }) - return ctrl.Result{}, ac.Status().Update(ctx, hv) + errs = append(errs, err) } } } @@ -106,17 +103,15 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) for item := range slices.Values(toRemove) { err = removeFromAggregate(ctx, ac.computeClient, aggs, hv.Name, item) if err != nil { - meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ - Type: ConditionTypeAggregatesUpdated, - Status: metav1.ConditionFalse, - Reason: ConditionAggregatesFailed, - Message: err.Error(), - }) - return ctrl.Result{}, ac.Status().Update(ctx, hv) + errs = append(errs, err) } } } + if errs != nil { + return ctrl.Result{}, ac.trackError(ctx, hv, "failed updating aggregates", errs...) 
+ } + hv.Status.Aggregates = hv.Spec.Aggregates meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{ Type: ConditionTypeAggregatesUpdated, @@ -127,6 +122,31 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, ac.Status().Update(ctx, hv) } +func (ac *AggregatesController) trackError(ctx context.Context, hv *kvmv1.Hypervisor, msg string, errs ...error) error { + err := errors.Join(errs...) + if err == nil { + return nil + } + + condition := metav1.Condition{ + Type: ConditionTypeAggregatesUpdated, + Status: metav1.ConditionFalse, + Reason: ConditionAggregatesFailed, + Message: err.Error(), + } + + if meta.SetStatusCondition(&hv.Status.Conditions, condition) { + if err2 := ac.Status().Update(ctx, hv); err2 != nil { + return errors.Join(err, err2) + } + logger.FromContext(ctx). + WithCallDepth(1). // Where did we call trackError() from? + Error(err, msg) + } + + return err +} + // SetupWithManager sets up the controller with the Manager. func (ac *AggregatesController) SetupWithManager(mgr ctrl.Manager) error { ctx := context.Background() @@ -140,7 +160,7 @@ func (ac *AggregatesController) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). Named(AggregatesControllerName). - For(&kvmv1.Hypervisor{}). + For(&kvmv1.Hypervisor{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). Complete(ac) } From 0aebca4e807053fd264c91004dfb9d60db5e2615 Mon Sep 17 00:00:00 2001 From: Andrew Karpow Date: Thu, 23 Oct 2025 16:09:12 -0400 Subject: [PATCH 2/2] Use zapEncoder for removing stacktrace of reconcile errors, fixups This PR modifies the code of #161 to improve the following points: 1. No need for an extra error log: instead of dropping reconcile errors, we format them nicely with the Encoder. 2. Functions (like the rewritten `setErrorCondition`) should not return the errors they've been invoked with - but only return errors if they fail. 
Also, it's an unneeded roundtrip to return the same error that has been passed by the caller. 3. Introduce `utils.LifecycleEnabledPredicate`, a predicate that will filter events for hypervisors with LifecycleEnabled == True. --- cmd/main.go | 2 +- internal/controller/aggregates_controller.go | 50 +++++++++----------- internal/logger/logger.go | 43 +++++++---------- internal/utils/lifecycle_enabled.go | 34 +++++++++++++ 4 files changed, 73 insertions(+), 56 deletions(-) create mode 100644 internal/utils/lifecycle_enabled.go diff --git a/cmd/main.go b/cmd/main.go index 3f5e59a2..09922aaa 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -96,7 +96,7 @@ func main() { opts := ctrlzap.Options{ Development: true, TimeEncoder: zapcore.ISO8601TimeEncoder, - ZapOpts: []zap.Option{zap.WrapCore(logger.WrapCore)}, + Encoder: logger.NewSanitzeReconcileErrorEncoder(zap.NewDevelopmentEncoderConfig()), StacktraceLevel: zap.DPanicLevel, } opts.BindFlags(flag.CommandLine) diff --git a/internal/controller/aggregates_controller.go b/internal/controller/aggregates_controller.go index b42e5249..36b52c45 100644 --- a/internal/controller/aggregates_controller.go +++ b/internal/controller/aggregates_controller.go @@ -37,6 +37,7 @@ import ( kvmv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" "github.com/cobaltcore-dev/openstack-hypervisor-operator/internal/openstack" + "github.com/cobaltcore-dev/openstack-hypervisor-operator/internal/utils" ) const ( @@ -56,19 +57,12 @@ type AggregatesController struct { // +kubebuilder:rbac:groups=kvm.cloud.sap,resources=hypervisors/status,verbs=get;list;watch;create;update;patch;delete func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := logger.FromContext(ctx).WithName(req.Name) - ctx = logger.IntoContext(ctx, log) - + log := logger.FromContext(ctx) hv := &kvmv1.Hypervisor{} if err := ac.Get(ctx, req.NamespacedName, hv); err != nil { return ctrl.Result{}, k8sclient.IgnoreNotFound(err) } - // 
apply traits only when lifecycle management is enabled - if !hv.Spec.LifecycleEnabled { - return ctrl.Result{}, nil - } - if slices.Equal(hv.Spec.Aggregates, hv.Status.Aggregates) { // Nothing to be done return ctrl.Result{}, nil @@ -76,7 +70,11 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) aggs, err := aggregatesByName(ctx, ac.computeClient) if err != nil { - return ctrl.Result{}, ac.trackError(ctx, hv, "failed fetching aggregates", err) + err = fmt.Errorf("failed listing aggregates: %w", err) + if err2 := ac.setErrorCondition(ctx, hv, err.Error()); err2 != nil { + return ctrl.Result{}, errors.Join(err, err2) + } + return ctrl.Result{}, err } toAdd := Difference(hv.Status.Aggregates, hv.Spec.Aggregates) @@ -91,8 +89,7 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) if len(toAdd) > 0 { log.Info("Adding", "aggregates", toAdd) for item := range slices.Values(toAdd) { - err = addToAggregate(ctx, ac.computeClient, aggs, hv.Name, item, "") - if err != nil { + if err = addToAggregate(ctx, ac.computeClient, aggs, hv.Name, item, ""); err != nil { errs = append(errs, err) } } @@ -101,15 +98,18 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) if len(toRemove) > 0 { log.Info("Removing", "aggregates", toRemove) for item := range slices.Values(toRemove) { - err = removeFromAggregate(ctx, ac.computeClient, aggs, hv.Name, item) - if err != nil { + if err = removeFromAggregate(ctx, ac.computeClient, aggs, hv.Name, item); err != nil { errs = append(errs, err) } } } if errs != nil { - return ctrl.Result{}, ac.trackError(ctx, hv, "failed updating aggregates", errs...) 
+ err = fmt.Errorf("encountered errors during aggregate update: %w", errors.Join(errs...)) + if err2 := ac.setErrorCondition(ctx, hv, err.Error()); err2 != nil { + return ctrl.Result{}, errors.Join(err, err2) + } + return ctrl.Result{}, err } hv.Status.Aggregates = hv.Spec.Aggregates @@ -122,29 +122,22 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, ac.Status().Update(ctx, hv) } -func (ac *AggregatesController) trackError(ctx context.Context, hv *kvmv1.Hypervisor, msg string, errs ...error) error { - err := errors.Join(errs...) - if err == nil { - return nil - } - +// setErrorCondition sets the error condition on the Hypervisor status, returns error if update fails +func (ac *AggregatesController) setErrorCondition(ctx context.Context, hv *kvmv1.Hypervisor, msg string) error { condition := metav1.Condition{ Type: ConditionTypeAggregatesUpdated, Status: metav1.ConditionFalse, Reason: ConditionAggregatesFailed, - Message: err.Error(), + Message: msg, } if meta.SetStatusCondition(&hv.Status.Conditions, condition) { - if err2 := ac.Status().Update(ctx, hv); err2 != nil { - return errors.Join(err, err2) + if err := ac.Status().Update(ctx, hv); err != nil { + return err } - logger.FromContext(ctx). - WithCallDepth(1). // Where did we call trackError() from? - Error(err, msg) } - return err + return nil } // SetupWithManager sets up the controller with the Manager. @@ -160,7 +153,8 @@ func (ac *AggregatesController) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). Named(AggregatesControllerName). - For(&kvmv1.Hypervisor{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + For(&kvmv1.Hypervisor{}, builder.WithPredicates(utils.LifecycleEnabledPredicate)). + WithEventFilter(predicate.GenerationChangedPredicate{}). 
Complete(ac) } diff --git a/internal/logger/logger.go b/internal/logger/logger.go index c8a8c193..26e78b53 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -18,41 +18,30 @@ limitations under the License. package logger import ( + "go.uber.org/zap/buffer" "go.uber.org/zap/zapcore" ) -type wrapCore struct { - core zapcore.Core +func NewSanitzeReconcileErrorEncoder(cfg zapcore.EncoderConfig) zapcore.Encoder { + return &SanitzeReconcileErrorEncoder{zapcore.NewConsoleEncoder(cfg), cfg} } -func WrapCore(core zapcore.Core) zapcore.Core { - return wrapCore{core} +type SanitzeReconcileErrorEncoder struct { + zapcore.Encoder + cfg zapcore.EncoderConfig } -// Check implements zapcore.Core. -func (w wrapCore) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry) *zapcore.CheckedEntry { - if entry.Message == "Reconciler error" { - entry.Level = -2 +func (e *SanitzeReconcileErrorEncoder) EncodeEntry(entry zapcore.Entry, fields []zapcore.Field) (*buffer.Buffer, error) { + if entry.Message == "Reconciler error" { + // controller-runtime logs failed reconciles as "Reconciler error": downgrade to a warning and drop the stack trace to avoid log spam + entry.Level = zapcore.WarnLevel + entry.Stack = "" } - return w.core.Check(entry, checkedEntry) + return e.Encoder.EncodeEntry(entry, fields) } -// Enabled implements zapcore.Core. -func (w wrapCore) Enabled(level zapcore.Level) bool { - return w.core.Enabled(level) -} - -// Sync implements zapcore.Core. -func (w wrapCore) Sync() error { - return w.core.Sync() -} - -// With implements zapcore.Core. -func (w wrapCore) With(fields []zapcore.Field) zapcore.Core { - return wrapCore{w.core.With(fields)} -} - -// Write implements zapcore.Core. 
-func (w wrapCore) Write(entry zapcore.Entry, fields []zapcore.Field) error { - return w.core.Write(entry, fields) +func (e *SanitzeReconcileErrorEncoder) Clone() zapcore.Encoder { + return &SanitzeReconcileErrorEncoder{ + Encoder: e.Encoder.Clone(), cfg: e.cfg, + } } diff --git a/internal/utils/lifecycle_enabled.go b/internal/utils/lifecycle_enabled.go new file mode 100644 index 00000000..0934b0c5 --- /dev/null +++ b/internal/utils/lifecycle_enabled.go @@ -0,0 +1,34 @@ +/* +SPDX-FileCopyrightText: Copyright 2025 SAP SE or an SAP affiliate company and cobaltcore-dev contributors +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + kvmv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" +) + +var ( + LifecycleEnabledPredicate = predicate.NewPredicateFuncs(func(object client.Object) bool { + if hv, ok := object.(*kvmv1.Hypervisor); ok { + return hv.Spec.LifecycleEnabled + } + return true + }) +)