Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,11 @@ func New(client client.Client, scheme *runtime.Scheme) component.Operator[grovec
// GetExistingResourceNames returns the names of PCLQ-level ResourceClaims
// by selecting on the grove.io/podclique label that Sync stamps on each RC.
func (r _resource) GetExistingResourceNames(ctx context.Context, _ logr.Logger, pclqObjMeta metav1.ObjectMeta) ([]string, error) {
objMetaList := &metav1.PartialObjectMetadataList{}
objMetaList.SetGroupVersionKind(resourcev1.SchemeGroupVersion.WithKind("ResourceClaim"))
if err := r.client.List(ctx,
objMetaList,
objMetaList, err := resourceclaim.ListResourceClaimMetadata(ctx, r.client,
client.InNamespace(pclqObjMeta.Namespace),
client.MatchingLabels(pclqResourceClaimLabels(pclqObjMeta)),
); err != nil {
if meta.IsNoMatchError(err) {
return nil, nil
}
)
if err != nil {
return nil, groveerr.WrapError(err,
errSyncPCLQLevelRC,
component.OperationGetExistingResourceNames,
Expand Down Expand Up @@ -194,7 +189,7 @@ func pclqResourceClaimLabels(pclqObjMeta metav1.ObjectMeta) map[string]string {
// GC would create a deadlock since GC only fires after the PCLQ is fully deleted.
func (r _resource) Delete(ctx context.Context, logger logr.Logger, pclqObjMeta metav1.ObjectMeta) error {
labels := pclqResourceClaimLabels(pclqObjMeta)
if err := r.client.DeleteAllOf(ctx, &resourcev1.ResourceClaim{},
if err := resourceclaim.DeleteResourceClaims(ctx, r.client,
client.InNamespace(pclqObjMeta.Namespace),
client.MatchingLabels(labels),
); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
k8sutils "github.com/ai-dynamo/grove/operator/internal/utils/kubernetes"

"github.com/go-logr/logr"
resourcev1 "k8s.io/api/resource/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -55,13 +54,11 @@ func New(client client.Client, scheme *runtime.Scheme) component.Operator[grovec

// GetExistingResourceNames returns the names of all ResourceClaims owned by this PCS.
func (r _resource) GetExistingResourceNames(ctx context.Context, logger logr.Logger, pcsObjMeta metav1.ObjectMeta) ([]string, error) {
objMetaList := &metav1.PartialObjectMetadataList{}
objMetaList.SetGroupVersionKind(resourcev1.SchemeGroupVersion.WithKind("ResourceClaim"))
if err := r.client.List(ctx,
objMetaList,
objMetaList, err := resourceclaim.ListResourceClaimMetadata(ctx, r.client,
client.InNamespace(pcsObjMeta.Namespace),
client.MatchingLabels(resourceclaim.ResourceClaimLabels(pcsObjMeta.Name)),
); err != nil {
)
if err != nil {
return nil, groveerr.WrapError(err,
errSyncPCSResourceClaim,
component.OperationGetExistingResourceNames,
Expand Down Expand Up @@ -163,7 +160,7 @@ func (r _resource) cleanupStaleResourceClaims(ctx context.Context, _ logr.Logger
// levels: PCS, PCSG, and PCLQ). This is safe because Delete is only called during
// PCS deletion when the entire hierarchy is being torn down.
func (r _resource) Delete(ctx context.Context, _ logr.Logger, pcsObjMeta metav1.ObjectMeta) error {
if err := r.client.DeleteAllOf(ctx, &resourcev1.ResourceClaim{},
if err := resourceclaim.DeleteResourceClaims(ctx, r.client,
client.InNamespace(pcsObjMeta.Namespace),
client.MatchingLabels(resourceclaim.ResourceClaimLabels(pcsObjMeta.Name)),
); err != nil {
Expand Down
44 changes: 42 additions & 2 deletions operator/internal/resourceclaim/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,21 @@ import (
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
resourcev1 "k8s.io/api/resource/v1"
resourcev1beta2 "k8s.io/api/resource/v1beta2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// resourceClaimKind is the Kind string that is paired with a group/version to
// build the GroupVersionKind of a ResourceClaim.
const resourceClaimKind = "ResourceClaim"

// ResourceSharer is the common interface for all level-specific resource sharing
// types (PCS, PCSG, PCLQ). It enables the reconciler to operate uniformly over
// different resource sharing specs regardless of their filter type.
Expand Down Expand Up @@ -125,7 +130,42 @@ func DeleteResourceClaim(ctx context.Context, cl client.Client, name, namespace
rc := &resourcev1.ResourceClaim{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
}
return client.IgnoreNotFound(cl.Delete(ctx, rc))
err := cl.Delete(ctx, rc)
if !meta.IsNoMatchError(err) {
return client.IgnoreNotFound(err)
}

rcBeta := &resourcev1beta2.ResourceClaim{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
}
return client.IgnoreNotFound(cl.Delete(ctx, rcBeta))
}

// DeleteResourceClaims removes every ResourceClaim selected by opts.
//
// The delete is first issued with the v1 ResourceClaim type. Only when that
// call fails with a no-match error is the same delete repeated with the
// v1beta2 type; any other result of the v1 call (including success) is
// returned to the caller unchanged.
func DeleteResourceClaims(ctx context.Context, cl client.Client, opts ...client.DeleteAllOfOption) error {
	v1Err := cl.DeleteAllOf(ctx, &resourcev1.ResourceClaim{}, opts...)
	if meta.IsNoMatchError(v1Err) {
		// The v1 kind could not be matched; retry with the v1beta2 type.
		return cl.DeleteAllOf(ctx, &resourcev1beta2.ResourceClaim{}, opts...)
	}
	return v1Err
}

// ListResourceClaimMetadata lists ResourceClaims selected by opts as a
// PartialObjectMetadataList.
//
// The list is first requested with the v1 GroupVersionKind. Only when that
// call fails with a no-match error is a second list issued with the v1beta2
// GroupVersionKind. The list object that was passed to the last List call is
// always returned alongside that call's error, so callers must check the
// error before using the list.
func ListResourceClaimMetadata(ctx context.Context, cl client.Client, opts ...client.ListOption) (*metav1.PartialObjectMetadataList, error) {
	v1List := newResourceClaimMetadataList(resourcev1.SchemeGroupVersion)
	v1Err := cl.List(ctx, v1List, opts...)
	if meta.IsNoMatchError(v1Err) {
		// The v1 kind could not be matched; retry with the v1beta2 kind.
		betaList := newResourceClaimMetadataList(resourcev1beta2.SchemeGroupVersion)
		betaErr := cl.List(ctx, betaList, opts...)
		return betaList, betaErr
	}
	return v1List, v1Err
}

// newResourceClaimMetadataList returns an empty PartialObjectMetadataList whose
// GroupVersionKind is the ResourceClaim kind in the supplied group/version.
func newResourceClaimMetadataList(groupVersion schema.GroupVersion) *metav1.PartialObjectMetadataList {
	var list metav1.PartialObjectMetadataList
	list.SetGroupVersionKind(groupVersion.WithKind(resourceClaimKind))
	return &list
}

// EnsureResourceClaims creates ResourceClaims for a list of ResourceSharer entries
Expand Down Expand Up @@ -326,7 +366,7 @@ func CleanupStalePerReplicaRCs(
sel = sel.Add(*notInReq)
}

return cl.DeleteAllOf(ctx, &resourcev1.ResourceClaim{},
return DeleteResourceClaims(ctx, cl,
client.InNamespace(namespace),
client.MatchingLabelsSelector{Selector: sel},
)
Expand Down
106 changes: 106 additions & 0 deletions operator/internal/resourceclaim/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ import (
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
resourcev1 "k8s.io/api/resource/v1"
resourcev1beta2 "k8s.io/api/resource/v1beta2"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
)

// --- RCName ---
Expand Down Expand Up @@ -490,6 +494,101 @@ func TestDeleteResourceClaim(t *testing.T) {
err := DeleteResourceClaim(context.Background(), cl, "nonexistent", "default")
require.NoError(t, err)
})

t.Run("falls back to v1beta2 when v1 API is not served", func(t *testing.T) {
var calls []string
cl := fake.NewClientBuilder().WithScheme(scheme).WithInterceptorFuncs(interceptor.Funcs{
Delete: func(_ context.Context, _ client.WithWatch, obj client.Object, _ ...client.DeleteOption) error {
switch obj.(type) {
case *resourcev1.ResourceClaim:
calls = append(calls, resourcev1.SchemeGroupVersion.String())
return resourceClaimNoKindMatch(resourcev1.SchemeGroupVersion.Version)
case *resourcev1beta2.ResourceClaim:
calls = append(calls, resourcev1beta2.SchemeGroupVersion.String())
return nil
default:
return fmt.Errorf("unexpected delete object type %T", obj)
}
},
}).Build()

err := DeleteResourceClaim(context.Background(), cl, "my-rc", "default")
require.NoError(t, err)
assert.Equal(t, []string{
resourcev1.SchemeGroupVersion.String(),
resourcev1beta2.SchemeGroupVersion.String(),
}, calls)
})
}

// TestDeleteResourceClaims checks that DeleteResourceClaims retries with the
// v1beta2 ResourceClaim type after the v1 DeleteAllOf call reports a
// NoKindMatchError, and that the two calls happen in that order.
func TestDeleteResourceClaims(t *testing.T) {
	scheme := newTestScheme()

	t.Run("falls back to v1beta2 when v1 API is not served", func(t *testing.T) {
		v1GV := resourcev1.SchemeGroupVersion.String()
		betaGV := resourcev1beta2.SchemeGroupVersion.String()

		// seen records the group/version of every intercepted DeleteAllOf call.
		var seen []string
		deleteAllOf := func(_ context.Context, _ client.WithWatch, obj client.Object, _ ...client.DeleteAllOfOption) error {
			if _, isV1 := obj.(*resourcev1.ResourceClaim); isV1 {
				seen = append(seen, v1GV)
				return resourceClaimNoKindMatch(resourcev1.SchemeGroupVersion.Version)
			}
			if _, isBeta := obj.(*resourcev1beta2.ResourceClaim); isBeta {
				seen = append(seen, betaGV)
				return nil
			}
			return fmt.Errorf("unexpected delete-all object type %T", obj)
		}

		cl := fake.NewClientBuilder().
			WithScheme(scheme).
			WithInterceptorFuncs(interceptor.Funcs{DeleteAllOf: deleteAllOf}).
			Build()

		err := DeleteResourceClaims(context.Background(), cl,
			client.InNamespace("default"),
			client.MatchingLabels(ResourceClaimLabels("my-pcs")),
		)

		require.NoError(t, err)
		assert.Equal(t, []string{v1GV, betaGV}, seen)
	})
}

// TestListResourceClaimMetadata checks that ListResourceClaimMetadata retries
// with the v1beta2 GroupVersionKind after the v1 List call reports a
// NoKindMatchError, and that the items from the fallback call are returned.
func TestListResourceClaimMetadata(t *testing.T) {
	scheme := newTestScheme()

	t.Run("falls back to v1beta2 when v1 API is not served", func(t *testing.T) {
		// seen records the group/version of every intercepted List call.
		var seen []string
		listFn := func(_ context.Context, _ client.WithWatch, list client.ObjectList, _ ...client.ListOption) error {
			gvk := list.GetObjectKind().GroupVersionKind()
			seen = append(seen, gvk.GroupVersion().String())

			switch gvk.Version {
			case resourcev1.SchemeGroupVersion.Version:
				return resourceClaimNoKindMatch(resourcev1.SchemeGroupVersion.Version)
			case resourcev1beta2.SchemeGroupVersion.Version:
				// Expected fallback version; populate the list below.
			default:
				return fmt.Errorf("unexpected list version %q", gvk.Version)
			}

			metaList, ok := list.(*metav1.PartialObjectMetadataList)
			if !ok {
				return fmt.Errorf("unexpected list object type %T", list)
			}
			metaList.Items = []metav1.PartialObjectMetadata{
				{ObjectMeta: metav1.ObjectMeta{Name: "my-rc", Namespace: "default"}},
			}
			return nil
		}

		cl := fake.NewClientBuilder().
			WithScheme(scheme).
			WithInterceptorFuncs(interceptor.Funcs{List: listFn}).
			Build()

		got, err := ListResourceClaimMetadata(context.Background(), cl, client.InNamespace("default"))

		require.NoError(t, err)
		require.Len(t, got.Items, 1)
		assert.Equal(t, "my-rc", got.Items[0].Name)

		wantCalls := []string{
			resourcev1.SchemeGroupVersion.String(),
			resourcev1beta2.SchemeGroupVersion.String(),
		}
		assert.Equal(t, wantCalls, seen)
	})
}

// --- CleanupStalePerReplicaRCs ---
Expand Down Expand Up @@ -559,3 +658,10 @@ func TestCleanupStalePerReplicaRCs(t *testing.T) {

// Ensure ptr.To works for tests that need int pointer
var _ = ptr.To(0)

func resourceClaimNoKindMatch(version string) error {
return &meta.NoKindMatchError{
GroupKind: resourcev1.SchemeGroupVersion.WithKind(resourceClaimKind).GroupKind(),
SearchedVersions: []string{version},
}
}
2 changes: 2 additions & 0 deletions operator/internal/resourceclaim/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
resourcev1 "k8s.io/api/resource/v1"
resourcev1beta2 "k8s.io/api/resource/v1beta2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand All @@ -32,6 +33,7 @@ func newTestScheme() *runtime.Scheme {
scheme := runtime.NewScheme()
_ = grovecorev1alpha1.AddToScheme(scheme)
_ = resourcev1.AddToScheme(scheme)
_ = resourcev1beta2.AddToScheme(scheme)
return scheme
}

Expand Down
Loading