From 278921c05343c3c2f842b483054ca41ca4b1b673 Mon Sep 17 00:00:00 2001 From: Colby Farley Date: Fri, 10 Apr 2026 22:32:48 -0500 Subject: [PATCH 1/2] feat: finish deployment-path slice --- src/azurefox/chains/deployment_path.py | 114 ++-- src/azurefox/chains/registry.py | 15 + src/azurefox/chains/runner.py | 717 +++++++++++++++++--- src/azurefox/collectors/provider.py | 177 +++-- src/azurefox/target_matching.py | 67 ++ tests/test_cli_smoke.py | 25 +- tests/test_collectors.py | 117 ++++ tests/test_deployment_path_admissibility.py | 472 +++++++++++-- 8 files changed, 1457 insertions(+), 247 deletions(-) create mode 100644 src/azurefox/target_matching.py diff --git a/src/azurefox/chains/deployment_path.py b/src/azurefox/chains/deployment_path.py index 5168b19..ea675e8 100644 --- a/src/azurefox/chains/deployment_path.py +++ b/src/azurefox/chains/deployment_path.py @@ -130,40 +130,24 @@ def admit_deployment_path_row( candidate_count = max(exact_target_count, narrowed_candidate_count) if source.posture == "stores secrets here": - if exact_target_count == 1 and confirmation_basis in _CANONICAL_CONFIRMATION_BASES: - return DeploymentRowAdmission( - state="named match", - admitted=True, - reason=( - "Secret-bearing deployment support is tied to a visible Azure change path, " - "and the downstream target is joined by canonical evidence." - ), - ) - - if visibility_issue or source.missing_target_mapping: - return DeploymentRowAdmission( - state="visibility blocked", - admitted=True, - reason=( - "Secret-bearing deployment support is visible, but AzureFox still needs " - "stronger target mapping before it can name the downstream Azure footprint." - ), - ) - - if candidate_count > 0: - return DeploymentRowAdmission( - state="narrowed candidates", - admitted=True, - reason=( - "Secret-bearing deployment support is visible, and current evidence narrows " - "the Azure footprint it could widen if another foothold controls execution." - ), - ) - - return DeploymentRowAdmission( - state="blocked", - admitted=False, - reason=( + return _resolved_deployment_row_admission( + exact_target_count=exact_target_count, + candidate_count=candidate_count, + confirmation_basis=confirmation_basis, + visibility_blocked=bool(visibility_issue or source.missing_target_mapping), + named_match_reason=( + "Secret-bearing deployment support is tied to a visible Azure change path, " + "and the downstream target is joined by canonical evidence." + ), + visibility_reason=( + "Secret-bearing deployment support is visible, but AzureFox still needs " + "stronger target mapping before it can name the downstream Azure footprint." + ), + narrowed_reason=( + "Secret-bearing deployment support is visible, and current evidence narrows " + "the Azure footprint it could widen if another foothold controls execution." + ), + blocked_reason=( "Support-only deployment rows still need a named target, a narrowed candidate " "set, or an explicit missing target-mapping boundary." ), @@ -179,44 +163,64 @@ def admit_deployment_path_row( ), ) + return _resolved_deployment_row_admission( + exact_target_count=exact_target_count, + candidate_count=candidate_count, + confirmation_basis=confirmation_basis, + visibility_blocked=bool(visibility_issue), + named_match_reason=( + "The source already looks change-capable and the downstream target is joined by " + "canonical evidence rather than name-only inference." + ), + visibility_reason=( + "The source already looks change-capable, but current scope does not confirm " + "which downstream Azure targets are visible enough to name." + ), + narrowed_reason=( + "The source already looks change-capable, but current evidence narrows the next " + "review set without proving one exact downstream target." + ), + blocked_reason=( + "Visible execution or service-connection posture alone is not enough; a default " + "deployment-path row also needs a named target, a narrowed candidate set, or an " + "explicit visibility block." + ), + ) + + +def _resolved_deployment_row_admission( + *, + exact_target_count: int, + candidate_count: int, + confirmation_basis: DeploymentConfirmationBasis | None, + visibility_blocked: bool, + named_match_reason: str, + visibility_reason: str, + narrowed_reason: str, + blocked_reason: str, +) -> DeploymentRowAdmission: if exact_target_count == 1 and confirmation_basis in _CANONICAL_CONFIRMATION_BASES: return DeploymentRowAdmission( state="named match", admitted=True, - reason=( - "The source already looks change-capable and the downstream target is joined by " - "canonical evidence rather than name-only inference." - ), + reason=named_match_reason, ) - - if visibility_issue: + if visibility_blocked: return DeploymentRowAdmission( state="visibility blocked", admitted=True, - reason=( - "The source already looks change-capable, but current scope does not confirm " - "which downstream Azure targets are visible enough to name." - ), + reason=visibility_reason, ) - if candidate_count > 0: return DeploymentRowAdmission( state="narrowed candidates", admitted=True, - reason=( - "The source already looks change-capable, but current evidence narrows the next " - "review set without proving one exact downstream target." - ), + reason=narrowed_reason, ) - return DeploymentRowAdmission( state="blocked", admitted=False, - reason=( - "Visible execution or service-connection posture alone is not enough; a default " - "deployment-path row also needs a named target, a narrowed candidate set, or an " - "explicit visibility block." - ), + reason=blocked_reason, ) diff --git a/src/azurefox/chains/registry.py b/src/azurefox/chains/registry.py index 616d827..4555db2 100644 --- a/src/azurefox/chains/registry.py +++ b/src/azurefox/chains/registry.py @@ -226,6 +226,21 @@ class ChainFamilySpec: "control other app or service-principal boundaries." ), ), + ChainSourceSpec( + command="keyvault", + minimum_fields=( + "name", + "vault_uri", + "public_network_access", + "enable_rbac_authorization", + "access_policy_count", + "summary", + ), + rationale=( + "Provides the visible secret-store boundary when the deployment path relies " + "on Key Vault-backed variable or input support." + ), + ), ChainSourceSpec( command="arm-deployments", minimum_fields=( diff --git a/src/azurefox/chains/runner.py b/src/azurefox/chains/runner.py index 0a3cb30..ce58703 100644 --- a/src/azurefox/chains/runner.py +++ b/src/azurefox/chains/runner.py @@ -33,6 +33,10 @@ from azurefox.models.commands import ChainsCommandOutput from azurefox.models.common import ArmDeploymentSummary, CollectionIssue, CommandMetadata from azurefox.registry import get_command_specs +from azurefox.target_matching import ( + normalize_exact_target_host, + normalize_exact_target_resource_id, +) _CANDIDATE_LIMIT = 3 _JOIN_QUALITY_ORDER = { @@ -201,6 +205,7 @@ def _build_deployment_path_output( permissions_output = loaded["permissions"] rbac_output = loaded["rbac"] role_trusts_output = loaded["role-trusts"] + keyvault_output = loaded["keyvault"] arm_output = loaded["arm-deployments"] app_services_output = loaded["app-services"] functions_output = loaded["functions"] @@ -239,6 +244,11 @@ def _build_deployment_path_output( arm_correlations = _arm_correlations_by_target_family( [item.model_dump(mode="json") for item in arm_output.deployments] ) + key_vaults_by_name: dict[str, list[dict]] = defaultdict(list) + for item in keyvault_output.key_vaults: + if not item.name: + continue + key_vaults_by_name[_normalize_target_name(item.name)].append(item.model_dump(mode="json")) permissions_by_principal = { item.principal_id: item.model_dump(mode="json") for item in permissions_output.permissions @@ -277,6 +287,10 @@ def _build_deployment_path_output( pipeline_dict, trusts_by_object_id, ) + pipeline_dict["joined_key_vaults"] = _deployment_joined_key_vaults( + pipeline_dict, + key_vaults_by_name, + ) assessment = assess_deployment_source(pipeline) for target_family in assessment.target_family_hints: target_spec = _DEPLOYMENT_TARGET_SPECS.get(target_family) @@ -319,6 +333,10 @@ def _build_deployment_path_output( account_dict, trusts_by_source_id, ) + account_dict["joined_key_vaults"] = _deployment_joined_key_vaults( + account_dict, + key_vaults_by_name, + ) assessment = assess_deployment_source(account) if assessment.posture == "insufficient evidence": continue @@ -377,6 +395,7 @@ def _build_deployment_path_output( "permissions", "rbac", "role-trusts", + "keyvault", "arm-deployments", "app-services", "functions", @@ -548,6 +567,7 @@ def _build_deployment_source_record( ) ): record_confirmation_basis = "source-issue-present" + record_source["confirmation_basis"] = record_confirmation_basis semantic = evaluate_chain_semantics( ChainSemanticContext( @@ -621,11 +641,15 @@ def _build_deployment_source_record( source_command, record_source, assessment=record_assessment, + target_family=target_family, + selected_targets=selected_targets, + target_resolution=admission.state, ), likely_impact=_deployment_likely_impact( target_label=target_spec["label"], target_names=target_names, target_resolution=admission.state, + confirmation_basis=record_confirmation_basis, missing_target_mapping=record_assessment.missing_target_mapping, ), confidence_boundary=_deployment_confidence_boundary( @@ -674,7 +698,9 @@ def _build_deployment_source_record( source=record_source, source_command=source_command, assessment=record_assessment, + target_family=target_family, target_label=target_spec["label"], + selected_targets=selected_targets, target_names=target_names, target_resolution=admission.state, confirmation_basis=record_confirmation_basis, @@ -748,35 +774,147 @@ def _automation_target_matches( candidates: list[dict], supporting_deployments: list[dict], ) -> tuple[list[dict], list[dict], str | None]: + evidence_groups = _automation_target_evidence_groups(source) runbook_names = _automation_runbook_names(source) - if not runbook_names: + if not evidence_groups and not runbook_names: return [], [], None - normalized_runbook_names = {_normalize_target_name(name) for name in runbook_names} - runbook_tokens = _automation_runbook_tokens(runbook_names) - if not runbook_tokens: - return [], [], None + best_name_only_targets: list[dict] = [] + best_name_only_rank = 99 + for rank, group in enumerate(evidence_groups): + matched_targets = _automation_exact_name_matches(candidates, group["names"]) + if not matched_targets: + continue + if group["allow_exact"] and supporting_deployments and len(matched_targets) == 1: + return matched_targets, matched_targets, "same-workload-corroborated" + if ( + not best_name_only_targets + or len(matched_targets) < len(best_name_only_targets) + or (len(matched_targets) == len(best_name_only_targets) and rank < best_name_only_rank) + ): + best_name_only_targets = matched_targets + best_name_only_rank = rank + + if best_name_only_targets: + return [], best_name_only_targets, "name-only-inference" + + overlap_tokens = _automation_overlap_signal_tokens(source, runbook_names) + if overlap_tokens: + narrowed_targets = [ + dict(candidate) + for candidate in candidates + if _automation_candidate_overlap_tokens(candidate, overlap_tokens) + ] + if narrowed_targets and supporting_deployments: + return [], narrowed_targets, "same-workload-corroborated" - exact_targets = [ - dict(candidate) - for candidate in candidates - if _normalize_target_name(str(candidate.get("name") or "")) in normalized_runbook_names - ] - if exact_targets: - confirmation_basis = ( - "same-workload-corroborated" if supporting_deployments else "name-only-inference" + return [], [], None + + +def _automation_target_evidence_groups(source: dict) -> list[dict[str, object]]: + groups: list[dict[str, object]] = [] + active_trigger_names = _automation_active_trigger_names(source) + if active_trigger_names: + groups.append({"names": active_trigger_names, "allow_exact": True}) + primary_runbook = str(source.get("primary_runbook_name") or "").strip() + if primary_runbook: + groups.append({"names": [primary_runbook], "allow_exact": True}) + active_mode_runbooks = _automation_active_mode_runbook_names(source) + if active_mode_runbooks: + groups.append({"names": active_mode_runbooks, "allow_exact": True}) + fallback_runbooks = _automation_runbook_names(source) + if fallback_runbooks: + groups.append({"names": fallback_runbooks, "allow_exact": False}) + deduped_groups: list[dict[str, object]] = [] + seen: set[tuple[str, ...]] = set() + for group in groups: + names = [str(name or "").strip() for name in group["names"] if str(name or "").strip()] + normalized_group = tuple(sorted(_normalize_target_name(name) for name in names)) + if not normalized_group or normalized_group in seen: + continue + seen.add(normalized_group) + deduped_groups.append( + { + "names": names, + "allow_exact": bool(group["allow_exact"]), + } ) - return exact_targets, exact_targets, confirmation_basis + return deduped_groups - narrowed_targets = [ + +def _automation_exact_name_matches(candidates: list[dict], names: list[str]) -> list[dict]: + normalized_names = {_normalize_target_name(name) for name in names if name} + return [ dict(candidate) for candidate in candidates - if _automation_candidate_overlap_tokens(candidate, runbook_tokens) + if _normalize_target_name(str(candidate.get("name") or "")) in normalized_names ] - if narrowed_targets and supporting_deployments: - return [], narrowed_targets, "same-workload-corroborated" - return [], [], None + +def _automation_active_mode_runbook_names(source: dict) -> list[str]: + primary_mode = str(source.get("primary_start_mode") or "").strip().lower() or None + names: list[str] = [] + + primary_runbook = str(source.get("primary_runbook_name") or "").strip() + if primary_runbook: + names.append(primary_runbook) + + if primary_mode == "webhook": + _append_unique_runbook_names(names, source.get("webhook_runbook_names") or []) + elif primary_mode == "schedule": + _append_unique_runbook_names(names, source.get("schedule_runbook_names") or []) + elif primary_mode in {"manual-only", "published-runbook"}: + _append_unique_runbook_names(names, source.get("published_runbook_names") or []) + + return names + + +def _automation_active_trigger_names(source: dict) -> list[str]: + primary_mode = str(source.get("primary_start_mode") or "").strip().lower() + if primary_mode == "webhook": + prefix = "automation-webhook:" + raw_segment = "/webhooks/" + elif primary_mode == "schedule": + prefix = "automation-job-schedule:" + raw_segment = "/jobschedules/" + else: + return [] + + names: list[str] = [] + for value in source.get("trigger_join_ids") or []: + text = str(value or "").strip() + lowered = text.lower() + if lowered.startswith(prefix): + trigger_name = text.split(":", 1)[1].strip() + elif raw_segment in lowered: + trigger_name = text.rsplit("/", 1)[-1].strip() + else: + continue + if trigger_name and trigger_name not in names: + names.append(trigger_name) + return names + + +def _automation_overlap_signal_tokens( + source: dict, + runbook_names: list[str], +) -> set[str]: + trigger_tokens = _automation_runbook_tokens(_automation_active_trigger_names(source)) + if trigger_tokens: + return trigger_tokens + active_mode_tokens = _automation_runbook_tokens(_automation_active_mode_runbook_names(source)) + if active_mode_tokens: + return active_mode_tokens + if source.get("primary_runbook_name") or source.get("primary_start_mode"): + return set() + return _automation_runbook_tokens(runbook_names) + + +def _append_unique_runbook_names(names: list[str], values: list[object]) -> None: + for value in values: + text = str(value or "").strip() + if text and text not in names: + names.append(text) def _automation_runbook_names(source: dict) -> list[str]: @@ -1121,6 +1259,41 @@ def _devops_joined_role_trusts( return [best_rows[key] for key in sorted(best_rows, key=lambda item: best_scores[item])] +def _deployment_joined_key_vaults( + source: dict, + key_vaults_by_name: dict[str, list[dict]], +) -> list[dict]: + joined: list[dict] = [] + seen: set[str] = set() + for name in _deployment_named_key_vaults(source): + key = _normalize_target_name(name) + if not key: + continue + for vault in key_vaults_by_name.get(key, []): + vault_id = str(vault.get("id") or "").strip() or key + if vault_id in seen: + continue + seen.add(vault_id) + joined.append(dict(vault)) + return joined + + +def _deployment_named_key_vaults(source: dict) -> list[str]: + names: list[str] = [] + for value in source.get("key_vault_names") or []: + text = str(value or "").strip() + if text and text not in names: + names.append(text) + for value in source.get("secret_dependency_ids") or []: + text = str(value or "").strip() + if not text.lower().startswith("keyvault:"): + continue + name = text.split(":", 1)[1].strip() + if name and name not in names: + names.append(name) + return names + + def _devops_service_principal_refs(source: dict) -> list[str]: refs: list[str] = [] for value in source.get("azure_service_connection_principal_ids") or []: @@ -1472,6 +1645,9 @@ def _deployment_why_care( source: dict, *, assessment: DeploymentSourceAssessment, + target_family: str, + selected_targets: list[dict], + target_resolution: str, ) -> str: source_name = str(source.get("name") or source.get("id") or source_command) support_parts = _deployment_support_phrase_parts(source) @@ -1506,22 +1682,15 @@ def _deployment_why_care( if source_command == "devops": trusted_input = _devops_primary_trusted_input(source) sentence = _devops_why_care_intro(source, trusted_input=trusted_input) - grounded_reach = _deployment_grounded_reach_clause(source) - if grounded_reach: - sentence = f"{sentence} {grounded_reach}" - - if support_parts: - sentence = ( - f"{sentence} Additional visible deployment support around this path includes " - + " and ".join(support_parts) - + "." - ) - permission_clause = _devops_permission_clause(source) - if permission_clause: - sentence = f"{sentence} {permission_clause}." - trust_clause = _devops_role_trust_clause(source) - if trust_clause: - sentence = f"{sentence} {trust_clause}." + for clause in _deployment_why_care_clauses( + source_command=source_command, + source=source, + support_parts=support_parts, + target_family=target_family, + selected_targets=selected_targets, + target_resolution=target_resolution, + ): + sentence = f"{sentence} {clause}" if source.get("missing_target_mapping"): sentence = ( f"{sentence} AzureFox has not yet mapped the downstream Azure footprint cleanly." @@ -1557,29 +1726,73 @@ def _deployment_why_care( sentence = ( f"Automation account '{source_name}' combines " + ", ".join(surface_parts) + "." ) - grounded_reach = _deployment_grounded_reach_clause(source) - if grounded_reach: - sentence = f"{sentence} {grounded_reach}" - if support_parts: - sentence = ( - f"{sentence} Additional visible deployment support around this account includes " - + " and ".join(support_parts) - + "." - ) current_operator_suffix = _deployment_current_operator_suffix(source_command, source) - if current_operator_suffix: - sentence = f"{sentence} {current_operator_suffix}" - permission_clause = _automation_permission_clause(source) - if permission_clause: - sentence = f"{sentence} {permission_clause}." - trust_clause = _automation_role_trust_clause(source) - if trust_clause: - sentence = f"{sentence} {trust_clause}." + for clause in _deployment_why_care_clauses( + source_command=source_command, + source=source, + support_parts=support_parts, + target_family=target_family, + selected_targets=selected_targets, + target_resolution=target_resolution, + include_current_operator_suffix=current_operator_suffix, + ): + sentence = f"{sentence} {clause}" return sentence return "Visible source evidence suggests Azure change capability" +def _deployment_why_care_clauses( + *, + source_command: str, + source: dict, + support_parts: list[str], + target_family: str, + selected_targets: list[dict], + target_resolution: str, + include_current_operator_suffix: str | None = None, +) -> list[str]: + clauses: list[str] = [] + grounded_reach = _deployment_grounded_reach_clause(source) + if grounded_reach: + clauses.append(grounded_reach) + if support_parts: + support_subject = "path" if source_command == "devops" else "account" + clauses.append( + f"Additional visible deployment support around this {support_subject} includes " + + " and ".join(support_parts) + + "." + ) + key_vault_clause = _deployment_key_vault_support_clause(source) + if key_vault_clause: + clauses.append(key_vault_clause) + if include_current_operator_suffix: + clauses.append(include_current_operator_suffix) + permission_clause = ( + _devops_permission_clause(source) + if source_command == "devops" + else _automation_permission_clause(source) + ) + if permission_clause: + clauses.append(permission_clause + ".") + trust_clause = ( + _devops_role_trust_clause(source) + if source_command == "devops" + else _automation_role_trust_clause(source) + ) + if trust_clause: + clauses.append(trust_clause + ".") + target_clause = _deployment_target_evidence_clause( + target_family=target_family, + selected_targets=selected_targets, + target_resolution=target_resolution, + confirmation_basis=str(source.get("confirmation_basis") or "") or None, + ) + if target_clause: + clauses.append(target_clause) + return clauses + + def _devops_why_care_intro(source: dict, *, trusted_input: dict | None) -> str: trusted_input_text = _devops_trusted_input_text(trusted_input) execution_context = _deployment_execution_context("devops", source) @@ -1654,6 +1867,35 @@ def _deployment_support_phrase_parts(source: dict) -> list[str]: ] +def _deployment_key_vault_support_clause(source: dict) -> str | None: + named_vaults = _deployment_named_key_vaults(source) + joined_vaults = source.get("joined_key_vaults") + if isinstance(joined_vaults, list) and joined_vaults: + shown = ", ".join( + _key_vault_support_summary(vault) + for vault in joined_vaults[:_CANDIDATE_LIMIT] + if isinstance(vault, dict) + ) + if shown: + return f"Visible Key Vault support includes {shown}." + if named_vaults: + shown_names = ", ".join(named_vaults[:_CANDIDATE_LIMIT]) + return ( + "AzureFox has not matched named Key Vault support " + f"({shown_names}) to visible vault inventory." + ) + return None + + +def _key_vault_support_summary(vault: dict) -> str: + name = str(vault.get("name") or vault.get("vault_uri") or "visible vault").strip() + public_network = str(vault.get("public_network_access") or "").strip() or None + auth_mode = "RBAC auth" if vault.get("enable_rbac_authorization") else "access-policy auth" + if public_network: + return f"vault '{name}' ({public_network} network, {auth_mode})" + return f"vault '{name}' ({auth_mode})" + + def _deployment_current_operator_suffix(source_command: str, source: dict) -> str: if source_command == "devops": primary_input = _devops_primary_trusted_input(source) @@ -1738,6 +1980,118 @@ def _source_current_operator_can_drive(source_command: str, source: dict) -> boo return None +def _deployment_target_evidence_clause( + *, + target_family: str, + selected_targets: list[dict], + target_resolution: str, + confirmation_basis: str | None = None, +) -> str | None: + if not selected_targets: + return None + if target_resolution == "named match" and len(selected_targets) == 1: + selected_target = selected_targets[0] + summary = str(selected_target.get("summary") or "").strip() + if summary: + if confirmation_basis in {"normalized-uri-match", "resource-id-match"}: + return f"Matched visible target-side record: {summary}" + if confirmation_basis == "same-workload-corroborated": + return f"Corroborated target-side record for this path: {summary}" + return f"Visible target-side record: {summary}" + target_name = str(selected_target.get("name") or "").strip() + if not target_name: + return None + target_label = _DEPLOYMENT_TARGET_SPECS[target_family]["label"].lower() + if confirmation_basis in {"normalized-uri-match", "resource-id-match"}: + return f"Matched visible target-side record: {target_label} '{target_name}'." + if confirmation_basis == "same-workload-corroborated": + return f"Corroborated target-side record for this path: {target_label} '{target_name}'." + return f"Visible target-side record: {target_label} '{target_name}'." + if target_resolution != "narrowed candidates": + return None + if target_family == "app-services": + return _deployment_app_service_target_clause(selected_targets) + if target_family == "functions": + return _deployment_function_target_clause(selected_targets) + if target_family == "aks": + return _deployment_aks_target_clause(selected_targets) + if target_family == "arm-deployments": + return _deployment_arm_target_clause(selected_targets) + return None + + +def _deployment_app_service_target_clause(targets: list[dict]) -> str: + total = len(targets) + public_enabled = sum( + str(item.get("public_network_access") or "").strip().lower() == "enabled" + for item in targets + ) + identities = sum(bool(item.get("workload_identity_type")) for item in targets) + weaker_posture = sum( + item.get("https_only") is False + or str(item.get("min_tls_version") or "").strip() in {"1.0", "1.1"} + for item in targets + ) + details = [ + f"Visible App Service evidence keeps {total} candidate(s) in play; " + f"{public_enabled} keep public network access enabled and {identities} carry " + "managed identity." + ] + if weaker_posture: + details.append(f"{weaker_posture} still show weaker HTTPS or TLS posture.") + return " ".join(details) + + +def _deployment_function_target_clause(targets: list[dict]) -> str: + total = len(targets) + identities = sum(bool(item.get("workload_identity_type")) for item in targets) + deployment_signals = sum( + bool(item.get("azure_webjobs_storage_value_type")) + or int(item.get("key_vault_reference_count") or 0) > 0 + for item in targets + ) + return ( + f"Visible Function evidence keeps {total} candidate(s) in play; " + f"{identities} carry managed identity and {deployment_signals} still show storage- or " + "Key Vault-backed deployment signals." + ) + + +def _deployment_aks_target_clause(targets: list[dict]) -> str: + total = len(targets) + private_api = sum(item.get("private_cluster_enabled") is True for item in targets) + workload_identity = sum(item.get("workload_identity_enabled") is True for item in targets) + service_principal = sum( + str(item.get("cluster_identity_type") or "").strip() == "ServicePrincipal" + for item in targets + ) + return ( + f"Visible AKS evidence keeps {total} candidate(s) in play; " + f"{private_api} keep private API endpoints, {workload_identity} keep workload identity " + f"enabled, and {service_principal} still use service principal auth." + ) + + +def _deployment_arm_target_clause(targets: list[dict]) -> str: + scope_counts: defaultdict[str, int] = defaultdict(int) + providers: set[str] = set() + for item in targets: + scope_counts[str(item.get("scope_type") or "unknown")] += 1 + providers.update(str(provider) for provider in item.get("providers") or [] if provider) + scope_parts = [] + for scope in ("resource_group", "subscription", "management_group", "tenant", "unknown"): + count = scope_counts.get(scope) + if not count: + continue + label = scope.replace("_", " ") + scope_parts.append(f"{count} {label} deployment(s)") + provider_names = ", ".join(sorted(providers)[:_CANDIDATE_LIMIT + 1]) + sentence = "Visible ARM deployment history keeps " + ", ".join(scope_parts) + " in play." + if provider_names: + sentence = f"{sentence[:-1]} across providers {provider_names}." + return sentence + + def _source_current_operator_can_inject(source_command: str, source: dict) -> bool | None: if source_command == "devops": injection_surfaces = source.get("current_operator_injection_surface_types") or [] @@ -1870,13 +2224,18 @@ def _deployment_likely_impact( target_label: str, target_names: list[str], target_resolution: str, + confirmation_basis: str | None, missing_target_mapping: bool, ) -> str: lowered_label = target_label.lower() if missing_target_mapping: return f"Azure footprint not yet mapped; visible {lowered_label} clues only" if target_resolution == "named match": - return f"exact {lowered_label}: {', '.join(target_names[:_CANDIDATE_LIMIT])}" + return _deployment_named_match_label( + lowered_label, + target_names, + confirmation_basis=confirmation_basis, + ) if target_resolution == "narrowed candidates": shown = ", ".join(target_names[:_CANDIDATE_LIMIT]) if shown: @@ -1917,8 +2276,14 @@ def _deployment_confidence_boundary( if current_operator_can_inject: if target_resolution == "named match": + control_boundary = _deployment_operator_control_boundary( + source_command, + source, + target_label, + confirmation_basis, + ) return ( - f"{_deployment_operator_control_boundary(source_command, source, target_label)}, " + f"{control_boundary}, " f"but not {_deployment_remaining_identity_boundary(source_command, source)}." ) if target_resolution == "narrowed candidates": @@ -1934,8 +2299,9 @@ def _deployment_confidence_boundary( if current_operator_can_drive: if target_resolution == "named match": return ( - f"This row proves current-credential run-path control and the exact " - f"{target_label} target, but not a writable source." + "This row proves current-credential run-path control and " + f"{_deployment_named_match_narrative(target_label, confirmation_basis)}, " + "but not a writable source." ) if target_resolution == "narrowed candidates": return ( @@ -1954,6 +2320,13 @@ def _deployment_confidence_boundary( f"This row proves the exact {target_label} target from parsed source clues. " "Current evidence does not show that current credentials can run this path." ) + if confirmation_basis == "same-workload-corroborated": + return ( + "This row proves " + f"{_deployment_named_match_narrative(target_label, confirmation_basis)} " + "through same-workload corroboration. Current evidence does not show that " + "current credentials can run this path." + ) return ( f"This row proves the exact {target_label} target. Current evidence does not show " "that current credentials can run this path." @@ -1993,36 +2366,57 @@ def _deployment_operator_control_boundary( source_command: str | None, source: dict | None, target_label: str, + confirmation_basis: str | None, ) -> str: if source_command == "devops" and source is not None: identity_name = _devops_execution_identity_name(source) + target_phrase = _deployment_named_match_narrative(target_label, confirmation_basis) if _devops_current_operator_control_mode(source) == "definition-edit": if identity_name: return ( "Current evidence shows you can edit this pipeline definition so it runs as " - f"Azure identity '{identity_name}' against the exact {target_label} target" + f"Azure identity '{identity_name}' against {target_phrase}" ) return ( "Current evidence shows you can edit this pipeline definition so it runs " - f"against the exact {target_label} target" + f"against {target_phrase}" ) if identity_name: return ( "Current evidence shows you can poison this trusted input so it runs as Azure " - f"identity '{identity_name}' against the exact {target_label} target" + f"identity '{identity_name}' against {target_phrase}" ) return ( - "Current evidence shows you can poison this trusted input against the exact " - f"{target_label} target" + "Current evidence shows you can poison this trusted input against " + f"{target_phrase}" ) if source_command == "automation": return ( - "Current evidence shows you can control this source-side path against the exact " - f"{target_label} target" + "Current evidence shows you can control this source-side path against the " + f"{_deployment_named_match_narrative(target_label, confirmation_basis)}" ) return f"Current evidence shows source poisoning and the exact {target_label} target" +def _deployment_named_match_label( + lowered_label: str, + target_names: list[str], + *, + confirmation_basis: str | None, +) -> str: + prefix = "corroborated exact" if confirmation_basis == "same-workload-corroborated" else "exact" + return f"{prefix} {lowered_label}: {', '.join(target_names[:_CANDIDATE_LIMIT])}" + + +def _deployment_named_match_narrative( + target_label: str, + confirmation_basis: str | None, +) -> str: + if confirmation_basis == "same-workload-corroborated": + return f"the corroborated exact {target_label} target" + return f"the exact {target_label} target" + + def _deployment_source_control_label( source_command: str | None, source: dict | None, @@ -2123,6 +2517,7 @@ def _deployment_next_review( target_resolution=target_resolution, target_label=target_label, target_names=target_names, + confirmation_basis=str(source.get("confirmation_basis") or "") or None, supporting_deployments=supporting_deployments, ) ) @@ -2170,6 +2565,7 @@ def _deployment_next_review( target_resolution=target_resolution, target_label=target_label, target_names=target_names, + confirmation_basis=str(source.get("confirmation_basis") or "") or None, supporting_deployments=supporting_deployments, ) ) @@ -2181,11 +2577,15 @@ def _deployment_target_review_step( target_resolution: str, target_label: str, target_names: list[str], + confirmation_basis: str | None, supporting_deployments: list[dict], ) -> str: shown_targets = ", ".join(target_names[:_CANDIDATE_LIMIT]) if target_resolution == "named match" and shown_targets: - step = f"AzureFox already named the exact {target_label} target {shown_targets}" + if confirmation_basis == "same-workload-corroborated": + step = f"AzureFox already corroborated the exact {target_label} target {shown_targets}" + else: + step = f"AzureFox already named the exact {target_label} target {shown_targets}" elif shown_targets: step = ( f"AzureFox already narrowed the visible {target_label} candidates to {shown_targets}; " @@ -2242,6 +2642,10 @@ def _deployment_joined_surfaces( joined.append("permission-summary") if source.get("joined_role_trusts"): joined.append("trust-edge") + if source.get("joined_key_vaults"): + joined.append("keyvault-support") + if source_command == "automation" and source is not None and source.get("joined_key_vaults"): + joined.append("keyvault-support") return sorted(dict.fromkeys(joined)) @@ -2250,14 +2654,23 @@ def _deployment_summary( source: dict, source_command: str, assessment: DeploymentSourceAssessment, + target_family: str, target_label: str, + selected_targets: list[dict], target_names: list[str], target_resolution: str, confirmation_basis: str | None, target_visibility_note: str | None, supporting_deployments: list[dict], ) -> str: - summary = _deployment_why_care(source_command, source, assessment=assessment) + summary = _deployment_why_care( + source_command, + source, + assessment=assessment, + target_family=target_family, + selected_targets=selected_targets, + target_resolution=target_resolution, + ) if assessment.missing_target_mapping: impact_sentence = ( f"AzureFox has not yet mapped the downstream Azure footprint cleanly, so " @@ -2269,8 +2682,12 @@ def _deployment_summary( f"scope cannot name visible {target_label} targets." ) elif target_resolution == "named match": + target_phrase = _deployment_named_match_narrative( + target_label, + confirmation_basis, + ) impact_sentence = ( - f"The likeliest downstream Azure footprint is the exact visible {target_label} target " + f"The likeliest downstream Azure footprint is {target_phrase} " f"{', '.join(target_names[:_CANDIDATE_LIMIT])}." ) else: @@ -2317,42 +2734,142 @@ def _structured_deployment_target_matches( target_family: str, candidates: list[dict], ) -> tuple[list[dict], str | None]: - structured_names = _structured_target_names(source, target_family) - if not structured_names: - return [], None - matched = [ - item - for item in candidates - if _normalize_target_name(str(item.get("name") or "")) in structured_names - ] - if matched: - return matched, "parsed-config-target" - return [], "parsed-config-target" - - -def _structured_target_names(source: dict, target_family: str) -> set[str]: - family_tokens = { - "aks": {"aks", "kubernetes"}, - "app-services": {"appservice", "app-service"}, - "functions": {"function", "functions", "functionapp", "function-app"}, - "arm-deployments": {"deployment", "arm", "bicep", "terraform"}, - }[target_family] - normalized_names: set[str] = set() + signals = _structured_target_signals(source, target_family) + if signals["resource_ids"]: + matched = [ + item + for item in candidates + if _deployment_candidate_resource_ids(item, target_family) & signals["resource_ids"] + ] + if matched: + return matched, "resource-id-match" + if signals["hosts"]: + matched = [ + item + for item in candidates + if _deployment_candidate_hosts(item, target_family) & signals["hosts"] + ] + if matched: + return matched, "normalized-uri-match" + if signals["names"]: + matched = [ + item + for item in candidates + if _normalize_target_name(str(item.get("name") or "")) in signals["names"] + ] + if matched: + return matched, "parsed-config-target" + return [], "parsed-config-target" + if signals["resource_ids"] or signals["hosts"]: + return [], "parsed-config-target" + return [], None + + +def _structured_target_signals(source: dict, target_family: str) -> dict[str, set[str]]: + signals = { + "names": set(), + "resource_ids": set(), + "hosts": set(), + } for clue in source.get("target_clues", []) or []: text = str(clue).strip() lowered = text.lower() + payloads: list[str] = [] if ":" in text: prefix, raw_name = text.split(":", 1) - prefix_tokens = { - token.strip().lower().replace(" ", "").replace("/", "") - for token in prefix.split("/") - if token.strip() - } - if prefix_tokens & family_tokens and raw_name.strip(): - normalized_names.add(_normalize_target_name(raw_name)) + normalized_prefix = _normalize_target_name(prefix) + payload = raw_name.strip() + if ( + normalized_prefix in _STRUCTURED_TARGET_PREFIXES[target_family] + and _looks_like_structured_target_payload(payload, target_family) + ): + payloads.append(payload) elif lowered.startswith("target="): - normalized_names.add(_normalize_target_name(text.split("=", 1)[1])) - return normalized_names + payload = text.split("=", 1)[1].strip() + if _looks_like_structured_target_payload(payload, target_family): + payloads.append(payload) + elif ( + text.startswith("/subscriptions/") + or normalize_exact_target_host(text, target_family=target_family) is not None + ): + payloads.append(text) + + for payload in payloads: + if payload.startswith("/subscriptions/"): + normalized_resource_id = normalize_exact_target_resource_id( + payload, + target_family=target_family, + ) + if normalized_resource_id: + signals["resource_ids"].add(normalized_resource_id) + continue + normalized_resource_id = normalize_exact_target_resource_id( + payload, + target_family=target_family, + ) + if normalized_resource_id: + signals["resource_ids"].add(normalized_resource_id) + continue + host = normalize_exact_target_host(payload, target_family=target_family) + if host: + signals["hosts"].add(host) + continue + if "://" in payload: + continue + normalized_name = _normalize_target_name(payload) + if normalized_name: + signals["names"].add(normalized_name) + return signals + + +_STRUCTURED_TARGET_PREFIXES = { + "aks": {"aks", "kubernetes", "akskubernetes"}, + "app-services": {"appservice", "appservices"}, + "functions": {"function", "functions", "functionapp", "azurefunctions"}, + "arm-deployments": {"deployment", "arm", "bicep", "terraform", "armbicepterraform"}, +} + + +def _looks_like_structured_target_payload(payload: str, target_family: str) -> bool: + text = str(payload or "").strip() + if not text: + return False + if normalize_exact_target_resource_id(text, target_family=target_family): + return True + if normalize_exact_target_host(text, target_family=target_family): + return True + if "://" in text or "/" in text or " " in text or "." in text: + return False + return bool(re.fullmatch(r"[A-Za-z0-9_-]+", text)) + + +def _deployment_candidate_resource_ids(item: dict, target_family: str) -> set[str]: + ids: set[str] = set() + for value in (item.get("id"), item.get("scope")): + normalized = normalize_exact_target_resource_id( + str(value or ""), + target_family=target_family, + ) + if normalized: + ids.add(normalized) + return ids + + +def _deployment_candidate_hosts(item: dict, target_family: str) -> set[str]: + hosts: set[str] = set() + if target_family in {"app-services", "functions"}: + host = normalize_exact_target_host( + str(item.get("default_hostname") or ""), + target_family=target_family, + ) + if host: + hosts.add(host) + if target_family == "aks": + for value in (item.get("fqdn"), item.get("private_fqdn")): + host = normalize_exact_target_host(str(value or ""), target_family=target_family) + if host: + hosts.add(host) + return hosts def _normalize_target_name(value: str) -> str: @@ -2433,6 +2950,14 @@ def _deployment_missing_confirmation( + _devops_missing_source_control_text(definite=False) + " on the source side." ) + primary_mode = str(source.get("primary_start_mode") or "").strip() or None + primary_runbook = str(source.get("primary_runbook_name") or "").strip() or None + if primary_runbook and primary_mode: + return ( + f"Current evidence names the likely {target_label} target and the {primary_mode} " + f"path into runbook {primary_runbook}, but does not show that current credentials " + "can control that path." + ) return ( f"Current evidence names the likely {target_label} target, but does not confirm which " "specific runbook or current-credential start path performs that Azure change." diff --git a/src/azurefox/collectors/provider.py b/src/azurefox/collectors/provider.py index f8ae40b..744b36e 100644 --- a/src/azurefox/collectors/provider.py +++ b/src/azurefox/collectors/provider.py @@ -39,6 +39,7 @@ privesc_proven_path, privesc_summary, ) +from azurefox.target_matching import looks_like_exact_target_value from azurefox.tokens_credential_hints import tokens_credential_next_review_hint _DNS_RESOURCE_API_VERSION = { @@ -4154,7 +4155,6 @@ def _automation_object_join_ids(items: list[object] | None, *, prefix: str) -> l raw_id = _string_value(getattr(item, "id", None)) if raw_id: ids.append(raw_id) - continue name = _string_value(getattr(item, "name", None)) if name: ids.append(f"{prefix}:{name}") @@ -10493,68 +10493,102 @@ def _devops_structured_target_clues( ) -> list[str]: structured: list[str] = [] lowered_broad_clues = {value.lower() for value in broad_clues} - for path, node in _recursive_nodes(definition): - if not isinstance(node, dict): - continue - - app_service_name = _devops_named_target_input( - node, - token_groups=( + target_specs = ( + { + "target_family": "app-services", + "label": "App Service", + "broad_clue": "app service", + "named_groups": ( ("azure", "web", "app", "name"), ("web", "app", "name"), ("app", "service", "name"), ), - ) - if not app_service_name and "app service" in lowered_broad_clues: - app_service_name = _devops_named_target_input( - node, - token_groups=(("app", "name"),), - ) - if app_service_name: - structured.append(f"App Service: {app_service_name}") - - function_name = _devops_named_target_input( - node, - token_groups=( + "fallback_named_groups": (("app", "name"),), + "exact_groups": ( + ("azure", "web", "app"), + ("web", "app"), + ("app", "service"), + ), + "allow_host_or_url": True, + "needs_arm_context": False, + }, + { + "target_family": "functions", + "label": "Functions", + "broad_clue": "functions", + "named_groups": ( ("azure", "function", "app", "name"), ("function", "app", "name"), ), - ) - if not function_name and "functions" in lowered_broad_clues: - function_name = _devops_named_target_input( - node, - token_groups=(("function", "name"),), - ) - if function_name: - structured.append(f"Functions: {function_name}") - - cluster_name = _devops_named_target_input( - node, - token_groups=( + "fallback_named_groups": (("function", "name"),), + "exact_groups": ( + ("azure", "function", "app"), + ("function", "app"), + ("function",), + ), + "allow_host_or_url": True, + "needs_arm_context": False, + }, + { + "target_family": "aks", + "label": "AKS/Kubernetes", + "broad_clue": "aks/kubernetes", + "named_groups": ( ("aks", "cluster", "name"), ("kubernetes", "cluster", "name"), ), - ) - if not cluster_name and "aks/kubernetes" in lowered_broad_clues: - cluster_name = _devops_named_target_input( - node, - token_groups=(("cluster", "name"),), - ) - if cluster_name: - structured.append(f"AKS/Kubernetes: {cluster_name}") - + "fallback_named_groups": (("cluster", "name"),), + "exact_groups": ( + ("aks", "cluster"), + ("kubernetes", "cluster"), + ("kubernetes",), + ), + "allow_host_or_url": True, + "needs_arm_context": False, + }, + { + "target_family": "arm-deployments", + "label": "ARM/Bicep/Terraform", + "broad_clue": "arm/bicep/terraform", + "named_groups": (("deployment", "name"),), + "fallback_named_groups": (), + "exact_groups": (("deployment",),), + "allow_host_or_url": False, + "needs_arm_context": True, + }, + ) + for path, node in _recursive_nodes(definition): + if not isinstance(node, dict): + continue node_tokens = { token for key in node for token in _devops_identifier_tokens(key) } | set(_devops_path_tokens(path)) - if "arm/bicep/terraform" in lowered_broad_clues or _devops_node_has_arm_target_context( - node_tokens - ): - deployment_name = _devops_named_target_input( + arm_context_present = _devops_node_has_arm_target_context(node_tokens) + for spec in target_specs: + broad_present = spec["broad_clue"] in lowered_broad_clues + if spec["needs_arm_context"] and not (broad_present or arm_context_present): + continue + named_target = _devops_named_target_input( + node, + token_groups=spec["named_groups"], + ) + if not named_target and broad_present and spec["fallback_named_groups"]: + named_target = _devops_named_target_input( + node, + token_groups=spec["fallback_named_groups"], + ) + if named_target: + structured.append(f"{spec['label']}: {named_target}") + exact_target = _devops_exact_target_input( node, - token_groups=(("deployment", "name"),), + target_family=spec["target_family"], + node_tokens=node_tokens, + token_groups=spec["exact_groups"], + broad_clue_present=broad_present, + allow_host_or_url=spec["allow_host_or_url"], ) - if deployment_name: - structured.append(f"ARM/Bicep/Terraform: {deployment_name}") + if exact_target: + structured.append(f"{spec['label']}: {exact_target}") return structured @@ -10586,6 +10620,53 @@ def _devops_named_target_input( return None +def _devops_exact_target_input( + node: dict[str, object], + *, + target_family: str, + node_tokens: set[str], + token_groups: tuple[tuple[str, ...], ...], + broad_clue_present: bool, + allow_host_or_url: bool, +) -> str | None: + generic_exact_tokens = { + "resourceid", + "resource", + "id", + "url", + "uri", + "hostname", + "host", + "fqdn", + } + family_tokens = {token for group in token_groups for token in group} + for key, value in node.items(): + if not isinstance(value, str): + continue + cleaned_value = value.strip() + if not cleaned_value or _looks_like_expression(cleaned_value): + continue + if not looks_like_exact_target_value( + cleaned_value, + target_family=target_family, + allow_host_or_url=allow_host_or_url, + ): + continue + key_tokens = set(_devops_identifier_tokens(key)) + has_direct_context = any(set(group).issubset(key_tokens) for group in token_groups) + has_generic_exact_key = bool(key_tokens & generic_exact_tokens) + has_family_key_context = bool(key_tokens & family_tokens) + has_node_context = any(set(group).issubset(node_tokens) for group in token_groups) + if has_direct_context or ( + broad_clue_present + and has_generic_exact_key + and has_family_key_context + and has_node_context + ): + return cleaned_value + return None + + def _devops_secret_support_types( *, secret_variable_names: list[str], diff --git a/src/azurefox/target_matching.py b/src/azurefox/target_matching.py new file mode 100644 index 0000000..9aba7ef --- /dev/null +++ b/src/azurefox/target_matching.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +from urllib.parse import urlparse + +_TARGET_HOST_SUFFIXES = { + "app-services": (".azurewebsites.net",), + "functions": (".azurewebsites.net",), + "aks": (".azmk8s.io",), + "arm-deployments": (), +} + +_TARGET_RESOURCE_ID_FRAGMENTS = { + "app-services": ("/providers/microsoft.web/sites/",), + "functions": ("/providers/microsoft.web/sites/",), + "aks": ("/providers/microsoft.containerservice/managedclusters/",), + "arm-deployments": ("/providers/microsoft.resources/deployments/",), +} + + +def normalize_exact_target_host(value: str, *, target_family: str) -> str | None: + text = str(value or "").strip() + if not text: + return None + parsed = urlparse(text) + if parsed.scheme and parsed.netloc: + hostname = (parsed.hostname or parsed.netloc).strip().lower() + if hostname and _is_canonical_exact_target_host(hostname, target_family=target_family): + return hostname + return None + if "." in text and " " not in text and "/" not in text.strip("/"): + hostname = text.lower() + if _is_canonical_exact_target_host(hostname, target_family=target_family): + return hostname + return None + + +def _is_canonical_exact_target_host(hostname: str, *, target_family: str) -> bool: + allowed_suffixes = _TARGET_HOST_SUFFIXES.get(target_family, ()) + if not any(hostname.endswith(suffix) for suffix in allowed_suffixes): + return False + if target_family in {"app-services", "functions"}: + return hostname.count(".") == 2 + return True + + +def normalize_exact_target_resource_id(value: str | None, *, target_family: str) -> str | None: + text = str(value or "").strip() + if not text.startswith("/subscriptions/"): + return None + normalized = text.rstrip("/").lower() + allowed_fragments = _TARGET_RESOURCE_ID_FRAGMENTS.get(target_family, ()) + if allowed_fragments and not any(fragment in normalized for fragment in allowed_fragments): + return None + return normalized + + +def looks_like_exact_target_value( + value: str, + *, + target_family: str, + allow_host_or_url: bool, +) -> bool: + if normalize_exact_target_resource_id(value, target_family=target_family): + return True + if not allow_host_or_url: + return False + return normalize_exact_target_host(value, target_family=target_family) is not None diff --git a/tests/test_cli_smoke.py b/tests/test_cli_smoke.py index 83d8f73..a4ba376 100644 --- a/tests/test_cli_smoke.py +++ b/tests/test_cli_smoke.py @@ -220,6 +220,7 @@ def test_cli_smoke_chains_deployment_path_json(tmp_path: Path) -> None: "permissions", "rbac", "role-trusts", + "keyvault", "arm-deployments", "aks", "functions", @@ -279,15 +280,12 @@ def test_cli_smoke_chains_deployment_path_json(tmp_path: Path) -> None: ) assert "role-trusts" in automation_row["evidence_commands"] assert "rbac" in automation_row["evidence_commands"] - assert "This row proves source-side control" in automation_row["confidence_boundary"] assert "not the exact App Service target" in automation_row["confidence_boundary"] + assert automation_row["confirmation_basis"] == "same-workload-corroborated" assert "ops-deploy-sp" in automation_row["why_care"] - assert automation_row["target_names"] == ["app-empty-mi", "app-public-api"] - assert ( - automation_row["likely_impact"] - == "2 visible app service candidate(s): app-empty-mi, app-public-api" - ) - assert "AzureFox already narrowed the visible App Service candidates" in automation_row[ + assert automation_row["target_names"] == ["app-public-api"] + assert automation_row["likely_impact"] == "1 visible app service candidate(s): app-public-api" + assert "narrowed the visible App Service candidates to app-public-api" in automation_row[ "next_review" ] assert "editable trigger or definition path" not in automation_row["next_review"] @@ -297,15 +295,19 @@ def test_cli_smoke_chains_deployment_path_json(tmp_path: Path) -> None: ) assert "app-failed" in automation_row["next_review"] assert "recurring Azure execution" in automation_row["why_care"] + assert "Visible App Service evidence keeps 1 candidate(s) in play" in automation_row["why_care"] aks_row = next(item for item in payload["paths"] if item["asset_name"] == "deploy-aks-prod") assert aks_row["actionability_state"] == "conditionally actionable" assert "Queue this pipeline now" in aks_row["insertion_point"] assert "permission-summary" in aks_row["joined_surface_types"] assert "permissions" in aks_row["evidence_commands"] assert "keyvault" in aks_row["evidence_commands"] + assert "kv-prod-shared" in aks_row["why_care"] + assert "Key Vault support" in aks_row["why_care"] assert "current-credential run-path control" in aks_row["confidence_boundary"] assert "not a writable source" in aks_row["confidence_boundary"] assert "exact AKS cluster target" in aks_row["confidence_boundary"] + assert "candidate(s) in play" in aks_row["why_care"] assert "AzureFox already narrowed the visible AKS cluster candidates" in aks_row["next_review"] appsvc_row = next( item for item in payload["paths"] if item["asset_name"] == "deploy-appservice-prod" @@ -317,8 +319,10 @@ def test_cli_smoke_chains_deployment_path_json(tmp_path: Path) -> None: assert appsvc_row["target_names"] == ["app-public-api"] assert appsvc_row["likely_impact"] == "exact app service: app-public-api" assert "trust-edge" in appsvc_row["joined_surface_types"] - assert "runs as Azure identity 'build-sp'" in appsvc_row["confidence_boundary"] - assert "exact App Service target" in appsvc_row["confidence_boundary"] + assert "Azure identity 'build-sp'" in appsvc_row["confidence_boundary"] + assert "App Service target" in appsvc_row["confidence_boundary"] + assert "target-side record" in appsvc_row["why_care"] + assert "app-public-api.azurewebsites.net" in appsvc_row["why_care"] assert ( "separate direct sign-in as Azure identity 'build-sp'" in appsvc_row["confidence_boundary"] @@ -327,6 +331,9 @@ def test_cli_smoke_chains_deployment_path_json(tmp_path: Path) -> None: "AzureFox already named the exact App Service target app-public-api" in appsvc_row["next_review"] ) + plan_row = next(item for item in payload["paths"] if item["asset_name"] == "plan-infra-prod") + assert "kv-platform-shared" in plan_row["why_care"] + assert "Key Vault support" in plan_row["why_care"] def test_cli_smoke_chains_escalation_path_json(tmp_path: Path) -> None: diff --git a/tests/test_collectors.py b/tests/test_collectors.py index 35c0653..1655381 100644 --- a/tests/test_collectors.py +++ b/tests/test_collectors.py @@ -2394,6 +2394,123 @@ def test_devops_structured_target_clues_ignore_generic_names_without_azure_conte assert clues == [] +def test_devops_structured_target_clues_capture_exact_urls_and_resource_ids() -> None: + clues = _devops_structured_target_clues( + { + "process": { + "phases": [ + { + "steps": [ + { + "inputs": { + "azureWebAppResourceId": ( + "/subscriptions/sub/resourceGroups/rg-apps/providers/" + "Microsoft.Web/sites/app-public-api" + ), + "aksClusterUrl": ( + "https://aks-ops-01-abcd1234.privatelink.eastus.azmk8s.io" + ), + "deploymentResourceId": ( + "/subscriptions/sub/resourceGroups/rg-app/providers/" + "Microsoft.Resources/deployments/app-failed" + ), + } + } + ] + } + ] + } + }, + broad_clues=["App Service", "AKS/Kubernetes", "ARM/Bicep/Terraform"], + ) + + assert ( + "App Service: /subscriptions/sub/resourceGroups/rg-apps/providers/" + "Microsoft.Web/sites/app-public-api" + ) in clues + assert ( + "AKS/Kubernetes: https://aks-ops-01-abcd1234.privatelink.eastus.azmk8s.io" + ) in clues + assert ( + "ARM/Bicep/Terraform: /subscriptions/sub/resourceGroups/rg-app/providers/" + "Microsoft.Resources/deployments/app-failed" + ) in clues + + +def test_devops_structured_target_clues_ignore_non_azure_hosts_and_generic_urls() -> None: + clues = _devops_structured_target_clues( + { + "process": { + "phases": [ + { + "steps": [ + { + "inputs": { + "appServiceUrl": "https://example.com/releases/app", + "functionHost": "release-1.2.3", + "aksClusterUrl": "https://cluster.internal.local", + } + } + ] + } + ] + } + }, + broad_clues=["App Service", "Functions", "AKS/Kubernetes"], + ) + + assert clues == [] + + +def test_devops_structured_target_clues_ignore_wrong_family_resource_ids() -> None: + clues = _devops_structured_target_clues( + { + "process": { + "phases": [ + { + "steps": [ + { + "inputs": { + "azureWebAppResourceId": ( + "/subscriptions/sub/resourceGroups/rg-workload/providers/" + "Microsoft.ContainerService/managedClusters/aks-ops-01" + ), + } + } + ] + } + ] + } + }, + broad_clues=["App Service"], + ) + + assert clues == [] + + +def test_devops_structured_target_clues_ignore_appservice_scm_hosts() -> None: + clues = _devops_structured_target_clues( + { + "process": { + "phases": [ + { + "steps": [ + { + "inputs": { + "azureWebAppUrl": "https://app-public-api.scm.azurewebsites.net", + } + } + ] + } + ] + } + }, + broad_clues=["App Service"], + ) + + assert clues == [] + + def test_devops_finalize_trusted_inputs_adds_exists_only_proof_metadata() -> None: trusted_inputs = _devops_finalize_trusted_inputs( trusted_inputs=[ diff --git a/tests/test_deployment_path_admissibility.py b/tests/test_deployment_path_admissibility.py index b90b9bb..3eb5231 100644 --- a/tests/test_deployment_path_admissibility.py +++ b/tests/test_deployment_path_admissibility.py @@ -12,6 +12,7 @@ _automation_current_operator_access, _automation_scope_label, _best_automation_target_mapping, + _deployment_joined_key_vaults, _structured_deployment_target_matches, ) from azurefox.models.common import ( @@ -249,6 +250,101 @@ def test_structured_target_clue_can_reach_exact_named_match_input() -> None: assert [item["name"] for item in matches] == ["app-public-api"] +def test_structured_target_clue_can_match_exact_resource_id() -> None: + pipeline = DevopsPipelineAsset( + id="devops/pipeline/release-appservice-prod", + definition_id="88", + name="release-appservice-prod", + project_name="prod-platform", + azure_service_connection_names=["prod-appsvc-wif"], + target_clues=[ + "App Service: " + "/subscriptions/sub/resourceGroups/rg-apps/providers/" + "Microsoft.Web/sites/app-public-api" + ], + summary="Structured target resource id test.", + ) + + matches, confirmation_basis = _structured_deployment_target_matches( + pipeline.model_dump(mode="json"), + "app-services", + [ + { + "id": ( + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/" + "app-public-api" + ), + "name": "not-the-join-key", + } + ], + ) + + assert confirmation_basis == "resource-id-match" + assert [item["id"] for item in matches] == [ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-public-api" + ] + + +def test_structured_target_clue_can_match_exact_hostname() -> None: + pipeline = DevopsPipelineAsset( + id="devops/pipeline/release-appservice-prod", + definition_id="88", + name="release-appservice-prod", + project_name="prod-platform", + azure_service_connection_names=["prod-appsvc-wif"], + target_clues=["App Service: https://app-public-api.azurewebsites.net"], + summary="Structured target hostname test.", + ) + + matches, confirmation_basis = _structured_deployment_target_matches( + pipeline.model_dump(mode="json"), + "app-services", + [ + { + "id": ( + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/" + "app-public-api" + ), + "name": "app-public-api", + "default_hostname": "app-public-api.azurewebsites.net", + } + ], + ) + + assert confirmation_basis == "normalized-uri-match" + assert [item["name"] for item in matches] == ["app-public-api"] + + +def test_structured_target_clue_can_match_exact_aks_private_fqdn() -> None: + pipeline = DevopsPipelineAsset( + id="devops/pipeline/deploy-aks-prod", + definition_id="17", + name="deploy-aks-prod", + project_name="prod-platform", + azure_service_connection_names=["prod-subscription"], + target_clues=["AKS/Kubernetes: https://aks-ops-01-abcd1234.privatelink.eastus.azmk8s.io"], + summary="Structured target AKS hostname test.", + ) + + matches, confirmation_basis = _structured_deployment_target_matches( + pipeline.model_dump(mode="json"), + "aks", + [ + { + "id": ( + "/subscriptions/sub/resourceGroups/rg-workload/providers/" + "Microsoft.ContainerService/managedClusters/aks-ops-01" + ), + "name": "aks-ops-01", + "private_fqdn": "aks-ops-01-abcd1234.privatelink.eastus.azmk8s.io", + } + ], + ) + + assert confirmation_basis == "normalized-uri-match" + assert [item["name"] for item in matches] == ["aks-ops-01"] + + def test_automation_current_operator_access_uses_role_definition_id_and_best_scope_match() -> None: access = _automation_current_operator_access( { @@ -353,65 +449,324 @@ def test_best_automation_target_mapping_uses_runbook_names_to_narrow_visible_tar assert mapping is not None assert mapping["target_family"] == "app-services" assert mapping["exact_targets"] == [] - assert [item["name"] for item in mapping["target_candidates"]] == [ - "app-empty-mi", - "app-public-api", - ] + assert [item["name"] for item in mapping["target_candidates"]] == ["app-public-api"] assert mapping["confirmation_basis"] == "same-workload-corroborated" def test_best_automation_target_mapping_does_not_remap_from_name_overlap_alone() -> None: account = _load_automation_account("aa-hybrid-prod").model_dump(mode="json") - app_services = json.loads( - ( - Path(__file__).resolve().parent / "fixtures" / "lab_tenant" / "app_services.json" - ).read_text(encoding="utf-8") + target_inputs = _load_automation_target_inputs() + + mapping = _best_automation_mapping( + account, + target_inputs=target_inputs, + arm_correlations={ + "app-services": [], + "functions": [], + "aks": [], + "arm-deployments": target_inputs["arm-deployments"], + }, ) - functions = json.loads( - (Path(__file__).resolve().parent / "fixtures" / "lab_tenant" / "functions.json").read_text( - encoding="utf-8" - ) + + assert mapping is None + + +def test_best_automation_target_mapping_uses_primary_runbook_path_for_exact_match() -> None: + account = _load_automation_account("aa-hybrid-prod").model_dump(mode="json") + account["primary_runbook_name"] = "app-public-api" + account["webhook_runbook_names"] = ["app-public-api", "app-empty-mi"] + account["published_runbook_names"] = ["app-public-api", "app-empty-mi", "func-orders"] + target_inputs = _load_automation_target_inputs() + + mapping = _best_automation_mapping( + account, + target_inputs=target_inputs, + arm_correlations={ + "app-services": [], + "functions": [], + "aks": [], + "arm-deployments": target_inputs["arm-deployments"], + }, ) - aks = json.loads( - (Path(__file__).resolve().parent / "fixtures" / "lab_tenant" / "aks.json").read_text( - encoding="utf-8" - ) + + assert mapping is not None + assert mapping["target_family"] == "app-services" + assert mapping["exact_targets"] == [] + assert [item["name"] for item in mapping["target_candidates"]] == ["app-public-api"] + assert mapping["confirmation_basis"] == "name-only-inference" + + +def test_best_automation_target_mapping_does_not_widen_from_non_active_published_runbooks() -> None: + account = _load_automation_account("aa-hybrid-prod").model_dump(mode="json") + account["primary_runbook_name"] = "func-orders" + account["primary_start_mode"] = "webhook" + account["webhook_runbook_names"] = ["func-orders"] + account["published_runbook_names"] = ["func-orders", "app-empty-mi", "app-public-api"] + target_inputs = _load_automation_target_inputs() + + mapping = _best_automation_mapping( + account, + target_inputs=target_inputs, + arm_correlations={ + "app-services": [target_inputs["arm-deployments"][2]], + "functions": [target_inputs["arm-deployments"][2]], + "aks": [], + "arm-deployments": target_inputs["arm-deployments"], + }, ) - arm = json.loads( - ( - Path(__file__).resolve().parent / "fixtures" / "lab_tenant" / "arm_deployments.json" - ).read_text(encoding="utf-8") + + assert mapping is not None + assert mapping["target_family"] == "functions" + assert [item["name"] for item in mapping["exact_targets"]] == ["func-orders"] + assert mapping["confirmation_basis"] == "same-workload-corroborated" + + +def test_best_automation_target_mapping_falls_back_to_published_name_only_inference() -> None: + account = _load_automation_account("aa-hybrid-prod").model_dump(mode="json") + account["primary_runbook_name"] = "Nightly-Reconcile" + account["primary_start_mode"] = "webhook" + account["webhook_runbook_names"] = ["Nightly-Reconcile"] + account["published_runbook_names"] = ["Nightly-Reconcile", "app-public-api"] + account["trigger_join_ids"] = ["automation-webhook:nightly-reconcile"] + target_inputs = _load_automation_target_inputs() + + mapping = _best_automation_mapping( + account, + target_inputs=target_inputs, + arm_correlations={ + "app-services": [target_inputs["arm-deployments"][2]], + "functions": [], + "aks": [], + "arm-deployments": target_inputs["arm-deployments"], + }, ) - mapping = _best_automation_target_mapping( + assert mapping is not None + assert mapping["exact_targets"] == [] + assert [item["name"] for item in mapping["target_candidates"]] == ["app-public-api"] + assert mapping["confirmation_basis"] == "name-only-inference" + + +def test_best_automation_target_mapping_supports_raw_trigger_join_ids() -> None: + account = _load_automation_account("aa-hybrid-prod").model_dump(mode="json") + account["trigger_join_ids"] = [ + "/subscriptions/sub/resourceGroups/rg-ops/providers/Microsoft.Automation/" + "automationAccounts/aa-hybrid-prod/webhooks/app-public-api" + ] + target_inputs = _load_automation_target_inputs() + + mapping = _best_automation_mapping( account, - target_candidates={ - "app-services": app_services["app_services"], - "functions": functions["function_apps"], - "aks": aks["aks_clusters"], - "arm-deployments": arm["deployments"], + target_inputs=target_inputs, + arm_correlations={ + "app-services": [target_inputs["arm-deployments"][2]], + "functions": [], + "aks": [], + "arm-deployments": target_inputs["arm-deployments"], }, - target_visibility_notes={ - "app-services": None, - "functions": None, - "aks": None, - "arm-deployments": None, + ) + + assert mapping is not None + assert mapping["target_family"] == "app-services" + assert [item["name"] for item in mapping["exact_targets"]] == ["app-public-api"] + assert mapping["confirmation_basis"] == "same-workload-corroborated" + + +def test_best_automation_target_mapping_does_not_promote_duplicate_name_collisions() -> None: + account = _load_automation_account("aa-hybrid-prod").model_dump(mode="json") + account["primary_runbook_name"] = "shared-api" + account["webhook_runbook_names"] = ["shared-api"] + target_inputs = _load_automation_target_inputs() + target_inputs["app-services"] = [ + { + "id": ( + "/subscriptions/sub/resourceGroups/rg-apps/providers/" + "Microsoft.Web/sites/shared-a" + ), + "name": "shared-api", }, - target_visibility_issues={ - "app-services": None, - "functions": None, - "aks": None, - "arm-deployments": None, + { + "id": ( + "/subscriptions/sub/resourceGroups/rg-apps/providers/" + "Microsoft.Web/sites/shared-b" + ), + "name": "shared-api", }, + ] + + mapping = _best_automation_mapping( + account, + target_inputs=target_inputs, arm_correlations={ - "app-services": [], + "app-services": [target_inputs["arm-deployments"][2]], "functions": [], "aks": [], - "arm-deployments": arm["deployments"], + "arm-deployments": target_inputs["arm-deployments"], }, ) - assert mapping is None + assert mapping is not None + assert mapping["target_family"] == "app-services" + assert mapping["exact_targets"] == [] + assert [item["id"] for item in mapping["target_candidates"]] == [ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/shared-a", + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/shared-b", + ] + assert mapping["confirmation_basis"] == "name-only-inference" + + +def test_structured_target_clue_ignores_non_azure_dotted_host_values() -> None: + pipeline = DevopsPipelineAsset( + id="devops/pipeline/release-appservice-prod", + definition_id="88", + name="release-appservice-prod", + project_name="prod-platform", + azure_service_connection_names=["prod-appsvc-wif"], + target_clues=["App Service: release-1.2.3"], + summary="Structured target dotted host guard.", + ) + + matches, confirmation_basis = _structured_deployment_target_matches( + pipeline.model_dump(mode="json"), + "app-services", + [ + { + "id": ( + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/" + "app-public-api" + ), + "name": "app-public-api", + "default_hostname": "app-public-api.azurewebsites.net", + } + ], + ) + + assert matches == [] + assert confirmation_basis is None + + +def test_structured_target_clue_ignores_appservice_scm_hosts() -> None: + pipeline = DevopsPipelineAsset( + id="devops/pipeline/release-appservice-prod", + definition_id="88", + name="release-appservice-prod", + project_name="prod-platform", + azure_service_connection_names=["prod-appsvc-wif"], + target_clues=["App Service: https://app-public-api.scm.azurewebsites.net"], + summary="Structured target scm host guard.", + ) + + matches, confirmation_basis = _structured_deployment_target_matches( + pipeline.model_dump(mode="json"), + "app-services", + [ + { + "id": ( + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/" + "app-public-api" + ), + "name": "app-public-api", + "default_hostname": "app-public-api.azurewebsites.net", + } + ], + ) + + assert matches == [] + assert confirmation_basis is None + + +def test_structured_target_clue_ignores_unstructured_prefixed_payloads() -> None: + pipeline = DevopsPipelineAsset( + id="devops/pipeline/release-appservice-prod", + definition_id="88", + name="release-appservice-prod", + project_name="prod-platform", + azure_service_connection_names=["prod-appsvc-wif"], + target_clues=["App Service: release notes for blue deployment"], + summary="Structured target prefixed payload guard.", + ) + + matches, confirmation_basis = _structured_deployment_target_matches( + pipeline.model_dump(mode="json"), + "app-services", + [ + { + "id": ( + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/" + "app-public-api" + ), + "name": "app-public-api", + "default_hostname": "app-public-api.azurewebsites.net", + } + ], + ) + + assert matches == [] + assert confirmation_basis is None + + +def test_structured_target_clue_ignores_wrong_family_resource_ids() -> None: + pipeline = DevopsPipelineAsset( + id="devops/pipeline/release-appservice-prod", + definition_id="88", + name="release-appservice-prod", + project_name="prod-platform", + azure_service_connection_names=["prod-appsvc-wif"], + target_clues=[ + "App Service: " + "/subscriptions/sub/resourceGroups/rg-workload/providers/" + "Microsoft.ContainerService/managedClusters/aks-ops-01" + ], + summary="Structured target wrong-family resource id guard.", + ) + + matches, confirmation_basis = _structured_deployment_target_matches( + pipeline.model_dump(mode="json"), + "app-services", + [ + { + "id": ( + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/" + "app-public-api" + ), + "name": "app-public-api", + } + ], + ) + + assert matches == [] + assert confirmation_basis is None + + +def test_deployment_joined_key_vaults_keeps_duplicate_named_vault_records() -> None: + joined = _deployment_joined_key_vaults( + { + "key_vault_names": ["kv-prod-shared"], + }, + { + "kv-prod-shared": [ + { + "id": ( + "/subscriptions/sub-a/resourceGroups/rg-a/providers/" + "Microsoft.KeyVault/vaults/kv-prod-shared" + ), + "name": "kv-prod-shared", + }, + { + "id": ( + "/subscriptions/sub-b/resourceGroups/rg-b/providers/" + "Microsoft.KeyVault/vaults/kv-prod-shared" + ), + "name": "kv-prod-shared", + }, + ] + }, + ) + + assert [item["id"] for item in joined] == [ + "/subscriptions/sub-a/resourceGroups/rg-a/providers/Microsoft.KeyVault/vaults/kv-prod-shared", + "/subscriptions/sub-b/resourceGroups/rg-b/providers/Microsoft.KeyVault/vaults/kv-prod-shared", + ] def _load_devops_pipeline(name: str) -> DevopsPipelineAsset: @@ -426,6 +781,45 @@ def _load_devops_pipeline(name: str) -> DevopsPipelineAsset: raise AssertionError(f"missing devops fixture pipeline {name}") +def _best_automation_mapping( + account: dict, + *, + target_inputs: dict[str, list[dict]], + arm_correlations: dict[str, list[dict]], +) -> dict[str, object] | None: + return _best_automation_target_mapping( + account, + target_candidates=target_inputs, + target_visibility_notes={ + "app-services": None, + "functions": None, + "aks": None, + "arm-deployments": None, + }, + target_visibility_issues={ + "app-services": None, + "functions": None, + "aks": None, + "arm-deployments": None, + }, + arm_correlations=arm_correlations, + ) + + +def _load_automation_target_inputs() -> dict[str, list[dict]]: + fixtures_dir = Path(__file__).resolve().parent / "fixtures" / "lab_tenant" + app_services = json.loads((fixtures_dir / "app_services.json").read_text(encoding="utf-8")) + functions = json.loads((fixtures_dir / "functions.json").read_text(encoding="utf-8")) + aks = json.loads((fixtures_dir / "aks.json").read_text(encoding="utf-8")) + arm = json.loads((fixtures_dir / "arm_deployments.json").read_text(encoding="utf-8")) + return { + "app-services": app_services["app_services"], + "functions": functions["function_apps"], + "aks": aks["aks_clusters"], + "arm-deployments": arm["deployments"], + } + + def _load_automation_account(name: str) -> AutomationAccountAsset: payload = json.loads( (Path(__file__).resolve().parent / "fixtures" / "lab_tenant" / "automation.json").read_text( From 58f4d02ccb98a4a286275d243f715f4a67e2a4b0 Mon Sep 17 00:00:00 2001 From: Colby Farley Date: Fri, 10 Apr 2026 22:34:21 -0500 Subject: [PATCH 2/2] fix deployment-path guardrail regressions --- src/azurefox/chains/registry.py | 1 - src/azurefox/chains/runner.py | 26 ++++++++++++++------------ 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/azurefox/chains/registry.py b/src/azurefox/chains/registry.py index 4555db2..efcc0e2 100644 --- a/src/azurefox/chains/registry.py +++ b/src/azurefox/chains/registry.py @@ -234,7 +234,6 @@ class ChainFamilySpec: "public_network_access", "enable_rbac_authorization", "access_policy_count", - "summary", ), rationale=( "Provides the visible secret-store boundary when the deployment path relies " diff --git a/src/azurefox/chains/runner.py b/src/azurefox/chains/runner.py index ce58703..e15a30c 100644 --- a/src/azurefox/chains/runner.py +++ b/src/azurefox/chains/runner.py @@ -1645,12 +1645,13 @@ def _deployment_why_care( source: dict, *, assessment: DeploymentSourceAssessment, - target_family: str, - selected_targets: list[dict], - target_resolution: str, + target_family: str | None = None, + selected_targets: list[dict] | None = None, + target_resolution: str = "narrowed candidates", ) -> str: source_name = str(source.get("name") or source.get("id") or source_command) support_parts = _deployment_support_phrase_parts(source) + selected_targets = selected_targets or [] support_phrase = ( " and ".join(support_parts) if support_parts else "secret-backed deployment support" ) @@ -1747,7 +1748,7 @@ def _deployment_why_care_clauses( source_command: str, source: dict, support_parts: list[str], - target_family: str, + target_family: str | None, selected_targets: list[dict], target_resolution: str, include_current_operator_suffix: str | None = None, @@ -1782,14 +1783,15 @@ def _deployment_why_care_clauses( ) if trust_clause: clauses.append(trust_clause + ".") - target_clause = _deployment_target_evidence_clause( - target_family=target_family, - selected_targets=selected_targets, - target_resolution=target_resolution, - confirmation_basis=str(source.get("confirmation_basis") or "") or None, - ) - if target_clause: - clauses.append(target_clause) + if target_family: + target_clause = _deployment_target_evidence_clause( + target_family=target_family, + selected_targets=selected_targets, + target_resolution=target_resolution, + confirmation_basis=str(source.get("confirmation_basis") or "") or None, + ) + if target_clause: + clauses.append(target_clause) return clauses