Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 97 additions & 1 deletion controllers/gpuclusterconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -62,6 +64,7 @@ type GPUClusterConfigReconciler struct {
//+kubebuilder:rbac:groups=nvidia.com,resources=gpuclusterconfigs/finalizers,verbs=update
//+kubebuilder:rbac:groups=nvidia.com,resources=clusterpolicies,verbs=get;list;watch
//+kubebuilder:rbac:groups="",resources=namespaces,verbs=get;update;patch
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;update;patch
//+kubebuilder:rbac:groups=resource.k8s.io,resources=resourceclaimtemplates,verbs=get;list;watch;create;update;delete

func (r *GPUClusterConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -95,7 +98,10 @@ func (r *GPUClusterConfigReconciler) Reconcile(ctx context.Context, req ctrl.Req
if condErr := r.conditionUpdater.SetConditionsError(ctx, instance, conditions.ReconcileFailed, msg); condErr != nil {
logger.Error(condErr, "failed to set condition")
}
return ctrl.Result{}, nil
// Requeue so the ClusterPolicy's deletion is noticed and the instance
// recovers; nothing watches ClusterPolicy here, mirroring the ready-path
// resync below.
return ctrl.Result{RequeueAfter: time.Minute}, nil
}

// Singleton, first-wins (mirroring ClusterPolicy): the first instance to reconcile
Expand All @@ -111,6 +117,10 @@ func (r *GPUClusterConfigReconciler) Reconcile(ctx context.Context, req ctrl.Req
}
r.singleton = instance

if err := r.labelGPUNodes(ctx); err != nil {
return ctrl.Result{}, fmt.Errorf("error labeling GPU nodes: %w", err)
}

infoCatalog := state.NewInfoCatalog()
infoCatalog.Add(state.InfoTypeClusterInfo, r.ClusterInfo)

Expand Down Expand Up @@ -147,6 +157,64 @@ func (r *GPUClusterConfigReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{RequeueAfter: time.Minute}, nil
}

// labelGPUNodes sets nvidia.com/gpu.present on nodes NFD found NVIDIA GPUs on,
// and resets it to "false" once the GPUs are gone. Operands match that common
// label directly, so unlike the ClusterPolicy labeler this manages no per-component
// state labels; it only also sets the deploy labels the operand nodeSelectors gate
// on (driver, dra-driver, gfd), which the ClusterPolicy controller would otherwise
// apply.
func (r *GPUClusterConfigReconciler) labelGPUNodes(ctx context.Context) error {
logger := log.FromContext(ctx)

list := &corev1.NodeList{}
if err := r.List(ctx, list); err != nil {
return fmt.Errorf("unable to list nodes to check labels: %w", err)
}
for i := range list.Items {
node := &list.Items[i]
labels := node.GetLabels()
patch := client.MergeFrom(node.DeepCopy())
modified := false
if hasGPULabels(labels) {
if !hasCommonGPULabel(labels) {
labels[commonGPULabelKey] = commonGPULabelValue
modified = true
}
if labels[driverDeployLabelKey] != "true" {
labels[driverDeployLabelKey] = "true"
modified = true
}
// Set only when absent, so k8s-driver-manager can pause the
// kubelet-plugin without this controller fighting it.
if _, ok := labels[state.DraDriverDeployLabelKey]; !ok {
labels[state.DraDriverDeployLabelKey] = "true"
modified = true
}
// Set only when absent, so k8s-driver-manager can pause GFD
// without this controller fighting it.
if _, ok := labels[gfdDeployLabelKey]; !ok {
labels[gfdDeployLabelKey] = "true"
modified = true
}
} else if hasCommonGPULabel(labels) {
labels[commonGPULabelKey] = "false"
delete(labels, driverDeployLabelKey)
delete(labels, state.DraDriverDeployLabelKey)
delete(labels, gfdDeployLabelKey)
modified = true
}
if !modified {
continue
}
node.SetLabels(labels)
logger.Info("Updating GPU node labels", "node", node.Name)
if err := r.Patch(ctx, node, patch); err != nil {
return fmt.Errorf("unable to label node %s: %w", node.Name, err)
}
}
return nil
}

// updateCrStatus writes desired to the CR's status, skipping the write when it is already current.
func (r *GPUClusterConfigReconciler) updateCrStatus(ctx context.Context, cr *nvidiav1alpha1.GPUClusterConfig, desired nvidiav1alpha1.State) error {
reqLogger := log.FromContext(ctx)
Expand Down Expand Up @@ -232,6 +300,34 @@ func (r *GPUClusterConfigReconciler) SetupWithManager(ctx context.Context, mgr c
return err
}

// Watch Nodes so GPU nodes are labeled as NFD discovers GPUs (and the label is
// reset when they disappear). Only label transitions the labeler acts on trigger
// a reconcile; the 1-minute Ready resync covers any remaining drift.
nodeMapFn := func(ctx context.Context, _ *corev1.Node) []reconcile.Request {
return r.enqueueAllGPUClusterConfigs(ctx, nil)
}
nodePredicate := predicate.TypedFuncs[*corev1.Node]{
CreateFunc: func(e event.TypedCreateEvent[*corev1.Node]) bool {
return hasGPULabels(e.Object.GetLabels())
},
UpdateFunc: func(e event.TypedUpdateEvent[*corev1.Node]) bool {
labels := e.ObjectNew.GetLabels()
return hasCommonGPULabel(labels) != hasGPULabels(labels)
},
DeleteFunc: func(_ event.TypedDeleteEvent[*corev1.Node]) bool {
return false
},
}
err = c.Watch(source.Kind(
mgr.GetCache(),
&corev1.Node{},
handler.TypedEnqueueRequestsFromMapFunc(nodeMapFn),
nodePredicate,
))
if err != nil {
return err
}

// Watch the secondary resources each state manager owns.
watchSources := stateManager.GetWatchSources(mgr)
for _, watchSource := range watchSources {
Expand Down
96 changes: 92 additions & 4 deletions controllers/gpuclusterconfig_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -44,6 +45,7 @@ func newGPUClusterConfigReconciler(t *testing.T, objs ...client.Object) (*GPUClu
scheme := runtime.NewScheme()
require.NoError(t, nvidiav1alpha1.AddToScheme(scheme))
require.NoError(t, gpuv1.AddToScheme(scheme))
require.NoError(t, corev1.AddToScheme(scheme))

c := fake.NewClientBuilder().
WithScheme(scheme).
Expand All @@ -61,14 +63,17 @@ func newGPUClusterConfigReconciler(t *testing.T, objs ...client.Object) (*GPUClu
}

// fakeStateManager returns canned SyncState results so the controller tests don't load
// real manifests. GetWatchSources is promoted from the embedded (nil) interface and is
// never called here — only SetupWithManager calls it, which these tests skip.
// real manifests. It records the last info catalog passed to SyncState so tests can
// assert on its entries. GetWatchSources is promoted from the embedded (nil) interface
// and is never called here; only SetupWithManager calls it, which these tests skip.
type fakeStateManager struct {
state.Manager
results state.Results
results state.Results
lastCatalog state.InfoCatalog
}

func (f *fakeStateManager) SyncState(_ context.Context, _ interface{}, _ state.InfoCatalog) state.Results {
func (f *fakeStateManager) SyncState(_ context.Context, _ interface{}, catalog state.InfoCatalog) state.Results {
f.lastCatalog = catalog
return f.results
}

Expand Down Expand Up @@ -114,14 +119,97 @@ func TestGPUClusterConfigReconcileNotFound(t *testing.T) {

// A ClusterPolicy in the cluster disables the GPUClusterConfig: the two paths are
// mutually exclusive, so the DRA stack is not deployed alongside ClusterPolicy.
// The result requeues so the instance recovers once the ClusterPolicy is removed.
func TestGPUClusterConfigDisabledByClusterPolicy(t *testing.T) {
cfg := &nvidiav1alpha1.GPUClusterConfig{ObjectMeta: metav1.ObjectMeta{Name: "config"}}
cp := &gpuv1.ClusterPolicy{ObjectMeta: metav1.ObjectMeta{Name: "cluster-policy"}}
r, c := newGPUClusterConfigReconciler(t, cfg, cp)

res, err := r.Reconcile(t.Context(), gccRequest(cfg.Name))
require.NoError(t, err)
require.Positive(t, res.RequeueAfter, "disabled instance must requeue to detect ClusterPolicy removal")

require.Equal(t, nvidiav1alpha1.Disabled, gccState(t, c, cfg.Name))

// Removing the ClusterPolicy lets the next reconcile recover the instance.
require.NoError(t, c.Delete(t.Context(), cp))
gccReconcile(t, r, cfg.Name)
require.Equal(t, nvidiav1alpha1.Ready, gccState(t, c, cfg.Name))
}

func testNode(name string, labels map[string]string) *corev1.Node {
return &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: name, Labels: labels}}
}

func nodeLabels(t *testing.T, c client.Client, name string) map[string]string {
t.Helper()
node := &corev1.Node{}
require.NoError(t, c.Get(t.Context(), types.NamespacedName{Name: name}, node))
return node.GetLabels()
}

// Nodes NFD discovered GPUs on get the common GPU label and the driver deploy
// label (the driver DaemonSet's nodeSelector requires it); nodes that lost their
// GPUs get the common label reset to "false" and the driver label removed; other
// nodes are untouched.
func TestGPUClusterConfigLabelsGPUNodes(t *testing.T) {
cfg := &nvidiav1alpha1.GPUClusterConfig{ObjectMeta: metav1.ObjectMeta{Name: "config"}}
gpuNode := testNode("gpu-node", map[string]string{
"feature.node.kubernetes.io/pci-10de.present": "true",
})
plainNode := testNode("plain-node", map[string]string{
"kubernetes.io/hostname": "plain-node",
})
removedGPUNode := testNode("removed-gpu-node", map[string]string{
commonGPULabelKey: commonGPULabelValue,
driverDeployLabelKey: "true",
state.DraDriverDeployLabelKey: "true",
gfdDeployLabelKey: "true",
})
r, c := newGPUClusterConfigReconciler(t, cfg, gpuNode, plainNode, removedGPUNode)

gccReconcile(t, r, cfg.Name)

require.Equal(t, commonGPULabelValue, nodeLabels(t, c, gpuNode.Name)[commonGPULabelKey])
require.Equal(t, "true", nodeLabels(t, c, gpuNode.Name)[driverDeployLabelKey])
require.Equal(t, "true", nodeLabels(t, c, gpuNode.Name)[state.DraDriverDeployLabelKey])
require.Equal(t, "true", nodeLabels(t, c, gpuNode.Name)[gfdDeployLabelKey])
require.NotContains(t, nodeLabels(t, c, plainNode.Name), commonGPULabelKey)
require.Equal(t, "false", nodeLabels(t, c, removedGPUNode.Name)[commonGPULabelKey])
require.NotContains(t, nodeLabels(t, c, removedGPUNode.Name), driverDeployLabelKey)
require.NotContains(t, nodeLabels(t, c, removedGPUNode.Name), state.DraDriverDeployLabelKey)
require.NotContains(t, nodeLabels(t, c, removedGPUNode.Name), gfdDeployLabelKey)
}

// A GPU node that already has gpu.present but is missing the driver deploy label
// (e.g. it was removed out of band) is converged back.
func TestGPUClusterConfigRestoresDriverDeployLabel(t *testing.T) {
cfg := &nvidiav1alpha1.GPUClusterConfig{ObjectMeta: metav1.ObjectMeta{Name: "config"}}
gpuNode := testNode("gpu-node", map[string]string{
"feature.node.kubernetes.io/pci-10de.present": "true",
commonGPULabelKey: commonGPULabelValue,
})
r, c := newGPUClusterConfigReconciler(t, cfg, gpuNode)

gccReconcile(t, r, cfg.Name)

require.Equal(t, "true", nodeLabels(t, c, gpuNode.Name)[driverDeployLabelKey])
}

// The mutually-exclusive ClusterPolicy path returns before node labeling, so the
// GPUClusterConfig controller never touches nodes on clusters ClusterPolicy owns.
func TestGPUClusterConfigNoNodeLabelingWhenClusterPolicyPresent(t *testing.T) {
cfg := &nvidiav1alpha1.GPUClusterConfig{ObjectMeta: metav1.ObjectMeta{Name: "config"}}
cp := &gpuv1.ClusterPolicy{ObjectMeta: metav1.ObjectMeta{Name: "cluster-policy"}}
gpuNode := testNode("gpu-node", map[string]string{
"feature.node.kubernetes.io/pci-10de.present": "true",
})
r, c := newGPUClusterConfigReconciler(t, cfg, cp, gpuNode)

gccReconcile(t, r, cfg.Name)

require.Equal(t, nvidiav1alpha1.Disabled, gccState(t, c, cfg.Name))
require.NotContains(t, nodeLabels(t, c, gpuNode.Name), commonGPULabelKey)
}

// First-reconciled wins (mirroring ClusterPolicy): whichever instance reconciles first
Expand Down
78 changes: 58 additions & 20 deletions controllers/nvidiadriver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type NVIDIADriverReconciler struct {
//+kubebuilder:rbac:groups=nvidia.com,resources=nvidiadrivers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=nvidia.com,resources=nvidiadrivers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=nvidia.com,resources=nvidiadrivers/finalizers,verbs=update
//+kubebuilder:rbac:groups=nvidia.com,resources=gpuclusterconfigs,verbs=get;list;watch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
Expand Down Expand Up @@ -109,26 +110,45 @@ func (r *NVIDIADriverReconciler) Reconcile(ctx context.Context, req ctrl.Request
return reconcile.Result{}, wrappedErr
}

if len(clusterPolicyList.Items) == 0 {
err := fmt.Errorf("no ClusterPolicy object found in the cluster")
logger.Error(err, "failed to get ClusterPolicy object")
instance.Status.State = nvidiav1alpha1.NotReady
if condErr := r.conditionUpdater.SetConditionsError(ctx, instance, conditions.ReconcileFailed, err.Error()); condErr != nil {
logger.Error(condErr, "failed to set condition")
// The cluster-wide configuration (host root) is sourced from ClusterPolicy when one
// exists; otherwise the controller runs standalone against the GPUClusterConfig.
var hostRoot string
if len(clusterPolicyList.Items) > 0 {
clusterPolicyInstance := clusterPolicyList.Items[0]

// Ensure that ClusterPolicy is configured to use NVIDIADriver CRD
if !clusterPolicyInstance.Spec.Driver.UseNvidiaDriverCRDType() {
msg := "useNvidiaDriverCRD is not enabled in ClusterPolicy"
logger.V(consts.LogLevelWarning).Info("NVIDIADriver reconciliation skipped", "reason", msg)
instance.Status.State = nvidiav1alpha1.Disabled
if condErr := r.conditionUpdater.SetConditionsError(ctx, instance, conditions.Reconciled, msg); condErr != nil {
logger.Error(condErr, "failed to set condition")
}
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
clusterPolicyInstance := clusterPolicyList.Items[0]

// Ensure that ClusterPolicy is configured to use NVIDIADriver CRD
if !clusterPolicyInstance.Spec.Driver.UseNvidiaDriverCRDType() {
msg := "useNvidiaDriverCRD is not enabled in ClusterPolicy"
logger.V(consts.LogLevelWarning).Info("NVIDIADriver reconciliation skipped", "reason", msg)
instance.Status.State = nvidiav1alpha1.Disabled
if condErr := r.conditionUpdater.SetConditionsError(ctx, instance, conditions.Reconciled, msg); condErr != nil {
logger.Error(condErr, "failed to set condition")
hostRoot = clusterPolicyInstance.Spec.HostPaths.RootFS
} else {
gpuClusterConfigList := &nvidiav1alpha1.GPUClusterConfigList{}
if err := r.List(ctx, gpuClusterConfigList); err != nil {
wrappedErr := fmt.Errorf("error getting GPUClusterConfig list: %w", err)
logger.Error(err, "error getting GPUClusterConfig list")
instance.Status.State = nvidiav1alpha1.NotReady
if condErr := r.conditionUpdater.SetConditionsError(ctx, instance, conditions.ReconcileFailed, err.Error()); condErr != nil {
logger.Error(condErr, "failed to set condition")
}
return reconcile.Result{}, wrappedErr
}
return reconcile.Result{}, nil

if len(gpuClusterConfigList.Items) == 0 {
err := fmt.Errorf("no ClusterPolicy or GPUClusterConfig object found in the cluster")
logger.Error(err, "failed to get a cluster-wide configuration object")
instance.Status.State = nvidiav1alpha1.NotReady
if condErr := r.conditionUpdater.SetConditionsError(ctx, instance, conditions.ReconcileFailed, err.Error()); condErr != nil {
logger.Error(condErr, "failed to set condition")
}
return reconcile.Result{}, err
}
hostRoot = gpuClusterConfigList.Items[0].Spec.HostPaths.RootFS
}

// Create a new InfoCatalog which is a generic interface for passing information to state managers
Expand All @@ -137,8 +157,8 @@ func (r *NVIDIADriverReconciler) Reconcile(ctx context.Context, req ctrl.Request
// Add an entry for ClusterInfo, which was collected before the NVIDIADriver controller was started
infoCatalog.Add(state.InfoTypeClusterInfo, r.ClusterInfo)

// Add an entry for Clusterpolicy, which is needed to deploy the driver daemonset
infoCatalog.Add(state.InfoTypeClusterPolicyCR, clusterPolicyInstance)
// Add the host root, which is needed to deploy the driver daemonset
infoCatalog.Add(state.InfoTypeHostRoot, hostRoot)

// Verify the nodeSelector configured for this NVIDIADriver instance does
// not conflict with any other instances. This ensures only one driver
Expand Down Expand Up @@ -347,6 +367,24 @@ func (r *NVIDIADriverReconciler) SetupWithManager(ctx context.Context, mgr ctrl.
return err
}

// Watch for changes to GPUClusterConfig. Whenever an event is generated for
// GPUClusterConfig, enqueue a reconcile request for all NVIDIADriver instances.
gpuClusterConfigMapFn := func(ctx context.Context, _ *nvidiav1alpha1.GPUClusterConfig) []reconcile.Request {
return r.enqueueAllNVIDIADrivers(ctx)
}

err = c.Watch(
source.Kind(
mgr.GetCache(),
&nvidiav1alpha1.GPUClusterConfig{},
handler.TypedEnqueueRequestsFromMapFunc(gpuClusterConfigMapFn),
predicate.TypedGenerationChangedPredicate[*nvidiav1alpha1.GPUClusterConfig]{},
),
)
if err != nil {
return err
}

nodePredicate := predicate.TypedFuncs[*corev1.Node]{
CreateFunc: func(e event.TypedCreateEvent[*corev1.Node]) bool {
labels := e.Object.GetLabels()
Expand Down
Loading
Loading