feat(trace): SequenceDetector primitive — session-window multi-step attack detection#522
Open
stealthwhizz wants to merge 7 commits into
Open
Conversation
Adds StepSpec dataclass and SequenceDetector base structure to finbot/ctf/detectors/primitives/. Includes config validation, get_relevant_event_types(), and stubbed private helpers for history querying, step matching, and time-window checks. check_event() and all helpers are NotImplementedError stubs pending implementation.
…gration - Add SequenceDetector to finbot/ctf/detectors/primitives/ Detects multi-step attack patterns across a session or workflow window. Supports ordered step matching, glob event_type patterns, within_n_events and within_seconds windows, and all ToolCallDetector field operators. Challenge authors configure it from YAML with no Python required. - Add composite index idx_ctf_event_session_ts_type on (session_id, timestamp, event_type) to keep session-window history queries below 10ms p95. - Export SequenceDetector from finbot/ctf/detectors/primitives/__init__.py - Add 17 unit tests covering full sequence detection, partial sequences, order enforcement, session/workflow windows, condition operators, and glob event_type matching.
- Add StepSpec TypedDict to sequence_detector.py matching the approved interface spec; export it from primitives __init__ - Add benchmark test: seeds 1,000 CTFEvent rows with composite index, runs check_event 100 times, asserts p95 < 10ms Current result: p50 ~7ms, p95 ~8ms on SQLite
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds a new SequenceDetector primitive for multi-step pattern detection, plus database/index support and accompanying tests/benchmarking to validate correctness and query performance.
Changes:
- Introduce
SequenceDetectorwith configurable step matching over session/workflow windows. - Add Alembic migration for a composite
ctf_eventsindex used by session-window queries. - Add unit tests and a SQLite benchmark to catch functional and query-latency regressions; refactor vendor DB session acquisition to use
get_db().
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
finbot/ctf/detectors/primitives/sequence_detector.py |
New detector implementation: config validation, history query, step/condition matching. |
finbot/ctf/detectors/primitives/__init__.py |
Exports SequenceDetector and StepSpec. |
migrations/versions/2026_06_03_add_ctf_event_session_index.py |
Adds composite index intended to speed up session-window queries. |
tests/unit/ctf/test_sequence_detector.py |
Unit coverage for config validation, ordering, windows, and some condition operators. |
tests/unit/ctf/test_sequence_detector_benchmark.py |
Benchmark test for p95 latency of check_event query path. |
finbot/tools/data/vendor.py |
Switches vendor tools from db_session() context manager to get_db() generator pattern. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+43
to
+56
| db = next(get_db()) | ||
| vendor_repo = VendorRepository(db, session_context) | ||
| vendor = vendor_repo.get_vendor(vendor_id) | ||
| if not vendor: | ||
| raise ValueError("Vendor not found") | ||
|
|
||
| return { | ||
| "vendor_id": vendor.id, | ||
| "company_name": vendor.company_name, | ||
| "contact_name": vendor.contact_name, | ||
| "email": vendor.email, | ||
| "phone": vendor.phone, | ||
| "status": vendor.status, | ||
| } |
Comment on lines
+35
to
+52
| engine = create_engine( | ||
| "sqlite:///:memory:", | ||
| connect_args={"check_same_thread": False}, | ||
| ) | ||
| Base.metadata.create_all(bind=engine) | ||
|
|
||
| # Create the composite index from the migration | ||
| with engine.connect() as conn: | ||
| conn.execute( | ||
| text( | ||
| "CREATE INDEX IF NOT EXISTS idx_ctf_event_session_ts_type " | ||
| "ON ctf_events (session_id, timestamp, event_type)" | ||
| ) | ||
| ) | ||
| conn.commit() | ||
|
|
||
| Session = sessionmaker(bind=engine) | ||
| session = Session() |
Comment on lines
+125
to
+133
| import asyncio | ||
|
|
||
| latencies_ms: list[float] = [] | ||
| for _ in range(BENCHMARK_RUNS): | ||
| t0 = time.perf_counter() | ||
| asyncio.get_event_loop().run_until_complete( | ||
| det.check_event(trigger_event, session) | ||
| ) | ||
| latencies_ms.append((time.perf_counter() - t0) * 1000) |
Comment on lines
+20
to
+29
| def upgrade() -> None: | ||
| # Composite index for SequenceDetector session-window queries: | ||
| # WHERE namespace = ? AND session_id = ? ORDER BY timestamp ASC | ||
| # The event_type column is included to support index-only scans when | ||
| # filtering by step event_type after the session window is resolved. | ||
| op.create_index( | ||
| "idx_ctf_event_session_ts_type", | ||
| "ctf_events", | ||
| ["session_id", "timestamp", "event_type"], | ||
| ) |
Comment on lines
+104
to
+111
| if within_seconds is not None: | ||
| event_time = event.get("timestamp") | ||
| if isinstance(event_time, str): | ||
| event_time = datetime.fromisoformat(event_time.replace("Z", "+00:00")) | ||
| elif not isinstance(event_time, datetime): | ||
| event_time = datetime.now(UTC) | ||
| cutoff = event_time - timedelta(seconds=within_seconds) | ||
| query = query.filter(CTFEvent.timestamp >= cutoff) |
Comment on lines
+203
to
+233
| def _check_condition(self, actual: Any, condition: Any) -> bool: | ||
| """Check if actual value satisfies condition (ToolCallDetector operators).""" | ||
| if not isinstance(condition, dict): | ||
| return actual == condition | ||
|
|
||
| for operator, expected in condition.items(): | ||
| op = operator.lower() | ||
| if op == "exists": | ||
| return (actual is not None) == expected | ||
| if actual is None: | ||
| return False | ||
| if op in ("equals", "eq"): | ||
| return actual == expected | ||
| if op == "in": | ||
| return actual in expected | ||
| if op == "not_in": | ||
| return actual not in expected | ||
| if op == "contains": | ||
| return expected in str(actual).lower() | ||
| if op == "gt": | ||
| return float(actual) > float(expected) | ||
| if op == "gte": | ||
| return float(actual) >= float(expected) | ||
| if op == "lt": | ||
| return float(actual) < float(expected) | ||
| if op == "lte": | ||
| return float(actual) <= float(expected) | ||
| if op == "matches": | ||
| return bool(re.search(expected, str(actual), re.IGNORECASE)) | ||
|
|
||
| return False |
| from unittest.mock import MagicMock | ||
|
|
||
| from finbot.ctf.detectors.primitives.sequence_detector import SequenceDetector | ||
| from finbot.ctf.detectors.result import DetectionResult |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
SequenceDetectortofinbot/ctf/detectors/primitives/— the first detector in FinBot that can match attack patterns spanning multiple events across a session or workflow windowStepSpecTypedDict matching the interface approved during community bonding(session_id, timestamp, event_type)onctf_eventsto keep session-window history queries fastThis is the Week 1 deliverable for TRACE (GSoC 2026).
What it does
All 14 existing detectors fire on a single event.
SequenceDetectorqueriesCTFEventhistory to match an ordered sequence of steps within a configurable window. Challenge authors configure it from YAML — no Python required:Files changed
finbot/ctf/detectors/primitives/sequence_detector.pyfinbot/ctf/detectors/primitives/__init__.pySequenceDetector,StepSpecmigrations/versions/2026_06_03_add_ctf_event_session_index.pytests/unit/ctf/test_sequence_detector.pytests/unit/ctf/test_sequence_detector_benchmark.pyTest plan
Notes for reviewers
within_n_eventswindow usesORDER BY timestamp DESC LIMIT nthen reverses — this keeps the query bounded while preserving chronological order for step matchingdetailsJSON first, then fall back to named CTFEvent columns (tool_name,agent_name, etc.) — this avoids ambiguity between payload fields and model attributesWHERE session_id = ? ORDER BY timestamp ASC;event_typeis included for potential index-only scans