From 9583dc6dddd0eb6d7d3fae98123969def5acf452 Mon Sep 17 00:00:00 2001 From: Colby Farley Date: Sat, 18 Apr 2026 00:03:28 -0500 Subject: [PATCH 1/2] fix: tighten live viewpoint chains shaping --- src/azurefox/chains/compute_control.py | 26 +++-- src/azurefox/chains/runner.py | 147 ++++++++++++++++++++++--- src/azurefox/collectors/commands.py | 1 + src/azurefox/collectors/provider.py | 6 +- src/azurefox/permissions_hints.py | 15 ++- src/azurefox/scope_hints.py | 62 +++++++++++ tests/test_chain_semantics.py | 49 ++++++++- tests/test_cli_smoke.py | 66 +++++++++-- tests/test_collectors.py | 62 +++++++++++ 9 files changed, 394 insertions(+), 40 deletions(-) create mode 100644 src/azurefox/scope_hints.py diff --git a/src/azurefox/chains/compute_control.py b/src/azurefox/chains/compute_control.py index 6709596..190cc86 100644 --- a/src/azurefox/chains/compute_control.py +++ b/src/azurefox/chains/compute_control.py @@ -6,6 +6,7 @@ from azurefox.chains.semantics import semantic_priority_sort_value, semantic_urgency_sort_value from azurefox.models.chains import ChainPathRecord from azurefox.models.common import CollectionIssue +from azurefox.scope_hints import permission_scope_phrase _HIGH_IMPACT_ROLE_NAMES = { "owner", @@ -478,10 +479,13 @@ def _assignment_control_summary( if not high_impact_roles: return None - scopes = {str(item.get("scope_id") or "") for item in assignments if item.get("scope_id")} - scope_count = len(scopes) - scope_text = "subscription-wide scope" if scope_count <= 1 else f"{scope_count} visible scopes" - return f"{', '.join(high_impact_roles)} across {scope_text}" + scopes = sorted( + {str(item.get("scope_id") or "") for item in assignments if item.get("scope_id")} + ) + return ( + f"{', '.join(high_impact_roles)} " + f"{permission_scope_phrase(scopes, scope_count=len(scopes))}" + ) def _build_compute_control_record( @@ -892,11 +896,17 @@ def _permission_control_summary(permission_row: dict | None) -> str | None: roles = [str(role) for role in permission_row.get("high_impact_roles") or [] if role] role_text = ", ".join(roles) or "high-impact roles" - scope_count = int( - permission_row.get("scope_count") or len(permission_row.get("scope_ids") or []) or 0 + return ( + f"{role_text} " + f"{permission_scope_phrase( + list(permission_row.get('scope_ids') or []), + scope_count=int( + permission_row.get('scope_count') + or len(permission_row.get('scope_ids') or []) + or 0 + ), + )}" ) - scope_text = "subscription-wide scope" if scope_count <= 1 else f"{scope_count} visible scopes" - return f"{role_text} across {scope_text}" def _has_public_compute_signal(workload_row: dict) -> bool: diff --git a/src/azurefox/chains/runner.py b/src/azurefox/chains/runner.py index b1da1d7..6a2db86 100644 --- a/src/azurefox/chains/runner.py +++ b/src/azurefox/chains/runner.py @@ -36,9 +36,26 @@ ChainPathRecord, ChainsOutput, ) -from azurefox.models.commands import ChainsCommandOutput +from azurefox.models.commands import ( + AksOutput, + AppServicesOutput, + ArmDeploymentsOutput, + AutomationOutput, + ChainsCommandOutput, + DatabasesOutput, + DevopsOutput, + EnvVarsOutput, + FunctionsOutput, + KeyVaultOutput, + PermissionsOutput, + RbacOutput, + RoleTrustsOutput, + StorageOutput, + TokensCredentialsOutput, +) from azurefox.models.common import ArmDeploymentSummary, CollectionIssue, CommandMetadata from azurefox.registry import get_command_specs +from azurefox.scope_hints import permission_scope_description, permission_scope_phrase from azurefox.target_matching import ( normalize_exact_target_host, normalize_exact_target_resource_id, @@ -165,11 +182,102 @@ def _collect_family_outputs( for source in family.source_commands: collector = collector_by_name[source.command] - loaded[source.command] = collector(provider, options) + try: + loaded[source.command] = collector(provider, options) + except Exception as exc: + loaded[source.command] = _empty_chain_source_output( + command=source.command, + provider=provider, + options=options, + exc=exc, + ) return loaded +def _empty_chain_source_output( + *, + command: str, + provider: BaseProvider, + options: GlobalOptions, + exc: Exception, +): + issue = _chain_source_issue(command, exc) + context = provider.metadata_context() + model_class = _empty_chain_source_model(command) + payload = { + "metadata": CommandMetadata( + command=command, + tenant_id=options.tenant or context.get("tenant_id"), + subscription_id=options.subscription or context.get("subscription_id"), + devops_organization=options.devops_organization, + token_source=context.get("token_source"), + auth_mode=context.get("auth_mode"), + ), + **_empty_chain_source_fields(command, issue, options), + } + return model_class.model_validate(payload) + + +def _empty_chain_source_fields( + command: str, + issue: dict[str, object], + options: GlobalOptions, +) -> dict[str, object]: + fields_by_command: dict[str, dict[str, object]] = { + "devops": {"pipelines": [], "issues": [issue]}, + "automation": {"automation_accounts": [], "issues": [issue]}, + "permissions": {"permissions": [], "issues": [issue]}, + "rbac": {"principals": [], "scopes": [], "role_assignments": [], "issues": [issue]}, + "role-trusts": {"mode": options.role_trusts_mode, "trusts": [], "issues": [issue]}, + "keyvault": {"key_vaults": [], "issues": [issue]}, + "arm-deployments": {"deployments": [], "issues": [issue]}, + "app-services": {"app_services": [], "issues": [issue]}, + "functions": {"function_apps": [], "issues": [issue]}, + "aks": {"aks_clusters": [], "issues": [issue]}, + "env-vars": {"env_vars": [], "issues": [issue]}, + "tokens-credentials": {"surfaces": [], "issues": [issue]}, + "databases": {"database_servers": [], "issues": [issue]}, + "storage": {"storage_assets": [], "issues": [issue]}, + } + try: + return fields_by_command[command] + except KeyError as exc: + raise ValueError(f"Missing empty grouped-source shape for '{command}'") from exc + + +def _empty_chain_source_model(command: str): + models_by_command = { + "devops": DevopsOutput, + "automation": AutomationOutput, + "permissions": PermissionsOutput, + "rbac": RbacOutput, + "role-trusts": RoleTrustsOutput, + "keyvault": KeyVaultOutput, + "arm-deployments": ArmDeploymentsOutput, + "app-services": AppServicesOutput, + "functions": FunctionsOutput, + "aks": AksOutput, + "env-vars": EnvVarsOutput, + "tokens-credentials": TokensCredentialsOutput, + "databases": DatabasesOutput, + "storage": StorageOutput, + } + try: + return models_by_command[command] + except KeyError as exc: + raise ValueError(f"Missing empty grouped-source model for '{command}'") from exc + + +def _chain_source_issue(command: str, exc: Exception) -> dict[str, object]: + return { + "kind": str(getattr(exc, "kind", "unknown")), + "message": str(exc), + "scope": str(getattr(exc, "command", None) or command), + "context": {"collector": command}, + } + + def _build_credential_path_output( provider: BaseProvider, options: GlobalOptions, @@ -1565,15 +1673,17 @@ def _automation_permission_clause(source: dict) -> str | None: return None roles = [str(role) for role in permission.get("high_impact_roles") or [] if role] role_text = ", ".join(roles) or "high-impact RBAC" - scope_count = int(permission.get("scope_count") or 0) - scope_text = "subscription-wide scope" if scope_count <= 1 else f"{scope_count} visible scopes" + scope_text = permission_scope_phrase( + list(permission.get("scope_ids") or []), + scope_count=int(permission.get("scope_count") or 0), + ) principal_name = str( permission.get("display_name") or source.get("name") or permission.get("principal_id") or "automation identity" ) - return f"Azure identity '{principal_name}' already has {role_text} across {scope_text}" + return f"Azure identity '{principal_name}' already has {role_text} {scope_text}" def _automation_role_trust_clause(source: dict) -> str | None: @@ -1618,8 +1728,10 @@ def _devops_permission_clause(source: dict) -> str | None: return None roles = [str(role) for role in permission.get("high_impact_roles") or [] if role] role_text = ", ".join(roles) or "high-impact RBAC" - scope_count = int(permission.get("scope_count") or 0) - scope_text = "subscription-wide scope" if scope_count <= 1 else f"{scope_count} visible scopes" + scope_text = permission_scope_phrase( + list(permission.get("scope_ids") or []), + scope_count=int(permission.get("scope_count") or 0), + ) principal_name = str( permission.get("display_name") or permission.get("principal_id") @@ -1627,7 +1739,7 @@ def _devops_permission_clause(source: dict) -> str | None: ) return ( f"This pipeline runs as Azure identity '{principal_name}', which already has " - f"{role_text} across {scope_text}" + f"{role_text} {scope_text}" ) @@ -3666,12 +3778,12 @@ def _merge_related_ids(*groups: list[str]) -> list[str]: def _permission_scope_text(permission_row: dict | None) -> str: if not permission_row: return "visible scope" - scope_count = int( - permission_row.get("scope_count") or len(permission_row.get("scope_ids") or []) or 0 + return permission_scope_description( + list(permission_row.get("scope_ids") or []), + scope_count=int( + permission_row.get("scope_count") or len(permission_row.get("scope_ids") or []) or 0 + ), ) - if scope_count <= 1: - return "subscription-wide scope" - return f"{scope_count} visible scopes" def _permission_control_summary(permission_row: dict | None) -> str: @@ -3679,8 +3791,13 @@ def _permission_control_summary(permission_row: dict | None) -> str: return "Potential stronger Azure control; exact privilege not yet confirmed" roles = [str(role) for role in permission_row.get("high_impact_roles") or [] if role] role_text = ", ".join(roles) or "high-impact roles" - scope_text = _permission_scope_text(permission_row) - return f"{role_text} across {scope_text}" + scope_text = permission_scope_phrase( + list(permission_row.get("scope_ids") or []), + scope_count=int( + permission_row.get("scope_count") or len(permission_row.get("scope_ids") or []) or 0 + ), + ) + return f"{role_text} {scope_text}" def _permission_adds_net_value( diff --git a/src/azurefox/collectors/commands.py b/src/azurefox/collectors/commands.py index 91d0725..6b91cfc 100644 --- a/src/azurefox/collectors/commands.py +++ b/src/azurefox/collectors/commands.py @@ -1335,6 +1335,7 @@ def _enrich_permission_rows(permissions: list[dict], principals: list[dict]) -> principal_name=item.get("display_name") or item.get("principal_id") or "unknown", principal_type=item.get("principal_type", "unknown"), high_impact_roles=[str(value) for value in item.get("high_impact_roles") or []], + scope_ids=[str(value) for value in item.get("scope_ids") or [] if value], scope_count=_int_or_zero(item.get("scope_count")) or len(item.get("scope_ids") or []), privileged=privileged, is_current_identity=is_current_identity, diff --git a/src/azurefox/collectors/provider.py b/src/azurefox/collectors/provider.py index a55310b..2e49fd3 100644 --- a/src/azurefox/collectors/provider.py +++ b/src/azurefox/collectors/provider.py @@ -1875,6 +1875,7 @@ def ensure_record(principal_id: str) -> dict: "tenant_id": None, "sources": [], "scope_ids": [], + "assignment_scope_ids": [], "role_names": [], "role_assignment_count": 0, "identity_names": [], @@ -1903,6 +1904,7 @@ def ensure_record(principal_id: str) -> dict: _append_unique(record["role_names"], role_name) if scope_id: _append_unique(record["scope_ids"], scope_id) + _append_unique(record["assignment_scope_ids"], scope_id) record["role_assignment_count"] += 1 principal_type = assignment.get("principal_type") if principal_type: @@ -1950,7 +1952,9 @@ def permissions(self) -> dict: for principal in principal_data.get("principals", []): role_names = sorted(set(principal.get("role_names", []))) - scope_ids = sorted(set(principal.get("scope_ids", []))) + scope_ids = sorted( + set(principal.get("assignment_scope_ids") or principal.get("scope_ids", [])) + ) high_impact_roles = sorted( { role_name diff --git a/src/azurefox/permissions_hints.py b/src/azurefox/permissions_hints.py index e26308f..7b0845f 100644 --- a/src/azurefox/permissions_hints.py +++ b/src/azurefox/permissions_hints.py @@ -1,5 +1,7 @@ from __future__ import annotations +from azurefox.scope_hints import permission_scope_phrase + def permissions_operator_signal( *, @@ -66,6 +68,7 @@ def permissions_summary( principal_name: str, principal_type: str, high_impact_roles: list[str], + scope_ids: list[str], scope_count: int, privileged: bool, is_current_identity: bool, @@ -81,36 +84,36 @@ def permissions_summary( ) role_text = ", ".join(high_impact_roles) or "high-impact roles" - scope_text = "subscription-wide" if scope_count <= 1 else f"{scope_count} visible scopes" + scope_text = permission_scope_phrase(scope_ids, scope_count=scope_count) if is_current_identity: return ( f"Current identity '{principal_name}' already has direct control visible through " - f"{role_text} across {scope_text}. {next_review}" + f"{role_text} {scope_text}. {next_review}" ) if has_workload_pivot: return ( f"{principal_type} '{principal_name}' already has direct control visible through " - f"{role_text} across {scope_text}, and current scope also shows a workload pivot. " + f"{role_text} {scope_text}, and current scope also shows a workload pivot. " f"{next_review}" ) if workload_visibility_blocked: return ( f"{principal_type} '{principal_name}' already has direct control visible through " - f"{role_text} across {scope_text}, but the backing workload pivot stays visibility " + f"{role_text} {scope_text}, but the backing workload pivot stays visibility " f"blocked from current scope. {next_review}" ) if trust_expansion_follow_on: return ( f"{principal_type} '{principal_name}' already has direct control visible through " - f"{role_text} across {scope_text}. The next useful question is trust expansion, not " + f"{role_text} {scope_text}. The next useful question is trust expansion, not " f"more privilege ranking. {next_review}" ) return ( f"{principal_type} '{principal_name}' already has direct control visible through " - f"{role_text} across {scope_text}. {next_review}" + f"{role_text} {scope_text}. {next_review}" ) diff --git a/src/azurefox/scope_hints.py b/src/azurefox/scope_hints.py new file mode 100644 index 0000000..a998dd0 --- /dev/null +++ b/src/azurefox/scope_hints.py @@ -0,0 +1,62 @@ +from __future__ import annotations + + +def permission_scope_description( + scope_ids: list[object] | None, + *, + scope_count: int | None = None, +) -> str: + cleaned = _clean_scope_ids(scope_ids) + if len(cleaned) > 1: + return f"{len(cleaned)} visible scopes" + if len(cleaned) == 1: + return _describe_scope_id(cleaned[0]) + if scope_count and scope_count > 1: + return f"{scope_count} visible scopes" + return "visible scope" + + +def permission_scope_phrase( + scope_ids: list[object] | None, + *, + scope_count: int | None = None, +) -> str: + cleaned = _clean_scope_ids(scope_ids) + if len(cleaned) > 1: + return f"across {len(cleaned)} visible scopes" + if len(cleaned) == 1: + scope_description = _describe_scope_id(cleaned[0]) + if scope_description == "subscription-wide scope": + return f"across {scope_description}" + return f"on {scope_description}" + if scope_count and scope_count > 1: + return f"across {scope_count} visible scopes" + return "on visible scope" + + +def _clean_scope_ids(scope_ids: list[object] | None) -> list[str]: + cleaned: list[str] = [] + for value in scope_ids or []: + text = str(value or "").strip().rstrip("/") + if text and text not in cleaned: + cleaned.append(text) + return cleaned + + +def _describe_scope_id(scope_id: str) -> str: + parts = [part for part in scope_id.split("/") if part] + lower_parts = [part.lower() for part in parts] + + if len(parts) == 2 and lower_parts[0] == "subscriptions": + return "subscription-wide scope" + + if "resourcegroups" in lower_parts: + index = lower_parts.index("resourcegroups") + if index + 1 < len(parts): + if len(parts) == index + 2: + return f"resource group '{parts[index + 1]}'" + return f"resource '{parts[-1]}'" + + if parts: + return f"scope '{parts[-1]}'" + return "visible scope" diff --git a/tests/test_chain_semantics.py b/tests/test_chain_semantics.py index f36e89b..8230825 100644 --- a/tests/test_chain_semantics.py +++ b/tests/test_chain_semantics.py @@ -25,7 +25,8 @@ semantic_urgency_sort_value, ) from azurefox.config import GlobalOptions -from azurefox.models.common import OutputMode, PermissionSummary, RoleTrustSummary +from azurefox.models.commands import PermissionsOutput, RoleTrustsOutput +from azurefox.models.common import CommandMetadata, OutputMode, PermissionSummary, RoleTrustSummary def test_credential_path_semantics_promote_named_match() -> None: @@ -86,6 +87,52 @@ def test_chain_semantics_have_default_path_for_other_families() -> None: assert "deployment evidence" in decision.next_review +def test_escalation_direct_control_uses_single_rg_scope_text() -> None: + options = GlobalOptions( + tenant=None, + subscription="sub-test", + output=OutputMode.JSON, + outdir=Path("/tmp/azurefox-escalation-scope-text"), + debug=False, + ) + output = _build_escalation_path_output( + options, + "escalation-path", + { + "permissions": PermissionsOutput( + metadata=CommandMetadata(command="permissions", subscription_id="sub-test"), + permissions=[ + PermissionSummary( + principal_id="current-sp", + display_name="current-sp", + principal_type="ServicePrincipal", + priority="high", + high_impact_roles=["Contributor"], + all_role_names=["Contributor"], + role_assignment_count=1, + scope_count=1, + scope_ids=["/subscriptions/sub-test/resourceGroups/rg-workload"], + privileged=True, + is_current_identity=True, + ) + ], + ), + "role-trusts": RoleTrustsOutput( + metadata=CommandMetadata(command="role-trusts", subscription_id="sub-test"), + trusts=[], + issues=[], + ), + }, + ) + + direct_row = next( + row for row in output.paths if row.path_concept == "current-foothold-direct-control" + ) + + assert direct_row.stronger_outcome == "Contributor on resource group 'rg-workload'" + assert "Contributor on resource group 'rg-workload'" in (direct_row.why_care or "") + + def test_compute_control_semantics_promote_direct_token_opportunity() -> None: decision = evaluate_chain_semantics( ChainSemanticContext( diff --git a/tests/test_cli_smoke.py b/tests/test_cli_smoke.py index b9e7eb9..faf7853 100644 --- a/tests/test_cli_smoke.py +++ b/tests/test_cli_smoke.py @@ -5,9 +5,12 @@ import shutil from pathlib import Path +import pytest from typer.testing import CliRunner from azurefox.cli import app +from azurefox.errors import AzureFoxError, ErrorKind +from azurefox.registry import CommandSpec, get_command_specs runner = CliRunner() @@ -485,9 +488,9 @@ def test_cli_smoke_chains_compute_control_json(tmp_path: Path) -> None: assert aca_row["compute_foothold"] == "aca-orders" assert aca_row["token_path"] == "service token request" assert aca_row["identity"] == "aca-orders system identity" - assert aca_row["azure_access"] == "Contributor across subscription-wide scope" + assert aca_row["azure_access"] == "Contributor on resource group 'rg-containers'" assert aca_row["proof_status"] == "confirmed" - assert aca_row["stronger_outcome"] == "Contributor across subscription-wide scope" + assert aca_row["stronger_outcome"] == "Contributor on resource group 'rg-containers'" assert "ContainerApp 'aca-orders' can request tokens as aca-orders system identity" in aca_row[ "note" ] @@ -504,9 +507,9 @@ def test_cli_smoke_chains_compute_control_json(tmp_path: Path) -> None: assert aci_row["compute_foothold"] == "aci-public-api" assert aci_row["token_path"] == "service token request" assert aci_row["identity"] == "aci-public-api system identity" - assert aci_row["azure_access"] == "Contributor across subscription-wide scope" + assert aci_row["azure_access"] == "Contributor on resource group 'rg-apps'" assert aci_row["proof_status"] == "confirmed" - assert aci_row["stronger_outcome"] == "Contributor across subscription-wide scope" + assert aci_row["stronger_outcome"] == "Contributor on resource group 'rg-apps'" assert ( "ContainerInstance 'aci-public-api' can request tokens as aci-public-api system identity" in aci_row["note"] @@ -524,9 +527,9 @@ def test_cli_smoke_chains_compute_control_json(tmp_path: Path) -> None: assert app_row["compute_foothold"] == "app-empty-mi" assert app_row["token_path"] == "service token request" assert app_row["identity"] == "app-empty-mi-system" - assert app_row["azure_access"] == "Contributor across subscription-wide scope" + assert app_row["azure_access"] == "Contributor on resource group 'rg-apps'" assert app_row["proof_status"] == "confirmed" - assert app_row["stronger_outcome"] == "Contributor across subscription-wide scope" + assert app_row["stronger_outcome"] == "Contributor on resource group 'rg-apps'" assert "can request tokens as app-empty-mi-system" in app_row["note"] assert "Check app-services for the running service foothold" in app_row["next_review"] @@ -540,7 +543,7 @@ def test_cli_smoke_chains_compute_control_json(tmp_path: Path) -> None: assert func_row["compute_foothold"] == "func-orders" assert func_row["token_path"] == "service token request" assert func_row["identity"] == "func-orders-system" - assert func_row["azure_access"] == "Contributor across subscription-wide scope" + assert func_row["azure_access"] == "Contributor on resource group 'rg-apps'" assert func_row["proof_status"] == "best current match" assert func_row["target_names"] == ["func-orders-system"] assert func_row["target_resolution"] == "identity-choice-corroborated" @@ -581,9 +584,9 @@ def test_cli_smoke_chains_compute_control_json(tmp_path: Path) -> None: assert vmss_row["compute_foothold"] == "vmss-edge-01" assert vmss_row["token_path"] == "VM metadata token" assert vmss_row["identity"] == "vmss-edge-01-system" - assert vmss_row["azure_access"] == "Contributor across subscription-wide scope" + assert vmss_row["azure_access"] == "Contributor on resource group 'rg-workload'" assert vmss_row["proof_status"] == "confirmed" - assert vmss_row["stronger_outcome"] == "Contributor across subscription-wide scope" + assert vmss_row["stronger_outcome"] == "Contributor on resource group 'rg-workload'" assert "host-level execution or admin access" in vmss_row["note"] assert "Check vmss for the fleet foothold" in vmss_row["next_review"] @@ -760,6 +763,51 @@ def test_cli_smoke_chains_escalation_path_contributor_view_table_output(tmp_path assert "That would add Owner-level Azure control" in normalized_output +@pytest.mark.parametrize( + ("family", "failing_command", "expected_issue_scope"), + ( + ("deployment-path", "devops", "devops"), + ("escalation-path", "role-trusts", "role-trusts"), + ), +) +def test_cli_smoke_chains_family_keeps_emitting_when_supporting_collector_fails( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + family: str, + failing_command: str, + expected_issue_scope: str, +) -> None: + fixture_dir = Path(__file__).resolve().parent / "fixtures" / "lab_tenant" + + def _failing_collector(*_args, **_kwargs): + raise AzureFoxError( + ErrorKind.PERMISSION_DENIED, + f"{failing_command}: synthetic failure for grouped-family regression", + command=failing_command, + ) + + patched_specs = tuple( + CommandSpec(spec.name, spec.section, _failing_collector) + if spec.name == failing_command + else spec + for spec in get_command_specs() + ) + monkeypatch.setattr("azurefox.chains.runner.get_command_specs", lambda: patched_specs) + + result = runner.invoke( + app, + ["--outdir", str(tmp_path), "--output", "json", "chains", family], + env={"AZUREFOX_FIXTURE_DIR": str(fixture_dir)}, + ) + + assert result.exit_code == 0 + payload = json.loads(result.stdout) + assert payload["family"] == family + assert any(issue["scope"] == expected_issue_scope for issue in payload["issues"]) + assert (tmp_path / "json" / "chains.json").exists() + assert (tmp_path / "loot" / "chains.json").exists() + + def test_cli_smoke_chains_compute_control_table_output(tmp_path: Path) -> None: fixture_dir = Path(__file__).resolve().parent / "fixtures" / "lab_tenant" diff --git a/tests/test_collectors.py b/tests/test_collectors.py index e4df6e7..6965333 100644 --- a/tests/test_collectors.py +++ b/tests/test_collectors.py @@ -3828,6 +3828,68 @@ def test_collect_permissions(fixture_provider, options) -> None: ) +def test_permissions_current_identity_uses_assignment_scopes_not_whoami_scope() -> None: + options = GlobalOptions( + tenant=None, + subscription="sub-test", + output=OutputMode.JSON, + outdir=Path("/tmp"), + debug=False, + role_trusts_mode=RoleTrustsMode.FAST, + ) + provider = object.__new__(AzureProvider) + provider.options = options + provider.whoami = lambda: { + "tenant_id": "tenant-test", + "subscription": {"id": "sub-test", "display_name": "Test Sub", "state": "Enabled"}, + "principal": { + "id": "current-sp-object", + "principal_type": "ServicePrincipal", + "display_name": "current-sp", + "tenant_id": "tenant-test", + }, + "effective_scopes": [ + { + "id": "/subscriptions/sub-test", + "scope_type": "subscription", + "display_name": "Test Sub", + } + ], + "issues": [], + } + provider.rbac = lambda: { + "principals": [ + { + "id": "current-sp-object", + "principal_type": "ServicePrincipal", + "display_name": "current-sp", + "tenant_id": "tenant-test", + } + ], + "role_assignments": [ + { + "id": "ra-current-rg", + "scope_id": "/subscriptions/sub-test/resourceGroups/rg-workload", + "principal_id": "current-sp-object", + "principal_type": "ServicePrincipal", + "role_definition_id": "b24988ac-6180-42a0-ab88-20f7382dd24c", + "role_name": "Contributor", + } + ], + "scopes": [], + "issues": [], + } + provider.managed_identities = lambda: {"identities": [], "issues": []} + + permissions = AzureProvider.permissions(provider) + current_row = permissions["permissions"][0] + + assert current_row["is_current_identity"] is True + assert current_row["role_assignment_count"] == 1 + assert current_row["scope_count"] == 1 + assert current_row["scope_ids"] == ["/subscriptions/sub-test/resourceGroups/rg-workload"] + + def test_collect_permissions_prefers_workload_pivot_then_trust_expansion() -> None: class StubProvider: def permissions(self) -> dict: From 45f35fd010d8c143a0c2c30e59e3a6e09f7a3273 Mon Sep 17 00:00:00 2001 From: Colby Farley Date: Sat, 18 Apr 2026 00:06:43 -0500 Subject: [PATCH 2/2] fix: restore python 3.11 chains formatting --- src/azurefox/chains/compute_control.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/azurefox/chains/compute_control.py b/src/azurefox/chains/compute_control.py index 190cc86..7e43a28 100644 --- a/src/azurefox/chains/compute_control.py +++ b/src/azurefox/chains/compute_control.py @@ -896,17 +896,13 @@ def _permission_control_summary(permission_row: dict | None) -> str | None: roles = [str(role) for role in permission_row.get("high_impact_roles") or [] if role] role_text = ", ".join(roles) or "high-impact roles" - return ( - f"{role_text} " - f"{permission_scope_phrase( - list(permission_row.get('scope_ids') or []), - scope_count=int( - permission_row.get('scope_count') - or len(permission_row.get('scope_ids') or []) - or 0 - ), - )}" + scope_phrase = permission_scope_phrase( + list(permission_row.get("scope_ids") or []), + scope_count=int( + permission_row.get("scope_count") or len(permission_row.get("scope_ids") or []) or 0 + ), ) + return f"{role_text} {scope_phrase}" def _has_public_compute_signal(workload_row: dict) -> bool: