Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions src/azurefox/chains/compute_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from azurefox.chains.semantics import semantic_priority_sort_value, semantic_urgency_sort_value
from azurefox.models.chains import ChainPathRecord
from azurefox.models.common import CollectionIssue
from azurefox.scope_hints import permission_scope_phrase

_HIGH_IMPACT_ROLE_NAMES = {
"owner",
Expand Down Expand Up @@ -478,10 +479,13 @@ def _assignment_control_summary(
if not high_impact_roles:
return None

scopes = {str(item.get("scope_id") or "") for item in assignments if item.get("scope_id")}
scope_count = len(scopes)
scope_text = "subscription-wide scope" if scope_count <= 1 else f"{scope_count} visible scopes"
return f"{', '.join(high_impact_roles)} across {scope_text}"
scopes = sorted(
{str(item.get("scope_id") or "") for item in assignments if item.get("scope_id")}
)
return (
f"{', '.join(high_impact_roles)} "
f"{permission_scope_phrase(scopes, scope_count=len(scopes))}"
)


def _build_compute_control_record(
Expand Down Expand Up @@ -892,11 +896,13 @@ def _permission_control_summary(permission_row: dict | None) -> str | None:

roles = [str(role) for role in permission_row.get("high_impact_roles") or [] if role]
role_text = ", ".join(roles) or "high-impact roles"
scope_count = int(
permission_row.get("scope_count") or len(permission_row.get("scope_ids") or []) or 0
scope_phrase = permission_scope_phrase(
list(permission_row.get("scope_ids") or []),
scope_count=int(
permission_row.get("scope_count") or len(permission_row.get("scope_ids") or []) or 0
),
)
scope_text = "subscription-wide scope" if scope_count <= 1 else f"{scope_count} visible scopes"
return f"{role_text} across {scope_text}"
return f"{role_text} {scope_phrase}"


def _has_public_compute_signal(workload_row: dict) -> bool:
Expand Down
147 changes: 132 additions & 15 deletions src/azurefox/chains/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,26 @@
ChainPathRecord,
ChainsOutput,
)
from azurefox.models.commands import ChainsCommandOutput
from azurefox.models.commands import (
AksOutput,
AppServicesOutput,
ArmDeploymentsOutput,
AutomationOutput,
ChainsCommandOutput,
DatabasesOutput,
DevopsOutput,
EnvVarsOutput,
FunctionsOutput,
KeyVaultOutput,
PermissionsOutput,
RbacOutput,
RoleTrustsOutput,
StorageOutput,
TokensCredentialsOutput,
)
from azurefox.models.common import ArmDeploymentSummary, CollectionIssue, CommandMetadata
from azurefox.registry import get_command_specs
from azurefox.scope_hints import permission_scope_description, permission_scope_phrase
from azurefox.target_matching import (
normalize_exact_target_host,
normalize_exact_target_resource_id,
Expand Down Expand Up @@ -165,11 +182,102 @@ def _collect_family_outputs(

for source in family.source_commands:
collector = collector_by_name[source.command]
loaded[source.command] = collector(provider, options)
try:
loaded[source.command] = collector(provider, options)
except Exception as exc:
loaded[source.command] = _empty_chain_source_output(
command=source.command,
provider=provider,
options=options,
exc=exc,
)

return loaded


def _empty_chain_source_output(
*,
command: str,
provider: BaseProvider,
options: GlobalOptions,
exc: Exception,
):
issue = _chain_source_issue(command, exc)
context = provider.metadata_context()
model_class = _empty_chain_source_model(command)
payload = {
"metadata": CommandMetadata(
command=command,
tenant_id=options.tenant or context.get("tenant_id"),
subscription_id=options.subscription or context.get("subscription_id"),
devops_organization=options.devops_organization,
token_source=context.get("token_source"),
auth_mode=context.get("auth_mode"),
),
**_empty_chain_source_fields(command, issue, options),
}
return model_class.model_validate(payload)


def _empty_chain_source_fields(
command: str,
issue: dict[str, object],
options: GlobalOptions,
) -> dict[str, object]:
fields_by_command: dict[str, dict[str, object]] = {
"devops": {"pipelines": [], "issues": [issue]},
"automation": {"automation_accounts": [], "issues": [issue]},
"permissions": {"permissions": [], "issues": [issue]},
"rbac": {"principals": [], "scopes": [], "role_assignments": [], "issues": [issue]},
"role-trusts": {"mode": options.role_trusts_mode, "trusts": [], "issues": [issue]},
"keyvault": {"key_vaults": [], "issues": [issue]},
"arm-deployments": {"deployments": [], "issues": [issue]},
"app-services": {"app_services": [], "issues": [issue]},
"functions": {"function_apps": [], "issues": [issue]},
"aks": {"aks_clusters": [], "issues": [issue]},
"env-vars": {"env_vars": [], "issues": [issue]},
"tokens-credentials": {"surfaces": [], "issues": [issue]},
"databases": {"database_servers": [], "issues": [issue]},
"storage": {"storage_assets": [], "issues": [issue]},
}
try:
return fields_by_command[command]
except KeyError as exc:
raise ValueError(f"Missing empty grouped-source shape for '{command}'") from exc


def _empty_chain_source_model(command: str):
models_by_command = {
"devops": DevopsOutput,
"automation": AutomationOutput,
"permissions": PermissionsOutput,
"rbac": RbacOutput,
"role-trusts": RoleTrustsOutput,
"keyvault": KeyVaultOutput,
"arm-deployments": ArmDeploymentsOutput,
"app-services": AppServicesOutput,
"functions": FunctionsOutput,
"aks": AksOutput,
"env-vars": EnvVarsOutput,
"tokens-credentials": TokensCredentialsOutput,
"databases": DatabasesOutput,
"storage": StorageOutput,
}
try:
return models_by_command[command]
except KeyError as exc:
raise ValueError(f"Missing empty grouped-source model for '{command}'") from exc


def _chain_source_issue(command: str, exc: Exception) -> dict[str, object]:
return {
"kind": str(getattr(exc, "kind", "unknown")),
"message": str(exc),
"scope": str(getattr(exc, "command", None) or command),
"context": {"collector": command},
}


def _build_credential_path_output(
provider: BaseProvider,
options: GlobalOptions,
Expand Down Expand Up @@ -1565,15 +1673,17 @@ def _automation_permission_clause(source: dict) -> str | None:
return None
roles = [str(role) for role in permission.get("high_impact_roles") or [] if role]
role_text = ", ".join(roles) or "high-impact RBAC"
scope_count = int(permission.get("scope_count") or 0)
scope_text = "subscription-wide scope" if scope_count <= 1 else f"{scope_count} visible scopes"
scope_text = permission_scope_phrase(
list(permission.get("scope_ids") or []),
scope_count=int(permission.get("scope_count") or 0),
)
principal_name = str(
permission.get("display_name")
or source.get("name")
or permission.get("principal_id")
or "automation identity"
)
return f"Azure identity '{principal_name}' already has {role_text} across {scope_text}"
return f"Azure identity '{principal_name}' already has {role_text} {scope_text}"


def _automation_role_trust_clause(source: dict) -> str | None:
Expand Down Expand Up @@ -1618,16 +1728,18 @@ def _devops_permission_clause(source: dict) -> str | None:
return None
roles = [str(role) for role in permission.get("high_impact_roles") or [] if role]
role_text = ", ".join(roles) or "high-impact RBAC"
scope_count = int(permission.get("scope_count") or 0)
scope_text = "subscription-wide scope" if scope_count <= 1 else f"{scope_count} visible scopes"
scope_text = permission_scope_phrase(
list(permission.get("scope_ids") or []),
scope_count=int(permission.get("scope_count") or 0),
)
principal_name = str(
permission.get("display_name")
or permission.get("principal_id")
or "the Azure identity tied to this pipeline"
)
return (
f"This pipeline runs as Azure identity '{principal_name}', which already has "
f"{role_text} across {scope_text}"
f"{role_text} {scope_text}"
)


Expand Down Expand Up @@ -3666,21 +3778,26 @@ def _merge_related_ids(*groups: list[str]) -> list[str]:
def _permission_scope_text(permission_row: dict | None) -> str:
if not permission_row:
return "visible scope"
scope_count = int(
permission_row.get("scope_count") or len(permission_row.get("scope_ids") or []) or 0
return permission_scope_description(
list(permission_row.get("scope_ids") or []),
scope_count=int(
permission_row.get("scope_count") or len(permission_row.get("scope_ids") or []) or 0
),
)
if scope_count <= 1:
return "subscription-wide scope"
return f"{scope_count} visible scopes"


def _permission_control_summary(permission_row: dict | None) -> str:
if not permission_row:
return "Potential stronger Azure control; exact privilege not yet confirmed"
roles = [str(role) for role in permission_row.get("high_impact_roles") or [] if role]
role_text = ", ".join(roles) or "high-impact roles"
scope_text = _permission_scope_text(permission_row)
return f"{role_text} across {scope_text}"
scope_text = permission_scope_phrase(
list(permission_row.get("scope_ids") or []),
scope_count=int(
permission_row.get("scope_count") or len(permission_row.get("scope_ids") or []) or 0
),
)
return f"{role_text} {scope_text}"


def _permission_adds_net_value(
Expand Down
1 change: 1 addition & 0 deletions src/azurefox/collectors/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -1335,6 +1335,7 @@ def _enrich_permission_rows(permissions: list[dict], principals: list[dict]) ->
principal_name=item.get("display_name") or item.get("principal_id") or "unknown",
principal_type=item.get("principal_type", "unknown"),
high_impact_roles=[str(value) for value in item.get("high_impact_roles") or []],
scope_ids=[str(value) for value in item.get("scope_ids") or [] if value],
scope_count=_int_or_zero(item.get("scope_count")) or len(item.get("scope_ids") or []),
privileged=privileged,
is_current_identity=is_current_identity,
Expand Down
6 changes: 5 additions & 1 deletion src/azurefox/collectors/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -1875,6 +1875,7 @@ def ensure_record(principal_id: str) -> dict:
"tenant_id": None,
"sources": [],
"scope_ids": [],
"assignment_scope_ids": [],
"role_names": [],
"role_assignment_count": 0,
"identity_names": [],
Expand Down Expand Up @@ -1903,6 +1904,7 @@ def ensure_record(principal_id: str) -> dict:
_append_unique(record["role_names"], role_name)
if scope_id:
_append_unique(record["scope_ids"], scope_id)
_append_unique(record["assignment_scope_ids"], scope_id)
record["role_assignment_count"] += 1
principal_type = assignment.get("principal_type")
if principal_type:
Expand Down Expand Up @@ -1950,7 +1952,9 @@ def permissions(self) -> dict:

for principal in principal_data.get("principals", []):
role_names = sorted(set(principal.get("role_names", [])))
scope_ids = sorted(set(principal.get("scope_ids", [])))
scope_ids = sorted(
set(principal.get("assignment_scope_ids") or principal.get("scope_ids", []))
)
high_impact_roles = sorted(
{
role_name
Expand Down
15 changes: 9 additions & 6 deletions src/azurefox/permissions_hints.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

from azurefox.scope_hints import permission_scope_phrase


def permissions_operator_signal(
*,
Expand Down Expand Up @@ -66,6 +68,7 @@ def permissions_summary(
principal_name: str,
principal_type: str,
high_impact_roles: list[str],
scope_ids: list[str],
scope_count: int,
privileged: bool,
is_current_identity: bool,
Expand All @@ -81,36 +84,36 @@ def permissions_summary(
)

role_text = ", ".join(high_impact_roles) or "high-impact roles"
scope_text = "subscription-wide" if scope_count <= 1 else f"{scope_count} visible scopes"
scope_text = permission_scope_phrase(scope_ids, scope_count=scope_count)

if is_current_identity:
return (
f"Current identity '{principal_name}' already has direct control visible through "
f"{role_text} across {scope_text}. {next_review}"
f"{role_text} {scope_text}. {next_review}"
)

if has_workload_pivot:
return (
f"{principal_type} '{principal_name}' already has direct control visible through "
f"{role_text} across {scope_text}, and current scope also shows a workload pivot. "
f"{role_text} {scope_text}, and current scope also shows a workload pivot. "
f"{next_review}"
)

if workload_visibility_blocked:
return (
f"{principal_type} '{principal_name}' already has direct control visible through "
f"{role_text} across {scope_text}, but the backing workload pivot stays visibility "
f"{role_text} {scope_text}, but the backing workload pivot stays visibility "
f"blocked from current scope. {next_review}"
)

if trust_expansion_follow_on:
return (
f"{principal_type} '{principal_name}' already has direct control visible through "
f"{role_text} across {scope_text}. The next useful question is trust expansion, not "
f"{role_text} {scope_text}. The next useful question is trust expansion, not "
f"more privilege ranking. {next_review}"
)

return (
f"{principal_type} '{principal_name}' already has direct control visible through "
f"{role_text} across {scope_text}. {next_review}"
f"{role_text} {scope_text}. {next_review}"
)
Loading
Loading