From 342576a96de6335925801e52409364ad551c1394 Mon Sep 17 00:00:00 2001 From: "pingshan.wj" Date: Wed, 1 Apr 2026 19:11:11 +0800 Subject: [PATCH] feat(k8s): add Pool rolling update maxUnavailable, close #610 --- kubernetes/Makefile | 7 +- .../apis/sandbox/v1alpha1/pool_types.go | 16 + .../sandbox/v1alpha1/zz_generated.deepcopy.go | 25 + .../bases/sandbox.opensandbox.io_pools.yaml | 26 ++ .../internal/controller/pool_controller.go | 320 ++++++------- .../internal/controller/pool_eviction_test.go | 258 +++++------ kubernetes/internal/controller/pool_update.go | 113 +++++ .../internal/controller/pool_update_test.go | 433 ++++++++++++++++++ kubernetes/internal/utils/pod.go | 57 ++- kubernetes/internal/utils/pod_test.go | 259 +++++++++++ kubernetes/test/e2e/e2e_test.go | 258 +++++++++++ .../testdata/pool-with-update-strategy.yaml | 22 + 12 files changed, 1465 insertions(+), 329 deletions(-) create mode 100644 kubernetes/internal/controller/pool_update.go create mode 100644 kubernetes/internal/controller/pool_update_test.go create mode 100644 kubernetes/test/e2e/testdata/pool-with-update-strategy.yaml diff --git a/kubernetes/Makefile b/kubernetes/Makefile index b26a52286..0b358ea62 100644 --- a/kubernetes/Makefile +++ b/kubernetes/Makefile @@ -124,6 +124,7 @@ test: manifests generate fmt vet setup-envtest ## Run tests. 
KIND_CLUSTER ?= sandbox-k8s-test-e2e KIND_K8S_VERSION ?= v1.22.4 GINKGO_ARGS ?= +E2E_TIMEOUT ?= 30m .PHONY: install-kind install-kind: ## Install Kind using go install if not already installed @@ -148,15 +149,15 @@ setup-test-e2e: install-kind ## Set up a Kind cluster for e2e tests if it does n .PHONY: test-e2e-main test-e2e-main: setup-test-e2e manifests generate fmt vet CONTROLLER_IMG=$(CONTROLLER_IMG) TASK_EXECUTOR_IMG=$(TASK_EXECUTOR_IMG) \ - KIND_CLUSTER=$(KIND_CLUSTER) go test ./test/e2e/ -v -ginkgo.v $(GINKGO_ARGS) + KIND_CLUSTER=$(KIND_CLUSTER) go test ./test/e2e/ -v -ginkgo.v -timeout $(E2E_TIMEOUT) $(GINKGO_ARGS) $(MAKE) cleanup-test-e2e .PHONY: test-e2e test-e2e: setup-test-e2e manifests generate fmt vet ## Run the e2e tests. Expected an isolated environment using Kind. Use GINKGO_ARGS to pass additional arguments. CONTROLLER_IMG=$(CONTROLLER_IMG) TASK_EXECUTOR_IMG=$(TASK_EXECUTOR_IMG) \ - KIND_CLUSTER=$(KIND_CLUSTER) go test ./test/e2e/ -v -ginkgo.v $(GINKGO_ARGS) + KIND_CLUSTER=$(KIND_CLUSTER) go test ./test/e2e/ -v -ginkgo.v -timeout $(E2E_TIMEOUT) $(GINKGO_ARGS) CONTROLLER_IMG=$(CONTROLLER_IMG) TASK_EXECUTOR_IMG=$(TASK_EXECUTOR_IMG) \ - KIND_CLUSTER=$(KIND_CLUSTER) go test ./test/e2e_task/ -v -ginkgo.v $(GINKGO_ARGS) + KIND_CLUSTER=$(KIND_CLUSTER) go test ./test/e2e_task/ -v -ginkgo.v -timeout $(E2E_TIMEOUT) $(GINKGO_ARGS) $(MAKE) cleanup-test-e2e $(MAKE) test-gvisor CONTROLLER_IMG=$(CONTROLLER_IMG) TASK_EXECUTOR_IMG=$(TASK_EXECUTOR_IMG) $(MAKE) cleanup-gvisor diff --git a/kubernetes/apis/sandbox/v1alpha1/pool_types.go b/kubernetes/apis/sandbox/v1alpha1/pool_types.go index b652c6592..936e4288d 100644 --- a/kubernetes/apis/sandbox/v1alpha1/pool_types.go +++ b/kubernetes/apis/sandbox/v1alpha1/pool_types.go @@ -36,6 +36,9 @@ type PoolSpec struct { // ScaleStrategy controls the scaling behavior. // +optional ScaleStrategy *ScaleStrategy `json:"scaleStrategy,omitempty"` + // UpdateStrategy controls how pool pods are updated when the template changes. 
+ // +optional + UpdateStrategy *UpdateStrategy `json:"updateStrategy,omitempty"` } type CapacitySpec struct { @@ -66,6 +69,15 @@ type ScaleStrategy struct { MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"` } +// UpdateStrategy controls how pool pods are updated when the pool template changes. +type UpdateStrategy struct { + // MaxUnavailable is the maximum number of pods that can be unavailable during an update. + // Can be an absolute number (ex: 5) or a percentage of desired pods (ex: "20%"). + // Defaults to 25%. + // +optional + MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"` +} + // PoolStatus defines the observed state of Pool. type PoolStatus struct { // ObservedGeneration is the most recent generation observed for this BatchSandbox. It corresponds to the @@ -79,6 +91,8 @@ type PoolStatus struct { Allocated int32 `json:"allocated"` // Available is the number of nodes currently available in the pool. Available int32 `json:"available"` + // Updated is the number of nodes that have been updated to the latest revision. + Updated int32 `json:"updated,omitempty"` } // +genclient @@ -88,6 +102,8 @@ type PoolStatus struct { // +kubebuilder:printcolumn:name="TOTAL",type="integer",JSONPath=".status.total",description="The number of all nodes in pool." // +kubebuilder:printcolumn:name="ALLOCATED",type="integer",JSONPath=".status.allocated",description="The number of allocated nodes in pool." // +kubebuilder:printcolumn:name="AVAILABLE",type="integer",JSONPath=".status.available",description="The number of available nodes in pool." +// +kubebuilder:printcolumn:name="UPDATED",type="integer",JSONPath=".status.updated",description="The number of nodes updated to the latest revision." +// +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp" // Pool is the Schema for the pools API. 
type Pool struct { metav1.TypeMeta `json:",inline"` diff --git a/kubernetes/apis/sandbox/v1alpha1/zz_generated.deepcopy.go b/kubernetes/apis/sandbox/v1alpha1/zz_generated.deepcopy.go index 05024c26e..b74fc5f32 100644 --- a/kubernetes/apis/sandbox/v1alpha1/zz_generated.deepcopy.go +++ b/kubernetes/apis/sandbox/v1alpha1/zz_generated.deepcopy.go @@ -239,6 +239,11 @@ func (in *PoolSpec) DeepCopyInto(out *PoolSpec) { *out = new(ScaleStrategy) (*in).DeepCopyInto(*out) } + if in.UpdateStrategy != nil { + in, out := &in.UpdateStrategy, &out.UpdateStrategy + *out = new(UpdateStrategy) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PoolSpec. @@ -453,3 +458,23 @@ func (in *TaskTemplateSpec) DeepCopy() *TaskTemplateSpec { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpdateStrategy) DeepCopyInto(out *UpdateStrategy) { + *out = *in + if in.MaxUnavailable != nil { + in, out := &in.MaxUnavailable, &out.MaxUnavailable + *out = new(intstr.IntOrString) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpdateStrategy. +func (in *UpdateStrategy) DeepCopy() *UpdateStrategy { + if in == nil { + return nil + } + out := new(UpdateStrategy) + in.DeepCopyInto(out) + return out +} diff --git a/kubernetes/config/crd/bases/sandbox.opensandbox.io_pools.yaml b/kubernetes/config/crd/bases/sandbox.opensandbox.io_pools.yaml index 297a8be87..213c16893 100644 --- a/kubernetes/config/crd/bases/sandbox.opensandbox.io_pools.yaml +++ b/kubernetes/config/crd/bases/sandbox.opensandbox.io_pools.yaml @@ -27,6 +27,13 @@ spec: jsonPath: .status.available name: AVAILABLE type: integer + - description: The number of nodes updated to the latest revision. 
+ jsonPath: .status.updated + name: UPDATED + type: integer + - jsonPath: .metadata.creationTimestamp + name: AGE + type: date name: v1alpha1 schema: openAPIV3Schema: @@ -100,6 +107,20 @@ spec: template: description: Pod Template used to create pre-warmed nodes in the pool. x-kubernetes-preserve-unknown-fields: true + updateStrategy: + description: UpdateStrategy controls how pool pods are updated when + the template changes. + properties: + maxUnavailable: + anyOf: + - type: integer + - type: string + description: |- + MaxUnavailable is the maximum number of pods that can be unavailable during an update. + Can be an absolute number (ex: 5) or a percentage of desired pods (ex: "20%"). + Defaults to 25%. + x-kubernetes-int-or-string: true + type: object required: - capacitySpec type: object @@ -129,6 +150,11 @@ spec: description: Total is the total number of nodes in the pool. format: int32 type: integer + updated: + description: Updated is the number of nodes that have been updated + to the latest revision. + format: int32 + type: integer required: - allocated - available diff --git a/kubernetes/internal/controller/pool_controller.go b/kubernetes/internal/controller/pool_controller.go index 7b8871847..f7a4b566a 100644 --- a/kubernetes/internal/controller/pool_controller.go +++ b/kubernetes/internal/controller/pool_controller.go @@ -20,7 +20,6 @@ import ( "encoding/hex" gerrors "errors" "fmt" - "math" "sort" "time" @@ -147,7 +146,6 @@ func (r *PoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. 
// reconcilePool contains the main reconciliation logic func (r *PoolReconciler) reconcilePool(ctx context.Context, pool *sandboxv1alpha1.Pool, batchSandboxes []*sandboxv1alpha1.BatchSandbox, pods []*corev1.Pod) (ctrl.Result, error) { - log := logf.FromContext(ctx) var result ctrl.Result err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { @@ -157,89 +155,49 @@ func (r *PoolReconciler) reconcilePool(ctx context.Context, pool *sandboxv1alpha return err } - // 2. Handle pod eviction requests - allocBeforeSchedule, err := r.Allocator.GetPoolAllocation(ctx, latestPool) - if err != nil { - log.Error(err, "Failed to get pool allocation") - return err + // 2. Handle pod eviction + schedulePods, evictionErr := r.handleEviction(ctx, latestPool, pods) + if schedulePods == nil { + return evictionErr } - evictionErr := r.handlePodEvictions(ctx, latestPool, pods, allocBeforeSchedule) - - // 3. Filter out evicting pods before scheduling - schedulePods := r.filterEvictingPods(ctx, latestPool, pods, allocBeforeSchedule) - - // 4. Schedule and allocate - podAllocation, pendingSyncs, idlePods, dirtyPods, supplySandbox, poolDirty, err := r.scheduleSandbox(ctx, latestPool, batchSandboxes, schedulePods) + // 3. Schedule sandbox (compute + persist + sync) + schedResult, err := r.scheduleSandbox(ctx, latestPool, batchSandboxes, schedulePods) if err != nil { return err } - - needReconcile := false - delay := time.Duration(0) - if supplySandbox > 0 && len(idlePods) > 0 { - needReconcile = true - delay = defaultRetryTime - } - if int32(len(idlePods)) >= supplySandbox { - supplySandbox = 0 - } else { - supplySandbox -= int32(len(idlePods)) - } - - // 1. Persist to memory - if poolDirty { - if err := r.Allocator.PersistPoolAllocation(ctx, latestPool, &AllocStatus{PodAllocation: podAllocation}); err != nil { - log.Error(err, "Failed to persist pool allocation") - return err - } - } - - // 2. Sync BatchSandbox allocations. Rollback in memory alocation if failed to sync. 
- // Optimize this concurrently if needed. - var syncErrs []error - for _, syncInfo := range pendingSyncs { - if err := r.Allocator.SyncSandboxAllocation(ctx, syncInfo.Sandbox, syncInfo.Pods); err != nil { - log.Error(err, "Failed to sync sandbox allocation", "sandbox", syncInfo.SandboxName) - syncErrs = append(syncErrs, fmt.Errorf("failed to sync sandbox %s: %w", syncInfo.SandboxName, err)) - } else { - log.Info("Successfully sync Sandbox allocation", "sandbox", syncInfo.SandboxName, "pods", syncInfo.Pods) - } - } - if len(syncErrs) > 0 { - return gerrors.Join(syncErrs...) + // Requeue if there are pending sandboxes waiting for scheduling + if schedResult.SupplyCnt > 0 { + result = ctrl.Result{RequeueAfter: defaultRetryTime} } - latestRevision, err := r.calculateRevision(latestPool) + // 4. Handle pool upgrade + updateResult, err := r.updatePool(ctx, latestPool, schedulePods, schedResult.IdlePods) if err != nil { return err } - latestIdlePods, deleteOld, supplyNew := r.updatePool(ctx, latestRevision, schedulePods, idlePods, dirtyPods) + // 5. Handle pool scale + toDeletePods := append(updateResult.ToDeletePods, schedResult.DirtyPods...) args := &scaleArgs{ - latestRevision: latestRevision, - pool: latestPool, + updateRevision: updateResult.UpdateRevision, pods: schedulePods, totalPodCnt: int32(len(pods)), - allocatedCnt: int32(len(podAllocation)), - idlePods: latestIdlePods, - redundantPods: deleteOld, - supplyCnt: supplySandbox + supplyNew, - } - if err := r.scalePool(ctx, args); err != nil { - return err + allocatedCnt: int32(len(schedResult.PodAllocation)), + idlePods: updateResult.IdlePods, + toDeletePods: toDeletePods, + supplyCnt: schedResult.SupplyCnt + updateResult.SupplyUpdateRevision, } - // 6. 
Update Status (use all pods for total count, schedulePods for available count) - if err := r.updatePoolStatus(ctx, latestRevision, latestPool, pods, schedulePods, podAllocation); err != nil { + if err := r.scalePool(ctx, latestPool, args); err != nil { return err } - if needReconcile { - result = ctrl.Result{RequeueAfter: delay} + // 6. Update pool status + if err := r.updatePoolStatus(ctx, updateResult.UpdateRevision, latestPool, pods, schedulePods, schedResult.PodAllocation); err != nil { + return err } - // Return eviction error last to trigger requeue for failed evictions if evictionErr != nil { return evictionErr } @@ -334,84 +292,98 @@ func (r *PoolReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (r *PoolReconciler) scheduleSandbox(ctx context.Context, pool *sandboxv1alpha1.Pool, batchSandboxes []*sandboxv1alpha1.BatchSandbox, pods []*corev1.Pod) (map[string]string, []SandboxSyncInfo, []string, []string, int32, bool, error) { +func (r *PoolReconciler) scheduleSandbox(ctx context.Context, pool *sandboxv1alpha1.Pool, batchSandboxes []*sandboxv1alpha1.BatchSandbox, pods []*corev1.Pod) (*ScheduleResult, error) { log := logf.FromContext(ctx) spec := &AllocSpec{ Sandboxes: batchSandboxes, Pool: pool, Pods: pods, } - status, pendingSyncs, poolDirty, err := r.Allocator.Schedule(ctx, spec) + allocStatus, pendingSyncs, poolDirty, err := r.Allocator.Schedule(ctx, spec) if err != nil { - return nil, nil, nil, nil, 0, false, err + return nil, err } idlePods := make([]string, 0) for _, pod := range pods { - if _, ok := status.PodAllocation[pod.Name]; !ok { + if _, ok := allocStatus.PodAllocation[pod.Name]; !ok { idlePods = append(idlePods, pod.Name) } } - log.Info("Schedule result", "pool", pool.Name, "allocated", len(status.PodAllocation), - "idlePods", len(idlePods), "supplement", status.PodSupplement, "pendingSyncs", len(pendingSyncs), "poolDirty", poolDirty) - return status.PodAllocation, pendingSyncs, idlePods, status.DirtyPods, 
status.PodSupplement, poolDirty, nil -} - -func (r *PoolReconciler) updatePool(ctx context.Context, latestRevision string, pods []*corev1.Pod, idlePods []string, dirtyPods []string) ([]string, []string, int32) { - podMap := make(map[string]*corev1.Pod) - for _, pod := range pods { - podMap[pod.Name] = pod - } - latestIdlePods := make([]string, 0) - deleteOld := make([]string, 0) - supplyNew := int32(0) + log.Info("Schedule result", "pool", pool.Name, "allocated", len(allocStatus.PodAllocation), + "idlePods", len(idlePods), "supplement", allocStatus.PodSupplement, "pendingSyncs", len(pendingSyncs), "poolDirty", poolDirty) - dirtySet := make(map[string]bool) - for _, p := range dirtyPods { - dirtySet[p] = true + schedResult := &ScheduleResult{ + PodAllocation: allocStatus.PodAllocation, + IdlePods: idlePods, + DirtyPods: allocStatus.DirtyPods, + SupplyCnt: allocStatus.PodSupplement, } - for _, name := range idlePods { - if dirtySet[name] { - deleteOld = append(deleteOld, name) - // no need to supply, next reconcile will do this job - continue + // Persist allocation to memory store + if poolDirty { + if err := r.Allocator.PersistPoolAllocation(ctx, pool, &AllocStatus{PodAllocation: allocStatus.PodAllocation}); err != nil { + log.Error(err, "Failed to persist pool allocation") + return nil, err } + } - pod, ok := podMap[name] - if !ok { - continue - } - revision := pod.Labels[LabelPoolRevision] - if revision == latestRevision { - latestIdlePods = append(latestIdlePods, name) + // Sync to each BatchSandbox + var syncErrs []error + for _, syncInfo := range pendingSyncs { + if err := r.Allocator.SyncSandboxAllocation(ctx, syncInfo.Sandbox, syncInfo.Pods); err != nil { + log.Error(err, "Failed to sync sandbox allocation", "sandbox", syncInfo.SandboxName) + syncErrs = append(syncErrs, fmt.Errorf("failed to sync sandbox %s: %w", syncInfo.SandboxName, err)) } else { - // Rolling: (1) delete old idle pods (2) create latest pods - deleteOld = append(deleteOld, name) - 
supplyNew++ + log.Info("Successfully sync Sandbox allocation", "sandbox", syncInfo.SandboxName, "pods", syncInfo.Pods) } } - if len(deleteOld) > 0 { - logf.FromContext(ctx).Info("Rolling update detected", "latestRevision", latestRevision, - "outdatedPods", deleteOld, "supplyNew", supplyNew, "latestIdlePods", len(latestIdlePods)) + if err := gerrors.Join(syncErrs...); err != nil { + return nil, err + } + + return schedResult, nil +} + +func (r *PoolReconciler) updatePool(ctx context.Context, pool *sandboxv1alpha1.Pool, pods []*corev1.Pod, idlePods []string) (*UpdateResult, error) { + updateRevision, err := r.calculateRevision(pool) + if err != nil { + return nil, err } - return latestIdlePods, deleteOld, supplyNew + strategy := NewPoolUpdateStrategy(pool) + result := strategy.Compute(ctx, updateRevision, pods, idlePods) + result.UpdateRevision = updateRevision + return result, nil } type scaleArgs struct { - latestRevision string - pool *sandboxv1alpha1.Pool + updateRevision string pods []*corev1.Pod totalPodCnt int32 // all pods including evicting ones, for PoolMax enforcement allocatedCnt int32 supplyCnt int32 // to create idlePods []string - redundantPods []string + toDeletePods []string +} + +// ScheduleResult holds the output of scheduleSandbox. 
+type ScheduleResult struct { + PodAllocation map[string]string + IdlePods []string + DirtyPods []string + SupplyCnt int32 +} + +type UpdateResult struct { + UpdateRevision string + IdlePods []string + ToDeletePods []string + // Supply Pods with update revision + SupplyUpdateRevision int32 } -func (r *PoolReconciler) scalePool(ctx context.Context, args *scaleArgs) error { +func (r *PoolReconciler) scalePool(ctx context.Context, pool *sandboxv1alpha1.Pool, args *scaleArgs) error { log := logf.FromContext(ctx) errs := make([]error, 0) - pool := args.pool pods := args.pods if satisfied, unsatisfiedDuration, dirtyPods := PoolScaleExpectations.SatisfiedExpectations(controllerutils.GetControllerKey(pool)); !satisfied { log.Info("Pool scale is not ready, requeue", "unsatisfiedDuration", unsatisfiedDuration, "dirtyPods", dirtyPods) @@ -421,7 +393,7 @@ func (r *PoolReconciler) scalePool(ctx context.Context, args *scaleArgs) error { totalPodCnt := args.totalPodCnt allocatedCnt := args.allocatedCnt supplyCnt := args.supplyCnt - redundantPods := args.redundantPods + toDeletePods := args.toDeletePods bufferCnt := schedulableCnt - allocatedCnt // Calculate desired buffer cnt. @@ -431,39 +403,30 @@ func (r *PoolReconciler) scalePool(ctx context.Context, args *scaleArgs) error { } // Calculate desired schedulable cnt. - desiredSchedulableCnt := allocatedCnt + supplyCnt + desiredBufferCnt - if desiredSchedulableCnt < pool.Spec.CapacitySpec.PoolMin { - desiredSchedulableCnt = pool.Spec.CapacitySpec.PoolMin - } + desiredSchedulableCnt := max(allocatedCnt+supplyCnt+desiredBufferCnt, pool.Spec.CapacitySpec.PoolMin) // Enforce PoolMax: limit new pods based on total running pods (including evicting). 
- maxNewPods := pool.Spec.CapacitySpec.PoolMax - totalPodCnt - if maxNewPods < 0 { - maxNewPods = 0 - } + maxNewPods := max(pool.Spec.CapacitySpec.PoolMax-totalPodCnt, 0) log.Info("Scale pool decision", "pool", pool.Name, "totalPodCnt", totalPodCnt, "schedulableCnt", schedulableCnt, "allocatedCnt", allocatedCnt, "bufferCnt", bufferCnt, "desiredBufferCnt", desiredBufferCnt, "supplyCnt", supplyCnt, "desiredSchedulableCnt", desiredSchedulableCnt, "maxNewPods", maxNewPods, - "redundantPods", len(redundantPods), "idlePods", len(args.idlePods)) + "toDeletePods", len(toDeletePods), "idlePods", len(args.idlePods)) // Scale-up: create new pods if needed and allowed by PoolMax if desiredSchedulableCnt > schedulableCnt && maxNewPods > 0 { - createCnt := desiredSchedulableCnt - schedulableCnt - if createCnt > maxNewPods { - createCnt = maxNewPods - } - maxUnavailable := r.getMaxUnavailable(pool, desiredSchedulableCnt) + createCnt := min(desiredSchedulableCnt-schedulableCnt, maxNewPods) + scaleMaxUnavailable := r.getScaleMaxUnavailable(pool, desiredSchedulableCnt) notReadyCnt := r.countNotReadyPods(pods) - limitedCreatCnt := maxUnavailable - notReadyCnt - createCnt = int32(math.Max(0, math.Min(float64(createCnt), float64(limitedCreatCnt)))) + limitedCreateCnt := scaleMaxUnavailable - notReadyCnt + createCnt = max(0, min(createCnt, limitedCreateCnt)) if createCnt > 0 { log.Info("Scaling up pool with constraint", "pool", pool.Name, - "createCnt", createCnt, "maxUnavailable", maxUnavailable, - "notReadyCnt", notReadyCnt, "desiredSchedulableCnt", desiredSchedulableCnt, "limitedCreatCnt", limitedCreatCnt) + "createCnt", createCnt, "scaleMaxUnavailable", scaleMaxUnavailable, + "notReadyCnt", notReadyCnt, "desiredSchedulableCnt", desiredSchedulableCnt, "limitedCreateCnt", limitedCreateCnt) for range createCnt { - if err := r.createPoolPod(ctx, pool, args.latestRevision); err != nil { + if err := r.createPoolPod(ctx, pool, args.updateRevision); err != nil { log.Error(err, "Failed to 
create pool pod") errs = append(errs, err) } @@ -476,9 +439,9 @@ func (r *PoolReconciler) scalePool(ctx context.Context, args *scaleArgs) error { if desiredSchedulableCnt < schedulableCnt { scaleIn = schedulableCnt - desiredSchedulableCnt } - if scaleIn > 0 || len(redundantPods) > 0 { - podsToDelete := r.pickPodsToDelete(pods, args.idlePods, args.redundantPods, scaleIn) - log.Info("Scaling down pool", "pool", pool.Name, "scaleIn", scaleIn, "redundantPods", len(redundantPods), "podsToDelete", len(podsToDelete)) + if scaleIn > 0 || len(toDeletePods) > 0 { + podsToDelete := r.pickPodsToDelete(pods, args.idlePods, args.toDeletePods, scaleIn) + log.Info("Scaling down pool", "pool", pool.Name, "scaleIn", scaleIn, "toDeletePods", len(toDeletePods), "podsToDelete", len(podsToDelete)) for _, pod := range podsToDelete { log.Info("Deleting pool pod", "pool", pool.Name, "pod", pod.Name) if err := r.Delete(ctx, pod); err != nil { @@ -490,61 +453,69 @@ func (r *PoolReconciler) scalePool(ctx context.Context, args *scaleArgs) error { return gerrors.Join(errs...) 
} -func (r *PoolReconciler) updatePoolStatus(ctx context.Context, latestRevision string, pool *sandboxv1alpha1.Pool, pods []*corev1.Pod, schedulePods []*corev1.Pod, podAllocation map[string]string) error { +func (r *PoolReconciler) updatePoolStatus(ctx context.Context, updateRevision string, pool *sandboxv1alpha1.Pool, pods []*corev1.Pod, schedulePods []*corev1.Pod, podAllocation map[string]string) error { oldStatus := pool.Status.DeepCopy() availableCnt := int32(0) for _, pod := range schedulePods { if _, ok := podAllocation[pod.Name]; ok { continue } - if !isPodReady(pod) { + if !utils.IsPodReady(pod) { continue } availableCnt++ } + updatedCnt := int32(0) + for _, pod := range pods { + if pod.Labels[LabelPoolRevision] == updateRevision { + updatedCnt++ + } + } pool.Status.ObservedGeneration = pool.Generation pool.Status.Total = int32(len(pods)) pool.Status.Allocated = int32(len(podAllocation)) pool.Status.Available = availableCnt - pool.Status.Revision = latestRevision + pool.Status.Revision = updateRevision + pool.Status.Updated = updatedCnt if equality.Semantic.DeepEqual(oldStatus, pool.Status) { return nil } log := logf.FromContext(ctx) log.Info("Update pool status", "ObservedGeneration", pool.Status.ObservedGeneration, "Total", pool.Status.Total, - "Allocated", pool.Status.Allocated, "Available", pool.Status.Available, "Revision", pool.Status.Revision) + "Allocated", pool.Status.Allocated, "Available", pool.Status.Available, "Revision", pool.Status.Revision, "Updated", pool.Status.Updated) if err := r.Status().Update(ctx, pool); err != nil { return err } return nil } -func (r *PoolReconciler) pickPodsToDelete(pods []*corev1.Pod, idlePodNames []string, redundantPodNames []string, scaleIn int32) []*corev1.Pod { - var idlePods []*corev1.Pod +func (r *PoolReconciler) pickPodsToDelete(pods []*corev1.Pod, idlePodNames []string, toDeletePodNames []string, scaleIn int32) []*corev1.Pod { podMap := make(map[string]*corev1.Pod) for _, pod := range pods { 
podMap[pod.Name] = pod } - for _, name := range idlePodNames { + + var podsToDelete []*corev1.Pod + for _, name := range toDeletePodNames { pod, ok := podMap[name] if !ok { continue } - idlePods = append(idlePods, pod) + podsToDelete = append(podsToDelete, pod) } - sort.Slice(idlePods, func(i, j int) bool { - return idlePods[i].CreationTimestamp.Before(&idlePods[j].CreationTimestamp) - }) - var podsToDelete []*corev1.Pod - for _, name := range redundantPodNames { // delete pod from pool update + var idlePods []*corev1.Pod + for _, name := range idlePodNames { pod, ok := podMap[name] if !ok { continue } - podsToDelete = append(podsToDelete, pod) + idlePods = append(idlePods, pod) } - for _, pod := range idlePods { // delete pod from pool scale + sort.Slice(idlePods, func(i, j int) bool { + return idlePods[i].CreationTimestamp.Before(&idlePods[j].CreationTimestamp) + }) + for _, pod := range idlePods { if scaleIn <= 0 { break } @@ -556,10 +527,10 @@ func (r *PoolReconciler) pickPodsToDelete(pods []*corev1.Pod, idlePodNames []str return podsToDelete } -// getMaxUnavailable returns the resolved maxUnavailable value. +// getScaleMaxUnavailable returns the resolved maxUnavailable value. // If not specified, defaults to 25% of desiredTotal. // Minimum return value is 1 to ensure scaling progress. 
-func (r *PoolReconciler) getMaxUnavailable(pool *sandboxv1alpha1.Pool, desiredTotal int32) int32 { +func (r *PoolReconciler) getScaleMaxUnavailable(pool *sandboxv1alpha1.Pool, desiredTotal int32) int32 { defaultPercentage := intstr.FromString("25%") maxUnavailable := &defaultPercentage @@ -580,14 +551,14 @@ func (r *PoolReconciler) getMaxUnavailable(pool *sandboxv1alpha1.Pool, desiredTo func (r *PoolReconciler) countNotReadyPods(pods []*corev1.Pod) int32 { var count int32 for _, pod := range pods { - if !isPodReady(pod) { + if !utils.IsPodReady(pod) { count++ } } return count } -func (r *PoolReconciler) createPoolPod(ctx context.Context, pool *sandboxv1alpha1.Pool, latestRevision string) error { +func (r *PoolReconciler) createPoolPod(ctx context.Context, pool *sandboxv1alpha1.Pool, updateRevision string) error { log := logf.FromContext(ctx) pod, err := utils.GetPodFromTemplate(pool.Spec.Template, pool, metav1.NewControllerRef(pool, sandboxv1alpha1.SchemeBuilder.GroupVersion.WithKind("Pool"))) if err != nil { @@ -597,7 +568,7 @@ func (r *PoolReconciler) createPoolPod(ctx context.Context, pool *sandboxv1alpha pod.Name = "" pod.GenerateName = pool.Name + "-" pod.Labels[LabelPoolName] = pool.Name - pod.Labels[LabelPoolRevision] = latestRevision + pod.Labels[LabelPoolRevision] = updateRevision if err := ctrl.SetControllerReference(pool, pod, r.Scheme); err != nil { return err } @@ -606,29 +577,40 @@ func (r *PoolReconciler) createPoolPod(ctx context.Context, pool *sandboxv1alpha return err } PoolScaleExpectations.ExpectScale(controllerutils.GetControllerKey(pool), expectations.Create, pod.Name) - log.Info("Created pool pod", "pool", pool.Name, "pod", pod.Name, "revision", latestRevision) + log.Info("Created pool pod", "pool", pool.Name, "pod", pod.Name, "revision", updateRevision) r.Recorder.Eventf(pool, corev1.EventTypeNormal, "SuccessfulCreate", "Created pool pod: %v", pod.Name) return nil } -// handlePodEvictions evicts idle pods marked for eviction. 
-// Eviction errors don't block the current reconcile; they are returned last to trigger requeue. -func (r *PoolReconciler) handlePodEvictions(ctx context.Context, pool *sandboxv1alpha1.Pool, pods []*corev1.Pod, podAllocation map[string]string) error { +// handleEviction fetches the current allocation, evicts idle pods marked for eviction, +// and returns the schedulable pods (excluding evicting idle pods) along with any eviction error. +// Eviction errors are non-fatal: they are returned to trigger a requeue but do not block the current reconcile. +func (r *PoolReconciler) handleEviction(ctx context.Context, pool *sandboxv1alpha1.Pool, pods []*corev1.Pod) ([]*corev1.Pod, error) { log := logf.FromContext(ctx) + podAllocation, err := r.Allocator.GetPoolAllocation(ctx, pool) + if err != nil { + log.Error(err, "Failed to get pool allocation") + return nil, err + } + handler := eviction.NewEvictionHandler(ctx, r.Client, pool) var evictionErrs []error + filtered := make([]*corev1.Pod, 0, len(pods)) for _, pod := range pods { if !handler.NeedsEviction(pod) { + filtered = append(filtered, pod) continue } if sandboxName, allocated := podAllocation[pod.Name]; allocated { log.V(1).Info("Skipping eviction for allocated pod", "pod", pod.Name, "sandbox", sandboxName) + filtered = append(filtered, pod) continue } + // Idle pod marked for eviction: evict and exclude from scheduling log.Info("Evicting idle pool pod", "pool", pool.Name, "pod", pod.Name) if err := handler.Evict(ctx, pod); err != nil { log.Error(err, "Failed to evict pod", "pod", pod.Name) @@ -638,31 +620,5 @@ func (r *PoolReconciler) handlePodEvictions(ctx context.Context, pool *sandboxv1 } } - return gerrors.Join(evictionErrs...) -} - -// filterEvictingPods excludes idle pods marked for eviction from scheduling candidates. -// Allocated pods with eviction label are kept because they won't be deleted. 
-func (r *PoolReconciler) filterEvictingPods(ctx context.Context, pool *sandboxv1alpha1.Pool, pods []*corev1.Pod, podAllocation map[string]string) []*corev1.Pod { - handler := eviction.NewEvictionHandler(ctx, r.Client, pool) - filtered := make([]*corev1.Pod, 0, len(pods)) - for _, pod := range pods { - if handler.NeedsEviction(pod) && podAllocation[pod.Name] == "" { - continue - } - filtered = append(filtered, pod) - } - return filtered -} - -func isPodReady(pod *corev1.Pod) bool { - if pod.Status.Phase != corev1.PodRunning { - return false - } - for _, cond := range pod.Status.Conditions { - if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue { - return true - } - } - return false + return filtered, gerrors.Join(evictionErrs...) } diff --git a/kubernetes/internal/controller/pool_eviction_test.go b/kubernetes/internal/controller/pool_eviction_test.go index 52bd43953..f3798b666 100644 --- a/kubernetes/internal/controller/pool_eviction_test.go +++ b/kubernetes/internal/controller/pool_eviction_test.go @@ -30,6 +30,26 @@ import ( "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/controller/eviction" ) +type stubAllocator struct { + podAllocation map[string]string +} + +func (a *stubAllocator) Schedule(_ context.Context, _ *AllocSpec) (*AllocStatus, []SandboxSyncInfo, bool, error) { + return nil, nil, false, nil +} +func (a *stubAllocator) GetPoolAllocation(_ context.Context, _ *sandboxv1alpha1.Pool) (map[string]string, error) { + return a.podAllocation, nil +} +func (a *stubAllocator) PersistPoolAllocation(_ context.Context, _ *sandboxv1alpha1.Pool, _ *AllocStatus) error { + return nil +} +func (a *stubAllocator) ClearPoolAllocation(_ context.Context, _ string, _ string) error { + return nil +} +func (a *stubAllocator) SyncSandboxAllocation(_ context.Context, _ *sandboxv1alpha1.BatchSandbox, _ []string) error { + return nil +} + func newEvictionTestPod(name string, labels map[string]string, deleting bool) *corev1.Pod { pod := &corev1.Pod{ 
ObjectMeta: metav1.ObjectMeta{ @@ -46,33 +66,29 @@ func newEvictionTestPod(name string, labels map[string]string, deleting bool) *c return pod } -func newEvictionTestReconciler(objs ...runtime.Object) *PoolReconciler { +func newEvictionTestReconciler(podAllocation map[string]string, objs ...runtime.Object) *PoolReconciler { scheme := runtime.NewScheme() _ = corev1.AddToScheme(scheme) _ = sandboxv1alpha1.AddToScheme(scheme) - clientObjs := make([]corev1.Pod, 0) + builder := fake.NewClientBuilder().WithScheme(scheme) for _, o := range objs { if pod, ok := o.(*corev1.Pod); ok { - clientObjs = append(clientObjs, *pod) + p := *pod + builder = builder.WithObjects(&p) } } - - builder := fake.NewClientBuilder().WithScheme(scheme) - for _, pod := range clientObjs { - p := pod - builder = builder.WithObjects(&p) - } c := builder.Build() return &PoolReconciler{ - Client: c, - Scheme: scheme, - Recorder: record.NewFakeRecorder(10), + Client: c, + Scheme: scheme, + Recorder: record.NewFakeRecorder(10), + Allocator: &stubAllocator{podAllocation: podAllocation}, } } -func TestFilterEvictingPods(t *testing.T) { +func TestHandleEviction(t *testing.T) { pool := &sandboxv1alpha1.Pool{ ObjectMeta: metav1.ObjectMeta{Name: "test-pool", Namespace: "default"}, } @@ -81,156 +97,112 @@ func TestFilterEvictingPods(t *testing.T) { evictLabel := map[string]string{eviction.LabelEvict: ""} normalLabel := map[string]string{"app": "test"} - tests := []struct { - name string - pods []*corev1.Pod - podAllocation map[string]string - expectNames []string - }{ - { - name: "no eviction labels keeps all pods", - pods: []*corev1.Pod{ - newEvictionTestPod("pod-1", normalLabel, false), - newEvictionTestPod("pod-2", normalLabel, false), - }, - podAllocation: map[string]string{}, - expectNames: []string{"pod-1", "pod-2"}, - }, - { - name: "unallocated eviction-labeled pods are excluded", - pods: []*corev1.Pod{ - newEvictionTestPod("pod-1", evictLabel, false), - newEvictionTestPod("pod-2", normalLabel, false), 
- }, - podAllocation: map[string]string{}, - expectNames: []string{"pod-2"}, - }, - { - name: "allocated eviction-labeled pods are kept", - pods: []*corev1.Pod{ - newEvictionTestPod("pod-1", evictLabel, false), - newEvictionTestPod("pod-2", normalLabel, false), - }, - podAllocation: map[string]string{"pod-1": "sandbox-1"}, - expectNames: []string{"pod-1", "pod-2"}, - }, - { - name: "mix of allocated and unallocated eviction-labeled pods", - pods: []*corev1.Pod{ - newEvictionTestPod("pod-1", evictLabel, false), - newEvictionTestPod("pod-2", evictLabel, false), - newEvictionTestPod("pod-3", normalLabel, false), - }, - podAllocation: map[string]string{"pod-1": "sandbox-1"}, - expectNames: []string{"pod-1", "pod-3"}, - }, - { - name: "deleting pods with eviction label are not excluded (DeletionTimestamp skips NeedsEviction)", - pods: []*corev1.Pod{ - newEvictionTestPod("pod-1", evictLabel, true), - }, - podAllocation: map[string]string{}, - expectNames: []string{"pod-1"}, - }, - { - name: "empty pod list", - pods: []*corev1.Pod{}, - podAllocation: map[string]string{}, - expectNames: []string{}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := newEvictionTestReconciler() - got := r.filterEvictingPods(ctx, pool, tt.pods, tt.podAllocation) + t.Run("no eviction labels keeps all pods", func(t *testing.T) { + pods := []*corev1.Pod{ + newEvictionTestPod("pod-1", normalLabel, false), + newEvictionTestPod("pod-2", normalLabel, false), + } + r := newEvictionTestReconciler(map[string]string{}, pods[0], pods[1]) - if len(got) != len(tt.expectNames) { - t.Fatalf("filterEvictingPods() returned %d pods, want %d", len(got), len(tt.expectNames)) - } - for i, pod := range got { - if pod.Name != tt.expectNames[i] { - t.Errorf("pod[%d] = %s, want %s", i, pod.Name, tt.expectNames[i]) - } + got, err := r.handleEviction(ctx, pool, pods) + if err != nil { + t.Fatalf("handleEviction() returned error: %v", err) + } + expectNames := []string{"pod-1", "pod-2"} 
+ if len(got) != len(expectNames) { + t.Fatalf("got %d pods, want %d", len(got), len(expectNames)) + } + for i, pod := range got { + if pod.Name != expectNames[i] { + t.Errorf("pod[%d] = %s, want %s", i, pod.Name, expectNames[i]) } - }) - } -} - -func TestHandlePodEvictions(t *testing.T) { - pool := &sandboxv1alpha1.Pool{ - ObjectMeta: metav1.ObjectMeta{Name: "test-pool", Namespace: "default"}, - } - ctx := context.Background() - - evictLabel := map[string]string{eviction.LabelEvict: ""} - normalLabel := map[string]string{"app": "test"} + } + }) - t.Run("evicts unallocated pods with eviction label", func(t *testing.T) { + t.Run("unallocated eviction-labeled pods are evicted and excluded", func(t *testing.T) { pod1 := newEvictionTestPod("pod-1", evictLabel, false) pod2 := newEvictionTestPod("pod-2", normalLabel, false) - r := newEvictionTestReconciler(pod1, pod2) + r := newEvictionTestReconciler(map[string]string{}, pod1, pod2) - err := r.handlePodEvictions(ctx, pool, []*corev1.Pod{pod1, pod2}, map[string]string{}) + got, err := r.handleEviction(ctx, pool, []*corev1.Pod{pod1, pod2}) if err != nil { - t.Fatalf("handlePodEvictions() returned error: %v", err) + t.Fatalf("handleEviction() returned error: %v", err) + } + if len(got) != 1 || got[0].Name != "pod-2" { + t.Fatalf("got %v, want [pod-2]", got) } - // pod-1 should be deleted - got := &corev1.Pod{} - if err := r.Client.Get(ctx, types.NamespacedName{Name: "pod-1", Namespace: "default"}, got); err == nil { + check := &corev1.Pod{} + if err := r.Client.Get(ctx, types.NamespacedName{Name: "pod-1", Namespace: "default"}, check); err == nil { t.Error("expected pod-1 to be deleted") } - // pod-2 should still exist - if err := r.Client.Get(ctx, types.NamespacedName{Name: "pod-2", Namespace: "default"}, got); err != nil { - t.Error("expected pod-2 to still exist") - } }) - t.Run("skips allocated pods with eviction label", func(t *testing.T) { + t.Run("allocated eviction-labeled pods are kept", func(t *testing.T) { pod1 
:= newEvictionTestPod("pod-1", evictLabel, false) - r := newEvictionTestReconciler(pod1) + pod2 := newEvictionTestPod("pod-2", normalLabel, false) + alloc := map[string]string{"pod-1": "sandbox-1"} + r := newEvictionTestReconciler(alloc, pod1, pod2) - podAllocation := map[string]string{"pod-1": "sandbox-1"} - err := r.handlePodEvictions(ctx, pool, []*corev1.Pod{pod1}, podAllocation) + got, err := r.handleEviction(ctx, pool, []*corev1.Pod{pod1, pod2}) if err != nil { - t.Fatalf("handlePodEvictions() returned error: %v", err) + t.Fatalf("handleEviction() returned error: %v", err) } - - // pod-1 should still exist - got := &corev1.Pod{} - if err := r.Client.Get(ctx, types.NamespacedName{Name: "pod-1", Namespace: "default"}, got); err != nil { - t.Error("expected allocated pod-1 to still exist") + expectNames := []string{"pod-1", "pod-2"} + if len(got) != len(expectNames) { + t.Fatalf("got %d pods, want %d", len(got), len(expectNames)) + } + for i, pod := range got { + if pod.Name != expectNames[i] { + t.Errorf("pod[%d] = %s, want %s", i, pod.Name, expectNames[i]) + } } }) - t.Run("skips pods without eviction label", func(t *testing.T) { - pod1 := newEvictionTestPod("pod-1", normalLabel, false) - r := newEvictionTestReconciler(pod1) + t.Run("mix of allocated and unallocated eviction-labeled pods", func(t *testing.T) { + pod1 := newEvictionTestPod("pod-1", evictLabel, false) + pod2 := newEvictionTestPod("pod-2", evictLabel, false) + pod3 := newEvictionTestPod("pod-3", normalLabel, false) + alloc := map[string]string{"pod-1": "sandbox-1"} + r := newEvictionTestReconciler(alloc, pod1, pod2, pod3) - err := r.handlePodEvictions(ctx, pool, []*corev1.Pod{pod1}, map[string]string{}) + got, err := r.handleEviction(ctx, pool, []*corev1.Pod{pod1, pod2, pod3}) if err != nil { - t.Fatalf("handlePodEvictions() returned error: %v", err) + t.Fatalf("handleEviction() returned error: %v", err) } - - got := &corev1.Pod{} - if err := r.Client.Get(ctx, types.NamespacedName{Name: "pod-1", 
Namespace: "default"}, got); err != nil { - t.Error("expected pod-1 to still exist") + expectNames := []string{"pod-1", "pod-3"} + if len(got) != len(expectNames) { + t.Fatalf("got %d pods, want %d", len(got), len(expectNames)) + } + for i, pod := range got { + if pod.Name != expectNames[i] { + t.Errorf("pod[%d] = %s, want %s", i, pod.Name, expectNames[i]) + } } }) - t.Run("skips pods already deleting", func(t *testing.T) { + t.Run("deleting pods with eviction label are not evicted", func(t *testing.T) { pod1 := newEvictionTestPod("pod-1", evictLabel, true) - r := newEvictionTestReconciler(pod1) + r := newEvictionTestReconciler(map[string]string{}, pod1) - err := r.handlePodEvictions(ctx, pool, []*corev1.Pod{pod1}, map[string]string{}) + got, err := r.handleEviction(ctx, pool, []*corev1.Pod{pod1}) if err != nil { - t.Fatalf("handlePodEvictions() returned error: %v", err) + t.Fatalf("handleEviction() returned error: %v", err) } + if len(got) != 1 || got[0].Name != "pod-1" { + t.Fatalf("expected deleting pod to be kept, got %v", got) + } + }) + + t.Run("empty pod list", func(t *testing.T) { + r := newEvictionTestReconciler(map[string]string{}) - got := &corev1.Pod{} - if err := r.Client.Get(ctx, types.NamespacedName{Name: "pod-1", Namespace: "default"}, got); err != nil { - t.Error("expected deleting pod-1 to still exist") + got, err := r.handleEviction(ctx, pool, []*corev1.Pod{}) + if err != nil { + t.Fatalf("handleEviction() returned error: %v", err) + } + if len(got) != 0 { + t.Fatalf("expected empty result, got %d pods", len(got)) } }) @@ -238,25 +210,25 @@ func TestHandlePodEvictions(t *testing.T) { pod1 := newEvictionTestPod("pod-1", evictLabel, false) pod2 := newEvictionTestPod("pod-2", evictLabel, false) pod3 := newEvictionTestPod("pod-3", evictLabel, false) - r := newEvictionTestReconciler(pod1, pod2, pod3) + alloc := map[string]string{"pod-2": "sandbox-1"} + r := newEvictionTestReconciler(alloc, pod1, pod2, pod3) - podAllocation := 
map[string]string{"pod-2": "sandbox-1"} - err := r.handlePodEvictions(ctx, pool, []*corev1.Pod{pod1, pod2, pod3}, podAllocation) + got, err := r.handleEviction(ctx, pool, []*corev1.Pod{pod1, pod2, pod3}) if err != nil { - t.Fatalf("handlePodEvictions() returned error: %v", err) + t.Fatalf("handleEviction() returned error: %v", err) + } + if len(got) != 1 || got[0].Name != "pod-2" { + t.Fatalf("expected only pod-2, got %v", got) } - got := &corev1.Pod{} - // pod-1 (idle, eviction-labeled) -> deleted - if err := r.Client.Get(ctx, types.NamespacedName{Name: "pod-1", Namespace: "default"}, got); err == nil { + check := &corev1.Pod{} + if err := r.Client.Get(ctx, types.NamespacedName{Name: "pod-1", Namespace: "default"}, check); err == nil { t.Error("expected pod-1 to be deleted") } - // pod-2 (allocated, eviction-labeled) -> kept - if err := r.Client.Get(ctx, types.NamespacedName{Name: "pod-2", Namespace: "default"}, got); err != nil { + if err := r.Client.Get(ctx, types.NamespacedName{Name: "pod-2", Namespace: "default"}, check); err != nil { t.Error("expected allocated pod-2 to still exist") } - // pod-3 (idle, eviction-labeled) -> deleted - if err := r.Client.Get(ctx, types.NamespacedName{Name: "pod-3", Namespace: "default"}, got); err == nil { + if err := r.Client.Get(ctx, types.NamespacedName{Name: "pod-3", Namespace: "default"}, check); err == nil { t.Error("expected pod-3 to be deleted") } }) diff --git a/kubernetes/internal/controller/pool_update.go b/kubernetes/internal/controller/pool_update.go new file mode 100644 index 000000000..a1e889012 --- /dev/null +++ b/kubernetes/internal/controller/pool_update.go @@ -0,0 +1,113 @@ +// Copyright 2025 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "sort" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + sandboxv1alpha1 "github.com/alibaba/OpenSandbox/sandbox-k8s/apis/sandbox/v1alpha1" + "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/utils" +) + +type PoolUpdateStrategy interface { + Compute(ctx context.Context, updateRevision string, pods []*corev1.Pod, idlePods []string) *UpdateResult +} + +func NewPoolUpdateStrategy(pool *sandboxv1alpha1.Pool) PoolUpdateStrategy { + return &recreateUpdateStrategy{pool: pool} +} + +func getUpdateMaxUnavailable(pool *sandboxv1alpha1.Pool, desiredTotal int32) int32 { + defaultPercentage := intstr.FromString("25%") + maxUnavailable := &defaultPercentage + if pool.Spec.UpdateStrategy != nil && pool.Spec.UpdateStrategy.MaxUnavailable != nil { + maxUnavailable = pool.Spec.UpdateStrategy.MaxUnavailable + } + result, err := intstr.GetScaledValueFromIntOrPercent(maxUnavailable, int(desiredTotal), true) + if err != nil || result < 1 { + result = 1 + } + return int32(result) +} + +type recreateUpdateStrategy struct { + pool *sandboxv1alpha1.Pool +} + +func (s *recreateUpdateStrategy) Compute(ctx context.Context, updateRevision string, pods []*corev1.Pod, idlePods []string) *UpdateResult { + log := logf.FromContext(ctx) + maxUnavailable := getUpdateMaxUnavailable(s.pool, int32(len(pods))) + + podMap := make(map[string]*corev1.Pod, len(pods)) + for _, pod := range pods { + podMap[pod.Name] = pod + } + + curUnavailable := int32(0) + for _, pod 
:= range pods { + if !utils.IsPodReady(pod) { + curUnavailable++ + } + } + unavailableBudget := max(maxUnavailable-curUnavailable, 0) + + idlePodList := make([]*corev1.Pod, 0, len(idlePods)) + for _, name := range idlePods { + if pod, ok := podMap[name]; ok { + idlePodList = append(idlePodList, pod) + } + } + + sort.SliceStable(idlePodList, func(i, j int) bool { + return utils.ComparePodsForDeletion(idlePodList[i], idlePodList[j]) + }) + + toDeleteCurRevPods := make([]string, 0) + supplyNew := int32(0) + remainingIdlePods := make([]string, 0) + + for _, pod := range idlePodList { + if pod.Labels[LabelPoolRevision] == updateRevision { + remainingIdlePods = append(remainingIdlePods, pod.Name) + continue + } + if !utils.IsPodReady(pod) { + toDeleteCurRevPods = append(toDeleteCurRevPods, pod.Name) + supplyNew++ + } else if unavailableBudget > 0 { + toDeleteCurRevPods = append(toDeleteCurRevPods, pod.Name) + supplyNew++ + unavailableBudget-- + } else { + remainingIdlePods = append(remainingIdlePods, pod.Name) + } + } + + if len(toDeleteCurRevPods) > 0 { + log.Info("Recreate update: to recreate current revision pods", "updateRevision", updateRevision, + "maxUnavailable", maxUnavailable, "curUnavailable", curUnavailable, + "toDeleteCurrentRevisionPods", toDeleteCurRevPods, "supplyNew", supplyNew, "idlePods", len(remainingIdlePods)) + } + return &UpdateResult{ + IdlePods: remainingIdlePods, + ToDeletePods: toDeleteCurRevPods, + SupplyUpdateRevision: supplyNew, + } +} diff --git a/kubernetes/internal/controller/pool_update_test.go b/kubernetes/internal/controller/pool_update_test.go new file mode 100644 index 000000000..aac660221 --- /dev/null +++ b/kubernetes/internal/controller/pool_update_test.go @@ -0,0 +1,433 @@ +// Copyright 2025 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + sandboxv1alpha1 "github.com/alibaba/OpenSandbox/sandbox-k8s/apis/sandbox/v1alpha1" +) + +func TestResolveMaxUnavailable(t *testing.T) { + tests := []struct { + name string + pool *sandboxv1alpha1.Pool + desiredTotal int32 + want int32 + }{ + { + name: "default 25% of 10 = 3 (rounded up)", + pool: &sandboxv1alpha1.Pool{ + Spec: sandboxv1alpha1.PoolSpec{}, + }, + desiredTotal: 10, + want: 3, + }, + { + name: "default 25% of 4 = 1", + pool: &sandboxv1alpha1.Pool{ + Spec: sandboxv1alpha1.PoolSpec{}, + }, + desiredTotal: 4, + want: 1, + }, + { + name: "custom percentage 50% of 10 = 5", + pool: &sandboxv1alpha1.Pool{ + Spec: sandboxv1alpha1.PoolSpec{ + UpdateStrategy: &sandboxv1alpha1.UpdateStrategy{ + MaxUnavailable: intStrPtr("50%"), + }, + }, + }, + desiredTotal: 10, + want: 5, + }, + { + name: "custom absolute value 3", + pool: &sandboxv1alpha1.Pool{ + Spec: sandboxv1alpha1.PoolSpec{ + UpdateStrategy: &sandboxv1alpha1.UpdateStrategy{ + MaxUnavailable: intStrIntPtr(3), + }, + }, + }, + desiredTotal: 10, + want: 3, + }, + { + name: "absolute value 0 defaults to 1", + pool: &sandboxv1alpha1.Pool{ + Spec: sandboxv1alpha1.PoolSpec{ + UpdateStrategy: &sandboxv1alpha1.UpdateStrategy{ + MaxUnavailable: intStrPtr("0"), + }, + }, + }, + desiredTotal: 10, + want: 1, + }, + { + name: "percentage rounds up - 10% of 9 = 1", + pool: &sandboxv1alpha1.Pool{ + Spec: 
sandboxv1alpha1.PoolSpec{ + UpdateStrategy: &sandboxv1alpha1.UpdateStrategy{ + MaxUnavailable: intStrPtr("10%"), + }, + }, + }, + desiredTotal: 9, + want: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := getUpdateMaxUnavailable(tt.pool, tt.desiredTotal) + if got != tt.want { + t.Errorf("resolveMaxUnavailable() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestRecreateUpdateStrategy_Compute(t *testing.T) { + ctx := context.Background() + updateRevision := "v2" + + tests := []struct { + name string + pool *sandboxv1alpha1.Pool + pods []*v1.Pod + idlePods []string + wantIdlePods []string + wantDeletePods []string + wantSupplyNew int32 + }{ + { + name: "all pods already at update revision - no deletion", + pool: &sandboxv1alpha1.Pool{ + Spec: sandboxv1alpha1.PoolSpec{ + UpdateStrategy: &sandboxv1alpha1.UpdateStrategy{ + MaxUnavailable: intStrPtr("25%"), + }, + }, + }, + pods: []*v1.Pod{ + makePod("pod-1", "v2", true, true), + makePod("pod-2", "v2", true, true), + }, + idlePods: []string{"pod-1", "pod-2"}, + wantIdlePods: []string{"pod-1", "pod-2"}, + wantDeletePods: []string{}, + wantSupplyNew: 0, + }, + { + name: "all idle pods at old revision - delete within budget", + pool: &sandboxv1alpha1.Pool{ + Spec: sandboxv1alpha1.PoolSpec{ + UpdateStrategy: &sandboxv1alpha1.UpdateStrategy{ + MaxUnavailable: intStrPtr("100%"), + }, + }, + }, + pods: []*v1.Pod{ + makePod("pod-1", "v1", true, true), + makePod("pod-2", "v1", true, true), + }, + idlePods: []string{"pod-1", "pod-2"}, + wantIdlePods: []string{}, + wantDeletePods: []string{"pod-1", "pod-2"}, + wantSupplyNew: 2, + }, + { + name: "old revision pods exceed budget - partial deletion", + pool: &sandboxv1alpha1.Pool{ + Spec: sandboxv1alpha1.PoolSpec{ + UpdateStrategy: &sandboxv1alpha1.UpdateStrategy{ + MaxUnavailable: intStrPtr("25%"), + }, + }, + }, + pods: []*v1.Pod{ + makePod("pod-1", "v1", true, true), + makePod("pod-2", "v1", true, true), + makePod("pod-3", "v1", true, 
true), + makePod("pod-4", "v1", true, true), + }, + idlePods: []string{"pod-1", "pod-2", "pod-3", "pod-4"}, + wantIdlePods: []string{"pod-1", "pod-2", "pod-3"}, + wantDeletePods: []string{"pod-4"}, + wantSupplyNew: 1, + }, + { + name: "unavailable pods don't consume budget", + pool: &sandboxv1alpha1.Pool{ + Spec: sandboxv1alpha1.PoolSpec{ + UpdateStrategy: &sandboxv1alpha1.UpdateStrategy{ + MaxUnavailable: intStrPtr("25%"), + }, + }, + }, + pods: []*v1.Pod{ + makePod("pod-1", "v1", false, true), + makePod("pod-2", "v1", true, true), + makePod("pod-3", "v1", true, true), + makePod("pod-4", "v1", true, true), + }, + idlePods: []string{"pod-1", "pod-2", "pod-3", "pod-4"}, + wantIdlePods: []string{"pod-2", "pod-3", "pod-4"}, + wantDeletePods: []string{"pod-1"}, + wantSupplyNew: 1, + }, + { + name: "mixed revisions - only delete old revision", + pool: &sandboxv1alpha1.Pool{ + Spec: sandboxv1alpha1.PoolSpec{ + UpdateStrategy: &sandboxv1alpha1.UpdateStrategy{ + MaxUnavailable: intStrPtr("100%"), + }, + }, + }, + pods: []*v1.Pod{ + makePod("pod-1", "v1", true, true), + makePod("pod-2", "v2", true, true), + makePod("pod-3", "v1", true, true), + }, + idlePods: []string{"pod-1", "pod-2", "pod-3"}, + wantIdlePods: []string{"pod-2"}, + wantDeletePods: []string{"pod-1", "pod-3"}, + wantSupplyNew: 2, + }, + { + name: "allocated pods not in idle list are ignored", + pool: &sandboxv1alpha1.Pool{ + Spec: sandboxv1alpha1.PoolSpec{ + UpdateStrategy: &sandboxv1alpha1.UpdateStrategy{ + MaxUnavailable: intStrPtr("100%"), + }, + }, + }, + pods: []*v1.Pod{ + makePod("pod-1", "v1", true, true), + makePod("pod-2", "v1", true, false), + makePod("pod-3", "v1", true, true), + }, + idlePods: []string{"pod-1", "pod-3"}, + wantIdlePods: []string{}, + wantDeletePods: []string{"pod-1", "pod-3"}, + wantSupplyNew: 2, + }, + { + name: "unready pods count towards curUnavailable", + pool: &sandboxv1alpha1.Pool{ + Spec: sandboxv1alpha1.PoolSpec{ + UpdateStrategy: &sandboxv1alpha1.UpdateStrategy{ + 
MaxUnavailable: intStrPtr("50%"), + }, + }, + }, + pods: []*v1.Pod{ + makePod("pod-1", "v1", false, false), + makePod("pod-2", "v1", false, false), + makePod("pod-3", "v1", true, true), + makePod("pod-4", "v1", true, true), + }, + idlePods: []string{"pod-3", "pod-4"}, + wantIdlePods: []string{"pod-3", "pod-4"}, + wantDeletePods: []string{}, + wantSupplyNew: 0, + }, + { + name: "empty idle pods", + pool: &sandboxv1alpha1.Pool{ + Spec: sandboxv1alpha1.PoolSpec{ + UpdateStrategy: &sandboxv1alpha1.UpdateStrategy{ + MaxUnavailable: intStrPtr("25%"), + }, + }, + }, + pods: []*v1.Pod{ + makePod("pod-1", "v1", true, false), + makePod("pod-2", "v1", true, false), + }, + idlePods: []string{}, + wantIdlePods: []string{}, + wantDeletePods: []string{}, + wantSupplyNew: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resetPodCounter() + strategy := &recreateUpdateStrategy{pool: tt.pool} + result := strategy.Compute(ctx, updateRevision, tt.pods, tt.idlePods) + + if !stringSlicesEqualUnordered(result.IdlePods, tt.wantIdlePods) { + t.Errorf("IdlePods = %v, want %v", result.IdlePods, tt.wantIdlePods) + } + if !stringSlicesEqualUnordered(result.ToDeletePods, tt.wantDeletePods) { + t.Errorf("ToDeletePods = %v, want %v", result.ToDeletePods, tt.wantDeletePods) + } + if result.SupplyUpdateRevision != tt.wantSupplyNew { + t.Errorf("SupplyUpdateRevision = %v, want %v", result.SupplyUpdateRevision, tt.wantSupplyNew) + } + }) + } +} + +func TestRecreateUpdateStrategy_Compute_Sorting(t *testing.T) { + ctx := context.Background() + updateRevision := "v2" + + pool := &sandboxv1alpha1.Pool{ + Spec: sandboxv1alpha1.PoolSpec{ + UpdateStrategy: &sandboxv1alpha1.UpdateStrategy{ + MaxUnavailable: intStrPtr("25%"), + }, + }, + } + + now := metav1.Now() + older := metav1.NewTime(now.Add(-2 * time.Hour)) + newer := metav1.NewTime(now.Add(-1 * time.Hour)) + + pods := []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{Name: "assigned-running", CreationTimestamp: newer}, + 
Spec: v1.PodSpec{NodeName: "node-1"}, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + {Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: older}, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "unassigned-pending", CreationTimestamp: older}, + Spec: v1.PodSpec{NodeName: ""}, + Status: v1.PodStatus{Phase: v1.PodPending}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "assigned-not-ready", CreationTimestamp: now}, + Spec: v1.PodSpec{NodeName: "node-2"}, + Status: v1.PodStatus{Phase: v1.PodRunning, Conditions: []v1.PodCondition{}}, + }, + } + + for _, pod := range pods { + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + pod.Labels[LabelPoolRevision] = "v1" + } + + strategy := &recreateUpdateStrategy{pool: pool} + result := strategy.Compute(ctx, updateRevision, pods, []string{"assigned-running", "unassigned-pending", "assigned-not-ready"}) + + wantDelete := []string{"unassigned-pending", "assigned-not-ready"} + wantIdle := []string{"assigned-running"} + + if !stringSlicesEqual(result.ToDeletePods, wantDelete) { + t.Errorf("ToDeletePods = %v, want %v", result.ToDeletePods, wantDelete) + } + if !stringSlicesEqual(result.IdlePods, wantIdle) { + t.Errorf("IdlePods = %v, want %v", result.IdlePods, wantIdle) + } +} + +var podCreationCounter int + +func makePod(name, revision string, ready, idle bool) *v1.Pod { + podCreationCounter++ + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + CreationTimestamp: metav1.NewTime( + metav1.Now().Add(time.Duration(podCreationCounter) * time.Second), + ), + Labels: map[string]string{ + LabelPoolRevision: revision, + }, + }, + Spec: v1.PodSpec{ + NodeName: "node-1", + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } + + if ready { + pod.Status.Conditions = []v1.PodCondition{ + {Type: v1.PodReady, Status: v1.ConditionTrue}, + } + } + + if !idle { + pod.Labels["allocated"] = "true" + } + + return pod +} + +func resetPodCounter() { + 
podCreationCounter = 0 +} + +func intStrPtr(s string) *intstr.IntOrString { + val := intstr.FromString(s) + return &val +} + +func intStrIntPtr(i int) *intstr.IntOrString { + val := intstr.FromInt(i) + return &val +} + +func stringSlicesEqual(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +func stringSlicesEqualUnordered(a, b []string) bool { + if len(a) != len(b) { + return false + } + set := make(map[string]bool, len(a)) + for _, s := range a { + set[s] = true + } + for _, s := range b { + if !set[s] { + return false + } + } + return true +} diff --git a/kubernetes/internal/utils/pod.go b/kubernetes/internal/utils/pod.go index bc552893c..3556f67b2 100644 --- a/kubernetes/internal/utils/pod.go +++ b/kubernetes/internal/utils/pod.go @@ -46,7 +46,7 @@ func IsPodAvailable(pod *v1.Pod, minReadySeconds int32, now metav1.Time) bool { // IsPodReady returns true if a pod is ready; false otherwise. func IsPodReady(pod *v1.Pod) bool { - return IsPodReadyConditionTrue(pod.Status) + return pod.Status.Phase == v1.PodRunning && IsPodReadyConditionTrue(pod.Status) } // IsPodTerminal returns true if a pod is terminal, all containers are stopped and cannot ever regress. @@ -214,3 +214,58 @@ func (m MultiPodSorter) Sort(a, b *v1.Pod) int { } return 0 } + +// ComparePodsForDeletion compares two pods for deletion priority. +// Returns true if p1 should be deleted before p2. +// Priority order: Unassigned < Assigned, Pending < Unknown < Running, +// NotReady < Ready, shorter ready time < longer ready time, +// higher restarts < lower restarts, newer < older, name for tie-breaking. 
+func ComparePodsForDeletion(p1, p2 *v1.Pod) bool { + if len(p1.Spec.NodeName) != len(p2.Spec.NodeName) && (len(p1.Spec.NodeName) == 0 || len(p2.Spec.NodeName) == 0) { + return len(p1.Spec.NodeName) == 0 + } + + phaseOrder := map[v1.PodPhase]int{ + v1.PodPending: 0, + v1.PodUnknown: 1, + v1.PodRunning: 2, + } + if phaseOrder[p1.Status.Phase] != phaseOrder[p2.Status.Phase] { + return phaseOrder[p1.Status.Phase] < phaseOrder[p2.Status.Phase] + } + + p1Ready := IsPodReady(p1) + p2Ready := IsPodReady(p2) + if p1Ready != p2Ready { + return !p1Ready + } + + if p1Ready && p2Ready { + p1Cond := GetPodReadyCondition(p1.Status) + p2Cond := GetPodReadyCondition(p2.Status) + if p1Cond != nil && p2Cond != nil && !p1Cond.LastTransitionTime.Equal(&p2Cond.LastTransitionTime) { + return p1Cond.LastTransitionTime.Before(&p2Cond.LastTransitionTime) + } + } + + p1Restarts := maxContainerRestarts(p1) + p2Restarts := maxContainerRestarts(p2) + if p1Restarts != p2Restarts { + return p1Restarts > p2Restarts + } + + if !p1.CreationTimestamp.Equal(&p2.CreationTimestamp) { + return p2.CreationTimestamp.Before(&p1.CreationTimestamp) + } + return p1.Name < p2.Name +} + +func maxContainerRestarts(pod *v1.Pod) int32 { + var maxRestarts int32 + for _, cs := range pod.Status.ContainerStatuses { + if cs.RestartCount > maxRestarts { + maxRestarts = cs.RestartCount + } + } + return maxRestarts +} diff --git a/kubernetes/internal/utils/pod_test.go b/kubernetes/internal/utils/pod_test.go index 73bfb737f..eaa272052 100644 --- a/kubernetes/internal/utils/pod_test.go +++ b/kubernetes/internal/utils/pod_test.go @@ -17,11 +17,86 @@ package utils import ( "slices" "testing" + "time" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +func TestIsPodReady(t *testing.T) { + readyCondition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue} + notReadyCondition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionFalse} + + tests := []struct { + name string + pod *v1.Pod + 
want bool + }{ + { + name: "running and ready", + pod: &v1.Pod{Status: v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{readyCondition}, + }}, + want: true, + }, + { + name: "running but not ready", + pod: &v1.Pod{Status: v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{notReadyCondition}, + }}, + want: false, + }, + { + name: "running but no ready condition", + pod: &v1.Pod{Status: v1.PodStatus{ + Phase: v1.PodRunning, + }}, + want: false, + }, + { + name: "pending with stale ready=true", + pod: &v1.Pod{Status: v1.PodStatus{ + Phase: v1.PodPending, + Conditions: []v1.PodCondition{readyCondition}, + }}, + want: false, + }, + { + name: "unknown phase with stale ready=true", + pod: &v1.Pod{Status: v1.PodStatus{ + Phase: v1.PodUnknown, + Conditions: []v1.PodCondition{readyCondition}, + }}, + want: false, + }, + { + name: "failed phase", + pod: &v1.Pod{Status: v1.PodStatus{ + Phase: v1.PodFailed, + }}, + want: false, + }, + { + name: "succeeded phase", + pod: &v1.Pod{Status: v1.PodStatus{ + Phase: v1.PodSucceeded, + }}, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := IsPodReady(tt.pod) + if got != tt.want { + t.Errorf("IsPodReady() = %v, want %v", got, tt.want) + } + }) + } +} + func TestWithPodIndexSorter(t *testing.T) { tests := []struct { name string @@ -241,3 +316,187 @@ func TestMultiPodSorter_Integration(t *testing.T) { } } } + +func TestComparePodsForDeletion(t *testing.T) { + now := metav1.Now() + older := metav1.NewTime(now.Add(-1 * time.Hour)) + newer := metav1.NewTime(now.Add(-30 * time.Minute)) + + tests := []struct { + name string + p1 *v1.Pod + p2 *v1.Pod + want bool + }{ + { + name: "unassigned < assigned", + p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "unassigned"}, Spec: v1.PodSpec{NodeName: ""}}, + p2: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "assigned"}, Spec: v1.PodSpec{NodeName: "node-1"}}, + want: true, + }, + { + name: "assigned > unassigned", + 
p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "assigned"}, Spec: v1.PodSpec{NodeName: "node-1"}}, + p2: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "unassigned"}, Spec: v1.PodSpec{NodeName: ""}}, + want: false, + }, + { + name: "both unassigned - tie-break by name", + p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{NodeName: ""}}, + p2: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{NodeName: ""}}, + want: true, + }, + { + name: "both assigned - tie-break by name", + p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{NodeName: "node-1"}}, + p2: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{NodeName: "node-2"}}, + want: true, + }, + { + name: "pending < running", + p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pending"}, Status: v1.PodStatus{Phase: v1.PodPending}}, + p2: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "running"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + want: true, + }, + { + name: "running > pending", + p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "running"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + p2: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pending"}, Status: v1.PodStatus{Phase: v1.PodPending}}, + want: false, + }, + { + name: "unknown < running", + p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "unknown"}, Status: v1.PodStatus{Phase: v1.PodUnknown}}, + p2: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "running"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + want: true, + }, + { + name: "pending < unknown", + p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pending"}, Status: v1.PodStatus{Phase: v1.PodPending}}, + p2: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "unknown"}, Status: v1.PodStatus{Phase: v1.PodUnknown}}, + want: true, + }, + { + name: "not ready < ready", + p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "not-ready"}, Status: v1.PodStatus{Phase: v1.PodRunning, Conditions: []v1.PodCondition{}}}, + p2: &v1.Pod{ObjectMeta: 
metav1.ObjectMeta{Name: "ready"}, Status: v1.PodStatus{Phase: v1.PodRunning, Conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}}}, + want: true, + }, + { + name: "ready > not ready", + p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ready"}, Status: v1.PodStatus{Phase: v1.PodRunning, Conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}}}, + p2: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "not-ready"}, Status: v1.PodStatus{Phase: v1.PodRunning, Conditions: []v1.PodCondition{}}}, + want: false, + }, + { + name: "shorter ready time < longer ready time", + p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "shorter"}, Status: v1.PodStatus{Phase: v1.PodRunning, Conditions: []v1.PodCondition{ + {Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: older}, + }}}, + p2: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "longer"}, Status: v1.PodStatus{Phase: v1.PodRunning, Conditions: []v1.PodCondition{ + {Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: newer}, + }}}, + want: true, + }, + { + name: "longer ready time > shorter ready time", + p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "longer"}, Status: v1.PodStatus{Phase: v1.PodRunning, Conditions: []v1.PodCondition{ + {Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: newer}, + }}}, + p2: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "shorter"}, Status: v1.PodStatus{Phase: v1.PodRunning, Conditions: []v1.PodCondition{ + {Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: older}, + }}}, + want: false, + }, + { + name: "higher restarts < lower restarts", + p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "high-restarts"}, Status: v1.PodStatus{Phase: v1.PodRunning, ContainerStatuses: []v1.ContainerStatus{ + {RestartCount: 5}, + }}}, + p2: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "low-restarts"}, Status: v1.PodStatus{Phase: v1.PodRunning, ContainerStatuses: []v1.ContainerStatus{ + {RestartCount: 1}, + }}}, + want: true, + 
}, + { + name: "lower restarts > higher restarts", + p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "low-restarts"}, Status: v1.PodStatus{Phase: v1.PodRunning, ContainerStatuses: []v1.ContainerStatus{ + {RestartCount: 1}, + }}}, + p2: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "high-restarts"}, Status: v1.PodStatus{Phase: v1.PodRunning, ContainerStatuses: []v1.ContainerStatus{ + {RestartCount: 5}, + }}}, + want: false, + }, + { + name: "newer < older", + p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "newer", CreationTimestamp: newer}}, + p2: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "older", CreationTimestamp: older}}, + want: true, + }, + { + name: "older > newer", + p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "older", CreationTimestamp: older}}, + p2: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "newer", CreationTimestamp: newer}}, + want: false, + }, + { + name: "equal pods", + p1: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "same", CreationTimestamp: now}, Spec: v1.PodSpec{NodeName: "node-1"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + p2: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "same", CreationTimestamp: now}, Spec: v1.PodSpec{NodeName: "node-1"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := ComparePodsForDeletion(tt.p1, tt.p2) + if got != tt.want { + t.Errorf("ComparePodsForDeletion() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestComparePodsForDeletion_SortIntegration(t *testing.T) { + now := metav1.Now() + older := metav1.NewTime(now.Add(-2 * time.Hour)) + newer := metav1.NewTime(now.Add(-1 * time.Hour)) + + pods := []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{Name: "assigned-ready", CreationTimestamp: newer}, + Spec: v1.PodSpec{NodeName: "node-1"}, + Status: v1.PodStatus{Phase: v1.PodRunning, Conditions: []v1.PodCondition{ + {Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: older}, + }}, + }, + { + ObjectMeta: 
metav1.ObjectMeta{Name: "unassigned-pending", CreationTimestamp: older},
+			Spec:       v1.PodSpec{NodeName: ""},
+			Status:     v1.PodStatus{Phase: v1.PodPending},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{Name: "assigned-not-ready", CreationTimestamp: now},
+			Spec:       v1.PodSpec{NodeName: "node-2"},
+			Status:     v1.PodStatus{Phase: v1.PodRunning, Conditions: []v1.PodCondition{}},
+		},
+	}
+
+	slices.SortStableFunc(pods, func(a, b *v1.Pod) int {
+		if ComparePodsForDeletion(a, b) {
+			return -1
+		}
+		if ComparePodsForDeletion(b, a) {
+			return 1
+		}
+		return 0
+	})
+
+	expectedOrder := []string{"unassigned-pending", "assigned-not-ready", "assigned-ready"}
+	for i, pod := range pods {
+		if pod.Name != expectedOrder[i] {
+			t.Errorf("pod at index %d: got %s, want %s", i, pod.Name, expectedOrder[i])
+		}
+	}
+}
diff --git a/kubernetes/test/e2e/e2e_test.go b/kubernetes/test/e2e/e2e_test.go
index 4eada347f..f1a4ba4c2 100644
--- a/kubernetes/test/e2e/e2e_test.go
+++ b/kubernetes/test/e2e/e2e_test.go
@@ -1552,6 +1552,290 @@ var _ = Describe("Manager", Ordered, func() {
 		})
 	})
 
+	Context("Pool Update", func() {
+		BeforeAll(func() {
+			By("waiting for controller to be ready")
+			Eventually(func(g Gomega) {
+				cmd := exec.Command("kubectl", "get", "pods", "-l", "control-plane=controller-manager",
+					"-n", namespace, "-o", "jsonpath={.items[0].status.phase}")
+				output, err := utils.Run(cmd)
+				g.Expect(err).NotTo(HaveOccurred())
+				g.Expect(output).To(Equal("Running"))
+			}, 2*time.Minute).Should(Succeed())
+		})
+
+		// Changes the Pool template while three pods are allocated to a
+		// BatchSandbox, then checks that those pods are not deleted, that a new
+		// BatchSandbox can still be allocated, that unavailable pods stay within
+		// maxUnavailable+1, and that status.updated reaches the pool size once
+		// both BatchSandboxes are deleted.
+		It("should perform rolling update with maxUnavailable constraint", func() {
+			const poolName = "test-pool-rolling-update"
+			const testNamespace = "default"
+			const poolSize = 10
+			const maxUnavailablePercent = "20%"
+
+			By("creating a Pool with updateStrategy")
+			poolYAML, err := renderTemplate("testdata/pool-with-update-strategy.yaml", map[string]interface{}{
+				"PoolName":       poolName,
+				"SandboxImage":   utils.SandboxImage,
+				"Namespace":      testNamespace,
+				"BufferMax":      poolSize,
+				"BufferMin":      poolSize - 2,
+				"PoolMax":        poolSize,
+				"PoolMin":        poolSize,
+				"MaxUnavailable": maxUnavailablePercent,
+			})
+			Expect(err).NotTo(HaveOccurred())
+
+			poolFile := filepath.Join("/tmp", poolName+".yaml")
+			err = os.WriteFile(poolFile, []byte(poolYAML), 0644)
+			Expect(err).NotTo(HaveOccurred())
+			defer os.Remove(poolFile)
+
+			cmd := exec.Command("kubectl", "apply", "-f", poolFile)
+			_, err = utils.Run(cmd)
+			Expect(err).NotTo(HaveOccurred())
+			// Registered right after creation so a failed assertion further
+			// down does not leave the Pool behind for later specs.
+			DeferCleanup(func() {
+				cmd := exec.Command("kubectl", "delete", "pool", poolName, "-n", testNamespace,
+					"--ignore-not-found")
+				_, _ = utils.Run(cmd) // best-effort cleanup
+			})
+
+			By("waiting for Pool to have all pods running")
+			Eventually(func(g Gomega) {
+				cmd := exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace,
+					"-o", "jsonpath={.status.total}")
+				totalStr, err := utils.Run(cmd)
+				g.Expect(err).NotTo(HaveOccurred())
+				total := 0
+				_, err = fmt.Sscanf(totalStr, "%d", &total)
+				g.Expect(err).NotTo(HaveOccurred(), "status.total %q is not an integer", totalStr)
+				g.Expect(total).To(Equal(poolSize))
+
+				cmd = exec.Command("kubectl", "get", "pods", "-n", testNamespace,
+					"-l", fmt.Sprintf("sandbox.opensandbox.io/pool-name=%s", poolName),
+					"--field-selector=status.phase=Running",
+					"-o", "jsonpath={.items[*].metadata.name}")
+				output, err := utils.Run(cmd)
+				g.Expect(err).NotTo(HaveOccurred())
+				g.Expect(len(strings.Fields(output))).To(Equal(poolSize))
+			}, 3*time.Minute).Should(Succeed())
+
+			By("allocating some pods via BatchSandbox")
+			const batchSandboxName = "test-bs-rolling-update"
+			bsYAML, err := renderTemplate("testdata/batchsandbox-pooled-no-expire.yaml", map[string]interface{}{
+				"BatchSandboxName": batchSandboxName,
+				"Namespace":        testNamespace,
+				"Replicas":         3,
+				"PoolName":         poolName,
+			})
+			Expect(err).NotTo(HaveOccurred())
+
+			bsFile := filepath.Join("/tmp", batchSandboxName+".yaml")
+			err = os.WriteFile(bsFile, []byte(bsYAML), 0644)
+			Expect(err).NotTo(HaveOccurred())
+			defer os.Remove(bsFile)
+
+			cmd = exec.Command("kubectl", "apply", "-f", bsFile)
+			_, err = utils.Run(cmd)
+			Expect(err).NotTo(HaveOccurred())
+			DeferCleanup(func() {
+				cmd := exec.Command("kubectl", "delete", "batchsandbox", batchSandboxName, "-n", testNamespace,
+					"--ignore-not-found")
+				_, _ = utils.Run(cmd) // best-effort cleanup
+			})
+
+			var allocatedPods []string
+			Eventually(func(g Gomega) {
+				cmd := exec.Command("kubectl", "get", "batchsandbox", batchSandboxName, "-n", testNamespace,
+					"-o", "jsonpath={.metadata.annotations.sandbox\\.opensandbox\\.io/alloc-status}")
+				out, err := utils.Run(cmd)
+				g.Expect(err).NotTo(HaveOccurred())
+				g.Expect(out).NotTo(BeEmpty())
+
+				var alloc struct {
+					Pods []string `json:"pods"`
+				}
+				err = json.Unmarshal([]byte(out), &alloc)
+				g.Expect(err).NotTo(HaveOccurred())
+				allocatedPods = alloc.Pods
+				g.Expect(allocatedPods).To(HaveLen(3))
+			}, 2*time.Minute).Should(Succeed())
+
+			By("recording initial revision from pool status")
+			cmd = exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace,
+				"-o", "jsonpath={.status.revision}")
+			initialRevision, err := utils.Run(cmd)
+			Expect(err).NotTo(HaveOccurred())
+
+			By("triggering pool update by changing template")
+			// The template renders the VERSION env var from EnvValue, so
+			// re-rendering with a new value is enough to change the pod spec.
+			updatedPoolYAML, err := renderTemplate("testdata/pool-with-update-strategy.yaml", map[string]interface{}{
+				"PoolName":       poolName,
+				"SandboxImage":   utils.SandboxImage,
+				"Namespace":      testNamespace,
+				"BufferMax":      poolSize,
+				"BufferMin":      poolSize - 2,
+				"PoolMax":        poolSize,
+				"PoolMin":        poolSize,
+				"MaxUnavailable": maxUnavailablePercent,
+				"EnvValue":       "v2",
+			})
+			Expect(err).NotTo(HaveOccurred())
+
+			err = os.WriteFile(poolFile, []byte(updatedPoolYAML), 0644)
+			Expect(err).NotTo(HaveOccurred())
+
+			cmd = exec.Command("kubectl", "apply", "-f", poolFile)
+			_, err = utils.Run(cmd)
+			Expect(err).NotTo(HaveOccurred())
+
+			By("verifying allocated pods are not deleted during upgrade")
+			Consistently(func(g Gomega) {
+				for _, pod := range allocatedPods {
+					cmd := exec.Command("kubectl", "get", "pod", pod, "-n", testNamespace,
+						"-o", "jsonpath={.metadata.deletionTimestamp}")
+					output, err := utils.Run(cmd)
+					g.Expect(err).NotTo(HaveOccurred(), "allocated pod %s should still exist", pod)
+					g.Expect(output).To(BeEmpty(), "allocated pod %s should not be terminating", pod)
+				}
+			}, 60*time.Second, 5*time.Second).Should(Succeed())
+
+			By("verifying new BatchSandbox can be allocated during upgrade")
+			const newBatchSandboxName = "test-bs-rolling-update-new"
+			newBSYAML, err := renderTemplate("testdata/batchsandbox-pooled-no-expire.yaml", map[string]interface{}{
+				"BatchSandboxName": newBatchSandboxName,
+				"Namespace":        testNamespace,
+				"Replicas":         1,
+				"PoolName":         poolName,
+			})
+			Expect(err).NotTo(HaveOccurred())
+
+			newBSFile := filepath.Join("/tmp", newBatchSandboxName+".yaml")
+			err = os.WriteFile(newBSFile, []byte(newBSYAML), 0644)
+			Expect(err).NotTo(HaveOccurred())
+			defer os.Remove(newBSFile)
+
+			cmd = exec.Command("kubectl", "apply", "-f", newBSFile)
+			_, err = utils.Run(cmd)
+			Expect(err).NotTo(HaveOccurred())
+			DeferCleanup(func() {
+				cmd := exec.Command("kubectl", "delete", "batchsandbox", newBatchSandboxName, "-n", testNamespace,
+					"--ignore-not-found")
+				_, _ = utils.Run(cmd) // best-effort cleanup
+			})
+
+			Eventually(func(g Gomega) {
+				cmd := exec.Command("kubectl", "get", "batchsandbox", newBatchSandboxName, "-n", testNamespace,
+					"-o", "jsonpath={.status.allocated}")
+				output, err := utils.Run(cmd)
+				g.Expect(err).NotTo(HaveOccurred())
+				g.Expect(output).To(Equal("1"))
+			}, 2*time.Minute).Should(Succeed())
+
+			By("verifying maxUnavailable constraint during upgrade")
+			// Derive the bound from the percentage the Pool was created with,
+			// rounding down and never dropping below one pod.
+			percent := 0
+			_, err = fmt.Sscanf(maxUnavailablePercent, "%d%%", &percent)
+			Expect(err).NotTo(HaveOccurred())
+			maxUnavailable := poolSize * percent / 100
+			if maxUnavailable < 1 {
+				maxUnavailable = 1
+			}
+
+			// Check a few times that unavailable pods don't exceed maxUnavailable
+			for i := 0; i < 5; i++ {
+				cmd := exec.Command("kubectl", "get", "pods", "-n", testNamespace,
+					"-l", fmt.Sprintf("sandbox.opensandbox.io/pool-name=%s", poolName),
+					"-o", "json")
+				output, err := utils.Run(cmd)
+				Expect(err).NotTo(HaveOccurred())
+
+				var podList struct {
+					Items []struct {
+						Status struct {
+							Phase      string `json:"phase"`
+							Conditions []struct {
+								Type   string `json:"type"`
+								Status string `json:"status"`
+							} `json:"conditions"`
+						} `json:"status"`
+					} `json:"items"`
+				}
+				err = json.Unmarshal([]byte(output), &podList)
+				Expect(err).NotTo(HaveOccurred())
+
+				unavailableCount := 0
+				for _, pod := range podList.Items {
+					if pod.Status.Phase != "Running" {
+						unavailableCount++
+						continue
+					}
+					ready := false
+					for _, cond := range pod.Status.Conditions {
+						if cond.Type == "Ready" && cond.Status == "True" {
+							ready = true
+							break
+						}
+					}
+					if !ready {
+						unavailableCount++
+					}
+				}
+				Expect(unavailableCount).To(BeNumerically("<=", maxUnavailable+1),
+					"unavailable pods should not exceed maxUnavailable + 1 (allowing for timing)")
+				time.Sleep(2 * time.Second)
+			}
+
+			By("verifying pool status reflects upgrade progress")
+			Eventually(func(g Gomega) {
+				cmd := exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace,
+					"-o", "jsonpath={.status.updated}")
+				updatedStr, err := utils.Run(cmd)
+				g.Expect(err).NotTo(HaveOccurred())
+				updated := 0
+				if updatedStr != "" {
+					_, err = fmt.Sscanf(updatedStr, "%d", &updated)
+					g.Expect(err).NotTo(HaveOccurred(), "status.updated %q is not an integer", updatedStr)
+				}
+				// At least some pods should be updated
+				g.Expect(updated).To(BeNumerically(">", 0), "some pods should be updated")
+
+				// Revision should be different from initial
+				cmd = exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace,
+					"-o", "jsonpath={.status.revision}")
+				currentRevision, err := utils.Run(cmd)
+				g.Expect(err).NotTo(HaveOccurred())
+				g.Expect(currentRevision).NotTo(Equal(initialRevision))
+			}, 2*time.Minute).Should(Succeed())
+
+			By("releasing BatchSandbox to allow full upgrade")
+			cmd = exec.Command("kubectl", "delete", "batchsandbox", batchSandboxName, "-n", testNamespace)
+			_, _ = utils.Run(cmd)
+			cmd = exec.Command("kubectl", "delete", "batchsandbox", newBatchSandboxName, "-n", testNamespace)
+			_, _ = utils.Run(cmd)
+
+			By("verifying pool eventually completes upgrade")
+			Eventually(func(g Gomega) {
+				cmd := exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace,
+					"-o", "jsonpath={.status.updated}")
+				updatedStr, err := utils.Run(cmd)
+				g.Expect(err).NotTo(HaveOccurred())
+				updated := 0
+				if updatedStr != "" {
+					_, err = fmt.Sscanf(updatedStr, "%d", &updated)
+					g.Expect(err).NotTo(HaveOccurred(), "status.updated %q is not an integer", updatedStr)
+				}
+				g.Expect(updated).To(Equal(poolSize), "all pods should be updated")
+			}, 3*time.Minute).Should(Succeed())
+		})
+	})
+
 	Context("Pool State Recovery", func() {
 		BeforeAll(func() {
 			By("waiting for controller to be ready")
diff --git a/kubernetes/test/e2e/testdata/pool-with-update-strategy.yaml b/kubernetes/test/e2e/testdata/pool-with-update-strategy.yaml
new file mode 100644
index 000000000..4a32ab26b
--- /dev/null
+++ b/kubernetes/test/e2e/testdata/pool-with-update-strategy.yaml
@@ -0,0 +1,22 @@
+apiVersion: sandbox.opensandbox.io/v1alpha1
+kind: Pool
+metadata:
+  name: {{.PoolName}}
+  namespace: {{.Namespace}}
+spec:
+  template:
+    spec:
+      containers:
+      - name: sandbox-container
+        image: {{.SandboxImage}}
+        command: ["sleep", "3600"]
+        env:
+        - name: VERSION
+          value: {{if .EnvValue}}"{{.EnvValue}}"{{else}}"v1"{{end}}
+  capacitySpec:
+    bufferMax: {{.BufferMax}}
+    bufferMin: {{.BufferMin}}
+    poolMax: {{.PoolMax}}
+    poolMin: {{.PoolMin}}
+  updateStrategy:
+    maxUnavailable: {{.MaxUnavailable}}