Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func main() {
opts := ctrlzap.Options{
Development: true,
TimeEncoder: zapcore.ISO8601TimeEncoder,
ZapOpts: []zap.Option{zap.WrapCore(logger.WrapCore)},
Encoder: logger.NewSanitzeReconcileErrorEncoder(zapcore.EncoderConfig{}),
StacktraceLevel: zap.DPanicLevel,
}
opts.BindFlags(flag.CommandLine)
Expand Down
82 changes: 48 additions & 34 deletions internal/controller/aggregates_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,25 @@ package controller

import (
"context"
"errors"
"fmt"
"slices"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
logger "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/gophercloud/gophercloud/v2"
"github.com/gophercloud/gophercloud/v2/openstack/compute/v2/aggregates"

kvmv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
"github.com/cobaltcore-dev/openstack-hypervisor-operator/internal/openstack"
"github.com/cobaltcore-dev/openstack-hypervisor-operator/internal/utils"
)

const (
Expand All @@ -53,70 +57,61 @@ type AggregatesController struct {
// +kubebuilder:rbac:groups=kvm.cloud.sap,resources=hypervisors/status,verbs=get;list;watch;create;update;patch;delete

func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := logger.FromContext(ctx).WithName(req.Name)
ctx = logger.IntoContext(ctx, log)

log := logger.FromContext(ctx)
hv := &kvmv1.Hypervisor{}
if err := ac.Get(ctx, req.NamespacedName, hv); err != nil {
return ctrl.Result{}, k8sclient.IgnoreNotFound(err)
}

// apply traits only when lifecycle management is enabled
if !hv.Spec.LifecycleEnabled {
return ctrl.Result{}, nil
}

if slices.Equal(hv.Spec.Aggregates, hv.Status.Aggregates) {
// Nothing to be done
return ctrl.Result{}, nil
}

aggs, err := aggregatesByName(ctx, ac.computeClient)
if err != nil {
meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{
Type: ConditionTypeAggregatesUpdated,
Status: metav1.ConditionFalse,
Reason: ConditionAggregatesFailed,
Message: err.Error(),
})
return ctrl.Result{}, ac.Status().Update(ctx, hv)
err = fmt.Errorf("failed listing aggregates: %w", err)
if err2 := ac.setErrorCondition(ctx, hv, err.Error()); err2 != nil {
return ctrl.Result{}, errors.Join(err, err2)
}
return ctrl.Result{}, err
}

toAdd := Difference(hv.Status.Aggregates, hv.Spec.Aggregates)
toRemove := Difference(hv.Spec.Aggregates, hv.Status.Aggregates)

// We need to add first the host to the aggregates, because if we first drop
// an aggregate with a filter criterion and then add a new one, we leave the host
// open for period of time. Still, this may fail due to a conflict of aggregates
// with different availability zones, so we collect all the errors and return them
// so it hopefully will converge eventually.
var errs []error
if len(toAdd) > 0 {
log.Info("Adding", "aggregates", toAdd)
for item := range slices.Values(toAdd) {
err = addToAggregate(ctx, ac.computeClient, aggs, hv.Name, item, "")
if err != nil {
meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{
Type: ConditionTypeAggregatesUpdated,
Status: metav1.ConditionFalse,
Reason: ConditionAggregatesFailed,
Message: err.Error(),
})
return ctrl.Result{}, ac.Status().Update(ctx, hv)
if err = addToAggregate(ctx, ac.computeClient, aggs, hv.Name, item, ""); err != nil {
errs = append(errs, err)
}
}
}

if len(toRemove) > 0 {
log.Info("Removing", "aggregates", toRemove)
for item := range slices.Values(toRemove) {
err = removeFromAggregate(ctx, ac.computeClient, aggs, hv.Name, item)
if err != nil {
meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{
Type: ConditionTypeAggregatesUpdated,
Status: metav1.ConditionFalse,
Reason: ConditionAggregatesFailed,
Message: err.Error(),
})
return ctrl.Result{}, ac.Status().Update(ctx, hv)
if err = removeFromAggregate(ctx, ac.computeClient, aggs, hv.Name, item); err != nil {
errs = append(errs, err)
}
}
}

if errs != nil {
err = fmt.Errorf("encountered errors during aggregate update: %w", errors.Join(errs...))
if err2 := ac.setErrorCondition(ctx, hv, err.Error()); err2 != nil {
return ctrl.Result{}, errors.Join(err, err2)
}
return ctrl.Result{}, err
}

hv.Status.Aggregates = hv.Spec.Aggregates
meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{
Type: ConditionTypeAggregatesUpdated,
Expand All @@ -127,6 +122,24 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, ac.Status().Update(ctx, hv)
}

// setErrorCondition sets the error condition on the Hypervisor status, returns error if update fails
func (ac *AggregatesController) setErrorCondition(ctx context.Context, hv *kvmv1.Hypervisor, msg string) error {
condition := metav1.Condition{
Type: ConditionTypeAggregatesUpdated,
Status: metav1.ConditionFalse,
Reason: ConditionAggregatesFailed,
Message: msg,
}

if meta.SetStatusCondition(&hv.Status.Conditions, condition) {
if err := ac.Status().Update(ctx, hv); err != nil {
return err
}
}

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (ac *AggregatesController) SetupWithManager(mgr ctrl.Manager) error {
ctx := context.Background()
Expand All @@ -140,7 +153,8 @@ func (ac *AggregatesController) SetupWithManager(mgr ctrl.Manager) error {

return ctrl.NewControllerManagedBy(mgr).
Named(AggregatesControllerName).
For(&kvmv1.Hypervisor{}).
For(&kvmv1.Hypervisor{}, builder.WithPredicates(utils.LifecycleEnabledPredicate)).
WithEventFilter(predicate.GenerationChangedPredicate{}).
Complete(ac)
}

Expand Down
43 changes: 16 additions & 27 deletions internal/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,30 @@ limitations under the License.
package logger

import (
"go.uber.org/zap/buffer"
"go.uber.org/zap/zapcore"
)

type wrapCore struct {
core zapcore.Core
func NewSanitzeReconcileErrorEncoder(cfg zapcore.EncoderConfig) zapcore.Encoder {
return &SanitzeReconcileErrorEncoder{zapcore.NewConsoleEncoder(cfg), cfg}
}

func WrapCore(core zapcore.Core) zapcore.Core {
return wrapCore{core}
type SanitzeReconcileErrorEncoder struct {
zapcore.Encoder
cfg zapcore.EncoderConfig
}

// Check implements zapcore.Core.
func (w wrapCore) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if entry.Message == "Reconciler error" {
entry.Level = -2
func (e *SanitzeReconcileErrorEncoder) EncodeEntry(entry zapcore.Entry, fields []zapcore.Field) (*buffer.Buffer, error) {
if entry.Message == "Reconcile error" {
// Downgrade the log level to debug to avoid log spam
entry.Level = zapcore.WarnLevel
entry.Stack = ""
}
return w.core.Check(entry, checkedEntry)
return e.Encoder.EncodeEntry(entry, fields)
}

// Enabled implements zapcore.Core.
func (w wrapCore) Enabled(level zapcore.Level) bool {
return w.core.Enabled(level)
}

// Sync implements zapcore.Core.
func (w wrapCore) Sync() error {
return w.core.Sync()
}

// With implements zapcore.Core.
func (w wrapCore) With(fields []zapcore.Field) zapcore.Core {
return wrapCore{w.core.With(fields)}
}

// Write implements zapcore.Core.
func (w wrapCore) Write(entry zapcore.Entry, fields []zapcore.Field) error {
return w.core.Write(entry, fields)
func (e *SanitzeReconcileErrorEncoder) Clone() zapcore.Encoder {
return &SanitzeReconcileErrorEncoder{
Encoder: e.Encoder.Clone(),
}
}
34 changes: 34 additions & 0 deletions internal/utils/lifecycle_enabled.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
SPDX-FileCopyrightText: Copyright 2025 SAP SE or an SAP affiliate company and cobaltcore-dev contributors
SPDX-License-Identifier: Apache-2.0

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"

kvmv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
)

var (
LifecycleEnabledPredicate = predicate.NewPredicateFuncs(func(object client.Object) bool {
if hv, ok := object.(*kvmv1.Hypervisor); ok {
return hv.Spec.LifecycleEnabled
}
return true
})
)
Loading