diff --git a/cmd/main.go b/cmd/main.go
index 3f5e59a2..09922aaa 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -96,7 +96,7 @@ func main() {
 	opts := ctrlzap.Options{
 		Development:     true,
 		TimeEncoder:     zapcore.ISO8601TimeEncoder,
-		ZapOpts:         []zap.Option{zap.WrapCore(logger.WrapCore)},
+		Encoder:         logger.NewSanitzeReconcileErrorEncoder(zap.NewDevelopmentEncoderConfig()),
 		StacktraceLevel: zap.DPanicLevel,
 	}
 	opts.BindFlags(flag.CommandLine)
diff --git a/internal/controller/aggregates_controller.go b/internal/controller/aggregates_controller.go
index 63ae2756..36b52c45 100644
--- a/internal/controller/aggregates_controller.go
+++ b/internal/controller/aggregates_controller.go
@@ -19,6 +19,7 @@ package controller
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"slices"
 
@@ -26,14 +27,17 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
 	k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
 	logger "sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
 
 	"github.com/gophercloud/gophercloud/v2"
 	"github.com/gophercloud/gophercloud/v2/openstack/compute/v2/aggregates"
 
 	kvmv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
 	"github.com/cobaltcore-dev/openstack-hypervisor-operator/internal/openstack"
+	"github.com/cobaltcore-dev/openstack-hypervisor-operator/internal/utils"
 )
 
 const (
@@ -53,19 +57,12 @@ type AggregatesController struct {
 // +kubebuilder:rbac:groups=kvm.cloud.sap,resources=hypervisors/status,verbs=get;list;watch;create;update;patch;delete
 
 func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
-	log := logger.FromContext(ctx).WithName(req.Name)
-	ctx = logger.IntoContext(ctx, log)
-
+	log := logger.FromContext(ctx)
 	hv := &kvmv1.Hypervisor{}
 	if err := ac.Get(ctx, req.NamespacedName, hv); err != nil {
 		return ctrl.Result{}, k8sclient.IgnoreNotFound(err)
 	}
 
-	// apply traits only when lifecycle management is enabled
-	if !hv.Spec.LifecycleEnabled {
-		return ctrl.Result{}, nil
-	}
-
 	if slices.Equal(hv.Spec.Aggregates, hv.Status.Aggregates) {
 		// Nothing to be done
 		return ctrl.Result{}, nil
@@ -73,30 +70,27 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request)
 
 	aggs, err := aggregatesByName(ctx, ac.computeClient)
 	if err != nil {
-		meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{
-			Type:    ConditionTypeAggregatesUpdated,
-			Status:  metav1.ConditionFalse,
-			Reason:  ConditionAggregatesFailed,
-			Message: err.Error(),
-		})
-		return ctrl.Result{}, ac.Status().Update(ctx, hv)
+		err = fmt.Errorf("failed listing aggregates: %w", err)
+		if err2 := ac.setErrorCondition(ctx, hv, err.Error()); err2 != nil {
+			return ctrl.Result{}, errors.Join(err, err2)
+		}
+		return ctrl.Result{}, err
 	}
 
 	toAdd := Difference(hv.Status.Aggregates, hv.Spec.Aggregates)
 	toRemove := Difference(hv.Spec.Aggregates, hv.Status.Aggregates)
 
+	// We need to add the host to the new aggregates first, because if we first drop
+	// an aggregate with a filter criterion and then add a new one, we leave the host
+	// open for a period of time. Still, this may fail due to a conflict of aggregates
+	// with different availability zones, so we collect all the errors and return them
+	// so that it hopefully converges eventually.
+	var errs []error
 	if len(toAdd) > 0 {
 		log.Info("Adding", "aggregates", toAdd)
 		for item := range slices.Values(toAdd) {
-			err = addToAggregate(ctx, ac.computeClient, aggs, hv.Name, item, "")
-			if err != nil {
-				meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{
-					Type:    ConditionTypeAggregatesUpdated,
-					Status:  metav1.ConditionFalse,
-					Reason:  ConditionAggregatesFailed,
-					Message: err.Error(),
-				})
-				return ctrl.Result{}, ac.Status().Update(ctx, hv)
+			if err = addToAggregate(ctx, ac.computeClient, aggs, hv.Name, item, ""); err != nil {
+				errs = append(errs, err)
 			}
 		}
 	}
@@ -104,19 +98,20 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request)
 	if len(toRemove) > 0 {
 		log.Info("Removing", "aggregates", toRemove)
 		for item := range slices.Values(toRemove) {
-			err = removeFromAggregate(ctx, ac.computeClient, aggs, hv.Name, item)
-			if err != nil {
-				meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{
-					Type:    ConditionTypeAggregatesUpdated,
-					Status:  metav1.ConditionFalse,
-					Reason:  ConditionAggregatesFailed,
-					Message: err.Error(),
-				})
-				return ctrl.Result{}, ac.Status().Update(ctx, hv)
+			if err = removeFromAggregate(ctx, ac.computeClient, aggs, hv.Name, item); err != nil {
+				errs = append(errs, err)
 			}
 		}
 	}
 
+	if len(errs) > 0 {
+		err = fmt.Errorf("encountered errors during aggregate update: %w", errors.Join(errs...))
+		if err2 := ac.setErrorCondition(ctx, hv, err.Error()); err2 != nil {
+			return ctrl.Result{}, errors.Join(err, err2)
+		}
+		return ctrl.Result{}, err
+	}
+
 	hv.Status.Aggregates = hv.Spec.Aggregates
 	meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{
 		Type:    ConditionTypeAggregatesUpdated,
@@ -127,6 +122,24 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request)
 	return ctrl.Result{}, ac.Status().Update(ctx, hv)
 }
 
+// setErrorCondition marks the aggregates condition as failed with msg and updates the status only if the condition changed.
+func (ac *AggregatesController) setErrorCondition(ctx context.Context, hv *kvmv1.Hypervisor, msg string) error {
+	condition := metav1.Condition{
+		Type:    ConditionTypeAggregatesUpdated,
+		Status:  metav1.ConditionFalse,
+		Reason:  ConditionAggregatesFailed,
+		Message: msg,
+	}
+
+	if meta.SetStatusCondition(&hv.Status.Conditions, condition) {
+		if err := ac.Status().Update(ctx, hv); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 // SetupWithManager sets up the controller with the Manager.
 func (ac *AggregatesController) SetupWithManager(mgr ctrl.Manager) error {
 	ctx := context.Background()
@@ -140,7 +153,8 @@ func (ac *AggregatesController) SetupWithManager(mgr ctrl.Manager) error {
 
 	return ctrl.NewControllerManagedBy(mgr).
 		Named(AggregatesControllerName).
-		For(&kvmv1.Hypervisor{}).
+		For(&kvmv1.Hypervisor{}, builder.WithPredicates(utils.LifecycleEnabledPredicate)).
+		WithEventFilter(predicate.GenerationChangedPredicate{}).
 		Complete(ac)
 }
 
diff --git a/internal/logger/logger.go b/internal/logger/logger.go
index c8a8c193..26e78b53 100644
--- a/internal/logger/logger.go
+++ b/internal/logger/logger.go
@@ -18,41 +18,39 @@ limitations under the License.
 package logger
 
 import (
+	"go.uber.org/zap/buffer"
 	"go.uber.org/zap/zapcore"
 )
 
-type wrapCore struct {
-	core zapcore.Core
+// NewSanitzeReconcileErrorEncoder returns a console encoder built from cfg that
+// relabels "Reconciler error" entries as warnings and drops their stack traces.
+// cfg must name the keys (message, level, time, ...) that should be printed.
+func NewSanitzeReconcileErrorEncoder(cfg zapcore.EncoderConfig) zapcore.Encoder {
+	return &SanitzeReconcileErrorEncoder{zapcore.NewConsoleEncoder(cfg), cfg}
 }
 
-func WrapCore(core zapcore.Core) zapcore.Core {
-	return wrapCore{core}
+// SanitzeReconcileErrorEncoder wraps a zapcore.Encoder and rewrites
+// "Reconciler error" entries before delegating to the wrapped encoder.
+type SanitzeReconcileErrorEncoder struct {
+	zapcore.Encoder
+	cfg zapcore.EncoderConfig
 }
 
-// Check implements zapcore.Core.
-func (w wrapCore) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry) *zapcore.CheckedEntry {
-	if entry.Message == "Reconciler error" {
-		entry.Level = -2
+// EncodeEntry implements zapcore.Encoder.
+func (e *SanitzeReconcileErrorEncoder) EncodeEntry(entry zapcore.Entry, fields []zapcore.Field) (*buffer.Buffer, error) {
+	if entry.Message == "Reconciler error" {
+		// Report as a warning without a stack trace to avoid log spam.
+		entry.Level = zapcore.WarnLevel
+		entry.Stack = ""
 	}
-	return w.core.Check(entry, checkedEntry)
+	return e.Encoder.EncodeEntry(entry, fields)
 }
 
-// Enabled implements zapcore.Core.
-func (w wrapCore) Enabled(level zapcore.Level) bool {
-	return w.core.Enabled(level)
-}
-
-// Sync implements zapcore.Core.
-func (w wrapCore) Sync() error {
-	return w.core.Sync()
-}
-
-// With implements zapcore.Core.
-func (w wrapCore) With(fields []zapcore.Field) zapcore.Core {
-	return wrapCore{w.core.With(fields)}
-}
-
-// Write implements zapcore.Core.
-func (w wrapCore) Write(entry zapcore.Entry, fields []zapcore.Field) error {
-	return w.core.Write(entry, fields)
+// Clone implements zapcore.Encoder; the copy keeps the sanitizing wrapper and
+// its encoder configuration.
+func (e *SanitzeReconcileErrorEncoder) Clone() zapcore.Encoder {
+	return &SanitzeReconcileErrorEncoder{
+		Encoder: e.Encoder.Clone(),
+		cfg:     e.cfg,
+	}
 }
diff --git a/internal/utils/lifecycle_enabled.go b/internal/utils/lifecycle_enabled.go
new file mode 100644
index 00000000..0934b0c5
--- /dev/null
+++ b/internal/utils/lifecycle_enabled.go
@@ -0,0 +1,36 @@
+/*
+SPDX-FileCopyrightText: Copyright 2025 SAP SE or an SAP affiliate company and cobaltcore-dev contributors
+SPDX-License-Identifier: Apache-2.0
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
+
+	kvmv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
+)
+
+var (
+	// LifecycleEnabledPredicate lets events for Hypervisor objects through only
+	// when Spec.LifecycleEnabled is set; objects of any other type always pass.
+	LifecycleEnabledPredicate = predicate.NewPredicateFuncs(func(object client.Object) bool {
+		if hv, ok := object.(*kvmv1.Hypervisor); ok {
+			return hv.Spec.LifecycleEnabled
+		}
+		return true
+	})
+)