From b1abaa97d5450657fc98117fd36fe519271cba8f Mon Sep 17 00:00:00 2001
From: dkeven
Date: Mon, 5 Jan 2026 16:04:00 +0800
Subject: [PATCH] fix(device-plugin): allow devices marked unhealthy to recover

Set the Health field where a device is queued on the health channel
instead of forcing every queued device to Unhealthy in ListAndWatch, so
the same channel can also report a device that has become healthy again.

- ListAndWatch logs the reported state, re-sends the device list every
  30 seconds, and returns when a send on the stream fails.
- checkHealth marks devices unhealthy only after
  maxSuccessiveEventErrorCount consecutive NVML wait errors, remembers
  which devices it marked, and restores them on the next wait timeout.
  Devices that were already unhealthy are left untouched.
---
 .../nvidiadevice/nvinternal/plugin/server.go  | 23 +++++++---
 .../nvidiadevice/nvinternal/rm/health.go      | 42 +++++++++++++++++--
 2 files changed, 57 insertions(+), 8 deletions(-)

diff --git a/pkg/device-plugin/nvidiadevice/nvinternal/plugin/server.go b/pkg/device-plugin/nvidiadevice/nvinternal/plugin/server.go
index 59ce3c3ba..a65c53d21 100644
--- a/pkg/device-plugin/nvidiadevice/nvinternal/plugin/server.go
+++ b/pkg/device-plugin/nvidiadevice/nvinternal/plugin/server.go
@@ -417,17 +417,30 @@ func (plugin *NvidiaDevicePlugin) GetDevicePluginOptions(context.Context, *kubel
 
 // ListAndWatch lists devices and update that list according to the health status
 func (plugin *NvidiaDevicePlugin) ListAndWatch(e *kubeletdevicepluginv1beta1.Empty, s kubeletdevicepluginv1beta1.DevicePlugin_ListAndWatchServer) error {
-	s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()})
+	// A Send error means the stream to the kubelet is no longer usable, so
+	// return it and end the RPC instead of looping on a dead connection.
+	if err := s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
+		return err
+	}
+
+	// Re-send the full device list periodically, independent of health events.
+	resyncTicker := time.NewTicker(30 * time.Second)
+	defer resyncTicker.Stop()
 
 	for {
 		select {
 		case <-plugin.stop:
 			return nil
 		case d := <-plugin.health:
-			// FIXME: there is no way to recover from the Unhealthy state.
-			d.Health = kubeletdevicepluginv1beta1.Unhealthy
-			klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)
-			s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()})
+			// d.Health is expected to have been set by whoever queued the device.
+			klog.Infof("'%s' device health update: %s => %s", plugin.rm.Resource(), d.ID, d.Health)
+			if err := s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
+				return err
+			}
+		case <-resyncTicker.C:
+			if err := s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
+				return err
+			}
 		}
 	}
 }
diff --git a/pkg/device-plugin/nvidiadevice/nvinternal/rm/health.go b/pkg/device-plugin/nvidiadevice/nvinternal/rm/health.go
index 4fa0680fc..85d21a3da 100644
--- a/pkg/device-plugin/nvidiadevice/nvinternal/rm/health.go
+++ b/pkg/device-plugin/nvidiadevice/nvinternal/rm/health.go
@@ -42,6 +42,7 @@ import (
 	"k8s.io/klog/v2"
 
 	safecast "github.com/ccoveille/go-safecast"
+	kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
 )
 
 const (
@@ -116,6 +117,7 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan any, devices Devices, unhe
 		uuid, gi, ci, err := r.getDevicePlacement(d)
 		if err != nil {
 			klog.Warningf("Could not determine device placement for %v: %v; Marking it unhealthy.", d.ID, err)
+			d.Health = kubeletdevicepluginv1beta1.Unhealthy
 			unhealthy <- d
 			continue
 		}
@@ -126,6 +128,7 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan any, devices Devices, unhe
 		gpu, ret := r.nvml.DeviceGetHandleByUUID(uuid)
 		if ret != nvml.SUCCESS {
 			klog.Infof("unable to get device handle from UUID: %v; marking it as unhealthy", ret)
+			d.Health = kubeletdevicepluginv1beta1.Unhealthy
 			unhealthy <- d
 			continue
 		}
@@ -133,6 +136,7 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan any, devices Devices, unhe
 		supportedEvents, ret := gpu.GetSupportedEventTypes()
 		if ret != nvml.SUCCESS {
 			klog.Infof("Unable to determine the supported events for %v: %v; marking it as unhealthy", d.ID, ret)
+			d.Health = kubeletdevicepluginv1beta1.Unhealthy
 			unhealthy <- d
 			continue
 		}
@@ -143,10 +147,16 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan any, devices Devices, unhe
 		}
 		if ret != nvml.SUCCESS {
 			klog.Infof("Marking device %v as unhealthy: %v", d.ID, ret)
+			d.Health = kubeletdevicepluginv1beta1.Unhealthy
 			unhealthy <- d
 		}
 	}
 
+	// Track consecutive NVML event errors to avoid flapping
+	successiveEventErrorCount := 0
+	// Track devices we marked unhealthy due to NVML event errors to allow recovery
+	errorMarked := make(map[string]bool)
+
 	for {
 		select {
 		case <-stop:
@@ -161,15 +171,39 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan any, devices Devices, unhe
 
 		e, ret := eventSet.Wait(5000)
 		if ret == nvml.ERROR_TIMEOUT {
+			// A quiet wait means NVML has stabilized. End the error streak and
+			// recover every device that was marked unhealthy because of it.
+			successiveEventErrorCount = 0
+			for _, d := range devices {
+				if errorMarked[d.ID] {
+					klog.Infof("Recovering device %v to healthy after NVML wait stabilized", d.ID)
+					d.Health = kubeletdevicepluginv1beta1.Healthy
+					unhealthy <- d
+					delete(errorMarked, d.ID)
+				}
+			}
 			continue
 		}
 		if ret != nvml.SUCCESS {
-			klog.Infof("Error waiting for event: %v; Marking all devices as unhealthy", ret)
-			for _, d := range devices {
-				unhealthy <- d
+			successiveEventErrorCount++
+			if successiveEventErrorCount >= maxSuccessiveEventErrorCount {
+				klog.Infof("Error waiting for event (count=%d): %v; Marking error-affected devices as unhealthy", successiveEventErrorCount, ret)
+				for _, d := range devices {
+					// Leave devices that are already unhealthy alone so the
+					// recovery pass cannot clear a verdict it did not issue.
+					if errorMarked[d.ID] || d.Health == kubeletdevicepluginv1beta1.Unhealthy {
+						continue
+					}
+					d.Health = kubeletdevicepluginv1beta1.Unhealthy
+					unhealthy <- d
+					errorMarked[d.ID] = true
+				}
 			}
 			continue
 		}
+		// An event was delivered, so the wait itself works again: end the error
+		// streak. Devices in errorMarked are recovered on the next timeout.
+		successiveEventErrorCount = 0
 
 		if e.EventType != nvml.EventTypeXidCriticalError {
 			klog.Infof("Skipping non-nvmlEventTypeXidCriticalError event: %+v", e)
@@ -187,6 +221,7 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan any, devices Devices, unhe
 			// If we cannot reliably determine the device UUID, we mark all devices as unhealthy.
 			klog.Infof("Failed to determine uuid for event %v: %v; Marking all devices as unhealthy.", e, ret)
 			for _, d := range devices {
+				d.Health = kubeletdevicepluginv1beta1.Unhealthy
 				unhealthy <- d
 			}
 			continue
@@ -213,6 +248,7 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan any, devices Devices, unhe
 		}
 
 		klog.Infof("XidCriticalError: Xid=%d on Device=%s; marking device as unhealthy.", e.EventData, d.ID)
+		d.Health = kubeletdevicepluginv1beta1.Unhealthy
 		unhealthy <- d
 	}
 }