Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"

"github.com/VictoriaMetrics/operator/internal/controller/operator"
"github.com/VictoriaMetrics/operator/internal/manager"
)

Expand All @@ -31,11 +32,11 @@ var (
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancelCause(context.Background())
stop := signals.SetupSignalHandler()
go func() {
<-stop.Done()
cancel()
cancel(operator.ErrShutdown)
}()

err := manager.RunManager(ctx)
Expand Down
9 changes: 9 additions & 0 deletions internal/controller/operator/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type parsingError struct {
controller string
}

// ErrShutdown is the sentinel error passed as the cancellation cause of the
// operator's root context on shutdown. Compare against it with
// errors.Is(context.Cause(ctx), ErrShutdown).
var ErrShutdown = errors.New("graceful shutdown, exiting")

func (pe *parsingError) Error() string {
return fmt.Sprintf("parsing object error for object controller=%q: %q",
pe.controller, pe.origin)
Expand Down Expand Up @@ -126,6 +129,9 @@ func handleReconcileErr[T client.Object, ST reconcile.StatusWithMetadata[STC], S
switch {
case errors.Is(err, context.Canceled):
contextCancelErrorsTotal.Inc()
if !errors.Is(context.Cause(ctx), ErrShutdown) {
originResult.RequeueAfter = time.Second * 5
}
return originResult, nil
case errors.As(err, &pe):
namespacedName := "unknown"
Expand Down Expand Up @@ -197,6 +203,9 @@ func handleReconcileErrWithoutStatus(
switch {
case errors.Is(err, context.Canceled):
contextCancelErrorsTotal.Inc()
if !errors.Is(context.Cause(ctx), ErrShutdown) {
originResult.RequeueAfter = time.Second * 5
}
return originResult, nil
case errors.As(err, &pe):
parseObjectErrorsTotal.WithLabelValues(pe.controller, "unknown").Inc()
Expand Down
79 changes: 79 additions & 0 deletions internal/controller/operator/controllers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package operator

import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1"
Expand Down Expand Up @@ -314,3 +318,78 @@ func TestIsSelectorsMatchesTargetCRD(t *testing.T) {
isMatch: true,
})
}

func TestHandleReconcileErrWithoutStatus(t *testing.T) {
type opts struct {
err error
origin ctrl.Result
wantResult ctrl.Result
wantErr error
}

f := func(ctx context.Context, o opts) {
t.Helper()
got, err := handleReconcileErrWithoutStatus(ctx, nil, nil, o.origin, o.err)
assert.Equal(t, o.wantErr, err)
assert.Equal(t, o.wantResult, got)
}

// no error
f(context.Background(), opts{
err: nil,
origin: ctrl.Result{RequeueAfter: 10},
wantResult: ctrl.Result{RequeueAfter: 10},
wantErr: nil,
})

// context canceled
f(context.Background(), opts{
err: context.Canceled,
origin: ctrl.Result{},
wantResult: ctrl.Result{RequeueAfter: time.Second * 5},
wantErr: nil,
})

// context canceled with ErrShutdown
shutdownCtx, shutdownCancel := context.WithCancelCause(context.Background())
shutdownCancel(ErrShutdown)
f(shutdownCtx, opts{
err: fmt.Errorf("wrapped: %w", errors.Join(context.Canceled, ErrShutdown)),
origin: ctrl.Result{},
wantResult: ctrl.Result{},
wantErr: nil,
})
}

// TestHandleReconcileErr checks which ctrl.Result and error handleReconcileErr
// returns for a given reconcile error and context cancellation cause.
func TestHandleReconcileErr(t *testing.T) {
	type opts struct {
		err        error
		origin     ctrl.Result
		wantResult ctrl.Result
		wantErr    error
	}

	// f runs handleReconcileErr with a nil client and a nil VMCluster under
	// the supplied context and compares the outcome with the expectations.
	f := func(ctx context.Context, o opts) {
		t.Helper()
		got, err := handleReconcileErr(ctx, nil, (*vmv1beta1.VMCluster)(nil), o.origin, o.err)
		assert.Equal(t, o.wantErr, err)
		assert.Equal(t, o.wantResult, got)
	}

	// no error
	f(context.Background(), opts{
		err:        nil,
		origin:     ctrl.Result{RequeueAfter: 10},
		wantResult: ctrl.Result{RequeueAfter: 10},
		wantErr:    nil,
	})

	// context canceled: no shutdown cause, so a requeue is scheduled
	f(context.Background(), opts{
		err:        context.Canceled,
		origin:     ctrl.Result{},
		wantResult: ctrl.Result{RequeueAfter: time.Second * 5},
		wantErr:    nil,
	})

	// context canceled with ErrShutdown: the result must stay untouched
	shutdownCtx, shutdownCancel := context.WithCancelCause(context.Background())
	shutdownCancel(ErrShutdown)
	f(shutdownCtx, opts{
		err:        fmt.Errorf("wrapped: %w", errors.Join(context.Canceled, ErrShutdown)),
		origin:     ctrl.Result{},
		wantResult: ctrl.Result{},
		wantErr:    nil,
	})
}
35 changes: 9 additions & 26 deletions internal/controller/operator/factory/vmdistributed/zone.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package vmdistributed

import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
Expand Down Expand Up @@ -195,8 +194,9 @@ func (zs *zones) upgrade(ctx context.Context, rclient client.Client, cr *vmv1alp
}
if needsLBUpdate {
// wait for empty persistent queue
if err := zs.waitForEmptyPQ(ctx, rclient, defaultMetricsCheckInterval, i); err != nil {
return fmt.Errorf("zone=%s: failed to wait till VMCluster=%s queue is empty: %w", item, nsnCluster.String(), err)
zs.waitForEmptyPQ(ctx, rclient, defaultMetricsCheckInterval, i)
if ctx.Err() != nil {
return fmt.Errorf("zone=%s: failed to wait till VMCluster=%s queue is empty", item, nsnCluster.String())
Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Wrap ctx.Err() in this return; otherwise canceled zone updates are treated as generic failures and won't be requeued.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At internal/controller/operator/factory/vmdistributed/zone.go, line 199:

<comment>Wrap `ctx.Err()` in this return; otherwise canceled zone updates are treated as generic failures and won't be requeued.</comment>

<file context>
@@ -195,8 +194,9 @@ func (zs *zones) upgrade(ctx context.Context, rclient client.Client, cr *vmv1alp
-			return fmt.Errorf("zone=%s: failed to wait till VMCluster=%s queue is empty: %w", item, nsnCluster.String(), err)
+		zs.waitForEmptyPQ(ctx, rclient, defaultMetricsCheckInterval, i)
+		if ctx.Err() != nil {
+			return fmt.Errorf("zone=%s: failed to wait till VMCluster=%s queue is empty", item, nsnCluster.String())
 		}
 
</file context>
Suggested change
return fmt.Errorf("zone=%s: failed to wait till VMCluster=%s queue is empty", item, nsnCluster.String())
return fmt.Errorf("zone=%s: failed to wait till VMCluster=%s queue is empty: %w", item, nsnCluster.String(), ctx.Err())
Fix with Cubic

}

// excluding zone from VMAuth LB
Expand All @@ -217,8 +217,9 @@ func (zs *zones) upgrade(ctx context.Context, rclient client.Client, cr *vmv1alp
}

// wait for empty persistent queue
if err := zs.waitForEmptyPQ(ctx, rclient, defaultMetricsCheckInterval, i); err != nil {
return fmt.Errorf("zone=%s: failed to wait till VMAgent queue for VMCluster=%s is drained: %w", item, nsnCluster.String(), err)
zs.waitForEmptyPQ(ctx, rclient, defaultMetricsCheckInterval, i)
if ctx.Err() != nil {
return fmt.Errorf("zone=%s: failed to wait till VMAgent queue for VMCluster=%s is drained", item, nsnCluster.String())
}

// restore zone in VMAuth LB
Expand Down Expand Up @@ -290,7 +291,7 @@ func getMetricsAddrs(ctx context.Context, rclient client.Client, vmAgent *vmv1be
return addrs
}

func (zs *zones) waitForEmptyPQ(ctx context.Context, rclient client.Client, interval time.Duration, clusterIdx int) error {
func (zs *zones) waitForEmptyPQ(ctx context.Context, rclient client.Client, interval time.Duration, clusterIdx int) {
vmCluster := zs.vmclusters[clusterIdx]
clusterURLHash := fmt.Sprintf("%016X", xxhash.Sum64([]byte(vmCluster.GetRemoteWriteURL())))

Expand Down Expand Up @@ -324,17 +325,6 @@ func (zs *zones) waitForEmptyPQ(ctx context.Context, rclient client.Client, inte
}

var wg sync.WaitGroup
var resultErr error
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that we remove this ugly block, but I'm not sure it's worth removing the returned error and reading it from the context instead.

Copy link
Contributor

@AndrewChubatiuk AndrewChubatiuk Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function returns nothing besides context.Canceled; other errors are treated as transient.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, okay, let's roll with it then. Could you LGTM the PR?

var once sync.Once
gctx, gcancel := context.WithCancel(ctx)
defer gcancel()
cancel := func(err error) {
once.Do(func() {
resultErr = err
gcancel()
})
}

for i := range zs.vmagents {
vmAgent := zs.vmagents[i]
if vmAgent.CreationTimestamp.IsZero() {
Expand All @@ -344,7 +334,7 @@ func (zs *zones) waitForEmptyPQ(ctx context.Context, rclient client.Client, inte
Name: vmAgent.Name,
Namespace: vmAgent.Namespace,
}
m := newManager(gctx)
m := newManager(ctx)
wg.Go(func() {
wait.UntilWithContext(m.ctx, func(ctx context.Context) {
addrs := getMetricsAddrs(ctx, rclient, vmAgent)
Expand All @@ -356,10 +346,7 @@ func (zs *zones) waitForEmptyPQ(ctx context.Context, rclient client.Client, inte
pctx := m.add(addr)
wg.Go(func() {
if err := pollMetrics(pctx, nsn, addr); err != nil {
if !errors.Is(err, context.Canceled) {
cancel(err)
return
}
return
}
m.stop(addr)
})
Expand All @@ -374,10 +361,6 @@ func (zs *zones) waitForEmptyPQ(ctx context.Context, rclient client.Client, inte
})
}
wg.Wait()
if resultErr != nil {
return fmt.Errorf("failed to wait for VMAgent metrics: %w", resultErr)
}
return nil
}

func newManager(ctx context.Context) *manager {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ func TestWaitForEmptyPQ(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), o.timeout)
defer cancel()

err = zs.waitForEmptyPQ(ctx, rclient, 1*time.Second, 0)
zs.waitForEmptyPQ(ctx, rclient, 1*time.Second, 0)
err = ctx.Err()
if len(o.errMsg) > 0 {
assert.Error(t, err)
assert.Contains(t, err.Error(), o.errMsg)
Expand Down Expand Up @@ -241,6 +242,6 @@ func TestWaitForEmptyPQ(t *testing.T) {
fmt.Fprintf(w, `%s{path="/tmp/1_EF46DB3751D8E999"} 0`, vmAgentQueueMetricName)
},
timeout: 500 * time.Millisecond,
errMsg: "failed to wait for VMAgent metrics",
errMsg: "context deadline exceeded",
})
}
7 changes: 4 additions & 3 deletions test/e2e/suite/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ import (
vmv1 "github.com/VictoriaMetrics/operator/api/operator/v1"
vmv1alpha1 "github.com/VictoriaMetrics/operator/api/operator/v1alpha1"
vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1"
"github.com/VictoriaMetrics/operator/internal/controller/operator"
"github.com/VictoriaMetrics/operator/internal/controller/operator/factory/build"
"github.com/VictoriaMetrics/operator/internal/manager"
)

var (
testEnv *envtest.Environment
cancelManager context.CancelFunc
cancelManager context.CancelCauseFunc
stopped = make(chan struct{})
)

Expand Down Expand Up @@ -173,7 +174,7 @@ func InitOperatorProcess(extraNamespaces ...string) []byte {
"--health-probe-bind-address", "0",
"--controller.maxConcurrentReconciles", "30",
)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancelCause(context.Background())
go func(ctx context.Context) {
defer GinkgoRecover()
err := manager.RunManager(ctx)
Expand All @@ -188,7 +189,7 @@ func InitOperatorProcess(extraNamespaces ...string) []byte {
// and cleanup resources
func ShutdownOperatorProcess() {
By("tearing down the test environment")
cancelManager()
cancelManager(operator.ErrShutdown)
Eventually(stopped, 60, 2).Should(BeClosed())
Expect(testEnv.Stop()).ToNot(HaveOccurred())

Expand Down
Loading