Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions schemas/chains.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"command_state",
"summary",
"claim_boundary",
"current_gap",
"artifact_preference_order",
"backing_commands",
"source_artifacts",
Expand Down
61 changes: 27 additions & 34 deletions src/azurefox/chains/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
)
from azurefox.chains.registry import (
GROUPED_COMMAND_NAME,
ChainFamilySpec,
get_chain_family_spec,
implemented_chain_family_names,
is_implemented_chain_family,
Expand Down Expand Up @@ -151,23 +152,10 @@ def _build_credential_path_output(
)
)

return ChainsCommandOutput(
metadata=CommandMetadata(
command=GROUPED_COMMAND_NAME,
tenant_id=options.tenant,
subscription_id=options.subscription,
devops_organization=options.devops_organization,
token_source=None,
),
grouped_command_name=GROUPED_COMMAND_NAME,
family=family_name,
input_mode="live",
command_state="extraction-only",
summary=family.summary,
claim_boundary=family.allowed_claim,
artifact_preference_order=[],
backing_commands=[source.command for source in family.source_commands],
source_artifacts=[],
return _build_chains_command_output(
options=options,
family=family,
family_name=family_name,
paths=paths,
issues=issues,
)
Expand Down Expand Up @@ -352,23 +340,10 @@ def _build_deployment_path_output(
):
issues.extend(getattr(loaded[source_name], "issues", []))

return ChainsCommandOutput(
metadata=CommandMetadata(
command=GROUPED_COMMAND_NAME,
tenant_id=options.tenant,
subscription_id=options.subscription,
devops_organization=options.devops_organization,
token_source=None,
),
grouped_command_name=GROUPED_COMMAND_NAME,
family=family_name,
input_mode="live",
command_state="extraction-only",
summary=family.summary,
claim_boundary=family.allowed_claim,
artifact_preference_order=[],
backing_commands=[source.command for source in family.source_commands],
source_artifacts=[],
return _build_chains_command_output(
options=options,
family=family,
family_name=family_name,
paths=paths,
issues=issues,
)
Expand Down Expand Up @@ -437,6 +412,23 @@ def _build_escalation_path_output(
for source_name in ("privesc", "permissions", "role-trusts"):
issues.extend(getattr(loaded[source_name], "issues", []))

return _build_chains_command_output(
options=options,
family=family,
family_name=family_name,
paths=paths,
issues=issues,
)


def _build_chains_command_output(
*,
options: GlobalOptions,
family: ChainFamilySpec,
family_name: str,
paths: list[ChainPathRecord],
issues: list[CollectionIssue],
) -> ChainsCommandOutput:
return ChainsCommandOutput(
metadata=CommandMetadata(
command=GROUPED_COMMAND_NAME,
Expand All @@ -451,6 +443,7 @@ def _build_escalation_path_output(
command_state="extraction-only",
summary=family.summary,
claim_boundary=family.allowed_claim,
current_gap=family.current_gap,
artifact_preference_order=[],
backing_commands=[source.command for source in family.source_commands],
source_artifacts=[],
Expand Down
2 changes: 2 additions & 0 deletions src/azurefox/collectors/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -3679,6 +3679,7 @@ def _issue_from_exception(area: str, exc: Exception) -> dict:
return {
"kind": classify_exception(exc).value,
"message": f"{area}: {exc}",
"scope": area,
"context": {"collector": area},
}

Expand Down Expand Up @@ -8310,6 +8311,7 @@ def _partial_collection_issue(
return {
"kind": ErrorKind.PARTIAL_COLLECTION.value,
"message": f"{area}: {message}",
"scope": area,
"context": context,
}

Expand Down
18 changes: 17 additions & 1 deletion src/azurefox/help.py
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,8 @@ class SectionHelpTopic:
output_highlights=(
"family selectors",
"backing_commands",
"claim_boundary",
"current_gap",
"target_resolution",
"priority",
"target_names",
Expand Down Expand Up @@ -1363,6 +1365,14 @@ def _render_root_help() -> str:
"still landing."
),
" - ATT&CK references are investigative context, not proof that a technique occurred.",
(
" - Default output prefers exact claims when proven, and bounded weaker claims "
"when they stay honest and decision-useful."
),
(
" - When visibility is partial, AzureFox says what current scope did not "
"confirm instead of treating a path as proven."
),
]
)
return "\n".join(lines)
Expand Down Expand Up @@ -1438,7 +1448,13 @@ def _render_command_help(topic: CommandHelpTopic) -> str:
"Example:",
f" {topic.example}",
"",
"Note: ATT&CK references are leads to investigate, not proof of observed behavior.",
"Notes:",
" - ATT&CK references are leads to investigate, not proof of observed behavior.",
" - Output wording keeps proof strength separate from actionability.",
(
" - If current visibility is partial, AzureFox keeps only the honest weaker "
"claim and names the current gap explicitly."
),
]
)
return "\n".join(lines)
Expand Down
1 change: 1 addition & 0 deletions src/azurefox/models/chains.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class ChainsOutput(BaseModel):
command_state: str
summary: str
claim_boundary: str
current_gap: str | None = None
artifact_preference_order: list[str] = Field(default_factory=list)
backing_commands: list[str] = Field(default_factory=list)
source_artifacts: list[ChainSourceArtifact] = Field(default_factory=list)
Expand Down
12 changes: 11 additions & 1 deletion src/azurefox/models/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datetime import UTC, datetime
from enum import StrEnum

from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, model_validator

SCHEMA_VERSION = "1.3.0"

Expand Down Expand Up @@ -34,8 +34,18 @@ class CommandMetadata(BaseModel):
class CollectionIssue(BaseModel):
kind: str
message: str
scope: str | None = None
context: dict[str, str] = Field(default_factory=dict)

@model_validator(mode="after")
def _populate_scope_from_context(self) -> CollectionIssue:
if self.scope:
return self
collector = self.context.get("collector")
if collector:
self.scope = collector
return self


class SubscriptionRef(BaseModel):
id: str
Expand Down
98 changes: 88 additions & 10 deletions src/azurefox/render/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def render_table(command: str, payload: dict) -> str:
_render_devops_table(console, payload)
elif command == "role-trusts":
_render_role_trusts_table(console, payload)
elif command == "chains" and payload.get("families"):
elif command == "chains" and "families" in payload:
_render_chains_overview_table(console, payload)
elif command == "chains" and str(payload.get("family") or "") == "deployment-path":
_render_deployment_path_table(console, payload)
Expand All @@ -33,7 +33,7 @@ def render_table(command: str, payload: dict) -> str:

if not records:
table.add_column("info")
table.add_row("No records")
table.add_row(_empty_state_message(command, payload))
else:
for _key, label in columns:
table.add_column(label)
Expand All @@ -57,13 +57,15 @@ def render_table(command: str, payload: dict) -> str:
issues = payload.get("issues", [])
if issues:
console.print("")
console.print("Credential-scope issues:")
console.print("Current-scope issues:")
for issue in issues[:5]:
kind = issue.get("kind") or "unknown"
console.print(f"- {kind}: {issue.get('message')}", markup=False)
remaining = len(issues) - 5
if remaining > 0:
console.print(f"- ... plus {remaining} more credential-scope issues in JSON artifacts.")
console.print(f"- ... plus {remaining} more current-scope issues in JSON artifacts.")

_render_scope_boundary_notes(console, command, payload)

takeaway = _takeaway_for_command(command, payload)
if takeaway:
Expand All @@ -80,7 +82,7 @@ def _render_devops_table(console: Console, payload: dict) -> None:
if not records:
table = Table(title="azurefox devops")
table.add_column("info")
table.add_row("No records")
table.add_row(_empty_state_message("devops", payload))
console.print(table)
return

Expand All @@ -105,7 +107,7 @@ def _render_role_trusts_table(console: Console, payload: dict) -> None:
if not records:
table = Table(title="azurefox role-trusts")
table.add_column("info")
table.add_row("No records")
table.add_row(_empty_state_message("role-trusts", payload))
console.print(table)
return

Expand Down Expand Up @@ -149,7 +151,7 @@ def _render_chains_path_table(
if not records:
table = Table(title="azurefox chains")
table.add_column("info")
table.add_row("No records")
table.add_row(_empty_state_message("chains", payload))
console.print(table)
return

Expand All @@ -174,7 +176,7 @@ def _render_chains_overview_table(console: Console, payload: dict) -> None:

if not records:
table.add_column("info")
table.add_row("No records")
table.add_row(_empty_state_message("chains", payload))
console.print(table)
return

Expand All @@ -184,6 +186,19 @@ def _render_chains_overview_table(console: Console, payload: dict) -> None:
table.add_row(*[_value_to_string(record.get(key)) for key, _ in columns])
console.print(table)

boundary_table = Table(title="chains claim boundaries")
boundary_table.add_column("family")
boundary_table.add_column("allowed claim")
boundary_table.add_column("current gap")
for item in payload.get("families", []):
boundary_table.add_row(
_value_to_string(item.get("family")),
_value_to_string(item.get("allowed_claim")),
_value_to_string(item.get("current_gap")),
)
console.print("")
console.print(boundary_table)


def _table_spec(command: str, payload: dict) -> tuple[list[tuple[str, str]], list[dict]]:
if command == "whoami":
Expand Down Expand Up @@ -599,7 +614,7 @@ def _table_spec(command: str, payload: dict) -> tuple[list[tuple[str, str]], lis
)

if command == "chains":
if payload.get("families"):
if "families" in payload:
return (
[
("family", "family"),
Expand Down Expand Up @@ -1226,7 +1241,7 @@ def _takeaway_for_command(command: str, payload: dict) -> str:
issues = payload.get("issues", [])
return (
f"{len(policies)} policy rows, {len(findings)} findings, and "
f"{len(issues)} credential-scope issues visible from current credentials."
f"{len(issues)} current-scope issues."
)

if command == "permissions":
Expand Down Expand Up @@ -3023,3 +3038,66 @@ def _role_trust_visible_transform(item: dict) -> str | None:
return escalation_mechanism

return None


def _empty_state_message(command: str, payload: dict) -> str:
if command == "chains" and "families" in payload:
return "No chain families are currently registered."

if command == "chains":
family = str(payload.get("family") or "")
family_subjects = {
"credential-path": "credential paths",
"deployment-path": "deployment paths",
"escalation-path": "escalation paths",
"workload-identity-path": "workload-identity paths",
}
subject = family_subjects.get(family, "chain rows")
return f"No visible {subject} were confirmed from current scope."

subjects = {
"acr": "container registries",
"aks": "AKS clusters",
"api-mgmt": "API Management services",
"app-services": "App Service apps",
"application-gateway": "Application Gateways",
"arm-deployments": "ARM deployments",
"auth-policies": "authentication policy rows",
"automation": "Automation accounts",
"cross-tenant": "cross-tenant signals",
"databases": "database servers",
"devops": "Azure DevOps build definitions",
"dns": "DNS zones",
"endpoints": "reachable surfaces",
"env-vars": "environment-variable signals",
"functions": "Function Apps",
"keyvault": "Key Vaults",
"lighthouse": "Azure Lighthouse delegations",
"managed-identities": "managed identities",
"network-effective": "reachability rows",
"network-ports": "port-exposure rows",
"permissions": "permission rows",
"principals": "principals",
"resource-trusts": "resource trust surfaces",
"role-trusts": "role trust edges",
"storage": "storage accounts",
"tokens-credentials": "token or credential surfaces",
}
subject = subjects.get(command, f"{command} rows")
return f"No visible {subject} were confirmed from current scope."


def _render_scope_boundary_notes(console: Console, command: str, payload: dict) -> None:
if command != "chains":
return

claim_boundary = str(payload.get("claim_boundary") or "").strip()
current_gap = str(payload.get("current_gap") or "").strip()
if not claim_boundary and not current_gap:
return

console.print("")
if claim_boundary:
console.print(f"Claim boundary: {claim_boundary}")
if current_gap:
console.print(f"Current gap: {current_gap}")
6 changes: 6 additions & 0 deletions tests/test_cli_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ def test_cli_smoke_chains_credential_path_json(tmp_path: Path) -> None:
assert payload["metadata"]["command"] == "chains"
assert payload["family"] == "credential-path"
assert payload["command_state"] == "extraction-only"
assert payload["claim_boundary"].startswith("Can claim that the visible evidence suggests")
assert payload["current_gap"].startswith("The live family now joins backing evidence")
assert payload["artifact_preference_order"] == []
assert payload["source_artifacts"] == []
assert payload["backing_commands"] == [
Expand Down Expand Up @@ -148,6 +150,8 @@ def test_cli_smoke_chains_credential_path_table_output(tmp_path: Path) -> None:
normalized_output = " ".join(result.stdout.split())
assert "narrowed" in normalized_output
assert "candidates" in normalized_output
assert "Claim boundary:" in result.stdout
assert "Current gap:" in result.stdout
assert "Takeaway: 3 visible credential paths" in result.stdout


Expand Down Expand Up @@ -365,6 +369,8 @@ def test_cli_smoke_chains_overview_table_output(tmp_path: Path) -> None:

assert result.exit_code == 0
assert "azurefox chains" in result.stdout
assert "allowed claim" in result.stdout
assert "current gap" in result.stdout
assert "credential-path" in result.stdout
assert "deployment-path" in result.stdout
assert "escalation-path" in result.stdout
Expand Down
Loading
Loading