diff --git a/api/v1/adxcluster_types.go b/api/v1/adxcluster_types.go index dfb604de2..ca8564ad4 100644 --- a/api/v1/adxcluster_types.go +++ b/api/v1/adxcluster_types.go @@ -82,6 +82,32 @@ type ADXClusterFederationSpec struct { //+kubebuilder:validation:Pattern=^(\d+h)?(\d+m)?(\d+s)?$ // If role is "Federated", specifies the ADX cluster's heartbeat table TTL. HeartbeatTTL *string `json:"heartbeatTTL,omitempty"` + + //+kubebuilder:validation:Optional + // Specifies partition clusters that should be excluded from federation macros. + BlockedClusters *ADXClusterFederationBlockedClustersSpec `json:"blockedClusters,omitempty"` +} + +type ADXClusterFederationBlockedClustersSpec struct { + //+kubebuilder:validation:Optional + // Static list of partition cluster endpoints to exclude from macro generation. Values are case-insensitive and trimmed of trailing slashes before comparison. + Static []string `json:"static,omitempty"` + + //+kubebuilder:validation:Optional + // Optional Kusto function that returns additional endpoints to block. The function should return a tabular result with a string column named "ClusterEndpoint" (or "Endpoint"). + KustoFunction *ADXClusterFederationBlockedClustersFunctionSpec `json:"kustoFunction,omitempty"` +} + +type ADXClusterFederationBlockedClustersFunctionSpec struct { + //+kubebuilder:validation:Required + //+kubebuilder:validation:Pattern=^[a-zA-Z0-9_]+$ + // Database containing the break-glass function. + Database string `json:"database"` + + //+kubebuilder:validation:Required + //+kubebuilder:validation:Pattern=^[A-Za-z_][A-Za-z0-9_]*$ + // Name of the Kusto function returning blocked cluster endpoints. 
+ Name string `json:"name"` } type ADXClusterFederatedClusterSpec struct { diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 4ad6be39d..fad23e09b 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -66,6 +66,46 @@ func (in *ADXClusterFederatedClusterSpec) DeepCopy() *ADXClusterFederatedCluster return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ADXClusterFederationBlockedClustersFunctionSpec) DeepCopyInto(out *ADXClusterFederationBlockedClustersFunctionSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ADXClusterFederationBlockedClustersFunctionSpec. +func (in *ADXClusterFederationBlockedClustersFunctionSpec) DeepCopy() *ADXClusterFederationBlockedClustersFunctionSpec { + if in == nil { + return nil + } + out := new(ADXClusterFederationBlockedClustersFunctionSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ADXClusterFederationBlockedClustersSpec) DeepCopyInto(out *ADXClusterFederationBlockedClustersSpec) { + *out = *in + if in.Static != nil { + in, out := &in.Static, &out.Static + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.KustoFunction != nil { + in, out := &in.KustoFunction, &out.KustoFunction + *out = new(ADXClusterFederationBlockedClustersFunctionSpec) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ADXClusterFederationBlockedClustersSpec. 
+func (in *ADXClusterFederationBlockedClustersSpec) DeepCopy() *ADXClusterFederationBlockedClustersSpec { + if in == nil { + return nil + } + out := new(ADXClusterFederationBlockedClustersSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ADXClusterFederationSpec) DeepCopyInto(out *ADXClusterFederationSpec) { *out = *in @@ -100,6 +140,11 @@ func (in *ADXClusterFederationSpec) DeepCopyInto(out *ADXClusterFederationSpec) *out = new(string) **out = **in } + if in.BlockedClusters != nil { + in, out := &in.BlockedClusters, &out.BlockedClusters + *out = new(ADXClusterFederationBlockedClustersSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ADXClusterFederationSpec. diff --git a/docs/adxcluster-controller.md b/docs/adxcluster-controller.md index b2c367742..d131aa1ac 100644 --- a/docs/adxcluster-controller.md +++ b/docs/adxcluster-controller.md @@ -76,6 +76,19 @@ Federation mode: `Partition` (local storage + heartbeat to hub) or `Federated` ( - `heartbeatDatabase`, `heartbeatTable`: where partitions write - `heartbeatTTL`: freshness window (default `1h`) +#### Blocked partition list (Federated hubs) +`spec.federation.blockedClusters` lets hub clusters exclude quarantined or misbehaving partitions from macro generation +without editing the partition CRDs. The controller merges two sources into a normalized block list (lower-cased, trimmed +of whitespace and trailing slashes): + +- `blockedClusters.static`: literal list of partition endpoints to suppress. +- `blockedClusters.kustoFunction`: optional break-glass function looked up in the specified `database`/`name`. The + function must return a tabular result with a string column named `ClusterEndpoint` (preferred) or `Endpoint`. 
Missing + functions are treated as warnings; other errors halt the reconciliation so ops teams can inspect the failure. + +After fetching heartbeats the controller filters any partitions whose endpoints match the block list, logs how many +entries were removed vs. matched, and proceeds with macro creation using the remaining schema. + ### `criteriaExpression` Optional CEL expression against operator cluster labels. Empty/missing = `true`. Errors/false → reconciliation blocked. Example: `labels["geo"] == "eu" && labels.has("tier")` keeps the object scoped to European, tiered operators. See the [CEL language spec](https://opensource.google/projects/cel) for expression syntax. @@ -91,7 +104,8 @@ Authentication: `DefaultAzureCredential` for local cluster; switches to `managed Failures to push heartbeats surface in the operator logs (`adxcluster-controller` logger) alongside the CRD name; reconcile once connectivity or permissions are restored. ### Federated Hubs -Every 10min: query heartbeat table (`WHERE Timestamp > ago(heartbeatTTL)`) → extract partition schemas → ensure OTLP hub tables exist → generate cross-cluster functions. +Every 10min: query heartbeat table (`WHERE Timestamp > ago(heartbeatTTL)`) → extract partition schemas → optionally +filter out any endpoints listed in `spec.federation.blockedClusters` (static values plus any returned by the Kusto function) → ensure OTLP hub tables exist → generate cross-cluster functions. Filtering happens before schema aggregation so blocked partitions contribute neither tables nor functions. 
Heartbeat table schema: `Timestamp:datetime, ClusterEndpoint:string, Schema:dynamic, PartitionMetadata:dynamic` @@ -184,8 +198,18 @@ spec: heartbeatDatabase: "FleetDiscovery" heartbeatTable: "Heartbeats" heartbeatTTL: "2h" + blockedClusters: + static: + - "https://rogue-partition.kusto.windows.net" + kustoFunction: + database: FleetDiscovery + name: GetBlockedPartitions ``` +The static list handles known quarantined clusters, while the `GetBlockedPartitions()` function can emit emergent +endpoints discovered by external tooling. The controller merges both sources and keeps deduplicated, normalized entries +for filtering during macro reconciliation. + ### Gate Reconciliation with Criteria Restrict reconciliation to operators that carry matching labels. ```yaml diff --git a/docs/crds.md b/docs/crds.md index a75d61f2a..a84de634e 100644 --- a/docs/crds.md +++ b/docs/crds.md @@ -51,6 +51,33 @@ spec: geo: "EU" location: "westeurope" ``` + +**Federated hub example with blocked clusters (static + dynamic):** +```yaml +apiVersion: adx-mon.azure.com/v1 +kind: ADXCluster +metadata: + name: global-hub +spec: + clusterName: global-hub + endpoint: "https://global-hub.kusto.windows.net" + role: Federated + federation: + heartbeatDatabase: "FleetDiscovery" + heartbeatTable: "Heartbeats" + heartbeatTTL: "2h" + blockedClusters: + static: + - "https://legacy-partition.kusto.windows.net" + - "https://quarantined-partition.kusto.windows.net/" # trailing slashes ignored + kustoFunction: + database: FleetDiscovery + name: GetBlockedPartitions +``` +The controller normalizes each endpoint (lower case and no trailing slash) and merges the static list with the results of +`GetBlockedPartitions()` before generating macros. Any partition whose heartbeat endpoint matches the merged list is +excluded from schema fan-out. + **Key Fields:** - `clusterName`: Name for the ADX cluster. - `endpoint`: Existing ADX cluster URI (omit to provision new). 
When set, the controller mirrors this value into status @@ -61,7 +88,34 @@ spec: `resourceGroup`, `location`, `skuName`, and `tier` explicitly—the controller no longer auto-detects or mutates these values. - `role`: `Partition` (default) or `Federated` for multi-cluster. -- `federation`: Federation/partitioning config for multi-cluster. +- `federation`: Federation/partitioning config for multi-cluster. Federated hubs can optionally specify + `federation.blockedClusters` to exclude rogue or quarantined partitions from macro generation. The block list accepts a + static array of endpoints as well as a break-glass Kusto function (returning `ClusterEndpoint` or `Endpoint` columns) + whose results are normalized (trimmed, case-insensitive, trailing slashes removed) before filtering the partition + schema set. + +### Creating a blocked cluster function in Kusto +The optional `blockedClusters.kustoFunction` expects a function that returns a table containing either a +`ClusterEndpoint` or `Endpoint` string column. You can create one in the heartbeat database with a command like: + +```kusto +.create-or-alter function with (docstring="Return partitions blocked from federation", folder="FleetSafety") GetBlockedPartitions() +{ + let manualOverrides = datatable(ClusterEndpoint:string) + [ + "https://legacy-partition.kusto.windows.net", + "https://maintenance-partition.kusto.windows.net" + ]; + FleetSafetyBlockedPartitions + | project ClusterEndpoint = tostring(ClusterEndpoint) + | union manualOverrides + | distinct ClusterEndpoint +} +``` + +- `FleetSafetyBlockedPartitions` can be any table you manage (for example, populated via Azure Monitor alerts or manual entries). +- The function may also emit an `Endpoint` column; the controller reads `ClusterEndpoint` first, falls back to `Endpoint` when it is empty, and trims whitespace before use. +- Missing functions are logged as warnings and the static list is still applied, while other Kusto errors block reconciliation so operators can investigate. 
**Status highlights:** - `status.endpoint`: Observed Kusto endpoint used by dependent components (mirrors `spec.endpoint` in BYO mode). diff --git a/kustomize/bases/adxclusters_crd.yaml b/kustomize/bases/adxclusters_crd.yaml index d7fcf5c4a..d7b65fb81 100644 --- a/kustomize/bases/adxclusters_crd.yaml +++ b/kustomize/bases/adxclusters_crd.yaml @@ -101,6 +101,37 @@ spec: description: Supports cluster partitioning. Only relevant if Role is set. properties: + blockedClusters: + description: Specifies partition clusters that should be excluded + from federation macros. + properties: + kustoFunction: + description: Optional Kusto function that returns additional + endpoints to block. The function should return a tabular + result with a string column named "ClusterEndpoint" (or + "Endpoint"). + properties: + database: + description: Database containing the break-glass function. + pattern: ^[a-zA-Z0-9_]+$ + type: string + name: + description: Name of the Kusto function returning blocked + cluster endpoints. + pattern: ^[A-Za-z_][A-Za-z0-9_]*$ + type: string + required: + - database + - name + type: object + static: + description: Static list of partition cluster endpoints to + exclude from macro generation. Values are case-insensitive + and trimmed of trailing slashes before comparison. + items: + type: string + type: array + type: object federatedTargets: description: If role is "Partition", specifies the Federated cluster(s) details for heartbeating. 
diff --git a/operator/adx.go b/operator/adx.go index 6bd42408e..eb347457e 100644 --- a/operator/adx.go +++ b/operator/adx.go @@ -16,6 +16,7 @@ import ( "github.com/Azure/adx-mon/pkg/celutil" "github.com/Azure/adx-mon/pkg/logger" "github.com/Azure/azure-kusto-go/kusto" + kustoerrors "github.com/Azure/azure-kusto-go/kusto/data/errors" "github.com/Azure/azure-kusto-go/kusto/kql" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" @@ -43,6 +44,10 @@ const ( otlpHubSchemaDefinition = "Timestamp:datetime, ObservedTimestamp:datetime, TraceId:string, SpanId:string, SeverityText:string, SeverityNumber:int, Body:dynamic, Resource:dynamic, Attributes:dynamic" ) +type kustoQueryClient interface { + Query(ctx context.Context, db string, query kusto.Statement, options ...kusto.QueryOption) (*kusto.RowIterator, error) +} + // resolvedClusterEndpoint returns the effective endpoint to use for a cluster, // preferring the reconciled status endpoint and falling back to the spec when // the status has not been populated yet. @@ -1217,6 +1222,16 @@ func (r *AdxReconciler) FederateClusters(ctx context.Context, cluster *adxmonv1. 
schemaByEndpoint, _ := parseHeartbeatRows(rows) logger.Infof("ADXCluster %s: processed heartbeat data from %d partition clusters", cluster.Spec.ClusterName, len(schemaByEndpoint)) + blockedEndpoints, err := resolveBlockedClusterEndpoints(ctx, client, cluster) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to resolve blocked cluster endpoints: %w", err) + } + if len(blockedEndpoints) > 0 { + removed, matched := filterSchemaByBlockedEndpoints(schemaByEndpoint, blockedEndpoints) + unmatched := len(blockedEndpoints) - matched + logger.Infof("ADXCluster %s: block list entries=%d, filtered partitions=%d, matched=%d, unmatched=%d", cluster.Spec.ClusterName, len(blockedEndpoints), removed, matched, unmatched) + } + // Step 5: Unique list of databases dbSet := extractDatabasesFromSchemas(schemaByEndpoint) var dbSpecs []adxmonv1.ADXClusterDatabaseSpec @@ -1724,16 +1739,189 @@ func splitKustoScripts(funcs []string, maxSize int) [][]string { // Helper: Execute Kusto scripts in a database, using the .execute database script preamble func executeKustoScripts(ctx context.Context, client *kusto.Client, database string, scripts [][]string) error { const scriptPreamble = ".execute database script with (ContinueOnErrors=true)\n<|\n" - for _, script := range scripts { + var errs []error + for idx, script := range scripts { fullScript := scriptPreamble + strings.Join(script, "") - _, err := client.Mgmt(ctx, database, kql.New("").AddUnsafe(fullScript)) - if err != nil { - return fmt.Errorf("failed to execute Kusto script: %w", err) + if _, err := client.Mgmt(ctx, database, kql.New("").AddUnsafe(fullScript)); err != nil { + logger.Errorf("ADXCluster: failed to execute Kusto script chunk %d/%d for database %s: %v", idx+1, len(scripts), database, err) + errs = append(errs, fmt.Errorf("chunk %d: %w", idx+1, err)) } } + if len(errs) > 0 { + return fmt.Errorf("one or more Kusto scripts failed (%d of %d chunks) for database %s: %w", len(errs), len(scripts), database, errs[0]) + } return nil } +type blockedClusterRow struct { + ClusterEndpoint string 
`kusto:"ClusterEndpoint"` + Endpoint string `kusto:"Endpoint"` +} + +// resolveBlockedClusterEndpoints returns a map keyed by normalized endpoint URLs with the +// original endpoint string as the value. This preserves logging fidelity while allowing +// quick membership checks against canonicalized keys. +func resolveBlockedClusterEndpoints(ctx context.Context, client kustoQueryClient, cluster *adxmonv1.ADXCluster) (map[string]string, error) { + blocked := make(map[string]string) + federation := cluster.Spec.Federation + if federation == nil || federation.BlockedClusters == nil { + return blocked, nil + } + + config := federation.BlockedClusters + for _, entry := range config.Static { + if normalized := normalizeEndpoint(entry); normalized != "" { + blocked[normalized] = strings.TrimSpace(entry) + } + } + + fnSpec := config.KustoFunction + if fnSpec == nil { + return blocked, nil + } + + database := strings.TrimSpace(fnSpec.Database) + functionName := strings.TrimSpace(fnSpec.Name) + if database == "" || functionName == "" { + return blocked, nil + } + if client == nil { + return nil, fmt.Errorf("kusto client is required to execute blocked cluster function %s.%s", database, functionName) + } + + endpoints, err := fetchBlockedEndpointsFromFunction(ctx, client, database, functionName) + if err != nil { + if isBlockedFunctionNotFoundError(err) { + logger.Warnf("ADXCluster %s: blocked cluster function %s.%s not found, skipping", cluster.Spec.ClusterName, database, functionName) + return blocked, nil + } + return nil, fmt.Errorf("failed to execute blocked cluster function %s.%s: %w", database, functionName, err) + } + + for _, endpoint := range endpoints { + trimmed := strings.TrimSpace(endpoint) + if normalized := normalizeEndpoint(trimmed); normalized != "" { + blocked[normalized] = trimmed + } + } + + return blocked, nil +} + +func fetchBlockedEndpointsFromFunction(ctx context.Context, client kustoQueryClient, database, functionName string) ([]string, error) { + // 
NOTE: The function name is provided via ADXCluster CRD and validated by the pattern ^[A-Za-z_][A-Za-z0-9_]*$ at the CRD level. + // This ensures only valid KQL function names are allowed, mitigating injection risks. + // AddUnsafe is required here because the Kusto SDK does not provide a safe way to interpolate function names. + stmt := kql.New("").AddUnsafe(functionName).AddUnsafe("()") + iter, err := client.Query(ctx, database, stmt) + if err != nil { + return nil, err + } + if iter == nil { + return nil, fmt.Errorf("blocked cluster function %s returned no iterator; verify the function definition and permissions allow queries", functionName) + } + defer iter.Stop() + + var endpoints []string + for { + row, inlineErr, err := iter.NextRowOrError() + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + if inlineErr != nil { + logger.Warnf("Blocked cluster function %s returned inline error: %v", functionName, inlineErr) + continue + } + if row == nil { + continue + } + var rec blockedClusterRow + if err := row.ToStruct(&rec); err != nil { + logger.Warnf("Failed to decode blocked cluster row for function %s: %v", functionName, err) + continue + } + endpoint := strings.TrimSpace(rec.ClusterEndpoint) + if endpoint == "" { + endpoint = strings.TrimSpace(rec.Endpoint) + } + if endpoint == "" { + continue + } + endpoints = append(endpoints, endpoint) + } + + return endpoints, nil +} + +func isBlockedFunctionNotFoundError(err error) bool { + if err == nil { + return false + } + if kerr, ok := kustoerrors.GetKustoError(err); ok { + if restErrorIndicatesNotFound(kerr) { + return true + } + if containsNotFoundText(kerr.Error()) { + return true + } + } + return containsNotFoundText(err.Error()) +} + +func restErrorIndicatesNotFound(kerr *kustoerrors.Error) bool { + decoded := kerr.UnmarshalREST() + if decoded == nil { + return false + } + root, _ := decoded["error"].(map[string]interface{}) + if root == nil { + return false + } + if code, _ := 
root["code"].(string); code != "" && strings.EqualFold(code, "NotFound") { + return true + } + if message, _ := root["message"].(string); message != "" && containsNotFoundText(message) { + return true + } + return false +} + +func containsNotFoundText(message string) bool { + lower := strings.ToLower(message) + return strings.Contains(lower, "does not refer to any known entity") || + strings.Contains(lower, "does not exist") || + strings.Contains(lower, "not found") +} + +func normalizeEndpoint(endpoint string) string { + trimmed := strings.TrimSpace(endpoint) + trimmed = strings.TrimRight(trimmed, "/") + if trimmed == "" { + return "" + } + return strings.ToLower(trimmed) +} + +func filterSchemaByBlockedEndpoints(schema map[string][]ADXClusterSchema, blocked map[string]string) (int, int) { + if len(blocked) == 0 { + return 0, 0 + } + removed := 0 + matched := make(map[string]struct{}) + for endpoint := range schema { + norm := normalizeEndpoint(endpoint) + if _, ok := blocked[norm]; ok { + delete(schema, endpoint) + removed++ + matched[norm] = struct{}{} + } + } + return removed, len(matched) +} + type FunctionRec struct { Name string `kusto:"Name"` Parameters string `kusto:"Parameters"` diff --git a/operator/adx_test.go b/operator/adx_test.go index 7f4cb6465..a9d842dd5 100644 --- a/operator/adx_test.go +++ b/operator/adx_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" "net/http/httptest" "os/exec" @@ -15,6 +16,9 @@ import ( "github.com/Azure/adx-mon/pkg/testutils" "github.com/Azure/adx-mon/pkg/testutils/kustainer" "github.com/Azure/azure-kusto-go/kusto" + kustoerrors "github.com/Azure/azure-kusto-go/kusto/data/errors" + kustotable "github.com/Azure/azure-kusto-go/kusto/data/table" + kustotypes "github.com/Azure/azure-kusto-go/kusto/data/types" "github.com/Azure/azure-kusto-go/kusto/kql" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" @@ -894,3 +898,251 @@ func 
TestEnsureHubTables(t *testing.T) { // Re-run to confirm it remains idempotent when tables already exist. require.NoError(t, ensureHubTables(ctx, client, database, tables)) } + +type stubKustoQueryClient struct { + t *testing.T + iterator *kusto.RowIterator + err error + expectDatabase string + expectFunction string + called bool +} + +func (s *stubKustoQueryClient) Query(ctx context.Context, db string, query kusto.Statement, options ...kusto.QueryOption) (*kusto.RowIterator, error) { + if s.expectDatabase != "" { + require.Equal(s.t, s.expectDatabase, db) + } + if s.expectFunction != "" { + require.Contains(s.t, query.String(), s.expectFunction) + } + s.called = true + if s.err != nil { + return nil, s.err + } + return s.iterator, nil +} + +func newMockBlockedClusterIterator(t *testing.T, rows []blockedClusterRow) *kusto.RowIterator { + t.Helper() + columns := kustotable.Columns{ + {Name: "ClusterEndpoint", Type: kustotypes.String}, + {Name: "Endpoint", Type: kustotypes.String}, + } + mockRows, err := kusto.NewMockRows(columns) + require.NoError(t, err) + for _, row := range rows { + require.NoError(t, mockRows.Struct(&row)) + } + iter := &kusto.RowIterator{} + require.NoError(t, iter.Mock(mockRows)) + return iter +} + +func makeBlockedCluster(static []string, fn *adxmonv1.ADXClusterFederationBlockedClustersFunctionSpec) *adxmonv1.ADXCluster { + return &adxmonv1.ADXCluster{ + Spec: adxmonv1.ADXClusterSpec{ + ClusterName: "test-cluster", + Federation: &adxmonv1.ADXClusterFederationSpec{ + BlockedClusters: &adxmonv1.ADXClusterFederationBlockedClustersSpec{ + Static: append([]string(nil), static...), + KustoFunction: fn, + }, + }, + }, + } +} + +func TestNormalizeEndpoint(t *testing.T) { + t.Parallel() + testCases := map[string]struct { + input string + want string + }{ + "empty": {input: "", want: ""}, + "trim": {input: " HTTPS://Example.kusto.windows.net/ ", want: "https://example.kusto.windows.net"}, + "no-scheme": {input: "FoO/", want: "foo"}, + "already-ok": 
{input: "https://cluster", want: "https://cluster"}, + "multi-slash": {input: "https://cluster///", want: "https://cluster"}, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + require.Equal(t, tc.want, normalizeEndpoint(tc.input)) + }) + } +} + +func TestRestErrorIndicatesNotFound(t *testing.T) { + t.Parallel() + makeHTTPError := func(body string) *kustoerrors.HttpError { + return kustoerrors.HTTP( + kustoerrors.OpQuery, + "404", + http.StatusNotFound, + io.NopCloser(strings.NewReader(body)), + "prefix", + ) + } + + t.Run("code", func(t *testing.T) { + err := makeHTTPError(`{"error":{"code":"NotFound"}}`) + require.True(t, restErrorIndicatesNotFound(&err.KustoError)) + }) + + t.Run("message", func(t *testing.T) { + err := makeHTTPError(`{"error":{"message":"Function does not exist"}}`) + require.True(t, restErrorIndicatesNotFound(&err.KustoError)) + }) +} + +func TestContainsNotFoundText(t *testing.T) { + t.Parallel() + require.True(t, containsNotFoundText("Entity does not refer to any known entity")) + require.True(t, containsNotFoundText("Function not found")) + require.False(t, containsNotFoundText("all systems go")) +} + +func TestIsBlockedFunctionNotFoundError(t *testing.T) { + t.Parallel() + makeHTTPError := func(body string) error { + return kustoerrors.HTTP( + kustoerrors.OpQuery, + "404", + http.StatusNotFound, + io.NopCloser(strings.NewReader(body)), + "prefix", + ) + } + + t.Run("kusto", func(t *testing.T) { + err := makeHTTPError(`{"error":{"code":"NotFound"}}`) + require.True(t, isBlockedFunctionNotFoundError(err)) + }) + + t.Run("generic", func(t *testing.T) { + require.True(t, isBlockedFunctionNotFoundError(fmt.Errorf("Function does not exist"))) + require.False(t, isBlockedFunctionNotFoundError(fmt.Errorf("permission denied"))) + }) +} + +func TestFilterSchemaByBlockedEndpoints(t *testing.T) { + t.Parallel() + schema := map[string][]ADXClusterSchema{ + "https://foo": {{Database: "db1"}}, + "https://bar/": {{Database: 
"db2"}}, + "https://baz": {{Database: "db3"}}, + } + blocked := map[string]string{ + normalizeEndpoint("https://foo/"): "foo", + normalizeEndpoint("https://bar"): "bar", + normalizeEndpoint("https://qux"): "qux", + } + + removed, matched := filterSchemaByBlockedEndpoints(schema, blocked) + require.Equal(t, 2, removed) + require.Equal(t, 2, matched) + require.NotContains(t, schema, "https://foo") + require.NotContains(t, schema, "https://bar/") + require.Contains(t, schema, "https://baz") +} + +func TestResolveBlockedClusterEndpoints_StaticOnly(t *testing.T) { + t.Parallel() + cluster := makeBlockedCluster([]string{" HTTPS://Foo/ ", " bar "}, nil) + result, err := resolveBlockedClusterEndpoints(context.Background(), nil, cluster) + require.NoError(t, err) + require.Len(t, result, 2) + require.Equal(t, "HTTPS://Foo/", result["https://foo"]) + require.Equal(t, "bar", result["bar"]) +} + +func TestResolveBlockedClusterEndpoints_FunctionSuccess(t *testing.T) { + t.Parallel() + cluster := makeBlockedCluster([]string{"https://static"}, &adxmonv1.ADXClusterFederationBlockedClustersFunctionSpec{ + Database: "db1", + Name: "GetBlocked", + }) + iter := newMockBlockedClusterIterator(t, []blockedClusterRow{ + {ClusterEndpoint: "https://dyn1/"}, + {Endpoint: " https://dyn2 "}, + }) + client := &stubKustoQueryClient{ + t: t, + iterator: iter, + expectDatabase: "db1", + expectFunction: "GetBlocked()", + } + result, err := resolveBlockedClusterEndpoints(context.Background(), client, cluster) + require.NoError(t, err) + require.True(t, client.called) + require.Len(t, result, 3) + require.Contains(t, result, "https://static") + require.Equal(t, "https://dyn1/", result["https://dyn1"]) + require.Contains(t, result, "https://dyn2") +} + +func TestResolveBlockedClusterEndpoints_FunctionDedupesStatic(t *testing.T) { + t.Parallel() + cluster := makeBlockedCluster([]string{"https://dup"}, &adxmonv1.ADXClusterFederationBlockedClustersFunctionSpec{ + Database: "db1", + Name: "GetBlocked", + 
}) + iter := newMockBlockedClusterIterator(t, []blockedClusterRow{{ClusterEndpoint: "https://dup/"}}) + client := &stubKustoQueryClient{t: t, iterator: iter} + result, err := resolveBlockedClusterEndpoints(context.Background(), client, cluster) + require.NoError(t, err) + require.True(t, client.called) + require.Len(t, result, 1) + require.Equal(t, "https://dup/", result["https://dup"]) +} + +func TestResolveBlockedClusterEndpoints_FunctionNotFound(t *testing.T) { + t.Parallel() + cluster := makeBlockedCluster([]string{"https://static"}, &adxmonv1.ADXClusterFederationBlockedClustersFunctionSpec{ + Database: "db1", + Name: "GetBlocked", + }) + client := &stubKustoQueryClient{t: t, err: fmt.Errorf("function not found")} + result, err := resolveBlockedClusterEndpoints(context.Background(), client, cluster) + require.NoError(t, err) + require.True(t, client.called) + require.Len(t, result, 1) + require.Contains(t, result, "https://static") +} + +func TestResolveBlockedClusterEndpoints_FunctionError(t *testing.T) { + t.Parallel() + cluster := makeBlockedCluster([]string{"https://static"}, &adxmonv1.ADXClusterFederationBlockedClustersFunctionSpec{ + Database: "db1", + Name: "GetBlocked", + }) + client := &stubKustoQueryClient{t: t, err: fmt.Errorf("timeout while executing")} + result, err := resolveBlockedClusterEndpoints(context.Background(), client, cluster) + require.Error(t, err) + require.Nil(t, result) + require.True(t, client.called) +} + +func TestResolveBlockedClusterEndpoints_FunctionRequiresClient(t *testing.T) { + t.Parallel() + cluster := makeBlockedCluster([]string{"https://static"}, &adxmonv1.ADXClusterFederationBlockedClustersFunctionSpec{ + Database: "db1", + Name: "GetBlocked", + }) + _, err := resolveBlockedClusterEndpoints(context.Background(), nil, cluster) + require.Error(t, err) + require.Contains(t, err.Error(), "kusto client is required") +} + +func TestResolveBlockedClusterEndpoints_SkipsWhenFunctionIncomplete(t *testing.T) { + t.Parallel() + 
cluster := makeBlockedCluster([]string{"https://static"}, &adxmonv1.ADXClusterFederationBlockedClustersFunctionSpec{ + Name: "GetBlocked", + }) + client := &stubKustoQueryClient{t: t} + result, err := resolveBlockedClusterEndpoints(context.Background(), client, cluster) + require.NoError(t, err) + require.False(t, client.called) + require.Len(t, result, 1) + require.Contains(t, result, "https://static") +} diff --git a/operator/manifests/crds/adxclusters_crd.yaml b/operator/manifests/crds/adxclusters_crd.yaml index d7fcf5c4a..d7b65fb81 100644 --- a/operator/manifests/crds/adxclusters_crd.yaml +++ b/operator/manifests/crds/adxclusters_crd.yaml @@ -101,6 +101,37 @@ spec: description: Supports cluster partitioning. Only relevant if Role is set. properties: + blockedClusters: + description: Specifies partition clusters that should be excluded + from federation macros. + properties: + kustoFunction: + description: Optional Kusto function that returns additional + endpoints to block. The function should return a tabular + result with a string column named "ClusterEndpoint" (or + "Endpoint"). + properties: + database: + description: Database containing the break-glass function. + pattern: ^[a-zA-Z0-9_]+$ + type: string + name: + description: Name of the Kusto function returning blocked + cluster endpoints. + pattern: ^[A-Za-z_][A-Za-z0-9_]*$ + type: string + required: + - database + - name + type: object + static: + description: Static list of partition cluster endpoints to + exclude from macro generation. Values are case-insensitive + and trimmed of trailing slashes before comparison. + items: + type: string + type: array + type: object federatedTargets: description: If role is "Partition", specifies the Federated cluster(s) details for heartbeating.