From bfc3c8c3e9886e2a1c1035145a1807d237049dd1 Mon Sep 17 00:00:00 2001 From: Colby Farley Date: Thu, 9 Apr 2026 23:11:56 -0500 Subject: [PATCH 1/3] feat: harden deployment-path actionability --- src/azurefox/chains/registry.py | 40 ++ src/azurefox/chains/runner.py | 740 ++++++++++++++++++-- src/azurefox/collectors/provider.py | 52 ++ src/azurefox/models/chains.py | 1 + src/azurefox/models/common.py | 3 + src/azurefox/render/table.py | 24 +- tests/fixtures/lab_tenant/automation.json | 15 + tests/fixtures/lab_tenant/permissions.json | 18 + tests/fixtures/lab_tenant/role_trusts.json | 16 + tests/golden/automation.json | 15 + tests/golden/permissions.json | 21 + tests/golden/role-trusts.json | 24 + tests/test_chain_scaffold.py | 2 + tests/test_cli_smoke.py | 48 +- tests/test_collectors.py | 40 +- tests/test_deployment_path_admissibility.py | 52 ++ tests/test_terminal_ux.py | 21 +- 17 files changed, 1050 insertions(+), 82 deletions(-) diff --git a/src/azurefox/chains/registry.py b/src/azurefox/chains/registry.py index febf26c..4c43369 100644 --- a/src/azurefox/chains/registry.py +++ b/src/azurefox/chains/registry.py @@ -186,6 +186,46 @@ class ChainFamilySpec: "reintroduce change." ), ), + ChainSourceSpec( + command="permissions", + minimum_fields=( + "principal_id", + "principal_type", + "high_impact_roles", + "scope_count", + "privileged", + ), + rationale=( + "Provides direct RBAC proof for automation identities and service-connection-" + "backed principals." + ), + ), + ChainSourceSpec( + command="rbac", + minimum_fields=( + "scope_id", + "principal_id", + "role_name", + ), + rationale=( + "Provides exact role-to-scope evidence for the current identity when the " + "family needs to prove start or edit control on an automation path." + ), + ), + ChainSourceSpec( + command="role-trusts", + minimum_fields=( + "source_object_id", + "target_object_id", + "trust_type", + "confidence", + "summary", + ), + rationale=( + "Provides trust-expansion context when the deployment identity can also " + "control other app or service-principal boundaries." + ), + ), ChainSourceSpec( command="arm-deployments", minimum_fields=( diff --git a/src/azurefox/chains/runner.py b/src/azurefox/chains/runner.py index 6a792b8..aba4797 100644 --- a/src/azurefox/chains/runner.py +++ b/src/azurefox/chains/runner.py @@ -68,6 +68,21 @@ "collection_key": "deployments", }, } +_AUTOMATION_EDIT_ROLE_NAMES = { + "owner", + "contributor", + "automation contributor", +} +_AUTOMATION_START_ROLE_NAMES = { + "owner", + "contributor", + "automation contributor", + "automation operator", +} +_AUTOMATION_EDIT_ROLE_DEFINITION_IDS = { + "8e3af657-a8ff-443c-a75c-2fe8c4bcb635", # Owner + "b24988ac-6180-42a0-ab88-20f7382dd24c", # Contributor +} def implemented_chain_families() -> tuple[str, ...]: @@ -168,6 +183,9 @@ def _build_deployment_path_output( devops_output = loaded["devops"] automation_output = loaded["automation"] + permissions_output = loaded["permissions"] + rbac_output = loaded["rbac"] + role_trusts_output = loaded["role-trusts"] arm_output = loaded["arm-deployments"] app_services_output = loaded["app-services"] functions_output = loaded["functions"] @@ -206,6 +224,27 @@ def _build_deployment_path_output( arm_correlations = _arm_correlations_by_target_family( [item.model_dump(mode="json") for item in arm_output.deployments] ) + permissions_by_principal = { + item.principal_id: item.model_dump(mode="json") + for item in permissions_output.permissions + if item.principal_id + } + current_identity_principal_ids = { + str(item.principal_id) + for item in permissions_output.permissions + if item.is_current_identity and item.principal_id + } + current_identity_role_assignments = [ + item.model_dump(mode="json") + for item in rbac_output.role_assignments + if str(item.principal_id or "") in current_identity_principal_ids + ] + trusts_by_source_id: dict[str, list[dict]] = defaultdict(list) + for trust in role_trusts_output.trusts: + trust_row = trust.model_dump(mode="json") + source_object_id = str(trust_row.get("source_object_id") or "") + if source_object_id: + trusts_by_source_id[source_object_id].append(trust_row) paths: list[ChainPathRecord] = [] for pipeline in devops_output.pipelines: @@ -240,6 +279,18 @@ def _build_deployment_path_output( for account in automation_output.automation_accounts: account_dict = account.model_dump(mode="json") + account_dict["current_operator_access"] = _automation_current_operator_access( + account_dict, + current_identity_role_assignments, + ) + account_dict["joined_permission"] = _automation_joined_permission( + account_dict, + permissions_by_principal, + ) + account_dict["joined_role_trusts"] = _automation_joined_role_trusts( + account_dict, + trusts_by_source_id, + ) assessment = assess_deployment_source(account) if assessment.posture == "insufficient evidence": continue @@ -278,6 +329,9 @@ def _build_deployment_path_output( for source_name in ( "devops", "automation", + "permissions", + "rbac", + "role-trusts", "arm-deployments", "app-services", "functions", @@ -452,6 +506,12 @@ def _build_deployment_source_record( current_operator_can_inject=_source_current_operator_can_inject(source_command, source), ) ) + semantic_priority = _deployment_priority_override( + source_command=source_command, + source=source, + path_concept=assessment.path_concept, + semantic_priority=semantic.priority, + ) return ChainPathRecord( chain_id=_source_chain_id( @@ -467,13 +527,25 @@ def _build_deployment_source_record( source_context=source_context, clue_type=assessment.path_concept or source_command, confirmation_basis=record_confirmation_basis, - priority=semantic.priority, + priority=semantic_priority, urgency=semantic.urgency, + actionability_state=_deployment_actionability_state( + source_command=source_command, + source=source, + path_concept=assessment.path_concept, + target_resolution=admission.state, + missing_target_mapping=assessment.missing_target_mapping, + ), visible_path=_deployment_visible_path( source_command, assessment.path_concept, target_spec["label"], ), + insertion_point=_deployment_insertion_point( + source_command=source_command, + source=source, + path_concept=assessment.path_concept, + ), path_concept=assessment.path_concept, primary_injection_surface=( str(source.get("primary_injection_surface")) @@ -497,6 +569,8 @@ def _build_deployment_source_record( missing_target_mapping=assessment.missing_target_mapping, ), confidence_boundary=_deployment_confidence_boundary( + source_command=source_command, + source=source, target_label=target_spec["label"], target_resolution=admission.state, confirmation_basis=record_confirmation_basis, @@ -540,6 +614,7 @@ def _build_deployment_source_record( supporting_deployments=supporting_deployments, ), missing_confirmation=_deployment_missing_confirmation( + source=source, source_command=source_command, path_concept=assessment.path_concept, target_label=target_spec["label"], @@ -575,6 +650,411 @@ def _deployment_visible_path( return f"Deployment source -> likely {target_label}" +def _deployment_actionability_state( + *, + source_command: str, + source: dict, + path_concept: str | None, + target_resolution: str, + missing_target_mapping: bool, +) -> str: + if path_concept == "secret-escalation-support": + return "support-only" + if _source_current_operator_can_inject(source_command, source): + return "currently actionable" + if _source_current_operator_can_drive(source_command, source): + return "conditionally actionable" + if target_resolution == "visibility blocked" and not missing_target_mapping: + return "visibility-bounded" + return "consequence-grounded but insertion point unproven" + + +def _deployment_insertion_point( + *, + source_command: str, + source: dict, + path_concept: str | None, +) -> str: + if source_command == "devops": + return _deployment_devops_insertion_point(source) + if source_command == "automation": + return _deployment_automation_insertion_point(source, path_concept=path_concept) + return "Visible deployment path, but the insertion point is not yet described." + + +def _deployment_devops_insertion_point(source: dict) -> str: + primary_input = _devops_primary_trusted_input(source) + trusted_input_text = _devops_trusted_input_text(primary_input) + injection_surfaces = [ + str(value) for value in (source.get("current_operator_injection_surface_types") or []) if value + ] + if any(value != "definition-edit" for value in injection_surfaces): + surface_list = ", ".join(value for value in injection_surfaces if value != "definition-edit") + return f"Poison {trusted_input_text} through {surface_list}." + if "definition-edit" in injection_surfaces or source.get("current_operator_can_edit"): + return "Edit the pipeline definition directly." + if source.get("current_operator_can_queue"): + if primary_input: + access_state = str(primary_input.get("current_operator_access_state") or "") + if access_state == "read": + return f"Queue this pipeline now; {trusted_input_text} is only readable." + if access_state == "exists-only": + return ( + f"Queue this pipeline now; {trusted_input_text} is visible, but source control " + "is still unproven." + ) + return "Queue this pipeline now, but source poisoning is still unproven." + if primary_input: + input_type = str(primary_input.get("input_type") or "") + if input_type == "pipeline-artifact": + return ( + f"Artifact trust is visible at {trusted_input_text}, but upstream producer control " + "is unproven." + ) + access_state = str(primary_input.get("current_operator_access_state") or "") + if access_state == "read": + return f"{trusted_input_text} is visible and readable, but not writable from current evidence." + if access_state == "exists-only": + return f"{trusted_input_text} is visible, but current control is unproven." + return f"Source depends on {trusted_input_text}." + return "Azure-facing pipeline is visible, but the source-side insertion point is still unproven." + + +def _deployment_automation_insertion_point(source: dict, *, path_concept: str | None) -> str: + primary_mode = str(source.get("primary_start_mode") or "") or None + primary_runbook = str(source.get("primary_runbook_name") or "") or None + identity_type = str(source.get("identity_type") or "").strip() or None + primary_clause = _automation_primary_run_path_clause( + primary_mode=primary_mode, + primary_runbook=primary_runbook, + identity_type=identity_type, + ) + webhook_runbooks = [str(value) for value in (source.get("webhook_runbook_names") or []) if value] + schedule_runbooks = [str(value) for value in (source.get("schedule_runbook_names") or []) if value] + hybrid_count = int(source.get("hybrid_worker_group_count") or 0) + + surfaces: list[str] = [] + if primary_clause: + surfaces.append(primary_clause) + elif webhook_runbooks: + surfaces.append("webhook-triggerable runbooks " + ", ".join(webhook_runbooks[:2])) + elif schedule_runbooks: + surfaces.append("schedule-backed runbooks " + ", ".join(schedule_runbooks[:2])) + if hybrid_count > 0: + surfaces.append(f"{hybrid_count} Hybrid Worker reach point(s)") + operator_clause = _automation_current_operator_control_clause(source) + if operator_clause: + surfaces.append(operator_clause) + + if surfaces and path_concept == "secret-escalation-support": + return "; ".join(surfaces) + "." + if surfaces: + if operator_clause: + return "; ".join(surfaces) + "." + return "; ".join(surfaces) + ", but current operator control is still unproven." + if path_concept == "secret-escalation-support": + return "Reusable automation support is visible, but no operator-controlled run path is proven." + return "Automation consequences are grounded, but the operator-controlled start or edit path is still unproven." + + +def _automation_joined_permission( + source: dict, + permissions_by_principal: dict[str, dict], +) -> dict | None: + identity_refs = _automation_identity_refs(source) + for ref in identity_refs: + if ref in permissions_by_principal: + return dict(permissions_by_principal[ref]) + return None + + +def _automation_joined_role_trusts( + source: dict, + trusts_by_source_id: dict[str, list[dict]], +) -> list[dict]: + identity_refs = _automation_identity_refs(source) + seen: set[tuple[str, str, str]] = set() + joined: list[dict] = [] + for ref in identity_refs: + for trust in trusts_by_source_id.get(ref, []): + key = ( + str(trust.get("source_object_id") or ""), + str(trust.get("trust_type") or ""), + str(trust.get("target_object_id") or ""), + ) + if key in seen: + continue + seen.add(key) + joined.append(dict(trust)) + return joined + + +def _automation_identity_refs(source: dict) -> list[str]: + refs: list[str] = [] + for value in ( + source.get("principal_id"), + *list(source.get("identity_join_ids") or []), + *list(source.get("identity_ids") or []), + ): + text = str(value or "").strip() + if text and text not in refs: + refs.append(text) + return refs + + +def _automation_primary_run_path_clause( + *, + primary_mode: str | None, + primary_runbook: str | None, + identity_type: str | None, +) -> str | None: + identity_clause = f" under automation identity {identity_type}" if identity_type else "" + if primary_mode == "webhook" and primary_runbook: + return f"webhook path can start runbook {primary_runbook}{identity_clause}" + if primary_mode == "schedule" and primary_runbook: + return f"schedule path can start runbook {primary_runbook}{identity_clause}" + if primary_mode == "manual-only" and primary_runbook: + return f"published runbook {primary_runbook}{identity_clause} is visible" + if primary_mode == "published-runbook" and primary_runbook: + return f"published runbook {primary_runbook}{identity_clause} is visible" + if primary_mode == "hybrid-worker": + return "hybrid-worker-backed execution is visible" + return None + + +def _automation_primary_run_path_evidence_sentence(source: dict) -> str | None: + primary_mode = str(source.get("primary_start_mode") or "") or None + primary_runbook = str(source.get("primary_runbook_name") or "") or None + identity_type = str(source.get("identity_type") or "").strip() or None + identity_clause = f" under automation identity {identity_type}" if identity_type else "" + if primary_mode == "webhook" and primary_runbook: + return f"AzureFox can identify a webhook path into runbook {primary_runbook}{identity_clause}." + if primary_mode == "schedule" and primary_runbook: + return f"AzureFox can identify a schedule-backed path into runbook {primary_runbook}{identity_clause}." + if primary_mode in {"manual-only", "published-runbook"} and primary_runbook: + return f"AzureFox can identify published runbook {primary_runbook}{identity_clause}." + if primary_mode == "hybrid-worker": + return "AzureFox can identify hybrid-worker-backed execution." + return None + + +def _automation_current_operator_access( + source: dict, + role_assignments: list[dict], +) -> dict | None: + resource_id = str(source.get("id") or "").strip() + if not resource_id: + return None + + best_access: dict | None = None + best_sort_key: tuple[int, int] | None = None + for assignment in role_assignments: + scope_id = str(assignment.get("scope_id") or "").strip() + role_name = str(assignment.get("role_name") or "").strip() + if not scope_id or not _scope_applies_to_resource(scope_id, resource_id): + continue + + capability = _automation_role_capability( + role_name=role_name, + role_definition_id=str(assignment.get("role_definition_id") or "").strip() or None, + ) + if capability is None: + continue + + sort_key = (1 if capability == "edit" else 0, len(scope_id)) + if best_sort_key is not None and sort_key <= best_sort_key: + continue + best_sort_key = sort_key + best_access = { + "capability": capability, + "role_name": role_name, + "scope_id": scope_id, + } + return best_access + + +def _automation_current_operator_control_clause(source: dict) -> str | None: + access = source.get("current_operator_access") + if not isinstance(access, dict): + return None + + capability = str(access.get("capability") or "").strip() + role_name = str(access.get("role_name") or "").strip() + scope_id = str(access.get("scope_id") or "").strip() + if not capability or not role_name or not scope_id: + return None + + scope_text = _automation_scope_label(scope_id, resource_id=str(source.get("id") or "")) + primary_mode = str(source.get("primary_start_mode") or "").strip() or None + primary_runbook = str(source.get("primary_runbook_name") or "").strip() or None + + if capability == "edit": + if primary_runbook and primary_mode == "webhook": + return ( + f"current role assignment {role_name} at {scope_text} can edit runbook " + f"{primary_runbook} or its webhook-backed execution boundary; AzureFox does not " + "prove possession of the current webhook URI" + ) + if primary_runbook and primary_mode == "schedule": + return ( + f"current role assignment {role_name} at {scope_text} can edit runbook " + f"{primary_runbook} or its schedule-backed execution boundary" + ) + if primary_runbook: + return ( + f"current role assignment {role_name} at {scope_text} can edit published " + f"runbook {primary_runbook}" + ) + return f"current role assignment {role_name} at {scope_text} can edit this automation execution boundary" + + if primary_runbook: + return ( + f"current role assignment {role_name} at {scope_text} can start runbook " + f"{primary_runbook}, but edit control is still unproven" + ) + return ( + f"current role assignment {role_name} at {scope_text} can start visible runbook jobs, " + "but edit control is still unproven" + ) + + +def _automation_scope_label(scope_id: str, *, resource_id: str) -> str: + normalized_scope = _normalized_arm_segments(scope_id) + normalized_resource = _normalized_arm_segments(resource_id) + if normalized_scope == normalized_resource: + return "this automation account" + if _arm_scope_kind(scope_id) == "resource_group": + return f"resource group {_arm_scope_name(scope_id)}" + if _arm_scope_kind(scope_id) == "subscription": + return "subscription scope" + if _arm_scope_kind(scope_id) == "resource": + resource_name = _arm_scope_name(scope_id) + if resource_name: + return f"resource scope {resource_name}" + return "a parent scope of this automation account" + + +def _scope_applies_to_resource(scope_id: str | None, resource_id: str | None) -> bool: + if not scope_id or not resource_id: + return False + scope_segments = _normalized_arm_segments(scope_id) + resource_segments = _normalized_arm_segments(resource_id) + if not scope_segments or len(scope_segments) > len(resource_segments): + return False + return resource_segments[: len(scope_segments)] == scope_segments + + +def _normalized_arm_segments(value: str | None) -> tuple[str, ...]: + if not value: + return () + return tuple(part.strip().lower() for part in str(value).split("/") if part.strip()) + + +def _arm_scope_kind(scope_id: str | None) -> str | None: + segments = _normalized_arm_segments(scope_id) + if len(segments) == 2 and segments[0] == "subscriptions": + return "subscription" + if len(segments) == 4 and segments[0] == "subscriptions" and segments[2] == "resourcegroups": + return "resource_group" + if len(segments) >= 8 and segments[0] == "subscriptions" and segments[2] == "resourcegroups": + return "resource" + return None + + +def _arm_scope_name(scope_id: str | None) -> str | None: + parts = [part for part in str(scope_id or "").split("/") if part] + if not parts: + return None + kind = _arm_scope_kind(scope_id) + if kind == "resource_group" and len(parts) >= 4: + return parts[3] + if kind == "resource": + return parts[-1] + return None + + +def _normalize_role_name(value: str | None) -> str: + return " ".join(str(value or "").split()).strip().lower() + + +def _role_definition_key(role_definition_id: str | None) -> str | None: + text = str(role_definition_id or "").strip() + if not text: + return None + return text.rstrip("/").split("/")[-1].lower() + + +def _automation_role_capability( + *, + role_name: str | None, + role_definition_id: str | None, +) -> str | None: + role_key = _role_definition_key(role_definition_id) + if role_key in _AUTOMATION_EDIT_ROLE_DEFINITION_IDS: + return "edit" + + normalized_role = _normalize_role_name(role_name) + if normalized_role in _AUTOMATION_EDIT_ROLE_NAMES: + return "edit" + if normalized_role in _AUTOMATION_START_ROLE_NAMES: + return "start" + return None + + +def _deployment_priority_override( + *, + source_command: str, + source: dict, + path_concept: str | None, + semantic_priority: str, +) -> str: + if path_concept == "secret-escalation-support": + return "low" + if source_command == "automation" and _source_current_operator_can_inject(source_command, source): + return "high" + return semantic_priority + + +def _automation_permission_clause(source: dict) -> str | None: + permission = source.get("joined_permission") + if not isinstance(permission, dict): + return None + if not bool(permission.get("privileged")): + return None + roles = [str(role) for role in permission.get("high_impact_roles") or [] if role] + role_text = ", ".join(roles) or "high-impact RBAC" + scope_count = int(permission.get("scope_count") or 0) + scope_text = "subscription-wide scope" if scope_count <= 1 else f"{scope_count} visible scopes" + principal_name = str( + permission.get("display_name") + or source.get("name") + or permission.get("principal_id") + or "automation identity" + ) + return f"automation identity '{principal_name}' already has {role_text} across {scope_text}" + + +def _automation_role_trust_clause(source: dict) -> str | None: + trusts = source.get("joined_role_trusts") + if not isinstance(trusts, list) or not trusts: + return None + trust = trusts[0] if isinstance(trusts[0], dict) else None + if not trust: + return None + trust_type = str(trust.get("trust_type") or "") + target_name = str(trust.get("target_name") or trust.get("target_object_id") or "unknown target") + if trust_type == "service-principal-owner": + return f"role-trusts also show owner-level control over service principal '{target_name}'" + if trust_type == "app-owner": + return f"role-trusts also show owner control over application '{target_name}'" + if trust_type == "federated-credential": + return f"role-trusts also show federated trust into service principal '{target_name}'" + if trust_type == "app-to-service-principal": + return f"role-trusts also show application-permission reach into service principal '{target_name}'" + summary = str(trust.get("summary") or "").strip() + return summary or None + + def _deployment_why_care( source_command: str, source: dict, @@ -611,10 +1091,6 @@ def _deployment_why_care( current_operator_suffix = _deployment_current_operator_suffix(source_command, source) if current_operator_suffix: sentence = f"{sentence} {current_operator_suffix}" - if source.get("missing_target_mapping"): - sentence = ( - f"{sentence} AzureFox has not yet mapped the downstream Azure footprint cleanly." - ) return sentence if source_command == "devops": @@ -649,16 +1125,26 @@ def _deployment_why_care( return sentence if source_command == "automation": + primary_mode = str(source.get("primary_start_mode") or "") or None + primary_runbook = str(source.get("primary_runbook_name") or "") or None + identity_type = str(source.get("identity_type") or "").strip() or None + primary_clause = _automation_primary_run_path_clause( + primary_mode=primary_mode, + primary_runbook=primary_runbook, + identity_type=identity_type, + ) surface_parts: list[str] = [] - if source.get("identity_type"): + if primary_clause: + surface_parts.append(primary_clause) + elif source.get("identity_type"): surface_parts.append("managed identity") published = int(source.get("published_runbook_count") or 0) if published > 0: surface_parts.append(f"{published} published runbook(s)") - if "webhook-start" in assessment.change_signals: + if "webhook-start" in assessment.change_signals and primary_mode != "webhook": webhooks = int(source.get("webhook_count") or 0) surface_parts.append(f"{webhooks} webhook start path(s)") - if "scheduled-start" in assessment.change_signals: + if "scheduled-start" in assessment.change_signals and primary_mode != "schedule": schedules = int(source.get("schedule_count") or 0) surface_parts.append(f"{schedules} schedule-backed run path(s)") if "hybrid-worker-reach" in assessment.change_signals: @@ -675,10 +1161,15 @@ def _deployment_why_care( f"{sentence} Secure assets around the account could widen blast radius once a " "run path is started or modified." ) - if source.get("missing_target_mapping"): - sentence = ( - f"{sentence} AzureFox has not yet mapped the downstream Azure footprint cleanly." - ) + current_operator_suffix = _deployment_current_operator_suffix(source_command, source) + if current_operator_suffix: + sentence = f"{sentence} {current_operator_suffix}" + permission_clause = _automation_permission_clause(source) + if permission_clause: + sentence = f"{sentence} The {permission_clause}." + trust_clause = _automation_role_trust_clause(source) + if trust_clause: + sentence = f"{sentence} {trust_clause.capitalize()}." return sentence return "Visible source evidence suggests Azure change capability" @@ -787,6 +1278,10 @@ def _deployment_current_operator_suffix(source_command: str, source: dict) -> st return "Current evidence only shows that the trusted input exists." if source.get("missing_injection_point"): return "AzureFox has not yet proven a poisonable trusted input for current credentials." + if source_command == "automation": + clause = _automation_current_operator_control_clause(source) + if clause: + return clause[:1].upper() + clause[1:] + "." return "" @@ -796,6 +1291,10 @@ def _source_current_operator_can_drive(source_command: str, source: dict) -> boo edit = source.get("current_operator_can_edit") if isinstance(queue, bool) or isinstance(edit, bool): return bool(queue or edit) + if source_command == "automation": + access = source.get("current_operator_access") + if isinstance(access, dict): + return str(access.get("capability") or "") in {"start", "edit"} return None @@ -806,6 +1305,10 @@ def _source_current_operator_can_inject(source_command: str, source: dict) -> bo edit = source.get("current_operator_can_edit") if injection_surfaces or isinstance(queue, bool) or isinstance(edit, bool): return bool(injection_surfaces) + if source_command == "automation": + access = source.get("current_operator_access") + if isinstance(access, dict): + return str(access.get("capability") or "") == "edit" return None @@ -877,6 +1380,8 @@ def _deployment_likely_impact( def _deployment_confidence_boundary( *, + source_command: str | None = None, + source: dict | None = None, target_label: str, target_resolution: str, confirmation_basis: str | None, @@ -884,86 +1389,116 @@ def _deployment_confidence_boundary( current_operator_can_inject: bool | None, missing_target_mapping: bool, ) -> str: + automation_sentences: list[str] = [] + if source_command == "automation" and source is not None: + primary_sentence = _automation_primary_run_path_evidence_sentence(source) + if primary_sentence: + automation_sentences.append(primary_sentence) + current_operator_sentence = _deployment_current_operator_suffix(source_command, source) + if current_operator_sentence: + automation_sentences.append(current_operator_sentence) + permission_clause = _automation_permission_clause(source) + if permission_clause: + automation_sentences.append(f"The {permission_clause}.") + trust_clause = _automation_role_trust_clause(source) + if trust_clause: + automation_sentences.append(f"{trust_clause.capitalize()}.") + if missing_target_mapping: + if source_command == "automation": + detail = " ".join(automation_sentences).strip() + if detail: + return ( + f"{detail} AzureFox has not yet mapped the real Azure footprint beyond " + f"{target_label} evidence." + ) + return ( + "AzureFox can ground downstream consequence here, but it has not yet mapped the " + f"real Azure footprint beyond {target_label} evidence." + ) if current_operator_can_inject: return ( - "Current credentials can control the source side, but AzureFox has not yet " - f"mapped the downstream Azure footprint beyond {target_label} consequence " - "grounding." + "You can control the source side now, but AzureFox has not yet mapped the " + f"downstream Azure footprint beyond {target_label} consequence evidence." ) if current_operator_can_drive: return ( - "Current credentials can start or edit the source path, but AzureFox has not yet " - f"mapped the downstream Azure footprint beyond {target_label} consequence " - "grounding." + "You can start or edit this path now, but AzureFox has not yet mapped the " + f"downstream Azure footprint beyond {target_label} consequence evidence." ) return ( - "Current evidence shows meaningful deployment support, but AzureFox has not yet " - f"mapped the downstream Azure footprint beyond {target_label} consequence grounding." + "AzureFox can ground downstream consequence here, but it has not yet mapped the " + f"real Azure footprint beyond {target_label} evidence." ) if current_operator_can_inject: if target_resolution == "named match": return ( - f"Impact is visible, the current credentials can poison a trusted input, and " - f"the {target_label} target is joined strongly enough to " - "validate next." + f"You can poison the source now, and AzureFox has already joined the exact " + f"{target_label} target strongly enough to validate next." ) if target_resolution == "narrowed candidates": return ( - f"Impact is visible and the current credentials can poison a trusted input, but " - f"the exact {target_label} target is still unconfirmed." + f"You can poison the source now, but AzureFox still cannot name the exact " + f"{target_label} target." ) if target_resolution == "visibility blocked": return ( - f"Current credentials can poison a trusted input, but current scope cannot name " - f"the downstream {target_label} target yet." + f"You can poison the source now, but current scope still hides the downstream " + f"{target_label} target." ) if current_operator_can_drive: if target_resolution == "named match": return ( - "Impact is visible and current credentials can start or edit this path, but " - "AzureFox has not yet proven a poisonable trusted input." + "You can start or edit this path now, but AzureFox has not yet proven a " + "writable source." ) if target_resolution == "narrowed candidates": return ( - f"Impact is visible and current credentials can start this path, but AzureFox " - f"has not yet proven a poisonable trusted input or the exact {target_label} " - "target." + f"You can start this path now, but AzureFox has not yet proven a writable " + f"source or the exact {target_label} target." ) if target_resolution == "visibility blocked": return ( - f"Current credentials can start this path, but AzureFox has not yet proven a " - f"poisonable trusted input and current scope cannot name the downstream " - f"{target_label} target." + f"You can start this path now, but AzureFox has not yet proven a writable " + f"source and current scope cannot name the downstream {target_label} target." ) if target_resolution == "named match": if confirmation_basis == "parsed-config-target": return ( - f"Impact is visible and the {target_label} target is joined from parsed source " - "clues, but AzureFox has not yet proven that the current credentials can invoke " - "this path." + f"AzureFox can name the exact {target_label} target from parsed source clues, " + "but current-credential invocation is still unproven." ) return ( - f"Impact is visible and the {target_label} target is backed by a stronger visible " - "join, but AzureFox has not yet proven that the current credentials can invoke this " - "path." + f"AzureFox can name the exact {target_label} target, but current-credential " + "invocation is still unproven." ) if target_resolution == "narrowed candidates": return ( - f"Impact is visible, but AzureFox has not yet proven current-credential invocation " - f"or the exact {target_label} target." + f"AzureFox narrowed the likely {target_label} targets, but current-credential " + "invocation is still unproven." ) if target_resolution == "visibility blocked": + if source_command == "automation": + detail = " ".join(automation_sentences).strip() + if detail: + return ( + f"{detail} Current scope still hides the downstream {target_label} target, " + "so AzureFox cannot complete the target-side actionability judgment yet." + ) + return ( + f"Current scope still hides the downstream {target_label} target, so " + "AzureFox cannot complete the actionability judgment yet." + ) return ( - f"Impact is partially visible, but AzureFox has not yet proven current-credential " - f"invocation and current scope cannot name the downstream {target_label} target." + f"Current scope still hides the downstream {target_label} target, so AzureFox " + "cannot complete the actionability judgment yet." ) return ( - f"Current evidence does not yet hold a defensible {target_label} target story or prove " - "that the current credentials can drive the source path." + f"AzureFox still cannot show either a defensible {target_label} target story or a " + "current operator path into this source." ) @@ -975,6 +1510,12 @@ def _deployment_evidence_commands( supporting_deployments: list[dict], ) -> list[str]: commands = [source_command, "permissions"] + if source_command == "automation" and source.get("current_operator_access"): + commands.append("rbac") + if source_command == "automation" and ( + source.get("principal_id") or source.get("client_id") or source.get("identity_ids") + ): + commands.append("role-trusts") if source.get("azure_service_connection_client_ids") or source.get( "azure_service_connection_principal_ids" ): @@ -984,7 +1525,7 @@ def _deployment_evidence_commands( commands.append(_DEPLOYMENT_TARGET_SPECS[target_family]["command"]) if supporting_deployments and "arm-deployments" not in commands: commands.append("arm-deployments") - return commands + return list(dict.fromkeys(commands)) def _deployment_next_review( @@ -995,6 +1536,54 @@ def _deployment_next_review( target_family: str, target_resolution: str, ) -> str: + if source_command == "automation": + primary_mode = str(source.get("primary_start_mode") or "") or None + primary_runbook = str(source.get("primary_runbook_name") or "") or None + permission_clause = _automation_permission_clause(source) + trust_clause = _automation_role_trust_clause(source) + current_operator_can_edit = bool(_source_current_operator_can_inject(source_command, source)) + current_operator_can_start = bool(_source_current_operator_can_drive(source_command, source)) + if path_concept == "secret-escalation-support": + steps = ["Confirm what separate foothold could reuse this secret-backed support"] + elif current_operator_can_edit: + steps = ["Current RBAC evidence already shows edit-capable automation control here"] + elif current_operator_can_start: + steps = ["Current RBAC evidence already shows runbook-start control here"] + elif permission_clause: + steps = ["Validate what Azure scope the automation identity can already change"] + else: + steps = ["Check permissions for the automation identity behind this execution path"] + if current_operator_can_edit and primary_runbook and primary_mode == "webhook": + steps.append( + f"map what runbook {primary_runbook} changes because current control does not depend on the webhook URI" + ) + elif current_operator_can_edit and primary_runbook: + steps.append(f"map what runbook {primary_runbook} changes on the Azure side") + elif current_operator_can_start and primary_runbook: + steps.append(f"confirm whether runbook {primary_runbook} also has an editable trigger or definition path") + elif primary_runbook and primary_mode == "webhook": + steps.append(f"confirm whether current credentials can trigger webhook runbook {primary_runbook}") + elif primary_runbook and primary_mode == "schedule": + steps.append(f"confirm whether current credentials can influence scheduled runbook {primary_runbook}") + elif primary_runbook: + steps.append(f"confirm how runbook {primary_runbook} is started from current credentials") + else: + steps.append("confirm which runbook and trigger path performs the Azure change") + if trust_clause: + steps.append("review role-trusts around the automation identity's downstream control") + elif source.get("principal_id") or source.get("client_id") or source.get("identity_ids"): + steps.append("review role-trusts for controllable automation identity links") + target_command = _DEPLOYMENT_TARGET_SPECS[target_family]["command"] + if source.get("missing_target_mapping"): + steps.append( + f"use {target_command} as consequence grounding because runbook target mapping is still missing" + ) + elif target_resolution == "visibility blocked": + steps.append(f"restore {target_command} visibility for consequence grounding") + else: + steps.append(f"open {target_command} to validate the likely Azure impact") + return "; ".join(steps) + "." + if path_concept == "secret-escalation-support": steps: list[str] = ["Confirm what separate foothold could reuse this secret-backed support"] elif _source_current_operator_can_inject(source_command, source): @@ -1082,6 +1671,8 @@ def _deployment_summary( f"{target_label} candidate(s): {', '.join(target_names[:_CANDIDATE_LIMIT])}." ) confidence_boundary = _deployment_confidence_boundary( + source_command=source_command, + source=source, target_label=target_label, target_resolution=target_resolution, confirmation_basis=confirmation_basis, @@ -1164,6 +1755,7 @@ def _normalize_target_name(value: str) -> str: def _deployment_missing_confirmation( *, + source: dict, source_command: str, path_concept: str | None, target_label: str, @@ -1194,6 +1786,32 @@ def _deployment_missing_confirmation( "and current evidence still does not prove a poisonable trusted input or a " "definition-edit path for current credentials." ) + if _source_current_operator_can_inject(source_command, source): + return ( + f"Missing target-side visibility for the downstream {target_label} footprint; " + "current RBAC evidence already shows edit-capable automation control, but AzureFox " + "still cannot name which Azure target that control reaches." + ) + if _source_current_operator_can_drive(source_command, source): + return ( + f"Missing target-side visibility for the downstream {target_label} footprint; " + "current RBAC evidence already shows runbook-start control, but AzureFox still " + "cannot name which Azure target that execution reaches." + ) + primary_mode = str(source.get("primary_start_mode") or "") or None + primary_runbook = str(source.get("primary_runbook_name") or "") or None + permission_clause = _automation_permission_clause(source) + if primary_runbook and primary_mode: + return ( + f"Missing target-side visibility for the downstream {target_label} footprint, and " + f"current evidence does not show that the current credentials can control the " + f"{primary_mode} path into runbook {primary_runbook}" + + ( + f" even though the {permission_clause}." + if permission_clause + else "." + ) + ) return ( f"Missing target-side visibility for the downstream {target_label} footprint, and " "current evidence does not show that the current credentials can start the runbook " @@ -1216,6 +1834,32 @@ def _deployment_missing_confirmation( "evidence does not confirm a poisonable trusted input or a current-credential " "definition-edit path." ) + if _source_current_operator_can_inject(source_command, source): + return ( + f"Missing exact {target_label} mapping; current RBAC evidence already shows " + "edit-capable automation control, but AzureFox has not yet mapped which Azure target " + "that control reaches." + ) + if _source_current_operator_can_drive(source_command, source): + return ( + f"Missing exact {target_label} mapping; current RBAC evidence already shows " + "runbook-start control, but AzureFox has not yet mapped which Azure target that " + "execution reaches." + ) + primary_mode = str(source.get("primary_start_mode") or "") or None + primary_runbook = str(source.get("primary_runbook_name") or "") or None + permission_clause = _automation_permission_clause(source) + if primary_runbook and primary_mode: + return ( + f"Missing exact {target_label} mapping and runbook-level execution proof; current " + f"evidence does not show that the current credentials can control the {primary_mode} " + f"path into runbook {primary_runbook}" + + ( + f" even though the {permission_clause}." + if permission_clause + else "." + ) + ) return ( f"Missing exact {target_label} mapping and runbook-level execution proof; current " "evidence does not show that the current credentials can start the published runbook path " diff --git a/src/azurefox/collectors/provider.py b/src/azurefox/collectors/provider.py index 004b047..2309c98 100644 --- a/src/azurefox/collectors/provider.py +++ b/src/azurefox/collectors/provider.py @@ -3818,8 +3818,16 @@ def _automation_account_summary( webhook_count=_len_or_none(webhooks), hybrid_worker_group_count=_len_or_none(hybrid_worker_groups), ) + published_runbook_names = _automation_published_runbook_names(runbooks) schedule_runbook_names = _automation_schedule_runbook_names(job_schedules) webhook_runbook_names = _automation_webhook_runbook_names(webhooks) + primary_start_mode, primary_runbook_name = _automation_primary_run_path( + start_modes=start_modes, + published_runbook_names=published_runbook_names, + schedule_runbook_names=schedule_runbook_names, + webhook_runbook_names=webhook_runbook_names, + hybrid_worker_group_count=_len_or_none(hybrid_worker_groups), + ) hybrid_worker_group_ids = _dedupe_strings( _string_value(getattr(group, "id", None)) for group in (hybrid_worker_groups or []) ) @@ -3904,6 +3912,7 @@ def _automation_account_summary( "identity_ids": identity_ids, "runbook_count": _len_or_none(runbooks), "published_runbook_count": published_runbook_count, + "published_runbook_names": published_runbook_names, "schedule_count": _len_or_none(schedules), "job_schedule_count": _len_or_none(job_schedules), "webhook_count": _len_or_none(webhooks), @@ -3914,6 +3923,8 @@ def _automation_account_summary( "variable_count": _len_or_none(variables), "encrypted_variable_count": encrypted_variable_count, "start_modes": start_modes, + "primary_start_mode": primary_start_mode, + "primary_runbook_name": primary_runbook_name, "schedule_runbook_names": schedule_runbook_names, "webhook_runbook_names": webhook_runbook_names, "hybrid_worker_group_ids": hybrid_worker_group_ids, @@ -4067,6 +4078,45 @@ def _automation_start_modes( return _dedupe_strings(modes) +def _automation_published_runbook_names(runbooks: list[object] | None) -> list[str]: + if runbooks is None: + return [] + published_names: list[str] = [] + for runbook in runbooks: + state = str( + getattr(getattr(runbook, "properties", None), "state", None) + or getattr(runbook, "state", None) + or "" + ).lower() + if state != "published": + continue + name = _automation_runbook_name(runbook) + if name: + published_names.append(name) + return _dedupe_strings(published_names) + + +def _automation_primary_run_path( + *, + start_modes: list[str], + published_runbook_names: list[str], + schedule_runbook_names: list[str], + webhook_runbook_names: list[str], + hybrid_worker_group_count: int | None, +) -> tuple[str | None, str | None]: + if webhook_runbook_names: + return "webhook", webhook_runbook_names[0] + if schedule_runbook_names: + return "schedule", schedule_runbook_names[0] + if published_runbook_names: + if "manual-only" in start_modes: + return "manual-only", published_runbook_names[0] + return "published-runbook", published_runbook_names[0] + if hybrid_worker_group_count and hybrid_worker_group_count > 0: + return "hybrid-worker", None + return None, None + + def _automation_schedule_runbook_names(job_schedules: list[object] | None) -> list[str]: if job_schedules is None: return [] @@ -4082,6 +4132,8 @@ def _automation_webhook_runbook_names(webhooks: list[object] | None) -> list[str def _automation_runbook_name(item: object) -> str | None: properties = getattr(item, "properties", None) for candidate in ( + getattr(item, "name", None), + getattr(properties, "name", None), getattr(item, "runbook_name", None), getattr(properties, "runbook_name", None), getattr(getattr(item, "runbook", None), "name", None), diff --git a/src/azurefox/models/chains.py b/src/azurefox/models/chains.py index 0784dfa..992b67b 100644 --- a/src/azurefox/models/chains.py +++ b/src/azurefox/models/chains.py @@ -53,6 +53,7 @@ class ChainPathRecord(BaseModel): confirmation_basis: str | None = None priority: str urgency: str | None = None + actionability_state: str | None = None visible_path: str insertion_point: str | None = None path_concept: str | None = None diff --git a/src/azurefox/models/common.py b/src/azurefox/models/common.py index 7dda0d3..5101abd 100644 --- a/src/azurefox/models/common.py +++ b/src/azurefox/models/common.py @@ -755,6 +755,7 @@ class AutomationAccountAsset(BaseModel): identity_ids: list[str] = Field(default_factory=list) runbook_count: int | None = None published_runbook_count: int | None = None + published_runbook_names: list[str] = Field(default_factory=list) schedule_count: int | None = None job_schedule_count: int | None = None webhook_count: int | None = None @@ -765,6 +766,8 @@ class AutomationAccountAsset(BaseModel): variable_count: int | None = None encrypted_variable_count: int | None = None start_modes: list[str] = Field(default_factory=list) + primary_start_mode: str | None = None + primary_runbook_name: str | None = None schedule_runbook_names: list[str] = Field(default_factory=list) webhook_runbook_names: list[str] = Field(default_factory=list) hybrid_worker_group_ids: list[str] = Field(default_factory=list) diff --git a/src/azurefox/render/table.py b/src/azurefox/render/table.py index 9064825..79b0f74 100644 --- a/src/azurefox/render/table.py +++ b/src/azurefox/render/table.py @@ -630,10 +630,10 @@ def _table_spec(command: str, payload: dict) -> tuple[list[tuple[str, str]], lis ("priority", "priority"), ("urgency", "urgency"), ("asset_name", "source"), - ("path_concept", "path type"), - ("why_care", "why care"), + ("actionability_state", "actionability"), + ("insertion_point", "insertion point"), ("likely_impact", "likely azure impact"), - ("confidence_boundary", "confidence boundary"), + ("confidence_boundary", "what's missing"), ("next_review", "next review"), ], [ @@ -641,12 +641,14 @@ def _table_spec(command: str, payload: dict) -> tuple[list[tuple[str, str]], lis "priority": item.get("priority"), "urgency": item.get("urgency") or "-", "asset_name": item.get("asset_name"), - "path_concept": _deployment_path_type(item), - "why_care": item.get("why_care") or item.get("asset_kind"), + "actionability_state": _deployment_actionability_state_label(item), + "insertion_point": item.get("insertion_point") + or _deployment_path_type(item), "likely_impact": item.get("likely_impact") or _chains_target_context(item), "confidence_boundary": item.get("confidence_boundary") or _chains_note(item, family=family), "next_review": item.get("next_review"), + "why_care": item.get("why_care") or item.get("asset_kind"), } for item in payload.get("paths", []) ], @@ -1945,6 +1947,18 @@ def _deployment_path_type(item: dict) -> str: return labels.get(concept, concept or "-") +def _deployment_actionability_state_label(item: dict) -> str: + state = str(item.get("actionability_state") or "") + labels = { + "currently actionable": "currently actionable", + "conditionally actionable": "conditionally actionable", + "consequence-grounded but insertion point unproven": "grounded, insertion unproven", + "visibility-bounded": "visibility-bounded", + "support-only": "support-only", + } + return labels.get(state, state or "-") + + def _escalation_path_type(item: dict) -> str: concept = str(item.get("path_concept") or "") labels = { diff --git a/tests/fixtures/lab_tenant/automation.json b/tests/fixtures/lab_tenant/automation.json index 3741ea9..3e25e02 100644 --- a/tests/fixtures/lab_tenant/automation.json +++ b/tests/fixtures/lab_tenant/automation.json @@ -15,6 +15,14 @@ ], "runbook_count": 7, "published_runbook_count": 6, + "published_runbook_names": [ + "Baseline-Config", + "Nightly-Reconcile", + "Redeploy-App", + "Reapply-Agent", + "Sync-Secrets", + "Rotate-Certs" + ], "schedule_count": 4, "job_schedule_count": 5, "webhook_count": 2, @@ -66,6 +74,8 @@ "webhook", "hybrid-worker" ], + "primary_start_mode": "webhook", + "primary_runbook_name": "Redeploy-App", "trigger_join_ids": [ "automation-job-schedule:baseline-nightly", "automation-webhook:redeploy-api", @@ -89,6 +99,9 @@ "identity_ids": [], "runbook_count": 2, "published_runbook_count": 1, + "published_runbook_names": [ + "Lab-Maintenance" + ], "schedule_count": 1, "job_schedule_count": 1, "webhook_count": 0, @@ -126,6 +139,8 @@ "schedule", "job-schedule" ], + "primary_start_mode": "schedule", + "primary_runbook_name": "Lab-Maintenance", "trigger_join_ids": [ "automation-job-schedule:lab-maintenance" ], diff --git a/tests/fixtures/lab_tenant/permissions.json b/tests/fixtures/lab_tenant/permissions.json index 63ffd6d..8d813eb 100644 --- a/tests/fixtures/lab_tenant/permissions.json +++ b/tests/fixtures/lab_tenant/permissions.json @@ -33,6 +33,24 @@ ], "privileged": false, "is_current_identity": false + }, + { + "principal_id": "12121212-1212-1212-1212-121212121212", + "display_name": "aa-hybrid-prod-mi", + "principal_type": "ServicePrincipal", + "high_impact_roles": [ + "Contributor" + ], + "all_role_names": [ + "Contributor" + ], + "role_assignment_count": 1, + "scope_count": 1, + "scope_ids": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-ops" + ], + "privileged": true, + "is_current_identity": false } ], "issues": [] diff --git a/tests/fixtures/lab_tenant/role_trusts.json b/tests/fixtures/lab_tenant/role_trusts.json index d492100..d9ca05b 100644 --- a/tests/fixtures/lab_tenant/role_trusts.json +++ b/tests/fixtures/lab_tenant/role_trusts.json @@ -17,6 +17,22 @@ "55555555-5555-5555-5555-555555555555" ] }, + { + "trust_type": "service-principal-owner", + "source_object_id": "12121212-1212-1212-1212-121212121212", + "source_name": "aa-hybrid-prod", + "source_type": "ServicePrincipal", + "target_object_id": "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", + "target_name": "ops-deploy-sp", + "target_type": "ServicePrincipal", + "evidence_type": "graph-owner", + "confidence": "confirmed", + "summary": "Owner 'aa-hybrid-prod' can modify service principal 'ops-deploy-sp'.", + "related_ids": [ + "12121212-1212-1212-1212-121212121212", + "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa" + ] + }, { "trust_type": "service-principal-owner", "source_object_id": "88888888-8888-8888-8888-888888888888", diff --git a/tests/golden/automation.json b/tests/golden/automation.json index cf19aa2..e0d4da4 100644 --- a/tests/golden/automation.json +++ b/tests/golden/automation.json @@ -23,6 +23,14 @@ ], "runbook_count": 7, "published_runbook_count": 6, + "published_runbook_names": [ + "Baseline-Config", + "Nightly-Reconcile", + "Redeploy-App", + "Reapply-Agent", + "Sync-Secrets", + "Rotate-Certs" + ], "schedule_count": 4, "job_schedule_count": 5, "webhook_count": 2, @@ -38,6 +46,8 @@ "webhook", "hybrid-worker" ], + "primary_start_mode": "webhook", + "primary_runbook_name": "Redeploy-App", "schedule_runbook_names": [ "Baseline-Config", "Nightly-Reconcile" @@ -97,6 +107,9 @@ "identity_ids": [], "runbook_count": 2, "published_runbook_count": 1, + "published_runbook_names": [ + "Lab-Maintenance" + ], "schedule_count": 1, "job_schedule_count": 1, "webhook_count": 0, @@ -110,6 +123,8 @@ "schedule", "job-schedule" ], + "primary_start_mode": "schedule", + "primary_runbook_name": "Lab-Maintenance", "schedule_runbook_names": [ "Lab-Maintenance" ], diff --git a/tests/golden/permissions.json b/tests/golden/permissions.json index a545384..502073f 100644 --- a/tests/golden/permissions.json +++ b/tests/golden/permissions.json @@ -30,6 +30,27 @@ ], "summary": "Current identity 'azurefox-lab-sp' already has direct control visible through Owner across subscription-wide. Check privesc for the direct abuse or escalation path behind this current identity." }, + { + "all_role_names": [ + "Contributor" + ], + "display_name": "aa-hybrid-prod-mi", + "high_impact_roles": [ + "Contributor" + ], + "is_current_identity": false, + "next_review": "Check role-trusts for trust expansion around who can influence this principal.", + "operator_signal": "Direct control visible; trust expansion follow-on.", + "principal_id": "12121212-1212-1212-1212-121212121212", + "principal_type": "ServicePrincipal", + "privileged": true, + "role_assignment_count": 1, + "scope_count": 1, + "scope_ids": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-ops" + ], + "summary": "ServicePrincipal 'aa-hybrid-prod-mi' already has direct control visible through Contributor across subscription-wide. The next useful question is trust expansion, not more privilege ranking. Check role-trusts for trust expansion around who can influence this principal." + }, { "all_role_names": [ "Reader" diff --git a/tests/golden/role-trusts.json b/tests/golden/role-trusts.json index 511c624..ccb85f2 100644 --- a/tests/golden/role-trusts.json +++ b/tests/golden/role-trusts.json @@ -35,6 +35,30 @@ "trust_type": "federated-credential", "usable_identity_result": "Federated sign-in can yield service principal 'build-sp' access." }, + { + "confidence": "confirmed", + "control_primitive": "owner-control", + "controlled_object_name": "ops-deploy-sp", + "controlled_object_type": "ServicePrincipal", + "defender_cut_point": "Remove the owner-level control path over service principal 'ops-deploy-sp'.", + "evidence_type": "graph-owner", + "escalation_mechanism": "Owner-level control over service principal 'ops-deploy-sp' is visible, but the exact authentication-control transform is not yet explicit.", + "next_review": "Review ownership around service principal 'ops-deploy-sp', then confirm Azure control in permissions.", + "operator_signal": "Indirect control visible; ownership review next.", + "related_ids": [ + "12121212-1212-1212-1212-121212121212", + "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa" + ], + "source_name": "aa-hybrid-prod", + "source_object_id": "12121212-1212-1212-1212-121212121212", + "source_type": "ServicePrincipal", + "summary": "Owner 'aa-hybrid-prod' can modify service principal 'ops-deploy-sp'. This is an indirect-control row: ownership is the visible trust path, not direct Azure privilege by itself. Review ownership around service principal 'ops-deploy-sp', then confirm Azure control in permissions.", + "target_name": "ops-deploy-sp", + "target_object_id": "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", + "target_type": "ServicePrincipal", + "trust_type": "service-principal-owner", + "usable_identity_result": null + }, { "confidence": "confirmed", "control_primitive": "owner-control", diff --git a/tests/test_chain_scaffold.py b/tests/test_chain_scaffold.py index 7c0bdf4..e0b04f9 100644 --- a/tests/test_chain_scaffold.py +++ b/tests/test_chain_scaffold.py @@ -23,6 +23,7 @@ ManagedIdentity, PermissionSummary, PrivescPathSummary, + RoleAssignment, RoleTrustSummary, StorageAsset, TokenCredentialSurfaceSummary, @@ -42,6 +43,7 @@ "managed-identities": set(ManagedIdentity.model_fields), "permissions": set(PermissionSummary.model_fields), "privesc": set(PrivescPathSummary.model_fields), + "rbac": set(RoleAssignment.model_fields), "role-trusts": set(RoleTrustSummary.model_fields), "storage": set(StorageAsset.model_fields), "tokens-credentials": set(TokenCredentialSurfaceSummary.model_fields), diff --git a/tests/test_cli_smoke.py b/tests/test_cli_smoke.py index 836c177..6b02c20 100644 --- a/tests/test_cli_smoke.py +++ b/tests/test_cli_smoke.py @@ -161,23 +161,22 @@ def test_cli_smoke_chains_deployment_path_table_output(tmp_path: Path) -> None: ) assert result.exit_code == 0 + normalized_output = " ".join(result.stdout.split()) assert "azurefox chains" in result.stdout assert "why care" in result.stdout - assert "path type" in result.stdout + assert "actionability" in result.stdout + assert "insertion point" in result.stdout assert "likely azure impact" in result.stdout - assert "confidence boundary" in result.stdout + assert "what's missing" in result.stdout assert "deploy-aks-prod" in result.stdout - assert "deploy-appservice-prod" in result.stdout - assert "deploy-artifact-app-p" in result.stdout assert "plan-infra-prod" in result.stdout assert "aa-hybrid-prod" in result.stdout - assert "trusted input" in result.stdout - assert "Automation account" in result.stdout - assert "controllable change" in result.stdout - assert "execution hub" in result.stdout - assert "secret-backed support" in result.stdout - assert "Check permissions for" in result.stdout - assert "consequence grounding" in result.stdout + assert "repo-content" in result.stdout + assert "currently actionable" in result.stdout + assert "conditionally" in result.stdout + assert "support-only" in result.stdout + assert "Redeploy-App" in result.stdout + assert "Lab-Maintenance" in result.stdout assert "Takeaway: 6 visible deployment paths" in result.stdout @@ -200,6 +199,9 @@ def test_cli_smoke_chains_deployment_path_json(tmp_path: Path) -> None: assert payload["backing_commands"] == [ "devops", "automation", + "permissions", + "rbac", + "role-trusts", "arm-deployments", "aks", "functions", @@ -229,18 +231,42 @@ def test_cli_smoke_chains_deployment_path_json(tmp_path: Path) -> None: "execution-hub", "secret-escalation-support", } + assert {item["actionability_state"] for item in payload["paths"]} == { + "currently actionable", + "conditionally actionable", + "consequence-grounded but insertion point unproven", + "support-only", + } support_row = next(item for item in payload["paths"] if item["asset_name"] == "aa-lab-quiet") assert support_row["priority"] == "low" + assert support_row["actionability_state"] == "support-only" + assert "Lab-Maintenance" in support_row["insertion_point"] assert "Another foothold" in support_row["why_care"] assert "target mapping is still missing" in support_row["next_review"] automation_row = next( item for item in payload["paths"] if item["asset_name"] == "aa-hybrid-prod" ) assert automation_row["target_resolution"] == "visibility blocked" + assert automation_row["actionability_state"] == "currently actionable" + assert automation_row["priority"] == "high" + assert "webhook path can start runbook Redeploy-App" in automation_row["insertion_point"] + assert "current role assignment Owner at subscription scope" in automation_row["insertion_point"] + assert "role-trusts" in automation_row["evidence_commands"] + assert "rbac" in automation_row["evidence_commands"] + assert "aa-hybrid-prod-mi" in automation_row["confidence_boundary"] + assert "ops-deploy-sp" in automation_row["why_care"] + assert "map what runbook Redeploy-App changes" in automation_row["next_review"] assert "run recurring Azure-facing execution" in automation_row["why_care"] aks_row = next(item for item in payload["paths"] if item["asset_name"] == "deploy-aks-prod") + assert aks_row["actionability_state"] == "conditionally actionable" + assert "Queue this pipeline now" in aks_row["insertion_point"] assert "permissions" in aks_row["evidence_commands"] assert "keyvault" in aks_row["evidence_commands"] + appsvc_row = next( + item for item in payload["paths"] if item["asset_name"] == "deploy-appservice-prod" + ) + assert appsvc_row["actionability_state"] == "currently actionable" + assert "Poison repository" in appsvc_row["insertion_point"] def test_cli_smoke_chains_escalation_path_json(tmp_path: Path) -> None: diff --git a/tests/test_collectors.py b/tests/test_collectors.py index a5e490d..8d50775 100644 --- a/tests/test_collectors.py +++ b/tests/test_collectors.py @@ -985,7 +985,12 @@ def test_collect_automation(fixture_provider, options) -> None: assert output.automation_accounts[0].hybrid_worker_group_count == 1 assert output.automation_accounts[0].webhook_count == 2 assert output.automation_accounts[0].identity_type == "SystemAssigned" + assert output.automation_accounts[0].primary_start_mode == "webhook" + assert output.automation_accounts[0].primary_runbook_name == "Redeploy-App" + assert "Redeploy-App" in output.automation_accounts[0].published_runbook_names assert output.automation_accounts[1].name == "aa-lab-quiet" + assert output.automation_accounts[1].primary_start_mode == "schedule" + assert output.automation_accounts[1].primary_runbook_name == "Lab-Maintenance" assert output.metadata.command == "automation" @@ -3015,7 +3020,7 @@ def test_principal_sort_key_prioritizes_high_impact_then_workload_attachment() - def test_collect_permissions(fixture_provider, options) -> None: output = collect_permissions(fixture_provider, options) - assert len(output.permissions) == 2 + assert len(output.permissions) == 3 assert output.permissions[0].privileged is True assert output.permissions[0].high_impact_roles == ["Owner"] assert output.permissions[0].operator_signal == "Direct control visible; current foothold." @@ -3024,6 +3029,11 @@ def test_collect_permissions(fixture_provider, options) -> None: == "Check privesc for the direct abuse or escalation path behind this current identity." ) assert "direct control visible" in (output.permissions[0].summary or "").lower() + assert output.permissions[1].display_name == "aa-hybrid-prod-mi" + assert output.permissions[1].high_impact_roles == ["Contributor"] + assert output.permissions[1].next_review == ( + "Check role-trusts for trust expansion around who can influence this principal." + ) def test_collect_permissions_prefers_workload_pivot_then_trust_expansion() -> None: @@ -3252,10 +3262,13 @@ def test_privesc_sort_key_prioritizes_severity_then_current_identity_then_path_t def test_collect_role_trusts(fixture_provider, options) -> None: output = collect_role_trusts(fixture_provider, options) assert output.mode == "fast" - assert len(output.trusts) == 4 + assert len(output.trusts) == 5 assert output.trusts[0].trust_type == "federated-credential" assert output.trusts[1].trust_type == "service-principal-owner" - assert output.trusts[2].trust_type == "app-owner" + assert output.trusts[1].source_name == "aa-hybrid-prod" + assert output.trusts[2].trust_type == "service-principal-owner" + assert output.trusts[2].source_name == "automation-runner" + assert output.trusts[3].trust_type == "app-owner" assert ( output.trusts[0].operator_signal == "Trust expansion visible; privilege confirmation next." ) @@ -3265,6 +3278,11 @@ def test_collect_role_trusts(fixture_provider, options) -> None: ) assert output.trusts[1].operator_signal == "Indirect control visible; ownership review next." assert output.trusts[1].next_review == ( + "Review ownership around service principal 'ops-deploy-sp', then confirm " + "Azure control in permissions." + ) + assert output.trusts[2].operator_signal == "Indirect control visible; ownership review next." + assert output.trusts[2].next_review == ( "Review ownership around service principal 'build-sp', then confirm " "Azure control in permissions." ) @@ -3279,18 +3297,22 @@ def test_collect_role_trusts(fixture_provider, options) -> None: ) assert output.trusts[1].control_primitive == "owner-control" assert output.trusts[1].controlled_object_type == "ServicePrincipal" - assert output.trusts[1].controlled_object_name == "build-sp" + assert output.trusts[1].controlled_object_name == "ops-deploy-sp" assert output.trusts[1].usable_identity_result is None assert "authentication-control transform is not yet explicit" in ( output.trusts[1].escalation_mechanism or "" ) - assert output.trusts[2].control_primitive == "change-auth-material" - assert output.trusts[2].controlled_object_type == "Application" - assert output.trusts[2].controlled_object_name == "build-app" - assert output.trusts[2].usable_identity_result == ( + assert output.trusts[2].control_primitive == "owner-control" + assert output.trusts[2].controlled_object_type == "ServicePrincipal" + assert output.trusts[2].controlled_object_name == "build-sp" + assert output.trusts[2].usable_identity_result is None + assert output.trusts[3].control_primitive == "change-auth-material" + assert output.trusts[3].controlled_object_type == "Application" + assert output.trusts[3].controlled_object_name == "build-app" + assert output.trusts[3].usable_identity_result == ( "Control of application 'build-app' could make service principal 'build-sp' usable." ) - assert output.trusts[2].defender_cut_point == ( + assert output.trusts[3].defender_cut_point == ( "Remove the ownership path that lets the source control application 'build-app'." ) diff --git a/tests/test_deployment_path_admissibility.py b/tests/test_deployment_path_admissibility.py index dc85141..ab5de9a 100644 --- a/tests/test_deployment_path_admissibility.py +++ b/tests/test_deployment_path_admissibility.py @@ -9,6 +9,10 @@ target_family_hints_from_arm_deployment, ) from azurefox.chains.runner import _structured_deployment_target_matches +from azurefox.chains.runner import ( + _automation_current_operator_access, + _automation_scope_label, +) from azurefox.models.common import ( ArmDeploymentSummary, AutomationAccountAsset, @@ -244,6 +248,54 @@ def test_structured_target_clue_can_reach_exact_named_match_input() -> None: assert [item["name"] for item in matches] == ["app-public-api"] +def test_automation_current_operator_access_uses_role_definition_id_and_best_scope_match() -> None: + access = _automation_current_operator_access( + { + "id": ( + "/subscriptions/test-sub/resourceGroups/rg-ops/providers/Microsoft.Automation/" + "automationAccounts/aa-prod" + ) + }, + [ + { + "scope_id": "/subscriptions/test-sub", + "role_definition_id": ( + "/subscriptions/test-sub/providers/Microsoft.Authorization/" + "roleDefinitions/8e3af657-a8ff-443c-a75c-2fe8c4bcb635" + ), + "role_name": "Owner (renamed locally)", + }, + { + "scope_id": ( + "/subscriptions/test-sub/resourceGroups/rg-ops/providers/Microsoft.Automation/" + "automationAccounts/aa-prod" + ), + "role_definition_id": None, + "role_name": " automation operator ", + }, + ], + ) + + assert access == { + "capability": "edit", + "role_name": "Owner (renamed locally)", + "scope_id": "/subscriptions/test-sub", + } + + +def test_automation_scope_label_keeps_child_resource_scope_distinct_from_resource_group() -> None: + scope_id = ( + "/subscriptions/test-sub/resourceGroups/rg-ops/providers/Microsoft.Automation/" + "automationAccounts/aa-prod/runbooks/Redeploy-App" + ) + resource_id = ( + "/subscriptions/test-sub/resourceGroups/rg-ops/providers/Microsoft.Automation/" + "automationAccounts/aa-prod" + ) + + assert _automation_scope_label(scope_id, resource_id=resource_id) == "resource scope Redeploy-App" + + def _load_devops_pipeline(name: str) -> DevopsPipelineAsset: payload = json.loads( (Path(__file__).resolve().parent / "fixtures" / "lab_tenant" / "devops.json").read_text( diff --git a/tests/test_terminal_ux.py b/tests/test_terminal_ux.py index fb9dd5b..bd1a222 100644 --- a/tests/test_terminal_ux.py +++ b/tests/test_terminal_ux.py @@ -37,7 +37,7 @@ def test_role_trusts_table_mode_includes_narration_and_takeaway(tmp_path: Path) assert "service principal 'build-sp' access." in normalized_output assert "authentication-control transform is not yet explicit." in normalized_output assert "Check permissions for Azure control" in result.stdout - assert "Takeaway: 4 trust edges surfaced in fast mode" in result.stdout + assert "Takeaway: 5 trust edges surfaced in fast mode" in result.stdout assert "privilege-confirmation follow-ons" in result.stdout assert "Delegated and admin" in result.stdout assert "out of scope for this" in result.stdout @@ -757,7 +757,8 @@ def test_permissions_table_mode_surfaces_next_review(tmp_path: Path) -> None: assert "next review" in result.stdout assert "Direct control visible; current foothold." in result.stdout assert "Check privesc" in result.stdout - assert "Takeaway: 1 of 2 principals hold high-impact RBAC roles;" in result.stdout + assert "aa-hybrid-prod-mi" in result.stdout + assert "Takeaway: 2 of 3 principals hold high-impact RBAC roles;" in result.stdout def test_chains_table_mode_surfaces_priority_and_next_review(tmp_path: Path) -> None: @@ -799,19 +800,21 @@ def test_deployment_chains_table_mode_surfaces_source_oriented_columns(tmp_path: assert "why care" in result.stdout assert "priority" in result.stdout assert "urgency" in result.stdout - assert "path type" in result.stdout - assert "confidence boundary" in result.stdout + assert "actionability" in result.stdout + assert "insertion point" in result.stdout + assert "what's missing" in result.stdout assert "deploy-aks-prod" in result.stdout - assert "deploy-appservice-prod" in result.stdout - assert "deploy-artifact-app-p" in result.stdout assert "plan-infra-prod" in result.stdout assert "aa-hybrid-prod" in result.stdout assert "aa-lab-quiet" in result.stdout assert "pivot-now" in normalized_output assert "upstream producer control" in normalized_output - assert "trusted input" in normalized_output - assert "execution hub" in normalized_output - assert "secret-backed support" in normalized_output + assert "currently actionable" in normalized_output + assert "conditionally" in normalized_output + assert "grounded, insertion" in normalized_output + assert "support-only" in normalized_output + assert "Redeploy-App" in normalized_output + assert "Lab-Maintenance" in normalized_output def test_deployment_chains_table_mode_renders_why_care_as_detail_rows(tmp_path: Path) -> None: From f373d0ad45de8582ab22a1efa32259b66017b539 Mon Sep 17 00:00:00 2001 From: Colby Farley Date: Thu, 9 Apr 2026 23:13:51 -0500 Subject: [PATCH 2/3] style: fix deployment-path lint --- src/azurefox/chains/runner.py | 94 ++++++++++++++++----- tests/test_cli_smoke.py | 6 +- tests/test_deployment_path_admissibility.py | 8 +- 3 files changed, 82 insertions(+), 26 deletions(-) diff --git a/src/azurefox/chains/runner.py b/src/azurefox/chains/runner.py index aba4797..1c80561 100644 --- a/src/azurefox/chains/runner.py +++ b/src/azurefox/chains/runner.py @@ -686,10 +686,14 @@ def _deployment_devops_insertion_point(source: dict) -> str: primary_input = _devops_primary_trusted_input(source) trusted_input_text = _devops_trusted_input_text(primary_input) injection_surfaces = [ - str(value) for value in (source.get("current_operator_injection_surface_types") or []) if value + str(value) + for value in (source.get("current_operator_injection_surface_types") or []) + if value ] if any(value != "definition-edit" for value in injection_surfaces): - surface_list = ", ".join(value for value in injection_surfaces if value != "definition-edit") + surface_list = ", ".join( + value for value in injection_surfaces if value != "definition-edit" + ) return f"Poison {trusted_input_text} through {surface_list}." if "definition-edit" in injection_surfaces or source.get("current_operator_can_edit"): return "Edit the pipeline definition directly." @@ -713,11 +717,17 @@ def _deployment_devops_insertion_point(source: dict) -> str: ) access_state = str(primary_input.get("current_operator_access_state") or "") if access_state == "read": - return f"{trusted_input_text} is visible and readable, but not writable from current evidence." + return ( + f"{trusted_input_text} is visible and readable, but not writable from " + "current evidence." + ) if access_state == "exists-only": return f"{trusted_input_text} is visible, but current control is unproven." return f"Source depends on {trusted_input_text}." - return "Azure-facing pipeline is visible, but the source-side insertion point is still unproven." + return ( + "Azure-facing pipeline is visible, but the source-side insertion point is " + "still unproven." + ) def _deployment_automation_insertion_point(source: dict, *, path_concept: str | None) -> str: @@ -729,8 +739,12 @@ def _deployment_automation_insertion_point(source: dict, *, path_concept: str | primary_runbook=primary_runbook, identity_type=identity_type, ) - webhook_runbooks = [str(value) for value in (source.get("webhook_runbook_names") or []) if value] - schedule_runbooks = [str(value) for value in (source.get("schedule_runbook_names") or []) if value] + webhook_runbooks = [ + str(value) for value in (source.get("webhook_runbook_names") or []) if value + ] + schedule_runbooks = [ + str(value) for value in (source.get("schedule_runbook_names") or []) if value + ] hybrid_count = int(source.get("hybrid_worker_group_count") or 0) surfaces: list[str] = [] @@ -753,8 +767,14 @@ def _deployment_automation_insertion_point(source: dict, *, path_concept: str | return "; ".join(surfaces) + "." return "; ".join(surfaces) + ", but current operator control is still unproven." if path_concept == "secret-escalation-support": - return "Reusable automation support is visible, but no operator-controlled run path is proven." - return "Automation consequences are grounded, but the operator-controlled start or edit path is still unproven." + return ( + "Reusable automation support is visible, but no operator-controlled run " + "path is proven." + ) + return ( + "Automation consequences are grounded, but the operator-controlled start " + "or edit path is still unproven." + ) def _automation_joined_permission( @@ -828,9 +848,15 @@ def _automation_primary_run_path_evidence_sentence(source: dict) -> str | None: identity_type = str(source.get("identity_type") or "").strip() or None identity_clause = f" under automation identity {identity_type}" if identity_type else "" if primary_mode == "webhook" and primary_runbook: - return f"AzureFox can identify a webhook path into runbook {primary_runbook}{identity_clause}." + return ( + f"AzureFox can identify a webhook path into runbook {primary_runbook}" + f"{identity_clause}." + ) if primary_mode == "schedule" and primary_runbook: - return f"AzureFox can identify a schedule-backed path into runbook {primary_runbook}{identity_clause}." + return ( + f"AzureFox can identify a schedule-backed path into runbook " + f"{primary_runbook}{identity_clause}." + ) if primary_mode in {"manual-only", "published-runbook"} and primary_runbook: return f"AzureFox can identify published runbook {primary_runbook}{identity_clause}." if primary_mode == "hybrid-worker": @@ -905,7 +931,10 @@ def _automation_current_operator_control_clause(source: dict) -> str | None: f"current role assignment {role_name} at {scope_text} can edit published " f"runbook {primary_runbook}" ) - return f"current role assignment {role_name} at {scope_text} can edit this automation execution boundary" + return ( + f"current role assignment {role_name} at {scope_text} can edit this " + "automation execution boundary" + ) if primary_runbook: return ( @@ -1010,7 +1039,9 @@ def _deployment_priority_override( ) -> str: if path_concept == "secret-escalation-support": return "low" - if source_command == "automation" and _source_current_operator_can_inject(source_command, source): + if source_command == "automation" and _source_current_operator_can_inject( + source_command, source + ): return "high" return semantic_priority @@ -1050,7 +1081,10 @@ def _automation_role_trust_clause(source: dict) -> str | None: if trust_type == "federated-credential": return f"role-trusts also show federated trust into service principal '{target_name}'" if trust_type == "app-to-service-principal": - return f"role-trusts also show application-permission reach into service principal '{target_name}'" + return ( + "role-trusts also show application-permission reach into service " + f"principal '{target_name}'" + ) summary = str(trust.get("summary") or "").strip() return summary or None @@ -1541,8 +1575,12 @@ def _deployment_next_review( primary_runbook = str(source.get("primary_runbook_name") or "") or None permission_clause = _automation_permission_clause(source) trust_clause = _automation_role_trust_clause(source) - current_operator_can_edit = bool(_source_current_operator_can_inject(source_command, source)) - current_operator_can_start = bool(_source_current_operator_can_drive(source_command, source)) + current_operator_can_edit = bool( + _source_current_operator_can_inject(source_command, source) + ) + current_operator_can_start = bool( + _source_current_operator_can_drive(source_command, source) + ) if path_concept == "secret-escalation-support": steps = ["Confirm what separate foothold could reuse this secret-backed support"] elif current_operator_can_edit: @@ -1555,18 +1593,31 @@ def _deployment_next_review( steps = ["Check permissions for the automation identity behind this execution path"] if current_operator_can_edit and primary_runbook and primary_mode == "webhook": steps.append( - f"map what runbook {primary_runbook} changes because current control does not depend on the webhook URI" + f"map what runbook {primary_runbook} changes because current " + "control does not depend on the webhook URI" ) elif current_operator_can_edit and primary_runbook: steps.append(f"map what runbook {primary_runbook} changes on the Azure side") elif current_operator_can_start and primary_runbook: - steps.append(f"confirm whether runbook {primary_runbook} also has an editable trigger or definition path") + steps.append( + f"confirm whether runbook {primary_runbook} also has an editable " + "trigger or definition path" + ) elif primary_runbook and primary_mode == "webhook": - steps.append(f"confirm whether current credentials can trigger webhook runbook {primary_runbook}") + steps.append( + "confirm whether current credentials can trigger webhook runbook " + f"{primary_runbook}" + ) elif primary_runbook and primary_mode == "schedule": - steps.append(f"confirm whether current credentials can influence scheduled runbook {primary_runbook}") + steps.append( + "confirm whether current credentials can influence scheduled " + f"runbook {primary_runbook}" + ) elif primary_runbook: - steps.append(f"confirm how runbook {primary_runbook} is started from current credentials") + steps.append( + f"confirm how runbook {primary_runbook} is started from current " + "credentials" + ) else: steps.append("confirm which runbook and trigger path performs the Azure change") if trust_clause: @@ -1576,7 +1627,8 @@ def _deployment_next_review( target_command = _DEPLOYMENT_TARGET_SPECS[target_family]["command"] if source.get("missing_target_mapping"): steps.append( - f"use {target_command} as consequence grounding because runbook target mapping is still missing" + f"use {target_command} as consequence grounding because runbook " + "target mapping is still missing" ) elif target_resolution == "visibility blocked": steps.append(f"restore {target_command} visibility for consequence grounding") diff --git a/tests/test_cli_smoke.py b/tests/test_cli_smoke.py index 6b02c20..4021e1e 100644 --- a/tests/test_cli_smoke.py +++ b/tests/test_cli_smoke.py @@ -161,7 +161,6 @@ def test_cli_smoke_chains_deployment_path_table_output(tmp_path: Path) -> None: ) assert result.exit_code == 0 - normalized_output = " ".join(result.stdout.split()) assert "azurefox chains" in result.stdout assert "why care" in result.stdout assert "actionability" in result.stdout @@ -250,7 +249,10 @@ def test_cli_smoke_chains_deployment_path_json(tmp_path: Path) -> None: assert automation_row["actionability_state"] == "currently actionable" assert automation_row["priority"] == "high" assert "webhook path can start runbook Redeploy-App" in automation_row["insertion_point"] - assert "current role assignment Owner at subscription scope" in automation_row["insertion_point"] + assert ( + "current role assignment Owner at subscription scope" + in automation_row["insertion_point"] + ) assert "role-trusts" in automation_row["evidence_commands"] assert "rbac" in automation_row["evidence_commands"] assert "aa-hybrid-prod-mi" in automation_row["confidence_boundary"] diff --git a/tests/test_deployment_path_admissibility.py b/tests/test_deployment_path_admissibility.py index ab5de9a..214cc5b 100644 --- a/tests/test_deployment_path_admissibility.py +++ b/tests/test_deployment_path_admissibility.py @@ -8,10 +8,10 @@ assess_deployment_source, target_family_hints_from_arm_deployment, ) -from azurefox.chains.runner import _structured_deployment_target_matches from azurefox.chains.runner import ( _automation_current_operator_access, _automation_scope_label, + _structured_deployment_target_matches, ) from azurefox.models.common import ( ArmDeploymentSummary, @@ -292,8 +292,10 @@ def test_automation_scope_label_keeps_child_resource_scope_distinct_from_resourc "/subscriptions/test-sub/resourceGroups/rg-ops/providers/Microsoft.Automation/" "automationAccounts/aa-prod" ) - - assert _automation_scope_label(scope_id, resource_id=resource_id) == "resource scope Redeploy-App" + assert ( + _automation_scope_label(scope_id, resource_id=resource_id) + == "resource scope Redeploy-App" + ) def _load_devops_pipeline(name: str) -> DevopsPipelineAsset: From 749e6bbd8c528f299dc56e02e1319743920f0acb Mon Sep 17 00:00:00 2001 From: Colby Farley Date: Thu, 9 Apr 2026 23:16:56 -0500 Subject: [PATCH 3/3] chore: refresh PR metadata checks