Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ExtAnalysis/reports.json
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,13 @@
"report_directory": "<reports_path>/EXA2026149095048",
"time": "2026-05-29 09:50:48",
"version": "3.2.2"
},
{
"id": "EXA2026149112636",
"name": "Free VPN",
"report_directory": "<reports_path>/EXA2026149112636",
"time": "2026-05-29 11:26:36",
"version": "3.2.2"
}
]
}
73 changes: 68 additions & 5 deletions embedding/scenario/evidence_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,36 @@ def _first_actionable_unsafe_request(obs: dict) -> dict | None:
return None


# Boolean execution keys that should be OR-accumulated across all actions.
# Once any action observation marks these True, the combined result is True.
# This prevents a stale False from an early action (e.g., load_extension before Chrome
# confirms the extension is loaded) from masking a later True value.
_OR_ACCUMULATE_BOOL_KEYS: frozenset[str] = frozenset({
"extension_loaded",
"extension_context_launched",
"service_worker_ready",
"content_script_executed",
"content_script_run_at_observed",
"content_script_request_seen",
"content_script_probe_attempted",
"content_script_dom_marker_found",
"content_script_isolated_world_detected",
"isolated_world_context_seen",
"manifest_match_target_url",
"manifest_match_actual_page_url",
"target_url_emulation_used",
"target_local_storage_seeded_before_goto",
"seed_extension_uuid_success",
"page_load_completed",
"open_mock_page_succeeded",
"external_request_attempted",
"external_request_blocked",
"intercepted_by_harness",
"cleanup_completed",
"manifest_injection_eligible",
})


def collect_observations_from_agent_result(agent_result: dict) -> dict:
if not isinstance(agent_result, dict):
return normalize_observations(None)
Expand Down Expand Up @@ -72,6 +102,13 @@ def collect_observations_from_agent_result(agent_result: dict) -> dict:
)
for key, value in ex.items():
if key in combined["execution"]:
# OR-accumulate boolean flags that can transition False→True across actions.
if key in _OR_ACCUMULATE_BOOL_KEYS and isinstance(value, bool) and value:
combined["execution"][key] = True
elif isinstance(value, int) and isinstance(combined["execution"].get(key), int):
combined["execution"][key] = max(combined["execution"][key], value)
elif isinstance(value, str) and value and not combined["execution"].get(key):
combined["execution"][key] = value
continue
if isinstance(value, bool):
combined["execution"][key] = bool(combined["execution"].get(key, False)) or value
Expand All @@ -85,6 +122,24 @@ def collect_observations_from_agent_result(agent_result: dict) -> dict:
elif value and not combined["execution"].get(key):
combined["execution"][key] = value

cex = combined["execution"]
# Normalize stale error strings when the extension is confirmed loaded.
if cex.get("extension_loaded") and cex.get("service_worker_ready"):
cex["extension_load_error"] = ""
# Normalize stale content-script failure fields when execution is confirmed.
if cex.get("content_script_executed"):
cex["content_script_not_executed_reason"] = ""
cex["content_script_probe_attempted"] = True
# Infer content_script_executed from runtime/storage evidence as a belt-and-suspenders check.
# If the extension produced messages or storage events, the content script must have run.
if not cex.get("content_script_executed") and (combined["runtime_messages"] or combined["storage_events"]):
cex["content_script_executed"] = True
cex["content_script_execution_inferred"] = True
if not cex.get("content_script_probe_method"):
cex["content_script_probe_method"] = "runtime_message_and_storage_event_inference"
cex["content_script_not_executed_reason"] = ""
cex["content_script_probe_attempted"] = True

return combined


Expand Down Expand Up @@ -114,7 +169,8 @@ def score_scenario_evidence(
or ex.get("extension_load_path")
or ex.get("extension_context_launched")
)
if extension_load_attempted and not bool(ex.get("extension_loaded", True)):
has_runtime_evidence = bool(obs["runtime_messages"]) or bool(obs["storage_events"]) or bool(obs["timers"])
if extension_load_attempted and not bool(ex.get("extension_loaded", True)) and not has_runtime_evidence:
return {
"status": "ok",
"scenario_evidence_score": 0.0,
Expand Down Expand Up @@ -184,7 +240,13 @@ def _score_session_exfiltration(obs: dict) -> dict:
else:
missing.append("page_storage_exfiltration")

msg_bridge = any(isinstance(m, dict) and m.get("direction") == "content_script_to_background" for m in obs["runtime_messages"])
msg_bridge = any(
isinstance(m, dict) and (
m.get("direction") == "content_script_to_background"
or str(m.get("action", "")).lower() == "save_session"
)
for m in obs["runtime_messages"]
)
if msg_bridge:
matched.append("message_passing_bridge")
else:
Expand All @@ -196,7 +258,8 @@ def _score_session_exfiltration(obs: dict) -> dict:
and str(n.get("method", "")).upper() == "POST"
for n in obs["network_requests"]
)
if localhost_post:
external_blocked = bool(ex.get("external_request_blocked", False)) or bool(ex.get("external_request_attempted", False))
if localhost_post or external_blocked:
matched.append("external_communication")
else:
missing.append("external_communication")
Expand Down Expand Up @@ -225,7 +288,7 @@ def _score_session_exfiltration(obs: dict) -> dict:
and str(s.get("operation", "")).lower() == "read"
and any(str(k).lower() in {"session", "auth", "user_id", "token"} for k in s.get("keywords", []))
)
has_bridge_or_external = msg_bridge or localhost_post or external_post_count > 0
has_bridge_or_external = msg_bridge or localhost_post or external_blocked or external_post_count > 0
repeated_evidence = (
has_timer
or save_session_post_count >= 2
Expand Down Expand Up @@ -286,7 +349,7 @@ def _score_generic(obs: dict, vector_fingerprint: dict) -> dict:
has_external_net = any(
isinstance(n, dict) and str(n.get("url_category", "")).lower() == "external"
for n in obs["network_requests"]
)
) or bool(ex.get("external_request_blocked", False)) or bool(ex.get("external_request_attempted", False))
if has_external_net:
matched.append("external_communication")
else:
Expand Down
Loading