diff --git a/docs/env.md b/docs/env.md
index b54ea6780..d682c4195 100644
--- a/docs/env.md
+++ b/docs/env.md
@@ -230,7 +230,10 @@
| VM_FILTERPROMETHEUSCONVERTERANNOTATIONPREFIXES: `-` #
allows filtering for converted annotations, annotations with matched prefix will be ignored |
| VM_CLUSTERDOMAINNAME: `-` #
Defines domain name suffix for in-cluster addresses most known ClusterDomainName is .cluster.local |
| VM_APPREADYTIMEOUT: `80s` #
Defines deadline for deployment/statefulset to transit into ready state to wait for transition to ready state |
-| VM_PODWAITREADYTIMEOUT: `80s` #
Defines single pod deadline to wait for transition to ready state |
| VM_PODWAITREADYINTERVALCHECK: `5s` #
Defines poll interval for pods ready check at statefulset rollout update |
+| VM_PODWAITREADYTIMEOUT: `80s` #
Defines single pod deadline to wait for transition to ready state |
+| VM_PVC_WAIT_READY_INTERVAL: `5s` #
Defines poll interval for PVC ready check |
+| VM_PVC_WAIT_READY_TIMEOUT: `80s` #
Defines poll timeout for PVC ready check |
+| VM_WAIT_READY_INTERVAL: `5s` #
Defines poll interval for VM CRs |
| VM_FORCERESYNCINTERVAL: `60s` #
configures force resync interval for VMAgent, VMAlert, VMAlertmanager and VMAuth. |
| VM_ENABLESTRICTSECURITY: `false` #
EnableStrictSecurity will add default `securityContext` to pods and containers created by operator Default PodSecurityContext include: 1. RunAsNonRoot: true 2. RunAsUser/RunAsGroup/FSGroup: 65534 '65534' refers to 'nobody' in all the used default images like alpine, busybox. If you're using customize image, please make sure '65534' is a valid uid in there or specify SecurityContext. 3. FSGroupChangePolicy: &onRootMismatch If KubeVersion>=1.20, use `FSGroupChangePolicy="onRootMismatch"` to skip the recursive permission change when the root of the volume already has the correct permissions 4. SeccompProfile: type: RuntimeDefault Use `RuntimeDefault` seccomp profile by default, which is defined by the container runtime, instead of using the Unconfined (seccomp disabled) mode. Default container SecurityContext include: 1. AllowPrivilegeEscalation: false 2. ReadOnlyRootFilesystem: true 3. Capabilities: drop: - all turn off `EnableStrictSecurity` by default, see https://github.com/VictoriaMetrics/operator/issues/749 for details |
diff --git a/internal/config/config.go b/internal/config/config.go
index 2de9d0bf4..2343fe37b 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -546,13 +546,19 @@ type BaseOperatorConf struct {
// Defines deadline for deployment/statefulset
// to transit into ready state
// to wait for transition to ready state
- AppReadyTimeout time.Duration `default:"80s" env:"VM_APPREADYTIMEOUT"`
+ AppWaitReadyTimeout time.Duration `default:"80s" env:"VM_APPREADYTIMEOUT"`
+ // Defines poll interval for pods ready check
+ // at statefulset rollout update
+ PodWaitReadyInterval time.Duration `default:"5s" env:"VM_PODWAITREADYINTERVALCHECK"`
// Defines single pod deadline
// to wait for transition to ready state
PodWaitReadyTimeout time.Duration `default:"80s" env:"VM_PODWAITREADYTIMEOUT"`
- // Defines poll interval for pods ready check
- // at statefulset rollout update
- PodWaitReadyIntervalCheck time.Duration `default:"5s" env:"VM_PODWAITREADYINTERVALCHECK"`
+ // Defines poll interval for PVC ready check
+ PVCWaitReadyInterval time.Duration `default:"5s" env:"VM_PVC_WAIT_READY_INTERVAL"`
+ // Defines poll timeout for PVC ready check
+ PVCWaitReadyTimeout time.Duration `default:"80s" env:"VM_PVC_WAIT_READY_TIMEOUT"`
+ // Defines poll interval for VM CRs
+ VMWaitReadyInterval time.Duration `default:"5s" env:"VM_WAIT_READY_INTERVAL"`
// configures force resync interval for VMAgent, VMAlert, VMAlertmanager and VMAuth.
ForceResyncInterval time.Duration `default:"60s" env:"VM_FORCERESYNCINTERVAL"`
// EnableStrictSecurity will add default `securityContext` to pods and containers created by operator
diff --git a/internal/controller/operator/factory/k8stools/interceptors.go b/internal/controller/operator/factory/k8stools/interceptors.go
index 6081fa48d..f8d719825 100644
--- a/internal/controller/operator/factory/k8stools/interceptors.go
+++ b/internal/controller/operator/factory/k8stools/interceptors.go
@@ -4,6 +4,7 @@ import (
"context"
appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
@@ -12,80 +13,55 @@ import (
vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1"
)
+func updateStatus(ctx context.Context, cl client.WithWatch, obj client.Object) error {
+ switch v := obj.(type) {
+ case *appsv1.StatefulSet:
+ v.Status.ObservedGeneration = v.Generation
+ v.Status.ReadyReplicas = ptr.Deref(v.Spec.Replicas, 0)
+ v.Status.UpdatedReplicas = ptr.Deref(v.Spec.Replicas, 0)
+ v.Status.CurrentReplicas = ptr.Deref(v.Spec.Replicas, 0)
+ v.Status.UpdateRevision = "v1"
+ v.Status.CurrentRevision = "v1"
+ case *appsv1.Deployment:
+ v.Status.ObservedGeneration = v.Generation
+ v.Status.Conditions = append(v.Status.Conditions, appsv1.DeploymentCondition{
+ Type: appsv1.DeploymentProgressing,
+ Reason: "NewReplicaSetAvailable",
+ Status: "True",
+ })
+ v.Status.UpdatedReplicas = ptr.Deref(v.Spec.Replicas, 0)
+ v.Status.ReadyReplicas = ptr.Deref(v.Spec.Replicas, 0)
+ case *vmv1beta1.VMAgent:
+ v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
+ v.Status.ObservedGeneration = v.Generation
+ case *vmv1beta1.VMCluster:
+ v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
+ v.Status.ObservedGeneration = v.Generation
+ case *vmv1beta1.VMAuth:
+ v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
+ v.Status.ObservedGeneration = v.Generation
+ case *corev1.PersistentVolumeClaim:
+ v.Status.Capacity = v.Spec.Resources.Requests
+ default:
+ return nil
+ }
+ return cl.Status().Update(ctx, obj)
+}
+
// GetInterceptorsWithObjects returns interceptors for objects
func GetInterceptorsWithObjects() interceptor.Funcs {
return interceptor.Funcs{
Create: func(ctx context.Context, cl client.WithWatch, obj client.Object, opts ...client.CreateOption) error {
- switch v := obj.(type) {
- case *appsv1.StatefulSet:
- v.Status.ObservedGeneration = v.Generation
- v.Status.ReadyReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.UpdatedReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.CurrentReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.UpdateRevision = "v1"
- v.Status.CurrentRevision = "v1"
- case *appsv1.Deployment:
- v.Status.ObservedGeneration = v.Generation
- v.Status.Conditions = append(v.Status.Conditions, appsv1.DeploymentCondition{
- Type: appsv1.DeploymentProgressing,
- Reason: "NewReplicaSetAvailable",
- Status: "True",
- })
- v.Status.UpdatedReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.ReadyReplicas = ptr.Deref(v.Spec.Replicas, 0)
- }
if err := cl.Create(ctx, obj, opts...); err != nil {
return err
}
- switch v := obj.(type) {
- case *vmv1beta1.VMAgent:
- v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
- v.Status.ObservedGeneration = v.Generation
- return cl.Status().Update(ctx, v)
- case *vmv1beta1.VMCluster:
- v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
- v.Status.ObservedGeneration = v.Generation
- return cl.Status().Update(ctx, v)
- case *vmv1beta1.VMAuth:
- v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
- v.Status.ObservedGeneration = v.Generation
- return cl.Status().Update(ctx, v)
- }
- return nil
+ return updateStatus(ctx, cl, obj)
},
Update: func(ctx context.Context, cl client.WithWatch, obj client.Object, opts ...client.UpdateOption) error {
- switch v := obj.(type) {
- case *appsv1.StatefulSet:
- v.Status.ObservedGeneration = v.Generation
- v.Status.ReadyReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.UpdatedReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.CurrentReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.UpdateRevision = "v1"
- v.Status.CurrentRevision = "v1"
- case *appsv1.Deployment:
- v.Status.ObservedGeneration = v.Generation
- v.Status.UpdatedReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.ReadyReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.Replicas = ptr.Deref(v.Spec.Replicas, 0)
- }
if err := cl.Update(ctx, obj, opts...); err != nil {
return err
}
- switch v := obj.(type) {
- case *vmv1beta1.VMAgent:
- v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
- v.Status.ObservedGeneration = v.Generation
- return cl.Status().Update(ctx, v)
- case *vmv1beta1.VMCluster:
- v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
- v.Status.ObservedGeneration = v.Generation
- return cl.Status().Update(ctx, v)
- case *vmv1beta1.VMAuth:
- v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
- v.Status.ObservedGeneration = v.Generation
- return cl.Status().Update(ctx, v)
- }
- return nil
+ return updateStatus(ctx, cl, obj)
},
}
}
diff --git a/internal/controller/operator/factory/reconcile/daemonset.go b/internal/controller/operator/factory/reconcile/daemonset.go
index 9a26417d4..6bb87f550 100644
--- a/internal/controller/operator/factory/reconcile/daemonset.go
+++ b/internal/controller/operator/factory/reconcile/daemonset.go
@@ -67,7 +67,7 @@ func DaemonSet(ctx context.Context, rclient client.Client, newObj, prevObj *apps
if err != nil {
return err
}
- return waitDaemonSetReady(ctx, rclient, newObj, appWaitReadyDeadline)
+ return waitDaemonSetReady(ctx, rclient, newObj, appWaitReadyTimeout)
}
// waitDeploymentReady waits until deployment's replicaSet rollouts and all new pods is ready
diff --git a/internal/controller/operator/factory/reconcile/deploy.go b/internal/controller/operator/factory/reconcile/deploy.go
index 4a3140f57..5a2cd681a 100644
--- a/internal/controller/operator/factory/reconcile/deploy.go
+++ b/internal/controller/operator/factory/reconcile/deploy.go
@@ -69,7 +69,7 @@ func Deployment(ctx context.Context, rclient client.Client, newObj, prevObj *app
if err != nil {
return err
}
- return waitForDeploymentReady(ctx, rclient, newObj, appWaitReadyDeadline)
+ return waitForDeploymentReady(ctx, rclient, newObj, appWaitReadyTimeout)
}
// waitForDeploymentReady waits until deployment's replicaSet rollouts and all new pods is ready
diff --git a/internal/controller/operator/factory/reconcile/pvc.go b/internal/controller/operator/factory/reconcile/pvc.go
index 0d42b1a0d..ac88f37aa 100644
--- a/internal/controller/operator/factory/reconcile/pvc.go
+++ b/internal/controller/operator/factory/reconcile/pvc.go
@@ -6,8 +6,10 @@ import (
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/VictoriaMetrics/operator/internal/controller/operator/factory/logger"
@@ -20,24 +22,62 @@ import (
// in case of deletion timestamp > 0 does nothing
// user must manually remove finalizer if needed
func PersistentVolumeClaim(ctx context.Context, rclient client.Client, newObj, prevObj *corev1.PersistentVolumeClaim, owner *metav1.OwnerReference) error {
- l := logger.WithContext(ctx)
- var existingObj corev1.PersistentVolumeClaim
nsn := types.NamespacedName{Namespace: newObj.Namespace, Name: newObj.Name}
- if err := rclient.Get(ctx, nsn, &existingObj); err != nil {
- if k8serrors.IsNotFound(err) {
- l.Info(fmt.Sprintf("creating new PVC=%s", nsn.String()))
- if err := rclient.Create(ctx, newObj); err != nil {
- return fmt.Errorf("cannot create new PVC=%s: %w", nsn.String(), err)
+ var existingObj corev1.PersistentVolumeClaim
+ err := retryOnConflict(func() error {
+ if err := rclient.Get(ctx, nsn, &existingObj); err != nil {
+ if k8serrors.IsNotFound(err) {
+ logger.WithContext(ctx).Info(fmt.Sprintf("creating new PVC=%s", nsn.String()))
+ return rclient.Create(ctx, newObj)
}
+ return fmt.Errorf("cannot get existing PVC=%s: %w", nsn.String(), err)
+ }
+ if !existingObj.DeletionTimestamp.IsZero() {
return nil
}
- return fmt.Errorf("cannot get existing PVC=%s: %w", nsn.String(), err)
+ return updatePVC(ctx, rclient, &existingObj, newObj, prevObj, owner)
+ })
+ if err != nil {
+ return err
+ }
+ size := newObj.Spec.Resources.Requests[corev1.ResourceStorage]
+ if !existingObj.CreationTimestamp.IsZero() {
+ size = existingObj.Spec.Resources.Requests[corev1.ResourceStorage]
+ }
+ if err = waitForPVCReady(ctx, rclient, nsn, size); err != nil {
+ return err
}
if !existingObj.DeletionTimestamp.IsZero() {
- l.Info(fmt.Sprintf("PVC=%s has non zero DeletionTimestamp, skip update."+
+ logger.WithContext(ctx).Info(fmt.Sprintf("PVC=%s has non zero DeletionTimestamp, skip update."+
" To fix this, make backup for this pvc, delete pvc finalizers and restore from backup.", nsn.String()))
- return nil
}
+ return nil
+}
- return updatePVC(ctx, rclient, &existingObj, newObj, prevObj, owner)
+func waitForPVCReady(ctx context.Context, rclient client.Client, nsn types.NamespacedName, size resource.Quantity) error {
+ var pvc corev1.PersistentVolumeClaim
+ return wait.PollUntilContextTimeout(ctx, pvcWaitReadyInterval, pvcWaitReadyTimeout, true, func(ctx context.Context) (done bool, err error) {
+ if err := rclient.Get(ctx, nsn, &pvc); err != nil {
+ if k8serrors.IsNotFound(err) {
+ return false, nil
+ }
+ return false, fmt.Errorf("cannot get PVC=%s: %w", nsn.String(), err)
+ }
+ if !pvc.DeletionTimestamp.IsZero() {
+ return true, nil
+ }
+ if len(pvc.Status.Capacity) == 0 {
+ return true, nil
+ }
+ actualSize := pvc.Status.Capacity[corev1.ResourceStorage]
+ if actualSize.Cmp(size) < 0 {
+ return false, nil
+ }
+ for _, condition := range pvc.Status.Conditions {
+ if condition.Type == corev1.PersistentVolumeClaimResizing && condition.Status == corev1.ConditionTrue {
+ return false, nil
+ }
+ }
+ return true, nil
+ })
}
diff --git a/internal/controller/operator/factory/reconcile/pvc_test.go b/internal/controller/operator/factory/reconcile/pvc_test.go
index 4f20a5279..c44a0e8c5 100644
--- a/internal/controller/operator/factory/reconcile/pvc_test.go
+++ b/internal/controller/operator/factory/reconcile/pvc_test.go
@@ -38,6 +38,7 @@ func TestPersistentVolumeClaimReconcile(t *testing.T) {
},
},
}
+ pvc.Status.Capacity = pvc.Spec.Resources.Requests
for _, fn := range fns {
fn(pvc)
}
@@ -47,7 +48,7 @@ func TestPersistentVolumeClaimReconcile(t *testing.T) {
f := func(o opts) {
t.Helper()
ctx := context.Background()
- cl := k8stools.GetTestClientWithActions(o.predefinedObjects)
+ cl := k8stools.GetTestClientWithActionsAndObjects(o.predefinedObjects)
synctest.Test(t, func(t *testing.T) {
assert.NoError(t, PersistentVolumeClaim(ctx, cl, o.new, o.prev, nil))
assert.Equal(t, o.actions, cl.Actions)
@@ -62,6 +63,7 @@ func TestPersistentVolumeClaimReconcile(t *testing.T) {
actions: []k8stools.ClientAction{
{Verb: "Get", Kind: "PersistentVolumeClaim", Resource: nn},
{Verb: "Create", Kind: "PersistentVolumeClaim", Resource: nn},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: nn},
},
})
@@ -74,6 +76,7 @@ func TestPersistentVolumeClaimReconcile(t *testing.T) {
},
actions: []k8stools.ClientAction{
{Verb: "Get", Kind: "PersistentVolumeClaim", Resource: nn},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: nn},
},
})
@@ -89,6 +92,7 @@ func TestPersistentVolumeClaimReconcile(t *testing.T) {
actions: []k8stools.ClientAction{
{Verb: "Get", Kind: "PersistentVolumeClaim", Resource: nn},
{Verb: "Update", Kind: "PersistentVolumeClaim", Resource: nn},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: nn},
},
})
@@ -106,6 +110,7 @@ func TestPersistentVolumeClaimReconcile(t *testing.T) {
actions: []k8stools.ClientAction{
{Verb: "Get", Kind: "PersistentVolumeClaim", Resource: nn},
{Verb: "Update", Kind: "PersistentVolumeClaim", Resource: nn},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: nn},
},
})
}
diff --git a/internal/controller/operator/factory/reconcile/reconcile.go b/internal/controller/operator/factory/reconcile/reconcile.go
index bd39a4b67..1f0dccfb4 100644
--- a/internal/controller/operator/factory/reconcile/reconcile.go
+++ b/internal/controller/operator/factory/reconcile/reconcile.go
@@ -18,23 +18,32 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1"
+ "github.com/VictoriaMetrics/operator/internal/config"
"github.com/VictoriaMetrics/operator/internal/controller/operator/factory/finalize"
"github.com/VictoriaMetrics/operator/internal/controller/operator/factory/logger"
)
var (
- podWaitReadyIntervalCheck = 50 * time.Millisecond
- appWaitReadyDeadline = 5 * time.Second
- podWaitReadyTimeout = 5 * time.Second
- vmStatusInterval = 5 * time.Second
+ pvcWaitReadyInterval = 1 * time.Second
+ pvcWaitReadyTimeout = 5 * time.Second
+
+ podWaitReadyInterval = 1 * time.Second
+ podWaitReadyTimeout = 5 * time.Second
+
+ appWaitReadyTimeout = 5 * time.Second
+ vmWaitReadyInterval = 5 * time.Second
)
// Init sets package defaults
-func Init(intervalCheck, appWaitDeadline, podReadyDeadline, statusInterval, statusUpdate time.Duration) {
- podWaitReadyIntervalCheck = intervalCheck
- appWaitReadyDeadline = appWaitDeadline
- podWaitReadyTimeout = podReadyDeadline
- vmStatusInterval = statusInterval
+func Init(cfg *config.BaseOperatorConf, statusUpdate time.Duration) {
+ podWaitReadyInterval = cfg.PodWaitReadyInterval
+ podWaitReadyTimeout = cfg.PodWaitReadyTimeout
+
+ pvcWaitReadyInterval = cfg.PVCWaitReadyInterval
+ pvcWaitReadyTimeout = cfg.PVCWaitReadyTimeout
+
+ appWaitReadyTimeout = cfg.AppWaitReadyTimeout
+ vmWaitReadyInterval = cfg.VMWaitReadyInterval
statusUpdateTTL = statusUpdate
}
diff --git a/internal/controller/operator/factory/reconcile/statefulset.go b/internal/controller/operator/factory/reconcile/statefulset.go
index ebcc679d3..d0115d35a 100644
--- a/internal/controller/operator/factory/reconcile/statefulset.go
+++ b/internal/controller/operator/factory/reconcile/statefulset.go
@@ -43,7 +43,7 @@ func waitForStatefulSetReady(ctx context.Context, rclient client.Client, newObj
if newObj.Spec.Replicas == nil {
return nil
}
- err := wait.PollUntilContextTimeout(ctx, podWaitReadyIntervalCheck, appWaitReadyDeadline, true, func(ctx context.Context) (done bool, err error) {
+ err := wait.PollUntilContextTimeout(ctx, podWaitReadyInterval, appWaitReadyTimeout, true, func(ctx context.Context) (done bool, err error) {
var existingObj appsv1.StatefulSet
if err := rclient.Get(ctx, types.NamespacedName{Namespace: newObj.Namespace, Name: newObj.Name}, &existingObj); err != nil {
if k8serrors.IsNotFound(err) {
@@ -195,8 +195,8 @@ func StatefulSet(ctx context.Context, rclient client.Client, cr STSOptions, newO
// if ObservedGeneration matches current generation
func getLatestStsState(ctx context.Context, rclient client.Client, targetSTS types.NamespacedName) (*appsv1.StatefulSet, error) {
var sts appsv1.StatefulSet
- err := wait.PollUntilContextTimeout(ctx, podWaitReadyIntervalCheck,
- appWaitReadyDeadline, true, func(ctx context.Context) (done bool, err error) {
+ err := wait.PollUntilContextTimeout(ctx, podWaitReadyInterval,
+ appWaitReadyTimeout, true, func(ctx context.Context) (done bool, err error) {
if err := rclient.Get(ctx, targetSTS, &sts); err != nil {
return true, err
}
@@ -227,7 +227,7 @@ type rollingUpdateOpts struct {
// we always check if sts.Status.CurrentRevision needs update, to keep it equal to UpdateRevision
// see https://github.com/kubernetes/kube-state-metrics/issues/1324#issuecomment-1779751992
func performRollingUpdateOnSts(ctx context.Context, rclient client.Client, obj *appsv1.StatefulSet, o rollingUpdateOpts) error {
- time.Sleep(podWaitReadyIntervalCheck)
+ time.Sleep(podWaitReadyInterval)
nsn := types.NamespacedName{
Name: obj.Name,
Namespace: obj.Namespace,
@@ -316,7 +316,7 @@ func performRollingUpdateOnSts(ctx context.Context, rclient client.Client, obj *
l.Info(fmt.Sprintf("updating pod=%s revision label=%q", pod.Name, pod.Labels[podRevisionLabel]))
// eviction may fail due to podDisruption budget and it's unexpected
// so retry pod eviction
- evictErr := wait.PollUntilContextTimeout(ctx, podWaitReadyIntervalCheck, podWaitReadyTimeout, true, func(ctx context.Context) (done bool, err error) {
+ evictErr := wait.PollUntilContextTimeout(ctx, podWaitReadyInterval, podWaitReadyTimeout, true, func(ctx context.Context) (done bool, err error) {
if o.delete {
if err := rclient.Delete(ctx, &pod); err != nil {
if k8serrors.IsNotFound(err) {
@@ -379,7 +379,7 @@ func PodIsReady(pod *corev1.Pod, minReadySeconds int32) bool {
func waitForPodReady(ctx context.Context, rclient client.Client, nsn types.NamespacedName, desiredRevision string, minReadySeconds int32) error {
var pod corev1.Pod
- if err := wait.PollUntilContextTimeout(ctx, podWaitReadyIntervalCheck, podWaitReadyTimeout, true, func(ctx context.Context) (done bool, err error) {
+ if err := wait.PollUntilContextTimeout(ctx, podWaitReadyInterval, podWaitReadyTimeout, true, func(ctx context.Context) (done bool, err error) {
if err := rclient.Get(ctx, nsn, &pod); err != nil {
if k8serrors.IsNotFound(err) {
return false, nil
diff --git a/internal/controller/operator/factory/reconcile/statefulset_pvc_expand.go b/internal/controller/operator/factory/reconcile/statefulset_pvc_expand.go
index a438b12cb..08b669ea7 100644
--- a/internal/controller/operator/factory/reconcile/statefulset_pvc_expand.go
+++ b/internal/controller/operator/factory/reconcile/statefulset_pvc_expand.go
@@ -122,6 +122,13 @@ func updateSTSPVC(ctx context.Context, rclient client.Client, sts *appsv1.Statef
return err
}
}
+ for _, pvc := range pvcs.Items {
+ nsnPvc := types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}
+ size := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
+ if err := waitForPVCReady(ctx, rclient, nsnPvc, size); err != nil {
+ return err
+ }
+ }
return nil
}
diff --git a/internal/controller/operator/factory/reconcile/statefulset_pvc_expand_test.go b/internal/controller/operator/factory/reconcile/statefulset_pvc_expand_test.go
index 6014a529c..1a36fea4d 100644
--- a/internal/controller/operator/factory/reconcile/statefulset_pvc_expand_test.go
+++ b/internal/controller/operator/factory/reconcile/statefulset_pvc_expand_test.go
@@ -160,10 +160,10 @@ func Test_updateSTSPVC(t *testing.T) {
}
f := func(o opts) {
t.Helper()
- cl := k8stools.GetTestClientWithActions(nil)
+ cl := k8stools.GetTestClientWithActionsAndObjects(nil)
ctx := context.TODO()
if o.preRun != nil {
- o.preRun(cl.Client)
+ o.preRun(cl)
cl.Actions = nil
}
err := updateSTSPVC(ctx, cl, o.sts, o.prevVCTs)
@@ -226,6 +226,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("10Gi"),
+ },
+ },
},
}
}),
@@ -294,7 +299,7 @@ func Test_updateSTSPVC(t *testing.T) {
"test": "test",
"3rd-party": "value",
},
- ResourceVersion: "1",
+ ResourceVersion: "2",
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
@@ -303,6 +308,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("10Gi"),
+ },
+ },
},
},
})
@@ -383,6 +393,7 @@ func Test_updateSTSPVC(t *testing.T) {
},
actions: []k8stools.ClientAction{
{Verb: "Update", Kind: "PersistentVolumeClaim", Resource: pvc1NSN},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: pvc1NSN},
},
expected: []corev1.PersistentVolumeClaim{
{
@@ -398,7 +409,7 @@ func Test_updateSTSPVC(t *testing.T) {
"test": "after",
"3rd-party": "value",
},
- ResourceVersion: "2",
+ ResourceVersion: "4",
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
@@ -407,6 +418,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("10Gi"),
+ },
+ },
},
},
})
@@ -491,6 +507,8 @@ func Test_updateSTSPVC(t *testing.T) {
actions: []k8stools.ClientAction{
{Verb: "Update", Kind: "PersistentVolumeClaim", Resource: pvc2NSN},
{Verb: "Update", Kind: "PersistentVolumeClaim", Resource: pvc1NSN},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: pvc2NSN},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: pvc1NSN},
},
expected: []corev1.PersistentVolumeClaim{
{
@@ -501,7 +519,7 @@ func Test_updateSTSPVC(t *testing.T) {
Annotations: map[string]string{
"operator.victoriametrics.com/pvc-allow-volume-expansion": "true",
},
- ResourceVersion: "2",
+ ResourceVersion: "4",
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
@@ -510,6 +528,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("5Gi"),
+ },
+ },
},
{
ObjectMeta: metav1.ObjectMeta{
@@ -519,7 +542,7 @@ func Test_updateSTSPVC(t *testing.T) {
Annotations: map[string]string{
"operator.victoriametrics.com/pvc-allow-volume-expansion": "true",
},
- ResourceVersion: "2",
+ ResourceVersion: "4",
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
@@ -528,6 +551,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("15Gi"),
+ },
+ },
},
},
})
@@ -581,7 +609,7 @@ func Test_updateSTSPVC(t *testing.T) {
Name: pvc1NSN.Name,
Namespace: pvc1NSN.Namespace,
Labels: map[string]string{"app": "vmselect"},
- ResourceVersion: "1",
+ ResourceVersion: "2",
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
@@ -590,6 +618,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("10Gi"),
+ },
+ },
},
},
})
@@ -643,6 +676,7 @@ func Test_updateSTSPVC(t *testing.T) {
},
actions: []k8stools.ClientAction{
{Verb: "Update", Kind: "PersistentVolumeClaim", Resource: pvc1NSN},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: pvc1NSN},
},
expected: []corev1.PersistentVolumeClaim{
{
@@ -653,7 +687,7 @@ func Test_updateSTSPVC(t *testing.T) {
Annotations: map[string]string{
"operator.victoriametrics.com/pvc-allow-volume-expansion": "true",
},
- ResourceVersion: "2",
+ ResourceVersion: "4",
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
@@ -662,6 +696,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("15Gi"),
+ },
+ },
},
},
})
@@ -725,6 +764,7 @@ func Test_updateSTSPVC(t *testing.T) {
},
actions: []k8stools.ClientAction{
{Verb: "Update", Kind: "PersistentVolumeClaim", Resource: pvc1NSN},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: pvc1NSN},
},
expected: []corev1.PersistentVolumeClaim{
{
@@ -735,7 +775,7 @@ func Test_updateSTSPVC(t *testing.T) {
Annotations: map[string]string{
"operator.victoriametrics.com/pvc-allow-volume-expansion": "true",
},
- ResourceVersion: "2",
+ ResourceVersion: "4",
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
@@ -744,6 +784,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("15Gi"),
+ },
+ },
},
},
})
@@ -772,7 +817,7 @@ func Test_updateSTSPVC(t *testing.T) {
Name: "orphan-vmselect-0",
Namespace: "default",
Labels: map[string]string{"app": "vmselect"},
- ResourceVersion: "1",
+ ResourceVersion: "2",
},
},
},
@@ -825,7 +870,7 @@ func Test_updateSTSPVC(t *testing.T) {
Name: "data-vmselect-0",
Namespace: "default",
Labels: map[string]string{"app": "vmselect"},
- ResourceVersion: "1",
+ ResourceVersion: "2",
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
@@ -834,6 +879,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("20Gi"),
+ },
+ },
},
},
})
diff --git a/internal/controller/operator/factory/reconcile/vmagent.go b/internal/controller/operator/factory/reconcile/vmagent.go
index 583279ab5..a85734128 100644
--- a/internal/controller/operator/factory/reconcile/vmagent.go
+++ b/internal/controller/operator/factory/reconcile/vmagent.go
@@ -59,7 +59,7 @@ func VMAgent(ctx context.Context, rclient client.Client, newObj, prevObj *vmv1be
if err != nil {
return err
}
- if err := waitForStatus(ctx, rclient, newObj, vmStatusInterval, vmv1beta1.UpdateStatusOperational); err != nil {
+ if err := waitForStatus(ctx, rclient, newObj, vmWaitReadyInterval, vmv1beta1.UpdateStatusOperational); err != nil {
return fmt.Errorf("failed to wait for VMAgent=%s to be ready: %w", nsn.String(), err)
}
return nil
diff --git a/internal/controller/operator/factory/reconcile/vmauth.go b/internal/controller/operator/factory/reconcile/vmauth.go
index b7a1f90b8..97c09caea 100644
--- a/internal/controller/operator/factory/reconcile/vmauth.go
+++ b/internal/controller/operator/factory/reconcile/vmauth.go
@@ -58,7 +58,7 @@ func VMAuth(ctx context.Context, rclient client.Client, newObj, prevObj *vmv1bet
if err != nil {
return err
}
- if err := waitForStatus(ctx, rclient, newObj, vmStatusInterval, vmv1beta1.UpdateStatusOperational); err != nil {
+ if err := waitForStatus(ctx, rclient, newObj, vmWaitReadyInterval, vmv1beta1.UpdateStatusOperational); err != nil {
return fmt.Errorf("failed to wait for VMAuth=%s to be ready: %w", nsn.String(), err)
}
return nil
diff --git a/internal/controller/operator/factory/reconcile/vmcluster.go b/internal/controller/operator/factory/reconcile/vmcluster.go
index bb114929f..0c2336779 100644
--- a/internal/controller/operator/factory/reconcile/vmcluster.go
+++ b/internal/controller/operator/factory/reconcile/vmcluster.go
@@ -58,7 +58,7 @@ func VMCluster(ctx context.Context, rclient client.Client, newObj, prevObj *vmv1
if err != nil {
return err
}
- if err := waitForStatus(ctx, rclient, newObj, vmStatusInterval, vmv1beta1.UpdateStatusOperational); err != nil {
+ if err := waitForStatus(ctx, rclient, newObj, vmWaitReadyInterval, vmv1beta1.UpdateStatusOperational); err != nil {
return fmt.Errorf("failed to wait for VMCluster=%s to be ready: %w", nsn.String(), err)
}
return nil
diff --git a/internal/manager/manager.go b/internal/manager/manager.go
index 823430a1e..68b1db9ed 100644
--- a/internal/manager/manager.go
+++ b/internal/manager/manager.go
@@ -232,7 +232,7 @@ func RunManager(ctx context.Context) error {
}
}
- reconcile.Init(baseConfig.PodWaitReadyIntervalCheck, baseConfig.AppReadyTimeout, baseConfig.PodWaitReadyTimeout, 5*time.Second, *statusUpdateTTL)
+ reconcile.Init(baseConfig, *statusUpdateTTL)
config := ctrl.GetConfigOrDie()
config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(*clientQPS), *clientBurst)
diff --git a/test/e2e/vlagent_test.go b/test/e2e/vlagent_test.go
index 42f74e36e..57a2e716b 100644
--- a/test/e2e/vlagent_test.go
+++ b/test/e2e/vlagent_test.go
@@ -377,6 +377,25 @@ var _ = Describe("test vlagent Controller", Label("vl", "agent", "vlagent"), fun
Expect(hasVolumeMount(vmc.VolumeMounts, "/var/run/secrets/kubernetes.io/serviceaccount")).To(HaveOccurred())
},
),
+ Entry("with UseProxyProtocol", "proxy-protocol",
+ &vmv1.VLAgent{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: namespace,
+ Name: nsn.Name,
+ },
+ Spec: vmv1.VLAgentSpec{
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ ExtraArgs: map[string]string{
+ "httpListenAddr.useProxyProtocol": "true",
+ },
+ },
+ RemoteWrite: []vmv1.VLAgentRemoteWriteSpec{{URL: "http://localhost:9428/internal/insert"}},
+ },
+ },
+ nil,
+ func(cr *vmv1.VLAgent) {},
+ ),
)
type testStep struct {
setup func(*vmv1.VLAgent)
diff --git a/test/e2e/vlcluster_test.go b/test/e2e/vlcluster_test.go
index f3ac7c774..335d2b686 100644
--- a/test/e2e/vlcluster_test.go
+++ b/test/e2e/vlcluster_test.go
@@ -54,6 +54,86 @@ var _ = Describe("test vlcluster Controller", Label("vl", "cluster", "vlcluster"
},
},
}
+ DescribeTable("should create vlcluster",
+ func(name string, cr *vmv1.VLCluster, verify func(cr *vmv1.VLCluster)) {
+ cr.Name = name
+ cr.Namespace = namespace
+ nsn.Name = name
+ Expect(k8sClient.Create(ctx, cr)).ToNot(HaveOccurred())
+ Eventually(func() error {
+ return expectObjectStatusOperational(ctx, k8sClient, &vmv1.VLCluster{}, nsn)
+ }, eventualDeploymentAppReadyTimeout).ShouldNot(HaveOccurred())
+ if verify != nil {
+ var created vmv1.VLCluster
+ Expect(k8sClient.Get(ctx, nsn, &created)).ToNot(HaveOccurred())
+ verify(&created)
+ }
+ },
+ Entry("with UseProxyProtocol on all components", "proxy-protocol",
+ &vmv1.VLCluster{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: namespace,
+ },
+ Spec: vmv1.VLClusterSpec{
+ VLInsert: &vmv1.VLInsert{
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ExtraArgs: map[string]string{
+ "httpListenAddr.useProxyProtocol": "true",
+ },
+ },
+ },
+ VLSelect: &vmv1.VLSelect{
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ExtraArgs: map[string]string{
+ "httpListenAddr.useProxyProtocol": "true",
+ },
+ },
+ },
+ VLStorage: &vmv1.VLStorage{
+ RetentionPeriod: "1",
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ ExtraArgs: map[string]string{
+ "httpListenAddr.useProxyProtocol": "true",
+ },
+ },
+ },
+ },
+ },
+ nil,
+ ),
+ Entry("with RequestsLoadBalancer enabled", "with-lb",
+ &vmv1.VLCluster{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: namespace,
+ },
+ Spec: vmv1.VLClusterSpec{
+ RequestsLoadBalancer: vmv1beta1.VMAuthLoadBalancer{
+ Enabled: true,
+ },
+ VLInsert: &vmv1.VLInsert{},
+ VLSelect: &vmv1.VLSelect{},
+ VLStorage: &vmv1.VLStorage{
+ RetentionPeriod: "1",
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ },
+ },
+ },
+ },
+ func(cr *vmv1.VLCluster) {
+ var dep appsv1.Deployment
+ nss := types.NamespacedName{Namespace: namespace, Name: cr.PrefixedName(vmv1beta1.ClusterComponentBalancer)}
+ Expect(k8sClient.Get(ctx, nss, &dep)).ToNot(HaveOccurred())
+ var svc corev1.Service
+ Expect(k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: cr.PrefixedName(vmv1beta1.ClusterComponentSelect)}, &svc)).ToNot(HaveOccurred())
+ Expect(svc.Spec.Selector).To(Equal(cr.SelectorLabels(vmv1beta1.ClusterComponentBalancer)))
+ Expect(k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: cr.PrefixedName(vmv1beta1.ClusterComponentInsert)}, &svc)).ToNot(HaveOccurred())
+ Expect(svc.Spec.Selector).To(Equal(cr.SelectorLabels(vmv1beta1.ClusterComponentBalancer)))
+ },
+ ),
+ )
+
type testStep struct {
setup func(*vmv1.VLCluster)
modify func(*vmv1.VLCluster)
diff --git a/test/e2e/vlsingle_test.go b/test/e2e/vlsingle_test.go
index 6c4a0517c..5c4960f9a 100644
--- a/test/e2e/vlsingle_test.go
+++ b/test/e2e/vlsingle_test.go
@@ -159,6 +159,23 @@ var _ = Describe("test vlsingle Controller", Label("vl", "single", "vlsingle"),
Expect(ts.Containers[0].VolumeMounts[1].Name).To(Equal("unused"))
}),
+ Entry("with UseProxyProtocol", "proxy-protocol",
+ &vmv1.VLSingle{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: namespace,
+ },
+ Spec: vmv1.VLSingleSpec{
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ ExtraArgs: map[string]string{
+ "httpListenAddr.useProxyProtocol": "true",
+ },
+ },
+ RetentionPeriod: "1",
+ },
+ },
+ func(cr *vmv1.VLSingle) {},
+ ),
)
baseVLSingle := &vmv1.VLSingle{
diff --git a/test/e2e/vmagent_test.go b/test/e2e/vmagent_test.go
index 2b9a0d129..7b2ed92e6 100644
--- a/test/e2e/vmagent_test.go
+++ b/test/e2e/vmagent_test.go
@@ -386,6 +386,65 @@ var _ = Describe("test vmagent Controller", Label("vm", "agent", "vmagent"), fun
Expect(vmc.VolumeMounts).To(HaveLen(5))
Expect(hasVolumeMount(vmc.VolumeMounts, "/var/run/secrets/kubernetes.io/serviceaccount")).ToNot(HaveOccurred())
}),
+ Entry("with UseProxyProtocol in deployment mode", "proxy-protocol-deploy",
+ &vmv1beta1.VMAgent{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: namespace,
+ Name: nsn.Name,
+ },
+ Spec: vmv1beta1.VMAgentSpec{
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ ExtraArgs: map[string]string{
+ "httpListenAddr.useProxyProtocol": "true",
+ },
+ },
+ RemoteWrite: []vmv1beta1.VMAgentRemoteWriteSpec{
+ {URL: "http://localhost:8428"},
+ },
+ },
+ }, nil, func(cr *vmv1beta1.VMAgent) {},
+ ),
+ Entry("with UseProxyProtocol in statefulset mode", "proxy-protocol-sts",
+ &vmv1beta1.VMAgent{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: namespace,
+ Name: nsn.Name,
+ },
+ Spec: vmv1beta1.VMAgentSpec{
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ ExtraArgs: map[string]string{
+ "httpListenAddr.useProxyProtocol": "true",
+ },
+ },
+ StatefulMode: true,
+ RemoteWrite: []vmv1beta1.VMAgentRemoteWriteSpec{
+ {URL: "http://localhost:8428"},
+ },
+ },
+ }, nil, func(cr *vmv1beta1.VMAgent) {},
+ ),
+ Entry("with UseProxyProtocol in daemonset mode", "proxy-protocol-ds",
+ &vmv1beta1.VMAgent{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: namespace,
+ Name: nsn.Name,
+ },
+ Spec: vmv1beta1.VMAgentSpec{
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ ExtraArgs: map[string]string{
+ "httpListenAddr.useProxyProtocol": "true",
+ },
+ },
+ DaemonSetMode: true,
+ RemoteWrite: []vmv1beta1.VMAgentRemoteWriteSpec{
+ {URL: "http://localhost:8428"},
+ },
+ },
+ }, nil, func(cr *vmv1beta1.VMAgent) {},
+ ),
)
type testStep struct {
setup func(*vmv1beta1.VMAgent)
diff --git a/test/e2e/vmcluster_test.go b/test/e2e/vmcluster_test.go
index 4096b8379..dace800ad 100644
--- a/test/e2e/vmcluster_test.go
+++ b/test/e2e/vmcluster_test.go
@@ -351,6 +351,80 @@ var _ = Describe("e2e vmcluster", Label("vm", "cluster", "vmcluster"), func() {
}
},
),
+ Entry("with UseProxyProtocol on all components", "proxy-protocol",
+ &vmv1beta1.VMCluster{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: namespace,
+ Name: nsn.Name,
+ },
+ Spec: vmv1beta1.VMClusterSpec{
+ RetentionPeriod: "1",
+ VMStorage: &vmv1beta1.VMStorage{
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ ExtraArgs: map[string]string{
+ "httpListenAddr.useProxyProtocol": "true",
+ },
+ },
+ },
+ VMSelect: &vmv1beta1.VMSelect{
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ ExtraArgs: map[string]string{
+ "httpListenAddr.useProxyProtocol": "true",
+ },
+ },
+ },
+ VMInsert: &vmv1beta1.VMInsert{
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ ExtraArgs: map[string]string{
+ "httpListenAddr.useProxyProtocol": "true",
+ },
+ },
+ },
+ },
+ },
+ nil,
+ ),
+ Entry("with RequestsLoadBalancer enabled", "with-lb",
+ &vmv1beta1.VMCluster{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: namespace,
+ Name: nsn.Name,
+ },
+ Spec: vmv1beta1.VMClusterSpec{
+ RetentionPeriod: "1",
+ RequestsLoadBalancer: vmv1beta1.VMAuthLoadBalancer{
+ Enabled: true,
+ },
+ VMStorage: &vmv1beta1.VMStorage{
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ },
+ },
+ VMSelect: &vmv1beta1.VMSelect{
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ },
+ },
+ VMInsert: &vmv1beta1.VMInsert{
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ },
+ },
+ },
+ },
+ func(cr *vmv1beta1.VMCluster) {
+ var lbDep appsv1.Deployment
+ nss := types.NamespacedName{Namespace: namespace, Name: cr.PrefixedName(vmv1beta1.ClusterComponentBalancer)}
+ Expect(k8sClient.Get(ctx, nss, &lbDep)).ToNot(HaveOccurred())
+ var svc corev1.Service
+ Expect(k8sClient.Get(ctx, nss, &svc)).ToNot(HaveOccurred())
+ Expect(k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: cr.PrefixedInternalName(vmv1beta1.ClusterComponentInsert)}, &svc)).ToNot(HaveOccurred())
+ Expect(k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: cr.PrefixedInternalName(vmv1beta1.ClusterComponentSelect)}, &svc)).ToNot(HaveOccurred())
+ },
+ ),
)
})
Context("update", func() {
diff --git a/test/e2e/vmsingle_test.go b/test/e2e/vmsingle_test.go
index e17cba4cf..1998e5743 100644
--- a/test/e2e/vmsingle_test.go
+++ b/test/e2e/vmsingle_test.go
@@ -503,6 +503,17 @@ var _ = Describe("test vmsingle Controller", Label("vm", "single"), func() {
},
},
),
+ Entry("by enabling UseProxyProtocol", "proxy-protocol", false,
+ baseSingle.DeepCopy(),
+ testStep{
+ modify: func(cr *vmv1beta1.VMSingle) {
+ cr.Spec.ExtraArgs = map[string]string{
+ "httpListenAddr.useProxyProtocol": "true",
+ }
+ },
+ verify: func(cr *vmv1beta1.VMSingle) {},
+ },
+ ),
)
},
)
diff --git a/test/e2e/vtcluster_test.go b/test/e2e/vtcluster_test.go
index 8ca5dc72d..51e5f13a2 100644
--- a/test/e2e/vtcluster_test.go
+++ b/test/e2e/vtcluster_test.go
@@ -39,6 +39,57 @@ var _ = Describe("test vtcluster Controller", Label("vt", "cluster", "vtcluster"
})).ToNot(HaveOccurred())
waitResourceDeleted(ctx, k8sClient, nsn, &vmv1.VTCluster{})
})
+
+ DescribeTable("should create", func(name string, cr *vmv1.VTCluster, verify func(cr *vmv1.VTCluster)) {
+ nsn.Name = name
+ cr.Name = name
+ Expect(k8sClient.Create(ctx, cr)).ToNot(HaveOccurred())
+ Eventually(func() error {
+ return expectObjectStatusOperational(ctx, k8sClient, &vmv1.VTCluster{}, nsn)
+ }, eventualDeploymentAppReadyTimeout).ShouldNot(HaveOccurred())
+
+ var created vmv1.VTCluster
+ Expect(k8sClient.Get(ctx, nsn, &created)).ToNot(HaveOccurred())
+ verify(&created)
+ },
+ Entry("with UseProxyProtocol on all components", "proxy-protocol",
+ &vmv1.VTCluster{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: namespace,
+ Name: nsn.Name,
+ },
+ Spec: vmv1.VTClusterSpec{
+ Storage: &vmv1.VTStorage{
+ RetentionPeriod: "1",
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ ExtraArgs: map[string]string{
+ "httpListenAddr.useProxyProtocol": "true",
+ },
+ },
+ },
+ Select: &vmv1.VTSelect{
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ ExtraArgs: map[string]string{
+ "httpListenAddr.useProxyProtocol": "true",
+ },
+ },
+ },
+ Insert: &vmv1.VTInsert{
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ ExtraArgs: map[string]string{
+ "httpListenAddr.useProxyProtocol": "true",
+ },
+ },
+ },
+ },
+ },
+ func(cr *vmv1.VTCluster) {},
+ ),
+ )
+
baseVTCluster := &vmv1.VTCluster{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
diff --git a/test/e2e/vtsingle_test.go b/test/e2e/vtsingle_test.go
index a48b8cc23..394c3ce3d 100644
--- a/test/e2e/vtsingle_test.go
+++ b/test/e2e/vtsingle_test.go
@@ -159,6 +159,23 @@ var _ = Describe("test vtsingle Controller", Label("vt", "single", "vtsingle"),
Expect(ts.Containers[0].VolumeMounts[1].Name).To(Equal("unused"))
}),
+ Entry("with UseProxyProtocol", "proxy-protocol",
+ &vmv1.VTSingle{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: namespace,
+ },
+ Spec: vmv1.VTSingleSpec{
+ CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{
+ ReplicaCount: ptr.To[int32](1),
+ ExtraArgs: map[string]string{
+ "httpListenAddr.useProxyProtocol": "true",
+ },
+ },
+ RetentionPeriod: "1",
+ },
+ },
+ func(cr *vmv1.VTSingle) {},
+ ),
)
baseVTSingle := &vmv1.VTSingle{