diff --git a/README.md b/README.md index 9d0b151..c30a4a1 100644 --- a/README.md +++ b/README.md @@ -99,15 +99,15 @@ azurefox permissions | Section | Commands | | --- | --- | -| `core` | `inventory` | -| `identity` | `whoami`, `rbac`, `principals`, `permissions`, `privesc`, `role-trusts`, `lighthouse`, `auth-policies`, `managed-identities` | -| `config` | `arm-deployments`, `env-vars` | -| `secrets` | `keyvault`, `tokens-credentials` | -| `resource` | `automation`, `devops`, `acr`, `api-mgmt`, `databases`, `resource-trusts` | -| `storage` | `storage` | -| `network` | `nics`, `dns`, `endpoints`, `network-effective`, `network-ports` | -| `compute` | `workloads`, `app-services`, `functions`, `aks`, `vms`, `vmss`, `snapshots-disks` | -| orchestration | `chains` | +| `core` | [`inventory`](https://github.com/TacoRocket/AzureFox/wiki/Inventory) | +| `identity` | [`whoami`](https://github.com/TacoRocket/AzureFox/wiki/Whoami), [`rbac`](https://github.com/TacoRocket/AzureFox/wiki/RBAC), [`principals`](https://github.com/TacoRocket/AzureFox/wiki/Principals), [`permissions`](https://github.com/TacoRocket/AzureFox/wiki/Permissions), [`privesc`](https://github.com/TacoRocket/AzureFox/wiki/Privesc), [`role-trusts`](https://github.com/TacoRocket/AzureFox/wiki/Role-Trusts), [`lighthouse`](https://github.com/TacoRocket/AzureFox/wiki/Lighthouse), [`auth-policies`](https://github.com/TacoRocket/AzureFox/wiki/Auth-Policies), [`managed-identities`](https://github.com/TacoRocket/AzureFox/wiki/Managed-Identities) | +| `config` | [`arm-deployments`](https://github.com/TacoRocket/AzureFox/wiki/Arm-Deployments), [`env-vars`](https://github.com/TacoRocket/AzureFox/wiki/Env-Vars) | +| `secrets` | [`keyvault`](https://github.com/TacoRocket/AzureFox/wiki/Keyvault), [`tokens-credentials`](https://github.com/TacoRocket/AzureFox/wiki/Tokens-Credentials) | +| `resource` | [`automation`](https://github.com/TacoRocket/AzureFox/wiki/Automation), [`devops`](https://github.com/TacoRocket/AzureFox/wiki/Devops), [`acr`](https://github.com/TacoRocket/AzureFox/wiki/ACR), [`api-mgmt`](https://github.com/TacoRocket/AzureFox/wiki/API-Mgmt), [`databases`](https://github.com/TacoRocket/AzureFox/wiki/Databases), [`resource-trusts`](https://github.com/TacoRocket/AzureFox/wiki/Resource-Trusts) | +| `storage` | [`storage`](https://github.com/TacoRocket/AzureFox/wiki/Storage) | +| `network` | [`nics`](https://github.com/TacoRocket/AzureFox/wiki/Nics), [`dns`](https://github.com/TacoRocket/AzureFox/wiki/DNS), [`endpoints`](https://github.com/TacoRocket/AzureFox/wiki/Endpoints), [`network-effective`](https://github.com/TacoRocket/AzureFox/wiki/Network-Effective), [`network-ports`](https://github.com/TacoRocket/AzureFox/wiki/Network-Ports) | +| `compute` | [`workloads`](https://github.com/TacoRocket/AzureFox/wiki/Workloads), [`app-services`](https://github.com/TacoRocket/AzureFox/wiki/App-Services), [`functions`](https://github.com/TacoRocket/AzureFox/wiki/Functions), [`aks`](https://github.com/TacoRocket/AzureFox/wiki/AKS), [`vms`](https://github.com/TacoRocket/AzureFox/wiki/VMs), [`vmss`](https://github.com/TacoRocket/AzureFox/wiki/VMSS), [`snapshots-disks`](https://github.com/TacoRocket/AzureFox/wiki/Snapshots-Disks) | +| orchestration | [`chains`](https://github.com/TacoRocket/AzureFox/wiki/Chains) | ## Need A Test Lab? diff --git a/src/azurefox/chains/runner.py b/src/azurefox/chains/runner.py index 1c80561..c5332f0 100644 --- a/src/azurefox/chains/runner.py +++ b/src/azurefox/chains/runner.py @@ -240,15 +240,28 @@ def _build_deployment_path_output( if str(item.principal_id or "") in current_identity_principal_ids ] trusts_by_source_id: dict[str, list[dict]] = defaultdict(list) + trusts_by_object_id: dict[str, list[dict]] = defaultdict(list) for trust in role_trusts_output.trusts: trust_row = trust.model_dump(mode="json") source_object_id = str(trust_row.get("source_object_id") or "") if source_object_id: trusts_by_source_id[source_object_id].append(trust_row) + trusts_by_object_id[source_object_id].append(trust_row) + target_object_id = str(trust_row.get("target_object_id") or "") + if target_object_id and target_object_id != source_object_id: + trusts_by_object_id[target_object_id].append(trust_row) paths: list[ChainPathRecord] = [] for pipeline in devops_output.pipelines: pipeline_dict = pipeline.model_dump(mode="json") + pipeline_dict["joined_permission"] = _devops_joined_permission( + pipeline_dict, + permissions_by_principal, + ) + pipeline_dict["joined_role_trusts"] = _devops_joined_role_trusts( + pipeline_dict, + trusts_by_object_id, + ) assessment = assess_deployment_source(pipeline) for target_family in assessment.target_family_hints: target_spec = _DEPLOYMENT_TARGET_SPECS.get(target_family) @@ -590,6 +603,7 @@ def _build_deployment_source_record( source_command, assessment.change_signals, supporting_deployments=supporting_deployments, + source=source, ), target_count=len(target_ids), target_ids=target_ids, @@ -601,6 +615,9 @@ def _build_deployment_source_record( path_concept=assessment.path_concept, target_family=target_family, target_resolution=admission.state, + target_names=target_names, + target_label=target_spec["label"], + supporting_deployments=supporting_deployments, ), summary=_deployment_summary( source=source, @@ -685,21 +702,27 @@ def _deployment_insertion_point( def _deployment_devops_insertion_point(source: dict) -> str: primary_input = _devops_primary_trusted_input(source) trusted_input_text = _devops_trusted_input_text(primary_input) - injection_surfaces = [ - str(value) - for value in (source.get("current_operator_injection_surface_types") or []) - if value - ] - if any(value != "definition-edit" for value in injection_surfaces): - surface_list = ", ".join( - value for value in injection_surfaces if value != "definition-edit" - ) + control_mode = _devops_current_operator_control_mode(source, primary_input=primary_input) + non_definition_surfaces = _devops_non_definition_injection_surfaces(source) + if control_mode == "trusted-input-poison": + surface_list = ", ".join(non_definition_surfaces) return f"Poison {trusted_input_text} through {surface_list}." - if "definition-edit" in injection_surfaces or source.get("current_operator_can_edit"): + if control_mode == "definition-edit": return "Edit the pipeline definition directly." - if source.get("current_operator_can_queue"): + if control_mode == "queue-only": if primary_input: access_state = str(primary_input.get("current_operator_access_state") or "") + if access_state == "use" and primary_input.get("input_type") == "secure-file": + return ( + f"Queue this pipeline now; {trusted_input_text} is usable in pipeline " + "context, but secure-file administration is still unproven." + ) + if access_state == "read" and primary_input.get("input_type") == "pipeline-artifact": + return ( + "Queue this pipeline now; the upstream producer behind " + f"{trusted_input_text} is inspectable, but producer control is still " + "unproven." + ) if access_state == "read": return f"Queue this pipeline now; {trusted_input_text} is only readable." if access_state == "exists-only": @@ -710,12 +733,22 @@ def _deployment_devops_insertion_point(source: dict) -> str: return "Queue this pipeline now, but source poisoning is still unproven." if primary_input: input_type = str(primary_input.get("input_type") or "") + access_state = str(primary_input.get("current_operator_access_state") or "") + if access_state == "use" and primary_input.get("input_type") == "secure-file": + return ( + f"{trusted_input_text} is usable in pipeline context, but secure-file " + "administration is unproven." + ) + if access_state == "read" and primary_input.get("input_type") == "pipeline-artifact": + return ( + f"The upstream producer behind {trusted_input_text} is inspectable, but " + "producer control is unproven." + ) if input_type == "pipeline-artifact": return ( f"Artifact trust is visible at {trusted_input_text}, but upstream producer control " "is unproven." ) - access_state = str(primary_input.get("current_operator_access_state") or "") if access_state == "read": return ( f"{trusted_input_text} is visible and readable, but not writable from " @@ -899,6 +932,107 @@ def _automation_current_operator_access( return best_access +def _devops_joined_permission( + source: dict, + permissions_by_principal: dict[str, dict], +) -> dict | None: + for ref in _devops_service_principal_refs(source): + if ref in permissions_by_principal: + return dict(permissions_by_principal[ref]) + return None + + +def _devops_joined_role_trusts( + source: dict, + trusts_by_object_id: dict[str, list[dict]], +) -> list[dict]: + known_refs = {ref for ref, _, _ in _devops_identity_ref_candidates(source)} + best_scores: dict[tuple[str, str, str], tuple[int, int, int, str, str]] = {} + best_rows: dict[tuple[str, str, str], dict] = {} + for ref, ref_kind, ref_rank in _devops_identity_ref_candidates(source): + for trust in trusts_by_object_id.get(ref, []): + key = ( + str(trust.get("source_object_id") or ""), + str(trust.get("trust_type") or ""), + str(trust.get("target_object_id") or ""), + ) + score = _devops_role_trust_sort_key( + trust, + matched_ref=ref, + matched_kind=ref_kind, + ref_rank=ref_rank, + known_refs=known_refs, + ) + if key not in best_scores or score < best_scores[key]: + best_scores[key] = score + best_rows[key] = dict(trust) + return [best_rows[key] for key in sorted(best_rows, key=lambda item: best_scores[item])] + + +def _devops_service_principal_refs(source: dict) -> list[str]: + refs: list[str] = [] + for value in source.get("azure_service_connection_principal_ids") or []: + text = str(value or "").strip() + if text and text not in refs: + refs.append(text) + return refs + + +def _devops_application_refs(source: dict) -> list[str]: + refs: list[str] = [] + for value in source.get("azure_service_connection_client_ids") or []: + text = str(value or "").strip() + if text and text not in refs: + refs.append(text) + return refs + + +def _devops_identity_ref_candidates(source: dict) -> list[tuple[str, str, int]]: + candidates: list[tuple[str, str, int]] = [] + for ref in _devops_service_principal_refs(source): + candidates.append((ref, "service-principal", 0)) + for ref in _devops_application_refs(source): + candidates.append((ref, "application", 1)) + return candidates + + +def _devops_identity_refs(source: dict) -> list[str]: + return [ref for ref, _, _ in _devops_identity_ref_candidates(source)] + + +def _devops_role_trust_sort_key( + trust: dict, + *, + matched_ref: str, + matched_kind: str, + ref_rank: int, + known_refs: set[str], +) -> tuple[int, int, int, str, str]: + source_object_id = str(trust.get("source_object_id") or "").strip() + target_object_id = str(trust.get("target_object_id") or "").strip() + both_known = int(not (source_object_id in known_refs and target_object_id in known_refs)) + matched_side_rank = 2 + if target_object_id == matched_ref: + matched_side_rank = 0 + elif source_object_id == matched_ref: + matched_side_rank = 1 + trust_type_rank = { + "federated-credential": 0, + "app-to-service-principal": 1, + "service-principal-owner": 2, + "app-owner": 3, + }.get(str(trust.get("trust_type") or ""), 9) + kind_rank = 0 if matched_kind == "service-principal" else 1 + return ( + both_known, + matched_side_rank, + min(ref_rank, kind_rank, 1), + trust_type_rank, + source_object_id, + target_object_id, + ) + + def _automation_current_operator_control_clause(source: dict) -> str | None: access = source.get("current_operator_access") if not isinstance(access, dict): @@ -1062,7 +1196,7 @@ def _automation_permission_clause(source: dict) -> str | None: or permission.get("principal_id") or "automation identity" ) - return f"automation identity '{principal_name}' already has {role_text} across {scope_text}" + return f"Azure identity '{principal_name}' already has {role_text} across {scope_text}" def _automation_role_trust_clause(source: dict) -> str | None: @@ -1074,21 +1208,113 @@ def _automation_role_trust_clause(source: dict) -> str | None: return None trust_type = str(trust.get("trust_type") or "") target_name = str(trust.get("target_name") or trust.get("target_object_id") or "unknown target") + source_name = str(trust.get("source_name") or trust.get("source_object_id") or "unknown source") if trust_type == "service-principal-owner": - return f"role-trusts also show owner-level control over service principal '{target_name}'" + return ( + f"AzureFox also sees a separate identity-control path into Azure identity " + f"'{target_name}' through service principal '{source_name}'" + ) if trust_type == "app-owner": - return f"role-trusts also show owner control over application '{target_name}'" + return ( + f"AzureFox also sees a separate app control path into app '{target_name}' " + f"through '{source_name}'" + ) if trust_type == "federated-credential": - return f"role-trusts also show federated trust into service principal '{target_name}'" + return ( + f"AzureFox also sees a separate app trust path into Azure identity " + f"'{target_name}' through app '{source_name}'" + ) if trust_type == "app-to-service-principal": return ( - "role-trusts also show application-permission reach into service " - f"principal '{target_name}'" + f"AzureFox also sees a separate app-permission path into Azure identity " + f"'{target_name}'" + ) + summary = str(trust.get("summary") or "").strip() + return summary or None + + +def _devops_permission_clause(source: dict) -> str | None: + permission = source.get("joined_permission") + if not isinstance(permission, dict): + return None + if not bool(permission.get("privileged")): + return None + roles = [str(role) for role in permission.get("high_impact_roles") or [] if role] + role_text = ", ".join(roles) or "high-impact RBAC" + scope_count = int(permission.get("scope_count") or 0) + scope_text = "subscription-wide scope" if scope_count <= 1 else f"{scope_count} visible scopes" + principal_name = str( + permission.get("display_name") + or permission.get("principal_id") + or "the Azure identity tied to this pipeline" + ) + return ( + f"This pipeline runs as Azure identity '{principal_name}', which already has " + f"{role_text} across {scope_text}" + ) + + +def _devops_role_trust_clause(source: dict) -> str | None: + trusts = source.get("joined_role_trusts") + if not isinstance(trusts, list) or not trusts: + return None + trust = trusts[0] if isinstance(trusts[0], dict) else None + if not trust: + return None + trust_type = str(trust.get("trust_type") or "") + target_name = str(trust.get("target_name") or trust.get("target_object_id") or "unknown target") + source_name = str(trust.get("source_name") or trust.get("source_object_id") or "unknown source") + same_identity = _devops_execution_identity_name(source) == target_name + target_identity_text = ( + "that same Azure identity" + if same_identity + else f"Azure identity '{target_name}'" + ) + if trust_type == "service-principal-owner": + return ( + f"AzureFox also sees a separate identity-control path into {target_identity_text} " + f"through service principal '{source_name}'" + ) + if trust_type == "app-owner": + return ( + f"AzureFox also sees a separate app control path through '{source_name}' " + f"into app '{target_name}'" ) + if trust_type == "federated-credential": + return ( + f"AzureFox also sees a separate app trust path into {target_identity_text} " + f"through app '{source_name}'" + ) + if trust_type == "app-to-service-principal": + return f"AzureFox also sees a separate app-permission path into {target_identity_text}" summary = str(trust.get("summary") or "").strip() return summary or None +def _devops_execution_identity_name(source: dict) -> str | None: + permission = source.get("joined_permission") + if isinstance(permission, dict): + text = str(permission.get("display_name") or permission.get("principal_id") or "").strip() + if text: + return text + + identity_refs = set(_devops_identity_refs(source)) + trusts = source.get("joined_role_trusts") + if isinstance(trusts, list): + for trust in trusts: + if not isinstance(trust, dict): + continue + target_object_id = str(trust.get("target_object_id") or "").strip() + target_name = str(trust.get("target_name") or "").strip() + if target_object_id and target_object_id in identity_refs and target_name: + return target_name + source_object_id = str(trust.get("source_object_id") or "").strip() + source_name = str(trust.get("source_name") or "").strip() + if source_object_id and source_object_id in identity_refs and source_name: + return source_name + return None + + def _deployment_why_care( source_command: str, source: dict, @@ -1129,29 +1355,23 @@ def _deployment_why_care( if source_command == "devops": trusted_input = _devops_primary_trusted_input(source) - if _source_current_operator_can_inject(source_command, source): - sentence = ( - f"This path trusts {_devops_trusted_input_text(trusted_input)}; poisoning it " - f"would execute under {_deployment_execution_context(source_command, source)} " - f"and could {consequence_phrase}." - ) - else: - sentence = ( - f"This path trusts {_devops_trusted_input_text(trusted_input)}; if that trusted " - f"input becomes attacker-controlled, poisoning it would execute under " - f"{_deployment_execution_context(source_command, source)} and could " - f"{consequence_phrase}." - ) - current_operator_suffix = _deployment_current_operator_suffix(source_command, source) - if current_operator_suffix: - sentence = f"{sentence} {current_operator_suffix}" + sentence = _devops_why_care_intro(source, trusted_input=trusted_input) + grounded_reach = _devops_grounded_reach_clause(source) + if grounded_reach: + sentence = f"{sentence} {grounded_reach}" if support_parts: sentence = ( - f"{sentence} The surrounding deployment support also includes " + f"{sentence} Visible deployment support around this path also includes " + " and ".join(support_parts) - + ", which could widen blast radius once execution is controlled." + + "." ) + permission_clause = _devops_permission_clause(source) + if permission_clause: + sentence = f"{sentence} {permission_clause}." + trust_clause = _devops_role_trust_clause(source) + if trust_clause: + sentence = f"{sentence} {trust_clause}." if source.get("missing_target_mapping"): sentence = ( f"{sentence} AzureFox has not yet mapped the downstream Azure footprint cleanly." @@ -1200,15 +1420,38 @@ def _deployment_why_care( sentence = f"{sentence} {current_operator_suffix}" permission_clause = _automation_permission_clause(source) if permission_clause: - sentence = f"{sentence} The {permission_clause}." + sentence = f"{sentence} {permission_clause}." trust_clause = _automation_role_trust_clause(source) if trust_clause: - sentence = f"{sentence} {trust_clause.capitalize()}." + sentence = f"{sentence} {trust_clause}." return sentence return "Visible source evidence suggests Azure change capability" +def _devops_why_care_intro(source: dict, *, trusted_input: dict | None) -> str: + trusted_input_text = _devops_trusted_input_text(trusted_input) + execution_context = _deployment_execution_context("devops", source) + control_mode = _devops_current_operator_control_mode(source, primary_input=trusted_input) + if control_mode == "definition-edit": + return ( + f"This path trusts {trusted_input_text}. Current credentials can already edit this " + f"pipeline definition directly. An edited run would use {execution_context} when it " + "makes changes in Azure." + ) + if _source_current_operator_can_inject("devops", source): + return ( + f"This path trusts {trusted_input_text}. Current credentials can already poison " + f"that source. A poisoned run would use {execution_context} when it makes changes " + "in Azure." + ) + return ( + f"This path trusts {trusted_input_text}. If that trusted input becomes " + f"attacker-controlled, a poisoned run would use {execution_context} when it makes " + "changes in Azure." + ) + + def _deployment_consequence_phrase(source: dict) -> str: labels = { "consume-secret-backed-deployment-material": "consume secret-backed deployment material", @@ -1260,21 +1503,18 @@ def _deployment_support_phrase_parts(source: dict) -> list[str]: def _deployment_current_operator_suffix(source_command: str, source: dict) -> str: if source_command == "devops": - injection_surfaces = [ - str(value) for value in (source.get("current_operator_injection_surface_types") or []) - ] primary_input = _devops_primary_trusted_input(source) - queue = source.get("current_operator_can_queue") - edit = source.get("current_operator_can_edit") - if any(value != "definition-edit" for value in injection_surfaces): + control_mode = _devops_current_operator_control_mode(source, primary_input=primary_input) + non_definition_surfaces = _devops_non_definition_injection_surfaces(source) + if control_mode == "trusted-input-poison": return ( "Current credentials can already poison that trusted input through " - + ", ".join(value for value in injection_surfaces if value != "definition-edit") + + ", ".join(non_definition_surfaces) + "." ) - if "definition-edit" in injection_surfaces or edit: + if control_mode == "definition-edit": return "Current credentials can already edit the pipeline definition directly." - if queue: + if control_mode == "queue-only": return ( "Current credentials can already queue this pipeline, but AzureFox has not yet " "proven that they can poison the trusted input." @@ -1284,6 +1524,15 @@ def _deployment_current_operator_suffix(source_command: str, source: dict) -> st if primary_input and primary_input.get("current_operator_access_state") else None ) + if ( + access_state == "use" + and primary_input + and primary_input.get("input_type") == "secure-file" + ): + return ( + "Current credentials can use that secure file in pipeline context, but " + "Azure DevOps evidence here does not prove secure-file administration." + ) if access_state == "read": if primary_input and primary_input.get("input_type") == "pipeline-artifact": return ( @@ -1311,7 +1560,11 @@ def _deployment_current_operator_suffix(source_command: str, source: dict) -> st ) return "Current evidence only shows that the trusted input exists." if source.get("missing_injection_point"): - return "AzureFox has not yet proven a poisonable trusted input for current credentials." + return ( + "AzureFox has not yet proven " + + _devops_missing_source_control_text(definite=False) + + " for current credentials." + ) if source_command == "automation": clause = _automation_current_operator_control_clause(source) if clause: @@ -1346,14 +1599,52 @@ def _source_current_operator_can_inject(source_command: str, source: dict) -> bo return None +def _devops_non_definition_injection_surfaces(source: dict) -> list[str]: + return [ + str(value) + for value in (source.get("current_operator_injection_surface_types") or []) + if value and str(value) != "definition-edit" + ] + + +def _devops_current_operator_control_mode( + source: dict, + *, + primary_input: dict | None = None, +) -> str | None: + if _devops_non_definition_injection_surfaces(source): + return "trusted-input-poison" + injection_surfaces = [ + str(value) + for value in (source.get("current_operator_injection_surface_types") or []) + if value + ] + if "definition-edit" in injection_surfaces or source.get("current_operator_can_edit"): + return "definition-edit" + if source.get("current_operator_can_queue"): + return "queue-only" + primary_input = primary_input or _devops_primary_trusted_input(source) + if primary_input: + access_state = str(primary_input.get("current_operator_access_state") or "") + if access_state == "use" and primary_input.get("input_type") == "secure-file": + return "secure-file-use" + if access_state == "read" and primary_input.get("input_type") == "pipeline-artifact": + return "artifact-read" + if access_state == "read": + return "trusted-input-read" + if access_state == "exists-only": + return "trusted-input-exists" + if source.get("missing_injection_point"): + return "unproven" + return None + + def _deployment_execution_context(source_command: str, source: dict) -> str: if source_command == "devops": - names = [ - str(value) for value in (source.get("azure_service_connection_names") or []) if value - ] - if names: - return "Azure service connection " + ", ".join(names) - return "the authenticated Azure deployment path behind this pipeline" + identity_name = _devops_execution_identity_name(source) + if identity_name: + return f"Azure identity '{identity_name}'" + return "the Azure identity tied to this pipeline" if source_command == "automation": identity_type = str(source.get("identity_type") or "").strip() if identity_type: @@ -1406,6 +1697,9 @@ def _deployment_likely_impact( if target_resolution == "named match": return f"exact {lowered_label}: {', '.join(target_names[:_CANDIDATE_LIMIT])}" if target_resolution == "narrowed candidates": + shown = ", ".join(target_names[:_CANDIDATE_LIMIT]) + if shown: + return f"{len(target_names)} visible {lowered_label} candidate(s): {shown}" return f"{len(target_names)} visible {lowered_label} candidate(s)" if target_resolution == "visibility blocked": return f"likely {lowered_label}; target-side visibility blocked" @@ -1423,117 +1717,147 @@ def _deployment_confidence_boundary( current_operator_can_inject: bool | None, missing_target_mapping: bool, ) -> str: - automation_sentences: list[str] = [] - if source_command == "automation" and source is not None: - primary_sentence = _automation_primary_run_path_evidence_sentence(source) - if primary_sentence: - automation_sentences.append(primary_sentence) - current_operator_sentence = _deployment_current_operator_suffix(source_command, source) - if current_operator_sentence: - automation_sentences.append(current_operator_sentence) - permission_clause = _automation_permission_clause(source) - if permission_clause: - automation_sentences.append(f"The {permission_clause}.") - trust_clause = _automation_role_trust_clause(source) - if trust_clause: - automation_sentences.append(f"{trust_clause.capitalize()}.") - + source_control_label = _deployment_source_control_label(source_command, source) if missing_target_mapping: - if source_command == "automation": - detail = " ".join(automation_sentences).strip() - if detail: - return ( - f"{detail} AzureFox has not yet mapped the real Azure footprint beyond " - f"{target_label} evidence." - ) - return ( - "AzureFox can ground downstream consequence here, but it has not yet mapped the " - f"real Azure footprint beyond {target_label} evidence." - ) if current_operator_can_inject: return ( - "You can control the source side now, but AzureFox has not yet mapped the " - f"downstream Azure footprint beyond {target_label} consequence evidence." + "This row proves source-side control, but AzureFox has not yet mapped the " + f"downstream Azure footprint beyond {target_label} evidence." ) if current_operator_can_drive: return ( - "You can start or edit this path now, but AzureFox has not yet mapped the " - f"downstream Azure footprint beyond {target_label} consequence evidence." + "This row proves current-credential run-path control, but AzureFox has not yet " + f"mapped the downstream Azure footprint beyond {target_label} evidence." ) return ( - "AzureFox can ground downstream consequence here, but it has not yet mapped the " - f"real Azure footprint beyond {target_label} evidence." + "AzureFox can ground downstream consequence here, but it has not yet mapped the real " + f"Azure footprint beyond {target_label} evidence." ) if current_operator_can_inject: if target_resolution == "named match": return ( - f"You can poison the source now, and AzureFox has already joined the exact " - f"{target_label} target strongly enough to validate next." + f"{_deployment_operator_control_boundary(source_command, source, target_label)}, " + f"but not {_deployment_remaining_identity_boundary(source_command, source)}." ) if target_resolution == "narrowed candidates": return ( - f"You can poison the source now, but AzureFox still cannot name the exact " - f"{target_label} target." + f"This row proves {source_control_label}, but not the exact {target_label} target." ) if target_resolution == "visibility blocked": return ( - f"You can poison the source now, but current scope still hides the downstream " - f"{target_label} target." + f"This row proves {source_control_label}, but current scope still hides the " + f"downstream {target_label} target." ) if current_operator_can_drive: if target_resolution == "named match": return ( - "You can start or edit this path now, but AzureFox has not yet proven a " - "writable source." + f"This row proves current-credential run-path control and the exact " + f"{target_label} target, but not a writable source." ) if target_resolution == "narrowed candidates": return ( - f"You can start this path now, but AzureFox has not yet proven a writable " + f"This row proves current-credential run-path control, but not a writable " f"source or the exact {target_label} target." ) if target_resolution == "visibility blocked": return ( - f"You can start this path now, but AzureFox has not yet proven a writable " - f"source and current scope cannot name the downstream {target_label} target." + f"This row proves current-credential run-path control, but not a writable source " + f"or visible downstream {target_label} target." ) if target_resolution == "named match": if confirmation_basis == "parsed-config-target": return ( - f"AzureFox can name the exact {target_label} target from parsed source clues, " - "but current-credential invocation is still unproven." + f"This row proves the exact {target_label} target from parsed source clues, but " + "not current-credential invocation." ) return ( - f"AzureFox can name the exact {target_label} target, but current-credential " - "invocation is still unproven." + f"This row proves the exact {target_label} target, but not current-credential " + "invocation." ) if target_resolution == "narrowed candidates": return ( - f"AzureFox narrowed the likely {target_label} targets, but current-credential " - "invocation is still unproven." + f"This row narrows the likely {target_label} targets, but not current-credential " + "invocation." ) if target_resolution == "visibility blocked": - if source_command == "automation": - detail = " ".join(automation_sentences).strip() - if detail: + return ( + f"Current scope still hides the downstream {target_label} target, so AzureFox cannot " + "complete the target-side judgment yet." + ) + return ( + f"AzureFox still cannot prove either a defensible {target_label} target story or a " + "current-credential path into this source." + ) + + +def _deployment_remaining_identity_boundary( + source_command: str | None, + source: dict | None, +) -> str: + if source_command == "devops" and source is not None: + identity_name = _devops_execution_identity_name(source) + if identity_name: + return f"a separate direct sign-in as Azure identity '{identity_name}'" + if source_command == "automation" and source is not None: + identity_type = str(source.get("identity_type") or "").strip() + if identity_type: + return f"a separate direct sign-in as the automation Azure identity ({identity_type})" + return "a separate direct sign-in from this row alone" + + +def _deployment_operator_control_boundary( + source_command: str | None, + source: dict | None, + target_label: str, +) -> str: + if source_command == "devops" and source is not None: + identity_name = _devops_execution_identity_name(source) + if _devops_current_operator_control_mode(source) == "definition-edit": + if identity_name: return ( - f"{detail} Current scope still hides the downstream {target_label} target, " - "so AzureFox cannot complete the target-side actionability judgment yet." + "Current evidence shows you can edit this pipeline definition so it runs as " + f"Azure identity '{identity_name}' against the exact {target_label} target" ) return ( - f"Current scope still hides the downstream {target_label} target, so " - "AzureFox cannot complete the actionability judgment yet." + "Current evidence shows you can edit this pipeline definition so it runs " + f"against the exact {target_label} target" + ) + if identity_name: + return ( + "Current evidence shows you can poison this trusted input so it runs as Azure " + f"identity '{identity_name}' against the exact {target_label} target" ) return ( - f"Current scope still hides the downstream {target_label} target, so AzureFox " - "cannot complete the actionability judgment yet." + "Current evidence shows you can poison this trusted input against the exact " + f"{target_label} target" ) - return ( - f"AzureFox still cannot show either a defensible {target_label} target story or a " - "current operator path into this source." - ) + if source_command == "automation": + return ( + "Current evidence shows you can control this source-side path against the exact " + f"{target_label} target" + ) + return f"Current evidence shows source poisoning and the exact {target_label} target" + + +def _deployment_source_control_label( + source_command: str | None, + source: dict | None, +) -> str: + if source_command == "devops" and source is not None: + if _devops_current_operator_control_mode(source) == "definition-edit": + return "source-side definition control" + return "source poisoning" + if source_command == "automation": + return "source-side control" + return "source poisoning" + + +def _devops_missing_source_control_text(*, definite: bool = True) -> str: + phrase = "writable trusted input or current-credential definition-edit path" + return f"the {phrase}" if definite else phrase def _deployment_evidence_commands( @@ -1569,6 +1893,9 @@ def _deployment_next_review( path_concept: str | None, target_family: str, target_resolution: str, + target_names: list[str], + target_label: str, + supporting_deployments: list[dict], ) -> str: if source_command == "automation": primary_mode = str(source.get("primary_start_mode") or "") or None @@ -1621,25 +1948,39 @@ def _deployment_next_review( else: steps.append("confirm which runbook and trigger path performs the Azure change") if trust_clause: - steps.append("review role-trusts around the automation identity's downstream control") + steps.append("review other identity trust paths around that same Azure identity") elif source.get("principal_id") or source.get("client_id") or source.get("identity_ids"): - steps.append("review role-trusts for controllable automation identity links") - target_command = _DEPLOYMENT_TARGET_SPECS[target_family]["command"] + steps.append("review other identity trust paths around the automation identity") if source.get("missing_target_mapping"): steps.append( - f"use {target_command} as consequence grounding because runbook " - "target mapping is still missing" + "use already-loaded ARM deployment evidence as consequence grounding " + "because runbook target mapping is still missing" ) elif target_resolution == "visibility blocked": - steps.append(f"restore {target_command} visibility for consequence grounding") + steps.append( + f"restore {target_label} visibility so AzureFox can finish the target-side join" + ) else: - steps.append(f"open {target_command} to validate the likely Azure impact") + steps.append( + _deployment_target_review_step( + target_resolution=target_resolution, + target_label=target_label, + target_names=target_names, + supporting_deployments=supporting_deployments, + ) + ) return "; ".join(steps) + "." if path_concept == "secret-escalation-support": steps: list[str] = ["Confirm what separate foothold could reuse this secret-backed support"] elif _source_current_operator_can_inject(source_command, source): - steps: list[str] = ["Current credentials can already poison a trusted input"] + if ( + source_command == "devops" + and _devops_current_operator_control_mode(source) == "definition-edit" + ): + steps = ["Current credentials can already edit this pipeline definition directly"] + else: + steps = ["Current credentials can already poison a trusted input"] elif _source_current_operator_can_drive(source_command, source): steps = [ "Current credentials can already start this path, but trusted-input poisoning is " @@ -1661,31 +2002,111 @@ def _deployment_next_review( if source.get("azure_service_connection_client_ids") or source.get( "azure_service_connection_principal_ids" ): - steps.append("review role-trusts for controllable identity links") + if source.get("joined_role_trusts"): + steps.append( + "use the already-joined app and identity trust evidence to validate " + "other sign-in paths into that same Azure identity" + ) + else: + steps.append("review other trust paths into the Azure identity tied to this pipeline") + permission_clause = _devops_permission_clause(source) + if permission_clause: + steps.append( + "use the already-joined Azure control on the Azure identity tied to this pipeline" + ) if "keyvault-backed-inputs" in (source.get("secret_support_types") or []): - steps.append("review keyvault for secret-backed deployment support") - - target_command = _DEPLOYMENT_TARGET_SPECS[target_family]["command"] + steps.append( + "use the already-loaded Key Vault support evidence to keep blast " + "radius in view" + ) if source.get("missing_target_mapping"): steps.append( - f"use {target_command} as consequence grounding because target mapping is still missing" + "use already-loaded ARM deployment evidence as consequence grounding " + "because target mapping is still missing" ) elif target_resolution == "visibility blocked": - steps.append(f"restore {target_command} visibility for consequence grounding") + steps.append( + f"restore {target_label} visibility so AzureFox can finish the target-side join" + ) else: - steps.append(f"open {target_command} to validate the likely Azure impact") + steps.append( + _deployment_target_review_step( + target_resolution=target_resolution, + target_label=target_label, + target_names=target_names, + supporting_deployments=supporting_deployments, + ) + ) return "; ".join(steps) + "." +def _deployment_target_review_step( + *, + target_resolution: str, + target_label: str, + target_names: list[str], + supporting_deployments: list[dict], +) -> str: + shown_targets = ", ".join(target_names[:_CANDIDATE_LIMIT]) + if target_resolution == "named match" and shown_targets: + step = ( + f"AzureFox already named the exact {target_label} target {shown_targets}; " + "validate that target directly" + ) + elif shown_targets: + step = ( + f"AzureFox already narrowed the likely {target_label} candidates to {shown_targets}; " + "confirm which one this path actually changes" + ) + else: + step = f"confirm which {target_label} this path actually changes" + supporting_names = ", ".join( + str(item.get("name") or "") + for item in supporting_deployments[:_CANDIDATE_LIMIT] + if item.get("name") + ) + if supporting_names: + step += f" while keeping supporting ARM deployment history {supporting_names} in view" + return step + + +def _devops_grounded_reach_clause(source: dict) -> str: + consequence_types = {str(value) for value in (source.get("consequence_types") or []) if value} + phrases: list[str] = [] + if "redeploy-workload" in consequence_types: + phrases.append("AzureFox already ties this path to visible workload deployment reach") + if "modify-infra" in consequence_types: + phrases.append("visible infrastructure deployment reach") + if "reintroduce-config" in consequence_types: + phrases.append("configuration change reach") + if "run-recurring-execution" in consequence_types: + phrases.append("recurring execution") + if "consume-secret-backed-deployment-material" in consequence_types: + phrases.append("secret-backed deployment material") + if not phrases: + return "" + if len(phrases) == 1: + return phrases[0] + "." + if len(phrases) == 2: + return f"{phrases[0]} and {phrases[1]}." + return ", ".join(phrases[:-1]) + f", and {phrases[-1]}." + + def _deployment_joined_surfaces( source_command: str, change_signals: tuple[str, ...], *, supporting_deployments: list[dict], + source: dict | None = None, ) -> list[str]: joined = [source_command, *change_signals] if supporting_deployments: joined.append("provider-family-match") + if source_command == "devops" and source is not None: + if source.get("joined_permission"): + joined.append("permission-summary") + if source.get("joined_role_trusts"): + joined.append("trust-edge") return sorted(dict.fromkeys(joined)) @@ -1835,8 +2256,9 @@ def _deployment_missing_confirmation( if source_command == "devops": return ( f"Missing target-side visibility for the downstream {target_label} footprint, " - "and current evidence still does not prove a poisonable trusted input or a " - "definition-edit path for current credentials." + "and current evidence still does not prove " + + _devops_missing_source_control_text(definite=False) + + "." ) if _source_current_operator_can_inject(source_command, source): return ( @@ -1873,8 +2295,8 @@ def _deployment_missing_confirmation( if source_command == "devops": return ( f"Current evidence names the likely {target_label} target, but does not confirm a " - "poisonable trusted input or a current-credential definition-edit path on the " - "source side." + + _devops_missing_source_control_text(definite=False) + + " on the source side." ) return ( f"Current evidence names the likely {target_label} target, but does not confirm which " @@ -1883,8 +2305,9 @@ def _deployment_missing_confirmation( if source_command == "devops": return ( f"Missing exact {target_label} mapping and source-side poisoning proof; current " - "evidence does not confirm a poisonable trusted input or a current-credential " - "definition-edit path." + "evidence does not confirm " + + _devops_missing_source_control_text(definite=False) + + "." ) if _source_current_operator_can_inject(source_command, source): return ( diff --git a/src/azurefox/collectors/provider.py b/src/azurefox/collectors/provider.py index 2309c98..7d2d069 100644 --- a/src/azurefox/collectors/provider.py +++ b/src/azurefox/collectors/provider.py @@ -9818,7 +9818,7 @@ def _devops_package_feed_input( def _devops_access_state_rank(value: object) -> int: - return {"write": 0, "read": 1, "exists-only": 2}.get(str(value or ""), 9) + return {"write": 0, "use": 1, "read": 2, "exists-only": 3}.get(str(value or ""), 9) def _devops_apply_permission_proof( @@ -10100,7 +10100,7 @@ def _devops_apply_secure_file_role_proof( if best_role == "user": return _devops_apply_permission_proof( trusted_input, - access_state="read", + access_state="use", can_poison=False, evidence_basis="secure-file-role", permission_source="azure-devops-library-security-role", @@ -10476,7 +10476,112 @@ def _devops_target_clues(definition: dict[str, object]) -> list[str]: for clue, keywords in patterns.items(): if any(keyword in text for keyword in keywords for text in strings): clues.append(clue) - return clues + structured_clues = _devops_structured_target_clues(definition, broad_clues=clues) + clues.extend( + clue.split(":", 1)[0].strip() for clue in structured_clues if ":" in clue and clue + ) + clues.extend(structured_clues) + return _dedupe_strings(clues) + + +def _devops_structured_target_clues( + definition: dict[str, object], + *, + broad_clues: list[str], +) -> list[str]: + structured: list[str] = [] + lowered_broad_clues = {value.lower() for value in broad_clues} + for path, node in _recursive_nodes(definition): + if not isinstance(node, dict): + continue + + app_service_name = _devops_named_target_input( + node, + token_groups=( + ("azure", "web", "app", "name"), + ("web", "app", "name"), + ("app", "service", "name"), + ), + ) + if not app_service_name and "app service" in lowered_broad_clues: + app_service_name = _devops_named_target_input( + node, + token_groups=(("app", "name"),), + ) + if app_service_name: + structured.append(f"App Service: {app_service_name}") + + function_name = _devops_named_target_input( + node, + token_groups=( + ("azure", "function", "app", "name"), + ("function", "app", "name"), + ), + ) + if not function_name and "functions" in lowered_broad_clues: + function_name = _devops_named_target_input( + node, + token_groups=(("function", "name"),), + ) + if function_name: + structured.append(f"Functions: {function_name}") + + cluster_name = _devops_named_target_input( + node, + token_groups=( + ("aks", "cluster", "name"), + ("kubernetes", "cluster", "name"), + ), + ) + if not cluster_name and "aks/kubernetes" in lowered_broad_clues: + cluster_name = _devops_named_target_input( + node, + token_groups=(("cluster", "name"),), + ) + if cluster_name: + structured.append(f"AKS/Kubernetes: {cluster_name}") + + node_tokens = { + token for key in node for token in _devops_identifier_tokens(key) + } | set(_devops_path_tokens(path)) + if "arm/bicep/terraform" in lowered_broad_clues or _devops_node_has_arm_target_context( + node_tokens + ): + deployment_name = _devops_named_target_input( + node, + token_groups=(("deployment", "name"),), + ) + if deployment_name: + structured.append(f"ARM/Bicep/Terraform: {deployment_name}") + + return structured + + +def _devops_node_has_arm_target_context(tokens: set[str]) -> bool: + return bool( + "arm" in tokens + or "bicep" in tokens + or "terraform" in tokens + or ("resource" in tokens and "manager" in tokens) + ) + + +def _devops_named_target_input( + node: dict[str, object], + *, + token_groups: tuple[tuple[str, ...], ...], +) -> str | None: + for key, value in node.items(): + if not isinstance(value, str): + continue + cleaned_value = value.strip() + if not cleaned_value or _looks_like_expression(cleaned_value): + continue + key_tokens = set(_devops_identifier_tokens(key)) + for token_group in token_groups: + if set(token_group).issubset(key_tokens): + return cleaned_value + return None def _devops_secret_support_types( @@ -10758,6 +10863,12 @@ def _devops_injection_clause( return "current credentials can inject through " + ", ".join( current_operator_injection_surface_types ) + if primary_trusted_input_access_state == "use": + if primary_trusted_input_type == "secure-file": + return ( + f"current credentials can use {trusted_input} in pipeline context, but Azure " + "DevOps evidence here does not prove secure-file administration" + ) if primary_trusted_input_access_state == "read": if primary_trusted_input_type == "pipeline-artifact": return ( diff --git a/tests/fixtures/lab_tenant/devops.json b/tests/fixtures/lab_tenant/devops.json index 3678225..08fc69b 100644 --- a/tests/fixtures/lab_tenant/devops.json +++ b/tests/fixtures/lab_tenant/devops.json @@ -278,7 +278,8 @@ "source_visibility_state": "visible", "summary": "Build definition 'deploy-appservice-prod' in project 'prod-platform' exposes an Azure change path. trusted inputs include repository azure-repos:customer-portal@refs/heads/main. execution can start through pr-trigger. current credentials can poison repo-content through repository azure-repos:customer-portal@refs/heads/main. uses Azure-facing service connection(s) prod-appsvc-wif. references variable group(s) prod-appsvc-release. surfaces 3 secret-marked variable name(s). source clues ground likely Azure impact in App Service. Current credentials can poison repo-content through repository azure-repos:customer-portal@refs/heads/main. Check app-services for the named deployment target; review permissions and role-trusts for Azure control.", "target_clues": [ - "App Service" + "App Service", + "App Service: app-public-api" ], "trigger_join_ids": [ "devops-trigger://contoso/prod-platform/23/pr-trigger", diff --git a/tests/golden/devops.json b/tests/golden/devops.json index 3de25f5..ee1f614 100644 --- a/tests/golden/devops.json +++ b/tests/golden/devops.json @@ -231,7 +231,8 @@ "22222222-2222-2222-2222-222222222222" ], "target_clues": [ - "App Service" + "App Service", + "App Service: app-public-api" ], "risk_cues": [ "auto-triggered", diff --git a/tests/test_chain_semantics.py b/tests/test_chain_semantics.py index fb5f940..9247443 100644 --- a/tests/test_chain_semantics.py +++ b/tests/test_chain_semantics.py @@ -1,8 +1,17 @@ from __future__ import annotations from azurefox.chains.credential_path import _build_candidate_record +from azurefox.chains.deployment_path import DeploymentSourceAssessment from azurefox.chains.runner import ( _build_escalation_trust_record, + _deployment_confidence_boundary, + _deployment_current_operator_suffix, + _deployment_devops_insertion_point, + _deployment_next_review, + _deployment_why_care, + _devops_execution_identity_name, + _devops_joined_permission, + _devops_joined_role_trusts, _source_current_operator_can_inject, ) from azurefox.chains.semantics import ( @@ -163,6 +172,131 @@ def test_devops_edit_rights_count_as_definition_edit_injection() -> None: ) +def test_deployment_path_secure_file_use_suffix_stays_use_scoped() -> None: + assert ( + _deployment_current_operator_suffix( + "devops", + { + "trusted_inputs": [ + { + "input_type": "secure-file", + "ref": "secure-file:codesign-cert.pfx", + "current_operator_access_state": "use", + } + ], + "primary_trusted_input_ref": "secure-file:codesign-cert.pfx", + "current_operator_injection_surface_types": [], + "current_operator_can_queue": False, + "current_operator_can_edit": False, + }, + ) + == ( + "Current credentials can use that secure file in pipeline context, but " + "Azure DevOps evidence here does not prove secure-file administration." + ) + ) + + +def test_deployment_path_secure_file_use_insertion_point_stays_use_scoped() -> None: + assert ( + _deployment_devops_insertion_point( + { + "trusted_inputs": [ + { + "input_type": "secure-file", + "ref": "secure-file:codesign-cert.pfx", + "current_operator_access_state": "use", + } + ], + "primary_trusted_input_ref": "secure-file:codesign-cert.pfx", + "current_operator_injection_surface_types": [], + "current_operator_can_queue": False, + "current_operator_can_edit": False, + } + ) + == ( + "secure file codesign-cert.pfx is usable in pipeline context, but " + "secure-file administration is unproven." + ) + ) + + +def test_deployment_path_artifact_read_insertion_point_stays_producer_scoped() -> None: + assert ( + _deployment_devops_insertion_point( + { + "trusted_inputs": [ + { + "input_type": "pipeline-artifact", + "ref": "pipeline-artifact:prod-platform/shared-build#signed-drop", + "current_operator_access_state": "read", + } + ], + "primary_trusted_input_ref": ( + "pipeline-artifact:prod-platform/shared-build#signed-drop" + ), + "current_operator_injection_surface_types": [], + "current_operator_can_queue": False, + "current_operator_can_edit": False, + } + ) + == ( + "The upstream producer behind pipeline artifact " + "prod-platform/shared-build#signed-drop is inspectable, but producer control is " + "unproven." + ) + ) + + +def test_deployment_path_definition_edit_boundary_stays_definition_scoped() -> None: + source = { + "current_operator_can_edit": True, + "current_operator_injection_surface_types": ["definition-edit"], + "joined_permission": {"display_name": "build-sp"}, + } + + assert ( + _deployment_confidence_boundary( + source_command="devops", + source=source, + target_label="App Service", + target_resolution="named match", + confirmation_basis="parsed-config-target", + current_operator_can_drive=True, + current_operator_can_inject=True, + missing_target_mapping=False, + ) + == ( + "Current evidence shows you can edit this pipeline definition so it runs as Azure " + "identity 'build-sp' against the exact App Service target, but not a separate " + "direct sign-in as Azure identity 'build-sp'." + ) + ) + + +def test_deployment_path_definition_edit_next_review_stays_definition_scoped() -> None: + assert ( + _deployment_next_review( + source_command="devops", + source={ + "current_operator_can_edit": True, + "current_operator_injection_surface_types": ["definition-edit"], + }, + path_concept="controllable-change-path", + target_family="app-services", + target_resolution="named match", + target_names=["app-public-api"], + target_label="App Service", + supporting_deployments=[], + ) + == ( + "Current credentials can already edit this pipeline definition directly; AzureFox " + "already named the exact App Service target app-public-api; validate that target " + "directly." + ) + ) + + def test_secret_support_rows_stay_lower_priority() -> None: decision = evaluate_chain_semantics( ChainSemanticContext( @@ -334,6 +468,102 @@ def test_escalation_path_trust_rows_use_hidden_role_trust_transform_fields() -> assert record.target_resolution == "path-confirmed" +def test_devops_joined_permission_ignores_service_connection_id_matches() -> None: + joined = _devops_joined_permission( + { + "azure_service_connection_ids": ["endpoint-1"], + "azure_service_connection_principal_ids": ["sp-1"], + "azure_service_connection_client_ids": ["app-1"], + "identity_join_ids": ["endpoint-1", "sp-1", "app-1"], + }, + { + "endpoint-1": {"display_name": "wrong-endpoint"}, + "sp-1": {"display_name": "build-sp"}, + }, + ) + + assert joined == {"display_name": "build-sp"} + + +def test_devops_joined_role_trusts_prefers_internal_app_to_sp_link_first() -> None: + trusts = _devops_joined_role_trusts( + { + "azure_service_connection_principal_ids": ["sp-1"], + "azure_service_connection_client_ids": ["app-1"], + }, + { + "sp-1": [ + { + "trust_type": "service-principal-owner", + "source_object_id": "runner-sp", + "source_name": "automation-runner", + "target_object_id": "sp-1", + "target_name": "build-sp", + }, + { + "trust_type": "federated-credential", + "source_object_id": "app-1", + "source_name": "build-app", + "target_object_id": "sp-1", + "target_name": "build-sp", + }, + ], + "app-1": [ + { + "trust_type": "app-owner", + "source_object_id": "user-1", + "source_name": "ci-admin@lab.local", + "target_object_id": "app-1", + "target_name": "build-app", + }, + { + "trust_type": "federated-credential", + "source_object_id": "app-1", + "source_name": "build-app", + "target_object_id": "sp-1", + "target_name": "build-sp", + }, + ], + }, + ) + + assert trusts[0]["trust_type"] == "federated-credential" + assert _devops_execution_identity_name( + { + "azure_service_connection_principal_ids": ["sp-1"], + "azure_service_connection_client_ids": ["app-1"], + "joined_role_trusts": trusts, + } + ) == "build-sp" + + +def test_deployment_why_care_definition_edit_does_not_claim_trusted_input_poisoning() -> None: + text = _deployment_why_care( + "devops", + { + "trusted_inputs": [ + { + "input_type": "repository", + "ref": "repository:azure-repos:customer-portal@refs/heads/main", + } + ], + "primary_trusted_input_ref": "repository:azure-repos:customer-portal@refs/heads/main", + "current_operator_can_edit": True, + "current_operator_injection_surface_types": ["definition-edit"], + "joined_permission": {"display_name": "build-sp", "privileged": False}, + }, + assessment=DeploymentSourceAssessment( + source_command="devops", + source_name="deploy-appservice-prod", + posture="can already change Azure here", + path_concept="controllable-change-path", + ), + ) + + assert "edit this pipeline definition directly" in text + assert "poison that source" not in text + + def test_semantic_priority_sort_value_orders_highest_first() -> None: assert semantic_priority_sort_value("high") < semantic_priority_sort_value("medium") assert semantic_priority_sort_value("medium") < semantic_priority_sort_value("low") diff --git a/tests/test_cli_smoke.py b/tests/test_cli_smoke.py index 4021e1e..b051a29 100644 --- a/tests/test_cli_smoke.py +++ b/tests/test_cli_smoke.py @@ -221,6 +221,7 @@ def test_cli_smoke_chains_deployment_path_json(tmp_path: Path) -> None: "arm-deployment", } assert {item["target_resolution"] for item in payload["paths"]} == { + "named match", "narrowed candidates", "visibility blocked", } @@ -255,20 +256,41 @@ def test_cli_smoke_chains_deployment_path_json(tmp_path: Path) -> None: ) assert "role-trusts" in automation_row["evidence_commands"] assert "rbac" in automation_row["evidence_commands"] - assert "aa-hybrid-prod-mi" in automation_row["confidence_boundary"] + assert "This row proves source-side control" in automation_row["confidence_boundary"] + assert "Azure footprint beyond ARM deployment evidence" in automation_row["confidence_boundary"] assert "ops-deploy-sp" in automation_row["why_care"] assert "map what runbook Redeploy-App changes" in automation_row["next_review"] assert "run recurring Azure-facing execution" in automation_row["why_care"] aks_row = next(item for item in payload["paths"] if item["asset_name"] == "deploy-aks-prod") assert aks_row["actionability_state"] == "conditionally actionable" assert "Queue this pipeline now" in aks_row["insertion_point"] + assert "permission-summary" in aks_row["joined_surface_types"] assert "permissions" in aks_row["evidence_commands"] assert "keyvault" in aks_row["evidence_commands"] + assert "current-credential run-path control" in aks_row["confidence_boundary"] + assert "not a writable source" in aks_row["confidence_boundary"] + assert "exact AKS cluster target" in aks_row["confidence_boundary"] + assert "AzureFox already narrowed the likely AKS cluster candidates" in aks_row["next_review"] appsvc_row = next( item for item in payload["paths"] if item["asset_name"] == "deploy-appservice-prod" ) + assert appsvc_row["target_resolution"] == "named match" + assert appsvc_row["confirmation_basis"] == "parsed-config-target" assert appsvc_row["actionability_state"] == "currently actionable" assert "Poison repository" in appsvc_row["insertion_point"] + assert appsvc_row["target_names"] == ["app-public-api"] + assert appsvc_row["likely_impact"] == "exact app service: app-public-api" + assert "trust-edge" in appsvc_row["joined_surface_types"] + assert "runs as Azure identity 'build-sp'" in appsvc_row["confidence_boundary"] + assert "exact App Service target" in appsvc_row["confidence_boundary"] + assert ( + "separate direct sign-in as Azure identity 'build-sp'" + in appsvc_row["confidence_boundary"] + ) + assert ( + "AzureFox already named the exact App Service target app-public-api" + in appsvc_row["next_review"] + ) def test_cli_smoke_chains_escalation_path_json(tmp_path: Path) -> None: diff --git a/tests/test_collectors.py b/tests/test_collectors.py index 8d50775..336f65c 100644 --- a/tests/test_collectors.py +++ b/tests/test_collectors.py @@ -57,6 +57,7 @@ _devops_operator_summary, _devops_package_feed_input, _devops_pipeline_summary, + _devops_structured_target_clues, _env_var_reference_target, _network_effective_row_from_endpoint, _network_scope_label, @@ -2227,6 +2228,140 @@ def test_devops_pipeline_summary_extracts_non_repo_trusted_inputs() -> None: assert pipeline["trusted_input_refs"] +def test_devops_pipeline_summary_extracts_structured_app_service_target_clue() -> None: + pipeline, issues = _devops_pipeline_summary( + organization="contoso", + project={"name": "prod-platform", "id": "project-1"}, + definition={ + "id": 23, + "name": "deploy-appservice-prod", + "repository": {"name": "customer-portal", "type": "TfsGit"}, + "process": { + "phases": [ + { + "steps": [ + { + "inputs": { + "azureWebAppName": "app-public-api", + "connectedServiceNameARM": "prod-appsvc-wif", + } + } + ] + } + ] + }, + }, + service_endpoints_by_id={}, + service_endpoints_by_name={ + "prod-appsvc-wif": { + "id": "endpoint-1", + "name": "prod-appsvc-wif", + "type": "azurerm", + "authorization": {"scheme": "WorkloadIdentityFederation"}, + } + }, + repositories_by_id={}, + repositories_by_name={}, + variable_groups_by_id={}, + ) + + assert issues == [] + assert "App Service" in pipeline["target_clues"] + assert "App Service: app-public-api" in pipeline["target_clues"] + + +def test_devops_pipeline_summary_extracts_structured_aks_and_arm_target_clues() -> None: + pipeline, issues = _devops_pipeline_summary( + organization="contoso", + project={"name": "platform-infra", "id": "project-1"}, + definition={ + "id": 34, + "name": "plan-infra-prod", + "repository": {"name": "infra-live", "type": "TfsGit"}, + "process": { + "phases": [ + { + "steps": [ + { + "inputs": { + "aksClusterName": "aks-ops-01", + "script": "az deployment group create", + "deploymentName": "sub-foundation", + "connectedServiceNameARM": "infra-subscription", + } + } + ] + } + ] + }, + }, + service_endpoints_by_id={}, + service_endpoints_by_name={ + "infra-subscription": { + "id": "endpoint-1", + "name": "infra-subscription", + "type": "azurerm", + "authorization": {"scheme": "ManagedServiceIdentity"}, + } + }, + repositories_by_id={}, + repositories_by_name={}, + variable_groups_by_id={}, + ) + + assert issues == [] + assert "AKS/Kubernetes" in pipeline["target_clues"] + assert "AKS/Kubernetes: aks-ops-01" in pipeline["target_clues"] + assert "ARM/Bicep/Terraform" in pipeline["target_clues"] + assert "ARM/Bicep/Terraform: sub-foundation" in pipeline["target_clues"] + + +def test_devops_structured_target_clues_do_not_depend_on_broad_clues() -> None: + clues = _devops_structured_target_clues( + { + "process": { + "phases": [ + { + "steps": [ + {"inputs": {"azureWebAppName": "app-public-api"}}, + {"inputs": {"aksClusterName": "aks-ops-01"}}, + ] + } + ] + } + }, + broad_clues=[], + ) + + assert "App Service: app-public-api" in clues + assert "AKS/Kubernetes: aks-ops-01" in clues + + +def test_devops_structured_target_clues_ignore_generic_names_without_azure_context() -> None: + clues = _devops_structured_target_clues( + { + "process": { + "phases": [ + { + "steps": [ + { + "inputs": { + "appName": "docs-site", + "clusterName": "shared-cluster", + "deploymentName": "release-42", + } + } + ] + } + ] + } + }, + broad_clues=[], + ) + + assert clues == [] + + def test_devops_finalize_trusted_inputs_adds_exists_only_proof_metadata() -> None: trusted_inputs = _devops_finalize_trusted_inputs( trusted_inputs=[ @@ -2409,7 +2544,7 @@ def test_devops_secure_file_role_proof_distinguishes_visible_use_and_manage() -> assert visible_only["current_operator_access_state"] == "exists-only" assert visible_only["current_operator_can_poison"] is False assert visible_only["trusted_input_permission_detail"] == "role=reader" - assert usable["current_operator_access_state"] == "read" + assert usable["current_operator_access_state"] == "use" assert usable["current_operator_can_poison"] is False assert usable["trusted_input_permission_detail"] == "role=user" assert manageable["current_operator_access_state"] == "write" @@ -2425,7 +2560,7 @@ def test_devops_secure_file_use_proof_does_not_overstate_generic_reading() -> No { "input_type": "secure-file", "ref": "secure-file:codesign-cert.pfx", - "current_operator_access_state": "read", + "current_operator_access_state": "use", "current_operator_can_poison": False, "surface_types": ["secure-file"], } @@ -2447,7 +2582,7 @@ def test_devops_secure_file_use_proof_does_not_overstate_generic_reading() -> No current_operator_can_contribute_source=False, current_operator_injection_surface_types=[], primary_trusted_input_type="secure-file", - primary_trusted_input_access_state="read", + primary_trusted_input_access_state="use", ) assert "can use secure file codesign-cert.pfx in pipeline context" in summary diff --git a/tests/test_golden_outputs.py b/tests/test_golden_outputs.py index c4c9b6c..699b67a 100644 --- a/tests/test_golden_outputs.py +++ b/tests/test_golden_outputs.py @@ -41,9 +41,29 @@ collect_workloads, ) +_PROSE_HEAVY_FIELDS = { + "summary", + "next_review", +} + + +def _scrub_prose_heavy_fields(node: object) -> object: + if isinstance(node, dict): + cleaned: dict[str, object] = {} + for key, value in node.items(): + if key in _PROSE_HEAVY_FIELDS and isinstance(value, str): + cleaned[key] = f"<{key}>" + else: + cleaned[key] = _scrub_prose_heavy_fields(value) + return cleaned + if isinstance(node, list): + return [_scrub_prose_heavy_fields(item) for item in node] + return node + def _normalize(payload: dict) -> dict: payload = json.loads(json.dumps(payload)) + payload = _scrub_prose_heavy_fields(payload) metadata = payload["metadata"] metadata["generated_at"] = "" metadata.setdefault("auth_mode", None) diff --git a/tests/test_terminal_ux.py b/tests/test_terminal_ux.py index bd1a222..bb4ee00 100644 --- a/tests/test_terminal_ux.py +++ b/tests/test_terminal_ux.py @@ -808,7 +808,8 @@ def test_deployment_chains_table_mode_surfaces_source_oriented_columns(tmp_path: assert "aa-hybrid-prod" in result.stdout assert "aa-lab-quiet" in result.stdout assert "pivot-now" in normalized_output - assert "upstream producer control" in normalized_output + assert "Artifact trust is" in normalized_output + assert "pipeline artifact" in normalized_output assert "currently actionable" in normalized_output assert "conditionally" in normalized_output assert "grounded, insertion" in normalized_output @@ -832,8 +833,8 @@ def test_deployment_chains_table_mode_renders_why_care_as_detail_rows(tmp_path: assert main_header_lines assert all("why care" not in line for line in main_header_lines) assert detail_header_lines - assert "Current credentials can already poison that trusted input" in result.stdout - assert "Current evidence only shows that the trusted input exists" in result.stdout + assert "Current credentials can already poison that source" in result.stdout + assert "If that trusted input becomes attacker-controlled" in result.stdout assert len(detail_header_lines) == 6