From 33874d145a3b0deca426eadf779126901cf26c2d Mon Sep 17 00:00:00 2001 From: AnouarMohamed Date: Tue, 2 Jun 2026 14:23:25 +0100 Subject: [PATCH 1/4] feat(operator): add configurable drain behavior Signed-off-by: AnouarMohamed --- chart/templates/skyhook-crd.yaml | 45 +++ docs/interrupt_flow.md | 36 +++ .../skyhook/drain-config/chainsaw-test.yaml | 139 +++++++++ .../drain-config/disable-eviction.yaml | 76 +++++ .../skyhook/drain-config/timeout-assert.yaml | 40 +++ .../skyhook/drain-config/timeout.yaml | 59 ++++ operator/CHANGELOG.md | 1 + operator/api/v1alpha1/skyhook_types.go | 62 ++++ operator/api/v1alpha1/skyhook_webhook.go | 4 + operator/api/v1alpha1/skyhook_webhook_test.go | 23 +- .../api/v1alpha1/zz_generated.deepcopy.go | 45 +++ operator/cmd/cli/app/node/node_reset.go | 65 +++- operator/cmd/cli/app/node/node_reset_test.go | 87 +++++- operator/cmd/cli/app/node/node_status.go | 10 +- operator/cmd/cli/app/reset.go | 72 +++-- operator/cmd/cli/app/reset_test.go | 73 ++++- .../bases/skyhook.nvidia.com_skyhooks.yaml | 45 +++ .../internal/controller/mock/SkyhookNodes.go | 218 ++++++------- .../internal/controller/skyhook_controller.go | 125 ++++++-- .../controller/skyhook_controller_test.go | 292 ++++++++++++++++++ operator/internal/drain/drain.go | 155 ++++++++++ operator/internal/drain/drain_suite_test.go | 31 ++ operator/internal/drain/drain_test.go | 232 ++++++++++++++ operator/internal/wrapper/mock/SkyhookNode.go | 143 ++++++++- .../internal/wrapper/mock/SkyhookNodeOnly.go | 143 ++++++++- operator/internal/wrapper/node.go | 68 +++- operator/internal/wrapper/node_test.go | 95 ++++++ 27 files changed, 2189 insertions(+), 195 deletions(-) create mode 100644 k8s-tests/chainsaw/skyhook/drain-config/chainsaw-test.yaml create mode 100644 k8s-tests/chainsaw/skyhook/drain-config/disable-eviction.yaml create mode 100644 k8s-tests/chainsaw/skyhook/drain-config/timeout-assert.yaml create mode 100644 k8s-tests/chainsaw/skyhook/drain-config/timeout.yaml create mode 100644 operator/internal/drain/drain.go create mode 100644 operator/internal/drain/drain_suite_test.go create mode 100644 operator/internal/drain/drain_test.go diff --git a/chart/templates/skyhook-crd.yaml b/chart/templates/skyhook-crd.yaml index 10102ef3..42cf8b53 100644 --- a/chart/templates/skyhook-crd.yaml +++ b/chart/templates/skyhook-crd.yaml @@ -133,6 +133,51 @@ spec: setting for this Skyhook type: boolean type: object + drainConfig: + description: |- + DrainConfig tunes how nodes are drained before running interrupt packages. + If unset, the operator preserves its existing drain behavior. + properties: + deleteEmptyDirData: + default: true + description: |- + DeleteEmptyDirData allows draining pods that use emptyDir volumes. + Defaults to true to preserve the operator's existing behavior. + nullable: true + type: boolean + disableEviction: + default: false + description: |- + DisableEviction bypasses the eviction API and deletes pods directly. + This bypasses PodDisruptionBudgets. + type: boolean + force: + default: true + description: |- + Force allows draining pods not managed by a controller. + Defaults to true to preserve the operator's existing behavior. + nullable: true + type: boolean + gracePeriod: + description: |- + GracePeriod overrides the grace period used on pod eviction/delete. + Unset uses each pod's own terminationGracePeriodSeconds. + nullable: true + type: string + ignoreDaemonSets: + default: true + description: |- + IgnoreDaemonSets skips DaemonSet-managed pods during drain. + Defaults to true to preserve the operator's existing behavior. + nullable: true + type: boolean + timeout: + description: |- + Timeout bounds how long the operator waits for a node to drain. + Zero or unset means no timeout. + nullable: true + type: string + type: object interruptionBudget: description: InterruptionBudget configures how many nodes that match node selectors that allowed to be interrupted at once. diff --git a/docs/interrupt_flow.md b/docs/interrupt_flow.md index 0ea70544..776034b3 100644 --- a/docs/interrupt_flow.md +++ b/docs/interrupt_flow.md @@ -81,6 +81,42 @@ The interrupt flow is managed by the `ProcessInterrupt` and `EnsureNodeIsReadyFo - Ensure the node is ready before proceeding with package operations - Handle the timing and sequencing of all stages +## Drain Configuration + +Interrupt-enabled Skyhooks can tune drain behavior with `spec.drainConfig`. +Unset fields preserve the operator's existing behavior: + +```yaml +apiVersion: skyhook.nvidia.com/v1alpha1 +kind: Skyhook +metadata: + name: gpu-mode-switch +spec: + drainConfig: + disableEviction: false + deleteEmptyDirData: true + force: true + ignoreDaemonSets: true + timeout: 10m + gracePeriod: 30s +``` + +The fields map to Kubernetes drain behavior: + +- `disableEviction`: when `true`, pods are deleted directly instead of evicted. This bypasses PodDisruptionBudgets. The default is `false`, so the eviction API is used. +- `deleteEmptyDirData`: when `false`, pods with `emptyDir` volumes block drain. The default is `true`. +- `force`: when `false`, pods without a managing controller block drain. The default is `true`. +- `ignoreDaemonSets`: when `true`, DaemonSet-managed pods are skipped during drain. The default is `true`. +- `timeout`: bounds how long a node may spend draining. Unset or zero means no timeout. When the timeout expires, the node is marked `erroring` and package stages do not proceed on that node. +- `gracePeriod`: overrides the grace period used for eviction or direct deletion. Unset uses each pod's own `terminationGracePeriodSeconds`. + +The operator also skips pods that are already terminating, pods that tolerate +the `node.kubernetes.io/unschedulable` taint, mirror/static pods, and pods in +`kube-system`. These exclusions are not user-configurable. + +`podNonInterruptLabels` remains a pre-drain barrier. Matching pods must finish +or move away before the operator starts the configurable drain step. + ## Best Practices - Always test interrupt-enabled packages in non-production environments first diff --git a/k8s-tests/chainsaw/skyhook/drain-config/chainsaw-test.yaml b/k8s-tests/chainsaw/skyhook/drain-config/chainsaw-test.yaml new file mode 100644 index 00000000..982fc981 --- /dev/null +++ b/k8s-tests/chainsaw/skyhook/drain-config/chainsaw-test.yaml @@ -0,0 +1,139 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# yaml-language-server: $schema=https://raw.githubusercontent.com/kyverno/chainsaw/main/.schemas/json/test-chainsaw-v1alpha1.json +apiVersion: chainsaw.kyverno.io/v1alpha1 +kind: Test +metadata: + name: drain-config + labels: + pool: interrupt +spec: + timeouts: + assert: 240s + exec: 90s + catch: + - get: + apiVersion: v1 + kind: Node + selector: skyhook.nvidia.com/test-node=skyhooke2e + format: yaml + - get: + apiVersion: skyhook.nvidia.com/v1alpha1 + kind: Skyhook + name: drain-config-disable-eviction + namespace: skyhook + format: yaml + - get: + apiVersion: skyhook.nvidia.com/v1alpha1 + kind: Skyhook + name: drain-config-timeout + namespace: skyhook + format: yaml + - get: + apiVersion: v1 + kind: Pod + namespace: skyhook + selector: app in (drain-config-pdb,drain-config-blocker) + format: yaml + steps: + - name: disable-eviction-bypasses-pdb + description: Verify disableEviction deletes through a zero-disruption PDB and lets the interrupt complete + try: + - script: + content: | + ../skyhook-cli reset drain-config-disable-eviction --confirm 2>/dev/null || true + kubectl -n skyhook delete skyhook drain-config-disable-eviction --ignore-not-found --wait=false + kubectl -n skyhook delete deployment drain-config-pdb --ignore-not-found --wait=false + kubectl -n skyhook delete pdb drain-config-pdb --ignore-not-found + kubectl -n skyhook delete configmap drain-config-original-pod --ignore-not-found + kubectl -n skyhook delete pod -l app=drain-config-pdb --ignore-not-found --wait=false + - apply: + file: disable-eviction.yaml + - script: + content: | + kubectl -n skyhook rollout status deployment/drain-config-pdb --timeout=60s + pod="$(kubectl -n skyhook get pod -l app=drain-config-pdb -o jsonpath='{.items[0].metadata.name}')" + kubectl -n skyhook wait --for=condition=Ready "pod/${pod}" --timeout=30s + kubectl -n skyhook create configmap drain-config-original-pod --from-literal=name="${pod}" --dry-run=client -o yaml | kubectl apply -f - + kubectl -n skyhook wait --for=jsonpath='{.status.disruptionsAllowed}'=0 pdb/drain-config-pdb --timeout=60s + - script: + content: | + kubectl -n skyhook wait --for=condition=Ready skyhook/drain-config-disable-eviction --timeout=180s + - script: + content: | + pod="$(kubectl -n skyhook get configmap drain-config-original-pod -o jsonpath='{.data.name}')" + if kubectl -n skyhook get pod "${pod}" >/dev/null 2>&1; then + echo "expected original PDB-protected pod ${pod} to be deleted directly" + exit 1 + fi + finally: + - script: + content: | + ../skyhook-cli reset drain-config-disable-eviction --confirm 2>/dev/null || true + kubectl -n skyhook delete skyhook drain-config-disable-eviction --ignore-not-found --wait=false + kubectl -n skyhook delete deployment drain-config-pdb --ignore-not-found --wait=false + kubectl -n skyhook delete pdb drain-config-pdb --ignore-not-found + kubectl -n skyhook delete configmap drain-config-original-pod --ignore-not-found + kubectl -n skyhook delete pod -l app=drain-config-pdb --ignore-not-found --wait=false + kubectl get nodes -l skyhook.nvidia.com/test-node=skyhooke2e -o name | while read -r node; do kubectl uncordon "${node}" || true; done + - name: timeout-surfaces-erroring + description: Verify timeout marks the node erroring and emits a drain event when drain cannot proceed + try: + - script: + content: | + ../skyhook-cli reset drain-config-timeout --confirm 2>/dev/null || true + kubectl -n skyhook delete skyhook drain-config-timeout --ignore-not-found --wait=false + kubectl -n skyhook delete pod drain-config-blocker --ignore-not-found --wait=false + - apply: + file: timeout.yaml + - wait: + apiVersion: v1 + kind: Pod + name: drain-config-blocker + namespace: skyhook + timeout: 30s + for: + condition: + name: Ready + value: 'true' + - script: + content: | + kubectl -n skyhook wait --for=jsonpath='{.status.status}'=erroring skyhook/drain-config-timeout --timeout=180s + - assert: + file: timeout-assert.yaml + - script: + content: | + for _ in $(seq 1 30); do + events="$(kubectl -n skyhook get events --field-selector regarding.kind=Skyhook,regarding.name=drain-config-timeout,reason=Drain -o jsonpath='{range .items[*]}{.action}{"\t"}{.message}{"\t"}{.note}{"\n"}{end}' 2>/dev/null || true)" + if printf "%s\n" "${events}" | grep -F "DrainTimeout"; then + exit 0 + fi + events="$(kubectl -n skyhook get events --field-selector involvedObject.kind=Skyhook,involvedObject.name=drain-config-timeout,reason=Drain -o jsonpath='{range .items[*]}{.action}{"\t"}{.message}{"\t"}{.note}{"\n"}{end}' 2>/dev/null || true)" + if printf "%s\n" "${events}" | grep -F "DrainTimeout"; then + exit 0 + fi + sleep 2 + done + echo "expected drain timeout event for drain-config-timeout" + exit 1 + finally: + - script: + content: | + ../skyhook-cli reset drain-config-timeout --confirm 2>/dev/null || true + kubectl -n skyhook delete skyhook drain-config-timeout --ignore-not-found --wait=false + kubectl -n skyhook delete pod drain-config-blocker --ignore-not-found --wait=false + kubectl get nodes -l skyhook.nvidia.com/test-node=skyhooke2e -o name | while read -r node; do kubectl uncordon "${node}" || true; done diff --git a/k8s-tests/chainsaw/skyhook/drain-config/disable-eviction.yaml b/k8s-tests/chainsaw/skyhook/drain-config/disable-eviction.yaml new file mode 100644 index 00000000..c16e5fe6 --- /dev/null +++ b/k8s-tests/chainsaw/skyhook/drain-config/disable-eviction.yaml @@ -0,0 +1,76 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: drain-config-pdb + namespace: skyhook +spec: + replicas: 1 + selector: + matchLabels: + app: drain-config-pdb + template: + metadata: + labels: + app: drain-config-pdb + spec: + terminationGracePeriodSeconds: 5 + nodeSelector: + skyhook.nvidia.com/test-node: skyhooke2e + containers: + - name: workload + image: ghcr.io/nvidia/skyhook/agentless:3.2.3 + imagePullPolicy: IfNotPresent + command: ["sh", "-c", "sleep 1000"] +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: drain-config-pdb + namespace: skyhook +spec: + minAvailable: 1 + selector: + matchLabels: + app: drain-config-pdb +--- +apiVersion: skyhook.nvidia.com/v1alpha1 +kind: Skyhook +metadata: + name: drain-config-disable-eviction + namespace: skyhook +spec: + drainConfig: + disableEviction: true + gracePeriod: 0s + nodeSelectors: + matchLabels: + skyhook.nvidia.com/test-node: skyhooke2e + interruptionBudget: + count: 1 + packages: + drain-pdb: + version: "1.2.3" + image: ghcr.io/nvidia/skyhook/agentless + interrupt: + type: service + services: [rsyslog] + env: + - name: SLEEP_LEN + value: "1" diff --git a/k8s-tests/chainsaw/skyhook/drain-config/timeout-assert.yaml b/k8s-tests/chainsaw/skyhook/drain-config/timeout-assert.yaml new file mode 100644 index 00000000..2772feb0 --- /dev/null +++ b/k8s-tests/chainsaw/skyhook/drain-config/timeout-assert.yaml @@ -0,0 +1,40 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: Node +metadata: + labels: + skyhook.nvidia.com/test-node: skyhooke2e + skyhook.nvidia.com/status_drain-config-timeout: erroring + annotations: + skyhook.nvidia.com/status_drain-config-timeout: erroring + (contains(@, 'skyhook.nvidia.com/drainStart_drain-config-timeout')): true +spec: + taints: + - effect: NoSchedule + key: node.kubernetes.io/unschedulable +--- +apiVersion: skyhook.nvidia.com/v1alpha1 +kind: Skyhook +metadata: + name: drain-config-timeout + namespace: skyhook +status: + status: erroring + nodeStatus: + (values(@)): + - erroring diff --git a/k8s-tests/chainsaw/skyhook/drain-config/timeout.yaml b/k8s-tests/chainsaw/skyhook/drain-config/timeout.yaml new file mode 100644 index 00000000..bb00686e --- /dev/null +++ b/k8s-tests/chainsaw/skyhook/drain-config/timeout.yaml @@ -0,0 +1,59 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +apiVersion: v1 +kind: Pod +metadata: + name: drain-config-blocker + namespace: skyhook + labels: + app: drain-config-blocker +spec: + terminationGracePeriodSeconds: 5 + restartPolicy: Never + nodeSelector: + skyhook.nvidia.com/test-node: skyhooke2e + containers: + - name: workload + image: ghcr.io/nvidia/skyhook/agentless:3.2.3 + imagePullPolicy: IfNotPresent + command: ["sh", "-c", "sleep 1000"] +--- +apiVersion: skyhook.nvidia.com/v1alpha1 +kind: Skyhook +metadata: + name: drain-config-timeout + namespace: skyhook +spec: + drainConfig: + force: false + timeout: 5s + nodeSelectors: + matchLabels: + skyhook.nvidia.com/test-node: skyhooke2e + interruptionBudget: + count: 1 + packages: + drain-timeout: + version: "1.2.3" + image: ghcr.io/nvidia/skyhook/agentless + interrupt: + type: service + services: [rsyslog] + env: + - name: SLEEP_LEN + value: "1" diff --git a/operator/CHANGELOG.md b/operator/CHANGELOG.md index 2ebfad10..ddff7236 100644 --- a/operator/CHANGELOG.md +++ b/operator/CHANGELOG.md @@ -22,6 +22,7 @@ and CR deletion behave. Affects the Operator, Webhook, and CRD. ### New Features +- Add `spec.drainConfig` so interrupt drains can tune eviction, direct deletion, emptyDir handling, unmanaged-pod handling, DaemonSet skipping, timeout, and grace-period behavior. - Add a standard `Ready` condition to Skyhook status for native Kubernetes wait and GitOps health tooling. ### New behavior diff --git a/operator/api/v1alpha1/skyhook_types.go b/operator/api/v1alpha1/skyhook_types.go index 0d09f407..7272854c 100644 --- a/operator/api/v1alpha1/skyhook_types.go +++ b/operator/api/v1alpha1/skyhook_types.go @@ -62,6 +62,11 @@ type SkyhookSpec struct { // InterruptionBudget configures how many nodes that match node selectors that allowed to be interrupted at once. InterruptionBudget InterruptionBudget `json:"interruptionBudget,omitempty"` + // DrainConfig tunes how nodes are drained before running interrupt packages. + // If unset, the operator preserves its existing drain behavior. + // +optional + DrainConfig *DrainConfig `json:"drainConfig,omitempty"` + // Packages are the DAG of packages to be applied to nodes. Packages Packages `json:"packages,omitempty"` @@ -194,6 +199,63 @@ func (i *InterruptionBudget) Validate() error { return nil } +type DrainConfig struct { + // DisableEviction bypasses the eviction API and deletes pods directly. + // This bypasses PodDisruptionBudgets. + // +optional + //+kubebuilder:default=false + DisableEviction bool `json:"disableEviction,omitempty"` + + // DeleteEmptyDirData allows draining pods that use emptyDir volumes. + // Defaults to true to preserve the operator's existing behavior. + // +optional + //+kubebuilder:default=true + //+nullable + DeleteEmptyDirData *bool `json:"deleteEmptyDirData,omitempty"` + + // Force allows draining pods not managed by a controller. + // Defaults to true to preserve the operator's existing behavior. + // +optional + //+kubebuilder:default=true + //+nullable + Force *bool `json:"force,omitempty"` + + // IgnoreDaemonSets skips DaemonSet-managed pods during drain. + // Defaults to true to preserve the operator's existing behavior. + // +optional + //+kubebuilder:default=true + //+nullable + IgnoreDaemonSets *bool `json:"ignoreDaemonSets,omitempty"` + + // Timeout bounds how long the operator waits for a node to drain. + // Zero or unset means no timeout. + // +optional + //+nullable + Timeout *metav1.Duration `json:"timeout,omitempty"` + + // GracePeriod overrides the grace period used on pod eviction/delete. + // Unset uses each pod's own terminationGracePeriodSeconds. + // +optional + //+nullable + GracePeriod *metav1.Duration `json:"gracePeriod,omitempty"` +} + +func (d *DrainConfig) Validate() error { + if d == nil { + return nil + } + + if d.Timeout != nil && d.Timeout.Duration < 0 { + return errors.New("drainConfig.timeout must be greater than or equal to 0") + } + + if d.GracePeriod != nil && d.GracePeriod.Duration < 0 { + return errors.New("drainConfig.gracePeriod must be greater than or equal to 0") + } + + return nil +} + type PackageRef struct { // Name of the package. Do not set unless you know what your doing. Comes from map key. //+optional diff --git a/operator/api/v1alpha1/skyhook_webhook.go b/operator/api/v1alpha1/skyhook_webhook.go index 356dbeca..814c4179 100644 --- a/operator/api/v1alpha1/skyhook_webhook.go +++ b/operator/api/v1alpha1/skyhook_webhook.go @@ -235,6 +235,10 @@ func (r *Skyhook) Validate() error { return err } + if err := r.Spec.DrainConfig.Validate(); err != nil { + return err + } + // DeploymentPolicy and InterruptionBudget are mutually exclusive if r.Spec.DeploymentPolicy != "" && (r.Spec.InterruptionBudget.Percent != nil || r.Spec.InterruptionBudget.Count != nil) { return fmt.Errorf("deploymentPolicy and interruptionBudget are mutually exclusive") diff --git a/operator/api/v1alpha1/skyhook_webhook_test.go b/operator/api/v1alpha1/skyhook_webhook_test.go index d46990ae..69ee43fe 100644 --- a/operator/api/v1alpha1/skyhook_webhook_test.go +++ b/operator/api/v1alpha1/skyhook_webhook_test.go @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * @@ -20,6 +20,7 @@ package v1alpha1 import ( "encoding/json" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -513,6 +514,26 @@ var _ = Describe("Skyhook Webhook", func() { res5.MemoryLimit = resource.MustParse("-1Mi") Expect(mkSkyhook(res5).Validate()).NotTo(Succeed()) }) + + It("should validate drain config durations", func() { + skyhook := &Skyhook{ + ObjectMeta: metav1.ObjectMeta{Name: "test"}, + Spec: SkyhookSpec{ + DrainConfig: &DrainConfig{ + Timeout: &metav1.Duration{Duration: time.Minute}, + GracePeriod: &metav1.Duration{Duration: 30 * time.Second}, + }, + }, + } + Expect(skyhook.Validate()).To(Succeed()) + + skyhook.Spec.DrainConfig.Timeout = &metav1.Duration{Duration: -time.Second} + Expect(skyhook.Validate()).NotTo(Succeed()) + + skyhook.Spec.DrainConfig.Timeout = nil + skyhook.Spec.DrainConfig.GracePeriod = &metav1.Duration{Duration: -time.Second} + Expect(skyhook.Validate()).NotTo(Succeed()) + }) }) It("packages should UnmarshalJSON correctly", func() { diff --git a/operator/api/v1alpha1/zz_generated.deepcopy.go b/operator/api/v1alpha1/zz_generated.deepcopy.go index aca9129e..f90477a6 100644 --- a/operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/operator/api/v1alpha1/zz_generated.deepcopy.go @@ -246,6 +246,46 @@ func (in *DeploymentStrategy) DeepCopy() *DeploymentStrategy { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DrainConfig) DeepCopyInto(out *DrainConfig) { + *out = *in + if in.DeleteEmptyDirData != nil { + in, out := &in.DeleteEmptyDirData, &out.DeleteEmptyDirData + *out = new(bool) + **out = **in + } + if in.Force != nil { + in, out := &in.Force, &out.Force + *out = new(bool) + **out = **in + } + if in.IgnoreDaemonSets != nil { + in, out := &in.IgnoreDaemonSets, &out.IgnoreDaemonSets + *out = new(bool) + **out = **in + } + if in.Timeout != nil { + in, out := &in.Timeout, &out.Timeout + *out = new(metav1.Duration) + **out = **in + } + if in.GracePeriod != nil { + in, out := &in.GracePeriod, &out.GracePeriod + *out = new(metav1.Duration) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DrainConfig. +func (in *DrainConfig) DeepCopy() *DrainConfig { + if in == nil { + return nil + } + out := new(DrainConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ExponentialStrategy) DeepCopyInto(out *ExponentialStrategy) { *out = *in @@ -652,6 +692,11 @@ func (in *SkyhookSpec) DeepCopyInto(out *SkyhookSpec) { (*in).DeepCopyInto(*out) } in.InterruptionBudget.DeepCopyInto(&out.InterruptionBudget) + if in.DrainConfig != nil { + in, out := &in.DrainConfig, &out.DrainConfig + *out = new(DrainConfig) + (*in).DeepCopyInto(*out) + } if in.Packages != nil { in, out := &in.Packages, &out.Packages *out = make(Packages, len(*in)) diff --git a/operator/cmd/cli/app/node/node_reset.go b/operator/cmd/cli/app/node/node_reset.go index 3d8da812..3f4b8c12 100644 --- a/operator/cmd/cli/app/node/node_reset.go +++ b/operator/cmd/cli/app/node/node_reset.go @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * @@ -40,6 +40,37 @@ type nodeResetOptions struct { confirm bool } +func resetAnnotationKeys(skyhookName string) []string { + return []string{ + nodeStateAnnotationPrefix + skyhookName, + statusAnnotationPrefix + skyhookName, + cordonAnnotationPrefix + skyhookName, + drainStartAnnotationPrefix + skyhookName, + versionAnnotationPrefix + skyhookName, + autoTaintAnnotationPrefix + skyhookName, + } +} + +func resetLabelKeys(skyhookName string) []string { + return []string{ + statusLabelPrefix + skyhookName, + } +} + +func hasResettableMetadata(annotations, labels map[string]string, annotationKeys, labelKeys []string) bool { + for _, annotationKey := range annotationKeys { + if _, ok := annotations[annotationKey]; ok { + return true + } + } + for _, labelKey := range labelKeys { + if _, ok := labels[labelKey]; ok { + return true + } + } + return false +} + // BindToCmd binds the options to the command flags func (o *nodeResetOptions) BindToCmd(cmd *cobra.Command) { cmd.Flags().StringVar(&o.skyhookName, "skyhook", "", "Name of the Skyhook CR (required)") @@ -124,6 +155,8 @@ func runNodeReset(ctx context.Context, cmd *cobra.Command, kubeClient *client.Cl // Find nodes that have the specified Skyhook annotation annotationKey := nodeStateAnnotationPrefix + opts.skyhookName + annotationKeys := resetAnnotationKeys(opts.skyhookName) + labelKeys := resetLabelKeys(opts.skyhookName) nodesToReset := make([]string, 0, len(matchedNodes)) nodeStates := make(map[string]v1alpha1.NodeState) @@ -131,17 +164,17 @@ func runNodeReset(ctx context.Context, cmd *cobra.Command, kubeClient *client.Cl idx := nodeMap[nodeName] node := &nodeList.Items[idx] - annotation, ok := node.Annotations[annotationKey] - if !ok { + if !hasResettableMetadata(node.Annotations, node.Labels, annotationKeys, labelKeys) { continue } - var nodeState v1alpha1.NodeState - if err := json.Unmarshal([]byte(annotation), &nodeState); err != nil { - if cliCtx.GlobalFlags.Verbose { - _, _ = fmt.Fprintf(cmd.ErrOrStderr(), "Warning: skipping node %q - invalid annotation: %v\n", nodeName, err) + nodeState := v1alpha1.NodeState{} + if annotation, ok := node.Annotations[annotationKey]; ok { + if err := json.Unmarshal([]byte(annotation), &nodeState); err != nil { + if cliCtx.GlobalFlags.Verbose { + _, _ = fmt.Fprintf(cmd.ErrOrStderr(), "Warning: node %q has invalid node state annotation; resetting metadata with empty package state: %v\n", nodeName, err) + } } - continue } nodesToReset = append(nodesToReset, nodeName) @@ -191,10 +224,22 @@ func runNodeReset(ctx context.Context, cmd *cobra.Command, kubeClient *client.Cl successCount := 0 for _, nodeName := range nodesToReset { - if err := utils.RemoveNodeAnnotation(ctx, kubeClient.Kubernetes(), nodeName, annotationKey); err != nil { - updateErrors = append(updateErrors, fmt.Sprintf("%s: %v", nodeName, err)) + nodeHasError := false + for _, annotationKey := range annotationKeys { + if err := utils.RemoveNodeAnnotation(ctx, kubeClient.Kubernetes(), nodeName, annotationKey); err != nil { + updateErrors = append(updateErrors, fmt.Sprintf("%s: %v", nodeName, err)) + nodeHasError = true + break + } + } + if nodeHasError { continue } + for _, labelKey := range labelKeys { + if err := utils.RemoveNodeLabel(ctx, kubeClient.Kubernetes(), nodeName, labelKey); err != nil && cliCtx.GlobalFlags.Verbose { + _, _ = fmt.Fprintf(cmd.ErrOrStderr(), "Warning: failed to remove label %q from node %q: %v\n", labelKey, nodeName, err) + } + } successCount++ } diff --git a/operator/cmd/cli/app/node/node_reset_test.go b/operator/cmd/cli/app/node/node_reset_test.go index 5faa3c62..a108814d 100644 --- a/operator/cmd/cli/app/node/node_reset_test.go +++ b/operator/cmd/cli/app/node/node_reset_test.go @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * @@ -141,12 +141,14 @@ var _ = Describe("Node Reset Command", func() { } nodeStateJSON, _ := json.Marshal(nodeState) annotationKey := nodeStateAnnotationPrefix + "my-skyhook" + drainStartAnnotationKey := drainStartAnnotationPrefix + "my-skyhook" node := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "worker-1", Annotations: map[string]string{ - annotationKey: string(nodeStateJSON), + annotationKey: string(nodeStateJSON), + drainStartAnnotationKey: "2026-06-02T12:00:00Z", }, }, } @@ -163,6 +165,87 @@ var _ = Describe("Node Reset Command", func() { Expect(err).NotTo(HaveOccurred()) _, exists := updatedNode.Annotations[annotationKey] Expect(exists).To(BeFalse()) + _, exists = updatedNode.Annotations[drainStartAnnotationKey] + Expect(exists).To(BeFalse()) + }) + + It("should reset drain metadata before node state exists", func() { + drainStartAnnotationKey := drainStartAnnotationPrefix + "my-skyhook" + statusAnnotationKey := statusAnnotationPrefix + "my-skyhook" + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "worker-1", + Annotations: map[string]string{ + drainStartAnnotationKey: "2026-06-02T12:00:00Z", + statusAnnotationKey: "erroring", + }, + }, + } + _, err := mockKube.CoreV1().Nodes().Create(gocontext.Background(), node, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + opts := &nodeResetOptions{skyhookName: "my-skyhook", confirm: true} + err = runNodeReset(gocontext.Background(), cmd, kubeClient, []string{"worker-1"}, opts, cliCtx) + Expect(err).NotTo(HaveOccurred()) + Expect(output.String()).To(ContainSubstring("Successfully reset 1 node")) + + updatedNode, err := mockKube.CoreV1().Nodes().Get(gocontext.Background(), "worker-1", metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + _, exists := updatedNode.Annotations[drainStartAnnotationKey] + Expect(exists).To(BeFalse()) + _, exists = updatedNode.Annotations[statusAnnotationKey] + Expect(exists).To(BeFalse()) + }) + + It("should reset a node with only a status label", func() { + statusLabelKey := statusLabelPrefix + "my-skyhook" + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "worker-1", + Labels: map[string]string{ + statusLabelKey: "erroring", + }, + }, + } + _, err := mockKube.CoreV1().Nodes().Create(gocontext.Background(), node, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + opts := &nodeResetOptions{skyhookName: "my-skyhook", confirm: true} + err = runNodeReset(gocontext.Background(), cmd, kubeClient, []string{"worker-1"}, opts, cliCtx) + Expect(err).NotTo(HaveOccurred()) + Expect(output.String()).To(ContainSubstring("Successfully reset 1 node")) + + updatedNode, err := mockKube.CoreV1().Nodes().Get(gocontext.Background(), "worker-1", metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + _, exists := updatedNode.Labels[statusLabelKey] + Expect(exists).To(BeFalse()) + }) + + It("should reset a node with an invalid node state annotation", func() { + annotationKey := nodeStateAnnotationPrefix + "my-skyhook" + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "worker-1", + Annotations: map[string]string{ + annotationKey: "invalid-json", + }, + }, + } + _, err := mockKube.CoreV1().Nodes().Create(gocontext.Background(), node, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + opts := &nodeResetOptions{skyhookName: "my-skyhook", confirm: true} + err = runNodeReset(gocontext.Background(), cmd, kubeClient, []string{"worker-1"}, opts, cliCtx) + Expect(err).NotTo(HaveOccurred()) + Expect(output.String()).To(ContainSubstring("Successfully reset 1 node")) + + updatedNode, err := mockKube.CoreV1().Nodes().Get(gocontext.Background(), "worker-1", metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + _, exists := updatedNode.Annotations[annotationKey] + Expect(exists).To(BeFalse()) }) It("should respect dry-run flag", func() { diff --git a/operator/cmd/cli/app/node/node_status.go b/operator/cmd/cli/app/node/node_status.go index a7f0e790..3a9ec202 100644 --- a/operator/cmd/cli/app/node/node_status.go +++ b/operator/cmd/cli/app/node/node_status.go @@ -35,7 +35,15 @@ import ( "github.com/NVIDIA/nodewright/operator/internal/cli/utils" ) -const nodeStateAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/nodeState_" +const ( + nodeStateAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/nodeState_" + statusAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/status_" + cordonAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/cordon_" + drainStartAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/drainStart_" + versionAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/version_" + autoTaintAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/autoTaint_" + statusLabelPrefix = v1alpha1.METADATA_PREFIX + "/status_" +) // nodeStatusOptions holds the options for the node status command type nodeStatusOptions struct { diff --git a/operator/cmd/cli/app/reset.go b/operator/cmd/cli/app/reset.go index ef265edd..568e57bc 100644 --- a/operator/cmd/cli/app/reset.go +++ b/operator/cmd/cli/app/reset.go @@ -35,12 +35,13 @@ import ( ) const ( - nodeStateAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/nodeState_" - statusAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/status_" - cordonAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/cordon_" - versionAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/version_" - autoTaintAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/autoTaint_" - statusLabelPrefix = v1alpha1.METADATA_PREFIX + "/status_" + nodeStateAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/nodeState_" + statusAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/status_" + cordonAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/cordon_" + drainStartAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/drainStart_" + versionAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/version_" + autoTaintAnnotationPrefix = v1alpha1.METADATA_PREFIX + "/autoTaint_" + statusLabelPrefix = v1alpha1.METADATA_PREFIX + "/status_" ) // resetOptions holds the options for the reset command @@ -49,6 +50,37 @@ type resetOptions struct { skipBatchReset bool } +func resetAnnotationKeys(skyhookName string) []string { + return []string{ + nodeStateAnnotationPrefix + skyhookName, + statusAnnotationPrefix + skyhookName, + cordonAnnotationPrefix + skyhookName, + drainStartAnnotationPrefix + skyhookName, + versionAnnotationPrefix + skyhookName, + autoTaintAnnotationPrefix + skyhookName, + } +} + +func resetLabelKeys(skyhookName string) []string { + return []string{ + statusLabelPrefix + skyhookName, + } +} + +func hasResettableMetadata(annotations, labels map[string]string, annotationKeys, labelKeys []string) bool { + for _, annotationKey := range annotationKeys { + if _, ok := annotations[annotationKey]; ok { + return true + } + } + for _, labelKey := range labelKeys { + if _, ok := labels[labelKey]; ok { + return true + } + } + return false +} + // NewResetCmd creates the reset command func NewResetCmd(ctx *cliContext.CLIContext) *cobra.Command { opts := &resetOptions{} @@ -104,21 +136,23 @@ func runReset(ctx context.Context, cmd *cobra.Command, kubeClient *client.Client // Find nodes that have the specified Skyhook annotation annotationKey := nodeStateAnnotationPrefix + skyhookName + annotationKeys := resetAnnotationKeys(skyhookName) + labelKeys := resetLabelKeys(skyhookName) nodesToReset := make([]string, 0) nodeStates := make(map[string]v1alpha1.NodeState) for _, node := range nodeList.Items { - annotation, ok := node.Annotations[annotationKey] - if !ok { + if !hasResettableMetadata(node.Annotations, node.Labels, annotationKeys, labelKeys) { continue } - var nodeState v1alpha1.NodeState - if err := json.Unmarshal([]byte(annotation), &nodeState); err != nil { - if cliCtx.GlobalFlags.Verbose { - _, _ = fmt.Fprintf(cmd.ErrOrStderr(), "Warning: skipping node %q - invalid annotation: %v\n", node.Name, err) + nodeState := v1alpha1.NodeState{} + if annotation, ok := node.Annotations[annotationKey]; ok { + if err := json.Unmarshal([]byte(annotation), &nodeState); err != nil { + if cliCtx.GlobalFlags.Verbose { + _, _ = fmt.Fprintf(cmd.ErrOrStderr(), "Warning: node %q has invalid node state annotation; resetting metadata with empty package state: %v\n", node.Name, err) + } } - continue } nodesToReset = append(nodesToReset, node.Name) @@ -200,16 +234,8 @@ func resetNodeAnnotations(ctx context.Context, cmd *cobra.Command, kubeClient *c successCount := 0 for _, nodeName := range nodesToReset { - annotationsToRemove := []string{ - nodeStateAnnotationPrefix + skyhookName, - statusAnnotationPrefix + skyhookName, - cordonAnnotationPrefix + skyhookName, - versionAnnotationPrefix + skyhookName, - autoTaintAnnotationPrefix + skyhookName, - } - labelsToRemove := []string{ - statusLabelPrefix + skyhookName, - } + annotationsToRemove := resetAnnotationKeys(skyhookName) + labelsToRemove := resetLabelKeys(skyhookName) // Try to remove the main nodeState annotation first - this is the critical one mainAnnotationKey := nodeStateAnnotationPrefix + skyhookName diff --git a/operator/cmd/cli/app/reset_test.go b/operator/cmd/cli/app/reset_test.go index 6978dbd4..fb712588 100644 --- a/operator/cmd/cli/app/reset_test.go +++ b/operator/cmd/cli/app/reset_test.go @@ -166,6 +166,7 @@ var _ = Describe("Reset Command", func() { annotationKey := nodeStateAnnotationPrefix + skyhookName statusAnnotationKey := statusAnnotationPrefix + skyhookName cordonAnnotationKey := cordonAnnotationPrefix + skyhookName + drainStartAnnotationKey := drainStartAnnotationPrefix + skyhookName versionAnnotationKey := versionAnnotationPrefix + skyhookName statusLabelKey := statusLabelPrefix + skyhookName @@ -173,10 +174,11 @@ var _ = Describe("Reset Command", func() { ObjectMeta: metav1.ObjectMeta{ Name: "worker-1", Annotations: map[string]string{ - annotationKey: string(nodeStateJSON), - statusAnnotationKey: "complete", - cordonAnnotationKey: "true", - versionAnnotationKey: "1.0.0", + annotationKey: string(nodeStateJSON), + statusAnnotationKey: "complete", + cordonAnnotationKey: "true", + drainStartAnnotationKey: "2026-06-02T12:00:00Z", + versionAnnotationKey: "1.0.0", }, Labels: map[string]string{ statusLabelKey: "complete", @@ -210,6 +212,8 @@ var _ = Describe("Reset Command", func() { Expect(exists).To(BeFalse()) _, exists = updatedNode1.Annotations[cordonAnnotationKey] Expect(exists).To(BeFalse()) + _, exists = updatedNode1.Annotations[drainStartAnnotationKey] + Expect(exists).To(BeFalse()) _, exists = updatedNode1.Annotations[versionAnnotationKey] Expect(exists).To(BeFalse()) _, exists = updatedNode1.Labels[statusLabelKey] @@ -222,6 +226,60 @@ var _ = Describe("Reset Command", func() { Expect(exists).To(BeFalse()) }) + It("should reset nodes with drain metadata before node state exists", func() { + drainStartAnnotationKey := drainStartAnnotationPrefix + skyhookName + statusAnnotationKey := statusAnnotationPrefix + skyhookName + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "worker-1", + Annotations: map[string]string{ + drainStartAnnotationKey: "2026-06-02T12:00:00Z", + statusAnnotationKey: "erroring", + }, + }, + } + _, err := mockKube.CoreV1().Nodes().Create(gocontext.Background(), node, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + opts := &resetOptions{confirm: true} + err = runReset(gocontext.Background(), cmd, kubeClient, skyhookName, opts, cliCtx) + Expect(err).NotTo(HaveOccurred()) + Expect(output.String()).To(ContainSubstring("Successfully reset 1 node")) + + updatedNode, err := mockKube.CoreV1().Nodes().Get(gocontext.Background(), "worker-1", metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + _, exists := updatedNode.Annotations[drainStartAnnotationKey] + Expect(exists).To(BeFalse()) + _, exists = updatedNode.Annotations[statusAnnotationKey] + Expect(exists).To(BeFalse()) + }) + + It("should reset nodes with only a status label", func() { + statusLabelKey := statusLabelPrefix + skyhookName + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "worker-1", + Labels: map[string]string{ + statusLabelKey: "erroring", + }, + }, + } + _, err := mockKube.CoreV1().Nodes().Create(gocontext.Background(), node, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + opts := &resetOptions{confirm: true} + err = runReset(gocontext.Background(), cmd, kubeClient, skyhookName, opts, cliCtx) + Expect(err).NotTo(HaveOccurred()) + Expect(output.String()).To(ContainSubstring("Successfully reset 1 node")) + + updatedNode, err := mockKube.CoreV1().Nodes().Get(gocontext.Background(), "worker-1", metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + _, exists := updatedNode.Labels[statusLabelKey] + Expect(exists).To(BeFalse()) + }) + It("should respect dry-run flag", func() { nodeState := v1alpha1.NodeState{ "pkg1|1.0": {Name: "pkg1", Version: "1.0", Stage: v1alpha1.StageApply, State: v1alpha1.StateComplete}, @@ -319,7 +377,12 @@ var _ = Describe("Reset Command", func() { opts := &resetOptions{confirm: true} err = runReset(gocontext.Background(), cmd, kubeClient, skyhookName, opts, cliCtx) Expect(err).NotTo(HaveOccurred()) - Expect(output.String()).To(ContainSubstring("No nodes have state")) + Expect(output.String()).To(ContainSubstring("Successfully reset 1 node")) + + updatedNode, err := mockKube.CoreV1().Nodes().Get(gocontext.Background(), "worker-1", metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + _, exists := updatedNode.Annotations[annotationKey] + Expect(exists).To(BeFalse()) }) It("should continue even if some annotations/labels don't exist", func() { diff --git a/operator/config/crd/bases/skyhook.nvidia.com_skyhooks.yaml b/operator/config/crd/bases/skyhook.nvidia.com_skyhooks.yaml index 96542dd4..e57d513e 100644 --- a/operator/config/crd/bases/skyhook.nvidia.com_skyhooks.yaml +++ b/operator/config/crd/bases/skyhook.nvidia.com_skyhooks.yaml @@ -138,6 +138,51 @@ spec: setting for this Skyhook type: boolean type: object + drainConfig: + description: |- + DrainConfig tunes how nodes are drained before running interrupt packages. + If unset, the operator preserves its existing drain behavior. + properties: + deleteEmptyDirData: + default: true + description: |- + DeleteEmptyDirData allows draining pods that use emptyDir volumes. + Defaults to true to preserve the operator's existing behavior. + nullable: true + type: boolean + disableEviction: + default: false + description: |- + DisableEviction bypasses the eviction API and deletes pods directly. + This bypasses PodDisruptionBudgets. + type: boolean + force: + default: true + description: |- + Force allows draining pods not managed by a controller. + Defaults to true to preserve the operator's existing behavior. + nullable: true + type: boolean + gracePeriod: + description: |- + GracePeriod overrides the grace period used on pod eviction/delete. + Unset uses each pod's own terminationGracePeriodSeconds. + nullable: true + type: string + ignoreDaemonSets: + default: true + description: |- + IgnoreDaemonSets skips DaemonSet-managed pods during drain. + Defaults to true to preserve the operator's existing behavior. + nullable: true + type: boolean + timeout: + description: |- + Timeout bounds how long the operator waits for a node to drain. + Zero or unset means no timeout. + nullable: true + type: string + type: object interruptionBudget: description: InterruptionBudget configures how many nodes that match node selectors that allowed to be interrupted at once. diff --git a/operator/internal/controller/mock/SkyhookNodes.go b/operator/internal/controller/mock/SkyhookNodes.go index 21383096..8eac1185 100644 --- a/operator/internal/controller/mock/SkyhookNodes.go +++ b/operator/internal/controller/mock/SkyhookNodes.go @@ -593,6 +593,59 @@ func (_c *MockSkyhookNodes_GetSkyhook_Call) RunAndReturn(run func() *wrapper.Sky return _c } +// HasUninstallWork provides a mock function for the type MockSkyhookNodes +func (_mock *MockSkyhookNodes) HasUninstallWork() (bool, error) { + ret := _mock.Called() + + if len(ret) == 0 { + panic("no return value specified for HasUninstallWork") + } + + var r0 bool + var r1 error + if returnFunc, ok := ret.Get(0).(func() (bool, error)); ok { + return returnFunc() + } + if returnFunc, ok := ret.Get(0).(func() bool); ok { + r0 = returnFunc() + } else { + r0 = ret.Get(0).(bool) + } + if returnFunc, ok := ret.Get(1).(func() error); ok { + r1 = returnFunc() + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockSkyhookNodes_HasUninstallWork_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HasUninstallWork' +type MockSkyhookNodes_HasUninstallWork_Call struct { + *mock.Call +} + +// HasUninstallWork is a helper method to define mock.On call +func (_e *MockSkyhookNodes_Expecter) HasUninstallWork() *MockSkyhookNodes_HasUninstallWork_Call { + return &MockSkyhookNodes_HasUninstallWork_Call{Call: _e.mock.On("HasUninstallWork")} +} + +func (_c *MockSkyhookNodes_HasUninstallWork_Call) Run(run func()) *MockSkyhookNodes_HasUninstallWork_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSkyhookNodes_HasUninstallWork_Call) Return(b bool, err error) *MockSkyhookNodes_HasUninstallWork_Call { + _c.Call.Return(b, err) + return _c +} + +func (_c *MockSkyhookNodes_HasUninstallWork_Call) RunAndReturn(run func() (bool, error)) *MockSkyhookNodes_HasUninstallWork_Call { + _c.Call.Return(run) + return _c +} + // IsComplete provides a mock function for the type MockSkyhookNodes func (_mock *MockSkyhookNodes) IsComplete() bool { ret := _mock.Called() @@ -937,151 +990,131 @@ func (_c *MockSkyhookNodes_Status_Call) RunAndReturn(run func() v1alpha1.Status) return _c } -// UpdateCondition provides a mock function for the type MockSkyhookNodes -func (_mock *MockSkyhookNodes) UpdateCondition(logger logr.Logger) bool { - ret := _mock.Called(logger) +// UpdateBlockedCondition provides a mock function for the type MockSkyhookNodes +func (_mock *MockSkyhookNodes) UpdateBlockedCondition() error { + ret := _mock.Called() if len(ret) == 0 { - panic("no return value specified for UpdateCondition") + panic("no return value specified for UpdateBlockedCondition") } - var r0 bool - if returnFunc, ok := ret.Get(0).(func(logr.Logger) bool); ok { - r0 = returnFunc(logger) + var r0 error + if returnFunc, ok := ret.Get(0).(func() error); ok { + r0 = returnFunc() } else { - r0 = ret.Get(0).(bool) + r0 = ret.Error(0) } return r0 } -// MockSkyhookNodes_UpdateCondition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateCondition' -type MockSkyhookNodes_UpdateCondition_Call struct { +// MockSkyhookNodes_UpdateBlockedCondition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateBlockedCondition' +type MockSkyhookNodes_UpdateBlockedCondition_Call struct { *mock.Call } -// UpdateCondition is a helper method to define mock.On call -// - logger logr.Logger -func (_e *MockSkyhookNodes_Expecter) UpdateCondition(logger interface{}) *MockSkyhookNodes_UpdateCondition_Call { - return &MockSkyhookNodes_UpdateCondition_Call{Call: _e.mock.On("UpdateCondition", logger)} +// UpdateBlockedCondition is a helper method to define mock.On call +func (_e *MockSkyhookNodes_Expecter) UpdateBlockedCondition() *MockSkyhookNodes_UpdateBlockedCondition_Call { + return &MockSkyhookNodes_UpdateBlockedCondition_Call{Call: _e.mock.On("UpdateBlockedCondition")} } -func (_c *MockSkyhookNodes_UpdateCondition_Call) Run(run func(logger logr.Logger)) *MockSkyhookNodes_UpdateCondition_Call { +func (_c *MockSkyhookNodes_UpdateBlockedCondition_Call) Run(run func()) *MockSkyhookNodes_UpdateBlockedCondition_Call { _c.Call.Run(func(args mock.Arguments) { - var arg0 logr.Logger - if args[0] != nil { - arg0 = args[0].(logr.Logger) - } - run( - arg0, - ) + run() }) return _c } -func (_c *MockSkyhookNodes_UpdateCondition_Call) Return(b bool) *MockSkyhookNodes_UpdateCondition_Call { - _c.Call.Return(b) +func (_c *MockSkyhookNodes_UpdateBlockedCondition_Call) Return(err error) *MockSkyhookNodes_UpdateBlockedCondition_Call { + _c.Call.Return(err) return _c } -func (_c *MockSkyhookNodes_UpdateCondition_Call) RunAndReturn(run func(logger logr.Logger) bool) *MockSkyhookNodes_UpdateCondition_Call { +func (_c *MockSkyhookNodes_UpdateBlockedCondition_Call) RunAndReturn(run func() error) *MockSkyhookNodes_UpdateBlockedCondition_Call { _c.Call.Return(run) return _c } -// HasUninstallWork provides a mock function for the type MockSkyhookNodes -func (_mock *MockSkyhookNodes) HasUninstallWork() (bool, error) { - ret := _mock.Called() +// UpdateCondition provides a mock function for the type MockSkyhookNodes +func (_mock *MockSkyhookNodes) UpdateCondition(logger logr.Logger) bool { + ret := _mock.Called(logger) if len(ret) == 0 { - panic("no return value specified for HasUninstallWork") + panic("no return value specified for UpdateCondition") } var r0 bool - var r1 error - if returnFunc, ok := ret.Get(0).(func() (bool, error)); ok { - return returnFunc() - } - if returnFunc, ok := ret.Get(0).(func() bool); ok { - r0 = returnFunc() + if returnFunc, ok := ret.Get(0).(func(logr.Logger) bool); ok { + r0 = returnFunc(logger) } else { r0 = ret.Get(0).(bool) } - if returnFunc, ok := ret.Get(1).(func() error); ok { - r1 = returnFunc() - } else { - r1 = ret.Error(1) - } - return r0, r1 + return r0 } -// MockSkyhookNodes_HasUninstallWork_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HasUninstallWork' -type MockSkyhookNodes_HasUninstallWork_Call struct { +// MockSkyhookNodes_UpdateCondition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateCondition' +type MockSkyhookNodes_UpdateCondition_Call struct { *mock.Call } -// HasUninstallWork is a helper method to define mock.On call -func (_e *MockSkyhookNodes_Expecter) HasUninstallWork() *MockSkyhookNodes_HasUninstallWork_Call { - return &MockSkyhookNodes_HasUninstallWork_Call{Call: _e.mock.On("HasUninstallWork")} +// UpdateCondition is a helper method to define mock.On call +// - logger logr.Logger +func (_e *MockSkyhookNodes_Expecter) UpdateCondition(logger interface{}) *MockSkyhookNodes_UpdateCondition_Call { + return &MockSkyhookNodes_UpdateCondition_Call{Call: _e.mock.On("UpdateCondition", logger)} } -func (_c *MockSkyhookNodes_HasUninstallWork_Call) Run(run func()) *MockSkyhookNodes_HasUninstallWork_Call { +func (_c *MockSkyhookNodes_UpdateCondition_Call) Run(run func(logger logr.Logger)) *MockSkyhookNodes_UpdateCondition_Call { _c.Call.Run(func(args mock.Arguments) { - run() + var arg0 logr.Logger + if args[0] != nil { + arg0 = args[0].(logr.Logger) + } + run( + arg0, + ) }) return _c } -func (_c *MockSkyhookNodes_HasUninstallWork_Call) Return(b bool, err error) *MockSkyhookNodes_HasUninstallWork_Call { - _c.Call.Return(b, err) +func (_c *MockSkyhookNodes_UpdateCondition_Call) Return(b bool) *MockSkyhookNodes_UpdateCondition_Call { + _c.Call.Return(b) return _c } -func (_c *MockSkyhookNodes_HasUninstallWork_Call) RunAndReturn(run func() (bool, error)) *MockSkyhookNodes_HasUninstallWork_Call { +func (_c *MockSkyhookNodes_UpdateCondition_Call) RunAndReturn(run func(logger logr.Logger) bool) *MockSkyhookNodes_UpdateCondition_Call { _c.Call.Return(run) return _c } -// UpdateBlockedCondition provides a mock function for the type MockSkyhookNodes -func (_mock *MockSkyhookNodes) UpdateBlockedCondition() error { - ret := _mock.Called() - - if len(ret) == 0 { - panic("no return value specified for UpdateBlockedCondition") - } - - var r0 error - if returnFunc, ok := ret.Get(0).(func() error); ok { - r0 = returnFunc() - } else { - r0 = ret.Error(0) - } - return r0 +// UpdateNodeStateMalformedCondition provides a mock function for the type MockSkyhookNodes +func (_mock *MockSkyhookNodes) UpdateNodeStateMalformedCondition() { + _mock.Called() + return } -// MockSkyhookNodes_UpdateBlockedCondition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateBlockedCondition' -type MockSkyhookNodes_UpdateBlockedCondition_Call struct { +// MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateNodeStateMalformedCondition' +type MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call struct { *mock.Call } -// UpdateBlockedCondition is a helper method to define mock.On call -func (_e *MockSkyhookNodes_Expecter) UpdateBlockedCondition() *MockSkyhookNodes_UpdateBlockedCondition_Call { - return &MockSkyhookNodes_UpdateBlockedCondition_Call{Call: _e.mock.On("UpdateBlockedCondition")} +// UpdateNodeStateMalformedCondition is a helper method to define mock.On call +func (_e *MockSkyhookNodes_Expecter) UpdateNodeStateMalformedCondition() *MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call { + return &MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call{Call: _e.mock.On("UpdateNodeStateMalformedCondition")} } -func (_c *MockSkyhookNodes_UpdateBlockedCondition_Call) Run(run func()) *MockSkyhookNodes_UpdateBlockedCondition_Call { +func (_c *MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call) Run(run func()) *MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call { _c.Call.Run(func(args mock.Arguments) { run() }) return _c } -func (_c *MockSkyhookNodes_UpdateBlockedCondition_Call) Return(err error) *MockSkyhookNodes_UpdateBlockedCondition_Call { - _c.Call.Return(err) +func (_c *MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call) Return() *MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call { + _c.Call.Return() return _c } -func (_c *MockSkyhookNodes_UpdateBlockedCondition_Call) RunAndReturn(run func() error) *MockSkyhookNodes_UpdateBlockedCondition_Call { - _c.Call.Return(run) +func (_c *MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call) RunAndReturn(run func()) *MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call { + _c.Run(run) return _c } @@ -1128,36 +1161,3 @@ func (_c *MockSkyhookNodes_UpdateUninstallConditions_Call) RunAndReturn(run func _c.Call.Return(run) return _c } - -// UpdateNodeStateMalformedCondition provides a mock function for the type MockSkyhookNodes -func (_mock *MockSkyhookNodes) UpdateNodeStateMalformedCondition() { - _mock.Called() - return -} - -// MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateNodeStateMalformedCondition' -type MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call struct { - *mock.Call -} - -// UpdateNodeStateMalformedCondition is a helper method to define mock.On call -func (_e *MockSkyhookNodes_Expecter) UpdateNodeStateMalformedCondition() *MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call { - return &MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call{Call: _e.mock.On("UpdateNodeStateMalformedCondition")} -} - -func (_c *MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call) Run(run func()) *MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call) Return() *MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call { - _c.Call.Return() - return _c -} - -func (_c *MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call) RunAndReturn(run func()) *MockSkyhookNodes_UpdateNodeStateMalformedCondition_Call { - _c.Run(run) - return _c -} diff --git a/operator/internal/controller/skyhook_controller.go b/operator/internal/controller/skyhook_controller.go index 438f79f3..4e219b68 100644 --- a/operator/internal/controller/skyhook_controller.go +++ b/operator/internal/controller/skyhook_controller.go @@ -35,6 +35,7 @@ import ( "github.com/NVIDIA/nodewright/operator/api/v1alpha1" "github.com/NVIDIA/nodewright/operator/internal/dal" + "github.com/NVIDIA/nodewright/operator/internal/drain" "github.com/NVIDIA/nodewright/operator/internal/version" "github.com/NVIDIA/nodewright/operator/internal/wrapper" "github.com/go-logr/logr" @@ -1460,45 +1461,62 @@ func (r *SkyhookReconciler) IsDrained(ctx context.Context, skyhookNode wrapper.S return true, nil } - // checking for any running or pending pods with no toleration to unschedulable - // if its has an unschedulable toleration we can ignore + options := resolvedDrainOptions(skyhookNode.GetSkyhook().Spec.DrainConfig) for _, pod := range pods.Items { - - if ShouldEvict(&pod) { + if drain.DecidePod(&pod, options).BlocksDrain() { return false, nil } - } return true, nil } -func ShouldEvict(pod *corev1.Pod) bool { - switch pod.Status.Phase { - case corev1.PodRunning, corev1.PodPending: +func resolvedDrainOptions(config *v1alpha1.DrainConfig) drain.Options { + options := drain.DefaultOptions() + if config == nil { + return options + } - for _, taint := range pod.Spec.Tolerations { - switch taint.Key { - case "node.kubernetes.io/unschedulable": // ignoring - return false - } + options.DisableEviction = config.DisableEviction + if config.DeleteEmptyDirData != nil { + options.DeleteEmptyDirData = *config.DeleteEmptyDirData + } + if config.Force != nil { + options.Force = *config.Force + } + if config.IgnoreDaemonSets != nil { + options.IgnoreDaemonSets = *config.IgnoreDaemonSets + } + if config.GracePeriod != nil { + seconds := int64(config.GracePeriod.Duration / time.Second) + if config.GracePeriod.Duration%time.Second != 0 { + seconds++ } + options.GracePeriodSeconds = &seconds + } - if len(pod.ObjectMeta.OwnerReferences) > 1 { - for _, owner := range pod.ObjectMeta.OwnerReferences { - if owner.Kind == "DaemonSet" { // ignoring - return false - } - } - } + return options +} - if pod.GetNamespace() == "kube-system" { - return false - } +func drainDeleteOptions(options drain.Options) []client.DeleteOption { + if options.GracePeriodSeconds == nil { + return nil + } + return []client.DeleteOption{client.GracePeriodSeconds(*options.GracePeriodSeconds)} +} - return true +func drainEvictionDeleteOptions(options drain.Options) *metav1.DeleteOptions { + if options.GracePeriodSeconds == nil { + return nil } - return false + return &metav1.DeleteOptions{GracePeriodSeconds: options.GracePeriodSeconds} +} + +func drainTimedOut(startedAt *metav1.Time, timeout *metav1.Duration, now time.Time) bool { + if startedAt == nil || timeout == nil || timeout.Duration == 0 { + return false + } + return !now.Before(startedAt.Add(timeout.Duration)) } // HandleFinalizer returns true only if we container is deleted and we handled it completely, else false. @@ -1750,9 +1768,42 @@ func (r *SkyhookReconciler) DrainNode(ctx context.Context, skyhookNode wrapper.S return false, err } if drained { + skyhookNode.ClearDrainStart() return true, nil } + drainStartedAt, err := skyhookNode.DrainStartedAt() + if err != nil { + return false, fmt.Errorf("error reading drain start for node [%s]: %w", skyhookNode.GetNode().Name, err) + } + + drainConfig := skyhookNode.GetSkyhook().Spec.DrainConfig + now := metav1.Now() + if drainStartedAt == nil { + skyhookNode.StartDrain(now) + skyhookNode.SetStatus(v1alpha1.StatusInProgress) + } else if drainConfig != nil && drainTimedOut(drainStartedAt, drainConfig.Timeout, now.Time) { + if skyhookNode.Status() != v1alpha1.StatusErroring { + r.recorder.Eventf(skyhookNode.GetNode(), nil, corev1.EventTypeWarning, EventsReasonSkyhookDrain, "DrainTimeout", + "drain timed out after [%s] for node [%s] package [%s:%s] from [skyhook:%s]", + drainConfig.Timeout.Duration, + skyhookNode.GetNode().Name, + _package.Name, + _package.Version, + skyhookNode.GetSkyhook().Name, + ) + r.recorder.Eventf(skyhookNode.GetSkyhook().Skyhook, nil, corev1.EventTypeWarning, EventsReasonSkyhookDrain, "DrainTimeout", + "drain timed out after [%s] for node [%s] package [%s:%s]", + drainConfig.Timeout.Duration, + skyhookNode.GetNode().Name, + _package.Name, + _package.Version, + ) + } + skyhookNode.SetStatus(v1alpha1.StatusErroring) + return false, nil + } + pods, err := r.dal.GetPods(ctx, client.MatchingFields{ fieldSelectorNodeName: skyhookNode.GetNode().Name, }) @@ -1772,19 +1823,35 @@ func (r *SkyhookReconciler) DrainNode(ctx context.Context, skyhookNode wrapper.S skyhookNode.GetSkyhook().Name, ) + options := resolvedDrainOptions(skyhookNode.GetSkyhook().Spec.DrainConfig) errs := make([]error, 0) + waitingForPods := false for _, pod := range pods.Items { - - if ShouldEvict(&pod) { - eviction := policyv1.Eviction{} + decision := drain.DecidePod(&pod, options) + switch decision.Action { + case drain.ActionBlock: + waitingForPods = true + case drain.ActionEvict: + waitingForPods = true + eviction := policyv1.Eviction{DeleteOptions: drainEvictionDeleteOptions(options)} err := r.Client.SubResource("eviction").Create(ctx, &pod, &eviction) if err != nil { errs = append(errs, fmt.Errorf("error evicting pod [%s:%s]: %w", pod.Namespace, pod.Name, err)) } + case drain.ActionDelete: + waitingForPods = true + err := r.Delete(ctx, &pod, drainDeleteOptions(options)...) + if err != nil { + errs = append(errs, fmt.Errorf("error deleting pod [%s:%s]: %w", pod.Namespace, pod.Name, err)) + } } } - return len(errs) == 0, utilerrors.NewAggregate(errs) + if len(errs) > 0 { + return false, utilerrors.NewAggregate(errs) + } + + return !waitingForPods, nil } // Interrupt should not be called unless safe to do so, IE already cordoned and drained diff --git a/operator/internal/controller/skyhook_controller_test.go b/operator/internal/controller/skyhook_controller_test.go index 80d381a0..d7ef99d7 100644 --- a/operator/internal/controller/skyhook_controller_test.go +++ b/operator/internal/controller/skyhook_controller_test.go @@ -34,9 +34,15 @@ import ( . "github.com/onsi/gomega" "github.com/stretchr/testify/mock" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/events" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -520,6 +526,292 @@ var _ = Describe("skyhook controller tests", func() { Expect(envs).To(BeEquivalentTo(expected)) }) + Context("DrainNode", func() { + fakeDrainClient := func(objects ...client.Object) client.WithWatch { + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + Expect(v1alpha1.AddToScheme(scheme)).To(Succeed()) + + return fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(objects...). + WithIndex(&corev1.Pod{}, fieldSelectorNodeName, func(obj client.Object) []string { + pod, ok := obj.(*corev1.Pod) + if !ok { + return nil + } + return []string{pod.Spec.NodeName} + }). + Build() + } + + It("should delete pods directly when disableEviction is true", func() { + gracePeriodSeconds := int64(-1) + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "workload", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + NodeName: "node-a", + Containers: []corev1.Container{ + {Name: "workload", Image: "busybox"}, + }, + }, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + } + + baseClient := fakeDrainClient(pod) + testClient := interceptor.NewClient(baseClient, interceptor.Funcs{ + Delete: func(ctx context.Context, c client.WithWatch, obj client.Object, opts ...client.DeleteOption) error { + deleteOptions := &client.DeleteOptions{} + deleteOptions.ApplyOptions(opts) + if deleteOptions.GracePeriodSeconds != nil { + gracePeriodSeconds = *deleteOptions.GracePeriodSeconds + } + return c.Delete(ctx, obj, opts...) + }, + }) + + r, err := NewSkyhookReconciler(testClient.Scheme(), testClient, events.NewFakeRecorder(10), opts) + Expect(err).ToNot(HaveOccurred()) + + node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-a"}} + skyhook := &v1alpha1.Skyhook{ + ObjectMeta: metav1.ObjectMeta{Name: "drain-delete"}, + Spec: v1alpha1.SkyhookSpec{ + DrainConfig: &v1alpha1.DrainConfig{ + DisableEviction: true, + GracePeriod: &metav1.Duration{Duration: 7 * time.Second}, + }, + Packages: v1alpha1.Packages{}, + }, + } + skyhookNode, err := wrapper.NewSkyhookNode(node, skyhook) + Expect(err).ToNot(HaveOccurred()) + + drained, err := r.DrainNode(ctx, skyhookNode, &v1alpha1.Package{ + PackageRef: v1alpha1.PackageRef{Name: "pkg", Version: "1.0.0"}, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(drained).To(BeFalse()) + Expect(gracePeriodSeconds).To(Equal(int64(7))) + + deletedPod := &corev1.Pod{} + err = testClient.Get(ctx, types.NamespacedName{Namespace: "default", Name: "workload"}, deletedPod) + Expect(apierrors.IsNotFound(err)).To(BeTrue()) + + drained, err = r.DrainNode(ctx, skyhookNode, &v1alpha1.Package{ + PackageRef: v1alpha1.PackageRef{Name: "pkg", Version: "1.0.0"}, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(drained).To(BeTrue()) + }) + + It("should wait without deleting unmanaged pods when force is false", func() { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "workload", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + NodeName: "node-a", + Containers: []corev1.Container{ + {Name: "workload", Image: "busybox"}, + }, + }, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + } + + deleteCalled := false + evictCalled := false + baseClient := fakeDrainClient(pod) + testClient := interceptor.NewClient(baseClient, interceptor.Funcs{ + Delete: func(ctx context.Context, c client.WithWatch, obj client.Object, opts ...client.DeleteOption) error { + deleteCalled = true + return c.Delete(ctx, obj, opts...) + }, + SubResourceCreate: func(ctx context.Context, c client.Client, subResourceName string, obj client.Object, subResource client.Object, opts ...client.SubResourceCreateOption) error { + evictCalled = true + return nil + }, + }) + + r, err := NewSkyhookReconciler(testClient.Scheme(), testClient, events.NewFakeRecorder(10), opts) + Expect(err).ToNot(HaveOccurred()) + + force := false + node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-a"}} + skyhook := &v1alpha1.Skyhook{ + ObjectMeta: metav1.ObjectMeta{Name: "drain-block"}, + Spec: v1alpha1.SkyhookSpec{ + DrainConfig: &v1alpha1.DrainConfig{ + Force: &force, + }, + Packages: v1alpha1.Packages{}, + }, + } + skyhookNode, err := wrapper.NewSkyhookNode(node, skyhook) + Expect(err).ToNot(HaveOccurred()) + + drained, err := r.DrainNode(ctx, skyhookNode, &v1alpha1.Package{ + PackageRef: v1alpha1.PackageRef{Name: "pkg", Version: "1.0.0"}, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(drained).To(BeFalse()) + Expect(deleteCalled).To(BeFalse()) + Expect(evictCalled).To(BeFalse()) + Expect(skyhookNode.Status()).To(Equal(v1alpha1.StatusInProgress)) + }) + + It("should block before drain when podNonInterruptLabels match a running pod", func() { + goldenPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "golden", + Namespace: "default", + Labels: map[string]string{ + "workload": "golden", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-a", + Containers: []corev1.Container{ + {Name: "golden", Image: "busybox"}, + }, + }, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + } + evictablePod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "evictable", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + NodeName: "node-a", + Containers: []corev1.Container{ + {Name: "evictable", Image: "busybox"}, + }, + }, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + } + + deleteCalled := false + evictCalled := false + baseClient := fakeDrainClient(goldenPod, evictablePod) + testClient := interceptor.NewClient(baseClient, interceptor.Funcs{ + Delete: func(ctx context.Context, c client.WithWatch, obj client.Object, opts ...client.DeleteOption) error { + deleteCalled = true + return c.Delete(ctx, obj, opts...) + }, + SubResourceCreate: func(ctx context.Context, c client.Client, subResourceName string, obj client.Object, subResource client.Object, opts ...client.SubResourceCreateOption) error { + evictCalled = true + return nil + }, + }) + + r, err := NewSkyhookReconciler(testClient.Scheme(), testClient, events.NewFakeRecorder(10), opts) + Expect(err).ToNot(HaveOccurred()) + + node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-a"}} + skyhook := &v1alpha1.Skyhook{ + ObjectMeta: metav1.ObjectMeta{Name: "drain-golden"}, + Spec: v1alpha1.SkyhookSpec{ + PodNonInterruptLabels: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "workload": "golden", + }, + }, + DrainConfig: &v1alpha1.DrainConfig{ + DisableEviction: true, + }, + Packages: v1alpha1.Packages{}, + }, + } + skyhookNode, err := wrapper.NewSkyhookNode(node, skyhook) + Expect(err).ToNot(HaveOccurred()) + + ready, err := r.EnsureNodeIsReadyForInterrupt(ctx, skyhookNode, &v1alpha1.Package{ + PackageRef: v1alpha1.PackageRef{Name: "pkg", Version: "1.0.0"}, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(ready).To(BeFalse()) + Expect(deleteCalled).To(BeFalse()) + Expect(evictCalled).To(BeFalse()) + Expect(skyhookNode.GetNode().Spec.Unschedulable).To(BeTrue()) + + drainStartedAt, err := skyhookNode.DrainStartedAt() + Expect(err).ToNot(HaveOccurred()) + Expect(drainStartedAt).To(BeNil()) + }) + + It("should mark the node erroring when drain timeout expires", func() { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "workload", + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "ReplicaSet", + Name: "workload-rs", + Controller: ptr(true), + }, + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-a", + Containers: []corev1.Container{ + {Name: "workload", Image: "busybox"}, + }, + }, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + } + + deleteCalled := false + baseClient := fakeDrainClient(pod) + testClient := interceptor.NewClient(baseClient, interceptor.Funcs{ + Delete: func(ctx context.Context, c client.WithWatch, obj client.Object, opts ...client.DeleteOption) error { + deleteCalled = true + return c.Delete(ctx, obj, opts...) + }, + SubResourceCreate: func(ctx context.Context, c client.Client, subResourceName string, obj client.Object, subResource client.Object, opts ...client.SubResourceCreateOption) error { + Fail("drain timeout should not evict pods") + return nil + }, + }) + + r, err := NewSkyhookReconciler(testClient.Scheme(), testClient, events.NewFakeRecorder(10), opts) + Expect(err).ToNot(HaveOccurred()) + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-a", + Annotations: map[string]string{ + "skyhook.nvidia.com/drainStart_drain-timeout": time.Now().Add(-2 * time.Minute).Format(time.RFC3339Nano), + }, + }, + } + skyhook := &v1alpha1.Skyhook{ + ObjectMeta: metav1.ObjectMeta{Name: "drain-timeout"}, + Spec: v1alpha1.SkyhookSpec{ + DrainConfig: &v1alpha1.DrainConfig{ + Timeout: &metav1.Duration{Duration: time.Second}, + }, + Packages: v1alpha1.Packages{}, + }, + } + skyhookNode, err := wrapper.NewSkyhookNode(node, skyhook) + Expect(err).ToNot(HaveOccurred()) + + drained, err := r.DrainNode(ctx, skyhookNode, &v1alpha1.Package{ + PackageRef: v1alpha1.PackageRef{Name: "pkg", Version: "1.0.0"}, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(drained).To(BeFalse()) + Expect(deleteCalled).To(BeFalse()) + Expect(skyhookNode.Status()).To(Equal(v1alpha1.StatusErroring)) + }) + }) + It("should set monotonic SKYHOOK_NODE_ORDER across nodes and batches", func() { now := time.Now() testSkyhook := wrapper.NewSkyhookWrapper(&v1alpha1.Skyhook{ diff --git a/operator/internal/drain/drain.go b/operator/internal/drain/drain.go new file mode 100644 index 00000000..4a0e4078 --- /dev/null +++ b/operator/internal/drain/drain.go @@ -0,0 +1,155 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package drain + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" +) + +const ( + MirrorPodAnnotationKey = "kubernetes.io/config.mirror" + + ReasonPhase = "phase" + ReasonTerminating = "terminating" + ReasonUnschedulableToleration = "unschedulable-toleration" + ReasonDaemonSet = "daemonset" + ReasonKubeSystem = "kube-system" + ReasonMirrorPod = "mirror-pod" + ReasonUnmanaged = "unmanaged" + ReasonEmptyDir = "emptydir" + ReasonEviction = "eviction" + ReasonDelete = "delete" +) + +type Action string + +const ( + ActionIgnore Action = "ignore" + ActionBlock Action = "block" + ActionEvict Action = "evict" + ActionDelete Action = "delete" +) + +type Options struct { + DisableEviction bool + DeleteEmptyDirData bool + Force bool + IgnoreDaemonSets bool + GracePeriodSeconds *int64 +} + +func DefaultOptions() Options { + return Options{ + DeleteEmptyDirData: true, + Force: true, + IgnoreDaemonSets: true, + } +} + +type Decision struct { + Action Action + Reason string +} + +func (d Decision) BlocksDrain() bool { + return d.Action != ActionIgnore +} + +func (d Decision) RequiresAction() bool { + return d.Action == ActionEvict || d.Action == ActionDelete +} + +func DecidePod(pod *corev1.Pod, options Options) Decision { + if pod == nil { + return Decision{Action: ActionIgnore, Reason: ReasonPhase} + } + + switch pod.Status.Phase { + case corev1.PodRunning, corev1.PodPending: + default: + return Decision{Action: ActionIgnore, Reason: ReasonPhase} + } + + if pod.DeletionTimestamp != nil { + return Decision{Action: ActionIgnore, Reason: ReasonTerminating} + } + + if toleratesUnschedulable(pod) { + return Decision{Action: ActionIgnore, Reason: ReasonUnschedulableToleration} + } + + controllerRef := metav1.GetControllerOf(pod) + if options.IgnoreDaemonSets && controllerRef != nil && controllerRef.Kind == "DaemonSet" { + return Decision{Action: ActionIgnore, Reason: ReasonDaemonSet} + } + + if pod.Namespace == "kube-system" { + return Decision{Action: ActionIgnore, Reason: ReasonKubeSystem} + } + + if isMirrorPod(pod) { + return Decision{Action: ActionIgnore, Reason: ReasonMirrorPod} + } + + if controllerRef == nil && !options.Force { + return Decision{Action: ActionBlock, Reason: ReasonUnmanaged} + } + + if hasEmptyDir(pod) && !options.DeleteEmptyDirData { + return Decision{Action: ActionBlock, Reason: ReasonEmptyDir} + } + + if options.DisableEviction { + return Decision{Action: ActionDelete, Reason: ReasonDelete} + } + + return Decision{Action: ActionEvict, Reason: ReasonEviction} +} + +func toleratesUnschedulable(pod *corev1.Pod) bool { + unschedulableTaint := corev1.Taint{ + Key: corev1.TaintNodeUnschedulable, + Effect: corev1.TaintEffectNoSchedule, + } + for _, toleration := range pod.Spec.Tolerations { + if toleration.ToleratesTaint(klog.Background(), &unschedulableTaint, false) { + return true + } + } + return false +} + +func isMirrorPod(pod *corev1.Pod) bool { + if pod.Annotations == nil { + return false + } + _, ok := pod.Annotations[MirrorPodAnnotationKey] + return ok +} + +func hasEmptyDir(pod *corev1.Pod) bool { + for _, volume := range pod.Spec.Volumes { + if volume.EmptyDir != nil { + return true + } + } + return false +} diff --git a/operator/internal/drain/drain_suite_test.go b/operator/internal/drain/drain_suite_test.go new file mode 100644 index 00000000..d2796cfd --- /dev/null +++ b/operator/internal/drain/drain_suite_test.go @@ -0,0 +1,31 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package drain + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestDrain(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Drain Suite") +} diff --git a/operator/internal/drain/drain_test.go b/operator/internal/drain/drain_test.go new file mode 100644 index 00000000..ab33dbdd --- /dev/null +++ b/operator/internal/drain/drain_test.go @@ -0,0 +1,232 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package drain + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var _ = Describe("DecidePod", func() { + controller := true + const daemonSetKind = "DaemonSet" + + basePod := func() *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "workload", + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "ReplicaSet", + Name: "workload-rs", + Controller: &controller, + }, + }, + }, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + } + } + + It("should preserve the current default behavior", func() { + pod := basePod() + + decision := DecidePod(pod, DefaultOptions()) + + Expect(decision).To(Equal(Decision{Action: ActionEvict, Reason: ReasonEviction})) + Expect(decision.BlocksDrain()).To(BeTrue()) + Expect(decision.RequiresAction()).To(BeTrue()) + }) + + It("should delete directly when eviction is disabled", func() { + pod := basePod() + options := DefaultOptions() + options.DisableEviction = true + + decision := DecidePod(pod, options) + + Expect(decision).To(Equal(Decision{Action: ActionDelete, Reason: ReasonDelete})) + Expect(decision.BlocksDrain()).To(BeTrue()) + Expect(decision.RequiresAction()).To(BeTrue()) + }) + + It("should ignore pods that are not running or pending", func() { + pod := basePod() + pod.Status.Phase = corev1.PodSucceeded + + decision := DecidePod(pod, DefaultOptions()) + + Expect(decision).To(Equal(Decision{Action: ActionIgnore, Reason: ReasonPhase})) + Expect(decision.BlocksDrain()).To(BeFalse()) + }) + + It("should ignore pods that are already terminating", func() { + pod := basePod() + deletionTimestamp := metav1.NewTime(time.Date(2026, time.June, 2, 12, 0, 0, 0, time.UTC)) + pod.DeletionTimestamp = &deletionTimestamp + + decision := DecidePod(pod, DefaultOptions()) + + Expect(decision).To(Equal(Decision{Action: ActionIgnore, Reason: ReasonTerminating})) + Expect(decision.BlocksDrain()).To(BeFalse()) + }) + + It("should never drain pods that tolerate the unschedulable taint", func() { + pod := basePod() + pod.Spec.Tolerations = []corev1.Toleration{ + {Key: corev1.TaintNodeUnschedulable}, + } + + decision := DecidePod(pod, DefaultOptions()) + + Expect(decision).To(Equal(Decision{Action: ActionIgnore, Reason: ReasonUnschedulableToleration})) + Expect(decision.BlocksDrain()).To(BeFalse()) + }) + + DescribeTable("should apply Kubernetes toleration matching for the unschedulable taint", + func(toleration corev1.Toleration, expectedDecision Decision) { + pod := basePod() + pod.Spec.Tolerations = []corev1.Toleration{toleration} + + decision := DecidePod(pod, DefaultOptions()) + + Expect(decision).To(Equal(expectedDecision)) + }, + Entry("matches Exists on the NoSchedule unschedulable taint", + corev1.Toleration{ + Key: corev1.TaintNodeUnschedulable, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoSchedule, + }, + Decision{Action: ActionIgnore, Reason: ReasonUnschedulableToleration}, + ), + Entry("does not match a different effect", + corev1.Toleration{ + Key: corev1.TaintNodeUnschedulable, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + }, + Decision{Action: ActionEvict, Reason: ReasonEviction}, + ), + Entry("does not match a different value with Equal", + corev1.Toleration{ + Key: corev1.TaintNodeUnschedulable, + Operator: corev1.TolerationOpEqual, + Value: "true", + Effect: corev1.TaintEffectNoSchedule, + }, + Decision{Action: ActionEvict, Reason: ReasonEviction}, + ), + ) + + It("should ignore daemonset pods by default", func() { + pod := basePod() + pod.OwnerReferences[0].Kind = daemonSetKind + + decision := DecidePod(pod, DefaultOptions()) + + Expect(decision).To(Equal(Decision{Action: ActionIgnore, Reason: ReasonDaemonSet})) + Expect(decision.BlocksDrain()).To(BeFalse()) + }) + + It("should drain daemonset pods when ignoreDaemonSets is false", func() { + pod := basePod() + pod.OwnerReferences[0].Kind = daemonSetKind + options := DefaultOptions() + options.IgnoreDaemonSets = false + + decision := DecidePod(pod, options) + + Expect(decision).To(Equal(Decision{Action: ActionEvict, Reason: ReasonEviction})) + }) + + It("should never drain kube-system pods", func() { + pod := basePod() + pod.Namespace = "kube-system" + + decision := DecidePod(pod, DefaultOptions()) + + Expect(decision).To(Equal(Decision{Action: ActionIgnore, Reason: ReasonKubeSystem})) + Expect(decision.BlocksDrain()).To(BeFalse()) + }) + + It("should ignore mirror pods", func() { + pod := basePod() + pod.Annotations = map[string]string{ + MirrorPodAnnotationKey: "mirror-id", + } + + decision := DecidePod(pod, DefaultOptions()) + + Expect(decision).To(Equal(Decision{Action: ActionIgnore, Reason: ReasonMirrorPod})) + Expect(decision.BlocksDrain()).To(BeFalse()) + }) + + It("should evict unmanaged pods by default", func() { + pod := basePod() + pod.OwnerReferences = nil + + decision := DecidePod(pod, DefaultOptions()) + + Expect(decision).To(Equal(Decision{Action: ActionEvict, Reason: ReasonEviction})) + }) + + It("should block on unmanaged pods when force is false", func() { + pod := basePod() + pod.OwnerReferences = nil + options := DefaultOptions() + options.Force = false + + decision := DecidePod(pod, options) + + Expect(decision).To(Equal(Decision{Action: ActionBlock, Reason: ReasonUnmanaged})) + Expect(decision.BlocksDrain()).To(BeTrue()) + Expect(decision.RequiresAction()).To(BeFalse()) + }) + + It("should evict emptyDir pods by default", func() { + pod := basePod() + pod.Spec.Volumes = []corev1.Volume{ + {Name: "scratch", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}}, + } + + decision := DecidePod(pod, DefaultOptions()) + + Expect(decision).To(Equal(Decision{Action: ActionEvict, Reason: ReasonEviction})) + }) + + It("should block on emptyDir pods when deleteEmptyDirData is false", func() { + pod := basePod() + pod.Spec.Volumes = []corev1.Volume{ + {Name: "scratch", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}}, + } + options := DefaultOptions() + options.DeleteEmptyDirData = false + + decision := DecidePod(pod, options) + + Expect(decision).To(Equal(Decision{Action: ActionBlock, Reason: ReasonEmptyDir})) + Expect(decision.BlocksDrain()).To(BeTrue()) + Expect(decision.RequiresAction()).To(BeFalse()) + }) +}) diff --git a/operator/internal/wrapper/mock/SkyhookNode.go b/operator/internal/wrapper/mock/SkyhookNode.go index f47a596a..ec7de777 100644 --- a/operator/internal/wrapper/mock/SkyhookNode.go +++ b/operator/internal/wrapper/mock/SkyhookNode.go @@ -27,7 +27,8 @@ import ( "github.com/NVIDIA/nodewright/operator/internal/wrapper" "github.com/go-logr/logr" mock "github.com/stretchr/testify/mock" - "k8s.io/api/core/v1" + v10 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1" ) // NewMockSkyhookNode creates a new instance of MockSkyhookNode. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. @@ -134,6 +135,39 @@ func (_c *MockSkyhookNode_CleanupSCRMetadata_Call) RunAndReturn(run func()) *Moc return _c } +// ClearDrainStart provides a mock function for the type MockSkyhookNode +func (_mock *MockSkyhookNode) ClearDrainStart() { + _mock.Called() + return +} + +// MockSkyhookNode_ClearDrainStart_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ClearDrainStart' +type MockSkyhookNode_ClearDrainStart_Call struct { + *mock.Call +} + +// ClearDrainStart is a helper method to define mock.On call +func (_e *MockSkyhookNode_Expecter) ClearDrainStart() *MockSkyhookNode_ClearDrainStart_Call { + return &MockSkyhookNode_ClearDrainStart_Call{Call: _e.mock.On("ClearDrainStart")} +} + +func (_c *MockSkyhookNode_ClearDrainStart_Call) Run(run func()) *MockSkyhookNode_ClearDrainStart_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSkyhookNode_ClearDrainStart_Call) Return() *MockSkyhookNode_ClearDrainStart_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSkyhookNode_ClearDrainStart_Call) RunAndReturn(run func()) *MockSkyhookNode_ClearDrainStart_Call { + _c.Run(run) + return _c +} + // Cordon provides a mock function for the type MockSkyhookNode func (_mock *MockSkyhookNode) Cordon() { _mock.Called() @@ -167,6 +201,61 @@ func (_c *MockSkyhookNode_Cordon_Call) RunAndReturn(run func()) *MockSkyhookNode return _c } +// DrainStartedAt provides a mock function for the type MockSkyhookNode +func (_mock *MockSkyhookNode) DrainStartedAt() (*v1.Time, error) { + ret := _mock.Called() + + if len(ret) == 0 { + panic("no return value specified for DrainStartedAt") + } + + var r0 *v1.Time + var r1 error + if returnFunc, ok := ret.Get(0).(func() (*v1.Time, error)); ok { + return returnFunc() + } + if returnFunc, ok := ret.Get(0).(func() *v1.Time); ok { + r0 = returnFunc() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*v1.Time) + } + } + if returnFunc, ok := ret.Get(1).(func() error); ok { + r1 = returnFunc() + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockSkyhookNode_DrainStartedAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DrainStartedAt' +type MockSkyhookNode_DrainStartedAt_Call struct { + *mock.Call +} + +// DrainStartedAt is a helper method to define mock.On call +func (_e *MockSkyhookNode_Expecter) DrainStartedAt() *MockSkyhookNode_DrainStartedAt_Call { + return &MockSkyhookNode_DrainStartedAt_Call{Call: _e.mock.On("DrainStartedAt")} +} + +func (_c *MockSkyhookNode_DrainStartedAt_Call) Run(run func()) *MockSkyhookNode_DrainStartedAt_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSkyhookNode_DrainStartedAt_Call) Return(time *v1.Time, err error) *MockSkyhookNode_DrainStartedAt_Call { + _c.Call.Return(time, err) + return _c +} + +func (_c *MockSkyhookNode_DrainStartedAt_Call) RunAndReturn(run func() (*v1.Time, error)) *MockSkyhookNode_DrainStartedAt_Call { + _c.Call.Return(run) + return _c +} + // GetComplete provides a mock function for the type MockSkyhookNode func (_mock *MockSkyhookNode) GetComplete() []string { ret := _mock.Called() @@ -214,19 +303,19 @@ func (_c *MockSkyhookNode_GetComplete_Call) RunAndReturn(run func() []string) *M } // GetNode provides a mock function for the type MockSkyhookNode -func (_mock *MockSkyhookNode) GetNode() *v1.Node { +func (_mock *MockSkyhookNode) GetNode() *v10.Node { ret := _mock.Called() if len(ret) == 0 { panic("no return value specified for GetNode") } - var r0 *v1.Node - if returnFunc, ok := ret.Get(0).(func() *v1.Node); ok { + var r0 *v10.Node + if returnFunc, ok := ret.Get(0).(func() *v10.Node); ok { r0 = returnFunc() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*v1.Node) + r0 = ret.Get(0).(*v10.Node) } } return r0 @@ -249,12 +338,12 @@ func (_c *MockSkyhookNode_GetNode_Call) Run(run func()) *MockSkyhookNode_GetNode return _c } -func (_c *MockSkyhookNode_GetNode_Call) Return(node *v1.Node) *MockSkyhookNode_GetNode_Call { +func (_c *MockSkyhookNode_GetNode_Call) Return(node *v10.Node) *MockSkyhookNode_GetNode_Call { _c.Call.Return(node) return _c } -func (_c *MockSkyhookNode_GetNode_Call) RunAndReturn(run func() *v1.Node) *MockSkyhookNode_GetNode_Call { +func (_c *MockSkyhookNode_GetNode_Call) RunAndReturn(run func() *v10.Node) *MockSkyhookNode_GetNode_Call { _c.Call.Return(run) return _c } @@ -1041,6 +1130,46 @@ func (_c *MockSkyhookNode_SetVersion_Call) RunAndReturn(run func()) *MockSkyhook return _c } +// StartDrain provides a mock function for the type MockSkyhookNode +func (_mock *MockSkyhookNode) StartDrain(startedAt v1.Time) { + _mock.Called(startedAt) + return +} + +// MockSkyhookNode_StartDrain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StartDrain' +type MockSkyhookNode_StartDrain_Call struct { + *mock.Call +} + +// StartDrain is a helper method to define mock.On call +// - startedAt v1.Time +func (_e *MockSkyhookNode_Expecter) StartDrain(startedAt interface{}) *MockSkyhookNode_StartDrain_Call { + return &MockSkyhookNode_StartDrain_Call{Call: _e.mock.On("StartDrain", startedAt)} +} + +func (_c *MockSkyhookNode_StartDrain_Call) Run(run func(startedAt v1.Time)) *MockSkyhookNode_StartDrain_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 v1.Time + if args[0] != nil { + arg0 = args[0].(v1.Time) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockSkyhookNode_StartDrain_Call) Return() *MockSkyhookNode_StartDrain_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSkyhookNode_StartDrain_Call) RunAndReturn(run func(startedAt v1.Time)) *MockSkyhookNode_StartDrain_Call { + _c.Run(run) + return _c +} + // State provides a mock function for the type MockSkyhookNode func (_mock *MockSkyhookNode) State() (v1alpha1.NodeState, error) { ret := _mock.Called() diff --git a/operator/internal/wrapper/mock/SkyhookNodeOnly.go b/operator/internal/wrapper/mock/SkyhookNodeOnly.go index 0e08d5ed..fffcbbd1 100644 --- a/operator/internal/wrapper/mock/SkyhookNodeOnly.go +++ b/operator/internal/wrapper/mock/SkyhookNodeOnly.go @@ -26,7 +26,8 @@ import ( "github.com/NVIDIA/nodewright/operator/api/v1alpha1" "github.com/go-logr/logr" mock "github.com/stretchr/testify/mock" - "k8s.io/api/core/v1" + v10 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1" ) // NewMockSkyhookNodeOnly creates a new instance of MockSkyhookNodeOnly. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. @@ -100,6 +101,39 @@ func (_c *MockSkyhookNodeOnly_Changed_Call) RunAndReturn(run func() bool) *MockS return _c } +// ClearDrainStart provides a mock function for the type MockSkyhookNodeOnly +func (_mock *MockSkyhookNodeOnly) ClearDrainStart() { + _mock.Called() + return +} + +// MockSkyhookNodeOnly_ClearDrainStart_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ClearDrainStart' +type MockSkyhookNodeOnly_ClearDrainStart_Call struct { + *mock.Call +} + +// ClearDrainStart is a helper method to define mock.On call +func (_e *MockSkyhookNodeOnly_Expecter) ClearDrainStart() *MockSkyhookNodeOnly_ClearDrainStart_Call { + return &MockSkyhookNodeOnly_ClearDrainStart_Call{Call: _e.mock.On("ClearDrainStart")} +} + +func (_c *MockSkyhookNodeOnly_ClearDrainStart_Call) Run(run func()) *MockSkyhookNodeOnly_ClearDrainStart_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSkyhookNodeOnly_ClearDrainStart_Call) Return() *MockSkyhookNodeOnly_ClearDrainStart_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSkyhookNodeOnly_ClearDrainStart_Call) RunAndReturn(run func()) *MockSkyhookNodeOnly_ClearDrainStart_Call { + _c.Run(run) + return _c +} + // Cordon provides a mock function for the type MockSkyhookNodeOnly func (_mock *MockSkyhookNodeOnly) Cordon() { _mock.Called() @@ -133,20 +167,75 @@ func (_c *MockSkyhookNodeOnly_Cordon_Call) RunAndReturn(run func()) *MockSkyhook return _c } +// DrainStartedAt provides a mock function for the type MockSkyhookNodeOnly +func (_mock *MockSkyhookNodeOnly) DrainStartedAt() (*v1.Time, error) { + ret := _mock.Called() + + if len(ret) == 0 { + panic("no return value specified for DrainStartedAt") + } + + var r0 *v1.Time + var r1 error + if returnFunc, ok := ret.Get(0).(func() (*v1.Time, error)); ok { + return returnFunc() + } + if returnFunc, ok := ret.Get(0).(func() *v1.Time); ok { + r0 = returnFunc() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*v1.Time) + } + } + if returnFunc, ok := ret.Get(1).(func() error); ok { + r1 = returnFunc() + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockSkyhookNodeOnly_DrainStartedAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DrainStartedAt' +type MockSkyhookNodeOnly_DrainStartedAt_Call struct { + *mock.Call +} + +// DrainStartedAt is a helper method to define mock.On call +func (_e *MockSkyhookNodeOnly_Expecter) DrainStartedAt() *MockSkyhookNodeOnly_DrainStartedAt_Call { + return &MockSkyhookNodeOnly_DrainStartedAt_Call{Call: _e.mock.On("DrainStartedAt")} +} + +func (_c *MockSkyhookNodeOnly_DrainStartedAt_Call) Run(run func()) *MockSkyhookNodeOnly_DrainStartedAt_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSkyhookNodeOnly_DrainStartedAt_Call) Return(time *v1.Time, err error) *MockSkyhookNodeOnly_DrainStartedAt_Call { + _c.Call.Return(time, err) + return _c +} + +func (_c *MockSkyhookNodeOnly_DrainStartedAt_Call) RunAndReturn(run func() (*v1.Time, error)) *MockSkyhookNodeOnly_DrainStartedAt_Call { + _c.Call.Return(run) + return _c +} + // GetNode provides a mock function for the type MockSkyhookNodeOnly -func (_mock *MockSkyhookNodeOnly) GetNode() *v1.Node { +func (_mock *MockSkyhookNodeOnly) GetNode() *v10.Node { ret := _mock.Called() if len(ret) == 0 { panic("no return value specified for GetNode") } - var r0 *v1.Node - if returnFunc, ok := ret.Get(0).(func() *v1.Node); ok { + var r0 *v10.Node + if returnFunc, ok := ret.Get(0).(func() *v10.Node); ok { r0 = returnFunc() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*v1.Node) + r0 = ret.Get(0).(*v10.Node) } } return r0 @@ -169,12 +258,12 @@ func (_c *MockSkyhookNodeOnly_GetNode_Call) Run(run func()) *MockSkyhookNodeOnly return _c } -func (_c *MockSkyhookNodeOnly_GetNode_Call) Return(node *v1.Node) *MockSkyhookNodeOnly_GetNode_Call { +func (_c *MockSkyhookNodeOnly_GetNode_Call) Return(node *v10.Node) *MockSkyhookNodeOnly_GetNode_Call { _c.Call.Return(node) return _c } -func (_c *MockSkyhookNodeOnly_GetNode_Call) RunAndReturn(run func() *v1.Node) *MockSkyhookNodeOnly_GetNode_Call { +func (_c *MockSkyhookNodeOnly_GetNode_Call) RunAndReturn(run func() *v10.Node) *MockSkyhookNodeOnly_GetNode_Call { _c.Call.Return(run) return _c } @@ -584,6 +673,46 @@ func (_c *MockSkyhookNodeOnly_SetVersion_Call) RunAndReturn(run func()) *MockSky return _c } +// StartDrain provides a mock function for the type MockSkyhookNodeOnly +func (_mock *MockSkyhookNodeOnly) StartDrain(startedAt v1.Time) { + _mock.Called(startedAt) + return +} + +// MockSkyhookNodeOnly_StartDrain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StartDrain' +type MockSkyhookNodeOnly_StartDrain_Call struct { + *mock.Call +} + +// StartDrain is a helper method to define mock.On call +// - startedAt v1.Time +func (_e *MockSkyhookNodeOnly_Expecter) StartDrain(startedAt interface{}) *MockSkyhookNodeOnly_StartDrain_Call { + return &MockSkyhookNodeOnly_StartDrain_Call{Call: _e.mock.On("StartDrain", startedAt)} +} + +func (_c *MockSkyhookNodeOnly_StartDrain_Call) Run(run func(startedAt v1.Time)) *MockSkyhookNodeOnly_StartDrain_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 v1.Time + if args[0] != nil { + arg0 = args[0].(v1.Time) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockSkyhookNodeOnly_StartDrain_Call) Return() *MockSkyhookNodeOnly_StartDrain_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSkyhookNodeOnly_StartDrain_Call) RunAndReturn(run func(startedAt v1.Time)) *MockSkyhookNodeOnly_StartDrain_Call { + _c.Run(run) + return _c +} + // State provides a mock function for the type MockSkyhookNodeOnly func (_mock *MockSkyhookNodeOnly) State() (v1alpha1.NodeState, error) { ret := _mock.Called() diff --git a/operator/internal/wrapper/node.go b/operator/internal/wrapper/node.go index 3fa33fd5..a97cd5fb 100644 --- a/operator/internal/wrapper/node.go +++ b/operator/internal/wrapper/node.go @@ -23,6 +23,7 @@ import ( "fmt" "sort" "strings" + "time" "github.com/NVIDIA/nodewright/operator/api/v1alpha1" "github.com/NVIDIA/nodewright/operator/internal/graph" @@ -98,6 +99,12 @@ type SkyhookNodeOnly interface { RemoveTaint(key string) // Cordon marks the node unschedulable and records the cordon in annotations for this Skyhook. Cordon() + // StartDrain records when draining started for this Skyhook on this node. + StartDrain(startedAt metav1.Time) + // DrainStartedAt returns when draining started for this Skyhook on this node. + DrainStartedAt() (*metav1.Time, error) + // ClearDrainStart removes the drain start marker for this Skyhook on this node. + ClearDrainStart() // Uncordon marks the node schedulable and removes this Skyhook's cordon annotation if present. Uncordon() // Reset clears Skyhook-related state and annotations so the node can be reconfigured from scratch. @@ -166,6 +173,10 @@ type skyhookNode struct { updated bool } +func (node *skyhookNode) drainStartAnnotationKey() string { + return fmt.Sprintf("%s/drainStart_%s", v1alpha1.METADATA_PREFIX, node.skyhookName) +} + // GetSkyhook returns the Skyhook associated with this node, or nil if only a name was set. func (node *skyhookNode) GetSkyhook() *Skyhook { return node.skyhook @@ -463,12 +474,65 @@ func (node *skyhookNode) HasSkyhookAnnotations() bool { func (node *skyhookNode) Cordon() { _, ok := node.Annotations[fmt.Sprintf("%s/cordon_%s", v1alpha1.METADATA_PREFIX, node.skyhookName)] if !node.Spec.Unschedulable || !ok { + if node.Annotations == nil { + node.Annotations = make(map[string]string) + } node.Spec.Unschedulable = true node.Annotations[fmt.Sprintf("%s/cordon_%s", v1alpha1.METADATA_PREFIX, node.skyhookName)] = "true" node.updated = true } } +// StartDrain records when draining started for this Skyhook on this node. +func (node *skyhookNode) StartDrain(startedAt metav1.Time) { + if node.Annotations == nil { + node.Annotations = make(map[string]string) + } + + key := node.drainStartAnnotationKey() + if _, ok := node.Annotations[key]; ok { + return + } + + node.Annotations[key] = startedAt.Time.Format(time.RFC3339Nano) + node.updated = true +} + +// DrainStartedAt returns when draining started for this Skyhook on this node. +func (node *skyhookNode) DrainStartedAt() (*metav1.Time, error) { + if node.Annotations == nil { + return nil, nil + } + + value, ok := node.Annotations[node.drainStartAnnotationKey()] + if !ok { + return nil, nil + } + + parsed, err := time.Parse(time.RFC3339Nano, value) + if err != nil { + return nil, fmt.Errorf("error parsing drain start annotation: %w", err) + } + + startedAt := metav1.NewTime(parsed) + return &startedAt, nil +} + +// ClearDrainStart removes the drain start marker for this Skyhook on this node. +func (node *skyhookNode) ClearDrainStart() { + if node.Annotations == nil { + return + } + + key := node.drainStartAnnotationKey() + if _, ok := node.Annotations[key]; !ok { + return + } + + delete(node.Annotations, key) + node.updated = true +} + // Uncordon marks the node schedulable and removes this Skyhook's cordon annotation if present. func (node *skyhookNode) Uncordon() { @@ -489,9 +553,11 @@ func (node *skyhookNode) Reset() { node.skyhook.Status.Status = v1alpha1.StatusUnknown node.skyhook.Updated = true - delete(node.Annotations, fmt.Sprintf("%s/cordon_", v1alpha1.METADATA_PREFIX)) + delete(node.Annotations, fmt.Sprintf("%s/cordon_%s", v1alpha1.METADATA_PREFIX, node.skyhook.Name)) + delete(node.Annotations, fmt.Sprintf("%s/drainStart_%s", v1alpha1.METADATA_PREFIX, node.skyhook.Name)) delete(node.Annotations, fmt.Sprintf("%s/nodeState_%s", v1alpha1.METADATA_PREFIX, node.skyhook.Name)) delete(node.Annotations, fmt.Sprintf("%s/status_%s", v1alpha1.METADATA_PREFIX, node.skyhook.Name)) + delete(node.Annotations, fmt.Sprintf("%s/version_%s", v1alpha1.METADATA_PREFIX, node.skyhook.Name)) delete(node.Labels, fmt.Sprintf("%s/status_%s", v1alpha1.METADATA_PREFIX, node.skyhook.Name)) node.updated = true diff --git a/operator/internal/wrapper/node_test.go b/operator/internal/wrapper/node_test.go index 7a1d2476..0cca906d 100644 --- a/operator/internal/wrapper/node_test.go +++ b/operator/internal/wrapper/node_test.go @@ -19,6 +19,8 @@ package wrapper import ( + "time" + "github.com/NVIDIA/nodewright/operator/api/v1alpha1" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -178,6 +180,99 @@ var _ = Describe("SkyhookNode", func() { }) }) + Context("DrainStart", func() { + It("should record, read, and clear the drain start annotation", func() { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + } + + sn, err := NewSkyhookNode(node, &v1alpha1.Skyhook{ + ObjectMeta: metav1.ObjectMeta{Name: "my-skyhook"}, + Spec: v1alpha1.SkyhookSpec{Packages: v1alpha1.Packages{}}, + }) + Expect(err).ToNot(HaveOccurred()) + + started := metav1.NewTime(time.Date(2026, time.June, 2, 13, 14, 15, 0, time.UTC)) + sn.StartDrain(started) + + Expect(node.Annotations).To(HaveKeyWithValue("skyhook.nvidia.com/drainStart_my-skyhook", "2026-06-02T13:14:15Z")) + Expect(sn.Changed()).To(BeTrue()) + + drainStartedAt, err := sn.DrainStartedAt() + Expect(err).ToNot(HaveOccurred()) + Expect(drainStartedAt).To(Equal(&started)) + + sn.ClearDrainStart() + Expect(node.Annotations).ToNot(HaveKey("skyhook.nvidia.com/drainStart_my-skyhook")) + }) + + It("should return an error for a malformed drain start annotation", func() { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + Annotations: map[string]string{ + "skyhook.nvidia.com/drainStart_my-skyhook": "not-a-time", + }, + }, + } + + sn, err := NewSkyhookNode(node, &v1alpha1.Skyhook{ + ObjectMeta: metav1.ObjectMeta{Name: "my-skyhook"}, + Spec: v1alpha1.SkyhookSpec{Packages: v1alpha1.Packages{}}, + }) + Expect(err).ToNot(HaveOccurred()) + + drainStartedAt, err := sn.DrainStartedAt() + Expect(err).To(HaveOccurred()) + Expect(drainStartedAt).To(BeNil()) + }) + }) + + Context("Reset", func() { + It("should remove drain start and other node metadata for the skyhook", func() { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + Annotations: map[string]string{ + "skyhook.nvidia.com/cordon_my-skyhook": "true", + "skyhook.nvidia.com/drainStart_my-skyhook": "2026-06-02T13:14:15Z", + "skyhook.nvidia.com/nodeState_my-skyhook": "{}", + "skyhook.nvidia.com/status_my-skyhook": "erroring", + "skyhook.nvidia.com/version_my-skyhook": "1.0.0", + }, + Labels: map[string]string{ + "skyhook.nvidia.com/status_my-skyhook": "erroring", + }, + }, + } + skyhook := &v1alpha1.Skyhook{ + ObjectMeta: metav1.ObjectMeta{Name: "my-skyhook"}, + Spec: v1alpha1.SkyhookSpec{Packages: v1alpha1.Packages{}}, + Status: v1alpha1.SkyhookStatus{ + NodeState: map[string]v1alpha1.NodeState{"test-node": {}}, + NodeStatus: map[string]v1alpha1.Status{"test-node": v1alpha1.StatusErroring}, + }, + } + + sn, err := NewSkyhookNode(node, skyhook) + Expect(err).ToNot(HaveOccurred()) + + sn.Reset() + + Expect(node.Annotations).ToNot(HaveKey("skyhook.nvidia.com/cordon_my-skyhook")) + Expect(node.Annotations).ToNot(HaveKey("skyhook.nvidia.com/drainStart_my-skyhook")) + Expect(node.Annotations).ToNot(HaveKey("skyhook.nvidia.com/nodeState_my-skyhook")) + Expect(node.Annotations).ToNot(HaveKey("skyhook.nvidia.com/status_my-skyhook")) + Expect(node.Annotations).ToNot(HaveKey("skyhook.nvidia.com/version_my-skyhook")) + Expect(node.Labels).ToNot(HaveKey("skyhook.nvidia.com/status_my-skyhook")) + Expect(skyhook.Status.NodeState).ToNot(HaveKey("test-node")) + Expect(skyhook.Status.NodeStatus).ToNot(HaveKey("test-node")) + Expect(skyhook.Status.Status).To(Equal(v1alpha1.StatusUnknown)) + }) + }) + Context("CleanupSCRMetadata", func() { It("should remove matching keys and preserve others", func() { node := &corev1.Node{ From 8a174cc884029f0e9a5995f351d631ab8950a32e Mon Sep 17 00:00:00 2001 From: AnouarMohamed Date: Fri, 5 Jun 2026 16:01:52 +0100 Subject: [PATCH 2/4] fix(operator): address drain config review feedback Signed-off-by: AnouarMohamed --- chart/templates/skyhook-crd.yaml | 1 + .../skyhook/drain-config/chainsaw-test.yaml | 2 ++ .../disable-eviction-skyhook.yaml | 27 +++++++++++++++++++ .../drain-config/disable-eviction.yaml | 25 ----------------- .../skyhook/drain-config/timeout.yaml | 3 +++ operator/api/v1alpha1/skyhook_types.go | 3 ++- .../api/v1alpha1/zz_generated.deepcopy.go | 5 ++++ .../bases/skyhook.nvidia.com_skyhooks.yaml | 1 + .../internal/controller/skyhook_controller.go | 13 ++++++--- .../controller/skyhook_controller_test.go | 4 +-- 10 files changed, 52 insertions(+), 32 deletions(-) create mode 100644 k8s-tests/chainsaw/skyhook/drain-config/disable-eviction-skyhook.yaml diff --git a/chart/templates/skyhook-crd.yaml b/chart/templates/skyhook-crd.yaml index 42cf8b53..647c15e1 100644 --- a/chart/templates/skyhook-crd.yaml +++ b/chart/templates/skyhook-crd.yaml @@ -150,6 +150,7 @@ spec: description: |- DisableEviction bypasses the eviction API and deletes pods directly. This bypasses PodDisruptionBudgets. + nullable: true type: boolean force: default: true diff --git a/k8s-tests/chainsaw/skyhook/drain-config/chainsaw-test.yaml b/k8s-tests/chainsaw/skyhook/drain-config/chainsaw-test.yaml index 982fc981..db0ff1c3 100644 --- a/k8s-tests/chainsaw/skyhook/drain-config/chainsaw-test.yaml +++ b/k8s-tests/chainsaw/skyhook/drain-config/chainsaw-test.yaml @@ -70,6 +70,8 @@ spec: kubectl -n skyhook wait --for=condition=Ready "pod/${pod}" --timeout=30s kubectl -n skyhook create configmap drain-config-original-pod --from-literal=name="${pod}" --dry-run=client -o yaml | kubectl apply -f - kubectl -n skyhook wait --for=jsonpath='{.status.disruptionsAllowed}'=0 pdb/drain-config-pdb --timeout=60s + - apply: + file: disable-eviction-skyhook.yaml - script: content: | kubectl -n skyhook wait --for=condition=Ready skyhook/drain-config-disable-eviction --timeout=180s diff --git a/k8s-tests/chainsaw/skyhook/drain-config/disable-eviction-skyhook.yaml b/k8s-tests/chainsaw/skyhook/drain-config/disable-eviction-skyhook.yaml new file mode 100644 index 00000000..8af64149 --- /dev/null +++ b/k8s-tests/chainsaw/skyhook/drain-config/disable-eviction-skyhook.yaml @@ -0,0 +1,27 @@ +apiVersion: skyhook.nvidia.com/v1alpha1 +kind: Skyhook +metadata: + name: drain-config-disable-eviction + namespace: skyhook +spec: + drainConfig: + disableEviction: true + gracePeriod: 0s + nodeSelectors: + matchLabels: + skyhook.nvidia.com/test-node: skyhooke2e + interruptionBudget: + count: 1 + packages: + drain-pdb: + version: "1.2.3" + image: ghcr.io/nvidia/skyhook/agentless + uninstall: + enabled: false + apply: false + interrupt: + type: service + services: [rsyslog] + env: + - name: SLEEP_LEN + value: "1" diff --git a/k8s-tests/chainsaw/skyhook/drain-config/disable-eviction.yaml b/k8s-tests/chainsaw/skyhook/drain-config/disable-eviction.yaml index c16e5fe6..a058d00f 100644 --- a/k8s-tests/chainsaw/skyhook/drain-config/disable-eviction.yaml +++ b/k8s-tests/chainsaw/skyhook/drain-config/disable-eviction.yaml @@ -49,28 +49,3 @@ spec: selector: matchLabels: app: drain-config-pdb ---- -apiVersion: skyhook.nvidia.com/v1alpha1 -kind: Skyhook -metadata: - name: drain-config-disable-eviction - namespace: skyhook -spec: - drainConfig: - disableEviction: true - gracePeriod: 0s - nodeSelectors: - matchLabels: - skyhook.nvidia.com/test-node: skyhooke2e - interruptionBudget: - count: 1 - packages: - drain-pdb: - version: "1.2.3" - image: ghcr.io/nvidia/skyhook/agentless - interrupt: - type: service - services: [rsyslog] - env: - - name: SLEEP_LEN - value: "1" diff --git a/k8s-tests/chainsaw/skyhook/drain-config/timeout.yaml b/k8s-tests/chainsaw/skyhook/drain-config/timeout.yaml index bb00686e..94f801ed 100644 --- a/k8s-tests/chainsaw/skyhook/drain-config/timeout.yaml +++ b/k8s-tests/chainsaw/skyhook/drain-config/timeout.yaml @@ -51,6 +51,9 @@ spec: drain-timeout: version: "1.2.3" image: ghcr.io/nvidia/skyhook/agentless + uninstall: + enabled: false + apply: false interrupt: type: service services: [rsyslog] diff --git a/operator/api/v1alpha1/skyhook_types.go b/operator/api/v1alpha1/skyhook_types.go index 7272854c..3ef5c53a 100644 --- a/operator/api/v1alpha1/skyhook_types.go +++ b/operator/api/v1alpha1/skyhook_types.go @@ -204,7 +204,8 @@ type DrainConfig struct { // This bypasses PodDisruptionBudgets. // +optional //+kubebuilder:default=false - DisableEviction bool `json:"disableEviction,omitempty"` + //+nullable + DisableEviction *bool `json:"disableEviction,omitempty"` // DeleteEmptyDirData allows draining pods that use emptyDir volumes. // Defaults to true to preserve the operator's existing behavior. diff --git a/operator/api/v1alpha1/zz_generated.deepcopy.go b/operator/api/v1alpha1/zz_generated.deepcopy.go index f90477a6..d131f605 100644 --- a/operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/operator/api/v1alpha1/zz_generated.deepcopy.go @@ -249,6 +249,11 @@ func (in *DeploymentStrategy) DeepCopy() *DeploymentStrategy { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DrainConfig) DeepCopyInto(out *DrainConfig) { *out = *in + if in.DisableEviction != nil { + in, out := &in.DisableEviction, &out.DisableEviction + *out = new(bool) + **out = **in + } if in.DeleteEmptyDirData != nil { in, out := &in.DeleteEmptyDirData, &out.DeleteEmptyDirData *out = new(bool) diff --git a/operator/config/crd/bases/skyhook.nvidia.com_skyhooks.yaml b/operator/config/crd/bases/skyhook.nvidia.com_skyhooks.yaml index e57d513e..4337662d 100644 --- a/operator/config/crd/bases/skyhook.nvidia.com_skyhooks.yaml +++ b/operator/config/crd/bases/skyhook.nvidia.com_skyhooks.yaml @@ -155,6 +155,7 @@ spec: description: |- DisableEviction bypasses the eviction API and deletes pods directly. This bypasses PodDisruptionBudgets. + nullable: true type: boolean force: default: true diff --git a/operator/internal/controller/skyhook_controller.go b/operator/internal/controller/skyhook_controller.go index 4e219b68..d9e096c2 100644 --- a/operator/internal/controller/skyhook_controller.go +++ b/operator/internal/controller/skyhook_controller.go @@ -1252,7 +1252,8 @@ func (r *SkyhookReconciler) HandleConfigUpdates(ctx context.Context, clusterStat for _, node := range skyhook.GetNodes() { exists, err := r.PodExists(ctx, node.GetNode().Name, skyhook.GetSkyhook().Name, &_package) if err != nil { - return false, false, err + return false, false, fmt.Errorf("checking package pod existence on node %s for package %s: %w", + node.GetNode().Name, _package.GetUniqueName(), err) } if !exists && node.IsPackageComplete(_package) { @@ -1279,14 +1280,16 @@ func (r *SkyhookReconciler) HandleConfigUpdates(ctx context.Context, clusterStat }, ) if err != nil { - return false, false, err + return false, false, fmt.Errorf("listing package pods on node %s for package %s: %w", + node.GetNode().Name, _package.GetUniqueName(), err) } if pods != nil { for _, pod := range pods.Items { err := r.Delete(ctx, &pod) if err != nil { - return false, false, err + return false, false, fmt.Errorf("deleting erroring pod %s/%s on node %s: %w", + pod.Namespace, pod.Name, node.GetNode().Name, err) } } } @@ -1477,7 +1480,9 @@ func resolvedDrainOptions(config *v1alpha1.DrainConfig) drain.Options { return options } - options.DisableEviction = config.DisableEviction + if config.DisableEviction != nil { + options.DisableEviction = *config.DisableEviction + } if config.DeleteEmptyDirData != nil { options.DeleteEmptyDirData = *config.DeleteEmptyDirData } diff --git a/operator/internal/controller/skyhook_controller_test.go b/operator/internal/controller/skyhook_controller_test.go index d7ef99d7..d0dce54c 100644 --- a/operator/internal/controller/skyhook_controller_test.go +++ b/operator/internal/controller/skyhook_controller_test.go @@ -581,7 +581,7 @@ var _ = Describe("skyhook controller tests", func() { ObjectMeta: metav1.ObjectMeta{Name: "drain-delete"}, Spec: v1alpha1.SkyhookSpec{ DrainConfig: &v1alpha1.DrainConfig{ - DisableEviction: true, + DisableEviction: ptr(true), GracePeriod: &metav1.Duration{Duration: 7 * time.Second}, }, Packages: v1alpha1.Packages{}, @@ -722,7 +722,7 @@ var _ = Describe("skyhook controller tests", func() { }, }, DrainConfig: &v1alpha1.DrainConfig{ - DisableEviction: true, + DisableEviction: ptr(true), }, Packages: v1alpha1.Packages{}, }, From e5ad3d5e7fbfaa0584917522356911e90bf1c816 Mon Sep 17 00:00:00 2001 From: AnouarMohamed Date: Fri, 5 Jun 2026 20:55:15 +0100 Subject: [PATCH 3/4] fix(operator): address drain config review feedback Signed-off-by: AnouarMohamed --- docs/interrupt_flow.md | 33 +++++++ .../disable-eviction-skyhook.yaml | 16 ++++ .../skyhook/drain-config/timeout-assert.yaml | 3 +- operator/CHANGELOG.md | 15 ++- .../api/v1alpha1/zz_generated.deepcopy.go | 4 +- operator/cmd/cli/CHANGELOG.md | 7 ++ .../internal/controller/skyhook_controller.go | 60 +----------- operator/internal/drain/drain.go | 54 +++++++++++ operator/internal/drain/drain_test.go | 96 +++++++++++++++++++ 9 files changed, 228 insertions(+), 60 deletions(-) diff --git a/docs/interrupt_flow.md b/docs/interrupt_flow.md index 776034b3..0ba384b6 100644 --- a/docs/interrupt_flow.md +++ b/docs/interrupt_flow.md @@ -114,9 +114,42 @@ The operator also skips pods that are already terminating, pods that tolerate the `node.kubernetes.io/unschedulable` taint, mirror/static pods, and pods in `kube-system`. These exclusions are not user-configurable. +Compared to earlier releases, the default drain filter now follows Kubernetes +matching more closely: the unschedulable toleration check uses Kubernetes +`ToleratesTaint` semantics, DaemonSet pods are identified from the controller +owner reference, and already-terminating or mirror/static pods are ignored. + `podNonInterruptLabels` remains a pre-drain barrier. Matching pods must finish or move away before the operator starts the configurable drain step. +### Recovering From a Drain Timeout + +When `spec.drainConfig.timeout` expires, the operator records a `DrainTimeout` +warning event, marks the node and Skyhook `erroring`, and leaves the node +cordoned. The operator stops issuing further evict/delete actions while the +blocking condition remains, so package stages do not proceed on that node. + +To recover, remove the underlying blocker first, such as a PDB with zero allowed +disruptions, an unmanaged pod when `force: false`, or an `emptyDir` pod when +`deleteEmptyDirData: false`. Then reset the failed rollout metadata: + +```bash +kubectl skyhook reset --confirm +``` + +For a single node, use: + +```bash +kubectl skyhook node reset --skyhook --confirm +``` + +If the blocker clears after the timeout without a reset, a later reconcile can +observe the node as drained and continue from current cluster state. Reset is +still the recommended recovery workflow in production because it explicitly +clears the `erroring` status, drain-start metadata, cordon metadata, and batch +state before retrying. If the blocker is still present after reset, the drain +will time out again. + ## Best Practices - Always test interrupt-enabled packages in non-production environments first diff --git a/k8s-tests/chainsaw/skyhook/drain-config/disable-eviction-skyhook.yaml b/k8s-tests/chainsaw/skyhook/drain-config/disable-eviction-skyhook.yaml index 8af64149..aa497e72 100644 --- a/k8s-tests/chainsaw/skyhook/drain-config/disable-eviction-skyhook.yaml +++ b/k8s-tests/chainsaw/skyhook/drain-config/disable-eviction-skyhook.yaml @@ -1,3 +1,19 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + apiVersion: skyhook.nvidia.com/v1alpha1 kind: Skyhook metadata: diff --git a/k8s-tests/chainsaw/skyhook/drain-config/timeout-assert.yaml b/k8s-tests/chainsaw/skyhook/drain-config/timeout-assert.yaml index 2772feb0..8851414c 100644 --- a/k8s-tests/chainsaw/skyhook/drain-config/timeout-assert.yaml +++ b/k8s-tests/chainsaw/skyhook/drain-config/timeout-assert.yaml @@ -22,7 +22,7 @@ metadata: skyhook.nvidia.com/status_drain-config-timeout: erroring annotations: skyhook.nvidia.com/status_drain-config-timeout: erroring - (contains(@, 'skyhook.nvidia.com/drainStart_drain-config-timeout')): true + (contains(keys(@), 'skyhook.nvidia.com/drainStart_drain-config-timeout')): true spec: taints: - effect: NoSchedule @@ -32,7 +32,6 @@ apiVersion: skyhook.nvidia.com/v1alpha1 kind: Skyhook metadata: name: drain-config-timeout - namespace: skyhook status: status: erroring nodeStatus: diff --git a/operator/CHANGELOG.md b/operator/CHANGELOG.md index ddff7236..9a558d64 100644 --- a/operator/CHANGELOG.md +++ b/operator/CHANGELOG.md @@ -2,6 +2,20 @@ All notable changes to this project will be documented in this file. +## [Unreleased] + +### New Features + +- Add `spec.drainConfig` so interrupt drains can tune eviction, direct deletion, emptyDir handling, unmanaged-pod handling, DaemonSet skipping, timeout, and grace-period behavior. + +### Changed + +- Align the default interrupt drain pod filter with Kubernetes drain semantics: + already-terminating pods and mirror/static pods are skipped, unschedulable + tolerations use Kubernetes `ToleratesTaint` matching, and DaemonSet pods are + identified from the controller owner reference instead of the previous owner + reference count heuristic. + ## [operator/v0.16.1] - 2026-05-22 ### Bug Fixes @@ -22,7 +36,6 @@ and CR deletion behave. Affects the Operator, Webhook, and CRD. ### New Features -- Add `spec.drainConfig` so interrupt drains can tune eviction, direct deletion, emptyDir handling, unmanaged-pod handling, DaemonSet skipping, timeout, and grace-period behavior. - Add a standard `Ready` condition to Skyhook status for native Kubernetes wait and GitOps health tooling. ### New behavior diff --git a/operator/api/v1alpha1/zz_generated.deepcopy.go b/operator/api/v1alpha1/zz_generated.deepcopy.go index d131f605..63b3296a 100644 --- a/operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/operator/api/v1alpha1/zz_generated.deepcopy.go @@ -1,3 +1,5 @@ +//go:build !ignore_autogenerated + /* * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 @@ -16,8 +18,6 @@ * limitations under the License. */ -//go:build !ignore_autogenerated - // Code generated by controller-gen. DO NOT EDIT. package v1alpha1 diff --git a/operator/cmd/cli/CHANGELOG.md b/operator/cmd/cli/CHANGELOG.md index 2fb9112a..97b9e4db 100644 --- a/operator/cmd/cli/CHANGELOG.md +++ b/operator/cmd/cli/CHANGELOG.md @@ -10,6 +10,13 @@ All notable changes to this project will be documented in this file. - AutoTaintNewNodes - Add SKYHOOK_NODE_ORDER env var for monotonic node ordering +### Changed + +- `reset` and `node reset` now select nodes with any resettable Skyhook + metadata, including status, cordon, and drain-start metadata. Nodes with a + malformed `nodeState` annotation are reset with an empty package state instead + of being skipped. + ## [cli/v0.1.1] - 2026-01-13 ### Bug Fixes diff --git a/operator/internal/controller/skyhook_controller.go b/operator/internal/controller/skyhook_controller.go index d9e096c2..53c10502 100644 --- a/operator/internal/controller/skyhook_controller.go +++ b/operator/internal/controller/skyhook_controller.go @@ -1464,7 +1464,7 @@ func (r *SkyhookReconciler) IsDrained(ctx context.Context, skyhookNode wrapper.S return true, nil } - options := resolvedDrainOptions(skyhookNode.GetSkyhook().Spec.DrainConfig) + options := drain.OptionsFromConfig(skyhookNode.GetSkyhook().Spec.DrainConfig) for _, pod := range pods.Items { if drain.DecidePod(&pod, options).BlocksDrain() { return false, nil @@ -1474,56 +1474,6 @@ func (r *SkyhookReconciler) IsDrained(ctx context.Context, skyhookNode wrapper.S return true, nil } -func resolvedDrainOptions(config *v1alpha1.DrainConfig) drain.Options { - options := drain.DefaultOptions() - if config == nil { - return options - } - - if config.DisableEviction != nil { - options.DisableEviction = *config.DisableEviction - } - if config.DeleteEmptyDirData != nil { - options.DeleteEmptyDirData = *config.DeleteEmptyDirData - } - if config.Force != nil { - options.Force = *config.Force - } - if config.IgnoreDaemonSets != nil { - options.IgnoreDaemonSets = *config.IgnoreDaemonSets - } - if config.GracePeriod != nil { - seconds := int64(config.GracePeriod.Duration / time.Second) - if config.GracePeriod.Duration%time.Second != 0 { - seconds++ - } - options.GracePeriodSeconds = &seconds - } - - return options -} - -func drainDeleteOptions(options drain.Options) []client.DeleteOption { - if options.GracePeriodSeconds == nil { - return nil - } - return []client.DeleteOption{client.GracePeriodSeconds(*options.GracePeriodSeconds)} -} - -func drainEvictionDeleteOptions(options drain.Options) *metav1.DeleteOptions { - if options.GracePeriodSeconds == nil { - return nil - } - return &metav1.DeleteOptions{GracePeriodSeconds: options.GracePeriodSeconds} -} - -func drainTimedOut(startedAt *metav1.Time, timeout *metav1.Duration, now time.Time) bool { - if startedAt == nil || timeout == nil || timeout.Duration == 0 { - return false - } - return !now.Before(startedAt.Add(timeout.Duration)) -} - // HandleFinalizer returns true only if we container is deleted and we handled it completely, else false. // For Skyhooks with UninstallEnabled packages, this uses a multi-reconcile flow: // Phase 1: trigger uninstall for enabled packages, return false to requeue @@ -1787,7 +1737,7 @@ func (r *SkyhookReconciler) DrainNode(ctx context.Context, skyhookNode wrapper.S if drainStartedAt == nil { skyhookNode.StartDrain(now) skyhookNode.SetStatus(v1alpha1.StatusInProgress) - } else if drainConfig != nil && drainTimedOut(drainStartedAt, drainConfig.Timeout, now.Time) { + } else if drainConfig != nil && drain.TimedOut(drainStartedAt, drainConfig.Timeout, now.Time) { if skyhookNode.Status() != v1alpha1.StatusErroring { r.recorder.Eventf(skyhookNode.GetNode(), nil, corev1.EventTypeWarning, EventsReasonSkyhookDrain, "DrainTimeout", "drain timed out after [%s] for node [%s] package [%s:%s] from [skyhook:%s]", @@ -1828,7 +1778,7 @@ func (r *SkyhookReconciler) DrainNode(ctx context.Context, skyhookNode wrapper.S skyhookNode.GetSkyhook().Name, ) - options := resolvedDrainOptions(skyhookNode.GetSkyhook().Spec.DrainConfig) + options := drain.OptionsFromConfig(skyhookNode.GetSkyhook().Spec.DrainConfig) errs := make([]error, 0) waitingForPods := false for _, pod := range pods.Items { @@ -1838,14 +1788,14 @@ func (r *SkyhookReconciler) DrainNode(ctx context.Context, skyhookNode wrapper.S waitingForPods = true case drain.ActionEvict: waitingForPods = true - eviction := policyv1.Eviction{DeleteOptions: drainEvictionDeleteOptions(options)} + eviction := policyv1.Eviction{DeleteOptions: options.EvictionDeleteOptions()} err := r.Client.SubResource("eviction").Create(ctx, &pod, &eviction) if err != nil { errs = append(errs, fmt.Errorf("error evicting pod [%s:%s]: %w", pod.Namespace, pod.Name, err)) } case drain.ActionDelete: waitingForPods = true - err := r.Delete(ctx, &pod, drainDeleteOptions(options)...) + err := r.Delete(ctx, &pod, options.DeleteOptions()...) if err != nil { errs = append(errs, fmt.Errorf("error deleting pod [%s:%s]: %w", pod.Namespace, pod.Name, err)) } diff --git a/operator/internal/drain/drain.go b/operator/internal/drain/drain.go index 4a0e4078..72e2a223 100644 --- a/operator/internal/drain/drain.go +++ b/operator/internal/drain/drain.go @@ -19,9 +19,13 @@ package drain import ( + "time" + + "github.com/NVIDIA/nodewright/operator/api/v1alpha1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -64,6 +68,56 @@ func DefaultOptions() Options { } } +func OptionsFromConfig(config *v1alpha1.DrainConfig) Options { + options := DefaultOptions() + if config == nil { + return options + } + + if config.DisableEviction != nil { + options.DisableEviction = *config.DisableEviction + } + if config.DeleteEmptyDirData != nil { + options.DeleteEmptyDirData = *config.DeleteEmptyDirData + } + if config.Force != nil { + options.Force = *config.Force + } + if config.IgnoreDaemonSets != nil { + options.IgnoreDaemonSets = *config.IgnoreDaemonSets + } + if config.GracePeriod != nil { + seconds := int64(config.GracePeriod.Duration / time.Second) + if config.GracePeriod.Duration%time.Second != 0 { + seconds++ + } + options.GracePeriodSeconds = &seconds + } + + return options +} + +func TimedOut(startedAt *metav1.Time, timeout *metav1.Duration, now time.Time) bool { + if startedAt == nil || timeout == nil || timeout.Duration == 0 { + return false + } + return !now.Before(startedAt.Add(timeout.Duration)) +} + +func (o Options) DeleteOptions() []client.DeleteOption { + if o.GracePeriodSeconds == nil { + return nil + } + return []client.DeleteOption{client.GracePeriodSeconds(*o.GracePeriodSeconds)} +} + +func (o Options) EvictionDeleteOptions() *metav1.DeleteOptions { + if o.GracePeriodSeconds == nil { + return nil + } + return &metav1.DeleteOptions{GracePeriodSeconds: o.GracePeriodSeconds} +} + type Decision struct { Action Action Reason string diff --git a/operator/internal/drain/drain_test.go b/operator/internal/drain/drain_test.go index ab33dbdd..c9f05f1b 100644 --- a/operator/internal/drain/drain_test.go +++ b/operator/internal/drain/drain_test.go @@ -21,12 +21,26 @@ package drain import ( "time" + "github.com/NVIDIA/nodewright/operator/api/v1alpha1" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" ) +func boolPtr(value bool) *bool { + return &value +} + +func durationPtr(value time.Duration) *metav1.Duration { + return &metav1.Duration{Duration: value} +} + +func int64Ptr(value int64) *int64 { + return &value +} + var _ = Describe("DecidePod", func() { controller := true const daemonSetKind = "DaemonSet" @@ -230,3 +244,85 @@ var _ = Describe("DecidePod", func() { Expect(decision.RequiresAction()).To(BeFalse()) }) }) + +var _ = Describe("OptionsFromConfig", func() { + It("should return defaults for nil config", func() { + options := OptionsFromConfig(nil) + + Expect(options).To(Equal(DefaultOptions())) + }) + + It("should apply configured field overrides", func() { + options := OptionsFromConfig(&v1alpha1.DrainConfig{ + DisableEviction: boolPtr(true), + DeleteEmptyDirData: boolPtr(false), + Force: boolPtr(false), + IgnoreDaemonSets: boolPtr(false), + GracePeriod: durationPtr(2 * time.Second), + }) + + Expect(options.DisableEviction).To(BeTrue()) + Expect(options.DeleteEmptyDirData).To(BeFalse()) + Expect(options.Force).To(BeFalse()) + Expect(options.IgnoreDaemonSets).To(BeFalse()) + Expect(options.GracePeriodSeconds).To(Equal(int64Ptr(2))) + }) + + DescribeTable("should round grace period up to whole seconds", + func(gracePeriod time.Duration, expectedSeconds int64) { + options := OptionsFromConfig(&v1alpha1.DrainConfig{ + GracePeriod: durationPtr(gracePeriod), + }) + + Expect(options.GracePeriodSeconds).To(Equal(int64Ptr(expectedSeconds))) + }, + Entry("zero seconds", 0*time.Second, int64(0)), + Entry("sub-second", 500*time.Millisecond, int64(1)), + Entry("whole second", 1*time.Second, int64(1)), + Entry("partial second", 1500*time.Millisecond, int64(2)), + ) +}) + +var _ = Describe("TimedOut", func() { + start := metav1.NewTime(time.Date(2026, time.June, 5, 12, 0, 0, 0, time.UTC)) + timeout := metav1.Duration{Duration: 5 * time.Second} + zeroTimeout := metav1.Duration{} + + DescribeTable("should evaluate timeout boundaries", + func(startedAt *metav1.Time, timeout *metav1.Duration, now time.Time, expected bool) { + Expect(TimedOut(startedAt, timeout, now)).To(Equal(expected)) + }, + Entry("nil start", nil, &timeout, start.Add(6*time.Second), false), + Entry("nil timeout", &start, nil, start.Add(6*time.Second), false), + Entry("zero timeout", &start, &zeroTimeout, start.Add(6*time.Second), false), + Entry("just before deadline", &start, &timeout, start.Add(5*time.Second-time.Nanosecond), false), + Entry("exactly at deadline", &start, &timeout, start.Add(5*time.Second), true), + Entry("after deadline", &start, &timeout, start.Add(6*time.Second), true), + ) +}) + +var _ = Describe("delete options", func() { + It("should omit delete options when grace period is unset", func() { + options := DefaultOptions() + + Expect(options.DeleteOptions()).To(BeNil()) + Expect(options.EvictionDeleteOptions()).To(BeNil()) + }) + + It("should carry configured grace period into delete options", func() { + options := Options{GracePeriodSeconds: int64Ptr(7)} + + deleteOptions := options.DeleteOptions() + Expect(deleteOptions).To(HaveLen(1)) + + applied := &client.DeleteOptions{} + for _, option := range deleteOptions { + option.ApplyToDelete(applied) + } + Expect(applied.GracePeriodSeconds).To(Equal(int64Ptr(7))) + + evictionDeleteOptions := options.EvictionDeleteOptions() + Expect(evictionDeleteOptions).NotTo(BeNil()) + Expect(evictionDeleteOptions.GracePeriodSeconds).To(Equal(int64Ptr(7))) + }) +}) From 2671b54e3142ed7ee49489eb428f36f9efc76357 Mon Sep 17 00:00:00 2001 From: AnouarMohamed Date: Fri, 5 Jun 2026 22:07:25 +0100 Subject: [PATCH 4/4] fix(operator): always emit drain timeout event Signed-off-by: AnouarMohamed --- .../internal/controller/skyhook_controller.go | 32 +++++---- .../controller/skyhook_controller_test.go | 65 +++++++++++++++++++ 2 files changed, 80 insertions(+), 17 deletions(-) diff --git a/operator/internal/controller/skyhook_controller.go b/operator/internal/controller/skyhook_controller.go index 5d8ff290..dd0f72c6 100644 --- a/operator/internal/controller/skyhook_controller.go +++ b/operator/internal/controller/skyhook_controller.go @@ -1747,23 +1747,21 @@ func (r *SkyhookReconciler) DrainNode(ctx context.Context, skyhookNode wrapper.S skyhookNode.StartDrain(now) skyhookNode.SetStatus(v1alpha1.StatusInProgress) } else if drainConfig != nil && drain.TimedOut(drainStartedAt, drainConfig.Timeout, now.Time) { - if skyhookNode.Status() != v1alpha1.StatusErroring { - r.recorder.Eventf(skyhookNode.GetNode(), nil, corev1.EventTypeWarning, EventsReasonSkyhookDrain, "DrainTimeout", - "drain timed out after [%s] for node [%s] package [%s:%s] from [skyhook:%s]", - drainConfig.Timeout.Duration, - skyhookNode.GetNode().Name, - _package.Name, - _package.Version, - skyhookNode.GetSkyhook().Name, - ) - r.recorder.Eventf(skyhookNode.GetSkyhook().Skyhook, nil, corev1.EventTypeWarning, EventsReasonSkyhookDrain, "DrainTimeout", - "drain timed out after [%s] for node [%s] package [%s:%s]", - drainConfig.Timeout.Duration, - skyhookNode.GetNode().Name, - _package.Name, - _package.Version, - ) - } + r.recorder.Eventf(skyhookNode.GetNode(), nil, corev1.EventTypeWarning, EventsReasonSkyhookDrain, "DrainTimeout", + "drain timed out after [%s] for node [%s] package [%s:%s] from [skyhook:%s]", + drainConfig.Timeout.Duration, + skyhookNode.GetNode().Name, + _package.Name, + _package.Version, + skyhookNode.GetSkyhook().Name, + ) + r.recorder.Eventf(skyhookNode.GetSkyhook().Skyhook, nil, corev1.EventTypeWarning, EventsReasonSkyhookDrain, "DrainTimeout", + "drain timed out after [%s] for node [%s] package [%s:%s]", + drainConfig.Timeout.Duration, + skyhookNode.GetNode().Name, + _package.Name, + _package.Version, + ) skyhookNode.SetStatus(v1alpha1.StatusErroring) return false, nil } diff --git a/operator/internal/controller/skyhook_controller_test.go b/operator/internal/controller/skyhook_controller_test.go index 43dec61b..ac399892 100644 --- a/operator/internal/controller/skyhook_controller_test.go +++ b/operator/internal/controller/skyhook_controller_test.go @@ -810,6 +810,71 @@ var _ = Describe("skyhook controller tests", func() { Expect(deleteCalled).To(BeFalse()) Expect(skyhookNode.Status()).To(Equal(v1alpha1.StatusErroring)) }) + + It("should emit drain timeout events when the node is already erroring", func() { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "workload", + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "ReplicaSet", + Name: "workload-rs", + Controller: ptr(true), + }, + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-a", + Containers: []corev1.Container{ + {Name: "workload", Image: "busybox"}, + }, + }, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + } + + baseClient := fakeDrainClient(pod) + testClient := interceptor.NewClient(baseClient, interceptor.Funcs{ + SubResourceCreate: func(ctx context.Context, c client.Client, subResourceName string, obj client.Object, subResource client.Object, opts ...client.SubResourceCreateOption) error { + Fail("drain timeout should not evict pods") + return nil + }, + }) + + recorder := events.NewFakeRecorder(10) + r, err := NewSkyhookReconciler(testClient.Scheme(), testClient, recorder, opts) + Expect(err).ToNot(HaveOccurred()) + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-a", + Annotations: map[string]string{ + "skyhook.nvidia.com/drainStart_drain-timeout": time.Now().Add(-2 * time.Minute).Format(time.RFC3339Nano), + "skyhook.nvidia.com/status_drain-timeout": string(v1alpha1.StatusErroring), + }, + }, + } + skyhook := &v1alpha1.Skyhook{ + ObjectMeta: metav1.ObjectMeta{Name: "drain-timeout"}, + Spec: v1alpha1.SkyhookSpec{ + DrainConfig: &v1alpha1.DrainConfig{ + Timeout: &metav1.Duration{Duration: time.Second}, + }, + Packages: v1alpha1.Packages{}, + }, + } + skyhookNode, err := wrapper.NewSkyhookNode(node, skyhook) + Expect(err).ToNot(HaveOccurred()) + + drained, err := r.DrainNode(ctx, skyhookNode, &v1alpha1.Package{ + PackageRef: v1alpha1.PackageRef{Name: "pkg", Version: "1.0.0"}, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(drained).To(BeFalse()) + Expect(skyhookNode.Status()).To(Equal(v1alpha1.StatusErroring)) + Eventually(recorder.Events).Should(Receive(ContainSubstring("Warning Drain drain timed out after [1s] for node [node-a] package [pkg:1.0.0] from [skyhook:drain-timeout]"))) + Eventually(recorder.Events).Should(Receive(ContainSubstring("Warning Drain drain timed out after [1s] for node [node-a] package [pkg:1.0.0]"))) + }) }) It("should set monotonic SKYHOOK_NODE_ORDER across nodes and batches", func() {