Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions pkg/device-plugin/nvidiadevice/nvinternal/plugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,17 +417,20 @@ func (plugin *NvidiaDevicePlugin) GetDevicePluginOptions(context.Context, *kubel

// ListAndWatch lists devices and update that list according to the health status
func (plugin *NvidiaDevicePlugin) ListAndWatch(e *kubeletdevicepluginv1beta1.Empty, s kubeletdevicepluginv1beta1.DevicePlugin_ListAndWatchServer) error {
	// Send the initial device list. If the stream is already broken there is
	// no point in looping, so surface the error: kubelet will re-establish
	// the ListAndWatch stream on failure.
	if err := s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
		return err
	}

	// Periodically resend the full device list so kubelet's view converges
	// even if an individual health-update send was missed.
	resyncTicker := time.NewTicker(30 * time.Second)
	defer resyncTicker.Stop()

	for {
		select {
		case <-plugin.stop:
			return nil
		case d := <-plugin.health:
			// d.Health is set by the health checker before it is sent on the
			// channel; it may report either a degradation or a recovery.
			klog.Infof("'%s' device health update: %s => %s", plugin.rm.Resource(), d.ID, d.Health)
			if err := s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
				return err
			}
		case <-resyncTicker.C:
			if err := s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
				return err
			}
		}
	}
}
Expand Down
42 changes: 39 additions & 3 deletions pkg/device-plugin/nvidiadevice/nvinternal/rm/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/klog/v2"

safecast "github.com/ccoveille/go-safecast"
kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

const (
Expand Down Expand Up @@ -116,6 +117,7 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan any, devices Devices, unhe
uuid, gi, ci, err := r.getDevicePlacement(d)
if err != nil {
klog.Warningf("Could not determine device placement for %v: %v; Marking it unhealthy.", d.ID, err)
d.Health = kubeletdevicepluginv1beta1.Unhealthy
unhealthy <- d
continue
}
Expand All @@ -126,13 +128,15 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan any, devices Devices, unhe
gpu, ret := r.nvml.DeviceGetHandleByUUID(uuid)
if ret != nvml.SUCCESS {
klog.Infof("unable to get device handle from UUID: %v; marking it as unhealthy", ret)
d.Health = kubeletdevicepluginv1beta1.Unhealthy
unhealthy <- d
continue
}

supportedEvents, ret := gpu.GetSupportedEventTypes()
if ret != nvml.SUCCESS {
klog.Infof("Unable to determine the supported events for %v: %v; marking it as unhealthy", d.ID, ret)
d.Health = kubeletdevicepluginv1beta1.Unhealthy
unhealthy <- d
continue
}
Expand All @@ -143,10 +147,16 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan any, devices Devices, unhe
}
if ret != nvml.SUCCESS {
klog.Infof("Marking device %v as unhealthy: %v", d.ID, ret)
d.Health = kubeletdevicepluginv1beta1.Unhealthy
unhealthy <- d
}
}

// Track consecutive NVML event errors to avoid flapping
successiveEventErrorCount := 0
// Track devices we marked unhealthy due to NVML event errors to allow recovery
errorMarked := make(map[string]bool)

for {
select {
case <-stop:
Expand All @@ -161,15 +171,39 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan any, devices Devices, unhe

e, ret := eventSet.Wait(5000)
if ret == nvml.ERROR_TIMEOUT {
// If we previously marked devices unhealthy due to NVML errors, attempt recovery when things stabilize
if successiveEventErrorCount > 0 {
successiveEventErrorCount = 0
for _, d := range devices {
if errorMarked[d.ID] {
klog.Infof("Recovering device %v to healthy after NVML wait stabilized", d.ID)
d.Health = kubeletdevicepluginv1beta1.Healthy
unhealthy <- d
delete(errorMarked, d.ID)
}
}
}
continue
}
if ret != nvml.SUCCESS {
klog.Infof("Error waiting for event: %v; Marking all devices as unhealthy", ret)
for _, d := range devices {
unhealthy <- d
successiveEventErrorCount++
if successiveEventErrorCount >= maxSuccessiveEventErrorCount {
klog.Infof("Error waiting for event (count=%d): %v; Marking error-affected devices as unhealthy", successiveEventErrorCount, ret)
for _, d := range devices {
if !errorMarked[d.ID] {
d.Health = kubeletdevicepluginv1beta1.Unhealthy
unhealthy <- d
errorMarked[d.ID] = true
}
}
}
continue
}
// Successful event received, reset error counter.
// Recovery is handled by the timeout branch once NVML wait stabilizes without errors.
if successiveEventErrorCount > 0 {
successiveEventErrorCount = 0
}

if e.EventType != nvml.EventTypeXidCriticalError {
klog.Infof("Skipping non-nvmlEventTypeXidCriticalError event: %+v", e)
Expand All @@ -187,6 +221,7 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan any, devices Devices, unhe
// If we cannot reliably determine the device UUID, we mark all devices as unhealthy.
klog.Infof("Failed to determine uuid for event %v: %v; Marking all devices as unhealthy.", e, ret)
for _, d := range devices {
d.Health = kubeletdevicepluginv1beta1.Unhealthy
unhealthy <- d
}
continue
Expand All @@ -213,6 +248,7 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan any, devices Devices, unhe
}

klog.Infof("XidCriticalError: Xid=%d on Device=%s; marking device as unhealthy.", e.EventData, d.ID)
d.Health = kubeletdevicepluginv1beta1.Unhealthy
unhealthy <- d
}
}
Expand Down