diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 5bc3c91..e5f5c1c 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -18,3 +18,4 @@ - [ ] Focused, single-purpose change - [ ] No secrets introduced +- [ ] Non-operator docs were kept out of the repo unless they are required for build, validation, release, install, or packaging diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index eec20e7..16b19cc 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -33,18 +33,30 @@ pytest -m integration ## Semantics + Contracts +- Before changing output wording, proof language, help text, or artifact behavior that could apply + across the HarrierOps family, read: + - the shared family style guide + - the shared family style guide applicability register - Keep command boundaries stable. - Use normalized models under `src/azurefox/models/`. - Keep JSON output deterministic and schema-compatible. +- Keep family-wide truthfulness rules, claim-strength rules, and partial-read rules aligned with + the shared family documents instead of re-inventing them in repo-local prose. - Update schemas with `python scripts/generate_schemas.py`. +- If AzureFox needs a documented wording or contract exception, update the applicability register in + the same change instead of leaving the exception implied only by local docs, help text, or tests. ## Documentation Boundaries - Keep live operator guidance in `azurefox help`, `azurefox help `, `README.md`, and curated `docs/` content. -- Treat `wiki/` as source material for intentional long-form wiki pages, not as a catch-all for internal planning notes. -- Do not add new command-by-command planning trees under `wiki/` unless they are actively maintained and have a clear published destination. -- If a note is temporary design scratch work, keep it out of the main repo or remove it once the implementation lands. -- If planning material becomes durable user-facing documentation, promote it into a maintained docs/wiki page instead of leaving README links pointed at planning notes. +- Keep non-operator documentation out of the repo unless the file is directly needed for build, + validation, release, install, or packaging flow. +- Treat maintainer-only planning, drift-control, and family-governance notes as external reference + material rather than repo content. +- If a temporary note or governance reminder lands locally during implementation, remove it before + merge unless it is truly required for the repo to build, validate, release, install, or package. +- If planning material becomes durable user-facing documentation, promote it into maintained + operator documentation instead of leaving internal notes in the repo. ## Lightweight Guardrails (Solo) @@ -56,3 +68,5 @@ pytest -m integration - Local pre-push hook blocks `codex` branch names, blocks direct pushes to `main`, and runs lint/tests. - CI blocks Codex-branded PR titles. - Temporary bypass for emergency push: `AZUREFOX_ALLOW_MAIN_PUSH=1 git push`. +- Before merge, remove any non-operator doc that is not required for build, validation, release, + install, or packaging. diff --git a/docs/output-contracts.md b/docs/output-contracts.md deleted file mode 100644 index 0574078..0000000 --- a/docs/output-contracts.md +++ /dev/null @@ -1,38 +0,0 @@ -# Output Contracts - -Each command output is represented by a versioned Pydantic model and is rendered into table/JSON/CSV from the same model instance. - -AzureFox also writes a `loot/` artifact for each command, but `loot` is not the same thing as the -full JSON contract. - -## Schema Version - -- Current: `1.3.0` - -## Contract Rules - -- JSON output is deterministic (`sort_keys=True` serialization). -- JSON output is the full structured command contract and authoritative backing record. -- `loot` is the smaller operator-facing handoff for fast follow-up and later chain-oriented - workflows. -- `loot` should focus on the top-ranked high-value targets from a command, rather than mirror the - full JSON record. -- `loot` may omit context-only metadata, omit empty informational sections, and cap the primary - target list to the top-ranked rows while JSON keeps the full list. -- Table output must not contain fields that are absent from the JSON contract. -- Each command schema is stored under `schemas/.schema.json`. -- Fixture snapshots under `tests/golden/` are regression baselines. - -## Milestone 1 Models - -- `WhoAmIOutput` -- `InventoryOutput` -- `RbacOutput` -- `PrincipalsOutput` -- `PermissionsOutput` -- `PrivescOutput` -- `RoleTrustsOutput` -- `AuthPoliciesOutput` -- `ManagedIdentitiesOutput` -- `StorageOutput` -- `VmsOutput` diff --git a/src/azurefox/chains/credential_path.py b/src/azurefox/chains/credential_path.py index 4e93069..fb6dab1 100644 --- a/src/azurefox/chains/credential_path.py +++ b/src/azurefox/chains/credential_path.py @@ -363,9 +363,11 @@ def _build_candidate_record( target_resolution=target_resolution, visibility_note=visibility_note, ), - missing_confirmation=( - f"AzureFox has not yet shown which {target_service} this setting reaches or whether " - "the credential works there." + missing_confirmation=_candidate_missing_confirmation( + target_service=target_service, + target_resolution=target_resolution, + target_count=len(target_ids), + visibility_issue=visibility_issue, ), related_ids=_merge_related_ids( env.get("related_ids", []), @@ -451,29 +453,75 @@ def _candidate_confidence_boundary( ) -> str: if target_resolution == "visibility blocked": return ( - f"AzureFox cannot name the downstream {target_service} under current target-side " - "visibility; do not treat this as a confirmed credential path." + f"Current scope does not confirm which downstream {target_service} target this " + "setting reaches." ) if target_resolution == "narrowed candidates": candidate_count = max(len(target_names), 1) + candidate_text = "candidate" if candidate_count == 1 else "candidates" return ( - f"AzureFox narrowed this to {candidate_count} visible {target_service} candidate(s), " - "but has not yet proved the exact target or a working credential." + f"AzureFox narrowed this to {candidate_count} visible {target_service} " + f"{candidate_text}, but the loaded evidence does not name the exact target, so this " + "setting is not confirmed to reach it." ) if target_resolution == "tenant-wide candidates": return ( f"AzureFox can only narrow this to a broad visible {target_service} set so far; " - "the exact target and working credential remain unconfirmed." + "the loaded evidence does not name the exact target, so this setting is not " + "confirmed to reach a specific downstream target." ) if target_resolution == "service hint only": return ( - f"AzureFox only has a service hint for this {target_service} path so far; the " - "downstream target and working credential remain unconfirmed." + f"AzureFox only has a service hint for this {target_service} path so far; no " + "concrete downstream target is visible, so this setting is not confirmed to reach a " + "specific downstream target." ) + return ( + f"AzureFox has not yet proved the exact downstream {target_service} target, so this " + "setting is not confirmed to reach a specific downstream target." + ) + + +def _candidate_missing_confirmation( + *, + target_service: str, + target_resolution: str, + target_count: int, + visibility_issue: str | None, +) -> str: + if target_resolution == "visibility blocked" or visibility_issue: + return ( + f"Current scope does not confirm which {target_service} target this setting reaches. " + "AzureFox also has not proved a working credential there." + ) + if target_resolution == "narrowed candidates": + if target_count == 1: + return ( + f"Current env-vars and token surfaces narrow this to one visible " + f"{target_service} candidate, but they do not name the exact downstream target. " + "AzureFox also has not proved a working credential there." + ) + return ( + f"Current env-vars and token surfaces narrow this to {target_count} visible " + f"{target_service} candidates, but they do not show which one this setting reaches. " + "AzureFox also has not proved a working credential against any listed target." + ) + if target_resolution == "tenant-wide candidates": + return ( + f"Current evidence only narrows this to a broad visible {target_service} set and " + "does not name the exact downstream target. AzureFox also has not proved a working " + "credential there." + ) + if target_resolution == "service hint only": + return ( + f"Current evidence suggests a {target_service} path, but no concrete downstream " + "target is visible from current inventory and AzureFox has not proved a working " + "credential." + ) return ( f"AzureFox has not yet proved the exact downstream {target_service} target or a working " "credential." diff --git a/src/azurefox/chains/registry.py b/src/azurefox/chains/registry.py index 4c43369..616d827 100644 --- a/src/azurefox/chains/registry.py +++ b/src/azurefox/chains/registry.py @@ -40,8 +40,8 @@ class ChainFamilySpec: ), allowed_claim=( "Can claim that the visible evidence suggests a likely credential path and name the " - "most plausible downstream service. Cannot claim the credential works or that the path " - "is confirmed without deeper source evidence." + "most plausible downstream service. Cannot claim the exact downstream target or that " + "the setting is confirmed to reach it." ), current_gap=( "The live family now joins backing evidence in one run, but it still needs periodic " @@ -142,8 +142,8 @@ class ChainFamilySpec: allowed_claim=( "Can claim that the visible evidence suggests a controllable or nearly controllable " "Azure change path and can name or narrow the likely downstream footprint when the " - "join is honest. Cannot claim successful execution or exact downstream change without " - "deeper source evidence." + "join is honest. Cannot claim successful execution or the exact downstream Azure " + "change from current visible evidence alone." ), current_gap=( "The live family still needs stronger source-side actionability proof and tighter " diff --git a/src/azurefox/chains/runner.py b/src/azurefox/chains/runner.py index d7c3dd0..0a3cb30 100644 --- a/src/azurefox/chains/runner.py +++ b/src/azurefox/chains/runner.py @@ -1,6 +1,8 @@ from __future__ import annotations +import re from collections import defaultdict +from dataclasses import replace from azurefox.chains.credential_path import collect_credential_path_records from azurefox.chains.deployment_path import ( @@ -84,6 +86,31 @@ "8e3af657-a8ff-443c-a75c-2fe8c4bcb635", # Owner "b24988ac-6180-42a0-ab88-20f7382dd24c", # Contributor } +_AUTOMATION_TARGET_FAMILY_ORDER = ( + "app-services", + "functions", + "aks", +) +_AUTOMATION_TARGET_FAMILY_TOKENS = { + "app-services": {"app", "api", "site", "web"}, + "functions": {"func", "function", "functions", "webjob"}, + "aks": {"aks", "cluster", "k8s", "kube", "kubernetes"}, +} +_AUTOMATION_NAME_TOKEN_STOPWORDS = { + "account", + "apply", + "baseline", + "config", + "configure", + "maintenance", + "nightly", + "reapply", + "reconcile", + "redeploy", + "rotate", + "runbook", + "sync", +} def implemented_chain_families() -> tuple[str, ...]: @@ -295,6 +322,26 @@ def _build_deployment_path_output( assessment = assess_deployment_source(account) if assessment.posture == "insufficient evidence": continue + automation_target_mapping = _best_automation_target_mapping( + account_dict, + target_candidates=target_candidates, + target_visibility_notes=target_visibility_notes, + target_visibility_issues=target_visibility_issues, + arm_correlations=arm_correlations, + ) + if automation_target_mapping is None: + automation_target_mapping = { + "target_family": "arm-deployments", + "target_candidates": [], + "exact_targets": [], + "confirmation_basis": None, + "target_visibility_note": target_visibility_notes["arm-deployments"], + "target_visibility_issue": ( + target_visibility_issues["arm-deployments"] + or "current automation surface does not name downstream Azure targets" + ), + "supporting_deployments": [], + } record = _build_deployment_source_record( family_name, source=account_dict, @@ -302,16 +349,13 @@ def _build_deployment_path_output( source_context=account.identity_type, asset_kind="AutomationAccount", assessment=assessment, - target_family="arm-deployments", - target_candidates=[], - exact_targets=[], - confirmation_basis=None, - target_visibility_note=target_visibility_notes["arm-deployments"], - target_visibility_issue=( - target_visibility_issues["arm-deployments"] - or "current automation surface does not name downstream Azure targets" - ), - supporting_deployments=[], + target_family=automation_target_mapping["target_family"], + target_candidates=automation_target_mapping["target_candidates"], + exact_targets=automation_target_mapping["exact_targets"], + confirmation_basis=automation_target_mapping["confirmation_basis"], + target_visibility_note=automation_target_mapping["target_visibility_note"], + target_visibility_issue=automation_target_mapping["target_visibility_issue"], + supporting_deployments=automation_target_mapping["supporting_deployments"], ) if record is not None: paths.append(record) @@ -469,8 +513,14 @@ def _build_deployment_source_record( supporting_deployments: list[dict], ) -> ChainPathRecord | None: target_spec = _DEPLOYMENT_TARGET_SPECS[target_family] + record_missing_target_mapping = ( + assessment.missing_target_mapping and not exact_targets and not target_candidates + ) + record_assessment = replace(assessment, missing_target_mapping=record_missing_target_mapping) + record_source = dict(source) + record_source["missing_target_mapping"] = record_missing_target_mapping admission = admit_deployment_path_row( - assessment, + record_assessment, exact_target_count=len(exact_targets), narrowed_candidate_count=len(target_candidates), confirmation_basis=confirmation_basis, @@ -502,20 +552,24 @@ def _build_deployment_source_record( semantic = evaluate_chain_semantics( ChainSemanticContext( family=family_name, - clue_type=assessment.path_concept or source_command, + clue_type=record_assessment.path_concept or source_command, target_service=target_spec["service"], target_resolution=admission.state, target_count=len(target_ids), source_command=source_command, - path_concept=assessment.path_concept, - current_operator_can_drive=_source_current_operator_can_drive(source_command, source), - current_operator_can_inject=_source_current_operator_can_inject(source_command, source), + path_concept=record_assessment.path_concept, + current_operator_can_drive=_source_current_operator_can_drive( + source_command, record_source + ), + current_operator_can_inject=_source_current_operator_can_inject( + source_command, record_source + ), ) ) semantic_priority = _deployment_priority_override( source_command=source_command, - source=source, - path_concept=assessment.path_concept, + source=record_source, + path_concept=record_assessment.path_concept, semantic_priority=semantic.priority, ) @@ -531,72 +585,76 @@ def _build_deployment_source_record( location=source.get("location"), source_command=source_command, source_context=source_context, - clue_type=assessment.path_concept or source_command, + clue_type=record_assessment.path_concept or source_command, confirmation_basis=record_confirmation_basis, priority=semantic_priority, urgency=semantic.urgency, actionability_state=_deployment_actionability_state( source_command=source_command, - source=source, - path_concept=assessment.path_concept, + source=record_source, + path_concept=record_assessment.path_concept, target_resolution=admission.state, - missing_target_mapping=assessment.missing_target_mapping, + missing_target_mapping=record_assessment.missing_target_mapping, ), visible_path=_deployment_visible_path( source_command, - assessment.path_concept, + record_assessment.path_concept, target_spec["label"], ), insertion_point=_deployment_insertion_point( source_command=source_command, - source=source, - path_concept=assessment.path_concept, + source=record_source, + path_concept=record_assessment.path_concept, ), - path_concept=assessment.path_concept, + path_concept=record_assessment.path_concept, primary_injection_surface=( - str(source.get("primary_injection_surface")) - if source.get("primary_injection_surface") + str(record_source.get("primary_injection_surface")) + if record_source.get("primary_injection_surface") else None ), primary_trusted_input_ref=( - str(source.get("primary_trusted_input_ref")) - if source.get("primary_trusted_input_ref") + str(record_source.get("primary_trusted_input_ref")) + if record_source.get("primary_trusted_input_ref") else None ), why_care=_deployment_why_care( source_command, - source, - assessment=assessment, + record_source, + assessment=record_assessment, ), likely_impact=_deployment_likely_impact( target_label=target_spec["label"], target_names=target_names, target_resolution=admission.state, - missing_target_mapping=assessment.missing_target_mapping, + missing_target_mapping=record_assessment.missing_target_mapping, ), confidence_boundary=_deployment_confidence_boundary( source_command=source_command, - source=source, + source=record_source, target_label=target_spec["label"], target_resolution=admission.state, confirmation_basis=record_confirmation_basis, - current_operator_can_drive=_source_current_operator_can_drive(source_command, source), - current_operator_can_inject=_source_current_operator_can_inject(source_command, source), - missing_target_mapping=assessment.missing_target_mapping, + current_operator_can_drive=_source_current_operator_can_drive( + source_command, record_source + ), + current_operator_can_inject=_source_current_operator_can_inject( + source_command, record_source + ), + missing_target_mapping=record_assessment.missing_target_mapping, ), target_service=target_spec["service"], target_resolution=admission.state, evidence_commands=_deployment_evidence_commands( source_command, - source, + record_source, target_family, supporting_deployments=supporting_deployments, ), joined_surface_types=_deployment_joined_surfaces( source_command, - assessment.change_signals, + record_assessment.change_signals, supporting_deployments=supporting_deployments, - source=source, + source=record_source, ), target_count=len(target_ids), target_ids=target_ids, @@ -604,8 +662,8 @@ def _build_deployment_source_record( target_visibility_issue=target_visibility_issue, next_review=_deployment_next_review( source_command=source_command, - source=source, - path_concept=assessment.path_concept, + source=record_source, + path_concept=record_assessment.path_concept, target_family=target_family, target_resolution=admission.state, target_names=target_names, @@ -613,9 +671,9 @@ def _build_deployment_source_record( supporting_deployments=supporting_deployments, ), summary=_deployment_summary( - source=source, + source=record_source, source_command=source_command, - assessment=assessment, + assessment=record_assessment, target_label=target_spec["label"], target_names=target_names, target_resolution=admission.state, @@ -624,21 +682,137 @@ def _build_deployment_source_record( supporting_deployments=supporting_deployments, ), missing_confirmation=_deployment_missing_confirmation( - source=source, + source=record_source, source_command=source_command, - path_concept=assessment.path_concept, + path_concept=record_assessment.path_concept, target_label=target_spec["label"], target_resolution=admission.state, - missing_target_mapping=assessment.missing_target_mapping, + missing_target_mapping=record_assessment.missing_target_mapping, ), related_ids=_merge_related_ids( - source.get("related_ids", []), + record_source.get("related_ids", []), target_ids, *[item.get("related_ids", []) for item in supporting_deployments], ), ) +def _best_automation_target_mapping( + source: dict, + *, + target_candidates: dict[str, list[dict]], + target_visibility_notes: dict[str, str | None], + target_visibility_issues: dict[str, str | None], + arm_correlations: dict[str, list[dict]], +) -> dict[str, object] | None: + best_mapping: dict[str, object] | None = None + best_sort_key: tuple[int, int, int, int, int] | None = None + for family_index, target_family in enumerate(_AUTOMATION_TARGET_FAMILY_ORDER): + exact_targets, narrowed_targets, confirmation_basis = _automation_target_matches( + source, + target_family=target_family, + candidates=target_candidates[target_family], + supporting_deployments=arm_correlations.get(target_family, []), + ) + selected_targets = exact_targets or narrowed_targets + visibility_issue = target_visibility_issues[target_family] + if not selected_targets: + continue + + sort_key = ( + 0 if exact_targets else 1, + len(selected_targets) if selected_targets else 99, + 0 if arm_correlations.get(target_family) else 1, + 0 if visibility_issue else 1, + family_index, + ) + if best_sort_key is not None and sort_key >= best_sort_key: + continue + best_sort_key = sort_key + best_mapping = { + "target_family": target_family, + "target_candidates": narrowed_targets, + "exact_targets": exact_targets, + "confirmation_basis": confirmation_basis, + "target_visibility_note": target_visibility_notes[target_family], + "target_visibility_issue": visibility_issue, + "supporting_deployments": arm_correlations.get(target_family, []), + } + return best_mapping + + +def _automation_target_matches( + source: dict, + *, + target_family: str, + candidates: list[dict], + supporting_deployments: list[dict], +) -> tuple[list[dict], list[dict], str | None]: + runbook_names = _automation_runbook_names(source) + if not runbook_names: + return [], [], None + + normalized_runbook_names = {_normalize_target_name(name) for name in runbook_names} + runbook_tokens = _automation_runbook_tokens(runbook_names) + if not runbook_tokens: + return [], [], None + + exact_targets = [ + dict(candidate) + for candidate in candidates + if _normalize_target_name(str(candidate.get("name") or "")) in normalized_runbook_names + ] + if exact_targets: + confirmation_basis = ( + "same-workload-corroborated" if supporting_deployments else "name-only-inference" + ) + return exact_targets, exact_targets, confirmation_basis + + narrowed_targets = [ + dict(candidate) + for candidate in candidates + if _automation_candidate_overlap_tokens(candidate, runbook_tokens) + ] + if narrowed_targets and supporting_deployments: + return [], narrowed_targets, "same-workload-corroborated" + + return [], [], None + + +def _automation_runbook_names(source: dict) -> list[str]: + names: list[str] = [] + for value in ( + source.get("primary_runbook_name"), + *(source.get("published_runbook_names") or []), + *(source.get("schedule_runbook_names") or []), + *(source.get("webhook_runbook_names") or []), + ): + text = str(value or "").strip() + if text and text not in names: + names.append(text) + return names + + +def _automation_runbook_tokens(runbook_names: list[str]) -> set[str]: + tokens: set[str] = set() + for name in runbook_names: + tokens.update(_automation_name_tokens(name)) + return tokens + + +def _automation_candidate_overlap_tokens(candidate: dict, runbook_tokens: set[str]) -> set[str]: + name_tokens = _automation_name_tokens(str(candidate.get("name") or "")) + return name_tokens & runbook_tokens + + +def _automation_name_tokens(value: str) -> set[str]: + return { + token + for token in re.findall(r"[a-z0-9]+", value.lower()) + if len(token) > 1 and token not in _AUTOMATION_NAME_TOKEN_STOPWORDS + } + + def _deployment_visible_path( source_command: str, path_concept: str | None, @@ -695,6 +869,7 @@ def _deployment_insertion_point( def _deployment_devops_insertion_point(source: dict) -> str: primary_input = _devops_primary_trusted_input(source) trusted_input_text = _devops_trusted_input_text(primary_input) + blocker = _devops_trusted_input_blocker(primary_input) control_mode = _devops_current_operator_control_mode(source, primary_input=primary_input) non_definition_surfaces = _devops_non_definition_injection_surfaces(source) if control_mode == "trusted-input-poison": @@ -706,53 +881,34 @@ def _deployment_devops_insertion_point(source: dict) -> str: if primary_input: access_state = str(primary_input.get("current_operator_access_state") or "") if access_state == "use" and primary_input.get("input_type") == "secure-file": - return ( - f"Queue this pipeline now; {trusted_input_text} is usable in pipeline " - "context, but secure-file administration is still unproven." - ) + return f"Queue this pipeline now; {blocker}." if access_state == "read" and primary_input.get("input_type") == "pipeline-artifact": - return ( - "Queue this pipeline now; the upstream producer behind " - f"{trusted_input_text} is inspectable, but producer control is still " - "unproven." - ) + return f"Queue this pipeline now; {blocker}." if access_state == "read": return f"Queue this pipeline now; {trusted_input_text} is only readable." if access_state == "exists-only": - return ( - f"Queue this pipeline now; {trusted_input_text} is visible, but source control " - "is still unproven." - ) - return "Queue this pipeline now, but source poisoning is still unproven." + return f"Queue this pipeline now; {blocker}." + return f"Queue this pipeline now; {blocker}." if primary_input: input_type = str(primary_input.get("input_type") or "") access_state = str(primary_input.get("current_operator_access_state") or "") if access_state == "use" and primary_input.get("input_type") == "secure-file": - return ( - f"{trusted_input_text} is usable in pipeline context, but secure-file " - "administration is unproven." - ) + return blocker[:1].upper() + blocker[1:] + "." if access_state == "read" and primary_input.get("input_type") == "pipeline-artifact": - return ( - f"The upstream producer behind {trusted_input_text} is inspectable, but " - "producer control is unproven." - ) + return blocker[:1].upper() + blocker[1:] + "." if input_type == "pipeline-artifact": - return ( - f"Artifact trust is visible at {trusted_input_text}, but upstream producer control " - "is unproven." - ) + return blocker[:1].upper() + blocker[1:] + "." if access_state == "read": return ( f"{trusted_input_text} is visible and readable, but not writable from " "current evidence." ) if access_state == "exists-only": - return f"{trusted_input_text} is visible, but current control is unproven." + return blocker[:1].upper() + blocker[1:] + "." return f"Source depends on {trusted_input_text}." return ( - "Azure-facing pipeline is visible, but the source-side insertion point is " - "still unproven." + "Azure-facing pipeline is visible, but current scope does not identify a writable trusted " + "input or definition-edit path." ) @@ -791,15 +947,18 @@ def _deployment_automation_insertion_point(source: dict, *, path_concept: str | if surfaces: if operator_clause: return "; ".join(surfaces) + "." - return "; ".join(surfaces) + ", but current operator control is still unproven." + return ( + "; ".join(surfaces) + + ", but current scope does not show an operator-controlled start or edit path." + ) if path_concept == "secret-escalation-support": return ( - "Reusable automation support is visible, but no operator-controlled run " - "path is proven." + "Reusable automation support is visible, but current scope does not show an " + "operator-controlled run path." ) return ( - "Automation consequences are grounded, but the operator-controlled start " - "or edit path is still unproven." + "Automation consequences are grounded, but current scope does not show an " + "operator-controlled start or edit path." ) @@ -1045,8 +1204,8 @@ def _automation_current_operator_control_clause(source: dict) -> str | None: if primary_runbook and primary_mode == "webhook": return ( f"current role assignment {role_name} at {scope_text} can edit runbook " - f"{primary_runbook} or its webhook-backed execution boundary; AzureFox does not " - "prove possession of the current webhook URI" + f"{primary_runbook} or its webhook-backed execution boundary; current scope does " + "not expose the webhook URI value" ) if primary_runbook and primary_mode == "schedule": return ( @@ -1315,7 +1474,6 @@ def _deployment_why_care( assessment: DeploymentSourceAssessment, ) -> str: source_name = str(source.get("name") or source.get("id") or source_command) - consequence_phrase = _deployment_consequence_phrase(source) support_parts = _deployment_support_phrase_parts(source) support_phrase = ( " and ".join(support_parts) if support_parts else "secret-backed deployment support" @@ -1324,22 +1482,21 @@ def _deployment_why_care( if assessment.path_concept == "secret-escalation-support": if source_command == "devops": sentence = ( - f"This path is not yet a proven attacker-usable Azure change path, but it " - f"concentrates {support_phrase} around an Azure-facing deployment route. " - f"Another foothold that can start or control execution could " - f"{consequence_phrase}." + "This path does not on its own show a current-credential Azure " + "change path, but it " + f"concentrates {support_phrase} around an Azure-facing deployment route." ) elif source_command == "automation": sentence = ( - f"Automation account '{source_name}' is not yet a proven attacker-usable Azure " - f"change path on its own, but it concentrates {support_phrase} around reusable " - f"automation. Another foothold that can start or control execution could " - f"{consequence_phrase}." + f"Automation account '{source_name}' does not on its own show a " + "current-credential Azure change path, but it concentrates " + f"{support_phrase} around reusable " + f"automation." ) else: sentence = ( - "Secret-backed deployment support is visible, but another foothold is still " - "needed before it becomes an attacker-usable Azure change path." + "Secret-backed deployment support is visible, but a separate execution foothold " + "is still needed before it becomes a usable Azure change path." ) current_operator_suffix = _deployment_current_operator_suffix(source_command, source) if current_operator_suffix: @@ -1349,13 +1506,13 @@ def _deployment_why_care( if source_command == "devops": trusted_input = _devops_primary_trusted_input(source) sentence = _devops_why_care_intro(source, trusted_input=trusted_input) - grounded_reach = _devops_grounded_reach_clause(source) + grounded_reach = _deployment_grounded_reach_clause(source) if grounded_reach: sentence = f"{sentence} {grounded_reach}" if support_parts: sentence = ( - f"{sentence} Visible deployment support around this path also includes " + f"{sentence} Additional visible deployment support around this path includes " + " and ".join(support_parts) + "." ) @@ -1398,15 +1555,16 @@ def _deployment_why_care( workers = int(source.get("hybrid_worker_group_count") or 0) surface_parts.append(f"{workers} Hybrid Worker reach point(s)") sentence = ( - f"Automation account '{source_name}' combines " - + ", ".join(surface_parts) - + f", so control of this execution hub could {consequence_phrase} rather than stay " - "at passive automation visibility." + f"Automation account '{source_name}' combines " + ", ".join(surface_parts) + "." ) - if _deployment_support_phrase_parts(source): + grounded_reach = _deployment_grounded_reach_clause(source) + if grounded_reach: + sentence = f"{sentence} {grounded_reach}" + if support_parts: sentence = ( - f"{sentence} Secure assets around the account could widen blast radius once a " - "run path is started or modified." + f"{sentence} Additional visible deployment support around this account includes " + + " and ".join(support_parts) + + "." ) current_operator_suffix = _deployment_current_operator_suffix(source_command, source) if current_operator_suffix: @@ -1429,18 +1587,20 @@ def _devops_why_care_intro(source: dict, *, trusted_input: dict | None) -> str: if control_mode == "definition-edit": return ( f"This path trusts {trusted_input_text}. Current credentials can already edit this " - f"pipeline definition directly. An edited run would use {execution_context} when it " + "pipeline definition directly. The resulting run would use " + f"{execution_context} when it " "makes changes in Azure." ) if _source_current_operator_can_inject("devops", source): return ( - f"This path trusts {trusted_input_text}. Current credentials can already poison " - f"that source. A poisoned run would use {execution_context} when it makes changes " + f"This path trusts {trusted_input_text}. Current credentials can already modify " + "that trusted input. The resulting run would use " + f"{execution_context} when it makes changes " "in Azure." ) return ( - f"This path trusts {trusted_input_text}. If that trusted input becomes " - f"attacker-controlled, a poisoned run would use {execution_context} when it makes " + f"This path trusts {trusted_input_text}. If that trusted input is changed upstream, " + f"the resulting run would use {execution_context} when it makes " "changes in Azure." ) @@ -1509,8 +1669,8 @@ def _deployment_current_operator_suffix(source_command: str, source: dict) -> st return "Current credentials can already edit the pipeline definition directly." if control_mode == "queue-only": return ( - "Current credentials can already queue this pipeline, but AzureFox has not yet " - "proven that they can poison the trusted input." + "Current credentials can already queue this pipeline, but current evidence does " + "not show that they can poison the trusted input." ) access_state = ( str(primary_input.get("current_operator_access_state")) @@ -1549,12 +1709,12 @@ def _deployment_current_operator_suffix(source_command: str, source: dict) -> st return ( "Current evidence only shows that the trusted input exists; " + missing_proof - + " remains unproven." + + " is not shown." ) return "Current evidence only shows that the trusted input exists." if source.get("missing_injection_point"): return ( - "AzureFox has not yet proven " + "AzureFox does not yet show " + _devops_missing_source_control_text(definite=False) + " for current credentials." ) @@ -1677,6 +1837,34 @@ def _devops_missing_trusted_input_proof(input_type: str | None) -> str | None: }.get(str(input_type or "")) +def _devops_trusted_input_blocker(trusted_input: dict | None) -> str: + if trusted_input is None: + return "current scope does not identify a writable trusted input or definition-edit path" + + trusted_input_text = _devops_trusted_input_text(trusted_input) + input_type = str(trusted_input.get("input_type") or "") + access_state = str(trusted_input.get("current_operator_access_state") or "") + visibility_state = str(trusted_input.get("visibility_state") or "") + + if access_state == "read": + if input_type == "pipeline-artifact": + return ( + "current scope shows the upstream producer behind " + f"{trusted_input_text} as readable, not writable" + ) + return f"current scope shows {trusted_input_text} as readable, not writable" + if access_state == "use" and input_type == "secure-file": + return ( + f"current scope shows {trusted_input_text} as usable in pipeline " + "context, but not administrable" + ) + if access_state == "exists-only": + if visibility_state == "external-reference": + return f"current scope only shows the external reference {trusted_input_text}" + return f"current scope only shows that {trusted_input_text} is referenced here" + return "current scope does not identify a writable trusted input or definition-edit path" + + def _deployment_likely_impact( *, target_label: str, @@ -1686,7 +1874,7 @@ def _deployment_likely_impact( ) -> str: lowered_label = target_label.lower() if missing_target_mapping: - return f"Azure footprint not yet mapped; {lowered_label} evidence is consequence grounding" + return f"Azure footprint not yet mapped; visible {lowered_label} clues only" if target_resolution == "named match": return f"exact {lowered_label}: {', '.join(target_names[:_CANDIDATE_LIMIT])}" if target_resolution == "narrowed candidates": @@ -1763,17 +1951,17 @@ def _deployment_confidence_boundary( if target_resolution == "named match": if confirmation_basis == "parsed-config-target": return ( - f"This row proves the exact {target_label} target from parsed source clues, but " - "not current-credential invocation." + f"This row proves the exact {target_label} target from parsed source clues. " + "Current evidence does not show that current credentials can run this path." ) return ( - f"This row proves the exact {target_label} target, but not current-credential " - "invocation." + f"This row proves the exact {target_label} target. Current evidence does not show " + "that current credentials can run this path." ) if target_resolution == "narrowed candidates": return ( - f"This row narrows the likely {target_label} targets, but not current-credential " - "invocation." + f"This row narrows the likely {target_label} targets. Current evidence does not show " + "that current credentials can run this path." ) if target_resolution == "visibility blocked": return ( @@ -1890,11 +2078,9 @@ def _deployment_next_review( target_label: str, supporting_deployments: list[dict], ) -> str: + primary_input = _devops_primary_trusted_input(source) if source_command == "devops" else None if source_command == "automation": - primary_mode = str(source.get("primary_start_mode") or "") or None primary_runbook = str(source.get("primary_runbook_name") or "") or None - permission_clause = _automation_permission_clause(source) - trust_clause = _automation_role_trust_clause(source) current_operator_can_edit = bool( _source_current_operator_can_inject(source_command, source) ) @@ -1902,56 +2088,34 @@ def _deployment_next_review( _source_current_operator_can_drive(source_command, source) ) if path_concept == "secret-escalation-support": - steps = ["Confirm what separate foothold could reuse this secret-backed support"] + steps = [ + "Current evidence does not yet show a current-credential start or control path " + "for this secret-backed support" + ] elif current_operator_can_edit: steps = ["Current RBAC evidence already shows edit-capable automation control here"] elif current_operator_can_start: steps = ["Current RBAC evidence already shows runbook-start control here"] - elif permission_clause: - steps = ["Validate what Azure scope the automation identity can already change"] - else: - steps = ["Check permissions for the automation identity behind this execution path"] - if current_operator_can_edit and primary_runbook and primary_mode == "webhook": - steps.append( - f"map what runbook {primary_runbook} changes because current " - "control does not depend on the webhook URI" - ) - elif current_operator_can_edit and primary_runbook: - steps.append(f"map what runbook {primary_runbook} changes on the Azure side") - elif current_operator_can_start and primary_runbook: - steps.append( - f"confirm whether runbook {primary_runbook} also has an editable " - "trigger or definition path" - ) - elif primary_runbook and primary_mode == "webhook": - steps.append( - "confirm whether current credentials can trigger webhook runbook " - f"{primary_runbook}" - ) - elif primary_runbook and primary_mode == "schedule": - steps.append( - "confirm whether current credentials can influence scheduled " - f"runbook {primary_runbook}" - ) - elif primary_runbook: - steps.append( - f"confirm how runbook {primary_runbook} is started from current " - "credentials" - ) else: - steps.append("confirm which runbook and trigger path performs the Azure change") - if trust_clause: - steps.append("review other identity trust paths around that same Azure identity") - elif source.get("principal_id") or source.get("client_id") or source.get("identity_ids"): - steps.append("review other identity trust paths around the automation identity") + steps = [ + "Current evidence does not show current-credential control of the automation " + "identity behind this execution path" + ] if source.get("missing_target_mapping"): - steps.append( - "use already-loaded ARM deployment evidence as consequence grounding " - "because runbook target mapping is still missing" - ) + if current_operator_can_edit and primary_runbook: + steps.append( + f"Current evidence does not yet map what runbook {primary_runbook} changes " + "on the Azure side" + ) + else: + steps.append( + "Current evidence does not yet map the downstream Azure footprint beyond " + "visible ARM deployment history" + ) elif target_resolution == "visibility blocked": steps.append( - f"restore {target_label} visibility so AzureFox can finish the target-side join" + f"Current scope does not yet show enough {target_label} visibility to finish the " + "target-side join" ) else: steps.append( @@ -1965,7 +2129,10 @@ def _deployment_next_review( return "; ".join(steps) + "." if path_concept == "secret-escalation-support": - steps: list[str] = ["Confirm what separate foothold could reuse this secret-backed support"] + steps = [ + "Current evidence does not yet show a current-credential start or control path for " + "this secret-backed support" + ] elif _source_current_operator_can_inject(source_command, source): if ( source_command == "devops" @@ -1976,50 +2143,26 @@ def _deployment_next_review( steps = ["Current credentials can already poison a trusted input"] elif _source_current_operator_can_drive(source_command, source): steps = [ - "Current credentials can already start this path, but trusted-input poisoning is " - "not yet proven" + "Current credentials can already start this path, but current evidence does not show " + "a writable trusted input" ] else: - steps = ["Check permissions for the backing identity or service connection"] + steps = [ + "Current evidence does not show current-credential control of the backing " + "identity or Azure-linked connection" + ] if source_command == "devops" and source.get("missing_injection_point"): - primary_input = _devops_primary_trusted_input(source) - missing_proof = _devops_missing_trusted_input_proof( - str(primary_input.get("input_type") or "") if primary_input else None - ) - if missing_proof: - steps.append(f"confirm {missing_proof}") - else: - steps.append( - "confirm which trusted input can actually be poisoned from current credentials" - ) - if source.get("azure_service_connection_client_ids") or source.get( - "azure_service_connection_principal_ids" - ): - if source.get("joined_role_trusts"): - steps.append( - "use the already-joined app and identity trust evidence to validate " - "other sign-in paths into that same Azure identity" - ) - else: - steps.append("review other trust paths into the Azure identity tied to this pipeline") - permission_clause = _devops_permission_clause(source) - if permission_clause: - steps.append( - "use the already-joined Azure control on the Azure identity tied to this pipeline" - ) - if "keyvault-backed-inputs" in (source.get("secret_support_types") or []): - steps.append( - "use the already-loaded Key Vault support evidence to keep blast " - "radius in view" - ) + blocker = _devops_trusted_input_blocker(primary_input) + steps.append(blocker[:1].upper() + blocker[1:]) if source.get("missing_target_mapping"): steps.append( - "use already-loaded ARM deployment evidence as consequence grounding " - "because target mapping is still missing" + "Current evidence does not yet map the downstream Azure footprint beyond visible ARM " + "deployment history" ) elif target_resolution == "visibility blocked": steps.append( - f"restore {target_label} visibility so AzureFox can finish the target-side join" + f"Current scope does not yet show enough {target_label} visibility to finish the " + "target-side join" ) else: steps.append( @@ -2042,47 +2185,46 @@ def _deployment_target_review_step( ) -> str: shown_targets = ", ".join(target_names[:_CANDIDATE_LIMIT]) if target_resolution == "named match" and shown_targets: - step = ( - f"AzureFox already named the exact {target_label} target {shown_targets}; " - "validate that target directly" - ) + step = f"AzureFox already named the exact {target_label} target {shown_targets}" elif shown_targets: step = ( - f"AzureFox already narrowed the likely {target_label} candidates to {shown_targets}; " - "confirm which one this path actually changes" + f"AzureFox already narrowed the visible {target_label} candidates to {shown_targets}; " + "current evidence does not identify which target this path changes" ) else: - step = f"confirm which {target_label} this path actually changes" + step = f"Current evidence does not identify which {target_label} this path changes" supporting_names = ", ".join( str(item.get("name") or "") for item in supporting_deployments[:_CANDIDATE_LIMIT] if item.get("name") ) if supporting_names: - step += f" while keeping supporting ARM deployment history {supporting_names} in view" + step += f"; supporting ARM deployment history includes {supporting_names}" return step -def _devops_grounded_reach_clause(source: dict) -> str: +def _deployment_grounded_reach_clause(source: dict) -> str: consequence_types = {str(value) for value in (source.get("consequence_types") or []) if value} phrases: list[str] = [] if "redeploy-workload" in consequence_types: - phrases.append("AzureFox already ties this path to visible workload deployment reach") + phrases.append("workload deployment reach") if "modify-infra" in consequence_types: - phrases.append("visible infrastructure deployment reach") + phrases.append("infrastructure deployment reach") if "reintroduce-config" in consequence_types: phrases.append("configuration change reach") if "run-recurring-execution" in consequence_types: - phrases.append("recurring execution") + phrases.append("recurring Azure execution") if "consume-secret-backed-deployment-material" in consequence_types: - phrases.append("secret-backed deployment material") + phrases.append("secret-backed deployment support") if not phrases: return "" if len(phrases) == 1: - return phrases[0] + "." + return f"AzureFox already ties this path to {phrases[0]}." if len(phrases) == 2: - return f"{phrases[0]} and {phrases[1]}." - return ", ".join(phrases[:-1]) + f", and {phrases[-1]}." + joined = f"{phrases[0]} and {phrases[1]}" + else: + joined = ", ".join(phrases[:-1]) + f", and {phrases[-1]}" + return f"AzureFox already ties this path to {joined}." def _deployment_joined_surfaces( @@ -2119,7 +2261,7 @@ def _deployment_summary( if assessment.missing_target_mapping: impact_sentence = ( f"AzureFox has not yet mapped the downstream Azure footprint cleanly, so " - f"{target_label} evidence is only consequence grounding right now." + f"current visible {target_label} clues only narrow the downstream story right now." ) elif target_resolution == "visibility blocked": impact_sentence = ( diff --git a/src/azurefox/chains/semantics.py b/src/azurefox/chains/semantics.py index 2e4dee3..677ad07 100644 --- a/src/azurefox/chains/semantics.py +++ b/src/azurefox/chains/semantics.py @@ -68,13 +68,17 @@ def _credential_path_semantics(context: ChainSemanticContext) -> ChainSemanticDe return ChainSemanticDecision( priority="medium", urgency="review-soon", - next_review="Confirm the database target from app config or connection clues.", + next_review=( + "Current env-vars and token surfaces do not name the exact database target." + ), ) if context.target_service == "storage": return ChainSemanticDecision( priority="medium", urgency="bookmark", - next_review="Confirm the storage target from binding or connection clues.", + next_review=( + "Current env-vars and token surfaces do not name the exact storage target." + ), ) return ChainSemanticDecision( priority="medium", @@ -87,32 +91,45 @@ def _credential_path_semantics(context: ChainSemanticContext) -> ChainSemanticDe return ChainSemanticDecision( priority="low", urgency="bookmark", - next_review="Confirm the database target from app config or connection clues.", + next_review=( + "Current env-vars and token surfaces do not name the exact database target." + ), ) if context.target_service == "storage": return ChainSemanticDecision( priority="low", urgency="bookmark", - next_review="Confirm the storage target from binding or connection clues.", + next_review=( + "Current env-vars and token surfaces do not name the exact storage target." + ), ) return ChainSemanticDecision( priority="low", urgency="bookmark", - next_review="Confirm the exact target before deeper follow-up.", + next_review=( + "Current evidence does not name the exact target yet; review the visible " + "candidates next." + ), ) if context.target_resolution == "tenant-wide candidates": return ChainSemanticDecision( priority="low", urgency="bookmark", - next_review="Narrow the target with stronger naming or deployment clues.", + next_review=( + "Current evidence only narrows this to a broad visible target set; review the " + "visible candidates and look for stronger naming clues next." + ), ) if context.target_resolution == "service hint only": return ChainSemanticDecision( priority="low", urgency="bookmark", - next_review="Collect richer target-side inventory before follow-up.", + next_review=( + "Current target-side inventory does not name any concrete target here; collect " + "richer target visibility before follow-up." + ), ) if context.target_resolution == "named target not visible": diff --git a/src/azurefox/cli.py b/src/azurefox/cli.py index 16a16b9..4c8a368 100644 --- a/src/azurefox/cli.py +++ b/src/azurefox/cli.py @@ -309,10 +309,11 @@ def chains( provider = get_provider(options) if options.output != OutputMode.JSON: emit_context_banner(options) - emit_command_intro("chains") + typer.echo(family_spec.summary) model = run_chain_family(provider, options, family) artifact_paths = emit_output("chains", model, options) - emit_artifact_paths("chains", artifact_paths, options) + if options.output == OutputMode.JSON: + emit_artifact_paths("chains", artifact_paths, options) except AzureFoxError as exc: typer.echo(f"[{exc.kind}] {exc}", err=True) if options.debug and exc.details: diff --git a/src/azurefox/render/table.py b/src/azurefox/render/table.py index 471d777..93f1181 100644 --- a/src/azurefox/render/table.py +++ b/src/azurefox/render/table.py @@ -1,5 +1,6 @@ from __future__ import annotations +import re from collections import Counter from io import StringIO from urllib.parse import urlparse @@ -25,6 +26,8 @@ def render_table(command: str, payload: dict) -> str: _render_chains_overview_table(console, payload) elif command == "chains" and str(payload.get("family") or "") == "deployment-path": _render_deployment_path_table(console, payload) + elif command == "chains" and str(payload.get("family") or "") == "credential-path": + _render_credential_path_table(console, payload) elif command == "chains" and str(payload.get("family") or "") == "escalation-path": _render_escalation_path_table(console, payload) else: @@ -89,7 +92,7 @@ def _render_devops_table(console: Console, payload: dict) -> None: for index, record in enumerate(records): table = Table(title="azurefox devops" if index == 0 else None) for _key, label in display_columns: - table.add_column(label) + table.add_column(label, overflow="fold") table.add_row(*[_value_to_string(record.get(key)) for key, _ in display_columns]) console.print(table) if record.get("why_it_matters"): @@ -134,6 +137,10 @@ def _render_deployment_path_table(console: Console, payload: dict) -> None: _render_chains_path_table(console, payload) +def _render_credential_path_table(console: Console, payload: dict) -> None: + _render_chains_path_table(console, payload, detail_key=None, detail_label=None) + + def _render_escalation_path_table(console: Console, payload: dict) -> None: _render_chains_path_table(console, payload) @@ -142,8 +149,8 @@ def _render_chains_path_table( console: Console, payload: dict, *, - detail_key: str = "why_care", - detail_label: str = "why care", + detail_key: str | None = "why_care", + detail_label: str | None = "note", ) -> None: columns, records = _table_spec("chains", payload) display_columns = [item for item in columns if item[0] != detail_key] @@ -156,14 +163,14 @@ def _render_chains_path_table( return for index, record in enumerate(records): - table = Table(title="azurefox chains" if index == 0 else None) + table = Table(title="azurefox chains" if index == 0 else None, expand=True) for _key, label in display_columns: table.add_column(label) table.add_row(*[_value_to_string(record.get(key)) for key, _ in display_columns]) console.print(table) - if record.get(detail_key): + if detail_key and record.get(detail_key): detail = Table(expand=True) - detail.add_column(detail_label) + detail.add_column(detail_label or detail_key, overflow="fold") detail.add_row(_value_to_string(record.get(detail_key))) console.print(detail) if index != len(records) - 1: @@ -655,10 +662,11 @@ def _table_spec(command: str, payload: dict) -> tuple[list[tuple[str, str]], lis { "priority": item.get("priority"), "urgency": item.get("urgency") or "-", - "asset_name": item.get("asset_name"), + "asset_name": _stack_chain_source(item.get("asset_name")), "actionability_state": _deployment_actionability_state_label(item), - "insertion_point": item.get("insertion_point") - or _deployment_path_type(item), + "insertion_point": _stack_chain_insertion_point( + item.get("insertion_point") or _deployment_path_type(item) + ), "likely_impact": item.get("likely_impact") or _chains_target_context(item), "confidence_boundary": item.get("confidence_boundary") or _chains_note(item, family=family), @@ -696,6 +704,35 @@ def _table_spec(command: str, payload: dict) -> tuple[list[tuple[str, str]], lis for item in payload.get("paths", []) ], ) + if family == "credential-path": + return ( + [ + ("priority", "priority"), + ("urgency", "urgency"), + ("asset_name", "asset"), + ("setting_name", "setting"), + ("target_service", "target"), + ("target_resolution", "target resolution"), + ("target_names", "visible targets"), + ("next_review", "next review"), + ("confidence_boundary", "confidence boundary"), + ], + [ + { + "priority": item.get("priority"), + "urgency": item.get("urgency") or "-", + "asset_name": item.get("asset_name"), + "setting_name": item.get("setting_name"), + "target_service": item.get("target_service"), + "target_resolution": item.get("target_resolution"), + "target_names": _chains_target_context(item), + "next_review": item.get("next_review"), + "confidence_boundary": item.get("confidence_boundary") + or _chains_note(item, family=family), + } + for item in payload.get("paths", []) + ], + ) return ( [ ("priority", "priority"), @@ -1945,11 +1982,14 @@ def _chains_target_context(item: dict) -> str: return str(item.get("target_visibility_issue")) target_names = item.get("target_names") or [] if target_names: - return ",".join(str(value) for value in target_names[:3]) + names = [str(value) for value in target_names[:3]] + if len(names) == 1: + return names[0] + return "\n".join(names) target_count = item.get("target_count") or 0 if target_count: return f"{target_count} visible target(s)" - return "none joined" + return "none visible" def _deployment_path_type(item: dict) -> str: @@ -1974,6 +2014,29 @@ def _deployment_actionability_state_label(item: dict) -> str: return labels.get(state, state or "-") +def _stack_chain_source(value: object) -> str: + text = str(value or "").strip() + if not text or len(text) <= 18 or "-" not in text: + return text + return text.replace("-", "-\n") + + +def _stack_chain_insertion_point(value: object) -> str: + text = str(value or "").strip() + if not text: + return text + text = re.sub(r";\s+", ";\n", text) + text = re.sub(r",\s+", ",\n", text) + text = text.replace(":", ":\n") + text = text.replace("/", "/\n") + text = text.replace("#", "#\n") + text = text.replace("@refs/", "@\nrefs/") + text = text.replace(" through ", "\nthrough ") + text = text.replace(" under ", "\nunder ") + text = text.replace(" at ", "\nat ") + return text + + def _escalation_path_type(item: dict) -> str: concept = str(item.get("path_concept") or "") labels = { @@ -3090,9 +3153,14 @@ def _empty_state_message(command: str, payload: dict) -> str: def _render_scope_boundary_notes(console: Console, command: str, payload: dict) -> None: if command != "chains": return + if str(payload.get("family") or "").strip() == "credential-path": + return + if str(payload.get("family") or "").strip() == "deployment-path": + current_gap = "" + else: + current_gap = str(payload.get("current_gap") or "").strip() claim_boundary = str(payload.get("claim_boundary") or "").strip() - current_gap = str(payload.get("current_gap") or "").strip() if not claim_boundary and not current_gap: return diff --git a/tests/test_chain_semantics.py b/tests/test_chain_semantics.py index 9247443..c003ff5 100644 --- a/tests/test_chain_semantics.py +++ b/tests/test_chain_semantics.py @@ -215,8 +215,8 @@ def test_deployment_path_secure_file_use_insertion_point_stays_use_scoped() -> N } ) == ( - "secure file codesign-cert.pfx is usable in pipeline context, but " - "secure-file administration is unproven." + "Current scope shows secure file codesign-cert.pfx as usable in pipeline context, " + "but not administrable." ) ) @@ -241,9 +241,8 @@ def test_deployment_path_artifact_read_insertion_point_stays_producer_scoped() - } ) == ( - "The upstream producer behind pipeline artifact " - "prod-platform/shared-build#signed-drop is inspectable, but producer control is " - "unproven." + "Current scope shows the upstream producer behind pipeline artifact " + "prod-platform/shared-build#signed-drop as readable, not writable." ) ) @@ -291,8 +290,7 @@ def test_deployment_path_definition_edit_next_review_stays_definition_scoped() - ) == ( "Current credentials can already edit this pipeline definition directly; AzureFox " - "already named the exact App Service target app-public-api; validate that target " - "directly." + "already named the exact App Service target app-public-api." ) ) diff --git a/tests/test_cli_smoke.py b/tests/test_cli_smoke.py index ff22e63..83d8f73 100644 --- a/tests/test_cli_smoke.py +++ b/tests/test_cli_smoke.py @@ -137,21 +137,36 @@ def test_cli_smoke_chains_credential_path_table_output(tmp_path: Path) -> None: ["--outdir", str(tmp_path), "chains", "credential-path"], env={"AZUREFOX_FIXTURE_DIR": str(fixture_dir)}, ) + normalized_output = " ".join(result.stdout.split()).lower() assert result.exit_code == 0 assert "azurefox chains" in result.stdout + assert ( + "Follow credential clues from surfaced secret-bearing or token-bearing evidence toward " + "the likely downstream service." + in result.stdout + ) assert "kvlabopen01" in result.stdout assert "priority" in result.stdout assert "target resolution" in result.stdout assert "next review" in result.stdout + assert "confidence boundary" in result.stdout assert "high" in result.stdout assert "medium" in result.stdout assert "low" in result.stdout - normalized_output = " ".join(result.stdout.split()) assert "narrowed" in normalized_output assert "candidates" in normalized_output - assert "Claim boundary:" in result.stdout - assert "Current gap:" in result.stdout + assert "token surfaces do not" in normalized_output + assert "database target." in normalized_output + assert "stlabpub01" in result.stdout + assert "stlabpriv01" in result.stdout + assert "exact storage" in normalized_output + assert "target." in normalized_output + assert "loaded evidence does" in normalized_output + assert "setting is not" in normalized_output + assert "confirmed to reach it." in normalized_output + assert "Claim boundary:" not in result.stdout + assert "Current gap:" not in result.stdout assert "Takeaway: 3 visible credential paths" in result.stdout @@ -166,7 +181,7 @@ def test_cli_smoke_chains_deployment_path_table_output(tmp_path: Path) -> None: assert result.exit_code == 0 assert "azurefox chains" in result.stdout - assert "why care" in result.stdout + assert "note" in result.stdout assert "actionability" in result.stdout assert "insertion point" in result.stdout assert "likely azure impact" in result.stdout @@ -245,12 +260,16 @@ def test_cli_smoke_chains_deployment_path_json(tmp_path: Path) -> None: assert support_row["priority"] == "low" assert support_row["actionability_state"] == "support-only" assert "Lab-Maintenance" in support_row["insertion_point"] - assert "Another foothold" in support_row["why_care"] - assert "target mapping is still missing" in support_row["next_review"] + assert ( + "concentrates connections and encrypted variables around reusable automation" + in support_row["why_care"] + ) + assert "does not yet map what runbook Lab-Maintenance changes" in support_row["next_review"] automation_row = next( item for item in payload["paths"] if item["asset_name"] == "aa-hybrid-prod" ) - assert automation_row["target_resolution"] == "visibility blocked" + assert automation_row["target_resolution"] == "narrowed candidates" + assert automation_row["target_service"] == "app-service" assert automation_row["actionability_state"] == "currently actionable" assert automation_row["priority"] == "high" assert "webhook path can start runbook Redeploy-App" in automation_row["insertion_point"] @@ -261,10 +280,23 @@ def test_cli_smoke_chains_deployment_path_json(tmp_path: Path) -> None: assert "role-trusts" in automation_row["evidence_commands"] assert "rbac" in automation_row["evidence_commands"] assert "This row proves source-side control" in automation_row["confidence_boundary"] - assert "Azure footprint beyond ARM deployment evidence" in automation_row["confidence_boundary"] + assert "not the exact App Service target" in automation_row["confidence_boundary"] assert "ops-deploy-sp" in automation_row["why_care"] - assert "map what runbook Redeploy-App changes" in automation_row["next_review"] - assert "run recurring Azure-facing execution" in automation_row["why_care"] + assert automation_row["target_names"] == ["app-empty-mi", "app-public-api"] + assert ( + automation_row["likely_impact"] + == "2 visible app service candidate(s): app-empty-mi, app-public-api" + ) + assert "AzureFox already narrowed the visible App Service candidates" in automation_row[ + "next_review" + ] + assert "editable trigger or definition path" not in automation_row["next_review"] + assert "trigger webhook runbook Redeploy-App" not in automation_row["next_review"] + assert "confirm which runbook and trigger path performs the Azure change" not in ( + automation_row["next_review"] + ) + assert "app-failed" in automation_row["next_review"] + assert "recurring Azure execution" in automation_row["why_care"] aks_row = next(item for item in payload["paths"] if item["asset_name"] == "deploy-aks-prod") assert aks_row["actionability_state"] == "conditionally actionable" assert "Queue this pipeline now" in aks_row["insertion_point"] @@ -274,7 +306,7 @@ def test_cli_smoke_chains_deployment_path_json(tmp_path: Path) -> None: assert "current-credential run-path control" in aks_row["confidence_boundary"] assert "not a writable source" in aks_row["confidence_boundary"] assert "exact AKS cluster target" in aks_row["confidence_boundary"] - assert "AzureFox already narrowed the likely AKS cluster candidates" in aks_row["next_review"] + assert "AzureFox already narrowed the visible AKS cluster candidates" in aks_row["next_review"] appsvc_row = next( item for item in payload["paths"] if item["asset_name"] == "deploy-appservice-prod" ) @@ -346,16 +378,37 @@ def test_cli_smoke_chains_escalation_path_table_output(tmp_path: Path) -> None: assert "path type" in result.stdout assert "stronger outcome" in result.stdout assert "confidence boundary" in result.stdout - assert "why care" in result.stdout - normalized_output = " ".join(result.stdout.split()) - assert "azurefox-lab-sp" in normalized_output - assert "(current" in normalized_output - assert "foothold)" in normalized_output - assert "current foothold direct control" in normalized_output - assert "Owner across" in normalized_output - assert "subscription-wide scope" in normalized_output - assert "pivot-now" in result.stdout - assert "Takeaway: 1 visible escalation paths" in result.stdout + assert "note" in result.stdout + + +def test_cli_smoke_deployment_path_operator_language_guard(tmp_path: Path) -> None: + fixture_dir = Path(__file__).resolve().parent / "fixtures" / "lab_tenant" + + result = runner.invoke( + app, + ["--outdir", str(tmp_path), "--output", "json", "chains", "deployment-path"], + env={"AZUREFOX_FIXTURE_DIR": str(fixture_dir)}, + ) + + assert result.exit_code == 0 + payload = json.loads(result.stdout) + text = " ".join( + str(item.get(field) or "") + for item in payload["paths"] + for field in ("why_care", "next_review", "confidence_boundary", "likely_impact") + ) + + banned = [ + "visible workload deployment reach", + "visible configuration change reach", + "visible recurring Azure execution", + "visible secret-backed deployment support", + "not that current credentials can run this path", + "not yet proven", + "has not yet proven", + ] + for phrase in banned: + assert phrase not in text def test_cli_smoke_chains_overview_table_output(tmp_path: Path) -> None: diff --git a/tests/test_deployment_path_admissibility.py b/tests/test_deployment_path_admissibility.py index 214cc5b..b90b9bb 100644 --- a/tests/test_deployment_path_admissibility.py +++ b/tests/test_deployment_path_admissibility.py @@ -11,6 +11,7 @@ from azurefox.chains.runner import ( _automation_current_operator_access, _automation_scope_label, + _best_automation_target_mapping, _structured_deployment_target_matches, ) from azurefox.models.common import ( @@ -298,6 +299,121 @@ def test_automation_scope_label_keeps_child_resource_scope_distinct_from_resourc ) +def test_best_automation_target_mapping_uses_runbook_names_to_narrow_visible_targets() -> None: + account = _load_automation_account("aa-hybrid-prod").model_dump(mode="json") + app_services = json.loads( + ( + Path(__file__).resolve().parent / "fixtures" / "lab_tenant" / "app_services.json" + ).read_text(encoding="utf-8") + ) + functions = json.loads( + (Path(__file__).resolve().parent / "fixtures" / "lab_tenant" / "functions.json").read_text( + encoding="utf-8" + ) + ) + aks = json.loads( + (Path(__file__).resolve().parent / "fixtures" / "lab_tenant" / "aks.json").read_text( + encoding="utf-8" + ) + ) + arm = json.loads( + ( + Path(__file__).resolve().parent / "fixtures" / "lab_tenant" / "arm_deployments.json" + ).read_text(encoding="utf-8") + ) + + mapping = _best_automation_target_mapping( + account, + target_candidates={ + "app-services": app_services["app_services"], + "functions": functions["function_apps"], + "aks": aks["aks_clusters"], + "arm-deployments": arm["deployments"], + }, + target_visibility_notes={ + "app-services": None, + "functions": None, + "aks": None, + "arm-deployments": None, + }, + target_visibility_issues={ + "app-services": None, + "functions": None, + "aks": None, + "arm-deployments": None, + }, + arm_correlations={ + "app-services": [arm["deployments"][2]], + "functions": [arm["deployments"][2]], + "aks": [], + "arm-deployments": arm["deployments"], + }, + ) + + assert mapping is not None + assert mapping["target_family"] == "app-services" + assert mapping["exact_targets"] == [] + assert [item["name"] for item in mapping["target_candidates"]] == [ + "app-empty-mi", + "app-public-api", + ] + assert mapping["confirmation_basis"] == "same-workload-corroborated" + + +def test_best_automation_target_mapping_does_not_remap_from_name_overlap_alone() -> None: + account = _load_automation_account("aa-hybrid-prod").model_dump(mode="json") + app_services = json.loads( + ( + Path(__file__).resolve().parent / "fixtures" / "lab_tenant" / "app_services.json" + ).read_text(encoding="utf-8") + ) + functions = json.loads( + (Path(__file__).resolve().parent / "fixtures" / "lab_tenant" / "functions.json").read_text( + encoding="utf-8" + ) + ) + aks = json.loads( + (Path(__file__).resolve().parent / "fixtures" / "lab_tenant" / "aks.json").read_text( + encoding="utf-8" + ) + ) + arm = json.loads( + ( + Path(__file__).resolve().parent / "fixtures" / "lab_tenant" / "arm_deployments.json" + ).read_text(encoding="utf-8") + ) + + mapping = _best_automation_target_mapping( + account, + target_candidates={ + "app-services": app_services["app_services"], + "functions": functions["function_apps"], + "aks": aks["aks_clusters"], + "arm-deployments": arm["deployments"], + }, + target_visibility_notes={ + "app-services": None, + "functions": None, + "aks": None, + "arm-deployments": None, + }, + target_visibility_issues={ + "app-services": None, + "functions": None, + "aks": None, + "arm-deployments": None, + }, + arm_correlations={ + "app-services": [], + "functions": [], + "aks": [], + "arm-deployments": arm["deployments"], + }, + ) + + assert mapping is None + + def _load_devops_pipeline(name: str) -> DevopsPipelineAsset: payload = json.loads( (Path(__file__).resolve().parent / "fixtures" / "lab_tenant" / "devops.json").read_text( diff --git a/tests/test_terminal_ux.py b/tests/test_terminal_ux.py index 89beac6..74079ef 100644 --- a/tests/test_terminal_ux.py +++ b/tests/test_terminal_ux.py @@ -768,21 +768,30 @@ def test_chains_table_mode_surfaces_priority_and_next_review(tmp_path: Path) -> ) assert result.exit_code == 0 - normalized_output = " ".join(result.stdout.split()) + normalized_output = " ".join(result.stdout.split()).lower() assert "priority" in result.stdout assert "urgency" in result.stdout assert "next review" in result.stdout - assert "note" in result.stdout + assert "confidence boundary" in result.stdout assert "review-soon" in normalized_output assert "bookmark" in normalized_output assert "func-orders" in result.stdout assert "app-public-api" in result.stdout assert "Your current" in result.stdout assert "this secret." in result.stdout - assert "Check vault access" in normalized_output - assert "connection clues." in normalized_output - assert "AzureFox narrowed" in normalized_output + assert "check vault access" in normalized_output + assert "token surfaces do not" in normalized_output + assert "database target." in normalized_output + assert "exact storage" in normalized_output + assert "target." in normalized_output + assert "azurefox narrowed" in normalized_output + assert "setting is not" in normalized_output + assert "confirmed to reach it." in normalized_output assert "database" in normalized_output + assert "stlabpub01" in result.stdout + assert "stlabpriv01" in result.stdout + assert "Claim boundary:" not in result.stdout + assert "Current gap:" not in result.stdout def test_deployment_chains_table_mode_surfaces_source_oriented_columns(tmp_path: Path) -> None: @@ -796,7 +805,7 @@ def test_deployment_chains_table_mode_surfaces_source_oriented_columns(tmp_path: normalized_output = " ".join(result.stdout.split()) assert "likely azure impact" in result.stdout assert "next review" in result.stdout - assert "why care" in result.stdout + assert "note" in result.stdout assert "priority" in result.stdout assert "urgency" in result.stdout assert "actionability" in result.stdout @@ -807,7 +816,6 @@ def test_deployment_chains_table_mode_surfaces_source_oriented_columns(tmp_path: assert "aa-hybrid-prod" in result.stdout assert "aa-lab-quiet" in result.stdout assert "pivot-now" in normalized_output - assert "Artifact trust is" in normalized_output assert "pipeline artifact" in normalized_output assert "currently actionable" in normalized_output assert "conditionally" in normalized_output @@ -816,10 +824,10 @@ def test_deployment_chains_table_mode_surfaces_source_oriented_columns(tmp_path: assert "Redeploy-App" in normalized_output assert "Lab-Maintenance" in normalized_output assert "Claim boundary:" in result.stdout - assert "Current gap:" in result.stdout + assert "Current gap:" not in result.stdout -def test_deployment_chains_table_mode_renders_why_care_as_detail_rows(tmp_path: Path) -> None: +def test_deployment_chains_table_mode_renders_note_as_detail_rows(tmp_path: Path) -> None: result = runner.invoke( app, ["--outdir", str(tmp_path), "chains", "deployment-path"], @@ -829,15 +837,15 @@ def test_deployment_chains_table_mode_renders_why_care_as_detail_rows(tmp_path: assert result.exit_code == 0 lines = result.stdout.splitlines() main_header_lines = [line for line in lines if "┃ priority" in line] - detail_header_lines = [line for line in lines if line.startswith("┃ why care")] + detail_header_lines = [line for line in lines if line.startswith("┃ note")] assert main_header_lines - assert all("why care" not in line for line in main_header_lines) + assert all("note" not in line for line in main_header_lines) assert detail_header_lines - assert "Current credentials can already poison that source" in result.stdout - assert "If that trusted input becomes attacker-controlled" in result.stdout + assert "Current credentials can already modify that trusted input" in result.stdout + assert "If that trusted input is changed upstream" in result.stdout assert len(detail_header_lines) == 6 - assert "Current gap:" in result.stdout + assert "Current gap:" not in result.stdout def test_escalation_chains_table_mode_renders_defended_current_foothold_story( @@ -855,7 +863,7 @@ def test_escalation_chains_table_mode_renders_defended_current_foothold_story( assert "starting foothold" in result.stdout assert "path type" in result.stdout assert "stronger outcome" in result.stdout - assert "why care" in result.stdout + assert "note" in result.stdout assert "azurefox-lab-sp" in normalized_output assert "(current" in normalized_output assert "foothold)" in normalized_output @@ -1010,9 +1018,12 @@ def test_chains_named_keyvault_not_visible_prefers_inventory_boundary() -> None: rendered = render_table("chains", payload) normalized = " ".join(rendered.split()) - assert "This app names a Key" in normalized + assert "AzureFox can name the" in normalized + assert "vault, but cannot yet" in normalized + assert "read the secret." in normalized + assert "Verify that the named" in normalized + assert "target is visible in" in normalized assert "current inventory." in normalized - assert "cannot yet tell whether your current identity can read the secret" not in normalized def test_app_services_partial_read_surfaces_collection_issue() -> None: diff --git a/tests/test_visibility_tiers.py b/tests/test_visibility_tiers.py index edaa5c3..cf308e4 100644 --- a/tests/test_visibility_tiers.py +++ b/tests/test_visibility_tiers.py @@ -848,7 +848,8 @@ def test_chains_visibility_tiers_avoid_fake_target_certainty() -> None: assert "func-orders" in medium_rendered assert "PAYMENT_API_KEY" in medium_rendered assert "narrowed candidates" in medium_rendered - assert "kv-orders,kv-shared" in medium_rendered + assert "kv-orders" in medium_rendered + assert "kv-shared" in medium_rendered assert "This app exposes a" in medium_rendered assert "secret-shaped" in medium_rendered assert "exact target" in medium_rendered @@ -867,6 +868,6 @@ def test_chains_visibility_tiers_avoid_fake_target_certainty() -> None: assert "Restore keyvault" in low_rendered assert "choosing a target." in low_rendered assert "Current-scope issues:" in low_rendered - assert "Claim boundary:" in low_rendered - assert "Current gap:" in low_rendered + assert "Claim boundary:" not in low_rendered + assert "Current gap:" not in low_rendered assert "kv-orders,kv-shared" not in low_rendered diff --git a/wiki/README.md b/wiki/README.md deleted file mode 100644 index 4800452..0000000 --- a/wiki/README.md +++ /dev/null @@ -1,9 +0,0 @@ -# Wiki Source - -This tree is reserved for future GitHub wiki source material that should not live in the main -repo `docs/` folder. - -- Do not use this tree as a long-term home for internal command-planning scratch notes. -- If content here is worth keeping, it should have a clear published wiki destination and active ownership. - -Live operator guidance should stay in `azurefox help` and `azurefox help `.