diff --git a/Makefile b/Makefile index 5647cd96..6d6b0cca 100644 --- a/Makefile +++ b/Makefile @@ -55,6 +55,8 @@ package-all: package-linux-amd64 package-linux-arm64 test: @echo "Running tests..." @go test -v ./... + @echo "Running local_e2e build-tag tests..." + @go test -v -tags local_e2e ./pkg/aksmachine ./pkg/cmd/daemon .PHONY: test-coverage test-coverage: diff --git a/cmd/e2ehelper/localmachine/localmachine.go b/cmd/e2ehelper/localmachine/localmachine.go index bb744c69..8a63cc1b 100644 --- a/cmd/e2ehelper/localmachine/localmachine.go +++ b/cmd/e2ehelper/localmachine/localmachine.go @@ -3,16 +3,21 @@ package localmachine import ( "context" "encoding/json" + "errors" "fmt" "io" "os" + "path/filepath" "github.com/spf13/cobra" "github.com/Azure/AKSFlexNode/pkg/aksmachine" - "github.com/Azure/AKSFlexNode/pkg/aksmachine/local" ) +const fileMode = 0o600 + +const localResourceID = "local-test-machine" + var ( flagPath string flagKubernetesVersion string @@ -78,28 +83,32 @@ func init() { Command.AddCommand(createCmd, getCmd, statusCmd, deleteCmd) } -func client() (*local.Client, error) { - return local.NewClient(flagPath) -} - func runCreate(ctx context.Context, out io.Writer) error { - c, err := client() - if err != nil { - return err + select { + case <-ctx.Done(): + return ctx.Err() + default: } - machine, err := c.Create(ctx, aksmachine.GoalState{KubernetesVersion: flagKubernetesVersion, SettingsVersion: flagSettingsVersion}) - if err != nil { + machine := &aksmachine.Machine{ + ID: localResourceID, + Goal: aksmachine.GoalState{ + KubernetesVersion: flagKubernetesVersion, + SettingsVersion: flagSettingsVersion, + }, + } + if err := writeLocalMachine(flagPath, machine); err != nil { return err } return writeMachine(out, machine) } func runGet(ctx context.Context, out io.Writer) error { - c, err := client() - if err != nil { - return err + select { + case <-ctx.Done(): + return ctx.Err() + default: } - machine, err := c.Get(ctx) + machine, err := readLocalMachine(flagPath) if err != nil { return err } @@ -107,7 +116,12 @@ func runGet(ctx context.Context, out io.Writer) error { } func runStatus(ctx context.Context, out io.Writer) error { - c, err := client() + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + machine, err := readLocalMachine(flagPath) if err != nil { return err } @@ -116,11 +130,8 @@ func runStatus(ctx context.Context, out io.Writer) error { ObservedSettingsVersion: flagObservedSettingsVersion, Message: flagMessage, } - if err := c.PatchStatus(ctx, status); err != nil { - return err - } - machine, err := c.Get(ctx) - if err != nil { + machine.Status = status + if err := writeLocalMachine(flagPath, machine); err != nil { return err } return writeMachine(out, machine) @@ -139,3 +150,37 @@ func writeMachine(out io.Writer, machine *aksmachine.Machine) error { enc.SetIndent("", " ") return enc.Encode(machine) } + +func readLocalMachine(path string) (*aksmachine.Machine, error) { + data, err := os.ReadFile(filepath.Clean(path)) + if errors.Is(err, os.ErrNotExist) { + return nil, &aksmachine.NotFoundError{Resource: path} + } + if err != nil { + return nil, fmt.Errorf("read machine file %s: %w", path, err) + } + + var machine aksmachine.Machine + if err := json.Unmarshal(data, &machine); err != nil { + return nil, fmt.Errorf("decode machine file %s: %w", path, err) + } + return &machine, nil +} + +func writeLocalMachine(path string, machine *aksmachine.Machine) error { + if machine == nil { + return fmt.Errorf("machine is nil") + } + if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil { + return fmt.Errorf("create local machine file directory: %w", err) + } + data, err := json.MarshalIndent(machine, "", " ") + if err != nil { + return fmt.Errorf("marshal machine: %w", err) + } + data = append(data, '\n') + if err := os.WriteFile(filepath.Clean(path), data, fileMode); err != nil { + return fmt.Errorf("write machine file %s: %w", path, err) + } + return nil +} diff --git a/hack/e2e/lib/common.sh b/hack/e2e/lib/common.sh index 80402e12..8f91d856 100755 --- a/hack/e2e/lib/common.sh +++ b/hack/e2e/lib/common.sh @@ -303,7 +303,7 @@ ensure_binary() { E2E_BINARY="${E2E_WORK_DIR}/aks-flex-node" ( cd "${REPO_ROOT}" - GOOS=linux GOARCH=amd64 go build -ldflags "${ldflags}" -o "${E2E_BINARY}" ./cmd/aks-flex-node + GOOS=linux GOARCH=amd64 go build -tags local_e2e -ldflags "${ldflags}" -o "${E2E_BINARY}" ./cmd/aks-flex-node ) chmod +x "${E2E_BINARY}" diff --git a/pkg/aksmachine/client.go b/pkg/aksmachine/client.go new file mode 100644 index 00000000..9b18eea5 --- /dev/null +++ b/pkg/aksmachine/client.go @@ -0,0 +1,15 @@ +package aksmachine + +import ( + "log/slog" + + "github.com/Azure/AKSFlexNode/pkg/config" +) + +func newMachineClientFromConfig(cfg *config.Config, logger *slog.Logger) (MachineClient, error) { + if cfg.Agent.ARMProxyURLOverrideForE2E != "" { + logger.Warn("using ARM proxy machine client for dev-test") + return newARMProxyClient(cfg, logger) + } + return newARMClient(cfg, logger) +} diff --git a/pkg/aksmachine/client_armapi.go b/pkg/aksmachine/client_armapi.go new file mode 100644 index 00000000..03f409de --- /dev/null +++ b/pkg/aksmachine/client_armapi.go @@ -0,0 +1,243 @@ +package aksmachine + +import ( + "context" + "errors" + "fmt" + "log/slog" + "net/http" + "strings" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v8" + + "github.com/Azure/AKSFlexNode/pkg/config" +) + +const ( + aksFlexNodePoolName = "aksflexnodes" + flexNodeTagKey = "aks-flex-node" +) + +type armMachineClient struct { + machineID *arm.ResourceID + client *armcontainerservice.MachinesClient + logger *slog.Logger +} + +// newARMClient returns a MachineClient backed by the AKS ARM Machine API. +func newARMClient(cfg *config.Config, logger *slog.Logger) (MachineClient, error) { + machineID, err := machineResourceIDFromConfig(cfg) + if err != nil { + return nil, err + } + cred, err := getCredential(cfg, logger) + if err != nil { + return nil, fmt.Errorf("resolve ARM credential: %w", err) + } + var armOpts *arm.ClientOptions // nil = default public ARM endpoint + client, err := armcontainerservice.NewMachinesClient(machineID.SubscriptionID, cred, armOpts) + if err != nil { + return nil, fmt.Errorf("create machines client: %w", err) + } + return &armMachineClient{ + machineID: machineID, + client: client, + logger: logger, + }, nil +} + +func (c *armMachineClient) Create(ctx context.Context, desired GoalState) (*Machine, error) { + if err := desired.validate(); err != nil { + return nil, fmt.Errorf("validate goal state: %w", err) + } + params := armcontainerservice.Machine{ + Properties: &armcontainerservice.MachineProperties{ + Tags: map[string]*string{ + flexNodeTagKey: new("true"), + }, + Kubernetes: buildK8sProfile(desired), + }, + } + agentPoolID := c.machineID.Parent + clusterID := agentPoolID.Parent + c.logger.Info("creating or updating AKS machine", "machine", c.machineID.Name, "pool", agentPoolID.Name) + poller, err := c.client.BeginCreateOrUpdate( + ctx, + c.machineID.ResourceGroupName, + clusterID.Name, + agentPoolID.Name, + c.machineID.Name, + params, + nil, + ) + if err != nil { + return nil, fmt.Errorf("begin create machine %q: %w", c.machineID.Name, err) + } + resp, err := poller.PollUntilDone(ctx, nil) + if err != nil { + return nil, fmt.Errorf("wait for machine %q: %w", c.machineID.Name, err) + } + if err := c.validateMachineIdentity(resp.Machine); err != nil { + return nil, err + } + result := machineFromARM(resp.Machine, desired) + result.ID = c.machineID.String() + result.Name = c.machineID.Name + return result, nil +} + +func (c *armMachineClient) Get(ctx context.Context) (*Machine, error) { + agentPoolID := c.machineID.Parent + clusterID := agentPoolID.Parent + resp, err := c.client.Get( + ctx, + c.machineID.ResourceGroupName, + clusterID.Name, + agentPoolID.Name, + c.machineID.Name, + nil, + ) + if isARMNotFound(err) { + return nil, &NotFoundError{Resource: c.machineID.String()} + } + if err != nil { + return nil, fmt.Errorf("get machine %q: %w", c.machineID.Name, err) + } + if err := c.validateMachineIdentity(resp.Machine); err != nil { + return nil, err + } + result := machineFromARM(resp.Machine, GoalState{}) + result.ID = c.machineID.String() + result.Name = c.machineID.Name + return result, nil +} + +func (c *armMachineClient) PatchStatus(context.Context, Status) error { + // TODO: implement this. + c.logger.Warn("skipping AKS machine status update; ARM Machine status is read-only") + return nil +} + +func machineResourceIDFromConfig(cfg *config.Config) (*arm.ResourceID, error) { + if cfg.Azure.TargetCluster.ResourceID == "" || cfg.Agent.NodeName == "" || cfg.Kubernetes.Version == "" { + return nil, fmt.Errorf("incomplete AKS machine config: clusterResourceId=%q machineName=%q kubernetesVersion=%q", + cfg.Azure.TargetCluster.ResourceID, cfg.Agent.NodeName, cfg.Kubernetes.Version) + } + machineResourceID := strings.TrimRight(cfg.Azure.TargetCluster.ResourceID, "/") + "/agentPools/" + aksFlexNodePoolName + "/machines/" + cfg.Agent.NodeName + machineID, err := arm.ParseResourceID(machineResourceID) + if err != nil { + return nil, fmt.Errorf("parse AKS machine resource ID %q: %w", machineResourceID, err) + } + return machineID, nil +} + +func getCredential(cfg *config.Config, logger *slog.Logger) (azcore.TokenCredential, error) { + switch { + case cfg.IsSPConfigured(): + logger.Debug( + "using service principal credential for ARM", + "tenantID", cfg.Azure.ServicePrincipal.TenantID, + "clientID", cfg.Azure.ServicePrincipal.ClientID, + ) + return azidentity.NewClientSecretCredential( + cfg.Azure.ServicePrincipal.TenantID, + cfg.Azure.ServicePrincipal.ClientID, + cfg.Azure.ServicePrincipal.ClientSecret, + nil, + ) + case cfg.IsMIConfigured(): + opts := &azidentity.ManagedIdentityCredentialOptions{} + if cfg.Azure.ManagedIdentity != nil && cfg.Azure.ManagedIdentity.ClientID != "" { + opts.ID = azidentity.ClientID(cfg.Azure.ManagedIdentity.ClientID) + logger.Debug( + "using user-assigned managed identity credential for ARM", + "clientID", cfg.Azure.ManagedIdentity.ClientID, + ) + } else { + logger.Debug("using system-assigned managed identity credential for ARM") + } + return azidentity.NewManagedIdentityCredential(opts) + default: + logger.Debug("falling back to default credential for ARM") + return azidentity.NewDefaultAzureCredential(nil) + } +} + +func buildK8sProfile(goal GoalState) *armcontainerservice.MachineKubernetesProfile { + p := &armcontainerservice.MachineKubernetesProfile{ + OrchestratorVersion: &goal.KubernetesVersion, + MaxPods: new(int32(goal.MaxPods)), //nolint:gosec // validated non-negative and small + NodeLabels: stringPointerMap(goal.NodeLabels), + NodeTaints: stringPointerSlice(goal.NodeTaints), + KubeletConfig: &armcontainerservice.KubeletConfig{ + ImageGcHighThreshold: new(int32(goal.KubeletConfig.ImageGCHighThreshold)), //nolint:gosec // validated non-negative and small + ImageGcLowThreshold: new(int32(goal.KubeletConfig.ImageGCLowThreshold)), //nolint:gosec // validated non-negative and small + }, + } + return p +} + +func stringPointerMap(values map[string]string) map[string]*string { + if values == nil { + return nil + } + result := make(map[string]*string, len(values)) + for k, v := range values { + value := v + result[k] = &value + } + return result +} + +func stringPointerSlice(values []string) []*string { + if values == nil { + return nil + } + result := make([]*string, len(values)) + for i, v := range values { + value := v + result[i] = &value + } + return result +} + +func (c *armMachineClient) validateMachineIdentity(machine armcontainerservice.Machine) error { + if machine.ID != nil && !strings.EqualFold(*machine.ID, c.machineID.String()) { + return fmt.Errorf("AKS machine ID mismatch: got %q, want %q", *machine.ID, c.machineID.String()) + } + if machine.Name != nil && *machine.Name != c.machineID.Name { + return fmt.Errorf("AKS machine name mismatch: got %q, want %q", *machine.Name, c.machineID.Name) + } + return nil +} + +func machineFromARM(machine armcontainerservice.Machine, fallback GoalState) *Machine { + result := &Machine{Goal: fallback} + if machine.Properties != nil { + if machine.Properties.Kubernetes != nil { + if machine.Properties.Kubernetes.OrchestratorVersion != nil { + result.Goal.KubernetesVersion = *machine.Properties.Kubernetes.OrchestratorVersion + } + if result.Goal.KubernetesVersion == "" && machine.Properties.Kubernetes.CurrentOrchestratorVersion != nil { + result.Goal.KubernetesVersion = *machine.Properties.Kubernetes.CurrentOrchestratorVersion + } + } + if result.Goal.SettingsVersion == "" { + result.Goal.SettingsVersion = result.Goal.KubernetesVersion + } + if machine.Properties.ProvisioningState != nil { + result.Status.ProvisioningState = ProvisioningState(*machine.Properties.ProvisioningState) + } + } + return result +} + +func isARMNotFound(err error) bool { + var respErr *azcore.ResponseError + return errors.As(err, &respErr) && respErr.StatusCode == http.StatusNotFound +} + +var _ MachineClient = (*armMachineClient)(nil) diff --git a/pkg/aksmachine/client_armapi_proxy.go b/pkg/aksmachine/client_armapi_proxy.go new file mode 100644 index 00000000..feb1f6e7 --- /dev/null +++ b/pkg/aksmachine/client_armapi_proxy.go @@ -0,0 +1,134 @@ +package aksmachine + +import ( + "context" + "fmt" + "log/slog" + "net/http" + "net/url" + "strings" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v8" + + "github.com/Azure/AKSFlexNode/pkg/config" +) + +const ( + //nolint:gosec // Static dev-test token sent only to the configured ARM proxy, never to Azure. + fakeARMProxyBearerToken = "aks-flex-node-e2e" + fakeARMProxyTokenExpiresIn = time.Hour +) + +// newARMProxyClient returns an ARM Machine API client redirected to a dev-test proxy. +func newARMProxyClient(cfg *config.Config, logger *slog.Logger) (MachineClient, error) { + machineID, err := machineResourceIDFromConfig(cfg) + if err != nil { + return nil, err + } + transport, err := newARMProxyTransport(cfg.Agent.ARMProxyURLOverrideForE2E, nil) + if err != nil { + return nil, fmt.Errorf("configure ARM proxy override: %w", err) + } + logger.Warn("using dev-test ARM proxy URL override") + client, err := armcontainerservice.NewMachinesClient( + machineID.SubscriptionID, + staticARMProxyCredential{}, + &arm.ClientOptions{ + ClientOptions: policy.ClientOptions{ + Transport: transport, + }, + }) + if err != nil { + return nil, fmt.Errorf("create proxied machines client: %w", err) + } + return &armMachineClient{ + machineID: machineID, + client: client, + logger: logger, + }, nil +} + +type staticARMProxyCredential struct{} + +func (staticARMProxyCredential) GetToken(context.Context, policy.TokenRequestOptions) (azcore.AccessToken, error) { + return azcore.AccessToken{ + Token: fakeARMProxyBearerToken, + ExpiresOn: time.Now().Add(fakeARMProxyTokenExpiresIn), + }, nil +} + +type armProxyTransport struct { + proxy *url.URL + next policy.Transporter +} + +func newARMProxyTransport(proxyURL string, next policy.Transporter) (*armProxyTransport, error) { + parsed, err := url.Parse(proxyURL) + if err != nil { + return nil, err + } + if parsed.Scheme == "" || parsed.Host == "" { + return nil, fmt.Errorf("proxy URL must be absolute") + } + if parsed.Scheme != "http" && parsed.Scheme != "https" { + return nil, fmt.Errorf("proxy URL scheme must be http or https") + } + if next == nil { + next = roundTripperTransport{next: http.DefaultTransport} + } + return &armProxyTransport{proxy: parsed, next: next}, nil +} + +type roundTripperTransport struct { + next http.RoundTripper +} + +func (t roundTripperTransport) Do(req *http.Request) (*http.Response, error) { + return t.next.RoundTrip(req) +} + +func (t *armProxyTransport) Do(req *http.Request) (*http.Response, error) { + proxied := req.Clone(req.Context()) + proxied.URL = cloneURL(req.URL) + proxied.URL.Scheme = t.proxy.Scheme + proxied.URL.Host = t.proxy.Host + proxied.URL.Path = joinURLPath(t.proxy.Path, req.URL.Path) + proxied.URL.RawPath = "" + proxied.URL.RawQuery = mergeRawQuery(t.proxy.RawQuery, req.URL.RawQuery) + proxied.Host = t.proxy.Host + return t.next.Do(proxied) +} + +func cloneURL(u *url.URL) *url.URL { + cloned := *u + return &cloned +} + +func joinURLPath(prefix, path string) string { + prefix = strings.TrimRight(prefix, "/") + if prefix == "" { + if path == "" { + return "/" + } + return path + } + if path == "" || path == "/" { + return prefix + } + return prefix + "/" + strings.TrimLeft(path, "/") +} + +func mergeRawQuery(first, second string) string { + switch { + case first == "": + return second + case second == "": + return first + default: + return first + "&" + second + } +} diff --git a/pkg/aksmachine/client_armapi_proxy_test.go b/pkg/aksmachine/client_armapi_proxy_test.go new file mode 100644 index 00000000..cac379cd --- /dev/null +++ b/pkg/aksmachine/client_armapi_proxy_test.go @@ -0,0 +1,82 @@ +package aksmachine + +import ( + "context" + "io" + "net/http" + "strings" + "testing" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" +) + +func TestARMProxyTransportRewritesRequest(t *testing.T) { + t.Parallel() + + transport, err := newARMProxyTransport("http://127.0.0.1:8080/proxy?proxy=true", roundTripFunc(func(req *http.Request) (*http.Response, error) { + if got, want := req.URL.String(), "http://127.0.0.1:8080/proxy/subscriptions/123/resourceGroups/rg?proxy=true&api-version=2026-01-01"; got != want { + t.Fatalf("proxied URL = %q, want %q", got, want) + } + if got, want := req.Host, "127.0.0.1:8080"; got != want { + t.Fatalf("Host = %q, want %q", got, want) + } + return &http.Response{StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader("{}"))}, nil + })) + if err != nil { + t.Fatalf("newARMProxyTransport() error = %v", err) + } + + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "https://management.azure.com/subscriptions/123/resourceGroups/rg?api-version=2026-01-01", nil) + if err != nil { + t.Fatalf("NewRequestWithContext() error = %v", err) + } + if _, err := transport.Do(req); err != nil { + t.Fatalf("Do() error = %v", err) + } + if got, want := req.URL.String(), "https://management.azure.com/subscriptions/123/resourceGroups/rg?api-version=2026-01-01"; got != want { + t.Fatalf("original URL = %q, want %q", got, want) + } +} + +func TestARMProxyTransportRejectsInvalidURL(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + proxyURL string + }{ + {name: "relative", proxyURL: "/proxy"}, + {name: "unsupported scheme", proxyURL: "ftp://127.0.0.1/proxy"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + if _, err := newARMProxyTransport(tt.proxyURL, nil); err == nil { + t.Fatal("newARMProxyTransport() error = nil, want error") + } + }) + } +} + +func TestStaticARMProxyCredential(t *testing.T) { + t.Parallel() + + token, err := (staticARMProxyCredential{}).GetToken(context.Background(), policy.TokenRequestOptions{}) + if err != nil { + t.Fatalf("GetToken() error = %v", err) + } + if token.Token != fakeARMProxyBearerToken { + t.Fatalf("Token = %q, want %q", token.Token, fakeARMProxyBearerToken) + } + if token.ExpiresOn.IsZero() { + t.Fatal("ExpiresOn is zero") + } +} + +type roundTripFunc func(*http.Request) (*http.Response, error) + +func (f roundTripFunc) Do(req *http.Request) (*http.Response, error) { + return f(req) +} diff --git a/pkg/aksmachine/client_armapi_test.go b/pkg/aksmachine/client_armapi_test.go new file mode 100644 index 00000000..b90ce48a --- /dev/null +++ b/pkg/aksmachine/client_armapi_test.go @@ -0,0 +1,323 @@ +package aksmachine + +import ( + "strings" + "testing" + + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v8" + + "github.com/Azure/AKSFlexNode/pkg/config" +) + +const testClusterResourceID = "/subscriptions/12345678-1234-1234-1234-123456789012/resourceGroups/test-rg/providers/Microsoft.ContainerService/managedClusters/test-cluster" + +func TestMachineResourceIDFromConfig(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + cfg *config.Config + want string + wantErr string + }{ + { + name: "valid config", + cfg: testARMConfig( + testClusterResourceID, + "flex-node-1", + "1.34.0", + ), + want: testClusterResourceID + "/agentPools/aksflexnodes/machines/flex-node-1", + }, + { + name: "trims cluster resource slash", + cfg: testARMConfig( + testClusterResourceID+"/", + "flex-node-1", + "1.34.0", + ), + want: testClusterResourceID + "/agentPools/aksflexnodes/machines/flex-node-1", + }, + { + name: "missing cluster resource ID", + cfg: testARMConfig( + "", + "flex-node-1", + "1.34.0", + ), + wantErr: "incomplete AKS machine config", + }, + { + name: "missing node name", + cfg: testARMConfig( + testClusterResourceID, + "", + "1.34.0", + ), + wantErr: "incomplete AKS machine config", + }, + { + name: "missing Kubernetes version", + cfg: testARMConfig( + testClusterResourceID, + "flex-node-1", + "", + ), + wantErr: "incomplete AKS machine config", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + got, err := machineResourceIDFromConfig(tt.cfg) + if tt.wantErr != "" { + if err == nil || !strings.Contains(err.Error(), tt.wantErr) { + t.Fatalf("machineResourceIDFromConfig() error = %v, want containing %q", err, tt.wantErr) + } + return + } + if err != nil { + t.Fatalf("machineResourceIDFromConfig() error = %v", err) + } + if got.String() != tt.want { + t.Fatalf("machineResourceIDFromConfig() = %q, want %q", got.String(), tt.want) + } + if got.Parent == nil || got.Parent.Name != aksFlexNodePoolName { + t.Fatalf("agent pool parent = %#v, want name %q", got.Parent, aksFlexNodePoolName) + } + if got.Parent.Parent == nil || got.Parent.Parent.Name != "test-cluster" { + t.Fatalf("cluster parent = %#v, want name test-cluster", got.Parent.Parent) + } + }) + } +} + +func TestBuildK8sProfile(t *testing.T) { + t.Parallel() + + profile := buildK8sProfile(GoalState{ + KubernetesVersion: "1.35.1", + MaxPods: 42, + NodeLabels: map[string]string{"workload": "flex"}, + NodeTaints: []string{"dedicated=flex:NoSchedule"}, + KubeletConfig: KubeletConfig{ + ImageGCHighThreshold: 85, + ImageGCLowThreshold: 80, + }, + }) + if profile.OrchestratorVersion == nil || *profile.OrchestratorVersion != "1.35.1" { + t.Fatalf("OrchestratorVersion = %v, want 1.35.1", profile.OrchestratorVersion) + } + if profile.MaxPods == nil || *profile.MaxPods != 42 { + t.Fatalf("MaxPods = %v, want 42", profile.MaxPods) + } + if got := profile.NodeLabels["workload"]; got == nil || *got != "flex" { + t.Fatalf("NodeLabels[workload] = %v, want flex", got) + } + if len(profile.NodeTaints) != 1 || profile.NodeTaints[0] == nil || *profile.NodeTaints[0] != "dedicated=flex:NoSchedule" { + t.Fatalf("NodeTaints = %#v, want dedicated=flex:NoSchedule", profile.NodeTaints) + } + if profile.KubeletConfig == nil { + t.Fatal("KubeletConfig is nil") + } + if profile.KubeletConfig.ImageGcHighThreshold == nil || *profile.KubeletConfig.ImageGcHighThreshold != 85 { + t.Fatalf("ImageGcHighThreshold = %v, want 85", profile.KubeletConfig.ImageGcHighThreshold) + } + if profile.KubeletConfig.ImageGcLowThreshold == nil || *profile.KubeletConfig.ImageGcLowThreshold != 80 { + t.Fatalf("ImageGcLowThreshold = %v, want 80", profile.KubeletConfig.ImageGcLowThreshold) + } +} + +func TestGoalStateValidate(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + goal GoalState + wantErr string + }{ + { + name: "valid", + goal: GoalState{KubernetesVersion: "1.35.1"}, + }, + { + name: "missing Kubernetes version", + goal: GoalState{}, + wantErr: "kubernetes version is empty", + }, + { + name: "negative max pods", + goal: GoalState{KubernetesVersion: "1.35.1", MaxPods: -1}, + wantErr: "max pods must be non-negative", + }, + { + name: "negative image GC high threshold", + goal: GoalState{ + KubernetesVersion: "1.35.1", + KubeletConfig: KubeletConfig{ + ImageGCHighThreshold: -1, + }, + }, + wantErr: "image GC high threshold must be non-negative", + }, + { + name: "negative image GC low threshold", + goal: GoalState{ + KubernetesVersion: "1.35.1", + KubeletConfig: KubeletConfig{ + ImageGCLowThreshold: -1, + }, + }, + wantErr: "image GC low threshold must be non-negative", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + err := tt.goal.validate() + if tt.wantErr != "" { + if err == nil || !strings.Contains(err.Error(), tt.wantErr) { + t.Fatalf("validate() error = %v, want containing %q", err, tt.wantErr) + } + return + } + if err != nil { + t.Fatalf("validate() error = %v", err) + } + }) + } +} + +func TestMachineFromARM(t *testing.T) { + t.Parallel() + + orchestratorVersion := "1.35.1" + provisioningState := "Succeeded" + machine := machineFromARM(armcontainerservice.Machine{ + Properties: &armcontainerservice.MachineProperties{ + Kubernetes: &armcontainerservice.MachineKubernetesProfile{ + OrchestratorVersion: &orchestratorVersion, + }, + ProvisioningState: &provisioningState, + }, + }, GoalState{SettingsVersion: "fallback-settings"}) + + if machine.ID != "" { + t.Fatalf("ID = %q, want empty", machine.ID) + } + if machine.Name != "" { + t.Fatalf("Name = %q, want empty", machine.Name) + } + if machine.Goal.KubernetesVersion != orchestratorVersion { + t.Fatalf("KubernetesVersion = %q, want %q", machine.Goal.KubernetesVersion, orchestratorVersion) + } + if machine.Goal.SettingsVersion != "fallback-settings" { + t.Fatalf("SettingsVersion = %q, want fallback-settings", machine.Goal.SettingsVersion) + } + if machine.Status.ProvisioningState != ProvisioningStateSucceeded { + t.Fatalf("ProvisioningState = %q, want %q", machine.Status.ProvisioningState, ProvisioningStateSucceeded) + } +} + +func TestValidateMachineIdentity(t *testing.T) { + t.Parallel() + + machineID, err := machineResourceIDFromConfig(testARMConfig(testClusterResourceID, "flex-node-1", "1.34.0")) + if err != nil { + t.Fatalf("machineResourceIDFromConfig() error = %v", err) + } + client := &armMachineClient{machineID: machineID} + + tests := []struct { + name string + machine armcontainerservice.Machine + wantErr string + }{ + { + name: "matching identity", + machine: armcontainerservice.Machine{ + ID: ptr(machineID.String()), + Name: ptr("flex-node-1"), + }, + }, + { + name: "missing remote identity is allowed", + machine: armcontainerservice.Machine{}, + }, + { + name: "ID mismatch", + machine: armcontainerservice.Machine{ + ID: ptr(testClusterResourceID + "/agentPools/aksflexnodes/machines/other-node"), + }, + wantErr: "AKS machine ID mismatch", + }, + { + name: "name mismatch", + machine: armcontainerservice.Machine{ + Name: ptr("other-node"), + }, + wantErr: "AKS machine name mismatch", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + err := client.validateMachineIdentity(tt.machine) + if tt.wantErr != "" { + if err == nil || !strings.Contains(err.Error(), tt.wantErr) { + t.Fatalf("validateMachineIdentity() error = %v, want containing %q", err, tt.wantErr) + } + return + } + if err != nil { + t.Fatalf("validateMachineIdentity() error = %v", err) + } + }) + } +} + +func TestMachineFromARMUsesCurrentOrchestratorVersionFallback(t *testing.T) { + t.Parallel() + + currentVersion := "1.35.2" + machine := machineFromARM(armcontainerservice.Machine{ + Properties: &armcontainerservice.MachineProperties{ + Kubernetes: &armcontainerservice.MachineKubernetesProfile{ + CurrentOrchestratorVersion: ¤tVersion, + }, + }, + }, GoalState{}) + + if machine.Goal.KubernetesVersion != currentVersion { + t.Fatalf("KubernetesVersion = %q, want %q", machine.Goal.KubernetesVersion, currentVersion) + } + if machine.Goal.SettingsVersion != currentVersion { + t.Fatalf("SettingsVersion = %q, want %q", machine.Goal.SettingsVersion, currentVersion) + } +} + +func testARMConfig(clusterResourceID, nodeName, kubernetesVersion string) *config.Config { + return &config.Config{ + Azure: config.AzureConfig{ + TargetCluster: &config.TargetClusterConfig{ + ResourceID: clusterResourceID, + }, + }, + Agent: config.AgentConfig{ + NodeName: nodeName, + }, + Kubernetes: config.KubernetesConfig{ + Version: kubernetesVersion, + }, + } +} + +func ptr[T any](v T) *T { + return &v +} diff --git a/pkg/aksmachine/client_local.go b/pkg/aksmachine/client_local.go new file mode 100644 index 00000000..37322b55 --- /dev/null +++ b/pkg/aksmachine/client_local.go @@ -0,0 +1,118 @@ +//go:build local_e2e + +package aksmachine + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log/slog" + "os" + "path/filepath" + + "github.com/Azure/AKSFlexNode/pkg/config" + "github.com/Azure/AKSFlexNode/pkg/utils/utilio" +) + +const fileMode = 0o600 + +const localResourceID = "local-test-machine" + +// e2eMachineFilePath is the well-known local machine file used by e2e daemon +// mode and by e2ehelper when simulating AKS RP machine changes. +const e2eMachineFilePath = "/run/aks-flex-node/e2e-machine.json" + +// LocalClient implements MachineClient with a JSON file. It is compiled only +// with the local_e2e build tag so GitHub E2E tests can simulate the AKS RP +// machine API by mutating local disk state instead of calling ARM. +// +// The file stores the local Machine JSON shape and always uses +// "local-test-machine" as the created machine ID. Tests can create or replace +// goal state by writing a Machine with Goal.KubernetesVersion and +// Goal.SettingsVersion, patch status by updating Machine.Status, and simulate +// an ARM 404 by deleting the file. Missing files are returned as NotFoundError +// so reset/delete flows can treat local deletion like a missing remote machine. +// +// This client is intentionally not a production AKS RP implementation; it only +// exists to let E2E tests validate daemon behavior without a live Machine API. +type LocalClient struct { + path string +} + +// newLocalClient creates a file-backed MachineClient rooted at path. The client +// stores the full Machine payload in JSON and treats a missing file as a missing +// AKS machine resource. +func newLocalClient(path string) (*LocalClient, error) { + if path == "" { + return nil, fmt.Errorf("machine file path is empty") + } + if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil { + return nil, fmt.Errorf("create local machine file directory: %w", err) + } + return &LocalClient{path: path}, nil +} + +func (c *LocalClient) Create(_ context.Context, desired GoalState) (*Machine, error) { + machine := &Machine{ID: localResourceID, Goal: desired} + if err := c.write(machine); err != nil { + return nil, err + } + return machine, nil +} + +func (c *LocalClient) Get(context.Context) (*Machine, error) { + return c.read() +} + +func (c *LocalClient) PatchStatus(_ context.Context, status Status) error { + machine, err := c.read() + if err != nil { + return err + } + machine.Status = status + return c.write(machine) +} + +func (c *LocalClient) read() (*Machine, error) { + data, err := os.ReadFile(filepath.Clean(c.path)) + if errors.Is(err, os.ErrNotExist) { + return nil, &NotFoundError{Resource: c.path} + } + if err != nil { + return nil, fmt.Errorf("read machine file %s: %w", c.path, err) + } + + var machine Machine + if err := json.Unmarshal(data, &machine); err != nil { + return nil, fmt.Errorf("decode machine file %s: %w", c.path, err) + } + return &machine, nil +} + +func (c *LocalClient) write(machine *Machine) error { + if machine == nil { + return fmt.Errorf("machine is nil") + } + data, err := json.MarshalIndent(machine, "", " ") + if err != nil { + return fmt.Errorf("marshal machine: %w", err) + } + data = append(data, '\n') + if err := utilio.WriteFile(c.path, data, fileMode); err != nil { + return fmt.Errorf("write machine file %s: %w", c.path, err) + } + return nil +} + +var _ MachineClient = (*LocalClient)(nil) + +// NewMachineClient creates a MachineClient instance from config. +func NewMachineClient(cfg *config.Config, logger *slog.Logger) (MachineClient, error) { + if cfg.Agent.E2EMode { + logger.Info("using local file-backed machine client for e2e testing", "path", e2eMachineFilePath) + return newLocalClient(e2eMachineFilePath) + } + + return newMachineClientFromConfig(cfg, logger) +} diff --git a/pkg/aksmachine/local/client_test.go b/pkg/aksmachine/client_local_test.go similarity index 54% rename from pkg/aksmachine/local/client_test.go rename to pkg/aksmachine/client_local_test.go index 017f6255..3e92aebb 100644 --- a/pkg/aksmachine/local/client_test.go +++ b/pkg/aksmachine/client_local_test.go @@ -1,4 +1,6 @@ -package local +//go:build local_e2e + +package aksmachine import ( "context" @@ -7,20 +9,18 @@ import ( "os" "path/filepath" "testing" - - "github.com/Azure/AKSFlexNode/pkg/aksmachine" ) -func TestClientCreateGetAndPatchStatus(t *testing.T) { +func TestLocalClientCreateGetAndPatchStatus(t *testing.T) { t.Parallel() path := filepath.Join(t.TempDir(), "machine.json") - client, err := NewClient(path) + client, err := newLocalClient(path) if err != nil { - t.Fatalf("NewClient: %v", err) + t.Fatalf("newLocalClient: %v", err) } - created, err := client.Create(context.Background(), aksmachine.GoalState{KubernetesVersion: "1.34.0", SettingsVersion: "42"}) + created, err := client.Create(context.Background(), GoalState{KubernetesVersion: "1.34.0", SettingsVersion: "42"}) if err != nil { t.Fatalf("Create: %v", err) } @@ -31,7 +31,7 @@ func TestClientCreateGetAndPatchStatus(t *testing.T) { t.Fatalf("created ID = %q", created.ID) } - if err := client.PatchStatus(context.Background(), aksmachine.Status{ProvisioningState: aksmachine.ProvisioningStateSucceeded, ObservedSettingsVersion: "42"}); err != nil { + if err := client.PatchStatus(context.Background(), Status{ProvisioningState: ProvisioningStateSucceeded, ObservedSettingsVersion: "42"}); err != nil { t.Fatalf("PatchStatus: %v", err) } @@ -39,38 +39,38 @@ func TestClientCreateGetAndPatchStatus(t *testing.T) { if err != nil { t.Fatalf("Get: %v", err) } - if got.Goal.KubernetesVersion != "1.34.0" || got.Status.ProvisioningState != aksmachine.ProvisioningStateSucceeded { + if got.Goal.KubernetesVersion != "1.34.0" || got.Status.ProvisioningState != ProvisioningStateSucceeded { t.Fatalf("got machine = %#v", got) } } -func TestClientGetNotFound(t *testing.T) { +func TestLocalClientGetNotFound(t *testing.T) { t.Parallel() - client, err := NewClient(filepath.Join(t.TempDir(), "missing.json")) + client, err := newLocalClient(filepath.Join(t.TempDir(), "missing.json")) if err != nil { - t.Fatalf("NewClient: %v", err) + t.Fatalf("newLocalClient: %v", err) } _, err = client.Get(context.Background()) - var notFound *aksmachine.NotFoundError + var notFound *NotFoundError if !errors.As(err, ¬Found) { t.Fatalf("Get error = %v, want NotFoundError", err) } } -func TestClientReadsExternalMutation(t *testing.T) { +func TestLocalClientReadsExternalMutation(t *testing.T) { t.Parallel() path := filepath.Join(t.TempDir(), "machine.json") - client, err := NewClient(path) + client, err := newLocalClient(path) if err != nil { - t.Fatalf("NewClient: %v", err) + t.Fatalf("newLocalClient: %v", err) } - if _, err := client.Create(context.Background(), aksmachine.GoalState{KubernetesVersion: "1.34.0", SettingsVersion: "42"}); err != nil { + if _, err := client.Create(context.Background(), GoalState{KubernetesVersion: "1.34.0", SettingsVersion: "42"}); err != nil { t.Fatalf("Create: %v", err) } - mutated := aksmachine.Machine{Goal: aksmachine.GoalState{KubernetesVersion: "1.35.0", SettingsVersion: "43"}} + mutated := Machine{Goal: GoalState{KubernetesVersion: "1.35.0", SettingsVersion: "43"}} data, err := json.Marshal(mutated) if err != nil { t.Fatalf("Marshal: %v", err) diff --git a/pkg/aksmachine/client_nonlocal.go b/pkg/aksmachine/client_nonlocal.go new file mode 100644 index 00000000..81da88de --- /dev/null +++ b/pkg/aksmachine/client_nonlocal.go @@ -0,0 +1,18 @@ +//go:build !local_e2e + +package aksmachine + +import ( + "log/slog" + + "github.com/Azure/AKSFlexNode/pkg/config" +) + +// NewMachineClient creates a MachineClient instance from config. +func NewMachineClient(cfg *config.Config, logger *slog.Logger) (MachineClient, error) { + if cfg.Agent.E2EMode { + logger.Warn("local e2e mode client is not supported in current build") + } + + return newMachineClientFromConfig(cfg, logger) +} diff --git a/pkg/aksmachine/ensure.go b/pkg/aksmachine/ensure.go index 98e8655d..6314de9e 100644 --- a/pkg/aksmachine/ensure.go +++ b/pkg/aksmachine/ensure.go @@ -5,210 +5,37 @@ import ( "errors" "fmt" "log/slog" - "net/http" - "github.com/Azure/azure-sdk-for-go/sdk/azcore" - "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm" - "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" - "github.com/Azure/azure-sdk-for-go/sdk/azidentity" - "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v8" - - "github.com/Azure/AKSFlexNode/pkg/config" "github.com/Azure/unbounded/pkg/agent/phases" ) -const ( - aksFlexNodePoolName = "aksflexnodes" - flexNodeTagKey = "aks-flex-node" -) - type ensureMachineTask struct { - cfg *config.Config - logger *slog.Logger + machines MachineClient + goal GoalState + logger *slog.Logger } -// EnsureMachine returns a task that ensures the AKS "aksflexnodes" agent pool -// (mode=Machines) exists and this machine is registered in it. -func EnsureMachine(cfg *config.Config, logger *slog.Logger) phases.Task { - return &ensureMachineTask{cfg: cfg, logger: logger} +// EnsureMachine returns a task that ensures this machine is registered in AKS. +func EnsureMachine(machines MachineClient, goal GoalState, logger *slog.Logger) phases.Task { + return &ensureMachineTask{machines: machines, goal: goal, logger: logger} } func (t *ensureMachineTask) Name() string { return "ensure-machine" } func (t *ensureMachineTask) Do(ctx context.Context) error { - subID := t.cfg.Azure.TargetCluster.SubscriptionID - rg := t.cfg.Azure.TargetCluster.ResourceGroup - clusterName := t.cfg.Azure.TargetCluster.Name - machineName := t.cfg.Agent.NodeName // TODO: add support for overriding machine name in config - k8sVersion := t.cfg.Kubernetes.Version - - if subID == "" || rg == "" || clusterName == "" || machineName == "" || k8sVersion == "" { - return fmt.Errorf("ensure-machine: incomplete config: subscriptionId=%q resourceGroup=%q clusterName=%q machineName=%q kubernetesVersion=%q", - subID, rg, clusterName, machineName, k8sVersion) - } - - cred, err := t.getCredential() - if err != nil { - return fmt.Errorf("ensure-machine: resolve credential: %w", err) - } - - var armOpts *arm.ClientOptions // nil = default public ARM endpoint - - // Step 1: ensure agent pool exists. - if err := t.ensureAgentPool(ctx, cred, armOpts, subID, rg, clusterName); err != nil { - return fmt.Errorf("ensure-machine: ensure agent pool: %w", err) - } - - // Step 2: ensure this machine is registered. - if err := t.ensureMachineResource(ctx, cred, armOpts, subID, rg, clusterName, machineName, k8sVersion); err != nil { - return fmt.Errorf("ensure-machine: ensure machine: %w", err) - } - - return nil -} - -func (t *ensureMachineTask) getCredential() (azcore.TokenCredential, error) { - cfg := t.cfg - if cfg.IsSPConfigured() { - return azidentity.NewClientSecretCredential( - cfg.Azure.ServicePrincipal.TenantID, - cfg.Azure.ServicePrincipal.ClientID, - cfg.Azure.ServicePrincipal.ClientSecret, - nil, - ) - } - if cfg.IsMIConfigured() { - opts := &azidentity.ManagedIdentityCredentialOptions{} - if cfg.Azure.ManagedIdentity != nil && cfg.Azure.ManagedIdentity.ClientID != "" { - opts.ID = azidentity.ClientID(cfg.Azure.ManagedIdentity.ClientID) - } - return azidentity.NewManagedIdentityCredential(opts) - } - return azidentity.NewAzureCLICredential(nil) -} - -func (t *ensureMachineTask) ensureAgentPool( - ctx context.Context, - cred azcore.TokenCredential, - armOpts *arm.ClientOptions, - subID, rg, clusterName string, -) error { - client, err := armcontainerservice.NewAgentPoolsClient(subID, cred, armOpts) - if err != nil { - return fmt.Errorf("create agent pools client: %w", err) - } - - // Skip if already exists. - if _, err := client.Get(ctx, rg, clusterName, aksFlexNodePoolName, nil); err == nil { - t.logger.Info("agent pool already exists, skipping", "pool", aksFlexNodePoolName) + if _, err := t.machines.Get(ctx); err == nil { + t.logger.Info("machine already registered, skipping") + // The prior run may have registered the machine before node startup failed; + // continue so startup can retry the node and status update later. return nil - } else if !isARMNotFound(err) { - return fmt.Errorf("get agent pool %q: %w", aksFlexNodePoolName, err) - } - - mode := armcontainerservice.AgentPoolMode("Machines") - params := armcontainerservice.AgentPool{ - Properties: &armcontainerservice.ManagedClusterAgentPoolProfileProperties{ - Mode: &mode, - }, - } - - t.logger.Info("creating agent pool", "pool", aksFlexNodePoolName, "cluster", clusterName) - poller, err := client.BeginCreateOrUpdate(ctx, rg, clusterName, aksFlexNodePoolName, params, nil) - if err != nil { - return fmt.Errorf("begin create agent pool %q: %w", aksFlexNodePoolName, err) - } - if _, err = poller.PollUntilDone(ctx, nil); err != nil { - return fmt.Errorf("wait for agent pool %q: %w", aksFlexNodePoolName, err) - } - - t.logger.Info("agent pool created", "pool", aksFlexNodePoolName) - return nil -} - -func (t *ensureMachineTask) ensureMachineResource( - ctx context.Context, - cred azcore.TokenCredential, - armOpts *arm.ClientOptions, - subID, rg, clusterName, machineName, k8sVersion string, -) error { - client, err := armcontainerservice.NewMachinesClient(subID, cred, armOpts) - if err != nil { - return fmt.Errorf("create machines client: %w", err) - } - - // Skip if already exists. - if _, err := client.Get(ctx, rg, clusterName, aksFlexNodePoolName, machineName, nil); err == nil { - t.logger.Info("machine already registered, skipping", "machine", machineName) - return nil - } else if !isARMNotFound(err) { - return fmt.Errorf("get machine %q: %w", machineName, err) - } - - params := armcontainerservice.Machine{ - Properties: &armcontainerservice.MachineProperties{ - Tags: map[string]*string{ - flexNodeTagKey: to.Ptr("true"), - }, - Kubernetes: t.buildK8sProfile(k8sVersion), - }, - } - - t.logger.Info("registering machine", "machine", machineName, "pool", aksFlexNodePoolName) - poller, err := client.BeginCreateOrUpdate(ctx, rg, clusterName, aksFlexNodePoolName, machineName, params, nil) - if err != nil { - return fmt.Errorf("begin create machine %q: %w", machineName, err) - } - if _, err = poller.PollUntilDone(ctx, nil); err != nil { - return fmt.Errorf("wait for machine %q: %w", machineName, err) - } - - t.logger.Info("machine registered", "machine", machineName) - return nil -} - -func (t *ensureMachineTask) buildK8sProfile(k8sVersion string) *armcontainerservice.MachineKubernetesProfile { - cfg := t.cfg - p := &armcontainerservice.MachineKubernetesProfile{} - - if k8sVersion != "" { - p.OrchestratorVersion = to.Ptr(k8sVersion) - } - if cfg.Node.MaxPods > 0 { - p.MaxPods = to.Ptr(int32(cfg.Node.MaxPods)) //nolint:gosec // max pods is always a small positive int - } - if len(cfg.Node.Labels) > 0 { - p.NodeLabels = make(map[string]*string, len(cfg.Node.Labels)) - for k, v := range cfg.Node.Labels { - p.NodeLabels[k] = to.Ptr(v) - } - } - if len(cfg.Node.Taints) > 0 { - p.NodeTaints = make([]*string, len(cfg.Node.Taints)) - for i, taint := range cfg.Node.Taints { - p.NodeTaints[i] = to.Ptr(taint) - } - } - - // Image GC thresholds. - if h := cfg.Node.Kubelet.ImageGCHighThreshold; h > 0 { - if p.KubeletConfig == nil { - p.KubeletConfig = &armcontainerservice.KubeletConfig{} + } else { + var notFound *NotFoundError + if !errors.As(err, ¬Found) { + return fmt.Errorf("ensure-machine: get machine: %w", err) } - p.KubeletConfig.ImageGcHighThreshold = to.Ptr(int32(h)) //nolint:gosec // threshold is always small } - if l := cfg.Node.Kubelet.ImageGCLowThreshold; l > 0 { - if p.KubeletConfig == nil { - p.KubeletConfig = &armcontainerservice.KubeletConfig{} - } - p.KubeletConfig.ImageGcLowThreshold = to.Ptr(int32(l)) //nolint:gosec // threshold is always small + if _, err := t.machines.Create(ctx, t.goal); err != nil { + return fmt.Errorf("ensure-machine: create machine: %w", err) } - - return p -} - -// isARMNotFound reports whether the Azure SDK error is an HTTP 404. -func isARMNotFound(err error) bool { - var respErr *azcore.ResponseError - return errors.As(err, &respErr) && respErr.StatusCode == http.StatusNotFound + return nil } diff --git a/pkg/aksmachine/local/README.md b/pkg/aksmachine/local/README.md deleted file mode 100644 index c5a9d46a..00000000 --- a/pkg/aksmachine/local/README.md +++ /dev/null @@ -1,89 +0,0 @@ -# Local AKS Machine Backend - -This package provides a file-backed implementation of `aksmachine.MachineClient` for e2e tests. - -It is not a production AKS RP or ARM implementation. Production code should use an SDK-backed client once the AKS machine ARM contract is available in the public Azure SDK. This local backend exists so e2e tests can simulate AKS RP machine goal-state changes by mutating a JSON file on disk. - -## Purpose - -The local backend lets tests validate agent behavior without depending on a live AKS RP endpoint. - -It supports: - -- Creating a local machine JSON file with a desired Kubernetes version and settings version. -- Reading the current local machine representation. -- Patching machine status. -- Simulating machine deletion by deleting the file. - -When the file does not exist, `Get` returns `*aksmachine.NotFoundError`. This matches the reset/delete design where the agent treats a missing machine representation as an ARM 404 equivalent. - -## File Format - -The file stores the local `aksmachine.Machine` JSON shape: - -```json -{ - "id": "local-test-machine", - "goal": { - "kubernetesVersion": "1.34.0", - "settingsVersion": "42" - }, - "status": { - "provisioningState": "Succeeded", - "observedSettingsVersion": "42", - "message": "" - } -} -``` - -The local backend always uses `local-test-machine` as the resource ID when creating the file. - -## CLI Usage - -Use the dedicated e2e helper binary, not the main `aks-flex-node` binary. - -Build it with: - -```bash -make build-e2ehelper -``` - -Create or replace the local machine goal state: - -```bash -./e2ehelper local-machine create \ - --path /tmp/aks-machine.json \ - --kubernetes-version 1.34.0 \ - --settings-version 42 -``` - -Read the current machine file: - -```bash -./e2ehelper local-machine get --path /tmp/aks-machine.json -``` - -Patch status: - -```bash -./e2ehelper local-machine status \ - --path /tmp/aks-machine.json \ - --provisioning-state Succeeded \ - --observed-settings-version 42 \ - --message "applied" -``` - -Delete the local machine representation: - -```bash -./e2ehelper local-machine delete --path /tmp/aks-machine.json -``` - -## E2E Pattern - -An e2e test can point the agent at this file-backed client, then trigger control-plane-like changes by mutating the file: - -- Upgrade/reimage/rollback: update `goal.kubernetesVersion` and `goal.settingsVersion`, then trigger the Kubernetes node signal used by the scenario. -- Reset/delete: delete the file to simulate the ARM machine 404, then trigger the Kubernetes node annotation used by the scenario. - -The backend writes JSON atomically through `utilio.WriteFile`, but e2e tests may also directly overwrite or delete the file when simulating external AKS RP behavior. diff --git a/pkg/aksmachine/local/client.go b/pkg/aksmachine/local/client.go deleted file mode 100644 index 891a1dd8..00000000 --- a/pkg/aksmachine/local/client.go +++ /dev/null @@ -1,89 +0,0 @@ -package local - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "os" - "path/filepath" - - "github.com/Azure/AKSFlexNode/pkg/aksmachine" - "github.com/Azure/AKSFlexNode/pkg/utils/utilio" -) - -const fileMode = 0o600 - -const localResourceID = "local-test-machine" - -// E2EMachineFilePath is the well-known local machine file used by e2e daemon -// mode and by e2ehelper when simulating AKS RP machine changes. -const E2EMachineFilePath = "/run/aks-flex-node/e2e-machine.json" - -// Client implements aksmachine.MachineClient with a JSON file. It is intended -// for e2e tests that simulate AKS RP by mutating local disk state. -type Client struct { - path string -} - -func NewClient(path string) (*Client, error) { - if path == "" { - return nil, fmt.Errorf("machine file path is empty") - } - if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil { - return nil, fmt.Errorf("create local machine file directory: %w", err) - } - return &Client{path: path}, nil -} - -func (c *Client) Create(_ context.Context, desired aksmachine.GoalState) (*aksmachine.Machine, error) { - machine := &aksmachine.Machine{ID: localResourceID, Goal: desired} - if err := c.write(machine); err != nil { - return nil, err - } - return machine, nil -} - -func (c *Client) Get(context.Context) (*aksmachine.Machine, error) { - return c.read() -} - -func (c *Client) PatchStatus(_ context.Context, status aksmachine.Status) error { - machine, err := c.read() - if err != nil { - return err - } - machine.Status = status - return c.write(machine) -} - -func (c *Client) read() (*aksmachine.Machine, error) { - data, err := os.ReadFile(filepath.Clean(c.path)) - if errors.Is(err, os.ErrNotExist) { - return nil, &aksmachine.NotFoundError{Resource: c.path} - } - if err != nil { - return nil, fmt.Errorf("read machine file %s: %w", c.path, err) - } - - var machine aksmachine.Machine - if err := json.Unmarshal(data, &machine); err != nil { - return nil, fmt.Errorf("decode machine file %s: %w", c.path, err) - } - return &machine, nil -} - -func (c *Client) write(machine *aksmachine.Machine) error { - if machine == nil { - return fmt.Errorf("machine is nil") - } - data, err := json.MarshalIndent(machine, "", " ") - if err != nil { - return fmt.Errorf("marshal machine: %w", err) - } - data = append(data, '\n') - if err := utilio.WriteFile(c.path, data, fileMode); err != nil { - return fmt.Errorf("write machine file %s: %w", c.path, err) - } - return nil -} diff --git a/pkg/aksmachine/types.go b/pkg/aksmachine/types.go index e10929dc..daae26af 100644 --- a/pkg/aksmachine/types.go +++ b/pkg/aksmachine/types.go @@ -3,14 +3,62 @@ package aksmachine import ( "context" "fmt" + "maps" + "slices" + + "github.com/Azure/AKSFlexNode/pkg/config" ) // GoalState is the local agent representation of ARM machine desired settings. // Keep this type independent from the public Azure SDK shape; adapt the SDK // payload to this model when the ARM contract is finalized. type GoalState struct { - KubernetesVersion string `json:"kubernetesVersion,omitempty"` - SettingsVersion string `json:"settingsVersion,omitempty"` + KubernetesVersion string `json:"kubernetesVersion,omitempty"` + SettingsVersion string `json:"settingsVersion,omitempty"` + MaxPods int `json:"maxPods,omitempty"` + NodeLabels map[string]string `json:"nodeLabels,omitempty"` + NodeTaints []string `json:"nodeTaints,omitempty"` + KubeletConfig KubeletConfig `json:"kubeletConfig"` +} + +type KubeletConfig struct { + ImageGCHighThreshold int `json:"imageGCHighThreshold,omitempty"` + ImageGCLowThreshold int `json:"imageGCLowThreshold,omitempty"` +} + +func (g GoalState) validate() error { + if g.KubernetesVersion == "" { + return fmt.Errorf("kubernetes version is empty") + } + if g.MaxPods < 0 { + return fmt.Errorf("max pods must be non-negative") + } + if g.KubeletConfig.ImageGCHighThreshold < 0 { + return fmt.Errorf("image GC high threshold must be non-negative") + } + if g.KubeletConfig.ImageGCLowThreshold < 0 { + return fmt.Errorf("image GC low threshold must be non-negative") + } + return nil +} + +// GoalStateFromConfig builds and validates the initial AKS machine goal state +// from local agent configuration. +func GoalStateFromConfig(cfg *config.Config) (GoalState, error) { + goal := GoalState{ + KubernetesVersion: cfg.Kubernetes.Version, + MaxPods: cfg.Node.MaxPods, + NodeLabels: maps.Clone(cfg.Node.Labels), + NodeTaints: slices.Clone(cfg.Node.Taints), + KubeletConfig: KubeletConfig{ + ImageGCHighThreshold: cfg.Node.Kubelet.ImageGCHighThreshold, + ImageGCLowThreshold: cfg.Node.Kubelet.ImageGCLowThreshold, + }, + } + if err := goal.validate(); err != nil { + return GoalState{}, err + } + return goal, nil } type ProvisioningState string diff --git a/pkg/aksmachine/types_test.go b/pkg/aksmachine/types_test.go index e0854643..9377fb69 100644 --- a/pkg/aksmachine/types_test.go +++ b/pkg/aksmachine/types_test.go @@ -2,6 +2,7 @@ package aksmachine import ( "context" + "strings" "testing" ) @@ -16,3 +17,65 @@ func TestFakeMachineClientImplementsInterface(t *testing.T) { var _ MachineClient = fakeMachineClient{} } + +func TestGoalStateFromConfig(t *testing.T) { + t.Parallel() + + cfg := testARMConfig(testClusterResourceID, "flex-node-1", "1.35.1") + cfg.Node.MaxPods = 42 + cfg.Node.Labels = map[string]string{ + "workload": "flex", + "zone": "edge", + } + cfg.Node.Taints = []string{ + "dedicated=flex:NoSchedule", + "edge=true:NoExecute", + } + cfg.Node.Kubelet.ImageGCHighThreshold = 85 + cfg.Node.Kubelet.ImageGCLowThreshold = 80 + + goal, err := GoalStateFromConfig(cfg) + if err != nil { + t.Fatalf("GoalStateFromConfig() error = %v", err) + } + if goal.KubernetesVersion != "1.35.1" { + t.Fatalf("KubernetesVersion = %q, want 1.35.1", goal.KubernetesVersion) + } + if goal.MaxPods != 42 { + t.Fatalf("MaxPods = %d, want 42", goal.MaxPods) + } + if len(goal.NodeLabels) != 2 { + t.Fatalf("NodeLabels length = %d, want 2", len(goal.NodeLabels)) + } + if got := goal.NodeLabels["workload"]; got != "flex" { + t.Fatalf("NodeLabels[workload] = %v, want flex", got) + } + if got := goal.NodeLabels["zone"]; got != "edge" { + t.Fatalf("NodeLabels[zone] = %v, want edge", got) + } + if len(goal.NodeTaints) != 2 { + t.Fatalf("NodeTaints length = %d, want 2", len(goal.NodeTaints)) + } + if goal.NodeTaints[0] != "dedicated=flex:NoSchedule" { + t.Fatalf("NodeTaints[0] = %v, want dedicated=flex:NoSchedule", goal.NodeTaints[0]) + } + if goal.NodeTaints[1] != "edge=true:NoExecute" { + t.Fatalf("NodeTaints[1] = %v, want edge=true:NoExecute", goal.NodeTaints[1]) + } + if goal.KubeletConfig.ImageGCHighThreshold != 85 { + t.Fatalf("ImageGCHighThreshold = %d, want 85", goal.KubeletConfig.ImageGCHighThreshold) + } + if goal.KubeletConfig.ImageGCLowThreshold != 80 { + t.Fatalf("ImageGCLowThreshold = %d, want 80", goal.KubeletConfig.ImageGCLowThreshold) + } +} + +func TestGoalStateFromConfigValidates(t *testing.T) { + t.Parallel() + + cfg := testARMConfig(testClusterResourceID, "flex-node-1", "") + _, err := GoalStateFromConfig(cfg) + if err == nil || !strings.Contains(err.Error(), "kubernetes version is empty") { + t.Fatalf("GoalStateFromConfig() error = %v, want Kubernetes version validation", err) + } +} diff --git a/pkg/cmd/daemon/daemon.go b/pkg/cmd/daemon/daemon.go index 6a19af81..53848ff6 100644 --- a/pkg/cmd/daemon/daemon.go +++ b/pkg/cmd/daemon/daemon.go @@ -7,7 +7,7 @@ import ( "github.com/spf13/cobra" - "github.com/Azure/AKSFlexNode/pkg/aksmachine/local" + "github.com/Azure/AKSFlexNode/pkg/aksmachine" "github.com/Azure/AKSFlexNode/pkg/config" "github.com/Azure/AKSFlexNode/pkg/daemon" "github.com/Azure/AKSFlexNode/pkg/logger" @@ -28,9 +28,6 @@ func NewCommand() *cobra.Command { } logger := logger.CreateLogger(cfg.Agent.LogLevel, cfg.Agent.LogDir) - if cfg.Agent.E2EMode { - return runDaemonE2E(cmd.Context(), cfg, logger) - } return runDaemon(cmd.Context(), cfg, logger) }, } @@ -40,16 +37,10 @@ func NewCommand() *cobra.Command { } func runDaemon(ctx context.Context, cfg *config.Config, logger *slog.Logger) error { - logger.Info("TODO: production agent daemon requires AKS RP machine client implementation") - <-ctx.Done() - return ctx.Err() -} - -func runDaemonE2E(ctx context.Context, cfg *config.Config, logger *slog.Logger) error { - logger.Info("running agent daemon in e2e mode", "machineFile", local.E2EMachineFilePath) - machines, err := local.NewClient(local.E2EMachineFilePath) + machines, err := aksmachine.NewMachineClient(cfg, logger) if err != nil { - return fmt.Errorf("create local AKS machine client: %w", err) + return fmt.Errorf("create AKS machine client: %w", err) } + return daemon.Run(ctx, cfg, logger, machines) } diff --git a/pkg/cmd/start/start.go b/pkg/cmd/start/start.go index d6e1b97f..fd76eb04 100644 --- a/pkg/cmd/start/start.go +++ b/pkg/cmd/start/start.go @@ -9,7 +9,6 @@ import ( "github.com/spf13/cobra" "github.com/Azure/AKSFlexNode/pkg/aksmachine" - "github.com/Azure/AKSFlexNode/pkg/aksmachine/local" "github.com/Azure/AKSFlexNode/pkg/config" "github.com/Azure/AKSFlexNode/pkg/daemon" "github.com/Azure/AKSFlexNode/pkg/logger" @@ -52,28 +51,21 @@ func NewCommand() *cobra.Command { } func runStart(ctx context.Context, cfg *config.Config, logger *slog.Logger) error { - machines, err := newMachineClient(cfg, logger) + goal, err := aksmachine.GoalStateFromConfig(cfg) if err != nil { - return err + return fmt.Errorf("build goal state from config: %w", err) } - machine, err := machines.Get(ctx) + machines, err := aksmachine.NewMachineClient(cfg, logger) if err != nil { - return fmt.Errorf("get AKS machine for daemon state seed: %w", err) + return fmt.Errorf("create AKS machine client: %w", err) } - state := daemon.SeededState(machine.Goal) + state := daemon.SeededState(goal) machineName := state.ActiveMachine stateStore, err := daemon.NewFileStateStore() if err != nil { return err } - // This backfill is a best-effort bridge until cluster connection details are - // provided directly by the API. It should be removed once start no longer - // needs to fetch admin kubeconfig data from ARM. - if err := config.BackfillClusterConfigWithUserCredentials(ctx, cfg, logger); err != nil { - return fmt.Errorf("bootstrap failed at step enrich-cluster-config: %w", err) - } - agentCfg := config.ToAgentConfig(cfg, machineName) gs, err := goalstates.ResolveMachine(logger, agentCfg, machineName, nil) if err != nil { @@ -81,6 +73,8 @@ func runStart(ctx context.Context, cfg *config.Config, logger *slog.Logger) erro } tasks := phases.Serial(logger, + // Persist the goal state in AKS RP before mutating local host state. + aksmachine.EnsureMachine(machines, goal, logger), daemon.SetupHost(cfg, logger), daemon.StartNode(cfg, logger, machineName, gs, stateStore, state), daemon.InstallService(logger), @@ -93,16 +87,3 @@ func runStart(ctx context.Context, cfg *config.Config, logger *slog.Logger) erro return nil } - -func newMachineClient(cfg *config.Config, logger *slog.Logger) (aksmachine.MachineClient, error) { - if cfg.Agent.E2EMode { - logger.Info("using local e2e AKS machine client", "machineFile", local.E2EMachineFilePath) - machines, err := local.NewClient(local.E2EMachineFilePath) - if err != nil { - return nil, fmt.Errorf("create local AKS machine client: %w", err) - } - return machines, nil - } - logger.Info("TODO: using no-op AKS machine client until AKS RP implementation is available") - return aksmachine.NewNoopClient(cfg), nil -} diff --git a/pkg/config/adapter.go b/pkg/config/adapter.go index f80de4fd..6b1ebef8 100644 --- a/pkg/config/adapter.go +++ b/pkg/config/adapter.go @@ -19,8 +19,7 @@ const ( // AgentConfig. The resulting struct can be passed to goalstates.ResolveMachine // to produce goal states for the nspawn-based bootstrap phases. // -// The enrich-cluster-config step must have already run so that -// cfg.Node.Kubelet.ServerURL and cfg.Node.Kubelet.CACertData are populated. +// cfg.Node.Kubelet.ServerURL and cfg.Node.Kubelet.CACertData must be populated. func ToAgentConfig(cfg *Config, machineName string) *agentconfig.AgentConfig { ac := &agentconfig.AgentConfig{ MachineName: machineName, diff --git a/pkg/config/cluster_config.go b/pkg/config/cluster_config.go deleted file mode 100644 index 3de43386..00000000 --- a/pkg/config/cluster_config.go +++ /dev/null @@ -1,137 +0,0 @@ -package config - -import ( - "context" - "fmt" - "log/slog" - - "github.com/Azure/AKSFlexNode/pkg/utils/utilaz" - "github.com/Azure/azure-sdk-for-go/sdk/azcore" - "github.com/Azure/azure-sdk-for-go/sdk/azidentity" - "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v8" - "sigs.k8s.io/yaml" -) - -// BackfillClusterConfigWithUserCredentials populates cfg.Node.Kubelet.ServerURL and -// cfg.Node.Kubelet.CACertData from the AKS cluster admin credentials. -// It is a no-op when these fields are already set or when bootstrap token -// auth is configured (which requires them in the config file). -func BackfillClusterConfigWithUserCredentials(ctx context.Context, cfg *Config, logger *slog.Logger) error { - if cfg.Node.Kubelet.ServerURL != "" && cfg.Node.Kubelet.CACertData != "" { - return nil - } - - if cfg.IsBootstrapTokenConfigured() { - return nil - } - - logger.Info("fetching cluster admin credentials to populate server URL and CA cert data") - cred, err := userCredential(cfg) - if err != nil { - return fmt.Errorf("get credential: %w", err) - } - - clusterSubID := cfg.Azure.TargetCluster.SubscriptionID - mcClient, err := armcontainerservice.NewManagedClustersClient(clusterSubID, cred, nil) - if err != nil { - return fmt.Errorf("create managed clusters client: %w", err) - } - - clusterRG := cfg.Azure.TargetCluster.ResourceGroup - clusterName := cfg.Azure.TargetCluster.Name - - resp, err := mcClient.ListClusterAdminCredentials(ctx, clusterRG, clusterName, nil) - if err != nil { - return fmt.Errorf("list cluster admin credentials for %s/%s: %w", clusterRG, clusterName, err) - } - - if len(resp.Kubeconfigs) == 0 { - return fmt.Errorf("no kubeconfig returned in cluster admin credentials response") - } - - kubeconfig := resp.Kubeconfigs[0] - if kubeconfig == nil || len(kubeconfig.Value) == 0 { - return fmt.Errorf("kubeconfig value is empty in cluster admin credentials response") - } - - serverURL, caCertData, err := extractClusterInfoFromKubeconfig(kubeconfig.Value) - if err != nil { - return fmt.Errorf("extract cluster info from kubeconfig: %w", err) - } - - cfg.Node.Kubelet.ServerURL = serverURL - cfg.Node.Kubelet.CACertData = caCertData - logger.Info("cluster config enriched", "serverURL", serverURL) - return nil -} - -func userCredential(cfg *Config) (azcore.TokenCredential, error) { - var sources []azcore.TokenCredential - if cfg.Azure.ServicePrincipal != nil { - sp := cfg.Azure.ServicePrincipal - cred, err := azidentity.NewClientSecretCredential( - sp.TenantID, - sp.ClientID, - sp.ClientSecret, - nil, - ) - if err == nil { - sources = append(sources, cred) - } else { - sources = append(sources, &utilaz.CredentialErrorReporter{CredentialType: "service principal", Err: err}) - } - } - if cfg.Azure.ManagedIdentity != nil { - options := &azidentity.ManagedIdentityCredentialOptions{} - if cfg.Azure.ManagedIdentity.ClientID != "" { - options.ID = azidentity.ClientID(cfg.Azure.ManagedIdentity.ClientID) - } - cred, err := azidentity.NewManagedIdentityCredential(options) - if err == nil { - sources = append(sources, cred) - } else { - sources = append(sources, &utilaz.CredentialErrorReporter{CredentialType: "managed identity", Err: err}) - } - } - cred, err := azidentity.NewAzureCLICredential(nil) - if err == nil { - sources = append(sources, cred) - } else { - sources = append(sources, &utilaz.CredentialErrorReporter{CredentialType: "azure CLI", Err: err}) - } - - chainedCred, err := azidentity.NewChainedTokenCredential(sources, nil) - if err != nil { - return nil, fmt.Errorf("create chained credential: %w", err) - } - return chainedCred, nil -} - -// minimalKubeconfig holds just the fields we need from an admin kubeconfig. -// sigs.k8s.io/yaml converts YAML to JSON first and then uses encoding/json, -// so json: tags (not yaml: tags) are required for correct field mapping. -type minimalKubeconfig struct { - Clusters []struct { - Cluster struct { - Server string `json:"server"` - CertificateAuthorityData string `json:"certificate-authority-data"` - } `json:"cluster"` - } `json:"clusters"` -} - -// extractClusterInfoFromKubeconfig parses a kubeconfig YAML and returns the -// server URL and base64-encoded CA certificate data from the first cluster entry. -func extractClusterInfoFromKubeconfig(data []byte) (serverURL, caCertData string, err error) { - var kc minimalKubeconfig - if err := yaml.Unmarshal(data, &kc); err != nil { - return "", "", fmt.Errorf("parse kubeconfig YAML: %w", err) - } - if len(kc.Clusters) == 0 { - return "", "", fmt.Errorf("no clusters found in kubeconfig") - } - cluster := kc.Clusters[0].Cluster - if cluster.Server == "" { - return "", "", fmt.Errorf("server URL is empty in kubeconfig") - } - return cluster.Server, cluster.CertificateAuthorityData, nil -} diff --git a/pkg/config/config.go b/pkg/config/config.go index 2c1af324..415f3e45 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -3,6 +3,7 @@ package config import ( "encoding/json" "fmt" + "net/url" "os" "path/filepath" "regexp" @@ -130,10 +131,14 @@ type AgentConfig struct { // machine resource when no Kubernetes Node event wakes the controller. MachineReconcileInterval JSONDuration `json:"machineReconcileInterval,omitempty"` - // E2EMode uses the local file-backed AKS machine client. This is only for - // end-to-end tests until the production AKS RP machine client is available. + // E2EMode uses the local file-backed AKS machine client for local testing. + // It is a no-op in production builds. E2EMode bool `json:"e2eMode,omitempty"` + // ARMProxyURLOverrideForE2E redirects ARM requests to a dev-test proxy. + // It must not be set in production configurations. + ARMProxyURLOverrideForE2E string `json:"armProxyURLOverrideForE2E,omitempty"` + // MachineOperationMode controls MachineOperation handling. Supported values: // "auto" detects Machina CRs, "disable" uses a noop reconciler. MachineOperationMode string `json:"machineOperationMode,omitempty"` @@ -512,6 +517,15 @@ func (c *AgentConfig) validate() error { if c.MachineReconcileInterval < 0 { return fmt.Errorf("agent.machineReconcileInterval must be non-negative") } + if c.ARMProxyURLOverrideForE2E != "" { + proxyURL, err := url.Parse(c.ARMProxyURLOverrideForE2E) + if err != nil || proxyURL.Scheme == "" || proxyURL.Host == "" { + return fmt.Errorf("invalid agent.armProxyURLOverrideForE2E: must be an absolute URL") + } + if proxyURL.Scheme != "http" && proxyURL.Scheme != "https" { + return fmt.Errorf("invalid agent.armProxyURLOverrideForE2E: scheme must be http or https") + } + } if c.MachineOperationMode != "" && !validMachineOperationModes[c.MachineOperationMode] { return fmt.Errorf("invalid agent.machineOperationMode: %s. Valid values are: auto, disable", c.MachineOperationMode) } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 2c97ef0c..71f8bd4f 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -285,6 +285,63 @@ func TestValidate(t *testing.T) { wantErr: true, errMsg: "invalid agent.machineOperationMode", }, + { + name: "valid ARM proxy override passes", + config: &Config{ + Azure: AzureConfig{ + SubscriptionID: "12345678-1234-1234-1234-123456789012", + TenantID: "12345678-1234-1234-1234-123456789012", + Cloud: "AzurePublicCloud", + BootstrapToken: &BootstrapTokenConfig{ + Token: "abcdef.0123456789abcdef", + }, + TargetCluster: &TargetClusterConfig{ + ResourceID: "/subscriptions/12345678-1234-1234-1234-123456789012/resourceGroups/test-rg/providers/Microsoft.ContainerService/managedClusters/test-cluster", + Location: "eastus", + }, + }, + Agent: AgentConfig{ + LogLevel: "info", + ARMProxyURLOverrideForE2E: "http://127.0.0.1:8080/proxy", + }, + Node: NodeConfig{ + Kubelet: KubeletConfig{ + ServerURL: "https://test-cluster-abc123.hcp.eastus.azmk8s.io:443", + CACertData: "LS0tLS1CRUdJTi1DRVJUSUZJQ0FURS0tLS0tCk1JSUREekNDQWZlZ0F3SUJBZ0lSQU1kbzBZa0R", + }, + }, + }, + wantErr: false, + }, + { + name: "invalid ARM proxy override fails", + config: &Config{ + Azure: AzureConfig{ + SubscriptionID: "12345678-1234-1234-1234-123456789012", + TenantID: "12345678-1234-1234-1234-123456789012", + Cloud: "AzurePublicCloud", + BootstrapToken: &BootstrapTokenConfig{ + Token: "abcdef.0123456789abcdef", + }, + TargetCluster: &TargetClusterConfig{ + ResourceID: "/subscriptions/12345678-1234-1234-1234-123456789012/resourceGroups/test-rg/providers/Microsoft.ContainerService/managedClusters/test-cluster", + Location: "eastus", + }, + }, + Agent: AgentConfig{ + LogLevel: "info", + ARMProxyURLOverrideForE2E: "/proxy", + }, + Node: NodeConfig{ + Kubelet: KubeletConfig{ + ServerURL: "https://test-cluster-abc123.hcp.eastus.azmk8s.io:443", + CACertData: "LS0tLS1CRUdJTi1DRVJUSUZJQ0FURS0tLS0tCk1JSUREekNDQWZlZ0F3SUJBZ0lSQU1kbzBZa0R", + }, + }, + }, + wantErr: true, + errMsg: "invalid agent.armProxyURLOverrideForE2E", + }, { name: "valid arc config passes", config: &Config{