diff --git a/cmd/main.go b/cmd/main.go index 79c63034c..dae0deeaa 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -23,6 +23,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/manager/signals" + "github.com/VictoriaMetrics/operator/internal/controller/operator" "github.com/VictoriaMetrics/operator/internal/manager" ) @@ -31,11 +32,11 @@ var ( ) func main() { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancelCause(context.Background()) stop := signals.SetupSignalHandler() go func() { <-stop.Done() - cancel() + cancel(operator.ErrShutdown) }() err := manager.RunManager(ctx) diff --git a/internal/controller/operator/controllers.go b/internal/controller/operator/controllers.go index d043be908..d411612c1 100644 --- a/internal/controller/operator/controllers.go +++ b/internal/controller/operator/controllers.go @@ -88,6 +88,9 @@ type parsingError struct { controller string } +// ErrShutdown is a custom error returned as a cause of operator context cancel +var ErrShutdown = fmt.Errorf("graceful shutdown, exiting") + func (pe *parsingError) Error() string { return fmt.Sprintf("parsing object error for object controller=%q: %q", pe.controller, pe.origin) @@ -126,6 +129,9 @@ func handleReconcileErr[T client.Object, ST reconcile.StatusWithMetadata[STC], S switch { case errors.Is(err, context.Canceled): contextCancelErrorsTotal.Inc() + if !errors.Is(context.Cause(ctx), ErrShutdown) { + originResult.RequeueAfter = time.Second * 5 + } return originResult, nil case errors.As(err, &pe): namespacedName := "unknown" @@ -197,6 +203,9 @@ func handleReconcileErrWithoutStatus( switch { case errors.Is(err, context.Canceled): contextCancelErrorsTotal.Inc() + if !errors.Is(context.Cause(ctx), ErrShutdown) { + originResult.RequeueAfter = time.Second * 5 + } return originResult, nil case errors.As(err, &pe): parseObjectErrorsTotal.WithLabelValues(pe.controller, "unknown").Inc() diff --git a/internal/controller/operator/controllers_test.go b/internal/controller/operator/controllers_test.go index 572a11012..9746b4b07 100644 --- a/internal/controller/operator/controllers_test.go +++ b/internal/controller/operator/controllers_test.go @@ -2,12 +2,16 @@ package operator import ( "context" + "errors" + "fmt" "testing" + "time" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1" @@ -314,3 +318,78 @@ func TestIsSelectorsMatchesTargetCRD(t *testing.T) { isMatch: true, }) } + +func TestHandleReconcileErrWithoutStatus(t *testing.T) { + type opts struct { + err error + origin ctrl.Result + wantResult ctrl.Result + wantErr error + } + + f := func(ctx context.Context, o opts) { + t.Helper() + got, err := handleReconcileErrWithoutStatus(ctx, nil, nil, o.origin, o.err) + assert.Equal(t, o.wantErr, err) + assert.Equal(t, o.wantResult, got) + } + + // no error + f(context.Background(), opts{ + err: nil, + origin: ctrl.Result{RequeueAfter: 10}, + wantResult: ctrl.Result{RequeueAfter: 10}, + wantErr: nil, + }) + + // context canceled + f(context.Background(), opts{ + err: context.Canceled, + origin: ctrl.Result{}, + wantResult: ctrl.Result{RequeueAfter: time.Second * 5}, + wantErr: nil, + }) + + // context canceled with ErrShutdown + shutdownCtx, shutdownCancel := context.WithCancelCause(context.Background()) + shutdownCancel(ErrShutdown) + f(shutdownCtx, opts{ + err: fmt.Errorf("wrapped: %w", errors.Join(context.Canceled, ErrShutdown)), + origin: ctrl.Result{}, + wantResult: ctrl.Result{}, + wantErr: nil, + }) +} + +func TestHandleReconcileErr(t *testing.T) { + type opts struct { + err error + origin ctrl.Result + wantResult ctrl.Result + wantErr error + } + + f := func(o opts) { + t.Helper() + got, err := handleReconcileErr(context.Background(), nil, (*vmv1beta1.VMCluster)(nil), o.origin, o.err) + assert.Equal(t, o.wantErr, err) + assert.Equal(t, o.wantResult, got) + } + + // no error + f(opts{ + err: nil, + origin: ctrl.Result{RequeueAfter: 10}, + wantResult: ctrl.Result{RequeueAfter: 10}, + wantErr: nil, + }) + + // context canceled + f(opts{ + err: context.Canceled, + origin: ctrl.Result{}, + wantResult: ctrl.Result{RequeueAfter: time.Second * 5}, + wantErr: nil, + }) + +} diff --git a/internal/controller/operator/factory/vmdistributed/zone.go b/internal/controller/operator/factory/vmdistributed/zone.go index d3bf82923..bdb695752 100644 --- a/internal/controller/operator/factory/vmdistributed/zone.go +++ b/internal/controller/operator/factory/vmdistributed/zone.go @@ -2,7 +2,6 @@ package vmdistributed import ( "context" - "errors" "fmt" "net/http" "net/url" @@ -195,8 +194,9 @@ func (zs *zones) upgrade(ctx context.Context, rclient client.Client, cr *vmv1alp } if needsLBUpdate { // wait for empty persistent queue - if err := zs.waitForEmptyPQ(ctx, rclient, defaultMetricsCheckInterval, i); err != nil { - return fmt.Errorf("zone=%s: failed to wait till VMCluster=%s queue is empty: %w", item, nsnCluster.String(), err) + zs.waitForEmptyPQ(ctx, rclient, defaultMetricsCheckInterval, i) + if ctx.Err() != nil { + return fmt.Errorf("zone=%s: failed to wait till VMCluster=%s queue is empty", item, nsnCluster.String()) } // excluding zone from VMAuth LB @@ -217,8 +217,9 @@ func (zs *zones) upgrade(ctx context.Context, rclient client.Client, cr *vmv1alp } // wait for empty persistent queue - if err := zs.waitForEmptyPQ(ctx, rclient, defaultMetricsCheckInterval, i); err != nil { - return fmt.Errorf("zone=%s: failed to wait till VMAgent queue for VMCluster=%s is drained: %w", item, nsnCluster.String(), err) + zs.waitForEmptyPQ(ctx, rclient, defaultMetricsCheckInterval, i) + if ctx.Err() != nil { + return fmt.Errorf("zone=%s: failed to wait till VMAgent queue for VMCluster=%s is drained", item, nsnCluster.String()) } // restore zone in VMAuth LB @@ -290,7 +291,7 @@ func getMetricsAddrs(ctx context.Context, rclient client.Client, vmAgent *vmv1be return addrs } -func (zs *zones) waitForEmptyPQ(ctx context.Context, rclient client.Client, interval time.Duration, clusterIdx int) error { +func (zs *zones) waitForEmptyPQ(ctx context.Context, rclient client.Client, interval time.Duration, clusterIdx int) { vmCluster := zs.vmclusters[clusterIdx] clusterURLHash := fmt.Sprintf("%016X", xxhash.Sum64([]byte(vmCluster.GetRemoteWriteURL()))) @@ -324,17 +325,6 @@ func (zs *zones) waitForEmptyPQ(ctx context.Context, rclient client.Client, inte } var wg sync.WaitGroup - var resultErr error - var once sync.Once - gctx, gcancel := context.WithCancel(ctx) - defer gcancel() - cancel := func(err error) { - once.Do(func() { - resultErr = err - gcancel() - }) - } - for i := range zs.vmagents { vmAgent := zs.vmagents[i] if vmAgent.CreationTimestamp.IsZero() { @@ -344,7 +334,7 @@ func (zs *zones) waitForEmptyPQ(ctx context.Context, rclient client.Client, inte Name: vmAgent.Name, Namespace: vmAgent.Namespace, } - m := newManager(gctx) + m := newManager(ctx) wg.Go(func() { wait.UntilWithContext(m.ctx, func(ctx context.Context) { addrs := getMetricsAddrs(ctx, rclient, vmAgent) @@ -356,10 +346,7 @@ func (zs *zones) waitForEmptyPQ(ctx context.Context, rclient client.Client, inte pctx := m.add(addr) wg.Go(func() { if err := pollMetrics(pctx, nsn, addr); err != nil { - if !errors.Is(err, context.Canceled) { - cancel(err) - return - } + return } m.stop(addr) }) @@ -374,10 +361,6 @@ func (zs *zones) waitForEmptyPQ(ctx context.Context, rclient client.Client, inte }) } wg.Wait() - if resultErr != nil { - return fmt.Errorf("failed to wait for VMAgent metrics: %w", resultErr) - } - return nil } func newManager(ctx context.Context) *manager { diff --git a/internal/controller/operator/factory/vmdistributed/zone_test.go b/internal/controller/operator/factory/vmdistributed/zone_test.go index 113de2315..3cf3b95c1 100644 --- a/internal/controller/operator/factory/vmdistributed/zone_test.go +++ b/internal/controller/operator/factory/vmdistributed/zone_test.go @@ -194,7 +194,8 @@ func TestWaitForEmptyPQ(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), o.timeout) defer cancel() - err = zs.waitForEmptyPQ(ctx, rclient, 1*time.Second, 0) + zs.waitForEmptyPQ(ctx, rclient, 1*time.Second, 0) + err = ctx.Err() if len(o.errMsg) > 0 { assert.Error(t, err) assert.Contains(t, err.Error(), o.errMsg) @@ -241,6 +242,6 @@ func TestWaitForEmptyPQ(t *testing.T) { fmt.Fprintf(w, `%s{path="/tmp/1_EF46DB3751D8E999"} 0`, vmAgentQueueMetricName) }, timeout: 500 * time.Millisecond, - errMsg: "failed to wait for VMAgent metrics", + errMsg: "context deadline exceeded", }) } diff --git a/test/e2e/suite/suite.go b/test/e2e/suite/suite.go index a442325d9..f60aeb126 100644 --- a/test/e2e/suite/suite.go +++ b/test/e2e/suite/suite.go @@ -32,13 +32,14 @@ import ( vmv1 "github.com/VictoriaMetrics/operator/api/operator/v1" vmv1alpha1 "github.com/VictoriaMetrics/operator/api/operator/v1alpha1" vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1" + "github.com/VictoriaMetrics/operator/internal/controller/operator" "github.com/VictoriaMetrics/operator/internal/controller/operator/factory/build" "github.com/VictoriaMetrics/operator/internal/manager" ) var ( testEnv *envtest.Environment - cancelManager context.CancelFunc + cancelManager context.CancelCauseFunc stopped = make(chan struct{}) ) @@ -173,7 +174,7 @@ func InitOperatorProcess(extraNamespaces ...string) []byte { "--health-probe-bind-address", "0", "--controller.maxConcurrentReconciles", "30", ) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancelCause(context.Background()) go func(ctx context.Context) { defer GinkgoRecover() err := manager.RunManager(ctx) @@ -188,7 +189,7 @@ func InitOperatorProcess(extraNamespaces ...string) []byte { // and cleanup resources func ShutdownOperatorProcess() { By("tearing down the test environment") - cancelManager() + cancelManager(operator.ErrShutdown) Eventually(stopped, 60, 2).Should(BeClosed()) Expect(testEnv.Stop()).ToNot(HaveOccurred())