diff --git a/CHANGELOG.md b/CHANGELOG.md index bb51bb2..d3abcdc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,46 @@ All notable changes to OpenAnt are documented in this file. ## [Unreleased] -This release syncs a large body of work from internal development. Highlights: +### Fixed + +- **Disclosure code is now byte-faithful to source.** The disclosure + renderer pulls the actual file slice from the repo instead of rerunning + an LLM rewrite, so every finding's `Vulnerable Code` block matches the + real source. +- **No more silent 401s.** `openant set-api-key` validates the key on save + and fails loudly on bad input. `openant scan` prints a blocking warning + and exits non-zero when zero API calls succeed, so an all-401 run can no + longer masquerade as a clean repo. +- **CWE tagging is now systematic.** `pipeline_output.json` carries + non-null `cwe`, `cwe_id`, and `vulnerability_type` for every finding. + Stage 1 prompt asks for them directly rather than relying on the + renderer LLM to infer them from prose. +- **`[NOT PROVIDED]` placeholders eliminated.** Repo name, commit SHA, and + file count are threaded into every phase report envelope + (`parse.report.json`, `scan.report.json`) instead of being lost between + stages. +- **`Verified` column reflects the highest evidence tier.** `dynamic` > + `verified` > `static`, so dynamically reproduced findings show as + `dynamic` and the disclosure footer reads "Confirmed via dynamic test" + where applicable. +- **Call-graph-aware deduplication.** When two findings share a + sink/vector and the call graph records an edge between them, they + collapse into a single finding. +- **Dynamic test scaffolding fixed.** `openant dynamic-test` pre-stages + the vulnerable source file into the Docker build context end-to-end + through the dynamic-test chain — first-try Docker builds no longer fail + because the source isn't in context. 
+- **Concurrency-safe Docker resources.** Docker image and network names + get a UUID prefix so parallel dynamic-test workers can't collide. +- **Agreement filter checks the final verdict** instead of the + intermediate `agree` flag, so high-confidence dynamic results aren't + dropped by a stale agreement signal. +- **Dedup matches on CWE** instead of `attack_vector` text, so small + wording differences no longer split what's logically the same finding. + +## [2026-04-14] — Initial public release + +This release synced a large body of work from internal development. Highlights: ### Added diff --git a/apps/openant-cli/cmd/dynamictest.go b/apps/openant-cli/cmd/dynamictest.go index 5ff99e4..1d19297 100644 --- a/apps/openant-cli/cmd/dynamictest.go +++ b/apps/openant-cli/cmd/dynamictest.go @@ -80,6 +80,12 @@ func runDynamicTest(cmd *cobra.Command, args []string) { pyArgs = append(pyArgs, "--max-retries", fmt.Sprintf("%d", dynamicTestMaxRetries)) } + // Pass repo path so the dynamic tester can pre-stage source files into + // the Docker build context. + if ctx != nil && ctx.Project != nil && ctx.RepoPath != "" { + pyArgs = append(pyArgs, "--repo-path", ctx.RepoPath) + } + result, err := python.Invoke(rt.Path, pyArgs, "", quiet, requireAPIKey()) if err != nil { output.PrintError(err.Error()) diff --git a/apps/openant-cli/cmd/scan.go b/apps/openant-cli/cmd/scan.go index 39c1e57..5d62bb8 100644 --- a/apps/openant-cli/cmd/scan.go +++ b/apps/openant-cli/cmd/scan.go @@ -141,6 +141,20 @@ func runScan(cmd *cobra.Command, args []string) { pyArgs = append(pyArgs, "--backoff", fmt.Sprintf("%d", scanBackoff)) } + // Pass repository metadata from project context so reports don't show + // [NOT PROVIDED] placeholders. 
+ if ctx != nil && ctx.Project != nil { + if ctx.Project.Name != "" { + pyArgs = append(pyArgs, "--repo-name", ctx.Project.Name) + } + if ctx.Project.RepoURL != "" { + pyArgs = append(pyArgs, "--repo-url", ctx.Project.RepoURL) + } + if ctx.Project.CommitSHA != "" { + pyArgs = append(pyArgs, "--commit-sha", ctx.Project.CommitSHA) + } + } + result, err := python.Invoke(rt.Path, pyArgs, "", quiet, requireAPIKey()) if err != nil { output.PrintError(err.Error()) diff --git a/apps/openant-cli/cmd/setapikey.go b/apps/openant-cli/cmd/setapikey.go index ddfe639..14194a9 100644 --- a/apps/openant-cli/cmd/setapikey.go +++ b/apps/openant-cli/cmd/setapikey.go @@ -2,14 +2,42 @@ package cmd import ( "fmt" + "io" + "net/http" "os" "strings" + "time" "github.com/knostic/open-ant-cli/internal/config" "github.com/knostic/open-ant-cli/internal/output" "github.com/spf13/cobra" ) +var anthropicAPIURL = "https://api.anthropic.com/v1/messages" + +func validateAPIKey(key string) error { + body := strings.NewReader(`{"model":"claude-haiku-4-5-20251001","max_tokens":1,"messages":[{"role":"user","content":"hi"}]}`) + req, err := http.NewRequest("POST", anthropicAPIURL, body) + if err != nil { + return fmt.Errorf("failed to build validation request: %w", err) + } + req.Header.Set("x-api-key", key) + req.Header.Set("anthropic-version", "2023-06-01") + req.Header.Set("content-type", "application/json") + + client := &http.Client{Timeout: 15 * time.Second} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("could not reach Anthropic API: %w", err) + } + defer func() { _, _ = io.Copy(io.Discard, resp.Body); resp.Body.Close() }() + + if resp.StatusCode == http.StatusUnauthorized { + return fmt.Errorf("Anthropic rejected the key (HTTP 401). 
Double-check it at https://console.anthropic.com/settings/keys") + } + return nil +} + var setAPIKeyCmd = &cobra.Command{ Use: "set-api-key ", Short: "Save your Anthropic API key", @@ -34,6 +62,17 @@ func runSetAPIKey(cmd *cobra.Command, args []string) { os.Exit(1) } + // Validate against Anthropic BEFORE saving — a bad key should never + // be persisted, otherwise `openant scan` silently produces zero results + // that look like a clean repo. + fmt.Fprintf(os.Stderr, "Validating API key with Anthropic... ") + if err := validateAPIKey(key); err != nil { + fmt.Fprintf(os.Stderr, "\n") + output.PrintError(err.Error()) + os.Exit(1) + } + fmt.Fprintf(os.Stderr, "OK\n") + cfg, err := config.Load() if err != nil { output.PrintError(err.Error()) diff --git a/apps/openant-cli/cmd/setapikey_test.go b/apps/openant-cli/cmd/setapikey_test.go new file mode 100644 index 0000000..e212956 --- /dev/null +++ b/apps/openant-cli/cmd/setapikey_test.go @@ -0,0 +1,85 @@ +package cmd + +import ( + "net/http" + "net/http/httptest" + "testing" +) + +func TestValidateAPIKey_Rejects401(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusUnauthorized) + })) + defer server.Close() + + // Override the API URL for this test. 
+ orig := anthropicAPIURL + defer func() { anthropicAPIURL = orig }() + anthropicAPIURL = server.URL + + err := validateAPIKey("sk-bad-key") + if err == nil { + t.Fatal("expected error for 401 response, got nil") + } + if got := err.Error(); !contains(got, "401") { + t.Errorf("error should mention 401, got: %s", got) + } +} + +func TestValidateAPIKey_AcceptsValid(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"id":"msg_test","type":"message","role":"assistant","content":[{"type":"text","text":"h"}],"model":"claude-haiku-4-5-20251001","usage":{"input_tokens":1,"output_tokens":1}}`)) + })) + defer server.Close() + + orig := anthropicAPIURL + defer func() { anthropicAPIURL = orig }() + anthropicAPIURL = server.URL + + if err := validateAPIKey("sk-good-key"); err != nil { + t.Fatalf("expected nil error for 200 response, got: %v", err) + } +} + +func TestValidateAPIKey_SendsCorrectHeaders(t *testing.T) { + var gotKey, gotVersion, gotContentType string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotKey = r.Header.Get("x-api-key") + gotVersion = r.Header.Get("anthropic-version") + gotContentType = r.Header.Get("content-type") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{}`)) + })) + defer server.Close() + + orig := anthropicAPIURL + defer func() { anthropicAPIURL = orig }() + anthropicAPIURL = server.URL + + _ = validateAPIKey("sk-test-123") + + if gotKey != "sk-test-123" { + t.Errorf("x-api-key = %q, want %q", gotKey, "sk-test-123") + } + if gotVersion != "2023-06-01" { + t.Errorf("anthropic-version = %q, want %q", gotVersion, "2023-06-01") + } + if gotContentType != "application/json" { + t.Errorf("content-type = %q, want %q", gotContentType, "application/json") + } +} + +func contains(s, substr string) bool { + return len(s) >= 
len(substr) && (s == substr || len(s) > 0 && containsHelper(s, substr)) +} + +func containsHelper(s, sub string) bool { + for i := 0; i <= len(s)-len(sub); i++ { + if s[i:i+len(sub)] == sub { + return true + } + } + return false +} diff --git a/libs/openant-core/core/dynamic_tester.py b/libs/openant-core/core/dynamic_tester.py index 7c16603..9f9c10d 100644 --- a/libs/openant-core/core/dynamic_tester.py +++ b/libs/openant-core/core/dynamic_tester.py @@ -18,6 +18,7 @@ def run_tests( pipeline_output_path: str, output_dir: str, max_retries: int = 3, + repo_path: str | None = None, ) -> DynamicTestStepResult: """Run dynamic exploit tests on confirmed vulnerabilities. @@ -83,6 +84,7 @@ def run_tests( pipeline_output_path, output_dir, max_retries=max_retries, + repo_path=repo_path, ) # Count outcomes diff --git a/libs/openant-core/core/reporter.py b/libs/openant-core/core/reporter.py index 4f604dd..10cee46 100644 --- a/libs/openant-core/core/reporter.py +++ b/libs/openant-core/core/reporter.py @@ -24,6 +24,120 @@ _CORE_ROOT = Path(__file__).parent.parent +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +# Map language hints to the code-fence language tag used in Markdown. +_FENCE_LANG = { + "python": "python", + "py": "python", + "javascript": "javascript", + "js": "javascript", + "typescript": "typescript", + "ts": "typescript", + "go": "go", + "golang": "go", + "java": "java", + "ruby": "ruby", + "rb": "ruby", + "php": "php", + "rust": "rust", + "c": "c", + "cpp": "cpp", + "c++": "cpp", + "csharp": "csharp", + "c#": "csharp", +} + + +def _build_vulnerable_code_section(file_path: str, code: str, language: str | None) -> str: + """Build a pre-rendered Markdown `## Vulnerable Code` section. + + The disclosure generator splices this verbatim into the LLM prompt so the + model cannot rewrite the snippet. 
Prior behaviour (asking the LLM for a + "minimal code snippet") produced fabricated code in DISCLOSURE_01/05. + """ + if not code: + return "" + fence_lang = _FENCE_LANG.get((language or "").lower(), "") + return ( + "## Vulnerable Code\n\n" + f"`{file_path}`:\n\n" + f"```{fence_lang}\n{code}\n```" + ) + + +# --------------------------------------------------------------------------- +# Deduplication — collapse caller/callee pairs +# --------------------------------------------------------------------------- + +def _dedup_caller_callee( + confirmed: list[dict], + all_results: list[dict], + call_graph_path: str, +) -> list[dict]: + """Remove callee findings that are only reachable via a single caller + with the same CWE. + + This prevents the same vulnerability from being reported twice when + a function like ``get_user()`` is the only caller of ``run_query()`` + and both share the same vulnerability class. + + Matches on CWE (integer) instead of attack_vector (LLM free text) + because CWE is stable across runs while attack_vector varies. + CWE-0 (unknown) never matches — two unknowns shouldn't collapse. + """ + if not os.path.isfile(call_graph_path): + return confirmed + + try: + with open(call_graph_path) as f: + cg_data = json.load(f) + except (json.JSONDecodeError, OSError): + return confirmed + + reverse_cg = cg_data.get("reverse_call_graph", {}) + + # Build a lookup: route_key → cwe_id for confirmed findings. + cwe_by_key: dict[str, int] = {} + for f in confirmed: + rk = f.get("route_key") or f.get("unit_id", "") + cwe = f.get("cwe_id") + if cwe is None: + full = next( + (r for r in all_results + if (r.get("route_key") or r.get("unit_id")) == rk), + None, + ) + cwe = full.get("cwe_id", 0) if full else 0 + cwe_by_key[rk] = cwe + + # Identify callees to remove. 
+    remove_keys: set[str] = set()
+    for callee_key, callers in reverse_cg.items():
+        if len(callers) != 1:
+            continue  # multiple callers — not safe to collapse
+        caller_key = callers[0]
+        if caller_key not in cwe_by_key or callee_key not in cwe_by_key:
+            continue  # one of them wasn't confirmed — skip
+        caller_cwe = cwe_by_key[caller_key]
+        callee_cwe = cwe_by_key[callee_key]
+        if caller_cwe and caller_cwe != 0 and caller_cwe == callee_cwe:
+            remove_keys.add(callee_key)
+
+    if not remove_keys:
+        return confirmed
+
+    deduped = [
+        f for f in confirmed
+        if (f.get("route_key") or f.get("unit_id", "")) not in remove_keys
+    ]
+    removed = len(confirmed) - len(deduped)
+    print(f"[Report] Deduplicated {removed} caller/callee finding(s)", file=sys.stderr)
+    return deduped
+
+
 # ---------------------------------------------------------------------------
 # Pipeline output builder
 # ---------------------------------------------------------------------------
@@ -71,12 +185,25 @@ def build_pipeline_output(
     # Use confirmed_findings if present (verified results), else filter manually
     confirmed = experiment.get("confirmed_findings")
     if confirmed is None:
+        # Filter on the FINAL verdict, not the `agree` flag. Stage 2 may
+        # disagree on reason/CWE but still confirm vulnerable — those must
+        # not be dropped. Unverified findings are included (no verification
+        # dict = assumed confirmed).
         confirmed = [
             r for r in all_results
             if r.get("finding", r.get("verdict", "").lower())
             in ("vulnerable", "bypassable")
-            and r.get("verification", {}).get("agree", True)  # unverified = assume confirmed
         ]
 
+    # ---------------------------------------------------------------
+    # Dedup: collapse caller/callee pairs that share the same CWE.
+    # The call graph records A→B edges; if B is only reachable through
+    # A and both carry the same CWE, keep only A.
+ # --------------------------------------------------------------- + call_graph_path = os.path.join( + os.path.dirname(os.path.abspath(results_path)), "call_graph.json" + ) + confirmed = _dedup_caller_callee(confirmed, all_results, call_graph_path) + # Build findings in PipelineOutput schema findings_data = [] for i, finding in enumerate(confirmed): @@ -100,6 +227,12 @@ def build_pipeline_output( ) vulnerable_code = vuln.get("vulnerable_code") or code_by_route.get(route_key) + file_path = route_key.split(":")[0] if ":" in route_key else "unknown" + vulnerable_code_section = _build_vulnerable_code_section( + file_path=file_path, + code=vulnerable_code, + language=language, + ) impact = vuln.get("impact") or finding.get("attack_vector") @@ -132,12 +265,13 @@ def build_pipeline_output( "file": route_key.split(":")[0] if ":" in route_key else "unknown", "function": route_key, }, - "cwe_id": vuln.get("cwe_id", 0), - "cwe_name": vuln.get("cwe_name", "Unknown"), + "cwe_id": vuln.get("cwe_id") or finding.get("cwe_id") or full_result.get("cwe_id", 0), + "cwe_name": vuln.get("cwe_name") or finding.get("cwe_name") or full_result.get("cwe_name", "Unknown"), "stage1_verdict": finding.get("verdict", finding.get("finding", "vulnerable")), "stage2_verdict": stage2_verdict, "description": description, "vulnerable_code": vulnerable_code, + "vulnerable_code_section": vulnerable_code_section, "impact": impact, "suggested_fix": vuln.get("suggested_fix"), "steps_to_reproduce": steps_to_reproduce, diff --git a/libs/openant-core/core/scanner.py b/libs/openant-core/core/scanner.py index 08e2dfe..346c660 100644 --- a/libs/openant-core/core/scanner.py +++ b/libs/openant-core/core/scanner.py @@ -55,6 +55,9 @@ def scan_repository( dynamic_test: bool = False, workers: int = 8, backoff_seconds: int = 30, + repo_name: str | None = None, + repo_url: str | None = None, + commit_sha: str | None = None, ) -> ScanResult: """Scan a repository for vulnerabilities. 
@@ -373,7 +376,9 @@ def _step_label(name: str) -> str: build_pipeline_output( results_path=active_results_path, output_path=pipeline_output_path, - repo_name=os.path.basename(repo_path), + repo_name=repo_name or os.path.basename(repo_path), + repo_url=repo_url, + commit_sha=commit_sha, language=result.language, application_type=( app_context_path and _read_app_type(app_context_path) @@ -645,4 +650,8 @@ def _print_summary(result: ScanResult) -> None: print(f" Output: {result.output_dir}", file=sys.stderr) if result.skipped_steps: print(f" Skipped: {', '.join(result.skipped_steps)}", file=sys.stderr) + if result.usage.total_input_tokens == 0 and result.metrics.errors > 0: + print("", file=sys.stderr) + print(" *** No API calls succeeded — repository was NOT analyzed. ***", file=sys.stderr) + print(" *** Check your API key: openant set-api-key ***", file=sys.stderr) print("=" * 60, file=sys.stderr) diff --git a/libs/openant-core/core/verifier.py b/libs/openant-core/core/verifier.py index 0f00fc6..fa7a43f 100644 --- a/libs/openant-core/core/verifier.py +++ b/libs/openant-core/core/verifier.py @@ -244,10 +244,13 @@ def _write_verified_results( "verify": True, "metrics": experiment.get("metrics", {}), "results": merged_results, + # Filter on the FINAL verdict (already updated by Stage 2 when it + # disagrees), not the `agree` flag. Stage 2 may disagree on the + # reason/CWE but still confirm the finding is vulnerable — those + # must not be dropped. 
"confirmed_findings": [ r for r in verified_only - if r.get("verification", {}).get("agree", False) - and r.get("finding", "").lower() in ("vulnerable", "bypassable") + if r.get("finding", "").lower() in ("vulnerable", "bypassable") ], } diff --git a/libs/openant-core/experiment.py b/libs/openant-core/experiment.py index 409d4fa..359d41f 100644 --- a/libs/openant-core/experiment.py +++ b/libs/openant-core/experiment.py @@ -274,6 +274,12 @@ def _normalize_result(result: dict) -> dict: if "verdict" in result and isinstance(result["verdict"], str): result["verdict"] = result["verdict"].upper() + # Ensure CWE fields are always present. + if "cwe_id" not in result: + result["cwe_id"] = 0 + if "cwe_name" not in result: + result["cwe_name"] = None + return result diff --git a/libs/openant-core/openant/cli.py b/libs/openant-core/openant/cli.py index 4c7d3a7..560e33b 100644 --- a/libs/openant-core/openant/cli.py +++ b/libs/openant-core/openant/cli.py @@ -70,6 +70,9 @@ def cmd_scan(args): dynamic_test=args.dynamic_test, workers=args.workers, backoff_seconds=args.backoff, + repo_name=getattr(args, "repo_name", None), + repo_url=getattr(args, "repo_url", None), + commit_sha=getattr(args, "commit_sha", None), ) _output_json(success(result.to_dict())) @@ -395,6 +398,7 @@ def cmd_dynamic_test(args): pipeline_output_path=args.pipeline_output, output_dir=output_dir, max_retries=args.max_retries, + repo_path=getattr(args, "repo_path", None), ) ctx.summary = { @@ -465,10 +469,16 @@ def cmd_report(args): "Otherwise, run 'openant dynamic-test' first.\n", file=sys.stderr, ) - try: - answer = input("[Y/n] ").strip().lower() - except (EOFError, KeyboardInterrupt): - answer = "n" + if not sys.stdin.isatty(): + # Non-interactive (Go CLI pipes stdin) — continue silently. 
+ answer = "y" + else: + sys.stderr.write("[Y/n] ") + sys.stderr.flush() + try: + answer = sys.stdin.readline().strip().lower() + except (EOFError, KeyboardInterrupt): + answer = "n" if answer not in ("y", "yes", ""): print("Aborted. Run 'openant dynamic-test' first.", file=sys.stderr) return 0 @@ -938,6 +948,9 @@ def main(): scan_p.add_argument("--model", choices=["opus", "sonnet"], default="opus", help="Model (default: opus)") scan_p.add_argument("--workers", type=int, default=8, help="Number of parallel workers for LLM steps (default: 8)") + scan_p.add_argument("--repo-name", help="Repository name (org/repo)") + scan_p.add_argument("--repo-url", help="Repository URL") + scan_p.add_argument("--commit-sha", help="Commit SHA") scan_p.add_argument("--backoff", type=int, default=30, help="Seconds to wait when rate-limited (default: 30)") scan_p.set_defaults(func=cmd_scan) @@ -1045,6 +1058,7 @@ def main(): dt_p = subparsers.add_parser("dynamic-test", help="Run dynamic exploit testing (requires Docker)") dt_p.add_argument("pipeline_output", help="Path to pipeline_output.json") dt_p.add_argument("--output", "-o", help="Output directory (default: temp dir)") + dt_p.add_argument("--repo-path", help="Path to the repository root (for pre-staging source files into Docker build context)") dt_p.add_argument("--max-retries", type=int, default=3, help="Max retries per finding on error (default: 3)") dt_p.set_defaults(func=cmd_dynamic_test) diff --git a/libs/openant-core/prompts/vulnerability_analysis.py b/libs/openant-core/prompts/vulnerability_analysis.py index c8afbf4..3279c63 100644 --- a/libs/openant-core/prompts/vulnerability_analysis.py +++ b/libs/openant-core/prompts/vulnerability_analysis.py @@ -194,7 +194,9 @@ def get_analysis_prompt( "finding": "safe" | "protected" | "vulnerable" | "inconclusive", "reasoning": "Your analysis of the TARGET function's code", "attack_vector": "If vulnerable: the specific attack in the TARGET function. 
If safe: null", - "confidence": 0.0-1.0 + "confidence": 0.0-1.0, + "cwe_id": "If vulnerable: the CWE number (integer). Common: 22 Path Traversal, 77/78 Command Injection, 79 XSS, 89 SQL Injection, 94 Code Injection, 502 Deserialization, 798 Hardcoded Credentials, 489 Debug Enabled, 918 SSRF. Use 0 only if no CWE matches. If safe: 0", + "cwe_name": "If vulnerable: short CWE name (e.g. 'SQL Injection'). If safe: null" }} **Default to SAFE unless you can construct a concrete attack.**""" diff --git a/libs/openant-core/report/generator.py b/libs/openant-core/report/generator.py index 25a55e8..c996250 100644 --- a/libs/openant-core/report/generator.py +++ b/libs/openant-core/report/generator.py @@ -6,6 +6,7 @@ import json import os +import re import sys import anthropic from pathlib import Path @@ -151,6 +152,46 @@ def generate_summary_report(pipeline_data: dict) -> tuple[str, dict]: return response.content[0].text, _extract_usage(response) +def _splice_code_section(llm_output: str, code_section: str) -> str: + """Insert the verbatim code block into the LLM-generated disclosure. + + The LLM generates everything except the Vulnerable Code section. This + function inserts the server-built code block at the right position. + + As a safety net, if the LLM ignored the instruction and still generated + its own ``## Vulnerable Code`` block, that block is stripped first. + """ + if not code_section: + return llm_output + + # Safety net: strip any LLM-generated Vulnerable Code section. + # Matches from "## Vulnerable Code" up to the next ## heading or end of string. + output = re.sub( + r'## Vulnerable Code.*?(?=\n## |\Z)', + '', + llm_output, + flags=re.DOTALL, + ) + + # Insert the real code section before "## Steps to Reproduce". + insertion_point = '## Steps to Reproduce' + if insertion_point in output: + output = output.replace( + insertion_point, + f"{code_section}\n\n{insertion_point}", + 1, + ) + else: + # Fallback: insert before "## Impact" if Steps is missing. 
+ fallback = '## Impact' + if fallback in output: + output = output.replace(fallback, f"{code_section}\n\n{fallback}", 1) + else: + output += f"\n\n{code_section}" + + return output + + def generate_disclosure(vulnerability_data: dict, product_name: str) -> tuple[str, dict]: """Generate a disclosure document for a single vulnerability. @@ -162,10 +203,19 @@ def generate_disclosure(vulnerability_data: dict, product_name: str) -> tuple[st system_prompt = load_prompt("system") - vuln_with_product = {**vulnerability_data, "product_name": product_name} - user_prompt = load_prompt("disclosure").replace( - "{vulnerability_data}", - json.dumps(vuln_with_product, indent=2) + # The vulnerable-code markdown block is spliced into the LLM output + # AFTER generation — the LLM never sees or produces it. This prevents + # the LLM from hallucinating the snippet. + code_section = vulnerability_data.get("vulnerable_code_section") or "" + payload = { + k: v for k, v in vulnerability_data.items() + if k not in ("vulnerable_code_section", "vulnerable_code") + } + payload["product_name"] = product_name + + user_prompt = ( + load_prompt("disclosure") + .replace("{vulnerability_data}", json.dumps(payload, indent=2), 1) ) response = client.messages.create( @@ -175,7 +225,10 @@ def generate_disclosure(vulnerability_data: dict, product_name: str) -> tuple[st messages=[{"role": "user", "content": user_prompt}] ) - return response.content[0].text, _extract_usage(response) + llm_output = response.content[0].text + final_output = _splice_code_section(llm_output, code_section) + + return final_output, _extract_usage(response) def generate_all(pipeline_path: str, output_dir: str) -> None: diff --git a/libs/openant-core/report/prompts/disclosure.txt b/libs/openant-core/report/prompts/disclosure.txt index 5477eb4..32e4e38 100644 --- a/libs/openant-core/report/prompts/disclosure.txt +++ b/libs/openant-core/report/prompts/disclosure.txt @@ -15,15 +15,7 @@ OUTPUT FORMAT: {2-3 sentence description of 
the vulnerability. What is the bug? What can an attacker do?} -## Vulnerable Code - -`{file_path}`: - -```{language} -{minimal_code_snippet_with_vulnerability_highlighted} -``` - -{1 sentence explaining why this code is vulnerable} +{1 sentence explaining why the code is vulnerable — refer to the function name and the dangerous operation} ## Steps to Reproduce @@ -58,13 +50,14 @@ OUTPUT FORMAT: --- -Discovered via static analysis.{" Confirmed via dynamic testing." if dynamic_testing else ""} +Verified via {"dynamic testing" if dynamic_testing else "attacker simulation (Stage 2)" if stage2_verdict in ("confirmed", "agreed") else "static analysis"}. --- INSTRUCTIONS: - Keep summary under 50 words -- Code snippets: show only the vulnerable function/section, not entire files +- Do NOT include a "## Vulnerable Code" section — it is added automatically after your output is generated. Do NOT emit code fences with the vulnerable source code. If you need to reference the vulnerable code, refer to the function name and describe the issue in prose. 
+- Other code snippets (Suggested Fix): show only the minimal change needed, not entire files - Steps to Reproduce: if dynamic_testing data exists, use those steps; otherwise write "[REQUIRES DYNAMIC TESTING]" for each step - Impact: 3-5 bullet points, no sub-bullets - Suggested Fix: show minimal code change, use comments to indicate what was added diff --git a/libs/openant-core/report/prompts/summary.txt b/libs/openant-core/report/prompts/summary.txt index 50bbda9..9124537 100644 --- a/libs/openant-core/report/prompts/summary.txt +++ b/libs/openant-core/report/prompts/summary.txt @@ -67,7 +67,7 @@ OUTPUT FORMAT: | # | Vulnerability | Location | CWE | Verified | |---|--------------|----------|-----|----------| -| 1 | {short_name} | {file:function} | CWE-{n} | {static|dynamic} | +| 1 | {short_name} | {file:function} | CWE-{n} | {static|verified|dynamic} | ## False Positives Eliminated @@ -92,7 +92,7 @@ INSTRUCTIONS: - Fill in all {placeholders} from the input data - If a count is zero, still show "0" in the table - Keep "Reason" column under 15 words -- "Verified" column: "dynamic" if dynamic_testing exists for that finding, else "static" +- "Verified" column: "dynamic" if dynamic_testing exists for that finding, "verified" if stage2_verdict is "confirmed" or "agreed" (Stage 2 attacker simulation passed), else "static" - Language comes from repository.language - Commit SHA comes from repository.commit_sha; if null or missing, write "Not available" - Processing Level comes from pipeline_stats.processing_level; if null or missing, write "Not available" diff --git a/libs/openant-core/report/schema.py b/libs/openant-core/report/schema.py index e11e625..933cb8a 100644 --- a/libs/openant-core/report/schema.py +++ b/libs/openant-core/report/schema.py @@ -20,6 +20,7 @@ class Finding: dynamic_testing: dict | bool = False description: Optional[str] = None vulnerable_code: Optional[str] = None + vulnerable_code_section: Optional[str] = None impact: Optional[list] = None 
suggested_fix: Optional[str] = None steps_to_reproduce: Optional[list] = None @@ -40,6 +41,7 @@ def from_dict(cls, data: dict) -> "Finding": dynamic_testing=data.get("dynamic_testing", False), description=data.get("description"), vulnerable_code=data.get("vulnerable_code"), + vulnerable_code_section=data.get("vulnerable_code_section"), impact=data.get("impact"), suggested_fix=data.get("suggested_fix"), steps_to_reproduce=data.get("steps_to_reproduce"), diff --git a/libs/openant-core/tests/__init__.py b/libs/openant-core/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/libs/openant-core/tests/report/__init__.py b/libs/openant-core/tests/report/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/libs/openant-core/tests/report/test_disclosure_source_fidelity.py b/libs/openant-core/tests/report/test_disclosure_source_fidelity.py new file mode 100644 index 0000000..462f958 --- /dev/null +++ b/libs/openant-core/tests/report/test_disclosure_source_fidelity.py @@ -0,0 +1,352 @@ +"""Regression tests for disclosure source fidelity. + +The LLM that renders disclosures used to "minimally rewrite" the vulnerable +code snippet, which produced fabricated code in DISCLOSURE_01 (ping) and +DISCLOSURE_05 (run_query). The fix injects a pre-rendered, verbatim code +block via a dedicated ``vulnerable_code_section`` field so the LLM never +has the opportunity to rewrite it. +""" + +import json +import os +import sys +import tempfile +import types +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +# Allow `import core.reporter` when tests run from repo root or elsewhere. +_CORE_ROOT = Path(__file__).resolve().parents[2] +sys.path.insert(0, str(_CORE_ROOT)) + +# The project's venv has a broken `anthropic` install (ErrorObject import fails +# in some sub-dependency). Stub it before `report.generator` is imported so the +# test suite can run without touching the venv. 
Real API calls are never made +# in this file — all disclosure generation is mocked. +if "anthropic" not in sys.modules: + stub = types.ModuleType("anthropic") + stub.Anthropic = MagicMock() + stub.RateLimitError = type("RateLimitError", (Exception,), {}) + stub.AuthenticationError = type("AuthenticationError", (Exception,), {}) + sys.modules["anthropic"] = stub + +from core import reporter # noqa: E402 +from report import generator # noqa: E402 + + +# --------------------------------------------------------------------------- +# Fixture — minimal results.json reproducing the disclosure-fabrication +# scenario (ping + run_query) end-to-end. +# --------------------------------------------------------------------------- + +PING_CODE = ( + "@app.route('/ping', methods=['GET'])\n" + "def ping():\n" + " ip = request.args.get('ip', '')\n" + " result = subprocess.check_output(['ping', '-c', '4', ip])\n" + " return result" +) + +RUN_QUERY_CODE = ( + "def run_query(query):\n" + " # Simulating a database query without proper sanitization (SQL Injection risk)\n" + ' return "Query result for: " + query' +) + + +@pytest.fixture +def results_file(tmp_path: Path) -> Path: + """Build a minimal results.json fixture and return its path.""" + results = { + "dataset": "disclosure-fidelity-fixture", + "results": [ + { + "unit_id": "VulnerablePythonScript.py:ping", + "route_key": "VulnerablePythonScript.py:ping", + "verdict": "vulnerable", + "finding": "vulnerable", + "attack_vector": "GET /ping?ip=-w 1000", + "reasoning": "ip passed to subprocess without validation", + }, + { + "unit_id": "VulnerablePythonScript.py:run_query", + "route_key": "VulnerablePythonScript.py:run_query", + "verdict": "vulnerable", + "finding": "vulnerable", + "attack_vector": "GET /user?id=' OR '1'='1' --", + "reasoning": "query concatenation reaches run_query", + }, + ], + "code_by_route": { + "VulnerablePythonScript.py:ping": PING_CODE, + "VulnerablePythonScript.py:run_query": RUN_QUERY_CODE, + }, + 
"confirmed_findings": [ + { + "unit_id": "VulnerablePythonScript.py:ping", + "route_key": "VulnerablePythonScript.py:ping", + "verdict": "vulnerable", + "finding": "vulnerable", + }, + { + "unit_id": "VulnerablePythonScript.py:run_query", + "route_key": "VulnerablePythonScript.py:run_query", + "verdict": "vulnerable", + "finding": "vulnerable", + }, + ], + "metrics": {"total": 2, "vulnerable": 2, "safe": 0}, + } + + path = tmp_path / "results.json" + path.write_text(json.dumps(results)) + return path + + +@pytest.fixture +def pipeline_output(tmp_path: Path, results_file: Path) -> dict: + """Invoke build_pipeline_output() and return the written JSON.""" + out_path = tmp_path / "pipeline_output.json" + reporter.build_pipeline_output( + results_path=str(results_file), + output_path=str(out_path), + repo_name="example/vulnerable-test-app", + language="python", + ) + return json.loads(out_path.read_text()) + + +# --------------------------------------------------------------------------- +# Build-pipeline-output: the emitted finding must carry a pre-rendered, +# verbatim Vulnerable Code markdown section. +# --------------------------------------------------------------------------- + +def test_pipeline_output_carries_vulnerable_code_section(pipeline_output: dict): + findings = pipeline_output["findings"] + assert len(findings) == 2 + + for finding in findings: + section = finding.get("vulnerable_code_section") + assert section, f"vulnerable_code_section missing from {finding.get('id')}" + assert section.startswith("## Vulnerable Code"), ( + f"section must start with a markdown heading: {section[:80]!r}" + ) + # File path is surfaced in the section so the reader can locate the code. + assert "VulnerablePythonScript.py" in section + # Code fence with the language hint. 
+ assert "```python" in section + + +def test_ping_section_contains_verbatim_source(pipeline_output: dict): + """Real ping() has @app.route, check_output, and -c 4 — must appear verbatim.""" + ping = next( + f for f in pipeline_output["findings"] + if f["location"]["function"].endswith(":ping") + ) + section = ping["vulnerable_code_section"] + + # Every line of the real source must appear verbatim. + for line in PING_CODE.splitlines(): + assert line in section, f"missing real source line: {line!r}" + + # Guard against the fabricated variant the LLM used to emit. + assert "subprocess.run(" not in section, "fabricated subprocess.run leaked back in" + assert "capture_output=True" not in section + + +def test_run_query_section_contains_verbatim_source(pipeline_output: dict): + """Real run_query() is 3 lines, not a Flask-route hybrid — must appear verbatim.""" + run_query = next( + f for f in pipeline_output["findings"] + if f["location"]["function"].endswith(":run_query") + ) + section = run_query["vulnerable_code_section"] + + for line in RUN_QUERY_CODE.splitlines(): + assert line in section, f"missing real source line: {line!r}" + + # Guard against the fabricated variant (simulate_query / request.args read). + assert "simulate_query(" not in section + assert "request.args.get" not in section + + +# --------------------------------------------------------------------------- +# Tier 1: _splice_code_section — deterministic post-processing +# --------------------------------------------------------------------------- + +REAL_PING_SECTION = ( + "## Vulnerable Code\n\n" + "`VulnerablePythonScript.py`:\n\n" + "```python\n" + "@app.route('/ping', methods=['GET'])\n" + "def ping():\n" + " ip = request.args.get('ip', '')\n" + " result = subprocess.check_output(['ping', '-c', '4', ip])\n" + " return result\n" + "```" +) + +# Simulates an LLM that ignored the "don't generate code" instruction +# and emitted its own fabricated Vulnerable Code section. 
+LLM_OUTPUT_WITH_FABRICATED_CODE = ( + "# Security Disclosure: Command Injection in Ping\n\n" + "**Product:** test\n" + "**Type:** CWE-78 (Command Injection)\n\n" + "## Summary\n\n" + "The ping function is vulnerable.\n\n" + "## Vulnerable Code\n\n" + "`VulnerablePythonScript.py`:\n\n" + "```python\n" + "def ping(ip):\n" + " result = subprocess.run(['ping', ip], capture_output=True)\n" + " return result.stdout\n" + "```\n\n" + "The ip parameter is not validated.\n\n" + "## Steps to Reproduce\n\n" + "**Step 1:** Send a request.\n\n" + "## Impact\n\n" + "- RCE\n\n" + "## Suggested Fix\n\n" + "Validate input.\n" +) + +# Simulates an LLM that obeyed and did NOT generate a code section. +LLM_OUTPUT_WITHOUT_CODE = ( + "# Security Disclosure: Command Injection in Ping\n\n" + "**Product:** test\n" + "**Type:** CWE-78 (Command Injection)\n\n" + "## Summary\n\n" + "The ping function is vulnerable.\n\n" + "The ip parameter is not validated.\n\n" + "## Steps to Reproduce\n\n" + "**Step 1:** Send a request.\n\n" + "## Impact\n\n" + "- RCE\n\n" + "## Suggested Fix\n\n" + "Validate input.\n" +) + + +def test_splice_replaces_fabricated_code(): + """When the LLM outputs a fabricated code block, the post-processor + must strip it and insert the real one.""" + result = generator._splice_code_section( + LLM_OUTPUT_WITH_FABRICATED_CODE, REAL_PING_SECTION + ) + + # Real code present + assert "subprocess.check_output" in result + assert "'-c', '4'" in result + + # Fabricated code gone + assert "subprocess.run" not in result + assert "capture_output=True" not in result + assert "def ping(ip)" not in result + + # Real section appears exactly once + assert result.count("## Vulnerable Code") == 1 + + +def test_splice_inserts_when_no_code_section(): + """When the LLM obeys and omits the code section, the post-processor + must insert the real one before Steps to Reproduce.""" + result = generator._splice_code_section( + LLM_OUTPUT_WITHOUT_CODE, REAL_PING_SECTION + ) + + # Real code present + 
assert "subprocess.check_output" in result + + # Inserted before Steps to Reproduce + code_pos = result.index("## Vulnerable Code") + steps_pos = result.index("## Steps to Reproduce") + assert code_pos < steps_pos, "code section must appear before Steps to Reproduce" + + +def test_splice_preserves_other_sections(): + """Summary, Steps, Impact, Suggested Fix must all survive the splice.""" + result = generator._splice_code_section( + LLM_OUTPUT_WITH_FABRICATED_CODE, REAL_PING_SECTION + ) + + for heading in ["## Summary", "## Steps to Reproduce", "## Impact", "## Suggested Fix"]: + assert heading in result, f"{heading} was destroyed by splice" + + +# --------------------------------------------------------------------------- +# Tier 2: generate_disclosure() full mock flow — the OUTPUT must contain +# the real code, even when the LLM returns fabricated code. +# --------------------------------------------------------------------------- + +class _FakeAnthropic: + """Replacement for anthropic.Anthropic — returns fabricated code to prove + the post-processor catches it.""" + + def __init__(self, *args, **kwargs): + self.messages = self + + def create(self, **kwargs): + _FakeAnthropic.last_prompt = kwargs["messages"][0]["content"] + # Return a disclosure WITH fabricated code — the post-processor must fix it. 
+ return _FakeResponse() + + +class _FakeResponse: + class _Content: + text = LLM_OUTPUT_WITH_FABRICATED_CODE + + content = [_Content()] + + class _Usage: + input_tokens = 10 + output_tokens = 50 + + usage = _Usage() + + +@pytest.fixture +def patched_anthropic(monkeypatch): + monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-test-key") + monkeypatch.setattr(generator.anthropic, "Anthropic", _FakeAnthropic) + + +def test_generate_disclosure_output_has_real_code(patched_anthropic, pipeline_output): + """Even when the LLM returns fabricated code, the final output from + generate_disclosure() must contain the real source.""" + ping = next( + f for f in pipeline_output["findings"] + if f["location"]["function"].endswith(":ping") + ) + + text, _usage = generator.generate_disclosure(ping, product_name="fixture") + + # Real code in the output + assert "subprocess.check_output" in text, "real code must be in final output" + assert "'-c', '4'" in text + + # Fabricated code NOT in the output + assert "subprocess.run" not in text, "fabricated code must be stripped from output" + assert "capture_output=True" not in text + assert "def ping(ip)" not in text + + +def test_generate_disclosure_prompt_has_no_source_code(patched_anthropic, pipeline_output): + """The prompt sent to Claude must NOT contain the vulnerable source code — + the LLM should never see it, so it can't fabricate a rewritten version.""" + ping = next( + f for f in pipeline_output["findings"] + if f["location"]["function"].endswith(":ping") + ) + + generator.generate_disclosure(ping, product_name="fixture") + prompt = _FakeAnthropic.last_prompt + + # The actual source code must not appear in the prompt. 
+ assert "subprocess.check_output" not in prompt, ( + "real source code must not appear in the prompt" + ) + assert "```python" not in prompt or PING_CODE.splitlines()[0] not in prompt, ( + "code fence with real source must not appear in the prompt" + ) diff --git a/libs/openant-core/tests/test_agreement_filter.py b/libs/openant-core/tests/test_agreement_filter.py new file mode 100644 index 0000000..ae674a4 --- /dev/null +++ b/libs/openant-core/tests/test_agreement_filter.py @@ -0,0 +1,222 @@ +"""Regression tests for the agreement filter dropping real vulnerabilities. + +When Stage 2 disagrees on the REASON (e.g. different CWE) but agrees on the +VERDICT (vulnerable), the finding must still be emitted. The old filter checked +`verification.agree` instead of the final `finding` field, dropping findings +where Stage 2 said "disagree but still vulnerable." +""" + +import json +import sys +import types +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +_CORE_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(_CORE_ROOT)) + +if "anthropic" not in sys.modules: + _stub = types.ModuleType("anthropic") + _stub.Anthropic = MagicMock() + _stub.RateLimitError = type("RateLimitError", (Exception,), {}) + _stub.AuthenticationError = type("AuthenticationError", (Exception,), {}) + sys.modules["anthropic"] = _stub +_anth = sys.modules["anthropic"] +if not hasattr(_anth, "RateLimitError"): + _anth.RateLimitError = type("RateLimitError", (Exception,), {}) +if not hasattr(_anth, "AuthenticationError"): + _anth.AuthenticationError = type("AuthenticationError", (Exception,), {}) + + +# --------------------------------------------------------------------------- +# Test 1: Disagree on reason, agree on verdict → MUST be emitted +# --------------------------------------------------------------------------- + +@pytest.fixture +def disagree_reason_results(tmp_path: Path) -> Path: + """Stage 1 says CWE-798, Stage 2 says CWE-307. 
Both say vulnerable. + Stage 2 sets agree=False because it disagrees with the reasoning. + experiment.py already updated finding to verification.correct_finding. + """ + results = { + "dataset": "agreement-test", + "results": [ + { + "unit_id": "app.py:login", + "route_key": "app.py:login", + "verdict": "VULNERABLE", + "finding": "vulnerable", # already updated by experiment.py:758 + "attack_vector": "timing attack on == operator", + "reasoning": "hardcoded credentials", + "cwe_id": 798, + "cwe_name": "Hardcoded Credentials", + "verification": { + "agree": False, # disagrees on REASON, not verdict + "correct_finding": "vulnerable", + "explanation": "The real issue is timing side-channel, not hardcoded creds", + }, + }, + ], + "code_by_route": {"app.py:login": "def login(): ..."}, + "metrics": {"total": 1, "vulnerable": 1}, + } + path = tmp_path / "results.json" + path.write_text(json.dumps(results)) + return path + + +def test_disagree_reason_still_emitted(tmp_path, disagree_reason_results): + """A finding where Stage 2 disagrees on reason but agrees on verdict + must appear in pipeline_output.json.""" + from core.reporter import build_pipeline_output + out = tmp_path / "po.json" + build_pipeline_output( + results_path=str(disagree_reason_results), + output_path=str(out), + language="python", + ) + data = json.loads(out.read_text()) + assert len(data["findings"]) == 1, ( + f"expected 1 finding (login), got {len(data['findings'])}. " + "The agreement filter is dropping findings where agree=False " + "but correct_finding=vulnerable." + ) + assert "login" in data["findings"][0]["location"]["function"] + + +# --------------------------------------------------------------------------- +# Test 2: Disagree on verdict → MUST be dropped +# --------------------------------------------------------------------------- + +@pytest.fixture +def disagree_verdict_results(tmp_path: Path) -> Path: + """Stage 1 says vulnerable. Stage 2 says safe. agree=False, correct_finding=safe. 
+ experiment.py updated finding to 'safe'. This must NOT be emitted. + """ + results = { + "dataset": "agreement-test-drop", + "results": [ + { + "unit_id": "app.py:requests_example", + "route_key": "app.py:requests_example", + "verdict": "SAFE", # updated by experiment.py + "finding": "safe", # updated by experiment.py:758 + "attack_vector": None, + "reasoning": "hardcoded URL, no user input", + "verification": { + "agree": False, + "correct_finding": "safe", + "explanation": "Stage 1 was wrong, this is safe", + }, + }, + ], + "code_by_route": {"app.py:requests_example": "def requests_example(): ..."}, + "metrics": {"total": 1, "safe": 1}, + } + path = tmp_path / "results.json" + path.write_text(json.dumps(results)) + return path + + +def test_disagree_verdict_dropped(tmp_path, disagree_verdict_results): + """A finding where Stage 2 changed the verdict to safe must NOT appear.""" + from core.reporter import build_pipeline_output + out = tmp_path / "po.json" + build_pipeline_output( + results_path=str(disagree_verdict_results), + output_path=str(out), + language="python", + ) + data = json.loads(out.read_text()) + assert len(data["findings"]) == 0, ( + "finding changed to safe by Stage 2 must not appear in pipeline_output" + ) + + +# --------------------------------------------------------------------------- +# Test 3: Normal agreement → still works (regression guard) +# --------------------------------------------------------------------------- + +@pytest.fixture +def normal_agree_results(tmp_path: Path) -> Path: + """Standard case: Stage 2 agrees with Stage 1. 
agree=True.""" + results = { + "dataset": "agreement-test-normal", + "results": [ + { + "unit_id": "app.py:unserialize", + "route_key": "app.py:unserialize", + "verdict": "VULNERABLE", + "finding": "vulnerable", + "attack_vector": "POST /unserialize with pickle payload", + "reasoning": "pickle.loads on untrusted input", + "cwe_id": 502, + "cwe_name": "Deserialization", + "verification": { + "agree": True, + "correct_finding": "vulnerable", + "explanation": "Confirmed: pickle.loads is exploitable", + }, + }, + ], + "code_by_route": {"app.py:unserialize": "def unserialize(): ..."}, + "metrics": {"total": 1, "vulnerable": 1}, + } + path = tmp_path / "results.json" + path.write_text(json.dumps(results)) + return path + + +def test_normal_agreement_emitted(tmp_path, normal_agree_results): + """Standard agreement case must still be emitted (regression guard).""" + from core.reporter import build_pipeline_output + out = tmp_path / "po.json" + build_pipeline_output( + results_path=str(normal_agree_results), + output_path=str(out), + language="python", + ) + data = json.loads(out.read_text()) + assert len(data["findings"]) == 1 + assert "unserialize" in data["findings"][0]["location"]["function"] + + +# --------------------------------------------------------------------------- +# Test 4: _write_verified_results confirmed_findings filter +# --------------------------------------------------------------------------- + +def test_verifier_confirmed_findings_includes_disagree_vulnerable(): + """The confirmed_findings list in results_verified.json must include + findings where agree=False but correct_finding=vulnerable.""" + from core.verifier import _write_verified_results + import tempfile, os + + experiment = {"dataset": "test", "metrics": {}} + merged = [ + { + "route_key": "app.py:login", + "finding": "vulnerable", + "verification": {"agree": False, "correct_finding": "vulnerable"}, + }, + { + "route_key": "app.py:safe_fn", + "finding": "safe", + "verification": {"agree": 
True, "correct_finding": "safe"}, + }, + ] + # verified_only = findings that went through Stage 2 + verified_only = merged + + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + path = f.name + + try: + _write_verified_results(path, experiment, merged, verified_only) + data = json.loads(open(path).read()) + confirmed = data["confirmed_findings"] + assert len(confirmed) == 1, f"expected 1 confirmed (login), got {len(confirmed)}" + assert confirmed[0]["route_key"] == "app.py:login" + finally: + os.unlink(path) diff --git a/libs/openant-core/tests/test_cwe_tagging.py b/libs/openant-core/tests/test_cwe_tagging.py new file mode 100644 index 0000000..eda61b9 --- /dev/null +++ b/libs/openant-core/tests/test_cwe_tagging.py @@ -0,0 +1,129 @@ +"""Regression tests for CWE tagging in pipeline output. + +Stage 1 never asked for CWE, so pipeline_output.json had cwe:null, cwe_id:0 +for every finding. The fix adds cwe_id/cwe_name to the Stage 1 prompt schema +and preserves them through normalization. 
+""" + +import json +import sys +import types +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +_CORE_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(_CORE_ROOT)) + +if "anthropic" not in sys.modules: + _stub = types.ModuleType("anthropic") + _stub.Anthropic = MagicMock() + _stub.RateLimitError = type("RateLimitError", (Exception,), {}) + _stub.AuthenticationError = type("AuthenticationError", (Exception,), {}) + sys.modules["anthropic"] = _stub +_anth = sys.modules["anthropic"] +if not hasattr(_anth, "RateLimitError"): + _anth.RateLimitError = type("RateLimitError", (Exception,), {}) +if not hasattr(_anth, "AuthenticationError"): + _anth.AuthenticationError = type("AuthenticationError", (Exception,), {}) + + +# --------------------------------------------------------------------------- +# Stage 1 prompt must ask for CWE +# --------------------------------------------------------------------------- + +def test_stage1_prompt_includes_cwe_fields(): + """The Stage 1 analysis prompt JSON schema must require cwe_id and cwe_name.""" + from prompts.vulnerability_analysis import get_analysis_prompt + prompt = get_analysis_prompt( + code="def foo(): pass", + language="python", + route="test.py:foo", + ) + assert "cwe_id" in prompt, "Stage 1 prompt must ask for cwe_id" + assert "cwe_name" in prompt, "Stage 1 prompt must ask for cwe_name" + + +# --------------------------------------------------------------------------- +# _normalize_result must preserve CWE through normalization +# --------------------------------------------------------------------------- + +def test_normalize_result_preserves_cwe(): + from experiment import _normalize_result + result = { + "finding": "vulnerable", + "reasoning": "SQL injection", + "attack_vector": "GET /user?id=1", + "confidence": 0.95, + "cwe_id": 89, + "cwe_name": "SQL Injection", + } + normalized = _normalize_result(result) + assert normalized["cwe_id"] == 89 + assert 
normalized["cwe_name"] == "SQL Injection" + + +def test_normalize_result_defaults_cwe_when_missing(): + from experiment import _normalize_result + result = { + "finding": "vulnerable", + "reasoning": "some vuln", + "attack_vector": "payload", + "confidence": 0.9, + } + normalized = _normalize_result(result) + assert "cwe_id" in normalized, "cwe_id should be set even if LLM omitted it" + assert "cwe_name" in normalized, "cwe_name should be set even if LLM omitted it" + + +# --------------------------------------------------------------------------- +# pipeline_output.json must carry non-null CWE from results +# --------------------------------------------------------------------------- + +@pytest.fixture +def results_with_cwe(tmp_path: Path) -> Path: + results = { + "dataset": "cwe-test", + "results": [ + { + "unit_id": "test.py:foo", + "route_key": "test.py:foo", + "verdict": "vulnerable", + "finding": "vulnerable", + "attack_vector": "GET /foo?id=1", + "reasoning": "SQL injection", + "cwe_id": 89, + "cwe_name": "SQL Injection", + }, + ], + "code_by_route": {"test.py:foo": "def foo(): pass"}, + "confirmed_findings": [ + { + "unit_id": "test.py:foo", + "route_key": "test.py:foo", + "verdict": "vulnerable", + "finding": "vulnerable", + "cwe_id": 89, + "cwe_name": "SQL Injection", + }, + ], + "metrics": {"total": 1, "vulnerable": 1}, + } + path = tmp_path / "results.json" + path.write_text(json.dumps(results)) + return path + + +def test_pipeline_output_carries_cwe(tmp_path, results_with_cwe): + from core.reporter import build_pipeline_output + out = tmp_path / "po.json" + build_pipeline_output( + results_path=str(results_with_cwe), + output_path=str(out), + language="python", + ) + data = json.loads(out.read_text()) + finding = data["findings"][0] + assert finding["cwe_id"] == 89, f"expected 89, got {finding['cwe_id']}" + assert finding["cwe_name"] == "SQL Injection" diff --git a/libs/openant-core/tests/test_dedup.py b/libs/openant-core/tests/test_dedup.py new file 
mode 100644 index 0000000..0ecb2f4 --- /dev/null +++ b/libs/openant-core/tests/test_dedup.py @@ -0,0 +1,271 @@ +"""Regression tests for caller/callee dedup based on CWE. + +When the call graph shows A→B as the only edge into B and both findings +share the same CWE, they should be collapsed into one — regardless of +whether the attack_vector strings match. + +Dedup matches on CWE rather than attack_vector text because the LLM +generates different attack_vector wording on different runs, while CWE +is stable. +""" + +import json +import sys +import types +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +_CORE_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(_CORE_ROOT)) + +if "anthropic" not in sys.modules: + _stub = types.ModuleType("anthropic") + _stub.Anthropic = MagicMock() + _stub.RateLimitError = type("RateLimitError", (Exception,), {}) + _stub.AuthenticationError = type("AuthenticationError", (Exception,), {}) + sys.modules["anthropic"] = _stub + + +@pytest.fixture +def caller_callee_results(tmp_path: Path) -> tuple[Path, Path]: + """Build a results.json and call_graph.json with a caller/callee pair sharing one CWE.""" + results = { + "dataset": "dedup-test", + "results": [ + { + "unit_id": "app.py:get_user", + "route_key": "app.py:get_user", + "verdict": "vulnerable", + "finding": "vulnerable", + "attack_vector": "GET /user?id=' OR '1'='1' --", + "reasoning": "SQL injection via run_query", + "cwe_id": 89, + "cwe_name": "SQL Injection", + }, + { + "unit_id": "app.py:run_query", + "route_key": "app.py:run_query", + "verdict": "vulnerable", + "finding": "vulnerable", + "attack_vector": "GET /user?id=' OR '1'='1' --", + "reasoning": "SQL injection — sink for get_user", + "cwe_id": 89, + "cwe_name": "SQL Injection", + }, + ], + "code_by_route": { + "app.py:get_user": "def get_user(): ...", + "app.py:run_query": "def run_query(q): ...", + }, + "confirmed_findings": [ + { + "unit_id": "app.py:get_user", + 
"route_key": "app.py:get_user", + "verdict": "vulnerable", + "finding": "vulnerable", + "attack_vector": "GET /user?id=' OR '1'='1' --", + }, + { + "unit_id": "app.py:run_query", + "route_key": "app.py:run_query", + "verdict": "vulnerable", + "finding": "vulnerable", + "attack_vector": "GET /user?id=' OR '1'='1' --", + }, + ], + "metrics": {"total": 2, "vulnerable": 2}, + } + + call_graph = { + "call_graph": { + "app.py:get_user": ["app.py:run_query"], + }, + "reverse_call_graph": { + "app.py:run_query": ["app.py:get_user"], + }, + } + + results_path = tmp_path / "results.json" + results_path.write_text(json.dumps(results)) + + cg_path = tmp_path / "call_graph.json" + cg_path.write_text(json.dumps(call_graph)) + + return results_path, cg_path + + +def test_caller_callee_collapsed(tmp_path, caller_callee_results): + """get_user + run_query with the same attack vector should collapse to 1 finding.""" + from core.reporter import build_pipeline_output + results_path, _cg_path = caller_callee_results + out = tmp_path / "po.json" + build_pipeline_output( + results_path=str(results_path), + output_path=str(out), + language="python", + ) + data = json.loads(out.read_text()) + findings = data["findings"] + assert len(findings) == 1, f"expected 1 (collapsed), got {len(findings)}: {[f['location']['function'] for f in findings]}" + # Surviving finding should be the caller (get_user) + assert "get_user" in findings[0]["location"]["function"] + + +def test_different_attack_vector_same_cwe_collapsed(tmp_path): + """The LLM writes different attack_vector text per run, but both get + CWE-89. 
Dedup must still fire on CWE match.""" + from core.reporter import build_pipeline_output + results = { + "dataset": "dedup-cwe-test", + "results": [ + { + "unit_id": "app.py:get_user", + "route_key": "app.py:get_user", + "verdict": "vulnerable", + "finding": "vulnerable", + "attack_vector": "GET /user?id=' OR 1=1--", + "cwe_id": 89, + "cwe_name": "SQL Injection", + }, + { + "unit_id": "app.py:run_query", + "route_key": "app.py:run_query", + "verdict": "vulnerable", + "finding": "vulnerable", + "attack_vector": "injection via unsanitized query parameter", + "cwe_id": 89, + "cwe_name": "SQL Injection", + }, + ], + "code_by_route": { + "app.py:get_user": "def get_user(): ...", + "app.py:run_query": "def run_query(q): ...", + }, + "confirmed_findings": [ + { + "unit_id": "app.py:get_user", + "route_key": "app.py:get_user", + "verdict": "vulnerable", + "finding": "vulnerable", + "attack_vector": "GET /user?id=' OR 1=1--", + "cwe_id": 89, + }, + { + "unit_id": "app.py:run_query", + "route_key": "app.py:run_query", + "verdict": "vulnerable", + "finding": "vulnerable", + "attack_vector": "injection via unsanitized query parameter", + "cwe_id": 89, + }, + ], + "metrics": {"total": 2, "vulnerable": 2}, + } + call_graph = { + "call_graph": {"app.py:get_user": ["app.py:run_query"]}, + "reverse_call_graph": {"app.py:run_query": ["app.py:get_user"]}, + } + rp = tmp_path / "results.json" + rp.write_text(json.dumps(results)) + (tmp_path / "call_graph.json").write_text(json.dumps(call_graph)) + out = tmp_path / "po.json" + build_pipeline_output(results_path=str(rp), output_path=str(out), language="python") + data = json.loads(out.read_text()) + assert len(data["findings"]) == 1, ( + f"expected 1 (collapsed on CWE), got {len(data['findings'])}. " + "Different attack_vector text but same CWE-89 must still dedup." 
+ ) + assert "get_user" in data["findings"][0]["location"]["function"] + + +def test_cwe_zero_not_collapsed(tmp_path): + """Two CWE-0 (unknown) findings must NOT collapse — 0==0 is meaningless.""" + from core.reporter import build_pipeline_output + results = { + "dataset": "dedup-cwe0-test", + "results": [ + { + "unit_id": "app.py:caller", + "route_key": "app.py:caller", + "verdict": "vulnerable", + "finding": "vulnerable", + "attack_vector": "some attack", + "cwe_id": 0, + "cwe_name": "Unknown", + }, + { + "unit_id": "app.py:callee", + "route_key": "app.py:callee", + "verdict": "vulnerable", + "finding": "vulnerable", + "attack_vector": "some attack", + "cwe_id": 0, + "cwe_name": "Unknown", + }, + ], + "code_by_route": { + "app.py:caller": "def caller(): ...", + "app.py:callee": "def callee(): ...", + }, + "confirmed_findings": [ + {"unit_id": "app.py:caller", "route_key": "app.py:caller", "verdict": "vulnerable", "finding": "vulnerable", "cwe_id": 0}, + {"unit_id": "app.py:callee", "route_key": "app.py:callee", "verdict": "vulnerable", "finding": "vulnerable", "cwe_id": 0}, + ], + "metrics": {"total": 2, "vulnerable": 2}, + } + call_graph = { + "call_graph": {"app.py:caller": ["app.py:callee"]}, + "reverse_call_graph": {"app.py:callee": ["app.py:caller"]}, + } + rp = tmp_path / "results.json" + rp.write_text(json.dumps(results)) + (tmp_path / "call_graph.json").write_text(json.dumps(call_graph)) + out = tmp_path / "po.json" + build_pipeline_output(results_path=str(rp), output_path=str(out), language="python") + data = json.loads(out.read_text()) + assert len(data["findings"]) == 2, "CWE-0 + CWE-0 must NOT collapse (both unknown)" + + +def test_independent_findings_not_collapsed(tmp_path): + """Findings with different attack vectors must NOT be collapsed.""" + from core.reporter import build_pipeline_output + results = { + "dataset": "no-dedup", + "results": [ + { + "unit_id": "app.py:ping", + "route_key": "app.py:ping", + "verdict": "vulnerable", + "finding": 
"vulnerable", + "attack_vector": "GET /ping?ip=-w 1000", + "cwe_id": 78, + "cwe_name": "Command Injection", + }, + { + "unit_id": "app.py:login", + "route_key": "app.py:login", + "verdict": "vulnerable", + "finding": "vulnerable", + "attack_vector": "POST /login brute-force", + "cwe_id": 798, + "cwe_name": "Hardcoded Credentials", + }, + ], + "code_by_route": { + "app.py:ping": "def ping(): ...", + "app.py:login": "def login(): ...", + }, + "confirmed_findings": [ + {"unit_id": "app.py:ping", "route_key": "app.py:ping", "verdict": "vulnerable", "finding": "vulnerable"}, + {"unit_id": "app.py:login", "route_key": "app.py:login", "verdict": "vulnerable", "finding": "vulnerable"}, + ], + "metrics": {"total": 2, "vulnerable": 2}, + } + rp = tmp_path / "results.json" + rp.write_text(json.dumps(results)) + out = tmp_path / "po.json" + build_pipeline_output(results_path=str(rp), output_path=str(out), language="python") + data = json.loads(out.read_text()) + assert len(data["findings"]) == 2, "independent findings must NOT be collapsed" diff --git a/libs/openant-core/tests/test_docker_scaffold.py b/libs/openant-core/tests/test_docker_scaffold.py new file mode 100644 index 0000000..c90fc0b --- /dev/null +++ b/libs/openant-core/tests/test_docker_scaffold.py @@ -0,0 +1,219 @@ +"""Regression tests for Dockerfile scaffold pre-staging. + +The dynamic-test scaffold must stage the vulnerable source file into the +Docker build context BEFORE asking the LLM to write the Dockerfile, so +`COPY VulnerablePythonScript.py .` works on the first try. 
+""" + +import os +import sys +import types +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +_CORE_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(_CORE_ROOT)) + +if "anthropic" not in sys.modules: + _stub = types.ModuleType("anthropic") + _stub.Anthropic = MagicMock() + _stub.RateLimitError = type("RateLimitError", (Exception,), {}) + _stub.AuthenticationError = type("AuthenticationError", (Exception,), {}) + sys.modules["anthropic"] = _stub + + +def test_write_test_files_stages_source(tmp_path): + """_write_test_files must copy the vulnerable source into the work dir.""" + from utilities.dynamic_tester.docker_executor import _write_test_files + + # Create a fake source file to stage + repo_dir = tmp_path / "repo" + repo_dir.mkdir() + source = repo_dir / "app.py" + source.write_text("def vuln(): pass") + + generation = { + "dockerfile": "FROM python:3.11\nCOPY app.py .\nCMD python app.py", + "test_script": "print('test')", + "test_filename": "test_exploit.py", + "requirements": "flask", + } + + finding = { + "location": {"file": "app.py", "function": "app.py:vuln"}, + } + + work_dir = str(tmp_path / "work") + os.makedirs(work_dir) + + _write_test_files(work_dir, generation, source_file=str(source)) + + staged = os.path.join(work_dir, "app.py") + assert os.path.exists(staged), "source file must be staged into work_dir" + assert open(staged).read() == "def vuln(): pass" + + +def test_write_test_files_works_without_source(tmp_path): + """Backward compat: _write_test_files must not fail when no source_file is given.""" + from utilities.dynamic_tester.docker_executor import _write_test_files + + generation = { + "dockerfile": "FROM python:3.11\nCMD echo hi", + "test_script": "print('test')", + "test_filename": "test_exploit.py", + } + + work_dir = str(tmp_path / "work") + os.makedirs(work_dir) + + # Must not raise + _write_test_files(work_dir, generation) + + +# 
--------------------------------------------------------------------------- +# Link 3: orchestrator resolves source_file and passes it to run_single_container +# --------------------------------------------------------------------------- + +def test_orchestrator_passes_source_file(tmp_path, monkeypatch): + """run_dynamic_tests must resolve source_file from repo_path + finding.location.file + and pass it through to run_single_container.""" + import json + + # Create a fake repo with a source file + repo = tmp_path / "repo" + repo.mkdir() + (repo / "app.py").write_text("def vuln(): pass") + + # Create a minimal pipeline_output.json + po = { + "repository": {"name": "test", "language": "python"}, + "application_type": "web_app", + "findings": [{ + "id": "VULN-001", + "name": "test vuln", + "short_name": "vuln", + "location": {"file": "app.py", "function": "app.py:vuln"}, + "cwe_id": 79, + "cwe_name": "XSS", + "stage1_verdict": "vulnerable", + "stage2_verdict": "confirmed", + }], + } + po_path = tmp_path / "pipeline_output.json" + po_path.write_text(json.dumps(po)) + + # Track what run_single_container receives + captured_kwargs = {} + + def mock_generate_test(finding, repo_info, tracker): + return { + "dockerfile": "FROM python:3.11\nCMD echo hi", + "test_script": "print('ok')", + "test_filename": "test_exploit.py", + } + + def mock_run_single_container(generation, finding_id, source_file=None, **kwargs): + captured_kwargs["source_file"] = source_file + from utilities.dynamic_tester.docker_executor import DockerExecutionResult + result = DockerExecutionResult() + result.stdout = '{"status": "CONFIRMED", "details": "test", "evidence": []}' + result.exit_code = 0 + return result + + monkeypatch.setattr("utilities.dynamic_tester.generate_test", mock_generate_test) + monkeypatch.setattr("utilities.dynamic_tester.run_single_container", mock_run_single_container) + + from utilities.dynamic_tester import run_dynamic_tests + run_dynamic_tests( + 
pipeline_output_path=str(po_path), + output_dir=str(tmp_path / "out"), + max_retries=0, + repo_path=str(repo), + ) + + assert captured_kwargs.get("source_file") is not None, ( + "orchestrator must pass source_file to run_single_container" + ) + assert captured_kwargs["source_file"].endswith("app.py") + assert os.path.isfile(captured_kwargs["source_file"]) + + +def test_orchestrator_works_without_repo_path(tmp_path, monkeypatch): + """Backward compat: when repo_path is None, source_file should be None.""" + import json + + po = { + "repository": {"name": "test", "language": "python"}, + "application_type": "web_app", + "findings": [{ + "id": "VULN-001", + "name": "test", + "short_name": "vuln", + "location": {"file": "app.py", "function": "app.py:vuln"}, + "cwe_id": 79, + "cwe_name": "XSS", + "stage1_verdict": "vulnerable", + "stage2_verdict": "confirmed", + }], + } + po_path = tmp_path / "pipeline_output.json" + po_path.write_text(json.dumps(po)) + + captured_kwargs = {} + + def mock_generate_test(finding, repo_info, tracker): + return { + "dockerfile": "FROM python:3.11\nCMD echo hi", + "test_script": "print('ok')", + "test_filename": "test_exploit.py", + } + + def mock_run_single_container(generation, finding_id, source_file=None, **kwargs): + captured_kwargs["source_file"] = source_file + from utilities.dynamic_tester.docker_executor import DockerExecutionResult + result = DockerExecutionResult() + result.stdout = '{"status": "CONFIRMED", "details": "test", "evidence": []}' + result.exit_code = 0 + return result + + monkeypatch.setattr("utilities.dynamic_tester.generate_test", mock_generate_test) + monkeypatch.setattr("utilities.dynamic_tester.run_single_container", mock_run_single_container) + + from utilities.dynamic_tester import run_dynamic_tests + run_dynamic_tests( + pipeline_output_path=str(po_path), + output_dir=str(tmp_path / "out"), + max_retries=0, + ) + + assert captured_kwargs.get("source_file") is None, ( + "without repo_path, source_file must be 
None (backward compat)" + ) + + +# --------------------------------------------------------------------------- +# Link 4 + prompt: existing tests +# --------------------------------------------------------------------------- + +def test_finding_prompt_includes_source_basename(): + """_build_finding_prompt must tell the LLM the staged filename.""" + from utilities.dynamic_tester.test_generator import _build_finding_prompt + + finding = { + "id": "VULN-001", + "name": "Command Injection", + "cwe_id": 78, + "cwe_name": "Command Injection", + "location": {"file": "VulnerablePythonScript.py", "function": "ping"}, + "stage1_verdict": "vulnerable", + "stage2_verdict": "agreed", + "vulnerable_code": "def ping(): ...", + } + repo_info = {"name": "test", "language": "python", "application_type": "web_app"} + + prompt = _build_finding_prompt(finding, repo_info) + assert "VulnerablePythonScript.py" in prompt, ( + "prompt must mention the staged source filename so the LLM references it in COPY" + ) diff --git a/libs/openant-core/tests/test_evidence_tier.py b/libs/openant-core/tests/test_evidence_tier.py new file mode 100644 index 0000000..49bccb9 --- /dev/null +++ b/libs/openant-core/tests/test_evidence_tier.py @@ -0,0 +1,44 @@ +"""Regression tests for evidence-tier reporting in summaries and disclosures. + +Summary template and disclosure footer must reflect the highest evidence tier: +dynamic > verified (Stage 2) > static. 
+""" + +import sys +import types +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +_CORE_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(_CORE_ROOT)) + +if "anthropic" not in sys.modules: + _stub = types.ModuleType("anthropic") + _stub.Anthropic = MagicMock() + _stub.RateLimitError = type("RateLimitError", (Exception,), {}) + _stub.AuthenticationError = type("AuthenticationError", (Exception,), {}) + sys.modules["anthropic"] = _stub + + +def test_summary_prompt_has_three_tier_verified(): + """Summary template INSTRUCTIONS must mention 'verified' as a middle tier.""" + from report.generator import load_prompt + prompt = load_prompt("summary") + assert "verified" in prompt.lower(), "summary prompt must mention 'verified' tier" + # Must NOT say "else static" without mentioning stage2 + # The instruction should reference stage2_verdict + assert "stage2" in prompt.lower() or "stage 2" in prompt.lower(), ( + "summary prompt must reference stage2_verdict for the middle tier" + ) + + +def test_disclosure_footer_is_evidence_tier_aware(): + """Disclosure footer must not unconditionally say 'static analysis'.""" + from report.generator import load_prompt + prompt = load_prompt("disclosure") + # Must NOT have unconditional "Discovered via static analysis." + assert "Discovered via static analysis." not in prompt or "stage2" in prompt.lower(), ( + "disclosure footer must not unconditionally say 'static analysis'" + ) diff --git a/libs/openant-core/tests/test_metadata_plumbing.py b/libs/openant-core/tests/test_metadata_plumbing.py new file mode 100644 index 0000000..08c9a95 --- /dev/null +++ b/libs/openant-core/tests/test_metadata_plumbing.py @@ -0,0 +1,87 @@ +"""Regression tests for repo-metadata plumbing into pipeline output. + +build_pipeline_output() must carry repo_name, repo_url, commit_sha, and +language into pipeline_output.json when provided by the caller. 
+""" + +import json +import sys +import types +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +_CORE_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(_CORE_ROOT)) + +if "anthropic" not in sys.modules: + _stub = types.ModuleType("anthropic") + _stub.Anthropic = MagicMock() + _stub.RateLimitError = type("RateLimitError", (Exception,), {}) + _stub.AuthenticationError = type("AuthenticationError", (Exception,), {}) + sys.modules["anthropic"] = _stub + + +@pytest.fixture +def minimal_results(tmp_path: Path) -> Path: + results = { + "dataset": "test", + "results": [{ + "unit_id": "app.py:foo", + "route_key": "app.py:foo", + "verdict": "vulnerable", + "finding": "vulnerable", + "cwe_id": 79, + "cwe_name": "XSS", + }], + "code_by_route": {"app.py:foo": "def foo(): pass"}, + "confirmed_findings": [{ + "unit_id": "app.py:foo", + "route_key": "app.py:foo", + "verdict": "vulnerable", + "finding": "vulnerable", + }], + "metrics": {"total": 1, "vulnerable": 1}, + } + path = tmp_path / "results.json" + path.write_text(json.dumps(results)) + return path + + +def test_pipeline_output_carries_repo_metadata(tmp_path, minimal_results): + from core.reporter import build_pipeline_output + out = tmp_path / "po.json" + build_pipeline_output( + results_path=str(minimal_results), + output_path=str(out), + repo_name="example/vulnerable-test-app", + repo_url="https://github.com/example/vulnerable-test-app", + commit_sha="3804a18ae66", + language="python", + ) + data = json.loads(out.read_text()) + repo = data["repository"] + + assert repo["name"] == "example/vulnerable-test-app" + assert repo["url"] == "https://github.com/example/vulnerable-test-app" + assert repo["commit_sha"] == "3804a18ae66" + assert repo["language"] == "python" + + +def test_pipeline_output_no_not_provided_when_metadata_given(tmp_path, minimal_results): + """No field in the repository section should be empty or null when all inputs are provided.""" + from core.reporter 
import build_pipeline_output + out = tmp_path / "po.json" + build_pipeline_output( + results_path=str(minimal_results), + output_path=str(out), + repo_name="org/repo", + repo_url="https://github.com/org/repo", + commit_sha="abc123", + language="python", + ) + data = json.loads(out.read_text()) + repo = data["repository"] + for key, value in repo.items(): + assert value, f"repository.{key} is empty/null: {value!r}" diff --git a/libs/openant-core/tests/test_silent_401.py b/libs/openant-core/tests/test_silent_401.py new file mode 100644 index 0000000..bbb9fe7 --- /dev/null +++ b/libs/openant-core/tests/test_silent_401.py @@ -0,0 +1,117 @@ +"""Regression tests for silent 401 on bad API key. + +A user who runs `openant scan --verify` with an invalid API key should see +a loud warning in the scan summary, not "No vulnerabilities found, Cost: $0.00" +which could be mistaken for a clean repo. +""" + +import io +import sys +import types +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +_CORE_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(_CORE_ROOT)) + +if "anthropic" not in sys.modules: + _stub = types.ModuleType("anthropic") + _stub.Anthropic = MagicMock() + sys.modules["anthropic"] = _stub +_anth = sys.modules["anthropic"] +if not hasattr(_anth, "RateLimitError"): + _anth.RateLimitError = type("RateLimitError", (Exception,), {}) +if not hasattr(_anth, "AuthenticationError"): + _anth.AuthenticationError = type("AuthenticationError", (Exception,), {}) + +from core.schemas import ScanResult, AnalysisMetrics, UsageInfo # noqa: E402 + + +# --------------------------------------------------------------------------- +# Prong B — _print_summary must warn when zero tokens + all errors +# --------------------------------------------------------------------------- + +@pytest.fixture +def all_errors_result() -> ScanResult: + """ScanResult mimicking a scan where every API call returned 401.""" + result = 
ScanResult(output_dir="/tmp/fake") + result.metrics = AnalysisMetrics(total=7, errors=7) + result.usage = UsageInfo( + total_calls=0, + total_input_tokens=0, + total_output_tokens=0, + total_tokens=0, + total_cost_usd=0.0, + ) + return result + + +@pytest.fixture +def normal_result() -> ScanResult: + """ScanResult from a successful scan — should NOT trigger warning.""" + result = ScanResult(output_dir="/tmp/fake") + result.metrics = AnalysisMetrics(total=7, vulnerable=6, safe=1) + result.usage = UsageInfo( + total_calls=7, + total_input_tokens=50000, + total_output_tokens=10000, + total_tokens=60000, + total_cost_usd=0.85, + ) + return result + + +def _capture_print_summary(result: ScanResult) -> str: + """Call _print_summary and capture stderr output.""" + from core.scanner import _print_summary + captured = io.StringIO() + old_stderr = sys.stderr + sys.stderr = captured + try: + _print_summary(result) + finally: + sys.stderr = old_stderr + return captured.getvalue() + + +def test_print_summary_warns_on_zero_tokens_all_errors(all_errors_result): + output = _capture_print_summary(all_errors_result) + assert "No API calls succeeded" in output, ( + "scan summary must warn loudly when all calls failed" + ) + assert "api key" in output.lower(), ( + "warning should mention API key as a likely cause" + ) + + +def test_print_summary_no_warning_on_normal_scan(normal_result): + output = _capture_print_summary(normal_result) + assert "No API calls succeeded" not in output, ( + "normal scan should not show the auth-failure warning" + ) + + +# --------------------------------------------------------------------------- +# Prong B — analyze_sync must surface AuthenticationError clearly +# --------------------------------------------------------------------------- + +def test_analyze_sync_raises_on_auth_error(): + """When the Anthropic API returns 401, analyze_sync must not swallow it.""" + import os + os.environ["ANTHROPIC_API_KEY"] = "sk-test-bad-key" + + from 
utilities.llm_client import AnthropicClient + + AuthError = sys.modules["anthropic"].AuthenticationError + + client = AnthropicClient.__new__(AnthropicClient) + client.client = MagicMock() + client.client.messages.create.side_effect = AuthError("invalid x-api-key") + client.model = "claude-haiku-4-5-20251001" + client.tracker = MagicMock() + client.last_call = None + + with pytest.raises(AuthError): + client.analyze_sync("test prompt") diff --git a/libs/openant-core/utilities/dynamic_tester/__init__.py b/libs/openant-core/utilities/dynamic_tester/__init__.py index 450d327..e533f6c 100644 --- a/libs/openant-core/utilities/dynamic_tester/__init__.py +++ b/libs/openant-core/utilities/dynamic_tester/__init__.py @@ -27,6 +27,7 @@ def run_dynamic_tests( output_dir: str | None = None, max_retries: int = 3, checkpoint_path: str | None = None, + repo_path: str | None = None, ) -> list[DynamicTestResult]: """Run dynamic tests for all findings in a pipeline output file. @@ -36,6 +37,9 @@ def run_dynamic_tests( as pipeline_output_path. max_retries: Max retries per finding on error (default 3). checkpoint_path: Path to checkpoint directory for resume support. + repo_path: Path to the repository root. When given, the vulnerable + source file is pre-staged into the Docker build context so + ``COPY .`` works on the first try. Returns: List of DynamicTestResult objects @@ -169,8 +173,18 @@ def run_dynamic_tests( print(f" Generated (${generation_cost:.4f}). Running in Docker...", file=sys.stderr) + # Resolve the vulnerable source file for pre-staging. 
+ source_file = None + if repo_path: + rel_path = finding.get("location", {}).get("file", "") + if rel_path: + candidate = os.path.join(repo_path, rel_path) + if os.path.isfile(candidate): + source_file = candidate + # Step 2: Execute in Docker and retry on errors - execution = run_single_container(generation, finding_id) + execution = run_single_container(generation, finding_id, + source_file=source_file) result = collect_result(finding, generation, execution, generation_cost) retry_count = 0 @@ -208,7 +222,8 @@ def run_dynamic_tests( break generation = retry_gen - execution = run_single_container(generation, finding_id) + execution = run_single_container(generation, finding_id, + source_file=source_file) result = collect_result(finding, generation, execution, generation_cost) print(f" Retry {retry_count} result: {result.status} " f"(${generation_cost:.4f})", file=sys.stderr) diff --git a/libs/openant-core/utilities/dynamic_tester/docker_executor.py b/libs/openant-core/utilities/dynamic_tester/docker_executor.py index 864ef91..04a45d3 100644 --- a/libs/openant-core/utilities/dynamic_tester/docker_executor.py +++ b/libs/openant-core/utilities/dynamic_tester/docker_executor.py @@ -11,6 +11,7 @@ import subprocess import tempfile import time +import uuid # Timeouts DEFAULT_CONTAINER_TIMEOUT = 120 # seconds per container @@ -58,8 +59,20 @@ def _sanitize_compose(content: str) -> str: return content -def _write_test_files(work_dir: str, generation: dict) -> None: - """Write generated test files into the working directory.""" +def _write_test_files(work_dir: str, generation: dict, source_file: str | None = None) -> None: + """Write generated test files into the working directory. + + Args: + work_dir: Temporary directory used as Docker build context. + generation: LLM-generated test artifacts (dockerfile, test_script, …). + source_file: Optional path to the vulnerable source file. 
When given, + the file is copied into *work_dir* so the Dockerfile can COPY it + without the LLM having to guess the path. + """ + # Pre-stage the vulnerable source file so `COPY .` just works. + if source_file and os.path.isfile(source_file): + shutil.copy2(source_file, os.path.join(work_dir, os.path.basename(source_file))) + # Write Dockerfile with open(os.path.join(work_dir, "Dockerfile"), "w") as f: f.write(generation["dockerfile"]) @@ -116,6 +129,7 @@ def run_single_container( finding_id: str, container_timeout: int = DEFAULT_CONTAINER_TIMEOUT, build_timeout: int = DEFAULT_BUILD_TIMEOUT, + source_file: str | None = None, ) -> DockerExecutionResult: """Build and run a single Docker container for a test. @@ -133,9 +147,12 @@ def run_single_container( # Sanitize finding_id for use as Docker image tag. # Docker tags must match [a-z0-9][a-z0-9._-]*, so strip anything else. + # UUID prefix prevents collisions between parallel dynamic-test runs + # (same finding IDs across scans). + run_id = uuid.uuid4().hex[:8] safe_id = re.sub(r"[^a-z0-9-]", "-", finding_id.lower()).strip("-_.") - image_tag = f"openant-test-{safe_id}" - network_name = f"openant-net-{safe_id}" + image_tag = f"openant-test-{run_id}-{safe_id}" + network_name = f"openant-net-{run_id}-{safe_id}" # Use a deterministic, sanitized work_dir name so docker compose project # names (derived from the dir name) are always valid Docker references. 
@@ -148,7 +165,7 @@ def run_single_container( os.rename(raw_work_dir, work_dir) try: - _write_test_files(work_dir, generation) + _write_test_files(work_dir, generation, source_file=source_file) if generation.get("docker_compose") and generation.get("needs_attacker_server"): # Multi-service: use docker compose with explicit project name diff --git a/libs/openant-core/utilities/dynamic_tester/test_generator.py b/libs/openant-core/utilities/dynamic_tester/test_generator.py index 422c5fa..c95b88a 100644 --- a/libs/openant-core/utilities/dynamic_tester/test_generator.py +++ b/libs/openant-core/utilities/dynamic_tester/test_generator.py @@ -112,6 +112,12 @@ def _build_finding_prompt(finding: dict, repo_info: dict) -> str: """Build the prompt for generating a test for a single finding.""" language = repo_info.get("language", "Python") + # Derive the staged source filename so the LLM can reference it in COPY. + source_basename = "" + loc = finding.get("location", {}) + if isinstance(loc, dict) and loc.get("file"): + source_basename = os.path.basename(loc["file"]) + parts = [ f"Generate a dynamic exploit test for the following vulnerability.", "", @@ -123,11 +129,18 @@ def _build_finding_prompt(finding: dict, repo_info: dict) -> str: f" ID: {finding.get('id', 'unknown')}", f" Name: {finding.get('name', 'unknown')}", f" CWE: {finding.get('cwe_id', 0)} - {finding.get('cwe_name', 'Unknown')}", - f" Location: {json.dumps(finding.get('location', {}), indent=4)}", + f" Location: {json.dumps(loc, indent=4)}", f" Stage 1 Verdict: {finding.get('stage1_verdict', 'unknown')}", f" Stage 2 Verdict: {finding.get('stage2_verdict', 'unknown')}", ] + if source_basename: + parts.extend([ + "", + f" Source file (pre-staged in Docker build context): {source_basename}", + f" Your Dockerfile MUST use `COPY {source_basename} .` — the file is already there.", + ]) + if finding.get("description"): parts.extend(["", f" Description: {finding['description']}"]) if finding.get("vulnerable_code"):