Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .audit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ policy:

# Documented exceptions with business justification
# Format: CVE-XXXX-XXXXX or GHSA-xxxx-xxxx-xxxx
exceptions: {}
exceptions:
GHSA-gv7w-rqvm-qjhr:
package: esbuild
expires_at: "2026-12-31"
reason: "DevDependency esbuild vulnerability has no exploit path in production frontend artifact."

# Packages to exclude from audits (use sparingly!)
excluded_packages: []
Expand Down
16 changes: 15 additions & 1 deletion backend/secuscan/knowledgebase.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@
]


_cached_entries: Dict[str, List[Dict[str, Any]]] | None = None
_cached_mtime: float | None = None


class KnowledgeBase:
"""Loads local CPE/CVE intelligence without live network calls."""

Expand Down Expand Up @@ -157,11 +161,19 @@ def _select_version_match(self, cpes: List[str], normalized_version: str, *, sam
return None

def _load_entries(self) -> Dict[str, List[Dict[str, Any]]]:
global _cached_entries, _cached_mtime

feed_files = sorted(self.data_dir.glob("*.json"))
newest_mtime = max((path.stat().st_mtime for path in feed_files), default=0.0)

if _cached_entries is not None and _cached_mtime == newest_mtime:
return _cached_entries

entries: Dict[str, List[Dict[str, Any]]] = {
key: list(value) for key, value in _SEEDED_CPE_INDEX.items()
}

for path in sorted(self.data_dir.glob("*.json")):
for path in feed_files:
try:
loaded = json.loads(path.read_text(encoding="utf-8"))
except Exception as exc:
Expand All @@ -179,4 +191,6 @@ def _load_entries(self) -> Dict[str, List[Dict[str, Any]]]:
if isinstance(item, dict):
bucket.append(item)

_cached_entries = entries
_cached_mtime = newest_mtime
return entries
19 changes: 15 additions & 4 deletions backend/secuscan/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,15 @@ async def get_task_result(task_id: str, owner: str = Depends(get_current_owner))
"""Get task execution result"""
db = await get_db()

# Enforce ownership and existence check first
await require_owned_task(db, task_id, owner)

cache_key = f"tasks:result:{task_id}:{owner}"
cache = await get_cache()
cached = await cache.get_json(cache_key)
if cached is not None:
return cached

task_row = await db.fetchone(
"""
SELECT id, owner_id, plugin_id, tool_name, target, status,
Expand All @@ -766,9 +775,6 @@ async def get_task_result(task_id: str, owner: str = Depends(get_current_owner))
if not task_row:
raise HTTPException(status_code=404, detail="Task not found")

if task_row["owner_id"] != owner:
raise HTTPException(status_code=403, detail="You do not have access to this task")

structured = {}
if task_row["structured_json"]:
try:
Expand Down Expand Up @@ -845,7 +851,7 @@ async def get_task_result(task_id: str, owner: str = Depends(get_current_owner))
except Exception:
pass

return {
result = {
"task_id": task_row["id"],
"plugin_id": task_row["plugin_id"],
"tool": task_row["tool_name"],
Expand Down Expand Up @@ -873,6 +879,11 @@ async def get_task_result(task_id: str, owner: str = Depends(get_current_owner))
"metadata": {}
}

if task_row["status"] in ["completed", "failed", "cancelled"]:
await cache.set_json(cache_key, result)

return result


@router.post("/task/{task_id}/cancel")
async def cancel_task(task_id: str, owner: str = Depends(get_current_owner)):
Expand Down
135 changes: 135 additions & 0 deletions testing/backend/integration/test_task_result_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import sqlite3
import json
import pytest
import asyncio
from backend.secuscan.config import settings

def test_task_result_cache_hit_and_invalidation(test_client):
"""Test that requesting task results caches completed tasks, handles cache hits,
and invalidates appropriately.
"""
task_id = "cache-result-test-001"

# Seed a completed task in the database
conn = sqlite3.connect(settings.database_path)
conn.execute(
"""
INSERT INTO tasks (id, owner_id, plugin_id, tool_name, target, status, created_at,
preset, inputs_json, command_used, structured_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
task_id, "default", "http_inspector", "http_inspector", "https://example.com",
"completed", "2026-05-19T10:00:00",
"standard", json.dumps({"target": "https://example.com"}),
"", json.dumps({
"findings": [
{
"title": "Initial Title",
"category": "General",
"severity": "info",
"description": "Dummy finding"
}
]
}),
),
)
conn.commit()
conn.close()

# First request: gets data from the DB, should cache it
r1 = test_client.get(f"/api/v1/task/{task_id}/result")
assert r1.status_code == 200
assert r1.json()["findings"][0]["title"] == "Initial Title"

# Modify the DB finding directly to see if the second request uses cached result
conn = sqlite3.connect(settings.database_path)
conn.execute(
"UPDATE tasks SET structured_json = ? WHERE id = ?",
(json.dumps({
"findings": [
{
"title": "Modified Title",
"category": "General",
"severity": "info",
"description": "Dummy finding"
}
]
}), task_id)
)
conn.commit()
conn.close()

# Second request: should hit the cache and still return "Initial Title"
r2 = test_client.get(f"/api/v1/task/{task_id}/result")
assert r2.status_code == 200
assert r2.json()["findings"][0]["title"] == "Initial Title"

# Invalidate view cache
from backend.secuscan.routes import invalidate_view_cache
asyncio.run(invalidate_view_cache())

# Third request: should miss cache and fetch updated "Modified Title" from DB
r3 = test_client.get(f"/api/v1/task/{task_id}/result")
assert r3.status_code == 200
assert r3.json()["findings"][0]["title"] == "Modified Title"


def test_task_result_cache_bypassed_for_unfinished_tasks(test_client):
"""Test that requesting task results does NOT cache running/queued tasks."""
task_id = "cache-result-test-002"

# Seed a running task in the database
conn = sqlite3.connect(settings.database_path)
conn.execute(
"""
INSERT INTO tasks (id, owner_id, plugin_id, tool_name, target, status, created_at,
preset, inputs_json, command_used, structured_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
task_id, "default", "http_inspector", "http_inspector", "https://example.com",
"running", "2026-05-19T10:00:00",
"standard", json.dumps({"target": "https://example.com"}),
"", json.dumps({
"findings": [
{
"title": "Running Title",
"category": "General",
"severity": "info",
"description": "Dummy finding"
}
]
}),
),
)
conn.commit()
conn.close()

# First request: gets data from DB
r1 = test_client.get(f"/api/v1/task/{task_id}/result")
assert r1.status_code == 200
assert r1.json()["findings"][0]["title"] == "Running Title"

# Modify the DB finding directly
conn = sqlite3.connect(settings.database_path)
conn.execute(
"UPDATE tasks SET structured_json = ? WHERE id = ?",
(json.dumps({
"findings": [
{
"title": "Updated Running Title",
"category": "General",
"severity": "info",
"description": "Dummy finding"
}
]
}), task_id)
)
conn.commit()
conn.close()

# Second request: since it was running, it should NOT have been cached, so we get updated data
r2 = test_client.get(f"/api/v1/task/{task_id}/result")
assert r2.status_code == 200
assert r2.json()["findings"][0]["title"] == "Updated Running Title"
70 changes: 70 additions & 0 deletions testing/backend/unit/test_knowledgebase.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import json
import os
import time
from unittest.mock import patch
from backend.secuscan.knowledgebase import KnowledgeBase


Expand All @@ -18,3 +22,69 @@ def test_find_vulnerabilities_returns_family_only_for_weak_match():

assert result["cpe"] == "cpe:/a:nginx:nginx:1.18.0"
assert result["match_strength"] == "family"


def test_knowledgebase_caching_and_invalidation(tmp_path):
# Ensure cache is initially clean
import backend.secuscan.knowledgebase as kb_mod
kb_mod._cached_entries = None
kb_mod._cached_mtime = None

# Create temporary knowledgebase directory
kb_dir = tmp_path / "kb"
kb_dir.mkdir()

# Create a dummy json file
feed_file = kb_dir / "feed1.json"
dummy_data = {
"cpe:/a:test:test:1.0": [
{
"cve": "CVE-2026-9999",
"severity": "high",
"cvss": 8.8,
"title": "Test vulnerability",
"description": "Test vulnerability desc"
}
]
}
feed_file.write_text(json.dumps(dummy_data))

# Initialize KnowledgeBase pointing to the temporary directory
kb = KnowledgeBase(data_dir=kb_dir)

# First load
with patch("json.loads", wraps=json.loads) as mock_loads:
entries = kb._load_entries()
assert "cpe:/a:test:test:1.0" in entries
assert mock_loads.call_count == 1

# Second load without modifying files
with patch("json.loads", wraps=json.loads) as mock_loads:
entries2 = kb._load_entries()
assert entries2 is entries # Should be the same cached dict
assert mock_loads.call_count == 0

# Modify the feed file to change its mtime/content
time.sleep(0.01)
new_data = {
"cpe:/a:test:test:1.0": [
{
"cve": "CVE-2026-9999",
"severity": "high",
"cvss": 8.8,
"title": "Test vulnerability",
"description": "Test vulnerability desc"
}
],
"cpe:/a:test:new:1.0": []
}
feed_file.write_text(json.dumps(new_data))

current_mtime = feed_file.stat().st_mtime
os.utime(feed_file, (current_mtime + 5.0, current_mtime + 5.0))

# Load again
with patch("json.loads", wraps=json.loads) as mock_loads:
entries3 = kb._load_entries()
assert "cpe:/a:test:new:1.0" in entries3
assert mock_loads.call_count == 1