diff --git a/src/azurefox/chains/compute_control.py b/src/azurefox/chains/compute_control.py index 1ce3f73..1331fde 100644 --- a/src/azurefox/chains/compute_control.py +++ b/src/azurefox/chains/compute_control.py @@ -22,6 +22,7 @@ def collect_compute_control_records( loaded: dict[str, object], ) -> tuple[list[ChainPathRecord], list[CollectionIssue]]: tokens_output = loaded["tokens-credentials"] + env_output = loaded.get("env-vars") managed_output = loaded["managed-identities"] permissions_output = loaded["permissions"] workloads_output = loaded["workloads"] @@ -32,9 +33,7 @@ def collect_compute_control_records( if item.asset_id } managed_by_id = { - item.id: item.model_dump(mode="json") - for item in managed_output.identities - if item.id + item.id: item.model_dump(mode="json") for item in managed_output.identities if item.id } managed_by_principal: dict[str, list[dict]] = defaultdict(list) for item in managed_output.identities: @@ -47,6 +46,11 @@ def collect_compute_control_records( for item in permissions_output.permissions if item.principal_id and item.privileged } + env_rows_by_asset: dict[str, list[dict]] = defaultdict(list) + if env_output is not None: + for item in env_output.env_vars: + if item.asset_id: + env_rows_by_asset[item.asset_id].append(item.model_dump(mode="json")) role_assignments_by_principal: dict[str, list[dict]] = defaultdict(list) for assignment in managed_output.role_assignments: row = assignment.model_dump(mode="json") @@ -64,6 +68,53 @@ def collect_compute_control_records( if workload is None: continue + env_rows = env_rows_by_asset.get(str(row.get("asset_id") or ""), []) + if _is_mixed_identity_workload(workload): + identity_binding = _resolve_mixed_identity_binding( + surface_row=row, + workload_row=workload, + env_rows=env_rows, + managed_by_id=managed_by_id, + managed_by_principal=managed_by_principal, + ) + if identity_binding is not None: + permission_row = permissions_by_principal.get(str(identity_binding["principal_id"])) + assignment_summary = _assignment_control_summary( + str(identity_binding["principal_id"]), + role_assignments_by_principal, + ) + if permission_row is not None or assignment_summary is not None: + paths.append( + _build_compute_control_record( + family_name=family_name, + surface_row=row, + workload_row=workload, + identity_binding=identity_binding, + permission_row=permission_row, + assignment_summary=assignment_summary, + ) + ) + continue + + candidate_bindings = _mixed_identity_candidates( + surface_row=row, + workload_row=workload, + managed_by_id=managed_by_id, + managed_by_principal=managed_by_principal, + permissions_by_principal=permissions_by_principal, + role_assignments_by_principal=role_assignments_by_principal, + ) + if candidate_bindings: + paths.append( + _build_mixed_identity_candidate_record( + family_name=family_name, + surface_row=row, + workload_row=workload, + candidate_bindings=candidate_bindings, + ) + ) + continue + identity_binding = _resolve_identity_binding( surface_row=row, workload_row=workload, @@ -73,9 +124,9 @@ def collect_compute_control_records( if identity_binding is None: continue - permission_row = permissions_by_principal.get(identity_binding["principal_id"]) + permission_row = permissions_by_principal.get(str(identity_binding["principal_id"])) assignment_summary = _assignment_control_summary( - identity_binding["principal_id"], + str(identity_binding["principal_id"]), role_assignments_by_principal, ) if permission_row is None and assignment_summary is None: @@ -103,6 +154,7 @@ def collect_compute_control_records( issues = [ *getattr(tokens_output, "issues", []), + *(getattr(env_output, "issues", []) if env_output is not None else []), *getattr(managed_output, "issues", []), *getattr(permissions_output, "issues", []), *getattr(workloads_output, "issues", []), @@ -116,7 +168,7 @@ def _resolve_identity_binding( workload_row: dict, managed_by_id: dict[str, dict], managed_by_principal: dict[str, list[dict]], -) -> dict[str, str] | None: +) -> dict[str, object] | None: related_ids = [str(value) for value in surface_row.get("related_ids") or [] if value] workload_identity_ids = [ str(value) for value in workload_row.get("identity_ids") or [] if value @@ -129,52 +181,76 @@ def _resolve_identity_binding( seen_ids.add(value) managed_matches.append(managed_by_id[value]) - # Mixed system-assigned and user-assigned identities stay out of the narrow v1 - # until the actor is explicit enough to defend one default row. - if workload_row.get("identity_principal_id") and workload_identity_ids: - return None - if len(managed_matches) == 1: - principal_id = str(managed_matches[0].get("principal_id") or "").strip() - if not principal_id: - return None - return { - "principal_id": principal_id, - "identity_name": str(managed_matches[0].get("name") or principal_id), - "identity_id": str(managed_matches[0].get("id") or ""), - "binding_source": "managed-identity", - } + return _binding_from_managed_identity_row(managed_matches[0]) if len(managed_matches) > 1: return None principal_id = str(workload_row.get("identity_principal_id") or "").strip() if not principal_id: - principal_ids = { - value for value in related_ids if _UUID_RE.match(value) - } + principal_ids = {value for value in related_ids if _UUID_RE.match(value)} if len(principal_ids) == 1: principal_id = next(iter(principal_ids)) if not principal_id: return None + attached_binding = _attached_identity_binding_for_principal( + principal_id=principal_id, + asset_id=str(surface_row.get("asset_id") or ""), + managed_by_principal=managed_by_principal, + ) + if attached_binding is not None: + return attached_binding + + return _workload_principal_binding( + principal_id=principal_id, + surface_row=surface_row, + workload_row=workload_row, + ) + + +def _is_mixed_identity_workload(workload_row: dict) -> bool: + return bool(workload_row.get("identity_principal_id")) and bool( + workload_row.get("identity_ids") + ) + + +def _binding_from_managed_identity_row(managed_row: dict) -> dict[str, object] | None: + principal_id = str(managed_row.get("principal_id") or "").strip() + if not principal_id: + return None + return { + "principal_id": principal_id, + "identity_name": str(managed_row.get("name") or principal_id), + "identity_id": str(managed_row.get("id") or principal_id), + "binding_source": "managed-identity", + } + + +def _attached_identity_binding_for_principal( + *, + principal_id: str, + asset_id: str, + managed_by_principal: dict[str, list[dict]], +) -> dict[str, object] | None: principal_matches = [ item for item in managed_by_principal.get(principal_id, []) - if str(surface_row.get("asset_id") or "") - in {str(value) for value in item.get("attached_to") or []} + if asset_id in {str(value) for value in item.get("attached_to") or []} ] - if len(principal_matches) == 1: - return { - "principal_id": principal_id, - "identity_name": str(principal_matches[0].get("name") or principal_id), - "identity_id": str(principal_matches[0].get("id") or principal_id), - "binding_source": "managed-identity", - } - if len(principal_matches) > 1: + if len(principal_matches) != 1: return None + return _binding_from_managed_identity_row(principal_matches[0]) + +def _workload_principal_binding( + *, + principal_id: str, + surface_row: dict, + workload_row: dict, +) -> dict[str, object]: return { "principal_id": principal_id, "identity_name": ( @@ -185,6 +261,208 @@ def _resolve_identity_binding( } +def _resolve_mixed_identity_binding( + *, + surface_row: dict, + workload_row: dict, + env_rows: list[dict], + managed_by_id: dict[str, dict], + managed_by_principal: dict[str, list[dict]], +) -> dict[str, object] | None: + corroboration = _identity_choice_corroboration(workload_row, env_rows) + if corroboration is None: + return None + + if corroboration["identity_choice"] == "systemAssigned": + principal_id = str(workload_row.get("identity_principal_id") or "").strip() + if not principal_id: + return None + binding = _attached_identity_binding_for_principal( + principal_id=principal_id, + asset_id=str(surface_row.get("asset_id") or ""), + managed_by_principal=managed_by_principal, + ) or _workload_principal_binding( + principal_id=principal_id, + surface_row=surface_row, + workload_row=workload_row, + ) + binding["identity_choice_basis"] = corroboration["basis"] + binding["identity_choice_detail"] = corroboration["detail"] + return binding + + if corroboration["identity_choice"] == "userAssigned": + identity_id = str(corroboration.get("identity_id") or "") + if identity_id and identity_id in managed_by_id: + binding = _binding_from_managed_identity_row(managed_by_id[identity_id]) + if binding is None: + return None + binding["identity_choice_basis"] = corroboration["basis"] + binding["identity_choice_detail"] = corroboration["detail"] + return binding + return None + + +def _identity_choice_corroboration( + workload_row: dict, env_rows: list[dict] +) -> dict[str, str] | None: + identity_ids = [str(value) for value in workload_row.get("identity_ids") or [] if value] + identity_names: dict[str, set[str]] = defaultdict(set) + for identity_id in identity_ids: + normalized = _normalized_identity_selector(identity_id) + if normalized: + identity_names[normalized].add(identity_id) + identity_ids_set = set(identity_ids) + corroborations: dict[tuple[str, str], dict[str, str]] = {} + for row in env_rows: + explicit_identity = str(row.get("key_vault_reference_identity") or "").strip() + if not explicit_identity: + continue + basis = f"env-vars:{row.get('setting_name') or 'unknown-setting'}" + if explicit_identity.lower() == "systemassigned": + corroborations[("systemAssigned", "")] = { + "identity_choice": "systemAssigned", + "basis": basis, + "detail": ( + "current app configuration explicitly names SystemAssigned for a " + "collected workload behavior." + ), + } + continue + if explicit_identity in identity_ids_set: + corroborations[("userAssigned", explicit_identity)] = { + "identity_choice": "userAssigned", + "identity_id": explicit_identity, + "basis": basis, + "detail": ( + "current app configuration explicitly names the attached " + "user-assigned identity " + f"'{_display_identity_selector(explicit_identity)}' for a " + "collected workload behavior." + ), + } + continue + normalized_identity = _normalized_identity_selector(explicit_identity) + matched_ids = identity_names.get(normalized_identity or "", set()) + if len(matched_ids) == 1: + matched_identity_id = next(iter(matched_ids)) + corroborations[("userAssigned", matched_identity_id)] = { + "identity_choice": "userAssigned", + "identity_id": matched_identity_id, + "basis": basis, + "detail": ( + "current app configuration explicitly names the attached " + "user-assigned identity " + f"'{_display_identity_selector(explicit_identity)}' for a " + "collected workload behavior." + ), + } + + if len(corroborations) != 1: + return None + return next(iter(corroborations.values())) + + +def _mixed_identity_candidates( + *, + surface_row: dict, + workload_row: dict, + managed_by_id: dict[str, dict], + managed_by_principal: dict[str, list[dict]], + permissions_by_principal: dict[str, dict], + role_assignments_by_principal: dict[str, list[dict]], +) -> list[dict[str, str]]: + asset_id = str(surface_row.get("asset_id") or "") + candidates: list[dict[str, str]] = [] + seen_keys: set[tuple[str, str]] = set() + + system_principal_id = str(workload_row.get("identity_principal_id") or "").strip() + if system_principal_id: + system_match = _attached_identity_binding_for_principal( + principal_id=system_principal_id, + asset_id=asset_id, + managed_by_principal=managed_by_principal, + ) + if system_match is not None: + candidates.append(system_match) + else: + candidates.append( + _workload_principal_binding( + principal_id=system_principal_id, + surface_row=surface_row, + workload_row=workload_row, + ) + ) + + for identity_id in [str(value) for value in workload_row.get("identity_ids") or [] if value]: + managed_row = managed_by_id.get(identity_id) + if managed_row is None: + candidates.append( + { + "principal_id": "", + "identity_name": _display_identity_selector(identity_id), + "identity_id": identity_id, + } + ) + continue + binding = _binding_from_managed_identity_row(managed_row) + if binding is not None: + candidates.append(binding) + + visible_candidates: list[dict[str, str]] = [] + for candidate in candidates: + dedupe_key = ( + str(candidate.get("principal_id") or ""), + str(candidate.get("identity_id") or ""), + ) + if dedupe_key in seen_keys: + continue + seen_keys.add(dedupe_key) + stronger_outcome, control_basis = _candidate_control_summary( + candidate=candidate, + permissions_by_principal=permissions_by_principal, + role_assignments_by_principal=role_assignments_by_principal, + ) + if stronger_outcome: + candidate["stronger_outcome"] = stronger_outcome + candidate["control_basis"] = control_basis or "" + visible_candidates.append(candidate) + + if not any(item.get("stronger_outcome") for item in visible_candidates): + return [] + return visible_candidates + + +def _candidate_control_summary( + *, + candidate: dict[str, str], + permissions_by_principal: dict[str, dict], + role_assignments_by_principal: dict[str, list[dict]], +) -> tuple[str | None, str | None]: + principal_id = str(candidate.get("principal_id") or "").strip() + if not principal_id: + return None, None + permission_summary = _permission_control_summary(permissions_by_principal.get(principal_id)) + if permission_summary: + return permission_summary, "permissions" + assignment_summary = _assignment_control_summary( + principal_id, + role_assignments_by_principal, + ) + if assignment_summary: + return assignment_summary, "role-assignment" + return None, None + + +def _normalized_identity_selector(value: str) -> str: + return value.rstrip("/").split("/")[-1].strip().lower() + + +def _display_identity_selector(value: str) -> str: + if "/" not in value: + return value + return value.rstrip("/").split("/")[-1] + + def _assignment_control_summary( principal_id: str, role_assignments_by_principal: dict[str, list[dict]], @@ -211,13 +489,16 @@ def _build_compute_control_record( family_name: str, surface_row: dict, workload_row: dict, - identity_binding: dict[str, str], + identity_binding: dict[str, object], permission_row: dict | None, assignment_summary: str | None, ) -> ChainPathRecord: stronger_outcome = _permission_control_summary(permission_row) or assignment_summary or "-" public_foothold = _has_public_compute_signal(workload_row) binding_source = str(identity_binding.get("binding_source") or "managed-identity") + identity_choice_basis = str(identity_binding.get("identity_choice_basis") or "") + identity_choice_detail = str(identity_binding.get("identity_choice_detail") or "") + mixed_identity_corroborated = bool(identity_choice_basis) if public_foothold: priority = "high" @@ -226,7 +507,23 @@ def _build_compute_control_record( priority = "medium" urgency = "review-soon" - if permission_row is not None and binding_source == "managed-identity": + if mixed_identity_corroborated and permission_row is not None: + confidence_boundary = ( + "Due to mixed identities and the current foothold, AzureFox cannot directly verify " + "which attached identity the raw token path will choose on every request. Another " + "collected workload surface currently points to this identity as the best current " + f"lead, and the stronger Azure control behind it is visible. Specifically, " + f"{identity_choice_detail}" + ) + elif mixed_identity_corroborated: + confidence_boundary = ( + "Due to mixed identities and the current foothold, AzureFox cannot directly verify " + "which attached identity the raw token path will choose on every request. Another " + "collected workload surface currently points to this identity as the best current " + "lead. " + f"Specifically, {identity_choice_detail}" + ) + elif permission_row is not None and binding_source == "managed-identity": confidence_boundary = ( "AzureFox can name the token-capable compute foothold, the attached identity, and " "the stronger Azure control behind it from current scope." @@ -251,15 +548,29 @@ def _build_compute_control_record( "anchor and fuller permission story still need confirmation." ) - next_review = _compute_control_next_review(workload_row) + next_review = _compute_control_next_review( + workload_row, + identity_choice_basis=identity_choice_basis, + ) identity_name = identity_binding["identity_name"] path_type = "direct-token-opportunity" - why_care = ( - f"{workload_row.get('asset_kind')} '{workload_row.get('asset_name')}' can request tokens " - f"as {identity_name}; that identity already maps to {stronger_outcome}." - ) + if mixed_identity_corroborated: + why_care = ( + f"{workload_row.get('asset_kind')} '{workload_row.get('asset_name')}' carries mixed " + f"identities. Current collected workload behavior points to {identity_name} as the " + f"best current lead, and that identity already maps to {stronger_outcome}." + ) + else: + why_care = ( + f"{workload_row.get('asset_kind')} '{workload_row.get('asset_name')}' can request " + "tokens " + f"as {identity_name}; that identity already maps to {stronger_outcome}." + ) evidence_commands = ["tokens-credentials", "workloads"] joined_surface_types = ["managed-identity-token", "workload"] + if mixed_identity_corroborated: + evidence_commands.append("env-vars") + joined_surface_types.append("identity-choice-corroboration") if binding_source == "managed-identity": evidence_commands.append("managed-identities") joined_surface_types.append("identity-anchor") @@ -272,43 +583,186 @@ def _build_compute_control_record( joined_surface_types.append("role-assignment") return ChainPathRecord( - chain_id=( - f"{family_name}::{surface_row.get('asset_id')}::{identity_binding['principal_id']}" - ), - asset_id=str(surface_row.get("asset_id") or ""), - asset_name=str(surface_row.get("asset_name") or surface_row.get("asset_id") or ""), - asset_kind=str(surface_row.get("asset_kind") or workload_row.get("asset_kind") or ""), - location=surface_row.get("location") or workload_row.get("location"), - source_command="tokens-credentials", - source_context=str(surface_row.get("access_path") or ""), - clue_type=str(surface_row.get("surface_type") or ""), - confirmation_basis=( - "permission-join" if permission_row is not None else "role-assignment-join" - ), - priority=priority, - urgency=urgency, - visible_path=str(surface_row.get("summary") or ""), - insertion_point=_compute_control_insertion_point(surface_row, workload_row), - path_concept=path_type, - stronger_outcome=stronger_outcome, - why_care=why_care, - likely_impact=stronger_outcome, - confidence_boundary=confidence_boundary, - target_service="azure-control", - target_resolution="path-confirmed", - evidence_commands=evidence_commands, - joined_surface_types=joined_surface_types, - target_count=1, - target_ids=[identity_binding["identity_id"]], - target_names=[identity_name], - next_review=next_review, - summary=f"{confidence_boundary} {next_review}", - missing_confirmation="", - related_ids=_merge_related_ids( - [str(value) for value in surface_row.get("related_ids") or [] if value], - [str(value) for value in workload_row.get("related_ids") or [] if value], - [identity_binding["identity_id"]], - ), + **_base_compute_control_payload( + surface_row=surface_row, + workload_row=workload_row, + chain_id=f"{family_name}::{surface_row.get('asset_id')}::{identity_binding['principal_id']}", + path_type=path_type, + confirmation_basis=( + "mixed-identity-corroborated-permission-join" + if mixed_identity_corroborated and permission_row is not None + else "mixed-identity-corroborated-role-assignment-join" + if mixed_identity_corroborated + else "permission-join" + if permission_row is not None + else "role-assignment-join" + ), + priority=priority, + urgency=urgency, + stronger_outcome=stronger_outcome, + why_care=why_care, + likely_impact=stronger_outcome, + confidence_boundary=confidence_boundary, + target_resolution=( + "identity-choice-corroborated" if mixed_identity_corroborated else "path-confirmed" + ), + evidence_commands=evidence_commands, + joined_surface_types=joined_surface_types, + target_ids=[str(identity_binding["identity_id"])], + target_names=[str(identity_name)], + next_review=next_review, + missing_confirmation=( + "Current foothold does not directly verify which attached identity the raw " + "token path will choose on every request." + if mixed_identity_corroborated + else "" + ), + related_ids=_merge_related_ids( + [str(value) for value in surface_row.get("related_ids") or [] if value], + [str(value) for value in workload_row.get("related_ids") or [] if value], + [str(identity_binding["identity_id"])], + ), + ) + ) + + +def _base_compute_control_payload( + *, + surface_row: dict, + workload_row: dict, + chain_id: str, + path_type: str, + confirmation_basis: str, + priority: str, + urgency: str, + stronger_outcome: str, + why_care: str, + likely_impact: str, + confidence_boundary: str, + target_resolution: str, + evidence_commands: list[str], + joined_surface_types: list[str], + target_ids: list[str], + target_names: list[str], + next_review: str, + missing_confirmation: str, + related_ids: list[str], +) -> dict[str, object]: + return { + "chain_id": chain_id, + "asset_id": str(surface_row.get("asset_id") or ""), + "asset_name": str(surface_row.get("asset_name") or surface_row.get("asset_id") or ""), + "asset_kind": str(surface_row.get("asset_kind") or workload_row.get("asset_kind") or ""), + "location": surface_row.get("location") or workload_row.get("location"), + "source_command": "tokens-credentials", + "source_context": str(surface_row.get("access_path") or ""), + "clue_type": str(surface_row.get("surface_type") or ""), + "priority": priority, + "urgency": urgency, + "visible_path": str(surface_row.get("summary") or ""), + "insertion_point": _compute_control_insertion_point(surface_row, workload_row), + "path_concept": path_type, + "confirmation_basis": confirmation_basis, + "stronger_outcome": stronger_outcome, + "why_care": why_care, + "likely_impact": likely_impact, + "confidence_boundary": confidence_boundary, + "target_service": "azure-control", + "target_resolution": target_resolution, + "evidence_commands": evidence_commands, + "joined_surface_types": joined_surface_types, + "target_count": len(target_ids), + "target_ids": target_ids, + "target_names": target_names, + "next_review": next_review, + "summary": f"{confidence_boundary} {next_review}", + "missing_confirmation": missing_confirmation, + "related_ids": related_ids, + } + + +def _build_mixed_identity_candidate_record( + *, + family_name: str, + surface_row: dict, + workload_row: dict, + candidate_bindings: list[dict[str, str]], +) -> ChainPathRecord: + control_candidates = [ + item for item in candidate_bindings if str(item.get("stronger_outcome") or "").strip() + ] + stronger_outcome = "; ".join( + f"{item['identity_name']}={item['stronger_outcome']}" for item in control_candidates + ) + control_bases = {str(item.get("control_basis") or "") for item in control_candidates} + binding_sources = {str(item.get("binding_source") or "") for item in candidate_bindings} + confidence_boundary = ( + "Based on the current evidence, this workload can request tokens through mixed attached " + "identities, but AzureFox cannot directly verify which attached identity the raw token " + "path will choose on every request. The attached identities currently in play are listed " + "here instead of a single chosen lead." + ) + next_review = ( + "The current foothold bounds this path to the attached identities shown here; exact " + "per-request identity choice remains unconfirmed." + ) + why_care = ( + f"{workload_row.get('asset_kind')} '{workload_row.get('asset_name')}' carries mixed " + "identities. AzureFox cannot yet defend one chosen identity, but visible Azure control " + f"currently maps to {stronger_outcome}." + ) + + evidence_commands = ["tokens-credentials", "workloads"] + joined_surface_types = ["managed-identity-token", "workload"] + if "managed-identity" in binding_sources: + evidence_commands.append("managed-identities") + joined_surface_types.append("identity-anchor") + if "workload-principal" in binding_sources: + joined_surface_types.append("workload-principal") + if "permissions" in control_bases: + evidence_commands.append("permissions") + joined_surface_types.append("permissions") + if "role-assignment" in control_bases: + joined_surface_types.append("role-assignment") + + target_ids = [ + str(item.get("identity_id") or "") for item in candidate_bindings if item.get("identity_id") + ] + target_names = [ + str(item.get("identity_name") or "") + for item in candidate_bindings + if item.get("identity_name") + ] + + return ChainPathRecord( + **_base_compute_control_payload( + surface_row=surface_row, + workload_row=workload_row, + chain_id=f"{family_name}::{surface_row.get('asset_id')}::mixed-identities", + path_type="direct-token-opportunity", + confirmation_basis="mixed-identity-attached-candidates", + priority="high" if _has_public_compute_signal(workload_row) else "medium", + urgency="pivot-now" if _has_public_compute_signal(workload_row) else "review-soon", + stronger_outcome=stronger_outcome, + why_care=why_care, + likely_impact=stronger_outcome, + confidence_boundary=confidence_boundary, + target_resolution="narrowed candidates", + evidence_commands=evidence_commands, + joined_surface_types=joined_surface_types, + target_ids=target_ids, + target_names=target_names, + next_review=next_review, + missing_confirmation=( + "Current foothold does not directly verify which attached identity the raw " + "token path will choose on every request." + ), + related_ids=_merge_related_ids( + [str(value) for value in surface_row.get("related_ids") or [] if value], + [str(value) for value in workload_row.get("related_ids") or [] if value], + target_ids, + ), + ) ) @@ -325,7 +779,7 @@ def _compute_control_insertion_point(surface_row: dict, workload_row: dict) -> s return access_path or "token-capable compute path" -def _compute_control_next_review(workload_row: dict) -> str: +def _compute_control_next_review(workload_row: dict, *, identity_choice_basis: str = "") -> str: asset_kind = str(workload_row.get("asset_kind") or "") if asset_kind == "VM": return ( @@ -343,6 +797,12 @@ def _compute_control_next_review(workload_row: dict) -> str: "scope on the attached identity." ) if asset_kind == "FunctionApp": + if identity_choice_basis: + return ( + "Current collected workload configuration already narrows this path to the " + "identity shown here; exact per-request token choice remains bounded by the " + "current foothold." + ) return ( "Check functions for the running service foothold, then permissions for exact scope " "on the attached identity." diff --git a/src/azurefox/chains/registry.py b/src/azurefox/chains/registry.py index 566d584..76945c6 100644 --- a/src/azurefox/chains/registry.py +++ b/src/azurefox/chains/registry.py @@ -394,12 +394,13 @@ class ChainFamilySpec: ), current_gap=( "The live family is intentionally narrow in v1: direct token-opportunity rows only. " - "Broader trust expansion, secret-bearing config starts, and mixed-identity workloads " - "still need clearer admission rules or a different family boundary." + "Broader trust expansion and secret-bearing config starts still sit outside this " + "family, and mixed-identity workloads still need explicit corroboration before " + "default admission." ), best_current_examples=( "tokens-credentials -> managed-identities -> permissions", - "workloads -> tokens-credentials -> permissions", + "tokens-credentials -> env-vars -> managed-identities -> permissions", ), source_commands=( ChainSourceSpec( @@ -417,6 +418,21 @@ class ChainFamilySpec: "or request tokens." ), ), + ChainSourceSpec( + command="env-vars", + minimum_fields=( + "asset_id", + "asset_name", + "setting_name", + "value_type", + "key_vault_reference_identity", + "workload_identity_ids", + ), + rationale=( + "Provides workload configuration clues that can explicitly name which " + "attached identity a mixed-identity web workload is using." + ), + ), ChainSourceSpec( command="workloads", minimum_fields=( diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ + diff --git a/tests/fixtures/lab_tenant/permissions.json b/tests/fixtures/lab_tenant/permissions.json index 8d813eb..963ef76 100644 --- a/tests/fixtures/lab_tenant/permissions.json +++ b/tests/fixtures/lab_tenant/permissions.json @@ -51,6 +51,60 @@ ], "privileged": true, "is_current_identity": false + }, + { + "principal_id": "cccc2222-2222-2222-2222-222222222222", + "display_name": "func-orders-system", + "principal_type": "ServicePrincipal", + "high_impact_roles": [ + "Contributor" + ], + "all_role_names": [ + "Contributor" + ], + "role_assignment_count": 1, + "scope_count": 1, + "scope_ids": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-apps" + ], + "privileged": true, + "is_current_identity": false + }, + { + "principal_id": "eeee3333-3333-3333-3333-333333333333", + "display_name": "app-empty-mi-system", + "principal_type": "ServicePrincipal", + "high_impact_roles": [ + "Contributor" + ], + "all_role_names": [ + "Contributor" + ], + "role_assignment_count": 1, + "scope_count": 1, + "scope_ids": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-apps" + ], + "privileged": true, + "is_current_identity": false + }, + { + "principal_id": "77770000-0000-0000-0000-000000000001", + "display_name": "vmss-edge-01-system", + "principal_type": "ServicePrincipal", + "high_impact_roles": [ + "Contributor" + ], + "all_role_names": [ + "Contributor" + ], + "role_assignment_count": 1, + "scope_count": 1, + "scope_ids": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-workload" + ], + "privileged": true, + "is_current_identity": false } ], "issues": [] diff --git a/tests/fixtures/lab_tenant/principals.json b/tests/fixtures/lab_tenant/principals.json index d4d0ee2..9ecc918 100644 --- a/tests/fixtures/lab_tenant/principals.json +++ b/tests/fixtures/lab_tenant/principals.json @@ -47,6 +47,84 @@ "identity_types": [], "attached_to": [], "is_current_identity": false + }, + { + "id": "cccc2222-2222-2222-2222-222222222222", + "principal_type": "ServicePrincipal", + "display_name": "func-orders-system", + "tenant_id": "11111111-1111-1111-1111-111111111111", + "sources": [ + "managed-identities" + ], + "scope_ids": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-apps" + ], + "role_names": [ + "Contributor" + ], + "role_assignment_count": 1, + "identity_names": [ + "func-orders-system" + ], + "identity_types": [ + "systemAssigned" + ], + "attached_to": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders" + ], + "is_current_identity": false + }, + { + "id": "eeee3333-3333-3333-3333-333333333333", + "principal_type": "ServicePrincipal", + "display_name": "app-empty-mi-system", + "tenant_id": "11111111-1111-1111-1111-111111111111", + "sources": [ + "managed-identities" + ], + "scope_ids": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-apps" + ], + "role_names": [ + "Contributor" + ], + "role_assignment_count": 1, + "identity_names": [ + "app-empty-mi-system" + ], + "identity_types": [ + "systemAssigned" + ], + "attached_to": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-empty-mi" + ], + "is_current_identity": false + }, + { + "id": "77770000-0000-0000-0000-000000000001", + "principal_type": "ServicePrincipal", + "display_name": "vmss-edge-01-system", + "tenant_id": "11111111-1111-1111-1111-111111111111", + "sources": [ + "managed-identities" + ], + "scope_ids": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-workload" + ], + "role_names": [ + "Contributor" + ], + "role_assignment_count": 1, + "identity_names": [ + "vmss-edge-01-system" + ], + "identity_types": [ + "systemAssigned" + ], + "attached_to": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-workload/providers/Microsoft.Compute/virtualMachineScaleSets/vmss-edge-01" + ], + "is_current_identity": false } ], "issues": [] diff --git a/tests/golden/permissions.json b/tests/golden/permissions.json index 34566ef..d487a4a 100644 --- a/tests/golden/permissions.json +++ b/tests/golden/permissions.json @@ -4,7 +4,7 @@ "auth_mode": null, "command": "permissions", "devops_organization": null, - "generated_at": "", + "generated_at": "2026-04-12T18:56:06.241502Z", "schema_version": "1.3.0", "subscription_id": "22222222-2222-2222-2222-222222222222", "tenant_id": "11111111-1111-1111-1111-111111111111", @@ -22,9 +22,9 @@ "is_current_identity": true, "next_review": "Check privesc for the direct abuse or escalation path behind this current identity.", "operator_signal": "Direct control visible; current foothold.", - "priority": "high", "principal_id": "33333333-3333-3333-3333-333333333333", "principal_type": "ServicePrincipal", + "priority": "high", "privileged": true, "role_assignment_count": 1, "scope_count": 1, @@ -33,6 +33,72 @@ ], "summary": "Current identity 'azurefox-lab-sp' already has direct control visible through Owner across subscription-wide. Check privesc for the direct abuse or escalation path behind this current identity." }, + { + "all_role_names": [ + "Contributor" + ], + "display_name": "app-empty-mi-system", + "high_impact_roles": [ + "Contributor" + ], + "is_current_identity": false, + "next_review": "Check managed-identities for the workload pivot behind this direct control row.", + "operator_signal": "Direct control visible; workload pivot visible.", + "principal_id": "eeee3333-3333-3333-3333-333333333333", + "principal_type": "ServicePrincipal", + "priority": "high", + "privileged": true, + "role_assignment_count": 1, + "scope_count": 1, + "scope_ids": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-apps" + ], + "summary": "ServicePrincipal 'app-empty-mi-system' already has direct control visible through Contributor across subscription-wide, and current scope also shows a workload pivot. Check managed-identities for the workload pivot behind this direct control row." + }, + { + "all_role_names": [ + "Contributor" + ], + "display_name": "func-orders-system", + "high_impact_roles": [ + "Contributor" + ], + "is_current_identity": false, + "next_review": "Check managed-identities for the workload pivot behind this direct control row.", + "operator_signal": "Direct control visible; workload pivot visible.", + "principal_id": "cccc2222-2222-2222-2222-222222222222", + "principal_type": "ServicePrincipal", + "priority": "high", + "privileged": true, + "role_assignment_count": 1, + "scope_count": 1, + "scope_ids": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-apps" + ], + "summary": "ServicePrincipal 'func-orders-system' already has direct control visible through Contributor across subscription-wide, and current scope also shows a workload pivot. Check managed-identities for the workload pivot behind this direct control row." + }, + { + "all_role_names": [ + "Contributor" + ], + "display_name": "vmss-edge-01-system", + "high_impact_roles": [ + "Contributor" + ], + "is_current_identity": false, + "next_review": "Check managed-identities for the workload pivot behind this direct control row.", + "operator_signal": "Direct control visible; workload pivot visible.", + "principal_id": "77770000-0000-0000-0000-000000000001", + "principal_type": "ServicePrincipal", + "priority": "high", + "privileged": true, + "role_assignment_count": 1, + "scope_count": 1, + "scope_ids": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-workload" + ], + "summary": "ServicePrincipal 'vmss-edge-01-system' already has direct control visible through Contributor across subscription-wide, and current scope also shows a workload pivot. Check managed-identities for the workload pivot behind this direct control row." + }, { "all_role_names": [ "Contributor" @@ -44,9 +110,9 @@ "is_current_identity": false, "next_review": "Check role-trusts for trust expansion around who can influence this principal.", "operator_signal": "Direct control visible; trust expansion follow-on.", - "priority": "medium", "principal_id": "12121212-1212-1212-1212-121212121212", "principal_type": "ServicePrincipal", + "priority": "medium", "privileged": true, "role_assignment_count": 1, "scope_count": 1, @@ -64,9 +130,9 @@ "is_current_identity": false, "next_review": "Check rbac for the exact assignment evidence behind this lower-signal row.", "operator_signal": "Direct control not confirmed.", - "priority": "low", "principal_id": "44444444-4444-4444-4444-444444444444", "principal_type": "User", + "priority": "low", "privileged": false, "role_assignment_count": 1, "scope_count": 1, diff --git a/tests/golden/principals.json b/tests/golden/principals.json index 7aa30b7..3cc02a8 100644 --- a/tests/golden/principals.json +++ b/tests/golden/principals.json @@ -1,8 +1,10 @@ { "issues": [], "metadata": { + "auth_mode": null, "command": "principals", - "generated_at": "", + "devops_organization": null, + "generated_at": "2026-04-12T18:56:06.240867Z", "schema_version": "1.3.0", "subscription_id": "22222222-2222-2222-2222-222222222222", "tenant_id": "11111111-1111-1111-1111-111111111111", @@ -56,6 +58,84 @@ "rbac" ], "tenant_id": "11111111-1111-1111-1111-111111111111" + }, + { + "attached_to": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders" + ], + "display_name": "func-orders-system", + "id": "cccc2222-2222-2222-2222-222222222222", + "identity_names": [ + "func-orders-system" + ], + "identity_types": [ + "systemAssigned" + ], + "is_current_identity": false, + "principal_type": "ServicePrincipal", + "role_assignment_count": 1, + "role_names": [ + "Contributor" + ], + "scope_ids": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-apps" + ], + "sources": [ + "managed-identities" + ], + "tenant_id": "11111111-1111-1111-1111-111111111111" + }, + { + "attached_to": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-empty-mi" + ], + "display_name": "app-empty-mi-system", + "id": "eeee3333-3333-3333-3333-333333333333", + "identity_names": [ + "app-empty-mi-system" + ], + "identity_types": [ + "systemAssigned" + ], + "is_current_identity": false, + "principal_type": "ServicePrincipal", + "role_assignment_count": 1, + "role_names": [ + "Contributor" + ], + "scope_ids": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-apps" + ], + "sources": [ + "managed-identities" + ], + "tenant_id": "11111111-1111-1111-1111-111111111111" + }, + { + "attached_to": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-workload/providers/Microsoft.Compute/virtualMachineScaleSets/vmss-edge-01" + ], + "display_name": "vmss-edge-01-system", + "id": "77770000-0000-0000-0000-000000000001", + "identity_names": [ + "vmss-edge-01-system" + ], + "identity_types": [ + "systemAssigned" + ], + "is_current_identity": false, + "principal_type": "ServicePrincipal", + "role_assignment_count": 1, + "role_names": [ + "Contributor" + ], + "scope_ids": [ + "/subscriptions/22222222-2222-2222-2222-222222222222/resourceGroups/rg-workload" + ], + "sources": [ + "managed-identities" + ], + "tenant_id": "11111111-1111-1111-1111-111111111111" } ] } diff --git a/tests/test_cli_smoke.py b/tests/test_cli_smoke.py index 4171e11..6954082 100644 --- a/tests/test_cli_smoke.py +++ b/tests/test_cli_smoke.py @@ -354,23 +354,59 @@ def test_cli_smoke_chains_compute_control_json(tmp_path: Path) -> None: assert payload["source_artifacts"] == [] assert payload["backing_commands"] == [ "tokens-credentials", + "env-vars", "workloads", "managed-identities", "permissions", ] - assert len(payload["paths"]) == 1 - row = payload["paths"][0] - assert row["asset_name"] == "vm-web-01" - assert row["asset_kind"] == "VM" - assert row["path_concept"] == "direct-token-opportunity" - assert row["insertion_point"] == "public IMDS token path" - assert row["stronger_outcome"] == "Owner across subscription-wide scope" - assert row["priority"] == "high" - assert row["urgency"] == "pivot-now" - assert row["target_service"] == "azure-control" - assert "token-capable compute foothold" in row["confidence_boundary"] - assert "Check vms for the host foothold" in row["next_review"] - assert "can request tokens as ua-app" in row["why_care"] + assert len(payload["paths"]) == 4 + names = [item["asset_name"] for item in payload["paths"]] + assert names == ["app-empty-mi", "func-orders", "vm-web-01", "vmss-edge-01"] + assert {item["target_resolution"] for item in payload["paths"]} == { + "path-confirmed", + "identity-choice-corroborated", + } + + app_row = next(item for item in payload["paths"] if item["asset_name"] == "app-empty-mi") + assert app_row["asset_kind"] == "AppService" + assert app_row["path_concept"] == "direct-token-opportunity" + assert app_row["priority"] == "high" + assert app_row["urgency"] == "pivot-now" + assert app_row["stronger_outcome"] == "Contributor across subscription-wide scope" + assert "Check app-services for the running service foothold" in app_row["next_review"] + + func_row = next(item for item in payload["paths"] if item["asset_name"] == "func-orders") + assert func_row["asset_kind"] == "FunctionApp" + assert func_row["path_concept"] == "direct-token-opportunity" + assert func_row["priority"] == "high" + assert func_row["urgency"] == "pivot-now" + assert func_row["target_names"] == ["func-orders-system"] + assert func_row["target_resolution"] == "identity-choice-corroborated" + assert func_row["confirmation_basis"] == "mixed-identity-corroborated-permission-join" + assert "cannot directly verify" in func_row["confidence_boundary"] + assert "SystemAssigned" in func_row["confidence_boundary"] + assert "does not directly verify" in func_row["missing_confirmation"] + assert "already narrows this path to the identity shown here" in func_row["next_review"] + + vm_row = next(item for item in payload["paths"] if item["asset_name"] == "vm-web-01") + assert vm_row["asset_kind"] == "VM" + assert vm_row["path_concept"] == "direct-token-opportunity" + assert vm_row["insertion_point"] == "public IMDS token path" + assert vm_row["stronger_outcome"] == "Owner across subscription-wide scope" + assert vm_row["priority"] == "high" + assert vm_row["urgency"] == "pivot-now" + assert vm_row["target_service"] == "azure-control" + assert "token-capable compute foothold" in vm_row["confidence_boundary"] + assert "Check vms for the host foothold" in vm_row["next_review"] + assert "can request tokens as ua-app" in vm_row["why_care"] + + vmss_row = next(item for item in payload["paths"] if item["asset_name"] == "vmss-edge-01") + assert vmss_row["asset_kind"] == "VMSS" + assert vmss_row["path_concept"] == "direct-token-opportunity" + assert vmss_row["priority"] == "medium" + assert vmss_row["urgency"] == "review-soon" + assert vmss_row["stronger_outcome"] == "Contributor across subscription-wide scope" + assert "Check vmss for the fleet foothold" in vmss_row["next_review"] def test_cli_smoke_chains_escalation_path_json(tmp_path: Path) -> None: @@ -433,6 +469,7 @@ def test_cli_smoke_chains_compute_control_table_output(tmp_path: Path) -> None: ["--outdir", str(tmp_path), "chains", "compute-control"], env={"AZUREFOX_FIXTURE_DIR": str(fixture_dir)}, ) + normalized_output = " ".join(result.stdout.split()).lower() assert result.exit_code == 0 assert "azurefox chains" in result.stdout @@ -441,13 +478,19 @@ def test_cli_smoke_chains_compute_control_table_output(tmp_path: Path) -> None: assert "insertion point" in result.stdout assert "visible azure control" in result.stdout assert "confidence boundary" in result.stdout + assert "app-empty-mi" in result.stdout + assert "func-orders" in result.stdout assert "vm-web-01" in result.stdout + assert "vmss-edge-01" in result.stdout assert "direct token opportunity" in result.stdout assert "public imds token path" in result.stdout.lower() assert "Owner across subscription-wide scope" in result.stdout + assert "mixed identities" in normalized_output + assert "current foothold" in normalized_output assert "Claim boundary:" in result.stdout assert "Current gap:" in result.stdout - assert "Takeaway: 1 visible compute-control paths" in result.stdout + assert "Takeaway: 4 visible compute-control paths" in result.stdout + assert "narrowed candidates" not in normalized_output def test_cli_smoke_deployment_path_operator_language_guard(tmp_path: Path) -> None: @@ -571,7 +614,7 @@ def test_cli_smoke_chains_help_csv_matches_overview(tmp_path: Path) -> None: assert overview_csv == help_csv -def test_cli_smoke_loot_uses_semantic_high_band_for_tokens_credentials(tmp_path: Path) -> None: +def test_cli_smoke_loot_artifact_written_end_to_end(tmp_path: Path) -> None: fixture_dir = Path(__file__).resolve().parent / "fixtures" / "lab_tenant" result = runner.invoke( @@ -606,79 +649,6 @@ def test_cli_smoke_loot_uses_semantic_high_band_for_tokens_credentials(tmp_path: } -def test_cli_smoke_loot_uses_semantic_high_band_for_cross_tenant(tmp_path: Path) -> None: - fixture_dir = Path(__file__).resolve().parent / "fixtures" / "lab_tenant" - - result = runner.invoke( - app, - ["--outdir", str(tmp_path), "--output", "json", "cross-tenant"], - env={"AZUREFOX_FIXTURE_DIR": str(fixture_dir)}, - ) - - assert result.exit_code == 0 - json_payload = json.loads((tmp_path / "json" / "cross-tenant.json").read_text(encoding="utf-8")) - loot_payload = json.loads((tmp_path / "loot" / "cross-tenant.json").read_text(encoding="utf-8")) - - assert len(json_payload["cross_tenant_paths"]) == 4 - assert len(loot_payload["cross_tenant_paths"]) == 3 - assert {row["priority"] for row in loot_payload["cross_tenant_paths"]} == {"high"} - assert loot_payload["loot_scope"] == { - "selection": "semantic-high-priority", - "priority_band": "high", - "source_count": len(json_payload["cross_tenant_paths"]), - "returned_count": 3, - } - - -def test_cli_smoke_loot_uses_semantic_high_band_for_permissions(tmp_path: Path) -> None: - fixture_dir = Path(__file__).resolve().parent / "fixtures" / "lab_tenant" - - result = runner.invoke( - app, - ["--outdir", str(tmp_path), "--output", "json", "permissions"], - env={"AZUREFOX_FIXTURE_DIR": str(fixture_dir)}, - ) - - assert result.exit_code == 0 - json_payload = json.loads((tmp_path / "json" / "permissions.json").read_text(encoding="utf-8")) - loot_payload = json.loads((tmp_path / "loot" / "permissions.json").read_text(encoding="utf-8")) - - assert [row["priority"] for row in json_payload["permissions"]] == ["high", "medium", "low"] - assert len(loot_payload["permissions"]) == 1 - assert {row["priority"] for row in loot_payload["permissions"]} == {"high"} - assert loot_payload["loot_scope"] == { - "selection": "semantic-high-priority", - "priority_band": "high", - "source_count": len(json_payload["permissions"]), - "returned_count": 1, - } - - -def test_cli_smoke_loot_uses_semantic_high_band_for_privesc(tmp_path: Path) -> None: - fixture_dir = Path(__file__).resolve().parent / "fixtures" / "lab_tenant" - - result = runner.invoke( - app, - ["--outdir", str(tmp_path), "--output", "json", "privesc"], - env={"AZUREFOX_FIXTURE_DIR": str(fixture_dir)}, - ) - - assert result.exit_code == 0 - json_payload = json.loads((tmp_path / "json" / "privesc.json").read_text(encoding="utf-8")) - loot_payload = json.loads((tmp_path / "loot" / "privesc.json").read_text(encoding="utf-8")) - - assert [row["priority"] for row in json_payload["paths"]] == ["high", "medium"] - assert [row["severity"] for row in json_payload["paths"]] == ["high", "high"] - assert len(loot_payload["paths"]) == 1 - assert {row["priority"] for row in loot_payload["paths"]} == {"high"} - assert loot_payload["loot_scope"] == { - "selection": "semantic-high-priority", - "priority_band": "high", - "source_count": len(json_payload["paths"]), - "returned_count": 1, - } - - def test_cli_smoke_devops_accepts_organization_after_command(tmp_path: Path) -> None: fixture_dir = Path(__file__).resolve().parent / "fixtures" / "lab_tenant" diff --git a/tests/test_collectors.py b/tests/test_collectors.py index 01b6f0d..ef6a575 100644 --- a/tests/test_collectors.py +++ b/tests/test_collectors.py @@ -3341,9 +3341,16 @@ def test_collect_rbac(fixture_provider, options) -> None: def test_collect_principals(fixture_provider, options) -> None: output = collect_principals(fixture_provider, options) - assert len(output.principals) == 2 + assert len(output.principals) == 5 assert output.principals[0].is_current_identity is True assert "ua-app" in output.principals[0].identity_names + assert {item.display_name for item in output.principals} == { + "azurefox-lab-sp", + "operator@lab.local", + "func-orders-system", + "app-empty-mi-system", + "vmss-edge-01-system", + } def test_principal_sort_key_prioritizes_high_impact_then_workload_attachment() -> None: @@ -3388,7 +3395,7 @@ def test_principal_sort_key_prioritizes_high_impact_then_workload_attachment() - def test_collect_permissions(fixture_provider, options) -> None: output = collect_permissions(fixture_provider, options) - assert len(output.permissions) == 3 + assert len(output.permissions) == 6 assert output.permissions[0].priority == "high" assert output.permissions[0].privileged is True assert output.permissions[0].high_impact_roles == ["Owner"] @@ -3398,12 +3405,27 @@ def test_collect_permissions(fixture_provider, options) -> None: == "Check privesc for the direct abuse or escalation path behind this current identity." ) assert "direct control visible" in (output.permissions[0].summary or "").lower() - assert output.permissions[1].display_name == "aa-hybrid-prod-mi" - assert output.permissions[1].priority == "medium" - assert output.permissions[1].high_impact_roles == ["Contributor"] - assert output.permissions[1].next_review == ( + by_name = {item.display_name: item for item in output.permissions} + assert by_name["aa-hybrid-prod-mi"].priority == "medium" + assert by_name["aa-hybrid-prod-mi"].high_impact_roles == ["Contributor"] + assert by_name["aa-hybrid-prod-mi"].next_review == ( "Check role-trusts for trust expansion around who can influence this principal." ) + assert by_name["app-empty-mi-system"].operator_signal == ( + "Direct control visible; workload pivot visible." + ) + assert by_name["app-empty-mi-system"].next_review == ( + "Check managed-identities for the workload pivot behind this direct control row." + ) + assert by_name["func-orders-system"].operator_signal == ( + "Direct control visible; workload pivot visible." + ) + assert by_name["func-orders-system"].next_review == ( + "Check managed-identities for the workload pivot behind this direct control row." + ) + assert by_name["vmss-edge-01-system"].operator_signal == ( + "Direct control visible; workload pivot visible." + ) def test_collect_permissions_prefers_workload_pivot_then_trust_expansion() -> None: diff --git a/tests/test_compute_control.py b/tests/test_compute_control.py index 574e22d..15cc827 100644 --- a/tests/test_compute_control.py +++ b/tests/test_compute_control.py @@ -2,33 +2,49 @@ from azurefox.chains.compute_control import collect_compute_control_records from azurefox.models.commands import ( + EnvVarsOutput, ManagedIdentitiesOutput, PermissionsOutput, TokensCredentialsOutput, WorkloadsOutput, ) from azurefox.models.common import ( + CollectionIssue, CommandMetadata, + EnvVarSummary, ManagedIdentity, PermissionSummary, RoleAssignment, TokenCredentialSurfaceSummary, WorkloadSummary, ) +from tests.truthfulness import ( + assert_issue_collectors_include, + assert_rows_exclude, + assert_rows_include, + row_by_field, +) def _metadata(command: str) -> CommandMetadata: return CommandMetadata(command=command) -def test_compute_control_admits_system_assigned_workload_via_workload_principal() -> None: - loaded = { +def _base_loaded_app_service( + *, + asset_name: str, + principal_id: str, + permission: PermissionSummary | None, + identities: list[ManagedIdentity] | None = None, + managed_identity_issues: list[CollectionIssue] | None = None, +) -> dict[str, object]: + return { "tokens-credentials": TokensCredentialsOutput( metadata=_metadata("tokens-credentials"), surfaces=[ TokenCredentialSurfaceSummary( - asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-public-api", - asset_name="app-public-api", + asset_id=f"/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/{asset_name}", + asset_name=asset_name, asset_kind="AppService", resource_group="rg-apps", location="eastus", @@ -38,65 +54,80 @@ def test_compute_control_admits_system_assigned_workload_via_workload_principal( operator_signal="SystemAssigned", summary="App Service can request tokens through its attached identity.", related_ids=[ - "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-public-api", - "aaaa1111-1111-1111-1111-111111111111", + f"/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/{asset_name}", + principal_id, ], ) ], + issues=[], ), "workloads": WorkloadsOutput( metadata=_metadata("workloads"), workloads=[ WorkloadSummary( - asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-public-api", - asset_name="app-public-api", + asset_id=f"/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/{asset_name}", + asset_name=asset_name, asset_kind="AppService", resource_group="rg-apps", location="eastus", identity_type="SystemAssigned", - identity_principal_id="aaaa1111-1111-1111-1111-111111111111", - endpoints=["app-public-api.azurewebsites.net"], + identity_principal_id=principal_id, + endpoints=[f"{asset_name}.azurewebsites.net"], ingress_paths=["azurewebsites-default-hostname"], exposure_families=["managed-web-hostname"], summary=( "App Service exposes a reachable hostname and carries a system identity." ), related_ids=[ - "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-public-api", - "aaaa1111-1111-1111-1111-111111111111", + f"/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/{asset_name}", + principal_id, ], ) ], + issues=[], ), "managed-identities": ManagedIdentitiesOutput( metadata=_metadata("managed-identities"), - identities=[], + identities=identities or [], role_assignments=[], + issues=managed_identity_issues or [], + ), + "env-vars": EnvVarsOutput( + metadata=_metadata("env-vars"), + env_vars=[], + issues=[], ), "permissions": PermissionsOutput( metadata=_metadata("permissions"), - permissions=[ - PermissionSummary( - principal_id="aaaa1111-1111-1111-1111-111111111111", - display_name="app-public-api-system", - principal_type="ServicePrincipal", - priority="high", - high_impact_roles=["Contributor"], - all_role_names=["Contributor"], - role_assignment_count=1, - scope_count=1, - scope_ids=["/subscriptions/sub/resourceGroups/rg-apps"], - privileged=True, - ) - ], + permissions=[permission] if permission is not None else [], + issues=[], ), } + +def test_compute_control_admits_system_assigned_workload_via_workload_principal() -> None: + loaded = _base_loaded_app_service( + asset_name="app-public-api", + principal_id="aaaa1111-1111-1111-1111-111111111111", + permission=PermissionSummary( + principal_id="aaaa1111-1111-1111-1111-111111111111", + display_name="app-public-api-system", + principal_type="ServicePrincipal", + priority="high", + high_impact_roles=["Contributor"], + all_role_names=["Contributor"], + role_assignment_count=1, + scope_count=1, + scope_ids=["/subscriptions/sub/resourceGroups/rg-apps"], + privileged=True, + ), + ) + paths, issues = collect_compute_control_records("compute-control", loaded) assert not issues - assert len(paths) == 1 - row = paths[0] + assert_rows_include(paths, field="asset_name", expected=["app-public-api"]) + row = row_by_field(paths, field="asset_name", expected="app-public-api") assert row.asset_name == "app-public-api" assert row.insertion_point == "reachable service token request path" assert row.target_names == ["app-public-api system identity"] @@ -111,24 +142,80 @@ def test_compute_control_admits_system_assigned_workload_via_workload_principal( def test_compute_control_prefers_explicit_system_identity_anchor_when_present() -> None: + loaded = _base_loaded_app_service( + asset_name="app-public-api", + principal_id="aaaa1111-1111-1111-1111-111111111111", + permission=PermissionSummary( + principal_id="aaaa1111-1111-1111-1111-111111111111", + display_name="app-public-api-system", + principal_type="ServicePrincipal", + priority="high", + high_impact_roles=["Contributor"], + all_role_names=["Contributor"], + role_assignment_count=1, + scope_count=1, + scope_ids=["/subscriptions/sub/resourceGroups/rg-apps"], + privileged=True, + ), + identities=[ + ManagedIdentity( + id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-public-api/identities/system", + name="app-public-api-system", + identity_type="systemAssigned", + principal_id="aaaa1111-1111-1111-1111-111111111111", + attached_to=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-public-api" + ], + ) + ], + ) + + paths, issues = collect_compute_control_records("compute-control", loaded) + + assert not issues + assert_rows_include(paths, field="asset_name", expected=["app-public-api"]) + row = row_by_field(paths, field="asset_name", expected="app-public-api") + assert row.target_ids == [ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-public-api/identities/system" + ] + assert row.target_names == ["app-public-api-system"] + assert row.evidence_commands == [ + "tokens-credentials", + "workloads", + "managed-identities", + "permissions", + ] + assert row.joined_surface_types == [ + "managed-identity-token", + "workload", + "identity-anchor", + "permissions", + ] + assert "the attached identity" in (row.confidence_boundary or "") + + +def test_compute_control_emits_bounded_mixed_identity_candidates_when_actor_is_not_explicit() -> ( + None +): loaded = { "tokens-credentials": TokensCredentialsOutput( metadata=_metadata("tokens-credentials"), surfaces=[ TokenCredentialSurfaceSummary( - asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-public-api", - asset_name="app-public-api", - asset_kind="AppService", + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", resource_group="rg-apps", location="eastus", surface_type="managed-identity-token", access_path="workload-identity", priority="medium", - operator_signal="SystemAssigned", - summary="App Service can request tokens through its attached identity.", + operator_signal="SystemAssigned, UserAssigned; user-assigned=1", + summary="Function App can request tokens through multiple attached identities.", related_ids=[ - "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-public-api", - "aaaa1111-1111-1111-1111-111111111111", + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + "cccc2222-2222-2222-2222-222222222222", + "/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders", ], ) ], @@ -137,22 +224,24 @@ def test_compute_control_prefers_explicit_system_identity_anchor_when_present() metadata=_metadata("workloads"), workloads=[ WorkloadSummary( - asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-public-api", - asset_name="app-public-api", - asset_kind="AppService", + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", resource_group="rg-apps", location="eastus", - identity_type="SystemAssigned", - identity_principal_id="aaaa1111-1111-1111-1111-111111111111", - endpoints=["app-public-api.azurewebsites.net"], - ingress_paths=["azurewebsites-default-hostname"], + identity_type="SystemAssigned, UserAssigned", + identity_principal_id="cccc2222-2222-2222-2222-222222222222", + identity_ids=[ + "/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders" + ], + endpoints=["func-orders.azurewebsites.net"], + ingress_paths=["azure-functions-default-hostname"], exposure_families=["managed-web-hostname"], - summary=( - "App Service exposes a reachable hostname and carries a system identity." - ), + summary="Function App exposes a hostname and carries mixed identities.", related_ids=[ - "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-public-api", - "aaaa1111-1111-1111-1111-111111111111", + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + "cccc2222-2222-2222-2222-222222222222", + "/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders", ], ) ], @@ -161,23 +250,184 @@ def test_compute_control_prefers_explicit_system_identity_anchor_when_present() metadata=_metadata("managed-identities"), identities=[ ManagedIdentity( - id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-public-api/identities/system", - name="app-public-api-system", + id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders/identities/system", + name="func-orders-system", identity_type="systemAssigned", - principal_id="aaaa1111-1111-1111-1111-111111111111", + principal_id="cccc2222-2222-2222-2222-222222222222", + attached_to=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders" + ], + ), + ManagedIdentity( + id="/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders", + name="ua-orders", + identity_type="userAssigned", + principal_id="dddd3333-3333-3333-3333-333333333333", attached_to=[ - "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-public-api" + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders" + ], + ), + ], + role_assignments=[ + RoleAssignment( + id="ra-orders", + scope_id="/subscriptions/sub", + principal_id="dddd3333-3333-3333-3333-333333333333", + principal_type="ServicePrincipal", + role_name="Owner", + ) + ], + ), + "permissions": PermissionsOutput( + metadata=_metadata("permissions"), + permissions=[ + PermissionSummary( + principal_id="dddd3333-3333-3333-3333-333333333333", + display_name="ua-orders", + principal_type="ServicePrincipal", + priority="high", + high_impact_roles=["Owner"], + all_role_names=["Owner"], + role_assignment_count=1, + scope_count=1, + scope_ids=["/subscriptions/sub"], + privileged=True, + ) + ], + ), + "env-vars": EnvVarsOutput( + metadata=_metadata("env-vars"), + env_vars=[], + issues=[], + ), + } + + paths, issues = collect_compute_control_records("compute-control", loaded) + + assert not issues + assert_rows_include(paths, field="asset_name", expected=["func-orders"]) + row = row_by_field(paths, field="asset_name", expected="func-orders") + assert row.target_names == ["func-orders-system", "ua-orders"] + assert row.target_resolution == "narrowed candidates" + assert row.confirmation_basis == "mixed-identity-attached-candidates" + assert row.target_count == 2 + assert "cannot directly verify" in (row.confidence_boundary or "") + assert "attached identities currently in play" in (row.confidence_boundary or "") + assert "func-orders-system" in (row.target_names or []) + assert "ua-orders=Owner across subscription-wide scope" in (row.stronger_outcome or "") + assert "does not directly verify" in (row.missing_confirmation or "") + + +def test_compute_control_admits_mixed_identity_workload_when_env_vars_corroborate_choice() -> None: + loaded = { + "tokens-credentials": TokensCredentialsOutput( + metadata=_metadata("tokens-credentials"), + surfaces=[ + TokenCredentialSurfaceSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + surface_type="managed-identity-token", + access_path="workload-identity", + priority="medium", + operator_signal="SystemAssigned, UserAssigned; user-assigned=1", + summary="Function App can request tokens through multiple attached identities.", + related_ids=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + "cccc2222-2222-2222-2222-222222222222", + "/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders", + ], + ) + ], + issues=[], + ), + "workloads": WorkloadsOutput( + metadata=_metadata("workloads"), + workloads=[ + WorkloadSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + identity_type="SystemAssigned, UserAssigned", + identity_principal_id="cccc2222-2222-2222-2222-222222222222", + identity_ids=[ + "/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders" + ], + endpoints=["func-orders.azurewebsites.net"], + ingress_paths=["azure-functions-default-hostname"], + exposure_families=["managed-web-hostname"], + summary="Function App exposes a hostname and carries mixed identities.", + related_ids=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + "cccc2222-2222-2222-2222-222222222222", + "/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders", ], ) ], + issues=[], + ), + "managed-identities": ManagedIdentitiesOutput( + metadata=_metadata("managed-identities"), + identities=[ + ManagedIdentity( + id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders/identities/system", + name="func-orders-system", + identity_type="systemAssigned", + principal_id="cccc2222-2222-2222-2222-222222222222", + attached_to=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders" + ], + ), + ManagedIdentity( + id="/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders", + name="ua-orders", + identity_type="userAssigned", + principal_id="dddd3333-3333-3333-3333-333333333333", + attached_to=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders" + ], + ), + ], role_assignments=[], + issues=[], + ), + "env-vars": EnvVarsOutput( + metadata=_metadata("env-vars"), + env_vars=[ + EnvVarSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + workload_identity_type="SystemAssigned, UserAssigned", + workload_principal_id="cccc2222-2222-2222-2222-222222222222", + workload_client_id="dddd2222-2222-2222-2222-222222222222", + workload_identity_ids=[ + "/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders" + ], + key_vault_reference_identity="SystemAssigned", + setting_name="PAYMENT_API_KEY", + value_type="keyvault-ref", + looks_sensitive=True, + reference_target="kvlabopen01.vault.azure.net/secrets/payment-api-key", + summary=( + "Function App uses a Key Vault-backed setting via SystemAssigned identity." + ), + ) + ], + issues=[], ), "permissions": PermissionsOutput( metadata=_metadata("permissions"), permissions=[ PermissionSummary( - principal_id="aaaa1111-1111-1111-1111-111111111111", - display_name="app-public-api-system", + principal_id="cccc2222-2222-2222-2222-222222222222", + display_name="func-orders-system", principal_type="ServicePrincipal", priority="high", high_impact_roles=["Contributor"], @@ -188,34 +438,43 @@ def test_compute_control_prefers_explicit_system_identity_anchor_when_present() privileged=True, ) ], + issues=[], ), } paths, issues = collect_compute_control_records("compute-control", loaded) assert not issues - assert len(paths) == 1 - row = paths[0] - assert row.target_ids == [ - "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-public-api/identities/system" - ] - assert row.target_names == ["app-public-api-system"] + assert_rows_include(paths, field="asset_name", expected=["func-orders"]) + row = row_by_field(paths, field="asset_name", expected="func-orders") + assert row.target_names == ["func-orders-system"] assert row.evidence_commands == [ "tokens-credentials", "workloads", + "env-vars", "managed-identities", "permissions", ] assert row.joined_surface_types == [ "managed-identity-token", "workload", + "identity-choice-corroboration", "identity-anchor", "permissions", ] - assert "the attached identity" in (row.confidence_boundary or "") + assert row.target_resolution == "identity-choice-corroborated" + assert row.confirmation_basis == "mixed-identity-corroborated-permission-join" + assert "mixed identities" in (row.confidence_boundary or "") + assert "cannot directly verify" in (row.confidence_boundary or "") + assert "SystemAssigned" in (row.confidence_boundary or "") + assert "does not directly verify" in (row.missing_confirmation or "") -def test_compute_control_excludes_mixed_identity_workloads_until_actor_is_explicit() -> None: +def test_compute_control_admits_user_assigned_choice_when_env_vars_names_resource_id() -> None: + user_assigned_id = ( + "/subscriptions/sub/resourceGroups/rg-identities/providers/" + "Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders" + ) loaded = { "tokens-credentials": TokensCredentialsOutput( metadata=_metadata("tokens-credentials"), @@ -234,10 +493,11 @@ def test_compute_control_excludes_mixed_identity_workloads_until_actor_is_explic related_ids=[ "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", "cccc2222-2222-2222-2222-222222222222", - "/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders", + user_assigned_id, ], ) ], + issues=[], ), "workloads": WorkloadsOutput( metadata=_metadata("workloads"), @@ -250,9 +510,7 @@ def test_compute_control_excludes_mixed_identity_workloads_until_actor_is_explic location="eastus", identity_type="SystemAssigned, UserAssigned", identity_principal_id="cccc2222-2222-2222-2222-222222222222", - identity_ids=[ - "/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders" - ], + identity_ids=[user_assigned_id], endpoints=["func-orders.azurewebsites.net"], ingress_paths=["azure-functions-default-hostname"], exposure_families=["managed-web-hostname"], @@ -260,33 +518,61 @@ def test_compute_control_excludes_mixed_identity_workloads_until_actor_is_explic related_ids=[ "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", "cccc2222-2222-2222-2222-222222222222", - "/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders", + user_assigned_id, ], ) ], + issues=[], ), "managed-identities": ManagedIdentitiesOutput( metadata=_metadata("managed-identities"), identities=[ ManagedIdentity( - id="/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders", + id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders/identities/system", + name="func-orders-system", + identity_type="systemAssigned", + principal_id="cccc2222-2222-2222-2222-222222222222", + attached_to=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders" + ], + ), + ManagedIdentity( + id=user_assigned_id, name="ua-orders", identity_type="userAssigned", principal_id="dddd3333-3333-3333-3333-333333333333", attached_to=[ "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders" ], - ) + ), ], - role_assignments=[ - RoleAssignment( - id="ra-orders", - scope_id="/subscriptions/sub", - principal_id="dddd3333-3333-3333-3333-333333333333", - principal_type="ServicePrincipal", - role_name="Owner", + role_assignments=[], + issues=[], + ), + "env-vars": EnvVarsOutput( + metadata=_metadata("env-vars"), + env_vars=[ + EnvVarSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + workload_identity_type="SystemAssigned, UserAssigned", + workload_principal_id="cccc2222-2222-2222-2222-222222222222", + workload_client_id="dddd2222-2222-2222-2222-222222222222", + workload_identity_ids=[user_assigned_id], + key_vault_reference_identity=user_assigned_id, + setting_name="PAYMENT_API_KEY", + value_type="keyvault-ref", + looks_sensitive=True, + reference_target="kvlabopen01.vault.azure.net/secrets/payment-api-key", + summary=( + "Function App uses a Key Vault-backed setting via a user-assigned identity." + ), ) ], + issues=[], ), "permissions": PermissionsOutput( metadata=_metadata("permissions"), @@ -304,10 +590,707 @@ def test_compute_control_excludes_mixed_identity_workloads_until_actor_is_explic privileged=True, ) ], + issues=[], ), } paths, issues = collect_compute_control_records("compute-control", loaded) assert not issues - assert paths == [] + assert_rows_include(paths, field="asset_name", expected=["func-orders"]) + row = row_by_field(paths, field="asset_name", expected="func-orders") + assert row.target_names == ["ua-orders"] + assert row.target_resolution == "identity-choice-corroborated" + assert row.confirmation_basis == "mixed-identity-corroborated-permission-join" + assert "ua-orders" in (row.confidence_boundary or "") + + +def test_compute_control_falls_back_when_corroborated_identity_lacks_control() -> None: + user_assigned_id = ( + "/subscriptions/sub/resourceGroups/rg-identities/providers/" + "Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders" + ) + loaded = { + "tokens-credentials": TokensCredentialsOutput( + metadata=_metadata("tokens-credentials"), + surfaces=[ + TokenCredentialSurfaceSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + surface_type="managed-identity-token", + access_path="workload-identity", + priority="medium", + operator_signal="SystemAssigned, UserAssigned; user-assigned=1", + summary="Function App can request tokens through multiple attached identities.", + related_ids=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + "cccc2222-2222-2222-2222-222222222222", + user_assigned_id, + ], + ) + ], + issues=[], + ), + "workloads": WorkloadsOutput( + metadata=_metadata("workloads"), + workloads=[ + WorkloadSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + identity_type="SystemAssigned, UserAssigned", + identity_principal_id="cccc2222-2222-2222-2222-222222222222", + identity_ids=[user_assigned_id], + endpoints=["func-orders.azurewebsites.net"], + ingress_paths=["azure-functions-default-hostname"], + exposure_families=["managed-web-hostname"], + summary="Function App exposes a hostname and carries mixed identities.", + related_ids=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + "cccc2222-2222-2222-2222-222222222222", + user_assigned_id, + ], + ) + ], + issues=[], + ), + "managed-identities": ManagedIdentitiesOutput( + metadata=_metadata("managed-identities"), + identities=[ + ManagedIdentity( + id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders/identities/system", + name="func-orders-system", + identity_type="systemAssigned", + principal_id="cccc2222-2222-2222-2222-222222222222", + attached_to=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders" + ], + ), + ManagedIdentity( + id=user_assigned_id, + name="ua-orders", + identity_type="userAssigned", + principal_id="dddd3333-3333-3333-3333-333333333333", + attached_to=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders" + ], + ), + ], + role_assignments=[], + issues=[], + ), + "env-vars": EnvVarsOutput( + metadata=_metadata("env-vars"), + env_vars=[ + EnvVarSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + workload_identity_type="SystemAssigned, UserAssigned", + workload_principal_id="cccc2222-2222-2222-2222-222222222222", + workload_client_id="dddd2222-2222-2222-2222-222222222222", + workload_identity_ids=[user_assigned_id], + key_vault_reference_identity="SystemAssigned", + setting_name="PAYMENT_API_KEY", + value_type="keyvault-ref", + looks_sensitive=True, + reference_target="kvlabopen01.vault.azure.net/secrets/payment-api-key", + summary=( + "Function App uses a Key Vault-backed setting via SystemAssigned identity." + ), + ) + ], + issues=[], + ), + "permissions": PermissionsOutput( + metadata=_metadata("permissions"), + permissions=[ + PermissionSummary( + principal_id="dddd3333-3333-3333-3333-333333333333", + display_name="ua-orders", + principal_type="ServicePrincipal", + priority="high", + high_impact_roles=["Owner"], + all_role_names=["Owner"], + role_assignment_count=1, + scope_count=1, + scope_ids=["/subscriptions/sub"], + privileged=True, + ) + ], + issues=[], + ), + } + + paths, issues = collect_compute_control_records("compute-control", loaded) + + assert not issues + assert_rows_include(paths, field="asset_name", expected=["func-orders"]) + row = row_by_field(paths, field="asset_name", expected="func-orders") + assert row.target_resolution == "narrowed candidates" + assert row.confirmation_basis == "mixed-identity-attached-candidates" + assert row.target_names == ["func-orders-system", "ua-orders"] + assert "ua-orders=Owner across subscription-wide scope" in (row.stronger_outcome or "") + + +def test_compute_control_rejects_conflicting_env_var_identity_hints() -> None: + user_assigned_id = ( + "/subscriptions/sub/resourceGroups/rg-identities/providers/" + "Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders" + ) + loaded = { + "tokens-credentials": TokensCredentialsOutput( + metadata=_metadata("tokens-credentials"), + surfaces=[ + TokenCredentialSurfaceSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + surface_type="managed-identity-token", + access_path="workload-identity", + priority="medium", + operator_signal="SystemAssigned, UserAssigned; user-assigned=1", + summary="Function App can request tokens through multiple attached identities.", + related_ids=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + "cccc2222-2222-2222-2222-222222222222", + user_assigned_id, + ], + ) + ], + issues=[], + ), + "workloads": WorkloadsOutput( + metadata=_metadata("workloads"), + workloads=[ + WorkloadSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + identity_type="SystemAssigned, UserAssigned", + identity_principal_id="cccc2222-2222-2222-2222-222222222222", + identity_ids=[user_assigned_id], + endpoints=["func-orders.azurewebsites.net"], + ingress_paths=["azure-functions-default-hostname"], + exposure_families=["managed-web-hostname"], + summary="Function App exposes a hostname and carries mixed identities.", + related_ids=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + "cccc2222-2222-2222-2222-222222222222", + user_assigned_id, + ], + ) + ], + issues=[], + ), + "managed-identities": ManagedIdentitiesOutput( + metadata=_metadata("managed-identities"), + identities=[ + ManagedIdentity( + id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders/identities/system", + name="func-orders-system", + identity_type="systemAssigned", + principal_id="cccc2222-2222-2222-2222-222222222222", + attached_to=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders" + ], + ), + ManagedIdentity( + id=user_assigned_id, + name="ua-orders", + identity_type="userAssigned", + principal_id="dddd3333-3333-3333-3333-333333333333", + attached_to=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders" + ], + ), + ], + role_assignments=[], + issues=[], + ), + "env-vars": EnvVarsOutput( + metadata=_metadata("env-vars"), + env_vars=[ + EnvVarSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + workload_identity_type="SystemAssigned, UserAssigned", + workload_principal_id="cccc2222-2222-2222-2222-222222222222", + workload_client_id="dddd2222-2222-2222-2222-222222222222", + workload_identity_ids=[user_assigned_id], + key_vault_reference_identity="SystemAssigned", + setting_name="PAYMENT_API_KEY", + value_type="keyvault-ref", + looks_sensitive=True, + reference_target="kvlabopen01.vault.azure.net/secrets/payment-api-key", + summary=( + "Function App uses a Key Vault-backed setting via SystemAssigned identity." + ), + ), + EnvVarSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + workload_identity_type="SystemAssigned, UserAssigned", + workload_principal_id="cccc2222-2222-2222-2222-222222222222", + workload_client_id="dddd2222-2222-2222-2222-222222222222", + workload_identity_ids=[user_assigned_id], + key_vault_reference_identity=user_assigned_id, + setting_name="PAYMENT_API_KEY_ALT", + value_type="keyvault-ref", + looks_sensitive=True, + reference_target="kvlabopen01.vault.azure.net/secrets/payment-api-key-alt", + summary=( + "Function App uses a Key Vault-backed setting via a user-assigned identity." + ), + ), + ], + issues=[], + ), + "permissions": PermissionsOutput( + metadata=_metadata("permissions"), + permissions=[ + PermissionSummary( + principal_id="cccc2222-2222-2222-2222-222222222222", + display_name="func-orders-system", + principal_type="ServicePrincipal", + priority="high", + high_impact_roles=["Contributor"], + all_role_names=["Contributor"], + role_assignment_count=1, + scope_count=1, + scope_ids=["/subscriptions/sub/resourceGroups/rg-apps"], + privileged=True, + ), + PermissionSummary( + principal_id="dddd3333-3333-3333-3333-333333333333", + display_name="ua-orders", + principal_type="ServicePrincipal", + priority="high", + high_impact_roles=["Owner"], + all_role_names=["Owner"], + role_assignment_count=1, + scope_count=1, + scope_ids=["/subscriptions/sub"], + privileged=True, + ), + ], + issues=[], + ), + } + + paths, issues = collect_compute_control_records("compute-control", loaded) + + assert not issues + assert_rows_include(paths, field="asset_name", expected=["func-orders"]) + row = row_by_field(paths, field="asset_name", expected="func-orders") + assert row.target_resolution == "narrowed candidates" + assert row.confirmation_basis == "mixed-identity-attached-candidates" + assert row.target_names == ["func-orders-system", "ua-orders"] + + +def test_compute_control_rejects_duplicate_user_assigned_suffix_match() -> None: + user_assigned_id_a = ( + "/subscriptions/sub/resourceGroups/rg-identities-a/providers/" + "Microsoft.ManagedIdentity/userAssignedIdentities/ua-shared" + ) + user_assigned_id_b = ( + "/subscriptions/sub/resourceGroups/rg-identities-b/providers/" + "Microsoft.ManagedIdentity/userAssignedIdentities/ua-shared" + ) + loaded = { + "tokens-credentials": TokensCredentialsOutput( + metadata=_metadata("tokens-credentials"), + surfaces=[ + TokenCredentialSurfaceSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + surface_type="managed-identity-token", + access_path="workload-identity", + priority="medium", + operator_signal="SystemAssigned, UserAssigned; user-assigned=2", + summary="Function App can request tokens through multiple attached identities.", + related_ids=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + "cccc2222-2222-2222-2222-222222222222", + user_assigned_id_a, + user_assigned_id_b, + ], + ) + ], + issues=[], + ), + "workloads": WorkloadsOutput( + metadata=_metadata("workloads"), + workloads=[ + WorkloadSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + identity_type="SystemAssigned, UserAssigned", + identity_principal_id="cccc2222-2222-2222-2222-222222222222", + identity_ids=[user_assigned_id_a, user_assigned_id_b], + endpoints=["func-orders.azurewebsites.net"], + ingress_paths=["azure-functions-default-hostname"], + exposure_families=["managed-web-hostname"], + summary="Function App exposes a hostname and carries mixed identities.", + related_ids=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + "cccc2222-2222-2222-2222-222222222222", + user_assigned_id_a, + user_assigned_id_b, + ], + ) + ], + issues=[], + ), + "managed-identities": ManagedIdentitiesOutput( + metadata=_metadata("managed-identities"), + identities=[ + ManagedIdentity( + id=user_assigned_id_a, + name="ua-shared-a", + identity_type="userAssigned", + principal_id="dddd3333-3333-3333-3333-333333333333", + attached_to=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders" + ], + ), + ManagedIdentity( + id=user_assigned_id_b, + name="ua-shared-b", + identity_type="userAssigned", + principal_id="eeee4444-4444-4444-4444-444444444444", + attached_to=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders" + ], + ), + ], + role_assignments=[], + issues=[], + ), + "env-vars": EnvVarsOutput( + metadata=_metadata("env-vars"), + env_vars=[ + EnvVarSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + workload_identity_type="SystemAssigned, UserAssigned", + workload_principal_id="cccc2222-2222-2222-2222-222222222222", + workload_client_id="dddd2222-2222-2222-2222-222222222222", + workload_identity_ids=[user_assigned_id_a, user_assigned_id_b], + key_vault_reference_identity="ua-shared", + setting_name="PAYMENT_API_KEY", + value_type="keyvault-ref", + looks_sensitive=True, + reference_target="kvlabopen01.vault.azure.net/secrets/payment-api-key", + summary=( + "Function App uses a Key Vault-backed setting via a user-assigned identity." + ), + ) + ], + issues=[], + ), + "permissions": PermissionsOutput( + metadata=_metadata("permissions"), + permissions=[ + PermissionSummary( + principal_id="dddd3333-3333-3333-3333-333333333333", + display_name="ua-shared-a", + principal_type="ServicePrincipal", + priority="high", + high_impact_roles=["Owner"], + all_role_names=["Owner"], + role_assignment_count=1, + scope_count=1, + scope_ids=["/subscriptions/sub"], + privileged=True, + ) + ], + issues=[], + ), + } + + paths, issues = collect_compute_control_records("compute-control", loaded) + + assert not issues + assert_rows_include(paths, field="asset_name", expected=["func-orders"]) + row = row_by_field(paths, field="asset_name", expected="func-orders") + assert row.target_resolution == "narrowed candidates" + assert row.confirmation_basis == "mixed-identity-attached-candidates" + assert row.target_names == ["func-orders system identity", "ua-shared-a", "ua-shared-b"] + + +def test_compute_control_bounded_candidates_do_not_claim_anchor_without_identity_rows() -> None: + loaded = { + "tokens-credentials": TokensCredentialsOutput( + metadata=_metadata("tokens-credentials"), + surfaces=[ + TokenCredentialSurfaceSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + surface_type="managed-identity-token", + access_path="workload-identity", + priority="medium", + operator_signal="SystemAssigned, UserAssigned; user-assigned=1", + summary="Function App can request tokens through multiple attached identities.", + related_ids=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + "cccc2222-2222-2222-2222-222222222222", + "/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders", + ], + ) + ], + issues=[], + ), + "workloads": WorkloadsOutput( + metadata=_metadata("workloads"), + workloads=[ + WorkloadSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + identity_type="SystemAssigned, UserAssigned", + identity_principal_id="cccc2222-2222-2222-2222-222222222222", + identity_ids=[ + "/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders" + ], + endpoints=["func-orders.azurewebsites.net"], + ingress_paths=["azure-functions-default-hostname"], + exposure_families=["managed-web-hostname"], + summary="Function App exposes a hostname and carries mixed identities.", + related_ids=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + "cccc2222-2222-2222-2222-222222222222", + "/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders", + ], + ) + ], + issues=[], + ), + "managed-identities": ManagedIdentitiesOutput( + metadata=_metadata("managed-identities"), + identities=[], + role_assignments=[], + issues=[], + ), + "env-vars": EnvVarsOutput( + metadata=_metadata("env-vars"), + env_vars=[], + issues=[], + ), + "permissions": PermissionsOutput( + metadata=_metadata("permissions"), + permissions=[ + PermissionSummary( + principal_id="cccc2222-2222-2222-2222-222222222222", + display_name="func-orders-system", + principal_type="ServicePrincipal", + priority="high", + high_impact_roles=["Contributor"], + all_role_names=["Contributor"], + role_assignment_count=1, + scope_count=1, + scope_ids=["/subscriptions/sub/resourceGroups/rg-apps"], + privileged=True, + ) + ], + issues=[], + ), + } + + paths, issues = collect_compute_control_records("compute-control", loaded) + + assert not issues + assert_rows_include(paths, field="asset_name", expected=["func-orders"]) + row = row_by_field(paths, field="asset_name", expected="func-orders") + assert row.target_resolution == "narrowed candidates" + assert row.evidence_commands == ["tokens-credentials", "workloads", "permissions"] + assert row.joined_surface_types == [ + "managed-identity-token", + "workload", + "workload-principal", + "permissions", + ] + + +def test_compute_control_suppresses_mixed_identity_workload_without_visible_control() -> None: + loaded = { + "tokens-credentials": TokensCredentialsOutput( + metadata=_metadata("tokens-credentials"), + surfaces=[ + TokenCredentialSurfaceSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + surface_type="managed-identity-token", + access_path="workload-identity", + priority="medium", + operator_signal="SystemAssigned, UserAssigned; user-assigned=1", + summary="Function App can request tokens through multiple attached identities.", + related_ids=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + "cccc2222-2222-2222-2222-222222222222", + "/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders", + ], + ) + ], + issues=[], + ), + "workloads": WorkloadsOutput( + metadata=_metadata("workloads"), + workloads=[ + WorkloadSummary( + asset_id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + asset_name="func-orders", + asset_kind="FunctionApp", + resource_group="rg-apps", + location="eastus", + identity_type="SystemAssigned, UserAssigned", + identity_principal_id="cccc2222-2222-2222-2222-222222222222", + identity_ids=[ + "/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders" + ], + endpoints=["func-orders.azurewebsites.net"], + ingress_paths=["azure-functions-default-hostname"], + exposure_families=["managed-web-hostname"], + summary="Function App exposes a hostname and carries mixed identities.", + related_ids=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders", + "cccc2222-2222-2222-2222-222222222222", + "/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders", + ], + ) + ], + issues=[], + ), + "managed-identities": ManagedIdentitiesOutput( + metadata=_metadata("managed-identities"), + identities=[ + ManagedIdentity( + id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders/identities/system", + name="func-orders-system", + identity_type="systemAssigned", + principal_id="cccc2222-2222-2222-2222-222222222222", + attached_to=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders" + ], + ), + ManagedIdentity( + id="/subscriptions/sub/resourceGroups/rg-identities/providers/Microsoft.ManagedIdentity/userAssignedIdentities/ua-orders", + name="ua-orders", + identity_type="userAssigned", + principal_id="dddd3333-3333-3333-3333-333333333333", + attached_to=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/func-orders" + ], + ), + ], + role_assignments=[], + issues=[], + ), + "permissions": PermissionsOutput( + metadata=_metadata("permissions"), + permissions=[], + issues=[], + ), + "env-vars": EnvVarsOutput( + metadata=_metadata("env-vars"), + env_vars=[], + issues=[], + ), + } + + paths, issues = collect_compute_control_records("compute-control", loaded) + + assert not issues + assert_rows_exclude(paths, field="asset_name", expected=["func-orders"]) + + +def test_compute_control_suppresses_system_assigned_workload_without_stronger_control() -> None: + loaded = _base_loaded_app_service( + asset_name="app-empty-mi", + principal_id="eeee3333-3333-3333-3333-333333333333", + permission=None, + identities=[ + ManagedIdentity( + id="/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-empty-mi/identities/system", + name="app-empty-mi-system", + identity_type="systemAssigned", + principal_id="eeee3333-3333-3333-3333-333333333333", + attached_to=[ + "/subscriptions/sub/resourceGroups/rg-apps/providers/Microsoft.Web/sites/app-empty-mi" + ], + ) + ], + ) + + paths, issues = collect_compute_control_records("compute-control", loaded) + + assert not issues + assert_rows_exclude(paths, field="asset_name", expected=["app-empty-mi"]) + + +def test_compute_control_preserves_partial_visibility_issues_when_row_still_admits() -> None: + loaded = _base_loaded_app_service( + asset_name="app-empty-mi", + principal_id="eeee3333-3333-3333-3333-333333333333", + permission=PermissionSummary( + principal_id="eeee3333-3333-3333-3333-333333333333", + display_name="app-empty-mi-system", + principal_type="ServicePrincipal", + priority="high", + high_impact_roles=["Contributor"], + all_role_names=["Contributor"], + role_assignment_count=1, + scope_count=1, + scope_ids=["/subscriptions/sub/resourceGroups/rg-apps"], + privileged=True, + ), + managed_identity_issues=[ + CollectionIssue( + kind="permission_denied", + message="managed-identities[app-empty-mi]: nested configuration read blocked", + context={"collector": "managed-identities[app-empty-mi]"}, + ) + ], + ) + + paths, issues = collect_compute_control_records("compute-control", loaded) + + assert_rows_include(paths, field="asset_name", expected=["app-empty-mi"]) + assert_issue_collectors_include( + issues, + expected_collectors=["managed-identities[app-empty-mi]"], + ) diff --git a/tests/test_help.py b/tests/test_help.py index 9429ab4..8eefb15 100644 --- a/tests/test_help.py +++ b/tests/test_help.py @@ -1,5 +1,6 @@ from __future__ import annotations +import pytest from typer.testing import CliRunner from azurefox.cli import _normalize_argv, app @@ -52,248 +53,225 @@ def test_help_command_command_topic() -> None: assert "names the current gap explicitly" in result.stdout -def test_help_command_arm_deployments_topic() -> None: - result = runner.invoke(app, ["help", "arm-deployments"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: arm-deployments" in result.stdout - assert "linked templates" in result.stdout - assert "outputs_count" in result.stdout - - -def test_help_command_automation_topic() -> None: - result = runner.invoke(app, ["help", "automation"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: automation" in result.stdout - assert "Hybrid Worker" in result.stdout - assert "published_runbook_count" in result.stdout - assert "webhook_count" in result.stdout - assert "encrypted_variable_count" in result.stdout - - -def test_help_command_devops_topic() -> None: - result = runner.invoke(app, ["help", "devops"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: devops" in result.stdout - assert "build definitions" in result.stdout - assert "repository_host_type" in result.stdout - assert "source_visibility_state" in result.stdout - assert "execution_modes" in result.stdout - assert "trusted_input_types" in result.stdout - assert "trusted_input_refs" in result.stdout - assert "primary_injection_surface" in result.stdout - assert "injection_surface_types" in result.stdout - assert "current_operator_injection_surface_types" in result.stdout - assert "azure_service_connection_names" in result.stdout - assert "current_operator_can_contribute_source" in result.stdout - assert "missing_injection_point" in result.stdout - assert "consequence_types" in result.stdout - assert "--devops-organization" in result.stdout - - -def test_help_command_endpoints_topic() -> None: - result = runner.invoke(app, ["help", "endpoints"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: endpoints" in result.stdout - assert "ingress triage view" in result.stdout - assert "exposure_family" in result.stdout - assert "ingress_path" in result.stdout - - -def test_help_command_cross_tenant_topic() -> None: - result = runner.invoke(app, ["help", "cross-tenant"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: cross-tenant" in result.stdout - assert "outside-tenant trust" in result.stdout - assert "attack_path" in result.stdout - - -def test_help_command_app_services_topic() -> None: - result = runner.invoke(app, ["help", "app-services"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: app-services" in result.stdout - assert "runtime stack" in result.stdout - assert "workload_identity_type" in result.stdout - assert "public_network_access" in result.stdout - - -def test_help_command_acr_topic() -> None: - result = runner.invoke(app, ["help", "acr"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: acr" in result.stdout - assert "Azure Container Registry (ACR)" in result.stdout - assert "login_server" in result.stdout - assert "admin_user_enabled" in result.stdout - assert "webhook_count" in result.stdout - assert "replication_count" in result.stdout - assert "network_rule_default_action" in result.stdout - - -def test_help_command_databases_topic() -> None: - result = runner.invoke(app, ["help", "databases"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: databases" in result.stdout - assert "engine" in result.stdout - assert "fully_qualified_domain_name" in result.stdout - assert "database_count" in result.stdout - assert "high_availability_mode" in result.stdout - assert "minimal_tls_version" in result.stdout - - -def test_help_command_dns_topic() -> None: - result = runner.invoke(app, ["help", "dns"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: dns" in result.stdout - assert "private VNet-linked namespace context" in result.stdout - assert "name_servers" in result.stdout - assert "registration_virtual_network_count" in result.stdout - assert "private_endpoint_reference_count" in result.stdout - - -def test_help_command_application_gateway_topic() -> None: - result = runner.invoke(app, ["help", "application-gateway"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: application-gateway" in result.stdout - assert "shared public front doors" in result.stdout - assert "listener_count" in result.stdout - assert "request_routing_rule_count" in result.stdout - assert "firewall_policy_id" in result.stdout - - -def test_help_command_storage_topic() -> None: - result = runner.invoke(app, ["help", "storage"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: storage" in result.stdout - assert "public_network_access" in result.stdout - assert "allow_shared_key_access" in result.stdout - assert "minimum_tls_version" in result.stdout - assert "https_traffic_only_enabled" in result.stdout - - -def test_help_command_lighthouse_topic() -> None: - result = runner.invoke(app, ["help", "lighthouse"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: lighthouse" in result.stdout - assert "Azure Lighthouse" in result.stdout - assert "managed_by_tenant_name" in result.stdout - assert "eligible_authorization_count" in result.stdout - assert "provisioning_state" in result.stdout - - -def test_help_command_snapshots_disks_topic() -> None: - result = runner.invoke(app, ["help", "snapshots-disks"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: snapshots-disks" in result.stdout - assert "highest-value offline-copy targets first" in result.stdout - assert "attachment_state" in result.stdout - assert "network_access_policy" in result.stdout - assert "disk_encryption_set_id" in result.stdout - - -def test_help_command_network_effective_topic() -> None: - result = runner.invoke(app, ["help", "network-effective"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: network-effective" in result.stdout - assert "public-IP-backed assets" in result.stdout - assert "not to prove full effective exposure" in result.stdout - - -def test_help_command_functions_topic() -> None: - result = runner.invoke(app, ["help", "functions"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: functions" in result.stdout - assert "Functions runtime" in result.stdout - assert "azure_webjobs_storage_value_type" in result.stdout - assert "run_from_package" in result.stdout - - -def test_help_command_aks_topic() -> None: - result = runner.invoke(app, ["help", "aks"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: aks" in result.stdout - assert "Azure Kubernetes Service" in result.stdout - assert "private_cluster_enabled" in result.stdout - assert "cluster_identity_type" in result.stdout - assert "azure_rbac_enabled" in result.stdout - assert "network_plugin" in result.stdout - assert "oidc_issuer_enabled" in result.stdout - - -def test_help_command_api_mgmt_topic() -> None: - result = runner.invoke(app, ["help", "api-mgmt"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: api-mgmt" in result.stdout - assert "Application Programming Interface (API) Management" in result.stdout - assert "gateway_hostnames" in result.stdout - assert "virtual_network_type" in result.stdout - assert "active_subscription_count" in result.stdout - assert "named_value_secret_count" in result.stdout - - -def test_help_command_vmss_topic() -> None: - result = runner.invoke(app, ["help", "vmss"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: vmss" in result.stdout - assert "Virtual Machine Scale Sets" in result.stdout - assert "instance_count" in result.stdout - assert "orchestration_mode" in result.stdout - assert "public_ip_configuration_count" in result.stdout - - -def test_help_command_network_ports_topic() -> None: - result = runner.invoke(app, ["help", "network-ports"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: network-ports" in result.stdout - assert "NIC-backed public endpoints" in result.stdout - assert "allow_source_summary" in result.stdout - assert "exposure_confidence" in result.stdout - - -def test_help_command_rbac_topic() -> None: - result = runner.invoke(app, ["help", "rbac"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: rbac" in result.stdout - assert "Role-Based Access Control (RBAC)" in result.stdout - assert "role_assignments" in result.stdout - - -def test_help_command_nics_topic() -> None: - result = runner.invoke(app, ["help", "nics"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: nics" in result.stdout - assert "network interfaces (NICs)" in result.stdout - assert "attached_asset_name" in result.stdout - assert "public IP references" in result.stdout - assert "network_security_group_id" in result.stdout - - -def test_help_command_vms_topic() -> None: - result = runner.invoke(app, ["help", "vms"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: vms" in result.stdout - assert "virtual machines (VMs)" in result.stdout - assert "public_ips" in result.stdout +HELP_TOPICS_IDENTITY = ( + ("cross-tenant", ("outside-tenant trust", "attack_path")), + ("rbac", ("Role-Based Access Control (RBAC)", "role_assignments")), + ( + "role-trusts", + ( + "Trusted Relationship", + "federated credentials", + "delegated or admin consent grants", + "Fast mode is the default", + "per-application owner and federated credential lookups", + "usable_identity_result", + "escalation_mechanism", + "operator_signal", + "next_review", + ), + ), + ( + "auth-policies", + ( + "Conditional Access", + "guest, consent, app-creation, or sign-in abuse paths", + "Unreadable policy surfaces stay explicit", + "issues", + ), + ), +) + +HELP_TOPICS_INFRA = ( + ("arm-deployments", ("linked templates", "outputs_count")), + ( + "automation", + ( + "Hybrid Worker", + "published_runbook_count", + "webhook_count", + "encrypted_variable_count", + ), + ), + ( + "devops", + ( + "build definitions", + "repository_host_type", + "source_visibility_state", + "execution_modes", + "trusted_input_types", + "trusted_input_refs", + "primary_injection_surface", + "injection_surface_types", + "current_operator_injection_surface_types", + "azure_service_connection_names", + "current_operator_can_contribute_source", + "missing_injection_point", + "consequence_types", + "--devops-organization", + ), + ), + ("endpoints", ("ingress triage view", "exposure_family", "ingress_path")), + ( + "acr", + ( + "Azure Container Registry (ACR)", + "login_server", + "admin_user_enabled", + "webhook_count", + "replication_count", + "network_rule_default_action", + ), + ), + ( + "databases", + ( + "engine", + "fully_qualified_domain_name", + "database_count", + "high_availability_mode", + "minimal_tls_version", + ), + ), + ( + "dns", + ( + "private VNet-linked namespace context", + "name_servers", + "registration_virtual_network_count", + "private_endpoint_reference_count", + ), + ), + ( + "application-gateway", + ( + "shared public front doors", + "listener_count", + "request_routing_rule_count", + "firewall_policy_id", + ), + ), + ( + "storage", + ( + "public_network_access", + "allow_shared_key_access", + "minimum_tls_version", + "https_traffic_only_enabled", + ), + ), + ( + "lighthouse", + ( + "Azure Lighthouse", + "managed_by_tenant_name", + "eligible_authorization_count", + "provisioning_state", + ), + ), + ( + "snapshots-disks", + ( + "highest-value offline-copy targets first", + "attachment_state", + "network_access_policy", + "disk_encryption_set_id", + ), + ), + ("network-effective", ("public-IP-backed assets", "not to prove full effective exposure")), + ( + "network-ports", + ("NIC-backed public endpoints", "allow_source_summary", "exposure_confidence"), + ), + ( + "nics", + ( + "network interfaces (NICs)", + "attached_asset_name", + "public IP references", + "network_security_group_id", + ), + ), + ("keyvault", ("secret-management surface", "purge_protection_enabled")), + ("resource-trusts", ("public network paths", "resource_type")), +) + +HELP_TOPICS_WORKLOADS = ( + ("app-services", ("runtime stack", "workload_identity_type", "public_network_access")), + ("functions", ("Functions runtime", "azure_webjobs_storage_value_type", "run_from_package")), + ( + "aks", + ( + "Azure Kubernetes Service", + "private_cluster_enabled", + "cluster_identity_type", + "azure_rbac_enabled", + "network_plugin", + "oidc_issuer_enabled", + ), + ), + ( + "api-mgmt", + ( + "Application Programming Interface (API) Management", + "gateway_hostnames", + "virtual_network_type", + "active_subscription_count", + "named_value_secret_count", + ), + ), + ( + "vmss", + ( + "Virtual Machine Scale Sets", + "instance_count", + "orchestration_mode", + "public_ip_configuration_count", + ), + ), + ("vms", ("virtual machines (VMs)", "public_ips")), + ( + "env-vars", + ("Key Vault references", "pivot to next", "workload_identity_type", "setting_name"), + ), + ("tokens-credentials", ("pivot to next", "mint tokens", "operator_signal")), + ( + "privesc", + ( + "Cloud Instance Metadata API", + "workload identity pivots", + "starting_foothold", + "operator_signal", + "proven_path", + "next_review", + ), + ), + ("workloads", ("joined workload census", "identity_type", "ingress_paths")), +) + + +def _assert_help_topic_surface(topic: str, snippets: tuple[str, ...]) -> None: + result = runner.invoke(app, ["help", topic]) + + assert result.exit_code == 0 + assert f"AzureFox Help :: {topic}" in result.stdout + for snippet in snippets: + assert snippet in result.stdout + + +@pytest.mark.parametrize(("topic", "snippets"), HELP_TOPICS_IDENTITY) +def test_help_command_topic_surface_identity(topic: str, snippets: tuple[str, ...]) -> None: + _assert_help_topic_surface(topic, snippets) + + +@pytest.mark.parametrize(("topic", "snippets"), HELP_TOPICS_INFRA) +def test_help_command_topic_surface_infra(topic: str, snippets: tuple[str, ...]) -> None: + _assert_help_topic_surface(topic, snippets) + + +@pytest.mark.parametrize(("topic", "snippets"), HELP_TOPICS_WORKLOADS) +def test_help_command_topic_surface_workloads(topic: str, snippets: tuple[str, ...]) -> None: + _assert_help_topic_surface(topic, snippets) def test_help_command_chains_topic_sets_planned_runtime_expectations() -> None: @@ -314,95 +292,6 @@ def test_help_command_chains_topic_sets_planned_runtime_expectations() -> None: assert "current_gap" in result.stdout -def test_help_command_env_vars_topic() -> None: - result = runner.invoke(app, ["help", "env-vars"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: env-vars" in result.stdout - assert "Key Vault references" in result.stdout - assert "pivot to next" in result.stdout - assert "workload_identity_type" in result.stdout - assert "setting_name" in result.stdout - - -def test_help_command_tokens_credentials_topic() -> None: - result = runner.invoke(app, ["help", "tokens-credentials"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: tokens-credentials" in result.stdout - assert "pivot to next" in result.stdout - assert "mint tokens" in result.stdout - assert "operator_signal" in result.stdout - - -def test_help_command_privesc_topic() -> None: - result = runner.invoke(app, ["help", "privesc"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: privesc" in result.stdout - assert "Cloud Instance Metadata API" in result.stdout - assert "workload identity pivots" in result.stdout - assert "starting_foothold" in result.stdout - assert "operator_signal" in result.stdout - assert "proven_path" in result.stdout - assert "next_review" in result.stdout - - -def test_help_command_role_trusts_topic() -> None: - result = runner.invoke(app, ["help", "role-trusts"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: role-trusts" in result.stdout - assert "Trusted Relationship" in result.stdout - assert "federated credentials" in result.stdout - assert "delegated or admin consent grants" in result.stdout - assert "Fast mode is the default" in result.stdout - assert "per-application owner and federated credential lookups" in result.stdout - assert "usable_identity_result" in result.stdout - assert "escalation_mechanism" in result.stdout - assert "operator_signal" in result.stdout - assert "next_review" in result.stdout - - -def test_help_command_auth_policies_topic() -> None: - result = runner.invoke(app, ["help", "auth-policies"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: auth-policies" in result.stdout - assert "Conditional Access" in result.stdout - assert "guest, consent, app-creation, or sign-in abuse paths" in result.stdout - assert "Unreadable policy surfaces stay explicit" in result.stdout - assert "issues" in result.stdout - - -def test_help_command_keyvault_topic() -> None: - result = runner.invoke(app, ["help", "keyvault"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: keyvault" in result.stdout - assert "secret-management surface" in result.stdout - assert "purge_protection_enabled" in result.stdout - - -def test_help_command_resource_trusts_topic() -> None: - result = runner.invoke(app, ["help", "resource-trusts"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: resource-trusts" in result.stdout - assert "public network paths" in result.stdout - assert "resource_type" in result.stdout - - -def test_help_command_workloads_topic() -> None: - result = runner.invoke(app, ["help", "workloads"]) - - assert result.exit_code == 0 - assert "AzureFox Help :: workloads" in result.stdout - assert "joined workload census" in result.stdout - assert "identity_type" in result.stdout - assert "ingress_paths" in result.stdout - - def test_help_command_unknown_topic() -> None: result = runner.invoke(app, ["help", "banana"]) diff --git a/tests/test_terminal_ux.py b/tests/test_terminal_ux.py index 5b9d291..db25dcd 100644 --- a/tests/test_terminal_ux.py +++ b/tests/test_terminal_ux.py @@ -609,7 +609,7 @@ def test_principals_table_mode_uses_curated_columns(tmp_path: Path) -> None: assert result.exit_code == 0 assert "identity context" in result.stdout assert "current" in result.stdout - assert "Takeaway: 2 principals visible" in result.stdout + assert "Takeaway: 5 principals visible" in result.stdout def test_arm_deployments_table_mode_surfaces_scope_and_linked_refs(tmp_path: Path) -> None: @@ -763,7 +763,7 @@ def test_permissions_table_mode_surfaces_next_review(tmp_path: Path) -> None: assert "foothold." in normalized_output assert "Check privesc" in result.stdout assert "aa-hybrid-prod-mi" in result.stdout - assert "Takeaway: 2 of 3 principals hold high-impact RBAC roles;" in result.stdout + assert "Takeaway: 5 of 6 principals hold high-impact RBAC roles;" in result.stdout def test_chains_table_mode_surfaces_priority_and_next_review(tmp_path: Path) -> None: diff --git a/tests/truthfulness.py b/tests/truthfulness.py new file mode 100644 index 0000000..cc527c7 --- /dev/null +++ b/tests/truthfulness.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +from collections.abc import Sequence + + +def _field_value(row: object, field: str) -> object: + if isinstance(row, dict): + return row.get(field) + return getattr(row, field, None) + + +def _duplicate_field_values(rows: Sequence[object], field: str) -> list[object]: + seen: set[object] = set() + duplicates: list[object] = [] + for row in rows: + value = _field_value(row, field) + if value is None: + continue + if value in seen and value not in duplicates: + duplicates.append(value) + continue + seen.add(value) + return duplicates + + +def row_by_field(rows: Sequence[object], *, field: str, expected: object) -> object: + matches = [row for row in rows if _field_value(row, field) == expected] + if len(matches) != 1: + raise AssertionError(f"Expected one row with {field}={expected!r}, found {len(matches)}.") + return matches[0] + + +def assert_rows_include(rows: Sequence[object], *, field: str, expected: Sequence[object]) -> None: + duplicates = _duplicate_field_values(rows, field) + if duplicates: + raise AssertionError( + f"Expected unique {field} values, but found duplicates {duplicates!r}." + ) + actual = {_field_value(row, field) for row in rows} + missing = [value for value in expected if value not in actual] + if missing: + raise AssertionError( + f"Expected rows with {field} values {missing!r}, " + f"but actual values were {sorted(actual)!r}." + ) + + +def assert_rows_exclude(rows: Sequence[object], *, field: str, expected: Sequence[object]) -> None: + duplicates = _duplicate_field_values(rows, field) + if duplicates: + raise AssertionError( + f"Expected unique {field} values, but found duplicates {duplicates!r}." + ) + actual = {_field_value(row, field) for row in rows} + unexpected = [value for value in expected if value in actual] + if unexpected: + raise AssertionError( + f"Expected rows to exclude {unexpected!r}, but actual values were {sorted(actual)!r}." + ) + + +def assert_issue_collectors_include( + issues: Sequence[object], + *, + expected_collectors: Sequence[str], +) -> None: + collectors: set[str] = set() + for issue in issues: + context = ( + issue.get("context") if isinstance(issue, dict) else getattr(issue, "context", None) + ) + if isinstance(context, dict): + collector = context.get("collector") + if collector: + collectors.add(str(collector)) + + missing = [collector for collector in expected_collectors if collector not in collectors] + if missing: + raise AssertionError( + f"Expected issue collectors {missing!r}, " + f"but actual collectors were {sorted(collectors)!r}." + )